]> gerrit.simantics Code Review - simantics/platform.git/blob - bundles/org.simantics.acorn/src/org/simantics/acorn/lru/ClusterStreamChunk.java
Replace System.err and System.out with SLF4J Logging
[simantics/platform.git] / bundles / org.simantics.acorn / src / org / simantics / acorn / lru / ClusterStreamChunk.java
1 package org.simantics.acorn.lru;
2
3 import java.io.IOException;
4 import java.nio.ByteBuffer;
5 import java.nio.file.Path;
6 import java.util.ArrayList;
7
8 import org.simantics.acorn.ClusterManager;
9 import org.simantics.acorn.Persistable;
10 import org.simantics.acorn.exception.AcornAccessVerificationException;
11 import org.simantics.acorn.exception.IllegalAcornStateException;
12 import org.simantics.acorn.internal.ClusterChange;
13 import org.simantics.acorn.internal.UndoClusterUpdateProcessor;
14 import org.simantics.compressions.CompressionCodec;
15 import org.simantics.compressions.Compressions;
16 import org.simantics.db.exception.DatabaseException;
17 import org.simantics.db.service.Bytes;
18 import org.simantics.utils.datastructures.Pair;
19 import org.slf4j.Logger;
20 import org.slf4j.LoggerFactory;
21
22 import gnu.trove.list.array.TByteArrayList;
23
/**
 * An LRU-managed, persistable chunk of the cluster update stream. Buffers
 * {@link ClusterUpdateOperation}s until the chunk reaches its size limit
 * ({@link #MAX_CHUNK_SIZE}), after which it is considered committed and may be
 * persisted (LZ4-compressed) via {@link #toBytes()} / reloaded via
 * {@link #fromFile(byte[])}.
 */
public class ClusterStreamChunk extends LRUObject<String, ClusterStreamChunk> implements Persistable {

	private static final Logger LOGGER = LoggerFactory.getLogger(ClusterStreamChunk.class);
	// 500KB is a fine chunk
	private static int MAX_CHUNK_SIZE = 500*1024;

	// Total number of payload bytes accumulated across all operations so far.
	int size = 0;
	final private ClusterManager manager;
	// Once true, no further operations may be added (see addOperation/commit).
	private boolean committed = false;
	
	// Index of the next operation to be processed by an external consumer.
	public int nextToProcess = 0;
	
