--- /dev/null
+package org.simantics.acorn.lru;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.file.Path;
+
+import org.simantics.acorn.ClusterManager;
+import org.simantics.acorn.Persistable;
+import org.simantics.acorn.cluster.ClusterImpl;
+import org.simantics.acorn.cluster.ClusterSmall;
+import org.simantics.acorn.cluster.ClusterImpl.ClusterTables;
+import org.simantics.acorn.internal.ClusterSupport2;
+import org.simantics.compressions.CompressionCodec;
+import org.simantics.compressions.Compressions;
+import org.simantics.db.ClusterCreator;
+import org.simantics.db.exception.DatabaseException;
+import org.simantics.db.service.Bytes;
+import org.simantics.db.service.ClusterUID;
+import org.simantics.utils.datastructures.Pair;
+
+public class ClusterInfo extends LRUObject<ClusterUID, ClusterInfo> implements Persistable {
+
+ final private ClusterManager manager;
+ private ClusterImpl cluster;
+ public int changeSetId;
+ private ClusterUpdateState updateState;
+
+ public static final String COMPRESSION = "LZ4";
+
+ // Stub
+ public ClusterInfo(ClusterManager manager, LRU<ClusterUID, ClusterInfo> LRU, Path readDirectory, ClusterUID uid, int offset, int length) {
+ super(LRU, uid, readDirectory, uid.toString() + ".cluster", offset, length, false, false);
+ this.manager = manager;
+ this.cluster = null;
+ LRU.map(this);
+ }
+
+ // New
+ public ClusterInfo(ClusterManager manager, LRU<ClusterUID, ClusterInfo> LRU, ClusterImpl cluster) {
+ super(LRU, cluster.getClusterUID(), LRU.getDirectory(), cluster.getClusterUID().toString() + ".cluster", true, true);
+ this.manager = manager;
+ this.cluster = cluster;
+ LRU.insert(this, accessTime);
+ LRU.swap(getKey());
+ }
+
+ public <T> T clone(ClusterUID uid, ClusterCreator creator) throws DatabaseException {
+
+ // Updates have been ensured at this point
+
+ acquireMutex();
+
+ try {
+ if(isResident()) {
+ ClusterTables tables = cluster.store();
+ return creator.create(uid, tables.bytes, tables.ints, tables.longs);
+ }
+ } catch (IOException e) {
+ throw new DatabaseException(e);
+ } catch (Throwable t) {
+ throw new IllegalStateException(t);
+ } finally {
+ releaseMutex();
+ }
+
+ // Ensure pending updates here - this may take some time
+ LRU.waitPending(this, false);
+
+ acquireMutex();
+ try {
+
+ if(isResident()) {
+ ClusterTables tables = cluster.store();
+ return creator.create(uid, tables.bytes, tables.ints, tables.longs);
+ } else {
+ byte[] data = readFile();
+ ClusterTables tables = new ClusterTables();
+ loadCluster(getKey(), manager.support, data, tables);
+ return creator.create(uid, tables.bytes, tables.ints, tables.longs);
+ }
+
+ } catch (IOException e) {
+ throw new DatabaseException(e);
+ } catch (Throwable t) {
+ throw new IllegalStateException(t);
+ } finally {
+ releaseMutex();
+ }
+
+ }
+
+ static class ClusterDecompressor {
+
+ byte[] decompressBuffer = new byte[1024*1024];
+
+ public synchronized ClusterTables readCluster(ClusterUID uid, byte[] compressed) throws IOException {
+
+ int deflatedSize = Bytes.readLE4(compressed, compressed.length-4);
+
+ if(decompressBuffer.length < deflatedSize)
+ decompressBuffer = new byte[Math.max(3*decompressBuffer.length / 2, deflatedSize)];
+
+ CompressionCodec codec = Compressions.get(Compressions.LZ4);
+
+ ByteBuffer input = ByteBuffer.wrap(compressed);
+ ByteBuffer output = ByteBuffer.wrap(decompressBuffer);
+
+ int decompressedSize = codec.decompressBuffer(input, 0, compressed.length-4, output, 0, decompressBuffer.length);
+ assert(decompressedSize <= decompressBuffer.length);
+
+ int byteLength = Bytes.readLE4(decompressBuffer, 0);
+ int intLength = Bytes.readLE4(decompressBuffer, 4);
+ int longLength = Bytes.readLE4(decompressBuffer, 8);
+
+ byte[] bytes = new byte[byteLength];
+ int[] ints = new int[intLength];
+ long[] longs = new long[longLength];
+
+ System.arraycopy(decompressBuffer, 12, bytes, 0, byteLength);
+
+ int offset = 12+byteLength;
+ for(int i=0;i<intLength;i++,offset+=4)
+ ints[i] = Bytes.readLE4(decompressBuffer, offset);
+ for(int i=0;i<longLength;i++,offset+=8)
+ longs[i] = Bytes.readLE8(decompressBuffer, offset);
+
+ ClusterTables result = new ClusterTables();
+ result.bytes = bytes;
+ result.ints = ints;
+ result.longs = longs;
+ return result;
+
+ }
+
+
+ }
+
+ private static ClusterDecompressor decompressor = new ClusterDecompressor();
+
+ public void loadCluster(ClusterUID uid, ClusterSupport2 support, byte[] data, ClusterTables tables) throws IOException {
+
+ ClusterTables ts = decompressor.readCluster(uid, data);
+ tables.bytes = ts.bytes;
+ tables.ints = ts.ints;
+ tables.longs = ts.longs;
+
+ }
+
+ public ClusterImpl loadCluster(ClusterUID uid, ClusterSupport2 support, byte[] data) throws IOException {
+
+ ClusterTables tables = decompressor.readCluster(uid, data);
+ try {
+ return ClusterImpl.make(support, tables.longs, tables.ints, tables.bytes, support, support.getClusterKeyByClusterUIDOrMake(uid));
+ } catch (DatabaseException e) {
+ e.printStackTrace();
+ return null;
+ }
+
+ }
+
+ @Override
+ public void fromFile(byte[] data) {
+
+ try {
+ cluster = loadCluster(getKey(), manager.support, data);
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+
+ }
+
+ @Override
+ protected Pair<byte[],Integer> toBytes() {
+
+ try {
+
+ byte[] raw = null;
+
+ if(cluster instanceof ClusterSmall) {
+ raw = cluster.storeBytes();
+ } else {
+
+ ClusterTables tables = cluster.store();
+
+ raw = new byte[12 + tables.bytes.length + (tables.ints.length<<2) + (tables.longs.length<<3)];
+
+ Bytes.writeLE(raw, 0, tables.bytes.length);
+ Bytes.writeLE(raw, 4, tables.ints.length);
+ Bytes.writeLE(raw, 8, tables.longs.length);
+
+ System.arraycopy(tables.bytes, 0, raw, 12, tables.bytes.length);
+ int offset = 12+tables.bytes.length;
+ for(int i=0;i<tables.ints.length;i++,offset+=4)
+ Bytes.writeLE(raw, offset, tables.ints[i]);
+ for(int i=0;i<tables.longs.length;i++,offset+=8)
+ Bytes.writeLE8(raw, offset, tables.longs[i]);
+
+ }
+
+ byte[] result = new byte[raw.length + raw.length/8];
+
+ CompressionCodec codec = Compressions.get(Compressions.LZ4);
+ ByteBuffer input = ByteBuffer.wrap(raw);
+ ByteBuffer output = ByteBuffer.wrap(result);
+ //ByteBuffer output = ByteBuffer.allocate(raw.length + raw.length/8);
+ int compressedSize = codec.compressBuffer(input, 0, raw.length, output, 0);
+
+ // We append inflated size - cannot prepend since decompression cannot handle offsets in input
+// final byte[] rawOutput = new byte[compressedSize+4];
+// output.get(rawOutput,0,compressedSize);
+// Bytes.writeLE(rawOutput, compressedSize, raw.length);
+// return Pair.make(rawOutput, rawOutput.length);
+
+ Bytes.writeLE(result, compressedSize, raw.length);
+ return Pair.make(result, compressedSize+4);
+
+ } catch (Throwable t) {
+ throw new IllegalStateException(t);
+ } finally {
+ release();
+ }
+
+ }
+
+ @Override
+ void release() {
+ cluster = null;
+ }
+
+
+ @Override
+ String getExtension() {
+ return "cluster";
+ }
+
+ public void scheduleUpdate() {
+
+ if(VERIFY) verifyAccess();
+
+ if(updateState == null)
+ updateState = new ClusterUpdateState(getKey());
+ updateState.incRef();
+
+ }
+
+ public ClusterImpl getForUpdate() {
+
+ try {
+
+ acquireMutex();
+
+ assert(updateState != null);
+
+ makeResident();
+ setDirty(true);
+ updateState.beginUpdate();
+ return cluster;
+
+ } catch (Throwable t) {
+ throw new IllegalStateException(t);
+ } finally {
+
+ releaseMutex();
+
+ }
+
+ }
+
+ public void update(ClusterImpl clu) {
+
+ if(VERIFY) verifyAccess();
+
+ makeResident();
+
+ cluster = clu;
+ updateState.endUpdate();
+ updateState = updateState.decRef();
+
+ }
+
+ public ClusterImpl getCluster() {
+
+ if(VERIFY) verifyAccess();
+
+ makeResident();
+
+ return cluster;
+
+ }
+
+ @Override
+ public boolean canBePersisted() {
+
+ if(VERIFY) verifyAccess();
+
+ ClusterUpdateState state = getUpdateState();
+ if(state != null) return false;
+
+ if(!super.canBePersisted()) return false;
+ if(updateState == null) return true;
+ else return !updateState.inUpdate;
+
+ }
+
+ private ClusterUpdateState getUpdateState() {
+
+ if(VERIFY) verifyAccess();
+
+ return updateState;
+
+ }
+
+ private ClusterUpdateState getUpdateStateWithoutMutex() {
+
+ try {
+ acquireMutex();
+ return getUpdateState();
+ } catch (Throwable t) {
+ throw new IllegalStateException(t);
+ } finally {
+ releaseMutex();
+ }
+
+ }
+
+ /*
+ * This method blocks - no locks here
+ */
+ public void waitForUpdates() {
+
+ ClusterUpdateState state = getUpdateStateWithoutMutex();
+ if(state != null) {
+ long start = System.nanoTime();
+ state.waitForUpdates();
+ long duration = System.nanoTime() - start;
+ System.err.println("Wait updates to cluster " + getKey() + " for " + (1e-6 * duration) + "ms.");
+ }
+
+ }
+
+ @Override
+ protected boolean overwrite() {
+ return true;
+ }
+
+}
\ No newline at end of file