Sharing org.simantics.acorn for everyone to use
diff --git a/bundles/org.simantics.acorn/src/org/simantics/acorn/lru/ClusterInfo.java b/bundles/org.simantics.acorn/src/org/simantics/acorn/lru/ClusterInfo.java
new file mode 100644 (file)
index 0000000..1cd5822
--- /dev/null
@@ -0,0 +1,346 @@
+package org.simantics.acorn.lru;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.file.Path;
+
+import org.simantics.acorn.ClusterManager;
+import org.simantics.acorn.Persistable;
+import org.simantics.acorn.cluster.ClusterImpl;
+import org.simantics.acorn.cluster.ClusterSmall;
+import org.simantics.acorn.cluster.ClusterImpl.ClusterTables;
+import org.simantics.acorn.internal.ClusterSupport2;
+import org.simantics.compressions.CompressionCodec;
+import org.simantics.compressions.Compressions;
+import org.simantics.db.ClusterCreator;
+import org.simantics.db.exception.DatabaseException;
+import org.simantics.db.service.Bytes;
+import org.simantics.db.service.ClusterUID;
+import org.simantics.utils.datastructures.Pair;
+
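+/**
+ * LRU-cached handle to a single cluster. The cluster data is kept resident
+ * in memory while in use and is written out by the LRU as an LZ4-compressed
+ * "<uid>.cluster" file; toBytes() and ClusterDecompressor.readCluster()
+ * define the on-disk format.
+ */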
+public class ClusterInfo extends LRUObject<ClusterUID, ClusterInfo> implements Persistable {
+       
+       final private ClusterManager manager;
+       private ClusterImpl cluster;
+       public int changeSetId;
+       private ClusterUpdateState updateState;
+
+       public static final String COMPRESSION = "LZ4";
+       
+       // Stub constructor: registers an entry for a cluster already persisted under readDirectory; the data is loaded on demand.
+       public ClusterInfo(ClusterManager manager, LRU<ClusterUID, ClusterInfo> LRU, Path readDirectory, ClusterUID uid, int offset, int length) {
+               super(LRU, uid, readDirectory, uid.toString() + ".cluster", offset, length, false, false);
+               this.manager = manager;
+               this.cluster = null;
+               LRU.map(this);
+       }
+       
+       // New constructor: wraps a freshly created in-memory cluster and inserts it into the LRU.
+       public ClusterInfo(ClusterManager manager, LRU<ClusterUID, ClusterInfo> LRU, ClusterImpl cluster) {
+               super(LRU, cluster.getClusterUID(), LRU.getDirectory(), cluster.getClusterUID().toString() + ".cluster", true, true);
+               this.manager = manager;
+               this.cluster = cluster;
+               LRU.insert(this, accessTime);
+               LRU.swap(getKey());
+       }
+       
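+       /**
+        * Clones this cluster's tables into a new cluster via the given creator.
+        * Fast path: if the cluster is resident, its tables are copied under the
+        * mutex. Otherwise pending LRU operations are awaited first, after which
+        * the tables are either taken from memory or loaded from the backing file.
+        */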
+       public <T> T clone(ClusterUID uid, ClusterCreator creator) throws DatabaseException {
+               
+               // Updates have been ensured at this point
+                       
+               acquireMutex();
+
+               try {
+                       if(isResident()) {
+                               ClusterTables tables = cluster.store();
+                               return creator.create(uid, tables.bytes, tables.ints, tables.longs); 
+                       }
+               } catch (IOException e) {
+                       throw new DatabaseException(e);
+               } catch (Throwable t) {
+                       throw new IllegalStateException(t);
+               } finally {
+                       releaseMutex();
+               }
+
+               // Ensure pending updates here - this may take some time
+               LRU.waitPending(this, false);
+
+               acquireMutex();
+               try {
+
+                       if(isResident()) {
+                               ClusterTables tables = cluster.store();
+                               return creator.create(uid, tables.bytes, tables.ints, tables.longs); 
+                       } else {
+                               byte[] data = readFile();
+                               ClusterTables tables = new ClusterTables();
+                               loadCluster(getKey(), manager.support, data, tables);
+                               return creator.create(uid, tables.bytes, tables.ints, tables.longs);
+                       }
+
+               } catch (IOException e) {
+                       throw new DatabaseException(e);
+               } catch (Throwable t) {
+                       throw new IllegalStateException(t);
+               } finally {
+                       releaseMutex();
+               }
+               
+       }
+       
+       static class ClusterDecompressor {
+               
+               byte[] decompressBuffer = new byte[1024*1024];
+
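+               /**
+                * Decompresses a serialized cluster. The LZ4 payload inflates to
+                * [byteLength | intLength | longLength | bytes | ints | longs],
+                * with little-endian 4-byte lengths; the inflated size itself is
+                * stored in the last four bytes of the compressed input. The
+                * decompression buffer is reused across calls, hence synchronized.
+                */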
+               public synchronized ClusterTables readCluster(ClusterUID uid, byte[] compressed) throws IOException {
+                       
+                       int inflatedSize = Bytes.readLE4(compressed, compressed.length-4);
+                       
+                       if(decompressBuffer.length < inflatedSize)
+                               decompressBuffer = new byte[Math.max(3*decompressBuffer.length / 2, inflatedSize)];
+                       
+                       CompressionCodec codec = Compressions.get(Compressions.LZ4);
+                       
+                       ByteBuffer input = ByteBuffer.wrap(compressed);
+                       ByteBuffer output = ByteBuffer.wrap(decompressBuffer);
+
+                       int decompressedSize = codec.decompressBuffer(input, 0, compressed.length-4, output, 0, decompressBuffer.length);
+                       assert(decompressedSize <= decompressBuffer.length);
+                       
+                       int byteLength = Bytes.readLE4(decompressBuffer, 0);
+                       int intLength = Bytes.readLE4(decompressBuffer, 4);
+                       int longLength = Bytes.readLE4(decompressBuffer, 8);
+                       
+                       byte[] bytes = new byte[byteLength];
+                       int[] ints = new int[intLength];
+                       long[] longs = new long[longLength];
+                       
+                       System.arraycopy(decompressBuffer, 12, bytes, 0, byteLength);
+                       
+                       int offset = 12+byteLength;
+                       for(int i=0;i<intLength;i++,offset+=4)
+                               ints[i] = Bytes.readLE4(decompressBuffer, offset);
+                       for(int i=0;i<longLength;i++,offset+=8)
+                               longs[i] = Bytes.readLE8(decompressBuffer, offset);
+                       
+                       ClusterTables result = new ClusterTables();
+                       result.bytes = bytes;
+                       result.ints = ints;
+                       result.longs = longs;
+                       return result;
+                       
+               }
+
+       }
+       
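+       // A single shared decompressor: readCluster is synchronized, so concurrent cluster loads serialize on its buffer.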
+       private static ClusterDecompressor decompressor = new ClusterDecompressor();
+       
+       public void loadCluster(ClusterUID uid, ClusterSupport2 support, byte[] data, ClusterTables tables) throws IOException {
+
+               ClusterTables ts = decompressor.readCluster(uid, data);
+               tables.bytes = ts.bytes;
+               tables.ints = ts.ints;
+               tables.longs = ts.longs;
+
+       }
+       
+       public ClusterImpl loadCluster(ClusterUID uid, ClusterSupport2 support, byte[] data) throws IOException {
+               
+               ClusterTables tables = decompressor.readCluster(uid, data);
+               try {
+                       return ClusterImpl.make(support, tables.longs, tables.ints, tables.bytes, support, support.getClusterKeyByClusterUIDOrMake(uid));
+               } catch (DatabaseException e) {
+                       e.printStackTrace();
+                       return null;
+               }
+               
+       }
+       
+       @Override
+       public void fromFile(byte[] data) {
+               
+               try {
+                       cluster = loadCluster(getKey(), manager.support, data);
+               } catch (IOException e) {
+                       e.printStackTrace();
+               }
+               
+       }
+
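+       /*
+        * Serializes the cluster for writing to disk: a 12-byte header of table
+        * lengths followed by the byte, int and long tables, LZ4-compressed, with
+        * the inflated size appended at the end (ClusterSmall supplies its own
+        * byte layout via storeBytes). The inverse of ClusterDecompressor.readCluster.
+        */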
+       @Override
+       protected Pair<byte[],Integer> toBytes() {
+               
+               try {
+
+                       byte[] raw = null;
+                       
+                       if(cluster instanceof ClusterSmall) {
+                               raw = cluster.storeBytes();
+                       } else {
+                       
+                               ClusterTables tables = cluster.store();
+                               
+                               raw = new byte[12 + tables.bytes.length + (tables.ints.length<<2) + (tables.longs.length<<3)]; 
+       
+                               Bytes.writeLE(raw, 0, tables.bytes.length);
+                               Bytes.writeLE(raw, 4, tables.ints.length);
+                               Bytes.writeLE(raw, 8, tables.longs.length);
+       
+                               System.arraycopy(tables.bytes, 0, raw, 12, tables.bytes.length);
+                               int offset = 12+tables.bytes.length;
+                               for(int i=0;i<tables.ints.length;i++,offset+=4)
+                                       Bytes.writeLE(raw, offset, tables.ints[i]);
+                               for(int i=0;i<tables.longs.length;i++,offset+=8)
+                                       Bytes.writeLE8(raw, offset, tables.longs[i]);
+                               
+                       }
+
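+                       // Reserve raw.length/8 of headroom for the compressed output
+                       // plus the 4-byte inflated-size trailer; this assumes LZ4
+                       // never expands the data beyond that bound.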
+                       byte[] result = new byte[raw.length + raw.length/8];
+                       
+                       CompressionCodec codec = Compressions.get(Compressions.LZ4);
+                       ByteBuffer input = ByteBuffer.wrap(raw);
+                       ByteBuffer output = ByteBuffer.wrap(result);
+                       //ByteBuffer output = ByteBuffer.allocate(raw.length + raw.length/8);
+                       int compressedSize = codec.compressBuffer(input, 0, raw.length, output, 0);
+
+                       // We append the inflated size; it cannot be prepended, because decompression cannot handle offsets in the input.
+//                     final byte[] rawOutput = new byte[compressedSize+4];
+//                     output.get(rawOutput,0,compressedSize);
+//                     Bytes.writeLE(rawOutput, compressedSize, raw.length);
+//                     return Pair.make(rawOutput, rawOutput.length);
+
+                       Bytes.writeLE(result, compressedSize, raw.length);
+                       return Pair.make(result, compressedSize+4);
+
+               } catch (Throwable t) {
+                       throw new IllegalStateException(t);
+               } finally {
+                       release();
+               }
+               
+       }
+       
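+       // Drops the in-memory cluster reference; toBytes() calls this once the data has been serialized.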
+       @Override
+       void release() {
+               cluster = null;
+       }
+
+       @Override
+       String getExtension() {
+               return "cluster";
+       }
+       
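+       /*
+        * Update protocol: scheduleUpdate() registers a pending update (reference
+        * counted), getForUpdate() pins the cluster resident, marks it dirty and
+        * begins the update, and update() installs the updated cluster and
+        * releases the reference.
+        */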
+       public void scheduleUpdate() {
+
+               if(VERIFY) verifyAccess();
+
+               if(updateState == null)
+                       updateState = new ClusterUpdateState(getKey());
+               updateState.incRef();
+
+       }
+       
+       public ClusterImpl getForUpdate() {
+
+               try {
+                       
+                       acquireMutex();
+
+                       assert(updateState != null);
+                       
+                       makeResident();
+                       setDirty(true);
+                       updateState.beginUpdate();
+                       return cluster;
+                       
+               } catch (Throwable t) {
+                       throw new IllegalStateException(t);
+               } finally {
+                       
+                       releaseMutex();
+                       
+               }
+
+       }
+       
+       public void update(ClusterImpl clu) {
+               
+               if(VERIFY) verifyAccess();
+               
+               makeResident();
+               
+               cluster = clu;
+               updateState.endUpdate();
+               updateState = updateState.decRef();
+
+       }
+       
+       public ClusterImpl getCluster() {
+
+               if(VERIFY) verifyAccess();
+
+               makeResident();
+               
+               return cluster;
+               
+       }
+       
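+       // A cluster with a pending update must stay resident until the update completes.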
+       @Override
+       public boolean canBePersisted() {
+               
+               if(VERIFY) verifyAccess();
+
+               if(getUpdateState() != null) return false;
+               return super.canBePersisted();
+               
+       }
+
+       private ClusterUpdateState getUpdateState() {
+               
+               if(VERIFY) verifyAccess();
+               
+               return updateState;
+               
+       }
+
+       private ClusterUpdateState getUpdateStateWithoutMutex() {
+               
+               try {
+                       acquireMutex();
+                       return getUpdateState();
+               } catch (Throwable t) {
+                       throw new IllegalStateException(t);
+               } finally {
+                       releaseMutex();
+               }
+               
+       }
+
+       /*
+        * This method blocks - the caller must not hold any locks.
+        */
+       public void waitForUpdates() {
+               
+               ClusterUpdateState state = getUpdateStateWithoutMutex();
+               if(state != null) {
+                       long start = System.nanoTime();
+                       state.waitForUpdates();
+                       long duration = System.nanoTime() - start;
+                       System.err.println("Waited for updates to cluster " + getKey() + " for " + (1e-6 * duration) + " ms.");
+               }
+               
+       }
+       
+       @Override
+       protected boolean overwrite() {
+               return true;
+       }
+       
+}
\ No newline at end of file