1 package org.simantics.acorn.lru;
3 import java.io.IOException;
4 import java.nio.ByteBuffer;
5 import java.nio.file.Path;
7 import org.simantics.acorn.ClusterManager;
8 import org.simantics.acorn.Persistable;
9 import org.simantics.acorn.cluster.ClusterImpl;
10 import org.simantics.acorn.cluster.ClusterSmall;
11 import org.simantics.acorn.exception.AcornAccessVerificationException;
12 import org.simantics.acorn.exception.IllegalAcornStateException;
13 import org.simantics.acorn.cluster.ClusterImpl.ClusterTables;
14 import org.simantics.acorn.internal.ClusterSupport2;
15 import org.simantics.compressions.CompressionCodec;
16 import org.simantics.compressions.Compressions;
17 import org.simantics.db.ClusterCreator;
18 import org.simantics.db.exception.DatabaseException;
19 import org.simantics.db.exception.SDBException;
20 import org.simantics.db.service.Bytes;
21 import org.simantics.db.service.ClusterUID;
22 import org.simantics.utils.datastructures.Pair;
23 import org.slf4j.Logger;
24 import org.slf4j.LoggerFactory;
// LRU-cached, persistable holder for one cluster's data, keyed by ClusterUID.
// The persisted form is LZ4-compressed (see toBytes / ClusterDecompressor).
26 public class ClusterInfo extends LRUObject<ClusterUID, ClusterInfo> implements Persistable {
28 private static final Logger LOGGER = LoggerFactory.getLogger(ClusterInfo.class);
// Owning manager; supplies the file cache and ClusterSupport2.
29 final private ClusterManager manager;
// In-memory cluster contents; repopulated from disk via fromFile(byte[]).
30 private ClusterImpl cluster;
31 public int changeSetId;
// Reference-counted tracker of scheduled/pending updates; null when none
// are scheduled (see scheduleUpdate / update / canBePersisted).
32 private ClusterUpdateState updateState;
// Codec name for on-disk cluster compression; code below uses Compressions.LZ4.
34 public static final String COMPRESSION = "LZ4";
// Constructs an info backed by an existing on-disk cluster file
// (readDirectory/<uid>.cluster at the given offset/length). Registered
// with the LRU as non-dirty/non-resident; cluster data loads lazily.
37 public ClusterInfo(ClusterManager manager, LRU<ClusterUID, ClusterInfo> LRU, Path readDirectory, ClusterUID uid, int offset, int length) throws AcornAccessVerificationException {
38 super(LRU, manager.getFileCache(), uid, readDirectory, uid.toString() + ".cluster", offset, length, false, false);
39 this.manager = manager;
// Constructs an info for a freshly created in-memory cluster: registered
// as dirty/resident and inserted into the LRU immediately.
45 public ClusterInfo(ClusterManager manager, LRU<ClusterUID, ClusterInfo> LRU, ClusterImpl cluster) throws AcornAccessVerificationException, IllegalAcornStateException {
46 super(LRU, manager.getFileCache(), cluster.getClusterUID(), LRU.getDirectory(), cluster.getClusterUID().toString() + ".cluster", true, true);
47 this.manager = manager;
48 this.cluster = cluster;
// accessTime is inherited from LRUObject; insertion makes this entry evictable.
49 LRU.insert(this, accessTime);
// Produces a copy of this cluster's tables through the given creator.
// Fast path: if the cluster is resident in memory its tables are stored
// directly. Otherwise pending LRU work is awaited and the tables are
// either taken from memory or rebuilt by reading and decompressing the
// persisted file. Unexpected failures are wrapped as IllegalAcornStateException.
53 public <T> T clone(ClusterUID uid, ClusterCreator creator) throws IOException, AcornAccessVerificationException, IllegalAcornStateException {
55 // Updates have been ensured at this point
// Fast path: cluster resident in memory.
61 ClusterTables tables = cluster.store();
62 return creator.create(uid, tables.bytes, tables.ints, tables.longs);
64 } catch (IOException e) {
66 } catch (Throwable t) {
// Wrap anything unexpected so callers see a consistent exception type.
67 throw new IllegalAcornStateException(t);
72 // Ensure pending updates here - this may take some time
73 LRU.waitPending(this, false);
// Cluster still (or again) in memory after waiting.
79 ClusterTables tables = cluster.store();
80 return creator.create(uid, tables.bytes, tables.ints, tables.longs);
// Cluster evicted: read the compressed file and rebuild the tables.
82 byte[] data = readFile();
83 ClusterTables tables = new ClusterTables();
84 loadCluster(getKey(), manager.support, data, tables);
85 return creator.create(uid, tables.bytes, tables.ints, tables.longs);
88 } catch (IOException e) {
90 } catch (Throwable t) {
91 throw new IllegalAcornStateException(t);
// Decompresses the persisted cluster format produced by toBytes():
// an LZ4 payload followed by a 4-byte LE inflated-size trailer. The
// inflated layout is [byteLen][intLen][longLen][bytes][ints][longs],
// lengths and ints as 4-byte LE, longs as 8-byte LE.
98 static class ClusterDecompressor {
// Reused scratch buffer, grown on demand; access is serialized by
// readCluster's 'synchronized'.
100 byte[] decompressBuffer = new byte[1024*1024];
102 public synchronized ClusterTables readCluster(ClusterUID uid, byte[] compressed) throws IOException {
// Inflated size is appended as the last 4 bytes of the compressed blob.
104 int deflatedSize = Bytes.readLE4(compressed, compressed.length-4);
// Grow geometrically (1.5x) to amortize repeated enlargements.
106 if(decompressBuffer.length < deflatedSize)
107 decompressBuffer = new byte[Math.max(3*decompressBuffer.length / 2, deflatedSize)];
109 CompressionCodec codec = Compressions.get(Compressions.LZ4);
111 ByteBuffer input = ByteBuffer.wrap(compressed);
112 ByteBuffer output = ByteBuffer.wrap(decompressBuffer);
// Exclude the 4-byte size trailer from the compressed input.
114 int decompressedSize = codec.decompressBuffer(input, 0, compressed.length-4, output, 0, decompressBuffer.length);
115 assert(decompressedSize <= decompressBuffer.length);
// 12-byte header: lengths of the three tables.
117 int byteLength = Bytes.readLE4(decompressBuffer, 0);
118 int intLength = Bytes.readLE4(decompressBuffer, 4);
119 int longLength = Bytes.readLE4(decompressBuffer, 8);
121 byte[] bytes = new byte[byteLength];
122 int[] ints = new int[intLength];
123 long[] longs = new long[longLength];
125 System.arraycopy(decompressBuffer, 12, bytes, 0, byteLength);
127 int offset = 12+byteLength;
128 for(int i=0;i<intLength;i++,offset+=4)
129 ints[i] = Bytes.readLE4(decompressBuffer, offset);
130 for(int i=0;i<longLength;i++,offset+=8)
131 longs[i] = Bytes.readLE8(decompressBuffer, offset);
133 ClusterTables result = new ClusterTables();
134 result.bytes = bytes;
// NOTE(review): the assignment of result.ints is not visible in this
// view - confirm it is populated alongside bytes/longs.
136 result.longs = longs;
// Shared instance; safe to share because readCluster is synchronized.
144 private static ClusterDecompressor decompressor = new ClusterDecompressor();
// Decompresses persisted cluster data into the caller-supplied tables
// (out-parameter variant used by clone's slow path).
146 public void loadCluster(ClusterUID uid, ClusterSupport2 support, byte[] data, ClusterTables tables) throws IOException {
148 ClusterTables ts = decompressor.readCluster(uid, data);
149 tables.bytes = ts.bytes;
150 tables.ints = ts.ints;
151 tables.longs = ts.longs;
// Decompresses persisted cluster data and materializes a ClusterImpl,
// resolving (or allocating) the cluster key for the given UID.
155 public ClusterImpl loadCluster(ClusterUID uid, ClusterSupport2 support, byte[] data) throws IOException {
157 ClusterTables tables = decompressor.readCluster(uid, data);
159 return ClusterImpl.make(support, tables.longs, tables.ints, tables.bytes, support, support.getClusterKeyByClusterUIDOrMake(uid));
// DatabaseException handling continues beyond this view.
160 } catch (DatabaseException e) {
// Persistable callback: repopulates the in-memory cluster from the
// persisted (compressed) byte image read off disk.
168 public void fromFile(byte[] data) {
171 cluster = loadCluster(getKey(), manager.support, data);
// IOException handling continues beyond this view.
172 } catch (IOException e) {
// Serializes the cluster for persistence: the raw table image (or
// ClusterSmall's own byte form) is LZ4-compressed and the inflated size
// appended as a 4-byte LE trailer, mirroring ClusterDecompressor.readCluster.
// Returns the buffer paired with the count of valid bytes in it.
179 protected Pair<byte[],Integer> toBytes() throws IllegalAcornStateException {
// ClusterSmall serializes itself; other clusters use the generic layout.
183 if(cluster instanceof ClusterSmall) {
184 raw = cluster.storeBytes();
187 ClusterTables tables = cluster.store();
// 12-byte header + bytes + 4 bytes per int + 8 bytes per long.
189 raw = new byte[12 + tables.bytes.length + (tables.ints.length<<2) + (tables.longs.length<<3)];
191 Bytes.writeLE(raw, 0, tables.bytes.length);
192 Bytes.writeLE(raw, 4, tables.ints.length);
193 Bytes.writeLE(raw, 8, tables.longs.length);
195 System.arraycopy(tables.bytes, 0, raw, 12, tables.bytes.length);
196 int offset = 12+tables.bytes.length;
197 for(int i=0;i<tables.ints.length;i++,offset+=4)
198 Bytes.writeLE(raw, offset, tables.ints[i]);
199 for(int i=0;i<tables.longs.length;i++,offset+=8)
200 Bytes.writeLE8(raw, offset, tables.longs[i]);
// NOTE(review): output sized to len + len/8 as a compression bound -
// confirm this covers the codec's worst case plus the 4-byte trailer.
204 byte[] result = new byte[raw.length + raw.length/8];
206 CompressionCodec codec = Compressions.get(Compressions.LZ4);
207 ByteBuffer input = ByteBuffer.wrap(raw);
208 ByteBuffer output = ByteBuffer.wrap(result);
209 //ByteBuffer output = ByteBuffer.allocate(raw.length + raw.length/8);
210 int compressedSize = codec.compressBuffer(input, 0, raw.length, output, 0);
212 // We append inflated size - cannot prepend since decompression cannot handle offsets in input
213 // final byte[] rawOutput = new byte[compressedSize+4];
214 // output.get(rawOutput,0,compressedSize);
215 // Bytes.writeLE(rawOutput, compressedSize, raw.length);
216 // return Pair.make(rawOutput, rawOutput.length);
218 Bytes.writeLE(result, compressedSize, raw.length);
219 return Pair.make(result, compressedSize+4);
// Wrap any failure so callers see a consistent exception type.
221 } catch (Throwable t) {
222 throw new IllegalAcornStateException(t);
// File-name extension used for this LRU object's persisted form
// (body continues beyond this view).
236 String getExtension() {
// Registers one more pending update for this cluster by incrementing the
// reference count on updateState, lazily creating the tracker first.
240 public void scheduleUpdate() throws AcornAccessVerificationException {
// Guarded access check, enabled only when VERIFY is on.
242 if(VERIFY) verifyAccess();
244 if(updateState == null)
245 updateState = new ClusterUpdateState(getKey());
246 updateState.incRef();
// Obtains the in-memory cluster for mutation. Requires that an update
// has been scheduled (updateState != null); marks the update as begun.
250 public ClusterImpl getForUpdate() throws SDBException {
253 assert(updateState != null);
256 updateState.beginUpdate();
// Known Acorn exceptions propagate (handling continues beyond this view);
// anything else is wrapped for a consistent exception type.
258 } catch (IllegalAcornStateException | AcornAccessVerificationException e) {
260 } catch (Throwable t) {
261 throw new IllegalAcornStateException(t);
// Completes one scheduled update with the given cluster contents:
// ends the in-progress update and drops one reference on updateState
// (decRef returns null once no references remain).
267 public void update(ClusterImpl clu) throws AcornAccessVerificationException, IllegalAcornStateException {
269 if(VERIFY) verifyAccess();
274 updateState.endUpdate();
275 updateState = updateState.decRef();
// Returns the in-memory cluster (body continues beyond this view).
279 public ClusterImpl getCluster() throws AcornAccessVerificationException, IllegalAcornStateException {
281 if(VERIFY) verifyAccess();
// Whether this entry may currently be written to disk: refuses while
// any update is scheduled or while the superclass forbids persisting.
290 public boolean canBePersisted() throws AcornAccessVerificationException {
292 if(VERIFY) verifyAccess();
294 ClusterUpdateState state = getUpdateState();
// Any scheduled update blocks persistence outright.
295 if(state != null) return false;
297 if(!super.canBePersisted()) return false;
// NOTE(review): if getUpdateState() returns the 'updateState' field,
// the inUpdate branch below is unreachable (state != null already
// returned false) - verify against getUpdateState's body.
298 if(updateState == null) return true;
299 else return !updateState.inUpdate;
// Accessor for the update tracker under the access-verification guard
// (body continues beyond this view).
303 private ClusterUpdateState getUpdateState() throws AcornAccessVerificationException {
305 if(VERIFY) verifyAccess();
// Variant of getUpdateState used from unlocked contexts (see
// waitForUpdates); converts any failure into IllegalAcornStateException.
311 private ClusterUpdateState getUpdateStateWithoutMutex() throws IllegalAcornStateException {
315 return getUpdateState();
316 } catch (Throwable t) {
317 throw new IllegalAcornStateException(t);
 * This method blocks - no locks here
327 public void waitForUpdates() throws IllegalAcornStateException {
329 ClusterUpdateState state = getUpdateStateWithoutMutex();
// Time the wait only for debug-level diagnostics.
331 long start = System.nanoTime();
332 state.waitForUpdates();
333 long duration = System.nanoTime() - start;
334 if (LOGGER.isDebugEnabled())
335 LOGGER.debug("Wait updates to cluster " + getKey() + " for " + (1e-6 * duration) + "ms.");
// LRUObject overrides; both bodies continue beyond this view.
340 protected boolean overwrite() {
345 public Logger getLogger() {