Acorn: Fix WriteRunnable.runReally() and other fixes
[simantics/platform.git] / bundles / org.simantics.acorn / src / org / simantics / acorn / lru / ClusterInfo.java
1 package org.simantics.acorn.lru;
2
3 import java.io.IOException;
4 import java.nio.ByteBuffer;
5 import java.nio.file.Path;
6
7 import org.simantics.acorn.ClusterManager;
8 import org.simantics.acorn.Persistable;
9 import org.simantics.acorn.cluster.ClusterImpl;
10 import org.simantics.acorn.cluster.ClusterSmall;
11 import org.simantics.acorn.exception.AcornAccessVerificationException;
12 import org.simantics.acorn.exception.IllegalAcornStateException;
13 import org.simantics.acorn.cluster.ClusterImpl.ClusterTables;
14 import org.simantics.acorn.internal.ClusterSupport2;
15 import org.simantics.compressions.CompressionCodec;
16 import org.simantics.compressions.Compressions;
17 import org.simantics.db.ClusterCreator;
18 import org.simantics.db.exception.DatabaseException;
19 import org.simantics.db.exception.SDBException;
20 import org.simantics.db.service.Bytes;
21 import org.simantics.db.service.ClusterUID;
22 import org.simantics.utils.datastructures.Pair;
23
24 public class ClusterInfo extends LRUObject<ClusterUID, ClusterInfo> implements Persistable {
25         
26         final private ClusterManager manager;
27         private ClusterImpl cluster;
28         public int changeSetId;
29         private ClusterUpdateState updateState;
30
31         public static final String COMPRESSION = "LZ4";
32         
33         // Stub
34         public ClusterInfo(ClusterManager manager, LRU<ClusterUID, ClusterInfo> LRU, Path readDirectory, ClusterUID uid, int offset, int length) throws AcornAccessVerificationException {
35                 super(LRU, uid, readDirectory, uid.toString() + ".cluster", offset, length, false, false);
36                 this.manager = manager;
37                 this.cluster = null;
38                 LRU.map(this);
39         }
40         
41         // New
42         public ClusterInfo(ClusterManager manager, LRU<ClusterUID, ClusterInfo> LRU, ClusterImpl cluster) throws AcornAccessVerificationException, IllegalAcornStateException {
43                 super(LRU, cluster.getClusterUID(), LRU.getDirectory(), cluster.getClusterUID().toString() + ".cluster", true, true);
44                 this.manager = manager;
45                 this.cluster = cluster;
46                 LRU.insert(this, accessTime);
47                 LRU.swap(getKey());
48         }
49         
50         public <T> T clone(ClusterUID uid, ClusterCreator creator) throws IOException, AcornAccessVerificationException, IllegalAcornStateException {
51                 
52                 // Updates have been ensured at this point
53                         
54                 acquireMutex();
55
56                 try {
57                         if(isResident()) {
58                                 ClusterTables tables = cluster.store();
59                                 return creator.create(uid, tables.bytes, tables.ints, tables.longs); 
60                         }
61                 } catch (IOException e) {
62                         throw e;
63                 } catch (Throwable t) {
64                         throw new IllegalAcornStateException(t);
65                 } finally {
66                         releaseMutex();
67                 }
68
69                 // Ensure pending updates here - this may take some time
70                 LRU.waitPending(this, false);
71
72                 acquireMutex();
73                 try {
74
75                         if(isResident()) {
76                                 ClusterTables tables = cluster.store();
77                                 return creator.create(uid, tables.bytes, tables.ints, tables.longs); 
78                         } else {
79                                 byte[] data = readFile();
80                                 ClusterTables tables = new ClusterTables();
81                                 loadCluster(getKey(), manager.support, data, tables);
82                                 return creator.create(uid, tables.bytes, tables.ints, tables.longs);
83                         }
84
85                 } catch (IOException e) {
86                         throw e;
87                 } catch (Throwable t) {
88                         throw new IllegalAcornStateException(t);
89                 } finally {
90                         releaseMutex();
91                 }
92                 
93         }
94         
95         static class ClusterDecompressor {
96                 
97                 byte[] decompressBuffer = new byte[1024*1024];
98
99                 public synchronized ClusterTables readCluster(ClusterUID uid, byte[] compressed) throws IOException {
100                         
101                         int deflatedSize = Bytes.readLE4(compressed, compressed.length-4);
102                         
103                         if(decompressBuffer.length < deflatedSize)
104                                 decompressBuffer = new byte[Math.max(3*decompressBuffer.length / 2, deflatedSize)];
105                         
106                         CompressionCodec codec = Compressions.get(Compressions.LZ4);
107                         
108                         ByteBuffer input = ByteBuffer.wrap(compressed);
109                         ByteBuffer output = ByteBuffer.wrap(decompressBuffer);
110
111                         int decompressedSize = codec.decompressBuffer(input, 0, compressed.length-4, output, 0, decompressBuffer.length);
112                         assert(decompressedSize <= decompressBuffer.length);
113                         
114                         int byteLength = Bytes.readLE4(decompressBuffer, 0);
115                         int intLength = Bytes.readLE4(decompressBuffer, 4);
116                         int longLength = Bytes.readLE4(decompressBuffer, 8);
117                         
118                         byte[] bytes = new byte[byteLength];
119                         int[] ints = new int[intLength];
120                         long[] longs = new long[longLength];
121                         
122                         System.arraycopy(decompressBuffer, 12, bytes, 0, byteLength);
123                         
124                         int offset = 12+byteLength;
125                         for(int i=0;i<intLength;i++,offset+=4)
126                                 ints[i] = Bytes.readLE4(decompressBuffer, offset);
127                         for(int i=0;i<longLength;i++,offset+=8)
128                                 longs[i] = Bytes.readLE8(decompressBuffer, offset);
129                         
130                         ClusterTables result = new ClusterTables();
131                         result.bytes = bytes;
132                         result.ints = ints;
133                         result.longs = longs;
134                         return result;
135                         
136                 }
137
138                 
139         }
140         
141         private static ClusterDecompressor decompressor = new ClusterDecompressor();
142         
143         public void loadCluster(ClusterUID uid, ClusterSupport2 support, byte[] data, ClusterTables tables) throws IOException {
144
145                 ClusterTables ts = decompressor.readCluster(uid, data);
146                 tables.bytes = ts.bytes;
147                 tables.ints = ts.ints;
148                 tables.longs = ts.longs;
149
150         }
151         
152         public ClusterImpl loadCluster(ClusterUID uid, ClusterSupport2 support, byte[] data) throws IOException {
153                 
154                 ClusterTables tables = decompressor.readCluster(uid, data);
155                 try {
156                         return ClusterImpl.make(support, tables.longs, tables.ints, tables.bytes, support, support.getClusterKeyByClusterUIDOrMake(uid));
157                 } catch (DatabaseException e) {
158                         e.printStackTrace();
159                         return null;
160                 }
161                 
162         }
163         
164         @Override
165         public void fromFile(byte[] data) {
166                 
167                 try {
168                         cluster = loadCluster(getKey(), manager.support, data);
169                 } catch (IOException e) {
170                         e.printStackTrace();
171                 }
172                 
173         }
174
175         @Override
176         protected Pair<byte[],Integer> toBytes() throws IllegalAcornStateException {
177                 try {
178                         byte[] raw = null;
179                         
180                         if(cluster instanceof ClusterSmall) {
181                                 raw = cluster.storeBytes();
182                         } else {
183                         
184                                 ClusterTables tables = cluster.store();
185                                 
186                                 raw = new byte[12 + tables.bytes.length + (tables.ints.length<<2) + (tables.longs.length<<3)]; 
187         
188                                 Bytes.writeLE(raw, 0, tables.bytes.length);
189                                 Bytes.writeLE(raw, 4, tables.ints.length);
190                                 Bytes.writeLE(raw, 8, tables.longs.length);
191         
192                                 System.arraycopy(tables.bytes, 0, raw, 12, tables.bytes.length);
193                                 int offset = 12+tables.bytes.length;
194                                 for(int i=0;i<tables.ints.length;i++,offset+=4)
195                                         Bytes.writeLE(raw, offset, tables.ints[i]);
196                                 for(int i=0;i<tables.longs.length;i++,offset+=8)
197                                         Bytes.writeLE8(raw, offset, tables.longs[i]);
198                                 
199                         }
200
201                         byte[] result = new byte[raw.length + raw.length/8];
202                         
203                         CompressionCodec codec = Compressions.get(Compressions.LZ4);
204                         ByteBuffer input = ByteBuffer.wrap(raw);
205                         ByteBuffer output = ByteBuffer.wrap(result);
206                         //ByteBuffer output = ByteBuffer.allocate(raw.length + raw.length/8);
207                         int compressedSize = codec.compressBuffer(input, 0, raw.length, output, 0);
208
209                         // We append inflated size - cannot prepend since decompression cannot handle offsets in input
210 //                      final byte[] rawOutput = new byte[compressedSize+4];
211 //                      output.get(rawOutput,0,compressedSize);
212 //                      Bytes.writeLE(rawOutput, compressedSize, raw.length);
213 //                      return Pair.make(rawOutput, rawOutput.length);
214
215                         Bytes.writeLE(result, compressedSize, raw.length);
216                         return Pair.make(result, compressedSize+4);
217
218                 } catch (Throwable t) {
219                         throw new IllegalAcornStateException(t);
220                 } finally {
221                         release();
222                 }
223                 
224         }
225         
226         @Override
227         void release() {
228                 cluster = null;
229         }
230
231
232         @Override
233         String getExtension() {
234                 return "cluster";
235         }
236         
237         public void scheduleUpdate() throws AcornAccessVerificationException {
238
239                 if(VERIFY) verifyAccess();
240
241                 if(updateState == null)
242                         updateState = new ClusterUpdateState(getKey());
243                 updateState.incRef();
244
245         }
246         
247         public ClusterImpl getForUpdate() throws SDBException {
248                 try {
249                         acquireMutex();
250
251                         assert(updateState != null);
252                         
253                         makeResident();
254                         setDirty(true);
255                         updateState.beginUpdate();
256                         return cluster;
257                 } catch (IllegalAcornStateException | AcornAccessVerificationException e) {
258                     throw e;
259                 } catch (Throwable t) {
260                         throw new IllegalAcornStateException(t);
261                 } finally {
262                         releaseMutex();
263                 }
264         }
265         
266         public void update(ClusterImpl clu) throws AcornAccessVerificationException, IllegalAcornStateException {
267                 
268                 if(VERIFY) verifyAccess();
269                 
270                 makeResident();
271                 
272                 cluster = clu;
273                 updateState.endUpdate();
274                 updateState = updateState.decRef();
275
276         }
277         
278         public ClusterImpl getCluster() throws AcornAccessVerificationException, IllegalAcornStateException {
279
280                 if(VERIFY) verifyAccess();
281
282                 makeResident();
283                 
284                 return cluster;
285                 
286         }
287         
288         @Override
289         public boolean canBePersisted() throws AcornAccessVerificationException {
290                 
291                 if(VERIFY) verifyAccess();
292
293                 ClusterUpdateState state = getUpdateState();
294                 if(state != null) return false;
295                 
296                 if(!super.canBePersisted()) return false;
297                 if(updateState == null) return true;
298                 else return !updateState.inUpdate;
299                 
300         }
301
302         private ClusterUpdateState getUpdateState() throws AcornAccessVerificationException {
303                 
304                 if(VERIFY) verifyAccess();
305                 
306                 return updateState;
307                 
308         }
309
310         private ClusterUpdateState getUpdateStateWithoutMutex() throws IllegalAcornStateException {
311                 
312                 try {
313                         acquireMutex();
314                         return getUpdateState();
315                 } catch (Throwable t) {
316                         throw new IllegalAcornStateException(t);
317                 } finally {
318                         releaseMutex();
319                 }
320                 
321         }
322
323         /*
324          * This method blocks - no locks here
325          */
326         public void waitForUpdates() throws IllegalAcornStateException {
327                 
328                 ClusterUpdateState state = getUpdateStateWithoutMutex();
329                 if(state != null) {
330                         long start = System.nanoTime();
331                         state.waitForUpdates();
332                         long duration = System.nanoTime() - start;
333                         System.err.println("Wait updates to cluster " + getKey() + " for " + (1e-6 * duration) + "ms.");
334                 }
335         }
336         
337         @Override
338         protected boolean overwrite() {
339                 return true;
340         }
341         
342 }