]> gerrit.simantics Code Review - simantics/platform.git/blob - bundles/org.simantics.acorn/src/org/simantics/acorn/lru/ClusterInfo.java
Replace System.err and System.out with SLF4J Logging
[simantics/platform.git] / bundles / org.simantics.acorn / src / org / simantics / acorn / lru / ClusterInfo.java
1 package org.simantics.acorn.lru;
2
3 import java.io.IOException;
4 import java.nio.ByteBuffer;
5 import java.nio.file.Path;
6
7 import org.simantics.acorn.ClusterManager;
8 import org.simantics.acorn.Persistable;
9 import org.simantics.acorn.cluster.ClusterImpl;
10 import org.simantics.acorn.cluster.ClusterSmall;
11 import org.simantics.acorn.exception.AcornAccessVerificationException;
12 import org.simantics.acorn.exception.IllegalAcornStateException;
13 import org.simantics.acorn.cluster.ClusterImpl.ClusterTables;
14 import org.simantics.acorn.internal.ClusterSupport2;
15 import org.simantics.compressions.CompressionCodec;
16 import org.simantics.compressions.Compressions;
17 import org.simantics.db.ClusterCreator;
18 import org.simantics.db.exception.DatabaseException;
19 import org.simantics.db.exception.SDBException;
20 import org.simantics.db.service.Bytes;
21 import org.simantics.db.service.ClusterUID;
22 import org.simantics.utils.datastructures.Pair;
23 import org.slf4j.Logger;
24 import org.slf4j.LoggerFactory;
25
26 public class ClusterInfo extends LRUObject<ClusterUID, ClusterInfo> implements Persistable {
27
28     private static final Logger LOGGER = LoggerFactory.getLogger(ClusterInfo.class);
29         final private ClusterManager manager;
30         private ClusterImpl cluster;
31         public int changeSetId;
32         private ClusterUpdateState updateState;
33
34         public static final String COMPRESSION = "LZ4";
35         
36         // Stub
37         public ClusterInfo(ClusterManager manager, LRU<ClusterUID, ClusterInfo> LRU, Path readDirectory, ClusterUID uid, int offset, int length) throws AcornAccessVerificationException {
38                 super(LRU, uid, readDirectory, uid.toString() + ".cluster", offset, length, false, false);
39                 this.manager = manager;
40                 this.cluster = null;
41                 LRU.map(this);
42         }
43         
44         // New
45         public ClusterInfo(ClusterManager manager, LRU<ClusterUID, ClusterInfo> LRU, ClusterImpl cluster) throws AcornAccessVerificationException, IllegalAcornStateException {
46                 super(LRU, cluster.getClusterUID(), LRU.getDirectory(), cluster.getClusterUID().toString() + ".cluster", true, true);
47                 this.manager = manager;
48                 this.cluster = cluster;
49                 LRU.insert(this, accessTime);
50                 LRU.swap(getKey());
51         }
52         
53         public <T> T clone(ClusterUID uid, ClusterCreator creator) throws IOException, AcornAccessVerificationException, IllegalAcornStateException {
54                 
55                 // Updates have been ensured at this point
56                         
57                 acquireMutex();
58
59                 try {
60                         if(isResident()) {
61                                 ClusterTables tables = cluster.store();
62                                 return creator.create(uid, tables.bytes, tables.ints, tables.longs); 
63                         }
64                 } catch (IOException e) {
65                         throw e;
66                 } catch (Throwable t) {
67                         throw new IllegalAcornStateException(t);
68                 } finally {
69                         releaseMutex();
70                 }
71
72                 // Ensure pending updates here - this may take some time
73                 LRU.waitPending(this, false);
74
75                 acquireMutex();
76                 try {
77
78                         if(isResident()) {
79                                 ClusterTables tables = cluster.store();
80                                 return creator.create(uid, tables.bytes, tables.ints, tables.longs); 
81                         } else {
82                                 byte[] data = readFile();
83                                 ClusterTables tables = new ClusterTables();
84                                 loadCluster(getKey(), manager.support, data, tables);
85                                 return creator.create(uid, tables.bytes, tables.ints, tables.longs);
86                         }
87
88                 } catch (IOException e) {
89                         throw e;
90                 } catch (Throwable t) {
91                         throw new IllegalAcornStateException(t);
92                 } finally {
93                         releaseMutex();
94                 }
95                 
96         }
97         
98         static class ClusterDecompressor {
99                 
100                 byte[] decompressBuffer = new byte[1024*1024];
101
102                 public synchronized ClusterTables readCluster(ClusterUID uid, byte[] compressed) throws IOException {
103                         
104                         int deflatedSize = Bytes.readLE4(compressed, compressed.length-4);
105                         
106                         if(decompressBuffer.length < deflatedSize)
107                                 decompressBuffer = new byte[Math.max(3*decompressBuffer.length / 2, deflatedSize)];
108                         
109                         CompressionCodec codec = Compressions.get(Compressions.LZ4);
110                         
111                         ByteBuffer input = ByteBuffer.wrap(compressed);
112                         ByteBuffer output = ByteBuffer.wrap(decompressBuffer);
113
114                         int decompressedSize = codec.decompressBuffer(input, 0, compressed.length-4, output, 0, decompressBuffer.length);
115                         assert(decompressedSize <= decompressBuffer.length);
116                         
117                         int byteLength = Bytes.readLE4(decompressBuffer, 0);
118                         int intLength = Bytes.readLE4(decompressBuffer, 4);
119                         int longLength = Bytes.readLE4(decompressBuffer, 8);
120                         
121                         byte[] bytes = new byte[byteLength];
122                         int[] ints = new int[intLength];
123                         long[] longs = new long[longLength];
124                         
125                         System.arraycopy(decompressBuffer, 12, bytes, 0, byteLength);
126                         
127                         int offset = 12+byteLength;
128                         for(int i=0;i<intLength;i++,offset+=4)
129                                 ints[i] = Bytes.readLE4(decompressBuffer, offset);
130                         for(int i=0;i<longLength;i++,offset+=8)
131                                 longs[i] = Bytes.readLE8(decompressBuffer, offset);
132                         
133                         ClusterTables result = new ClusterTables();
134                         result.bytes = bytes;
135                         result.ints = ints;
136                         result.longs = longs;
137                         return result;
138                         
139                 }
140
141                 
142         }
143         
144         private static ClusterDecompressor decompressor = new ClusterDecompressor();
145         
146         public void loadCluster(ClusterUID uid, ClusterSupport2 support, byte[] data, ClusterTables tables) throws IOException {
147
148                 ClusterTables ts = decompressor.readCluster(uid, data);
149                 tables.bytes = ts.bytes;
150                 tables.ints = ts.ints;
151                 tables.longs = ts.longs;
152
153         }
154         
155         public ClusterImpl loadCluster(ClusterUID uid, ClusterSupport2 support, byte[] data) throws IOException {
156                 
157                 ClusterTables tables = decompressor.readCluster(uid, data);
158                 try {
159                         return ClusterImpl.make(support, tables.longs, tables.ints, tables.bytes, support, support.getClusterKeyByClusterUIDOrMake(uid));
160                 } catch (DatabaseException e) {
161                         e.printStackTrace();
162                         return null;
163                 }
164                 
165         }
166         
167         @Override
168         public void fromFile(byte[] data) {
169                 
170                 try {
171                         cluster = loadCluster(getKey(), manager.support, data);
172                 } catch (IOException e) {
173                         e.printStackTrace();
174                 }
175                 
176         }
177
178         @Override
179         protected Pair<byte[],Integer> toBytes() throws IllegalAcornStateException {
180                 try {
181                         byte[] raw = null;
182                         
183                         if(cluster instanceof ClusterSmall) {
184                                 raw = cluster.storeBytes();
185                         } else {
186                         
187                                 ClusterTables tables = cluster.store();
188                                 
189                                 raw = new byte[12 + tables.bytes.length + (tables.ints.length<<2) + (tables.longs.length<<3)]; 
190         
191                                 Bytes.writeLE(raw, 0, tables.bytes.length);
192                                 Bytes.writeLE(raw, 4, tables.ints.length);
193                                 Bytes.writeLE(raw, 8, tables.longs.length);
194         
195                                 System.arraycopy(tables.bytes, 0, raw, 12, tables.bytes.length);
196                                 int offset = 12+tables.bytes.length;
197                                 for(int i=0;i<tables.ints.length;i++,offset+=4)
198                                         Bytes.writeLE(raw, offset, tables.ints[i]);
199                                 for(int i=0;i<tables.longs.length;i++,offset+=8)
200                                         Bytes.writeLE8(raw, offset, tables.longs[i]);
201                                 
202                         }
203
204                         byte[] result = new byte[raw.length + raw.length/8];
205                         
206                         CompressionCodec codec = Compressions.get(Compressions.LZ4);
207                         ByteBuffer input = ByteBuffer.wrap(raw);
208                         ByteBuffer output = ByteBuffer.wrap(result);
209                         //ByteBuffer output = ByteBuffer.allocate(raw.length + raw.length/8);
210                         int compressedSize = codec.compressBuffer(input, 0, raw.length, output, 0);
211
212                         // We append inflated size - cannot prepend since decompression cannot handle offsets in input
213 //                      final byte[] rawOutput = new byte[compressedSize+4];
214 //                      output.get(rawOutput,0,compressedSize);
215 //                      Bytes.writeLE(rawOutput, compressedSize, raw.length);
216 //                      return Pair.make(rawOutput, rawOutput.length);
217
218                         Bytes.writeLE(result, compressedSize, raw.length);
219                         return Pair.make(result, compressedSize+4);
220
221                 } catch (Throwable t) {
222                         throw new IllegalAcornStateException(t);
223                 } finally {
224                         release();
225                 }
226                 
227         }
228         
229         @Override
230         void release() {
231                 cluster = null;
232         }
233
234
235         @Override
236         String getExtension() {
237                 return "cluster";
238         }
239         
240         public void scheduleUpdate() throws AcornAccessVerificationException {
241
242                 if(VERIFY) verifyAccess();
243
244                 if(updateState == null)
245                         updateState = new ClusterUpdateState(getKey());
246                 updateState.incRef();
247
248         }
249         
250         public ClusterImpl getForUpdate() throws SDBException {
251                 acquireMutex();
252                 try {
253                         assert(updateState != null);
254                         makeResident();
255                         setDirty(true);
256                         updateState.beginUpdate();
257                         return cluster;
258                 } catch (IllegalAcornStateException | AcornAccessVerificationException e) {
259                     throw e;
260                 } catch (Throwable t) {
261                         throw new IllegalAcornStateException(t);
262                 } finally {
263                         releaseMutex();
264                 }
265         }
266         
267         public void update(ClusterImpl clu) throws AcornAccessVerificationException, IllegalAcornStateException {
268                 
269                 if(VERIFY) verifyAccess();
270                 
271                 makeResident();
272                 
273                 cluster = clu;
274                 updateState.endUpdate();
275                 updateState = updateState.decRef();
276
277         }
278         
279         public ClusterImpl getCluster() throws AcornAccessVerificationException, IllegalAcornStateException {
280
281                 if(VERIFY) verifyAccess();
282
283                 makeResident();
284                 
285                 return cluster;
286                 
287         }
288         
289         @Override
290         public boolean canBePersisted() throws AcornAccessVerificationException {
291                 
292                 if(VERIFY) verifyAccess();
293
294                 ClusterUpdateState state = getUpdateState();
295                 if(state != null) return false;
296                 
297                 if(!super.canBePersisted()) return false;
298                 if(updateState == null) return true;
299                 else return !updateState.inUpdate;
300                 
301         }
302
303         private ClusterUpdateState getUpdateState() throws AcornAccessVerificationException {
304                 
305                 if(VERIFY) verifyAccess();
306                 
307                 return updateState;
308                 
309         }
310
311         private ClusterUpdateState getUpdateStateWithoutMutex() throws IllegalAcornStateException {
312                 
313                 try {
314                         acquireMutex();
315                         return getUpdateState();
316                 } catch (Throwable t) {
317                         throw new IllegalAcornStateException(t);
318                 } finally {
319                         releaseMutex();
320                 }
321                 
322         }
323
324         /*
325          * This method blocks - no locks here
326          */
327         public void waitForUpdates() throws IllegalAcornStateException {
328                 
329                 ClusterUpdateState state = getUpdateStateWithoutMutex();
330                 if(state != null) {
331                         long start = System.nanoTime();
332                         state.waitForUpdates();
333                         long duration = System.nanoTime() - start;
334                         if (LOGGER.isDebugEnabled())
335                             LOGGER.debug("Wait updates to cluster " + getKey() + " for " + (1e-6 * duration) + "ms.");
336                 }
337         }
338         
339         @Override
340         protected boolean overwrite() {
341                 return true;
342         }
343
344         @Override
345         public Logger getLogger() {
346             return LOGGER;
347         }
348 }