	// Buffered update operations, in insertion order; order is the on-disk order.
	public ArrayList<ClusterUpdateOperation> operations = new ArrayList<ClusterUpdateOperation>();
38         // Stub
39         public ClusterStreamChunk(ClusterManager manager, LRU<String, ClusterStreamChunk> LRU, Path readDir, String id, int offset, int length) throws AcornAccessVerificationException {
40                 super(LRU, id, readDir, "clusterStream", offset, length, false, false);
41                 this.manager = manager;
42                 LRU.map(this);
43         }
44         
45         // Creation
46         public ClusterStreamChunk(ClusterManager manager, LRU<String, ClusterStreamChunk> LRU, String id) throws AcornAccessVerificationException {
47                 super(LRU, id, LRU.getDirectory(), "clusterStream", true, true);
48                 this.manager = manager;
49                 LRU.insert(this, accessTime);
50         }
51         
52         public UndoClusterUpdateProcessor getUndoProcessor(ClusterManager clusters, int chunkOffset, String ccsId) throws DatabaseException {
53                 
54                 if(VERIFY) verifyAccess();
55                 
56                 makeResident(true);
57
58                 ClusterUpdateOperation op = operations.get(chunkOffset);
59                 if(op == null) throw new IllegalAcornStateException("Cluster Update Operation " + ccsId + " was not found.");
60                 if(op.ccs == null) throw new IllegalAcornStateException("Cluster ChangeSet " + ccsId + " was not found.");
61
62                 UndoClusterUpdateProcessor proc = new UndoClusterUpdateProcessor(clusters, this, op.ccs);
63                 if(proc.version != ClusterChange.VERSION)
64                         return null;
65
66                 // This cluster and CCS can still be under preparation => wait
67                 clusters.clusterLRU.ensureUpdates(proc.getClusterUID());
68
69                 proc.process();
70
71                 cancelForceResident();
72                 
73                 return proc;
74                 
75         }
76         
77         public void addOperation(ClusterUpdateOperation op) throws IllegalAcornStateException {
78                 if(committed)
79                         throw new IllegalAcornStateException("Cannot add operation " + op + " to " + this + " if commited == true");
80                 operations.add(op);
81                 size += op.data.length;
82 //              if(isCommitted()) {
83 //                      LRU.refresh(this);
84 //              }
85         }
86         
87         public byte[] getOperation(int index) {
88                 return operations.get(index).data;
89         }
90         
	/** Marks this chunk committed; no further operations may be added. */
	public void commit() {
		committed = true;
	}
94         
	/**
	 * Returns whether this chunk is committed.
	 * NOTE: this getter has a side effect — once the accumulated size exceeds
	 * {@link #MAX_CHUNK_SIZE} the chunk is auto-committed here.
	 */
	public boolean isCommitted() {
		if(size > MAX_CHUNK_SIZE) committed = true;
		return committed;
	}
99         
100         @Override
101         public boolean canBePersisted() throws AcornAccessVerificationException {
102                 if(!super.canBePersisted()) return false;
103                 if(!isCommitted()) return false;
104                 for(ClusterUpdateOperation op : operations) {
105                         if(!op.finished) return false;
106                 }
107                 return true;
108         }
109
110         private static void writeLE(TByteArrayList bytes, int value) {
111                 
112                 bytes.add( (byte) (value & 0xFF));
113                 bytes.add((byte) ((value >>> 8) & 0xFF));
114                 bytes.add((byte) ((value >>> 16) & 0xFF));
115                 bytes.add((byte) ((value >>> 24) & 0xFF));
116
117         }
118
119         final public static void writeLE8(TByteArrayList bytes, long value) {
120                 
121                 bytes.add( (byte) (value & 0xFF));
122                 bytes.add((byte) ((value >>> 8) & 0xFF));
123                 bytes.add((byte) ((value >>> 16) & 0xFF));
124                 bytes.add((byte) ((value >>> 24) & 0xFF));
125                 bytes.add((byte) ((value >>> 32) & 0xFF));
126                 bytes.add((byte) ((value >>> 40) & 0xFF));
127                 bytes.add((byte) ((value >>> 48) & 0xFF));
128                 bytes.add((byte) ((value >>> 56) & 0xFF));
129                 
130         }
131         
	/**
	 * Serializes all buffered operations into a single LZ4-compressed byte
	 * array. Layout of the uncompressed stream (all ints little-endian):
	 * [opCount] then per operation: [dataLen][data][maskLen][mask]
	 * [oldValueExLen][oldValueEx][oldValueCount] and per old value
	 * [len or -1][bytes if len >= 0]. The uncompressed length is appended as a
	 * trailing 4-byte LE int (see the comment below). This format must stay in
	 * sync with {@link #fromFile(byte[])}.
	 * Destructive: op.data and op.ccs references are nulled out as a side effect.
	 */
	@Override
	protected Pair<byte[], Integer> toBytes() {
		
		assert(isCommitted());
		
		TByteArrayList raw = new TByteArrayList();
		
		writeLE(raw, operations.size());
		
		for(ClusterUpdateOperation op : operations) {
			
			writeLE(raw, op.data.length);
			raw.add(op.data);
			// Release the payload eagerly; it is now captured in 'raw'.
			op.data = null;
			
			writeLE(raw, op.ccs.statementMask.size());
			raw.add(op.ccs.statementMask.toArray());
			writeLE(raw, op.ccs.oldValueEx.size());
			raw.add(op.ccs.oldValueEx.toArray());
			writeLE(raw, op.ccs.oldValues.size());
			
			for(byte[] oldValue : op.ccs.oldValues) {
				// -1 length encodes a null old value.
				int len = (oldValue != null ? oldValue.length : -1);
				writeLE(raw, len);
				if(oldValue != null) {
					raw.add(oldValue);
				}
			}
			
		}
		
		byte[] raw_ = raw.toArray();
		CompressionCodec codec = Compressions.get(Compressions.LZ4);
		ByteBuffer input = ByteBuffer.wrap(raw_);
		// NOTE(review): 12.5% headroom may be insufficient for incompressible
		// input — LZ4 worst case slightly exceeds the input length. Verify the
		// codec's compressBuffer contract or use its compress-bound helper.
		ByteBuffer output = ByteBuffer.allocate(raw_.length + raw_.length/8);
		int compressedSize = codec.compressBuffer(input, 0, raw_.length, output, 0);

		// We append inflated size - cannot prepend since decompression cannot handle offsets in input
		final byte[] rawOutput = new byte[compressedSize+4];
		// NOTE(review): assumes compressBuffer leaves output's position at 0 —
		// confirm against the CompressionCodec implementation.
		output.get(rawOutput,0,compressedSize);
		Bytes.writeLE(rawOutput, compressedSize, raw_.length);

		release();
		
		return Pair.make(rawOutput, rawOutput.length);
		
	}
179         
180         @Override
181         void release() {
182                 
183                 for(ClusterUpdateOperation op : operations) {
184                         op.data = null;
185                         op.ccs = null;
186                 }
187
188         }
189
	/**
	 * Helper that inflates chunk data produced by {@link #toBytes()}: the
	 * uncompressed length is read from the trailing 4 bytes of the compressed
	 * buffer, then the remaining prefix is LZ4-decompressed into a fresh array.
	 * Synchronized so the single shared instance can be used from any thread.
	 */
	static class StreamDecompressor {
		
//		byte[] decompressBuffer = new byte[1024*1024];

