1 package org.simantics.acorn.lru;
3 import java.io.IOException;
4 import java.nio.ByteBuffer;
5 import java.nio.file.Path;
7 import org.simantics.acorn.ClusterManager;
8 import org.simantics.acorn.Persistable;
9 import org.simantics.acorn.cluster.ClusterImpl;
10 import org.simantics.acorn.cluster.ClusterSmall;
11 import org.simantics.acorn.exception.AcornAccessVerificationException;
12 import org.simantics.acorn.exception.IllegalAcornStateException;
13 import org.simantics.acorn.cluster.ClusterImpl.ClusterTables;
14 import org.simantics.acorn.internal.ClusterSupport2;
15 import org.simantics.compressions.CompressionCodec;
16 import org.simantics.compressions.Compressions;
17 import org.simantics.db.ClusterCreator;
18 import org.simantics.db.exception.DatabaseException;
19 import org.simantics.db.exception.SDBException;
20 import org.simantics.db.service.Bytes;
21 import org.simantics.db.service.ClusterUID;
22 import org.simantics.utils.datastructures.Pair;
/**
 * Cache entry for one cluster, keyed by {@link ClusterUID} and managed through
 * the LRU instance passed to the constructors. It extends LRUObject and
 * implements Persistable; the visible methods convert the cluster to and from
 * a compressed byte form and track pending updates via ClusterUpdateState.
 */
