--- /dev/null
+package org.simantics.acorn.lru;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.file.Path;
+import java.util.ArrayList;
+
+import org.simantics.acorn.ClusterManager;
+import org.simantics.acorn.Persistable;
+import org.simantics.acorn.internal.ClusterChange;
+import org.simantics.acorn.internal.UndoClusterUpdateProcessor;
+import org.simantics.compressions.CompressionCodec;
+import org.simantics.compressions.Compressions;
+import org.simantics.db.exception.DatabaseException;
+import org.simantics.db.service.Bytes;
+import org.simantics.utils.datastructures.Pair;
+
+import gnu.trove.list.array.TByteArrayList;
+
+public class ClusterStreamChunk extends LRUObject<String, ClusterStreamChunk> implements Persistable {
+
+ // 500KB is a fine chunk
+ private static int MAX_CHUNK_SIZE = 500*1024;
+
+ int size = 0;
+ final private ClusterManager manager;
+ private boolean committed = false;
+
+ public int nextToProcess = 0;
+
+ public ArrayList<ClusterUpdateOperation> operations = new ArrayList<ClusterUpdateOperation>();
+
+ // Stub
+ public ClusterStreamChunk(ClusterManager manager, LRU<String, ClusterStreamChunk> LRU, Path readDir, String id, int offset, int length) {
+ super(LRU, id, readDir, "clusterStream", offset, length, false, false);
+ this.manager = manager;
+ LRU.map(this);
+ }
+
+ // Creation
+ public ClusterStreamChunk(ClusterManager manager, LRU<String, ClusterStreamChunk> LRU, String id) {
+ super(LRU, id, LRU.getDirectory(), "clusterStream", true, true);
+ this.manager = manager;
+ LRU.insert(this, accessTime);
+ }
+
+ public UndoClusterUpdateProcessor getUndoProcessor(ClusterManager clusters, int chunkOffset, String ccsId) throws DatabaseException {
+
+ if(VERIFY) verifyAccess();
+
+ makeResident(true);
+
+ ClusterUpdateOperation op = operations.get(chunkOffset);
+ if(op == null) throw new IllegalStateException("Cluster Update Operation " + ccsId + " was not found.");
+ if(op.ccs == null) throw new IllegalStateException("Cluster ChangeSet " + ccsId + " was not found.");
+
+ UndoClusterUpdateProcessor proc = new UndoClusterUpdateProcessor(clusters, this, op.ccs);
+ if(proc.version != ClusterChange.VERSION)
+ return null;
+
+ // This cluster and CCS can still be under preparation => wait
+ clusters.clusterLRU.ensureUpdates(proc.getClusterUID());
+
+ proc.process();
+
+ cancelForceResident();
+
+ return proc;
+
+ }
+
+ public void addOperation(ClusterUpdateOperation op) {
+ if(committed)
+ throw new IllegalStateException();
+ operations.add(op);
+ size += op.data.length;
+// if(isCommitted()) {
+// LRU.refresh(this);
+// }
+ }
+
+ public byte[] getOperation(int index) {
+ return operations.get(index).data;
+ }
+
+ public void commit() {
+ committed = true;
+ }
+
+ public boolean isCommitted() {
+ if(size > MAX_CHUNK_SIZE) committed = true;
+ return committed;
+ }
+
+ @Override
+ public boolean canBePersisted() {
+ if(!super.canBePersisted()) return false;
+ if(!isCommitted()) return false;
+ for(ClusterUpdateOperation op : operations) {
+ if(!op.finished) return false;
+ }
+ return true;
+ }
+
+ private static void writeLE(TByteArrayList bytes, int value) {
+
+ bytes.add( (byte) (value & 0xFF));
+ bytes.add((byte) ((value >>> 8) & 0xFF));
+ bytes.add((byte) ((value >>> 16) & 0xFF));
+ bytes.add((byte) ((value >>> 24) & 0xFF));
+
+ }
+
+ final public static void writeLE8(TByteArrayList bytes, long value) {
+
+ bytes.add( (byte) (value & 0xFF));
+ bytes.add((byte) ((value >>> 8) & 0xFF));
+ bytes.add((byte) ((value >>> 16) & 0xFF));
+ bytes.add((byte) ((value >>> 24) & 0xFF));
+ bytes.add((byte) ((value >>> 32) & 0xFF));
+ bytes.add((byte) ((value >>> 40) & 0xFF));
+ bytes.add((byte) ((value >>> 48) & 0xFF));
+ bytes.add((byte) ((value >>> 56) & 0xFF));
+
+ }
+
+ @Override
+ protected Pair<byte[], Integer> toBytes() {
+
+ assert(isCommitted());
+
+ TByteArrayList raw = new TByteArrayList();
+
+ writeLE(raw, operations.size());
+
+ for(ClusterUpdateOperation op : operations) {
+
+ writeLE(raw, op.data.length);
+ raw.add(op.data);
+ op.data = null;
+
+ writeLE(raw, op.ccs.statementMask.size());
+ raw.add(op.ccs.statementMask.toArray());
+ writeLE(raw, op.ccs.oldValueEx.size());
+ raw.add(op.ccs.oldValueEx.toArray());
+ writeLE(raw, op.ccs.oldValues.size());
+
+ for(byte[] oldValue : op.ccs.oldValues) {
+ int len = (oldValue != null ? oldValue.length : -1);
+ writeLE(raw, len);
+ if(oldValue != null) {
+ raw.add(oldValue);
+ }
+ }
+
+ }
+
+ byte[] raw_ = raw.toArray();
+ CompressionCodec codec = Compressions.get(Compressions.LZ4);
+ ByteBuffer input = ByteBuffer.wrap(raw_);
+ ByteBuffer output = ByteBuffer.allocate(raw_.length + raw_.length/8);
+ int compressedSize = codec.compressBuffer(input, 0, raw_.length, output, 0);
+
+ // We append inflated size - cannot prepend since decompression cannot handle offsets in input
+ final byte[] rawOutput = new byte[compressedSize+4];
+ output.get(rawOutput,0,compressedSize);
+ Bytes.writeLE(rawOutput, compressedSize, raw_.length);
+
+ release();
+
+ return Pair.make(rawOutput, rawOutput.length);
+
+ }
+
+ @Override
+ void release() {
+
+ for(ClusterUpdateOperation op : operations) {
+ op.data = null;
+ op.ccs = null;
+ }
+
+ }
+
+ static class StreamDecompressor {
+
+// byte[] decompressBuffer = new byte[1024*1024];
+
+ public synchronized byte[] decompressBuffer(byte[] compressed) throws IOException {
+
+ int deflatedSize = Bytes.readLE4(compressed, compressed.length-4);
+
+ byte[] result = new byte[deflatedSize];
+
+// if(decompressBuffer.length < deflatedSize)
+// decompressBuffer = new byte[Math.max(3*decompressBuffer.length / 2, deflatedSize)];
+
+ CompressionCodec codec = Compressions.get(Compressions.LZ4);
+
+ ByteBuffer input = ByteBuffer.wrap(compressed);
+ ByteBuffer output = ByteBuffer.wrap(result);
+
+ int decompressedSize = codec.decompressBuffer(input, 0, compressed.length-4, output, 0, result.length);
+ assert(decompressedSize == deflatedSize);
+
+ return result;
+
+ }
+
+
+ }
+
+ private static StreamDecompressor decompressor = new StreamDecompressor();
+
+ @Override
+ public void fromFile(byte[] data_) {
+
+ try {
+
+ byte[] data = decompressor.decompressBuffer(data_);
+
+ operations = new ArrayList<ClusterUpdateOperation>();
+
+ int offset = 0;
+ int opLen = Bytes.readLE4(data, offset);
+ offset += 4;
+
+ for(int i=0;i<opLen;i++) {
+
+ int len = Bytes.readLE4(data, offset);
+ offset += 4;
+ byte[] bytes = new byte[len];
+ System.arraycopy(data, offset, bytes, 0, len);
+ offset += len;
+
+ ClusterUpdateOperation op = new ClusterUpdateOperation(manager, bytes);
+
+ String ccsKey = getKey() + "." + i;
+
+ op.ccs = new ClusterChangeSet(ccsKey, op.uid);
+ op.chunk = this;
+
+ int statementMaskLen = Bytes.readLE4(data, offset);
+ offset += 4;
+ op.ccs.statementMask = new TByteArrayList(statementMaskLen);
+ for(int j=0;j<statementMaskLen;j++)
+ op.ccs.statementMask.add(data[offset++]);
+
+ int oldValueExLen = Bytes.readLE4(data, offset);
+ offset += 4;
+ op.ccs.oldValueEx = new TByteArrayList(oldValueExLen);
+ for(int j=0;j<oldValueExLen;j++)
+ op.ccs.oldValueEx.add(data[offset++]);
+
+ int oldValuesSize = Bytes.readLE4(data, offset);
+ offset += 4;
+
+ op.ccs.oldValues = new ArrayList<>(oldValuesSize);
+ for(int j=0;j<oldValuesSize;j++) {
+
+ int oldValueSize = Bytes.readLE4(data, offset);
+ offset += 4;
+
+ if(oldValueSize == -1) {
+ op.ccs.oldValues.add(null);
+ } else {
+ byte[] oldValue = new byte[oldValueSize];
+ System.arraycopy(data, offset, oldValue, 0, oldValueSize);
+ offset += oldValueSize;
+ op.ccs.oldValues.add(oldValue);
+ }
+
+ }
+
+ operations.add(op);
+
+ }
+
+ } catch (IOException e) {
+
+ throw new IllegalStateException(e);
+
+ }
+
+ }
+
+ @Override
+ String getExtension() {
+ return "stream";
+ }
+
+ @Override
+ public String toString() {
+ return "ClusterUpdateOperationChunk " + getKey();
+ }
+
+ @Override
+ protected boolean overwrite() {
+ return false;
+ }
+
+}
\ No newline at end of file