Sharing org.simantics.acorn for everyone to use
diff --git a/bundles/org.simantics.acorn/src/org/simantics/acorn/lru/ClusterStreamChunk.java b/bundles/org.simantics.acorn/src/org/simantics/acorn/lru/ClusterStreamChunk.java
new file mode 100644 (file)
index 0000000..23cbfb1
--- /dev/null
@@ -0,0 +1,302 @@
+package org.simantics.acorn.lru;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.file.Path;
+import java.util.ArrayList;
+
+import org.simantics.acorn.ClusterManager;
+import org.simantics.acorn.Persistable;
+import org.simantics.acorn.internal.ClusterChange;
+import org.simantics.acorn.internal.UndoClusterUpdateProcessor;
+import org.simantics.compressions.CompressionCodec;
+import org.simantics.compressions.Compressions;
+import org.simantics.db.exception.DatabaseException;
+import org.simantics.db.service.Bytes;
+import org.simantics.utils.datastructures.Pair;
+
+import gnu.trove.list.array.TByteArrayList;
+
+public class ClusterStreamChunk extends LRUObject<String, ClusterStreamChunk> implements Persistable {
+
+       // 500KB is a fine chunk
+       private static final int MAX_CHUNK_SIZE = 500*1024;
+
+       int size = 0;
+       final private ClusterManager manager;
+       private boolean committed = false;
+       
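+       // Index of the next operation in 'operations' that has not yet been processed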
+       public int nextToProcess = 0;
+       
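+       // Update operations accumulated into this chunk, in insertion order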
+       public ArrayList<ClusterUpdateOperation> operations = new ArrayList<ClusterUpdateOperation>();
+       
+       // Stub: references a chunk that is already persisted in readDir
+       public ClusterStreamChunk(ClusterManager manager, LRU<String, ClusterStreamChunk> LRU, Path readDir, String id, int offset, int length) {
+               super(LRU, id, readDir, "clusterStream", offset, length, false, false);
+               this.manager = manager;
+               LRU.map(this);
+       }
+       
+       // Creation: starts a new, empty chunk in the LRU directory
+       public ClusterStreamChunk(ClusterManager manager, LRU<String, ClusterStreamChunk> LRU, String id) {
+               super(LRU, id, LRU.getDirectory(), "clusterStream", true, true);
+               this.manager = manager;
+               LRU.insert(this, accessTime);
+       }
+       
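+       // Returns an UndoClusterUpdateProcessor for the change set ccsId at chunkOffset.
+       // Makes the chunk resident first and returns null if the persisted change set
+       // version does not match ClusterChange.VERSION.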
+       public UndoClusterUpdateProcessor getUndoProcessor(ClusterManager clusters, int chunkOffset, String ccsId) throws DatabaseException {
+               
+               if(VERIFY) verifyAccess();
+               
+               makeResident(true);
+
+               ClusterUpdateOperation op = operations.get(chunkOffset);
+               if(op == null) throw new IllegalStateException("Cluster Update Operation " + ccsId + " was not found.");
+               if(op.ccs == null) throw new IllegalStateException("Cluster ChangeSet " + ccsId + " was not found.");
+
+               UndoClusterUpdateProcessor proc = new UndoClusterUpdateProcessor(clusters, this, op.ccs);
+               if(proc.version != ClusterChange.VERSION)
+                       return null;
+
+               // This cluster and CCS can still be under preparation => wait
+               clusters.clusterLRU.ensureUpdates(proc.getClusterUID());
+
+               proc.process();
+
+               cancelForceResident();
+               
+               return proc;
+               
+       }
+       
+       public void addOperation(ClusterUpdateOperation op) {
+               if(committed)
+                       throw new IllegalStateException();
+               operations.add(op);
+               size += op.data.length;
+//             if(isCommitted()) {
+//                     LRU.refresh(this);
+//             }
+       }
+       
+       public byte[] getOperation(int index) {
+               return operations.get(index).data;
+       }
+       
+       public void commit() {
+               committed = true;
+       }
+       
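+       // A chunk is committed either explicitly via commit() or implicitly
+       // once its accumulated size exceeds MAX_CHUNK_SIZE.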
+       public boolean isCommitted() {
+               if(size > MAX_CHUNK_SIZE) committed = true;
+               return committed;
+       }
+       
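+       // Only committed chunks whose operations have all finished may be persisted.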
+       @Override
+       public boolean canBePersisted() {
+               if(!super.canBePersisted()) return false;
+               if(!isCommitted()) return false;
+               for(ClusterUpdateOperation op : operations) {
+                       if(!op.finished) return false;
+               }
+               return true;
+       }
+
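+       // Appends a 32-bit value to 'bytes' in little-endian byte order.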
+       private static void writeLE(TByteArrayList bytes, int value) {
+               
+               bytes.add( (byte) (value & 0xFF));
+               bytes.add((byte) ((value >>> 8) & 0xFF));
+               bytes.add((byte) ((value >>> 16) & 0xFF));
+               bytes.add((byte) ((value >>> 24) & 0xFF));
+
+       }
+
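+       // Appends a 64-bit value to 'bytes' in little-endian byte order.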
+       public static void writeLE8(TByteArrayList bytes, long value) {
+               
+               bytes.add( (byte) (value & 0xFF));
+               bytes.add((byte) ((value >>> 8) & 0xFF));
+               bytes.add((byte) ((value >>> 16) & 0xFF));
+               bytes.add((byte) ((value >>> 24) & 0xFF));
+               bytes.add((byte) ((value >>> 32) & 0xFF));
+               bytes.add((byte) ((value >>> 40) & 0xFF));
+               bytes.add((byte) ((value >>> 48) & 0xFF));
+               bytes.add((byte) ((value >>> 56) & 0xFF));
+               
+       }
+       
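+       // Serialization format, before LZ4 compression:
+       //   [operation count]
+       //   per operation: [data length][data]
+       //                  [statementMask length][statementMask]
+       //                  [oldValueEx length][oldValueEx]
+       //                  [oldValues count], then per old value: [length, or -1 for null][bytes]
+       // All integers are 32-bit little-endian. The inflated size is appended to the
+       // compressed output as a trailing 32-bit little-endian integer.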
+       @Override
+       protected Pair<byte[], Integer> toBytes() {
+               
+               assert(isCommitted());
+               
+               TByteArrayList raw = new TByteArrayList();
+               
+               writeLE(raw, operations.size());
+               
+               for(ClusterUpdateOperation op : operations) {
+                       
+                       writeLE(raw, op.data.length);
+                       raw.add(op.data);
+                       op.data = null;
+                       
+                       writeLE(raw, op.ccs.statementMask.size());
+                       raw.add(op.ccs.statementMask.toArray());
+                       writeLE(raw, op.ccs.oldValueEx.size());
+                       raw.add(op.ccs.oldValueEx.toArray());
+                       writeLE(raw, op.ccs.oldValues.size());
+                       
+                       for(byte[] oldValue : op.ccs.oldValues) {
+                               int len = (oldValue != null ? oldValue.length : -1);
+                               writeLE(raw, len);
+                               if(oldValue != null) {
+                                       raw.add(oldValue);
+                               }
+                       }
+                       
+               }
+               
+               byte[] raw_ = raw.toArray();
+               CompressionCodec codec = Compressions.get(Compressions.LZ4);
+               ByteBuffer input = ByteBuffer.wrap(raw_);
+               ByteBuffer output = ByteBuffer.allocate(raw_.length + raw_.length/8);
+               int compressedSize = codec.compressBuffer(input, 0, raw_.length, output, 0);
+
+               // We append the inflated (uncompressed) size; it cannot be prepended,
+               // since decompression cannot handle offsets in the input
+               final byte[] rawOutput = new byte[compressedSize + 4];
+               output.get(rawOutput, 0, compressedSize);
+               Bytes.writeLE(rawOutput, compressedSize, raw_.length);
+
+               release();
+               
+               return Pair.make(rawOutput, rawOutput.length);
+               
+       }
+       
+       @Override
+       void release() {
+               
+               for(ClusterUpdateOperation op : operations) {
+                       op.data = null;
+                       op.ccs = null;
+               }
+
+       }
+
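+       // Decompresses payloads produced by toBytes(): the last four bytes hold the
+       // little-endian inflated size, the preceding bytes are the LZ4 payload.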
+       static class StreamDecompressor {
+               
+//             byte[] decompressBuffer = new byte[1024*1024];
+
+               public synchronized byte[] decompressBuffer(byte[] compressed) throws IOException {
+                       
+                       int inflatedSize = Bytes.readLE4(compressed, compressed.length-4);
+
+                       byte[] result = new byte[inflatedSize];
+
+//                     if(decompressBuffer.length < inflatedSize)
+//                             decompressBuffer = new byte[Math.max(3*decompressBuffer.length / 2, inflatedSize)];
+
+                       CompressionCodec codec = Compressions.get(Compressions.LZ4);
+
+                       ByteBuffer input = ByteBuffer.wrap(compressed);
+                       ByteBuffer output = ByteBuffer.wrap(result);
+
+                       int decompressedSize = codec.decompressBuffer(input, 0, compressed.length-4, output, 0, result.length);
+                       assert(decompressedSize == inflatedSize);
+                       
+                       return result;
+                       
+               }
+
+               
+       }
+       
+       private static final StreamDecompressor decompressor = new StreamDecompressor();
+       
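+       // Reverses toBytes(): decompresses the payload and rebuilds each
+       // ClusterUpdateOperation together with its ClusterChangeSet.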
+       @Override
+       public void fromFile(byte[] data_) {
+
+               try {
+                       
+                       byte[] data = decompressor.decompressBuffer(data_);
+                       
+                       operations = new ArrayList<ClusterUpdateOperation>();
+                       
+                       int offset = 0;
+                       int opLen = Bytes.readLE4(data, offset);
+                       offset += 4;
+                       
+                       for(int i=0;i<opLen;i++) {
+                               
+                               int len = Bytes.readLE4(data, offset);
+                               offset += 4;
+                               byte[] bytes = new byte[len];
+                               System.arraycopy(data, offset, bytes, 0, len);
+                               offset += len;
+                               
+                               ClusterUpdateOperation op = new ClusterUpdateOperation(manager, bytes);
+                               
+                               String ccsKey = getKey() + "." + i;
+                               
+                               op.ccs = new ClusterChangeSet(ccsKey, op.uid);
+                               op.chunk = this;
+                               
+                               int statementMaskLen = Bytes.readLE4(data, offset);
+                               offset += 4;
+                               op.ccs.statementMask = new TByteArrayList(statementMaskLen);
+                               for(int j=0;j<statementMaskLen;j++)
+                                       op.ccs.statementMask.add(data[offset++]);
+                               
+                               int oldValueExLen = Bytes.readLE4(data, offset);
+                               offset += 4;
+                               op.ccs.oldValueEx = new TByteArrayList(oldValueExLen);
+                               for(int j=0;j<oldValueExLen;j++)
+                                       op.ccs.oldValueEx.add(data[offset++]);
+                               
+                               int oldValuesSize = Bytes.readLE4(data, offset);
+                               offset += 4;
+                               
+                               op.ccs.oldValues = new ArrayList<>(oldValuesSize);
+                               for(int j=0;j<oldValuesSize;j++) {
+
+                                       int oldValueSize = Bytes.readLE4(data, offset);
+                                       offset += 4;
+
+                                       if(oldValueSize == -1) {
+                                               op.ccs.oldValues.add(null);
+                                       } else {
+                                               byte[] oldValue = new byte[oldValueSize];
+                                               System.arraycopy(data, offset, oldValue, 0, oldValueSize);
+                                               offset += oldValueSize;
+                                               op.ccs.oldValues.add(oldValue);
+                                       }
+
+                               }
+                               
+                               operations.add(op);
+                               
+                       }
+                       
+               } catch (IOException e) {
+                       
+                       throw new IllegalStateException(e);
+                       
+               }
+               
+       }
+
+       @Override
+       String getExtension() {
+               return "stream";
+       }
+       
+       @Override
+       public String toString() {
+               return  "ClusterUpdateOperationChunk " + getKey();
+       }
+       
+       @Override
+       protected boolean overwrite() {
+               return false;
+       }
+       
+}
\ No newline at end of file