24 public class ClusterInfo extends LRUObject<ClusterUID, ClusterInfo> implements Persistable {
// Owning manager; manager.support is handed to loadCluster when stored data is decoded.
26 final private ClusterManager manager;
// The cluster held by this entry; assigned by the ClusterImpl-taking constructor and by fromFile.
27 private ClusterImpl cluster;
// NOTE(review): public mutable field that no visible code in this class writes - confirm who maintains it.
28 public int changeSetId;
// Created lazily in scheduleUpdate and replaced by the result of decRef() in update.
29 private ClusterUpdateState updateState;
// Name of the compression scheme. The codec itself is looked up with Compressions.LZ4 in the visible code.
31 public static final String COMPRESSION = "LZ4";
/**
 * Creates an entry from a storage location instead of from a cluster instance.
 * The key is uid, and the file name given to the LRUObject constructor is the
 * string form of uid followed by the .cluster suffix.
 *
 * @param manager       owning cluster manager, stored in this entry
 * @param LRU           LRU instance forwarded to the LRUObject constructor
 * @param readDirectory directory forwarded to the LRUObject constructor
 * @param uid           cluster identifier used as key and as file name stem
 * @param offset        forwarded to the LRUObject constructor
 * @param length        forwarded to the LRUObject constructor
 * @throws AcornAccessVerificationException declared by this constructor
 */
34 public ClusterInfo(ClusterManager manager, LRU<ClusterUID, ClusterInfo> LRU, Path readDirectory, ClusterUID uid, int offset, int length) throws AcornAccessVerificationException {
// NOTE(review): the two trailing flags are false here and true in the ClusterImpl-taking constructor;
// their meaning is defined by LRUObject - confirm there.
35 super(LRU, uid, readDirectory, uid.toString() + ".cluster", offset, length, false, false);
36 this.manager = manager;
/**
 * Creates an entry around an existing cluster instance. Key and file name are
 * derived from cluster.getClusterUID(), the directory comes from
 * LRU.getDirectory(), and the new entry is inserted into the LRU.
 *
 * @param manager owning cluster manager, stored in this entry
 * @param LRU     LRU instance that supplies the directory and receives this entry
 * @param cluster cluster instance held by this entry
 * @throws AcornAccessVerificationException declared by this constructor
 * @throws IllegalAcornStateException       declared by this constructor
 */
42 public ClusterInfo(ClusterManager manager, LRU<ClusterUID, ClusterInfo> LRU, ClusterImpl cluster) throws AcornAccessVerificationException, IllegalAcornStateException {
// NOTE(review): both trailing flags are true here and false in the location-based constructor;
// their meaning is defined by LRUObject - confirm there.
43 super(LRU, cluster.getClusterUID(), LRU.getDirectory(), cluster.getClusterUID().toString() + ".cluster", true, true);
44 this.manager = manager;
45 this.cluster = cluster;
// accessTime is not declared in the visible part of this class - presumably inherited from LRUObject.
46 LRU.insert(this, accessTime);
/**
 * Hands this cluster's three tables (bytes, ints, longs) to creator.create
 * under the given uid and returns whatever the creator produces.
 *
 * The tables come either from cluster.store() or, in the last visible path,
 * from readFile() decoded by loadCluster. Between the first attempt and the
 * later ones the method calls LRU.waitPending(this, false).
 * NOTE(review): the conditions that select between these paths are not part of
 * the visible lines - confirm against the full method.
 *
 * @param uid     identifier passed to creator.create; stored data is decoded with getKey() instead
 * @param creator factory that receives the uid and the three tables
 * @return the object returned by creator.create
 * @throws IOException                      caught in its own catch clause, separately from other throwables
 * @throws AcornAccessVerificationException declared by this method
 * @throws IllegalAcornStateException       wraps any other Throwable raised in the guarded sections
 */
50 public <T> T clone(ClusterUID uid, ClusterCreator creator) throws IOException, AcornAccessVerificationException, IllegalAcornStateException {
52 // Updates have been ensured at this point
// First attempt: export the tables of the held cluster.
58 ClusterTables tables = cluster.store();
59 return creator.create(uid, tables.bytes, tables.ints, tables.longs);
61 } catch (IOException e) {
63 } catch (Throwable t) {
64 throw new IllegalAcornStateException(t);
69 // Ensure pending updates here - this may take some time
70 LRU.waitPending(this, false);
// Second attempt after waiting: export from the held cluster again.
76 ClusterTables tables = cluster.store();
77 return creator.create(uid, tables.bytes, tables.ints, tables.longs);
// Otherwise read the stored form and decode it into a fresh ClusterTables.
79 byte[] data = readFile();
80 ClusterTables tables = new ClusterTables();
81 loadCluster(getKey(), manager.support, data, tables);
82 return creator.create(uid, tables.bytes, tables.ints, tables.longs);
85 } catch (IOException e) {
87 } catch (Throwable t) {
88 throw new IllegalAcornStateException(t);
/**
 * Decodes compressed cluster data into ClusterTables, reusing one scratch
 * buffer across calls.
 */
95 static class ClusterDecompressor {
// Scratch buffer for decompressed data: starts at 1024*1024 bytes and is replaced by a larger one on demand.
97 byte[] decompressBuffer = new byte[1024*1024];
/**
 * Decompresses one stored cluster and splits it into byte, int and long tables.
 *
 * Input layout as read here: the last 4 bytes hold the decompressed size and
 * everything before them is the codec payload. Decompressed layout: three
 * 4-byte lengths at offsets 0, 4 and 8 (byte count, int count, long count),
 * then the bytes from offset 12, then the ints (4 bytes each), then the longs
 * (8 bytes each). Values are read with Bytes.readLE4 / Bytes.readLE8,
 * presumably little-endian as the helper names suggest.
 *
 * The method is synchronized because it works on the shared scratch buffer.
 * The returned arrays are fresh copies, not views of that buffer.
 *
 * @param uid        not referenced in the visible body
 * @param compressed stored form; must be at least 4 bytes long because the size is read at compressed.length-4
 * @return tables holding the decoded arrays
 * @throws IOException declared by this method
 */
99 public synchronized ClusterTables readCluster(ClusterUID uid, byte[] compressed) throws IOException {
// Despite its name this is the size of the data after decompression; it determines the buffer capacity needed.
101 int deflatedSize = Bytes.readLE4(compressed, compressed.length-4);
// Grow to at least the required size, or by half of the current capacity if that is larger.
103 if(decompressBuffer.length < deflatedSize)
104 decompressBuffer = new byte[Math.max(3*decompressBuffer.length / 2, deflatedSize)];
106 CompressionCodec codec = Compressions.get(Compressions.LZ4);
108 ByteBuffer input = ByteBuffer.wrap(compressed);
109 ByteBuffer output = ByteBuffer.wrap(decompressBuffer);
// The trailing 4 size bytes are excluded from the codec input.
111 int decompressedSize = codec.decompressBuffer(input, 0, compressed.length-4, output, 0, decompressBuffer.length);
// Only checked when assertions are enabled.
112 assert(decompressedSize <= decompressBuffer.length);
// 12-byte header: element counts of the three tables.
114 int byteLength = Bytes.readLE4(decompressBuffer, 0);
115 int intLength = Bytes.readLE4(decompressBuffer, 4);
116 int longLength = Bytes.readLE4(decompressBuffer, 8);
118 byte[] bytes = new byte[byteLength];
119 int[] ints = new int[intLength];
120 long[] longs = new long[longLength];
122 System.arraycopy(decompressBuffer, 12, bytes, 0, byteLength);
// offset walks through the int section and then continues into the long section.
124 int offset = 12+byteLength;
125 for(int i=0;i<intLength;i++,offset+=4)
126 ints[i] = Bytes.readLE4(decompressBuffer, offset);
127 for(int i=0;i<longLength;i++,offset+=8)
128 longs[i] = Bytes.readLE8(decompressBuffer, offset);
130 ClusterTables result = new ClusterTables();
131 result.bytes = bytes;
133 result.longs = longs;
// Single decompressor shared by all ClusterInfo instances; its readCluster method is synchronized,
// so concurrent callers take turns using the one scratch buffer.
141 private static ClusterDecompressor decompressor = new ClusterDecompressor();
/**
 * Decodes stored cluster data with the shared decompressor and places the
 * resulting arrays into the caller-supplied tables object.
 *
 * @param uid     forwarded to the decompressor
 * @param support not referenced in the visible body
 * @param data    stored, compressed form of the cluster
 * @param tables  output holder; its bytes, ints and longs fields are overwritten
 * @throws IOException if decoding fails with an I/O error
 */
143 public void loadCluster(ClusterUID uid, ClusterSupport2 support, byte[] data, ClusterTables tables) throws IOException {
145 ClusterTables ts = decompressor.readCluster(uid, data);
// The arrays are shared by reference with ts; no copy is made here.
146 tables.bytes = ts.bytes;
147 tables.ints = ts.ints;
148 tables.longs = ts.longs;
/**
 * Decodes stored cluster data and builds a ClusterImpl from the resulting tables.
 *
 * @param uid     cluster identifier; also used to obtain the cluster key from support
 * @param support passed to ClusterImpl.make twice and used for getClusterKeyByClusterUIDOrMake
 * @param data    stored, compressed form of the cluster
 * @return the cluster produced by ClusterImpl.make
 * @throws IOException declared by this method
 */
152 public ClusterImpl loadCluster(ClusterUID uid, ClusterSupport2 support, byte[] data) throws IOException {
154 ClusterTables tables = decompressor.readCluster(uid, data);
// Note the argument order: longs, ints, bytes.
156 return ClusterImpl.make(support, tables.longs, tables.ints, tables.bytes, support, support.getClusterKeyByClusterUIDOrMake(uid));
// NOTE(review): the handling of DatabaseException is not part of the visible lines - confirm how it is reported.
157 } catch (DatabaseException e) {
/**
 * Rebuilds the held cluster from its stored form, using this entry's key and
 * the manager's support object.
 *
 * @param data stored, compressed form of the cluster
 */
165 public void fromFile(byte[] data) {
168 cluster = loadCluster(getKey(), manager.support, data);
// NOTE(review): the method declares no checked exception; how the IOException is handled
// is not part of the visible lines - confirm it is not silently dropped.
169 } catch (IOException e) {
/**
 * Serializes the held cluster and compresses it with the LZ4 codec.
 *
 * For a ClusterSmall the uncompressed form is cluster.storeBytes(). Otherwise
 * it is assembled here from cluster.store(): three 4-byte lengths at offsets
 * 0, 4 and 8 (byte count, int count, long count), then the bytes, then the
 * ints (4 bytes each), then the longs (8 bytes each). This is the layout that
 * ClusterDecompressor.readCluster parses.
 * NOTE(review): presumably storeBytes() yields the same layout - confirm.
 *
 * The uncompressed length is written directly after the compressed payload,
 * so the stored form is payload followed by a 4-byte size trailer.
 *
 * @return pair of the output array and the number of valid bytes in it
 *         (compressed size plus the 4-byte trailer); the array itself may be longer
 * @throws IllegalAcornStateException wraps any Throwable raised while serializing
 */
176 protected Pair<byte[],Integer> toBytes() throws IllegalAcornStateException {
180 if(cluster instanceof ClusterSmall) {
181 raw = cluster.storeBytes();
184 ClusterTables tables = cluster.store();
// 12-byte header plus the three tables; ints take 4 bytes each (<<2) and longs 8 bytes each (<<3).
186 raw = new byte[12 + tables.bytes.length + (tables.ints.length<<2) + (tables.longs.length<<3)];
188 Bytes.writeLE(raw, 0, tables.bytes.length);
189 Bytes.writeLE(raw, 4, tables.ints.length);
190 Bytes.writeLE(raw, 8, tables.longs.length);
192 System.arraycopy(tables.bytes, 0, raw, 12, tables.bytes.length);
// offset walks through the int section and then continues into the long section.
193 int offset = 12+tables.bytes.length;
194 for(int i=0;i<tables.ints.length;i++,offset+=4)
195 Bytes.writeLE(raw, offset, tables.ints[i]);
196 for(int i=0;i<tables.longs.length;i++,offset+=8)
197 Bytes.writeLE8(raw, offset, tables.longs[i]);
// NOTE(review): this array must hold the compressed payload plus the 4-byte trailer. The slack is only
// raw.length/8, which is very small for tiny inputs - confirm it covers the codec's worst-case output size.
201 byte[] result = new byte[raw.length + raw.length/8];
203 CompressionCodec codec = Compressions.get(Compressions.LZ4);
204 ByteBuffer input = ByteBuffer.wrap(raw);
// output wraps result, so the codec writes straight into the array that is returned.
205 ByteBuffer output = ByteBuffer.wrap(result);
206 //ByteBuffer output = ByteBuffer.allocate(raw.length + raw.length/8);
207 int compressedSize = codec.compressBuffer(input, 0, raw.length, output, 0);
209 // We append inflated size - cannot prepend since decompression cannot handle offsets in input
210 // final byte[] rawOutput = new byte[compressedSize+4];
211 // output.get(rawOutput,0,compressedSize);
212 // Bytes.writeLE(rawOutput, compressedSize, raw.length);
213 // return Pair.make(rawOutput, rawOutput.length);
// Size trailer: readCluster reads it back from the last 4 bytes of the stored form.
215 Bytes.writeLE(result, compressedSize, raw.length);
216 return Pair.make(result, compressedSize+4);
218 } catch (Throwable t) {
219 throw new IllegalAcornStateException(t);
233 String getExtension() {
/**
 * Registers one pending update for this cluster: creates the update state for
 * this entry's key if none exists yet, then increments its reference count.
 *
 * @throws AcornAccessVerificationException declared by this method
 */
237 public void scheduleUpdate() throws AcornAccessVerificationException {
// Access verification runs only when the VERIFY flag is set.
239 if(VERIFY) verifyAccess();
241 if(updateState == null)
242 updateState = new ClusterUpdateState(getKey());
243 updateState.incRef();
/**
 * Marks the start of an update on this cluster via updateState.beginUpdate().
 * An update state is expected to exist already; scheduleUpdate is the visible
 * method that creates one.
 * NOTE(review): the statements that produce the returned ClusterImpl are not
 * part of the visible lines - confirm what is returned.
 *
 * @return a ClusterImpl for the caller to modify
 * @throws SDBException declared by this method; IllegalAcornStateException and
 *         AcornAccessVerificationException have their own catch clause, and any
 *         other Throwable is wrapped in IllegalAcornStateException
 */
247 public ClusterImpl getForUpdate() throws SDBException {
// Only checked when assertions are enabled.
250 assert(updateState != null);
253 updateState.beginUpdate();
255 } catch (IllegalAcornStateException | AcornAccessVerificationException e) {
257 } catch (Throwable t) {
258 throw new IllegalAcornStateException(t);
/**
 * Finishes an update: ends it on the update state and drops one reference.
 * NOTE(review): how clu is used is not part of the visible lines - confirm.
 *
 * @param clu cluster supplied by the caller
 * @throws AcornAccessVerificationException declared by this method
 * @throws IllegalAcornStateException       declared by this method
 */
264 public void update(ClusterImpl clu) throws AcornAccessVerificationException, IllegalAcornStateException {
// Access verification runs only when the VERIFY flag is set.
266 if(VERIFY) verifyAccess();
271 updateState.endUpdate();
// The field takes whatever decRef() returns - presumably null once no references remain; confirm in ClusterUpdateState.
272 updateState = updateState.decRef();
/**
 * Returns a ClusterImpl for this entry.
 * NOTE(review): only the access check is visible here - confirm how the return value is produced.
 *
 * @throws AcornAccessVerificationException declared by this method
 * @throws IllegalAcornStateException       declared by this method
 */
276 public ClusterImpl getCluster() throws AcornAccessVerificationException, IllegalAcornStateException {
// Access verification runs only when the VERIFY flag is set.
278 if(VERIFY) verifyAccess();
/**
 * Tells whether this entry may be persisted. The answer is false while
 * getUpdateState() returns a state, and false when super.canBePersisted()
 * refuses; otherwise it depends on the updateState field.
 *
 * @return true if the entry may be persisted
 * @throws AcornAccessVerificationException declared by this method
 */
287 public boolean canBePersisted() throws AcornAccessVerificationException {
// Access verification runs only when the VERIFY flag is set.
289 if(VERIFY) verifyAccess();
// Any existing update state blocks persisting.
291 ClusterUpdateState state = getUpdateState();
292 if(state != null) return false;
294 if(!super.canBePersisted()) return false;
// NOTE(review): if getUpdateState() simply returns the updateState field, the field is always null here
// and the else branch below can never run - confirm against getUpdateState.
295 if(updateState == null) return true;
296 else return !updateState.inUpdate;
/**
 * Returns a ClusterUpdateState for this entry.
 * NOTE(review): only the access check is visible here - presumably returns the updateState field; confirm.
 *
 * @throws AcornAccessVerificationException declared by this method
 */
300 private ClusterUpdateState getUpdateState() throws AcornAccessVerificationException {
// Access verification runs only when the VERIFY flag is set.
302 if(VERIFY) verifyAccess();
/**
 * Returns the result of getUpdateState(), converting any Throwable it raises
 * into an IllegalAcornStateException.
 * NOTE(review): the name refers to a mutex, but no locking is part of the
 * visible lines - confirm what happens around the call.
 *
 * @return the value returned by getUpdateState()
 * @throws IllegalAcornStateException wraps any Throwable raised while fetching the state
 */
308 private ClusterUpdateState getUpdateStateWithoutMutex() throws IllegalAcornStateException {
312 return getUpdateState();
313 } catch (Throwable t) {
314 throw new IllegalAcornStateException(t);
322 * This method blocks - no locks here
// Waits on the update state fetched through getUpdateStateWithoutMutex() and reports
// on System.err how long the wait took.
324 public void waitForUpdates() throws IllegalAcornStateException {
// NOTE(review): the state is created lazily by scheduleUpdate, so it may be absent here -
// confirm that a null guard exists in the lines not shown.
326 ClusterUpdateState state = getUpdateStateWithoutMutex();
328 long start = System.nanoTime();
329 state.waitForUpdates();
330 long duration = System.nanoTime() - start;
// duration is in nanoseconds; multiplying by 1e-6 converts it to milliseconds for the message.
331 System.err.println("Wait updates to cluster " + getKey() + " for " + (1e-6 * duration) + "ms.");
336 protected boolean overwrite() {