package org.simantics.acorn.lru;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.file.Path;

import org.simantics.acorn.ClusterManager;
import org.simantics.acorn.Persistable;
import org.simantics.acorn.cluster.ClusterImpl;
import org.simantics.acorn.cluster.ClusterSmall;
import org.simantics.acorn.cluster.ClusterImpl.ClusterTables;
import org.simantics.acorn.internal.ClusterSupport2;
import org.simantics.compressions.CompressionCodec;
import org.simantics.compressions.Compressions;
import org.simantics.db.ClusterCreator;
import org.simantics.db.exception.DatabaseException;
import org.simantics.db.service.Bytes;
import org.simantics.db.service.ClusterUID;
import org.simantics.utils.datastructures.Pair;

public class ClusterInfo extends LRUObject<ClusterUID, ClusterInfo> implements Persistable {

    final private ClusterManager manager;
    private ClusterImpl cluster;
    public int changeSetId;
    private ClusterUpdateState updateState;

    public static final String COMPRESSION = "LZ4";
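
    /*
     * Creates a stub entry for a cluster whose serialized data already
     * resides on disk in the given read directory at [offset, offset+length).
     * The cluster itself is loaded lazily on first access.
     */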
    public ClusterInfo(ClusterManager manager, LRU<ClusterUID, ClusterInfo> LRU, Path readDirectory, ClusterUID uid, int offset, int length) {
        super(LRU, uid, readDirectory, uid.toString() + ".cluster", offset, length, false, false);
        this.manager = manager;
    }
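
    /*
     * Creates an entry for a new, fully resident cluster and inserts it
     * into the LRU with the current access time.
     */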
    public ClusterInfo(ClusterManager manager, LRU<ClusterUID, ClusterInfo> LRU, ClusterImpl cluster) {
        super(LRU, cluster.getClusterUID(), LRU.getDirectory(), cluster.getClusterUID().toString() + ".cluster", true, true);
        this.manager = manager;
        this.cluster = cluster;
        LRU.insert(this, accessTime);
    }
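
    /*
     * Produces a copy of this cluster via the given ClusterCreator.
     * Tries the in-memory tables first; if the cluster is not resident,
     * waits for pending writes and falls back to reading and
     * decompressing the cluster file.
     */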
    public <T> T clone(ClusterUID uid, ClusterCreator creator) throws DatabaseException {

        // Updates have been ensured at this point

        acquireMutex(); // acquireMutex()/releaseMutex() and isResident() are inherited from LRUObject
        try {
            if(isResident()) {
                ClusterTables tables = cluster.store();
                return creator.create(uid, tables.bytes, tables.ints, tables.longs);
            }
        } catch (IOException e) {
            throw new DatabaseException(e);
        } catch (Throwable t) {
            throw new IllegalStateException(t);
        } finally {
            releaseMutex();
        }

        // Ensure pending updates here - this may take some time
        LRU.waitPending(this, false);

        acquireMutex();
        try {
            if(isResident()) {
                ClusterTables tables = cluster.store();
                return creator.create(uid, tables.bytes, tables.ints, tables.longs);
            } else {
                byte[] data = readFile();
                ClusterTables tables = new ClusterTables();
                loadCluster(getKey(), manager.support, data, tables);
                return creator.create(uid, tables.bytes, tables.ints, tables.longs);
            }
        } catch (IOException e) {
            throw new DatabaseException(e);
        } catch (Throwable t) {
            throw new IllegalStateException(t);
        } finally {
            releaseMutex();
        }
    }
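
    /*
     * Decompresses an LZ4-compressed cluster image into its three tables.
     * The shared scratch buffer makes readCluster effectively
     * single-threaded (it is synchronized), but avoids reallocating the
     * buffer for every cluster read.
     */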
    static class ClusterDecompressor {

        byte[] decompressBuffer = new byte[1024*1024];

        public synchronized ClusterTables readCluster(ClusterUID uid, byte[] compressed) throws IOException {

            // The inflated size is appended to the compressed data as a trailing little-endian int
            int deflatedSize = Bytes.readLE4(compressed, compressed.length-4);

            if(decompressBuffer.length < deflatedSize)
                decompressBuffer = new byte[Math.max(3*decompressBuffer.length / 2, deflatedSize)];

            CompressionCodec codec = Compressions.get(Compressions.LZ4);

            ByteBuffer input = ByteBuffer.wrap(compressed);
            ByteBuffer output = ByteBuffer.wrap(decompressBuffer);

            int decompressedSize = codec.decompressBuffer(input, 0, compressed.length-4, output, 0, decompressBuffer.length);
            assert(decompressedSize <= decompressBuffer.length);

            // Header: three little-endian ints giving the table lengths
            int byteLength = Bytes.readLE4(decompressBuffer, 0);
            int intLength = Bytes.readLE4(decompressBuffer, 4);
            int longLength = Bytes.readLE4(decompressBuffer, 8);

            byte[] bytes = new byte[byteLength];
            int[] ints = new int[intLength];
            long[] longs = new long[longLength];

            System.arraycopy(decompressBuffer, 12, bytes, 0, byteLength);

            int offset = 12+byteLength;
            for(int i=0;i<intLength;i++,offset+=4)
                ints[i] = Bytes.readLE4(decompressBuffer, offset);
            for(int i=0;i<longLength;i++,offset+=8)
                longs[i] = Bytes.readLE8(decompressBuffer, offset);

            ClusterTables result = new ClusterTables();
            result.bytes = bytes;
            result.ints = ints;
            result.longs = longs;
            return result;
        }

    }
    private static ClusterDecompressor decompressor = new ClusterDecompressor();
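
    /*
     * Decompresses the given cluster image and copies the resulting
     * byte/int/long tables into the caller-provided ClusterTables.
     */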
    public void loadCluster(ClusterUID uid, ClusterSupport2 support, byte[] data, ClusterTables tables) throws IOException {
        ClusterTables ts = decompressor.readCluster(uid, data);
        tables.bytes = ts.bytes;
        tables.ints = ts.ints;
        tables.longs = ts.longs;
    }
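
    /*
     * Decompresses the given cluster image and materializes it as a
     * ClusterImpl registered under this cluster's key.
     */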
    public ClusterImpl loadCluster(ClusterUID uid, ClusterSupport2 support, byte[] data) throws IOException {
        ClusterTables tables = decompressor.readCluster(uid, data);
        try {
            return ClusterImpl.make(support, tables.longs, tables.ints, tables.bytes, support, support.getClusterKeyByClusterUIDOrMake(uid));
        } catch (DatabaseException e) {
            // Assumed failure handling (body elided in the source): report and return null
            e.printStackTrace();
            return null;
        }
    }
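
    /*
     * Persistable implementation: deserializes this cluster from the
     * bytes read back from its .cluster file.
     */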
    @Override
    public void fromFile(byte[] data) {
        try {
            cluster = loadCluster(getKey(), manager.support, data);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
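
    /*
     * Serializes the cluster for writing to disk: small clusters are
     * stored via their own storeBytes(), others as a 12-byte header of
     * little-endian table lengths followed by the byte, int and long
     * tables. The result is LZ4-compressed, with the inflated size
     * appended as a trailing little-endian int (see ClusterDecompressor).
     */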
    @Override
    protected Pair<byte[],Integer> toBytes() {

        try {

            byte[] raw = null;

            if(cluster instanceof ClusterSmall) {
                raw = cluster.storeBytes();
            } else {

                ClusterTables tables = cluster.store();

                raw = new byte[12 + tables.bytes.length + (tables.ints.length<<2) + (tables.longs.length<<3)];

                Bytes.writeLE(raw, 0, tables.bytes.length);
                Bytes.writeLE(raw, 4, tables.ints.length);
                Bytes.writeLE(raw, 8, tables.longs.length);

                System.arraycopy(tables.bytes, 0, raw, 12, tables.bytes.length);
                int offset = 12+tables.bytes.length;
                for(int i=0;i<tables.ints.length;i++,offset+=4)
                    Bytes.writeLE(raw, offset, tables.ints[i]);
                for(int i=0;i<tables.longs.length;i++,offset+=8)
                    Bytes.writeLE8(raw, offset, tables.longs[i]);
            }

            // 1/8 slack for incompressible data, plus room for the 4-byte length trailer
            byte[] result = new byte[raw.length + raw.length/8 + 4];

            CompressionCodec codec = Compressions.get(Compressions.LZ4);
            ByteBuffer input = ByteBuffer.wrap(raw);
            ByteBuffer output = ByteBuffer.wrap(result);
            int compressedSize = codec.compressBuffer(input, 0, raw.length, output, 0);

            // We append the inflated size - it cannot be prepended, since decompression cannot handle offsets in the input
            Bytes.writeLE(result, compressedSize, raw.length);
            return Pair.make(result, compressedSize+4);

        } catch (Throwable t) {
            throw new IllegalStateException(t);
        }
    }
    @Override
    String getExtension() {
        // Matches the file name chosen in the constructors: uid + ".cluster"
        return "cluster";
    }
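
    /*
     * Registers a pending update against this cluster; paired with
     * update(), which ends the update and releases the reference.
     */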
    public void scheduleUpdate() {

        if(VERIFY) verifyAccess();

        if(updateState == null)
            updateState = new ClusterUpdateState(getKey());
        updateState.incRef();
    }
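
    /*
     * Returns the resident cluster for modification, marking the update
     * state as in progress. scheduleUpdate() must have been called
     * first (see the assertion below).
     */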
    public ClusterImpl getForUpdate() {

        try {

            acquireMutex();

            assert(updateState != null);

            // Ensure the cluster data is in memory before handing it out
            makeResident();
            updateState.beginUpdate();
            return cluster;

        } catch (Throwable t) {
            throw new IllegalStateException(t);
        } finally {
            releaseMutex();
        }
    }
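
    /*
     * Replaces the cluster with its updated version and closes the
     * update opened by getForUpdate().
     */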
    public void update(ClusterImpl clu) {

        if(VERIFY) verifyAccess();

        makeResident();

        cluster = clu;
        updateState.endUpdate();
        updateState = updateState.decRef();
    }
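
    /*
     * Returns the cluster, loading it from disk first if it has been
     * evicted from memory.
     */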
    public ClusterImpl getCluster() {

        if(VERIFY) verifyAccess();

        makeResident();

        return cluster;
    }
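
    /*
     * A cluster can be written out only when no update is pending or
     * in progress and the LRU superclass agrees.
     */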
    @Override
    public boolean canBePersisted() {

        if(VERIFY) verifyAccess();

        ClusterUpdateState state = getUpdateState();
        if(state != null) return false;

        if(!super.canBePersisted()) return false;
        if(updateState == null) return true;
        else return !updateState.inUpdate;
    }
    private ClusterUpdateState getUpdateState() {
        if(VERIFY) verifyAccess();
        return updateState;
    }
    // For callers that do not already hold the LRU mutex
    private ClusterUpdateState getUpdateStateWithoutMutex() {
        try {
            acquireMutex();
            return getUpdateState();
        } catch (Throwable t) {
            throw new IllegalStateException(t);
        } finally {
            releaseMutex();
        }
    }
    /*
     * This method blocks - no locks are held while waiting
     */
    public void waitForUpdates() {

        ClusterUpdateState state = getUpdateStateWithoutMutex();
        if(state != null) {
            long start = System.nanoTime();
            state.waitForUpdates();
            long duration = System.nanoTime() - start;
            System.err.println("Waited for updates to cluster " + getKey() + " for " + (1e-6 * duration) + " ms.");
        }
    }
    @Override
    protected boolean overwrite() {
        // Assumption (body elided in the source): each cluster has a dedicated
        // .cluster file, so a persisted image is rewritten in place
        return true;
    }

}