]> gerrit.simantics Code Review - simantics/platform.git/blob - bundles/org.simantics.acorn/src/org/simantics/acorn/lru/ClusterInfo.java
Merge commit 'fd452722e97db9cf876f4f03a9e44fe750625a92'
[simantics/platform.git] / bundles / org.simantics.acorn / src / org / simantics / acorn / lru / ClusterInfo.java
1 package org.simantics.acorn.lru;
2
3 import java.io.IOException;
4 import java.nio.ByteBuffer;
5 import java.nio.file.Path;
6
7 import org.simantics.acorn.ClusterManager;
8 import org.simantics.acorn.Persistable;
9 import org.simantics.acorn.cluster.ClusterImpl;
10 import org.simantics.acorn.cluster.ClusterSmall;
11 import org.simantics.acorn.cluster.ClusterImpl.ClusterTables;
12 import org.simantics.acorn.internal.ClusterSupport2;
13 import org.simantics.compressions.CompressionCodec;
14 import org.simantics.compressions.Compressions;
15 import org.simantics.db.ClusterCreator;
16 import org.simantics.db.exception.DatabaseException;
17 import org.simantics.db.service.Bytes;
18 import org.simantics.db.service.ClusterUID;
19 import org.simantics.utils.datastructures.Pair;
20
21 public class ClusterInfo extends LRUObject<ClusterUID, ClusterInfo> implements Persistable {
22         
23         final private ClusterManager manager;
24         private ClusterImpl cluster;
25         public int changeSetId;
26         private ClusterUpdateState updateState;
27
28         public static final String COMPRESSION = "LZ4";
29         
30         // Stub
31         public ClusterInfo(ClusterManager manager, LRU<ClusterUID, ClusterInfo> LRU, Path readDirectory, ClusterUID uid, int offset, int length) {
32                 super(LRU, uid, readDirectory, uid.toString() + ".cluster", offset, length, false, false);
33                 this.manager = manager;
34                 this.cluster = null;
35                 LRU.map(this);
36         }
37         
38         // New
39         public ClusterInfo(ClusterManager manager, LRU<ClusterUID, ClusterInfo> LRU, ClusterImpl cluster) {
40                 super(LRU, cluster.getClusterUID(), LRU.getDirectory(), cluster.getClusterUID().toString() + ".cluster", true, true);
41                 this.manager = manager;
42                 this.cluster = cluster;
43                 LRU.insert(this, accessTime);
44                 LRU.swap(getKey());
45         }
46         
47         public <T> T clone(ClusterUID uid, ClusterCreator creator) throws DatabaseException {
48                 
49                 // Updates have been ensured at this point
50                         
51                 acquireMutex();
52
53                 try {
54                         if(isResident()) {
55                                 ClusterTables tables = cluster.store();
56                                 return creator.create(uid, tables.bytes, tables.ints, tables.longs); 
57                         }
58                 } catch (IOException e) {
59                         throw new DatabaseException(e);
60                 } catch (Throwable t) {
61                         throw new IllegalStateException(t);
62                 } finally {
63                         releaseMutex();
64                 }
65
66                 // Ensure pending updates here - this may take some time
67                 LRU.waitPending(this, false);
68
69                 acquireMutex();
70                 try {
71
72                         if(isResident()) {
73                                 ClusterTables tables = cluster.store();
74                                 return creator.create(uid, tables.bytes, tables.ints, tables.longs); 
75                         } else {
76                                 byte[] data = readFile();
77                                 ClusterTables tables = new ClusterTables();
78                                 loadCluster(getKey(), manager.support, data, tables);
79                                 return creator.create(uid, tables.bytes, tables.ints, tables.longs);
80                         }
81
82                 } catch (IOException e) {
83                         throw new DatabaseException(e);
84                 } catch (Throwable t) {
85                         throw new IllegalStateException(t);
86                 } finally {
87                         releaseMutex();
88                 }
89                 
90         }
91         
92         static class ClusterDecompressor {
93                 
94                 byte[] decompressBuffer = new byte[1024*1024];
95
96                 public synchronized ClusterTables readCluster(ClusterUID uid, byte[] compressed) throws IOException {
97                         
98                         int deflatedSize = Bytes.readLE4(compressed, compressed.length-4);
99                         
100                         if(decompressBuffer.length < deflatedSize)
101                                 decompressBuffer = new byte[Math.max(3*decompressBuffer.length / 2, deflatedSize)];
102                         
103                         CompressionCodec codec = Compressions.get(Compressions.LZ4);
104                         
105                         ByteBuffer input = ByteBuffer.wrap(compressed);
106                         ByteBuffer output = ByteBuffer.wrap(decompressBuffer);
107
108                         int decompressedSize = codec.decompressBuffer(input, 0, compressed.length-4, output, 0, decompressBuffer.length);
109                         assert(decompressedSize <= decompressBuffer.length);
110                         
111                         int byteLength = Bytes.readLE4(decompressBuffer, 0);
112                         int intLength = Bytes.readLE4(decompressBuffer, 4);
113                         int longLength = Bytes.readLE4(decompressBuffer, 8);
114                         
115                         byte[] bytes = new byte[byteLength];
116                         int[] ints = new int[intLength];
117                         long[] longs = new long[longLength];
118                         
119                         System.arraycopy(decompressBuffer, 12, bytes, 0, byteLength);
120                         
121                         int offset = 12+byteLength;
122                         for(int i=0;i<intLength;i++,offset+=4)
123                                 ints[i] = Bytes.readLE4(decompressBuffer, offset);
124                         for(int i=0;i<longLength;i++,offset+=8)
125                                 longs[i] = Bytes.readLE8(decompressBuffer, offset);
126                         
127                         ClusterTables result = new ClusterTables();
128                         result.bytes = bytes;
129                         result.ints = ints;
130                         result.longs = longs;
131                         return result;
132                         
133                 }
134
135                 
136         }
137         
138         private static ClusterDecompressor decompressor = new ClusterDecompressor();
139         
140         public void loadCluster(ClusterUID uid, ClusterSupport2 support, byte[] data, ClusterTables tables) throws IOException {
141
142                 ClusterTables ts = decompressor.readCluster(uid, data);
143                 tables.bytes = ts.bytes;
144                 tables.ints = ts.ints;
145                 tables.longs = ts.longs;
146
147         }
148         
149         public ClusterImpl loadCluster(ClusterUID uid, ClusterSupport2 support, byte[] data) throws IOException {
150                 
151                 ClusterTables tables = decompressor.readCluster(uid, data);
152                 try {
153                         return ClusterImpl.make(support, tables.longs, tables.ints, tables.bytes, support, support.getClusterKeyByClusterUIDOrMake(uid));
154                 } catch (DatabaseException e) {
155                         e.printStackTrace();
156                         return null;
157                 }
158                 
159         }
160         
161         @Override
162         public void fromFile(byte[] data) {
163                 
164                 try {
165                         cluster = loadCluster(getKey(), manager.support, data);
166                 } catch (IOException e) {
167                         e.printStackTrace();
168                 }
169                 
170         }
171
172         @Override
173         protected Pair<byte[],Integer> toBytes() {
174                 
175                 try {
176
177                         byte[] raw = null;
178                         
179                         if(cluster instanceof ClusterSmall) {
180                                 raw = cluster.storeBytes();
181                         } else {
182                         
183                                 ClusterTables tables = cluster.store();
184                                 
185                                 raw = new byte[12 + tables.bytes.length + (tables.ints.length<<2) + (tables.longs.length<<3)]; 
186         
187                                 Bytes.writeLE(raw, 0, tables.bytes.length);
188                                 Bytes.writeLE(raw, 4, tables.ints.length);
189                                 Bytes.writeLE(raw, 8, tables.longs.length);
190         
191                                 System.arraycopy(tables.bytes, 0, raw, 12, tables.bytes.length);
192                                 int offset = 12+tables.bytes.length;
193                                 for(int i=0;i<tables.ints.length;i++,offset+=4)
194                                         Bytes.writeLE(raw, offset, tables.ints[i]);
195                                 for(int i=0;i<tables.longs.length;i++,offset+=8)
196                                         Bytes.writeLE8(raw, offset, tables.longs[i]);
197                                 
198                         }
199
200                         byte[] result = new byte[raw.length + raw.length/8];
201                         
202                         CompressionCodec codec = Compressions.get(Compressions.LZ4);
203                         ByteBuffer input = ByteBuffer.wrap(raw);
204                         ByteBuffer output = ByteBuffer.wrap(result);
205                         //ByteBuffer output = ByteBuffer.allocate(raw.length + raw.length/8);
206                         int compressedSize = codec.compressBuffer(input, 0, raw.length, output, 0);
207
208                         // We append inflated size - cannot prepend since decompression cannot handle offsets in input
209 //                      final byte[] rawOutput = new byte[compressedSize+4];
210 //                      output.get(rawOutput,0,compressedSize);
211 //                      Bytes.writeLE(rawOutput, compressedSize, raw.length);
212 //                      return Pair.make(rawOutput, rawOutput.length);
213
214                         Bytes.writeLE(result, compressedSize, raw.length);
215                         return Pair.make(result, compressedSize+4);
216
217                 } catch (Throwable t) {
218                         throw new IllegalStateException(t);
219                 } finally {
220                         release();
221                 }
222                 
223         }
224         
225         @Override
226         void release() {
227                 cluster = null;
228         }
229
230
231         @Override
232         String getExtension() {
233                 return "cluster";
234         }
235         
236         public void scheduleUpdate() {
237
238                 if(VERIFY) verifyAccess();
239
240                 if(updateState == null)
241                         updateState = new ClusterUpdateState(getKey());
242                 updateState.incRef();
243
244         }
245         
246         public ClusterImpl getForUpdate() {
247
248                 try {
249                         
250                         acquireMutex();
251
252                         assert(updateState != null);
253                         
254                         makeResident();
255                         setDirty(true);
256                         updateState.beginUpdate();
257                         return cluster;
258                         
259                 } catch (Throwable t) {
260                         throw new IllegalStateException(t);
261                 } finally {
262                         
263                         releaseMutex();
264                         
265                 }
266
267         }
268         
269         public void update(ClusterImpl clu) {
270                 
271                 if(VERIFY) verifyAccess();
272                 
273                 makeResident();
274                 
275                 cluster = clu;
276                 updateState.endUpdate();
277                 updateState = updateState.decRef();
278
279         }
280         
281         public ClusterImpl getCluster() {
282
283                 if(VERIFY) verifyAccess();
284
285                 makeResident();
286                 
287                 return cluster;
288                 
289         }
290         
291         @Override
292         public boolean canBePersisted() {
293                 
294                 if(VERIFY) verifyAccess();
295
296                 ClusterUpdateState state = getUpdateState();
297                 if(state != null) return false;
298                 
299                 if(!super.canBePersisted()) return false;
300                 if(updateState == null) return true;
301                 else return !updateState.inUpdate;
302                 
303         }
304
305         private ClusterUpdateState getUpdateState() {
306                 
307                 if(VERIFY) verifyAccess();
308                 
309                 return updateState;
310                 
311         }
312
313         private ClusterUpdateState getUpdateStateWithoutMutex() {
314                 
315                 try {
316                         acquireMutex();
317                         return getUpdateState();
318                 } catch (Throwable t) {
319                         throw new IllegalStateException(t);
320                 } finally {
321                         releaseMutex();
322                 }
323                 
324         }
325
326         /*
327          * This method blocks - no locks here
328          */
329         public void waitForUpdates() {
330                 
331                 ClusterUpdateState state = getUpdateStateWithoutMutex();
332                 if(state != null) {
333                         long start = System.nanoTime();
334                         state.waitForUpdates();
335                         long duration = System.nanoTime() - start;
336                         System.err.println("Wait updates to cluster " + getKey() + " for " + (1e-6 * duration) + "ms.");
337                 }
338                 
339         }
340         
341         @Override
342         protected boolean overwrite() {
343                 return true;
344         }
345         
346 }