		public synchronized byte[] decompressBuffer(byte[] compressed) throws IOException {
			
			// Trailing 4 LE bytes carry the uncompressed (inflated) size.
			int deflatedSize = Bytes.readLE4(compressed, compressed.length-4);
			
			byte[] result = new byte[deflatedSize];
			
//			if(decompressBuffer.length < deflatedSize)
//				decompressBuffer = new byte[Math.max(3*decompressBuffer.length / 2, deflatedSize)];
			
			CompressionCodec codec = Compressions.get(Compressions.LZ4);
			
			ByteBuffer input = ByteBuffer.wrap(compressed);
			ByteBuffer output = ByteBuffer.wrap(result);

			// Exclude the 4-byte length trailer from the compressed payload.
			int decompressedSize = codec.decompressBuffer(input, 0, compressed.length-4, output, 0, result.length);
			assert(decompressedSize  == deflatedSize);
			
			return result;
			
		}

		
	}
217         
	// Shared decompressor; its method is synchronized, so concurrent fromFile
	// calls serialize on this instance.
	private static StreamDecompressor decompressor = new StreamDecompressor();
	
	/**
	 * Reconstructs {@link #operations} from a persisted chunk previously
	 * produced by {@link #toBytes()}: decompresses the payload, then walks it
	 * with a running offset, reading the operation count followed by each
	 * operation's data, statement mask, oldValueEx list and old values. The
	 * read order must mirror the write order in toBytes exactly.
	 *
	 * @param data_ LZ4-compressed chunk bytes with a trailing 4-byte LE
	 *              uncompressed-size field
	 */
	@Override
	public void fromFile(byte[] data_) throws IllegalAcornStateException, AcornAccessVerificationException {

		try {
			
			byte[] data = decompressor.decompressBuffer(data_);
			
			operations = new ArrayList<ClusterUpdateOperation>();
			
			int offset = 0;
			int opLen = Bytes.readLE4(data, offset);
			offset += 4;
			
			for(int i=0;i<opLen;i++) {
				
				int len = Bytes.readLE4(data, offset);
				offset += 4;
				byte[] bytes  = new byte[len];
				System.arraycopy(data, offset, bytes, 0, len);
				offset += len;
				
				ClusterUpdateOperation op = new ClusterUpdateOperation(manager, bytes);
				
				// Change-set key is the chunk key plus the operation's index.
				String ccsKey = getKey() + "." + i;
				
				op.ccs = new ClusterChangeSet(ccsKey, op.uid);
				op.chunk = this;
				
				int statementMaskLen = Bytes.readLE4(data, offset);
				offset += 4;
				op.ccs.statementMask = new TByteArrayList(statementMaskLen);
				for(int j=0;j<statementMaskLen;j++)
					op.ccs.statementMask.add(data[offset++]);
				
				int oldValueExLen = Bytes.readLE4(data, offset);
				offset += 4;
				op.ccs.oldValueEx = new TByteArrayList(oldValueExLen);
				for(int j=0;j<oldValueExLen;j++)
					op.ccs.oldValueEx.add(data[offset++]);
				
				int oldValuesSize = Bytes.readLE4(data, offset);
				offset += 4;
				
				op.ccs.oldValues = new ArrayList<>(oldValuesSize);
				for(int j=0;j<oldValuesSize;j++) {

					int oldValueSize = Bytes.readLE4(data, offset);
					offset += 4;

					// -1 length encodes a null old value (see toBytes).
					if(oldValueSize == -1) {
						op.ccs.oldValues.add(null);
					} else {
						byte[] oldValue = new byte[oldValueSize];
						System.arraycopy(data, offset, oldValue, 0, oldValueSize);
						offset += oldValueSize;
						op.ccs.oldValues.add(oldValue);
					}

				}
				operations.add(op);
			}
		} catch (IOException e) {
			// Decompression failure is surfaced as an illegal state.
			throw new IllegalAcornStateException(e);
		// NOTE(review): this catch-and-rethrow is redundant — both types are
		// already declared in the throws clause and would propagate unchanged.
		} catch (IllegalAcornStateException | AcornAccessVerificationException e) {
            throw e;
        }
	}
287
288         @Override
289         String getExtension() {
290                 return "stream";
291         }
292         
293         @Override
294         public String toString() {
295                 return  "ClusterUpdateOperationChunk " + getKey();
296         }
297         
	// Persisted chunks are append-only: an existing file is never overwritten.
	@Override
	protected boolean overwrite() {
		return false;
	}
302
	/** SLF4J logger supplied to the LRU base class for this subtype. */
	@Override
	public Logger getLogger() {
	    return LOGGER;
	}
}