gerrit.simantics Code Review - simantics/platform.git/blob - bundles/org.simantics.acorn/src/org/simantics/acorn/lru/ClusterStreamChunk.java
Merge commit 'b3da313'
[simantics/platform.git] / bundles / org.simantics.acorn / src / org / simantics / acorn / lru / ClusterStreamChunk.java
1 package org.simantics.acorn.lru;
2
3 import java.io.IOException;
4 import java.nio.ByteBuffer;
5 import java.nio.file.Path;
6 import java.util.ArrayList;
7
8 import org.simantics.acorn.ClusterManager;
9 import org.simantics.acorn.Persistable;
10 import org.simantics.acorn.exception.AcornAccessVerificationException;
11 import org.simantics.acorn.exception.IllegalAcornStateException;
12 import org.simantics.acorn.internal.ClusterChange;
13 import org.simantics.acorn.internal.UndoClusterUpdateProcessor;
14 import org.simantics.compressions.CompressionCodec;
15 import org.simantics.compressions.Compressions;
16 import org.simantics.db.exception.DatabaseException;
17 import org.simantics.db.service.Bytes;
18 import org.simantics.utils.datastructures.Pair;
19
20 import gnu.trove.list.array.TByteArrayList;
21
22 public class ClusterStreamChunk extends LRUObject<String, ClusterStreamChunk> implements Persistable {
23
24         // 500KB is a fine chunk
25         private static int MAX_CHUNK_SIZE = 500*1024;
26
27         int size = 0;
28         final private ClusterManager manager;
29         private boolean committed = false;
30         
31         public int nextToProcess = 0;
32         
33         public ArrayList<ClusterUpdateOperation> operations = new ArrayList<ClusterUpdateOperation>();
34         
35         // Stub
36         public ClusterStreamChunk(ClusterManager manager, LRU<String, ClusterStreamChunk> LRU, Path readDir, String id, int offset, int length) throws AcornAccessVerificationException {
37                 super(LRU, id, readDir, "clusterStream", offset, length, false, false);
38                 this.manager = manager;
39                 LRU.map(this);
40         }
41         
42         // Creation
43         public ClusterStreamChunk(ClusterManager manager, LRU<String, ClusterStreamChunk> LRU, String id) throws AcornAccessVerificationException {
44                 super(LRU, id, LRU.getDirectory(), "clusterStream", true, true);
45                 this.manager = manager;
46                 LRU.insert(this, accessTime);
47         }
48         
49         public UndoClusterUpdateProcessor getUndoProcessor(ClusterManager clusters, int chunkOffset, String ccsId) throws DatabaseException {
50                 
51                 if(VERIFY) verifyAccess();
52                 
53                 makeResident(true);
54
55                 ClusterUpdateOperation op = operations.get(chunkOffset);
56                 if(op == null) throw new IllegalAcornStateException("Cluster Update Operation " + ccsId + " was not found.");
57                 if(op.ccs == null) throw new IllegalAcornStateException("Cluster ChangeSet " + ccsId + " was not found.");
58
59                 UndoClusterUpdateProcessor proc = new UndoClusterUpdateProcessor(clusters, this, op.ccs);
60                 if(proc.version != ClusterChange.VERSION)
61                         return null;
62
63                 // This cluster and CCS can still be under preparation => wait
64                 clusters.clusterLRU.ensureUpdates(proc.getClusterUID());
65
66                 proc.process();
67
68                 cancelForceResident();
69                 
70                 return proc;
71                 
72         }
73         
74         public void addOperation(ClusterUpdateOperation op) throws IllegalAcornStateException {
75                 if(committed)
76                         throw new IllegalAcornStateException("Cannot add operation " + op + " to " + this + " if commited == true");
77                 operations.add(op);
78                 size += op.data.length;
79 //              if(isCommitted()) {
80 //                      LRU.refresh(this);
81 //              }
82         }
83         
84         public byte[] getOperation(int index) {
85                 return operations.get(index).data;
86         }
87         
88         public void commit() {
89                 committed = true;
90         }
91         
92         public boolean isCommitted() {
93                 if(size > MAX_CHUNK_SIZE) committed = true;
94                 return committed;
95         }
96         
97         @Override
98         public boolean canBePersisted() throws AcornAccessVerificationException {
99                 if(!super.canBePersisted()) return false;
100                 if(!isCommitted()) return false;
101                 for(ClusterUpdateOperation op : operations) {
102                         if(!op.finished) return false;
103                 }
104                 return true;
105         }
106
107         private static void writeLE(TByteArrayList bytes, int value) {
108                 
109                 bytes.add( (byte) (value & 0xFF));
110                 bytes.add((byte) ((value >>> 8) & 0xFF));
111                 bytes.add((byte) ((value >>> 16) & 0xFF));
112                 bytes.add((byte) ((value >>> 24) & 0xFF));
113
114         }
115
116         final public static void writeLE8(TByteArrayList bytes, long value) {
117                 
118                 bytes.add( (byte) (value & 0xFF));
119                 bytes.add((byte) ((value >>> 8) & 0xFF));
120                 bytes.add((byte) ((value >>> 16) & 0xFF));
121                 bytes.add((byte) ((value >>> 24) & 0xFF));
122                 bytes.add((byte) ((value >>> 32) & 0xFF));
123                 bytes.add((byte) ((value >>> 40) & 0xFF));
124                 bytes.add((byte) ((value >>> 48) & 0xFF));
125                 bytes.add((byte) ((value >>> 56) & 0xFF));
126                 
127         }
128         
129         @Override
130         protected Pair<byte[], Integer> toBytes() {
131                 
132                 assert(isCommitted());
133                 
134                 TByteArrayList raw = new TByteArrayList();
135                 
136                 writeLE(raw, operations.size());
137                 
138                 for(ClusterUpdateOperation op : operations) {
139                         
140                         writeLE(raw, op.data.length);
141                         raw.add(op.data);
142                         op.data = null;
143                         
144                         writeLE(raw, op.ccs.statementMask.size());
145                         raw.add(op.ccs.statementMask.toArray());
146                         writeLE(raw, op.ccs.oldValueEx.size());
147                         raw.add(op.ccs.oldValueEx.toArray());
148                         writeLE(raw, op.ccs.oldValues.size());
149                         
150                         for(byte[] oldValue : op.ccs.oldValues) {
151                                 int len = (oldValue != null ? oldValue.length : -1);
152                                 writeLE(raw, len);
153                                 if(oldValue != null) {
154                                         raw.add(oldValue);
155                                 }
156                         }
157                         
158                 }
159                 
160                 byte[] raw_ = raw.toArray();
161                 CompressionCodec codec = Compressions.get(Compressions.LZ4);
162                 ByteBuffer input = ByteBuffer.wrap(raw_);
163                 ByteBuffer output = ByteBuffer.allocate(raw_.length + raw_.length/8);
164                 int compressedSize = codec.compressBuffer(input, 0, raw_.length, output, 0);
165
166                 // We append inflated size - cannot prepend since decompression cannot handle offsets in input
167                 final byte[] rawOutput = new byte[compressedSize+4];
168                 output.get(rawOutput,0,compressedSize);
169                 Bytes.writeLE(rawOutput, compressedSize, raw_.length);
170
171                 release();
172                 
173                 return Pair.make(rawOutput, rawOutput.length);
174                 
175         }
176         
177         @Override
178         void release() {
179                 
180                 for(ClusterUpdateOperation op : operations) {
181                         op.data = null;
182                         op.ccs = null;
183                 }
184
185         }
186
187         static class StreamDecompressor {
188                 
189 //              byte[] decompressBuffer = new byte[1024*1024];
190
191                 public synchronized byte[] decompressBuffer(byte[] compressed) throws IOException {
192                         
193                         int deflatedSize = Bytes.readLE4(compressed, compressed.length-4);
194                         
195                         byte[] result = new byte[deflatedSize];
196                         
197 //                      if(decompressBuffer.length < deflatedSize)
198 //                              decompressBuffer = new byte[Math.max(3*decompressBuffer.length / 2, deflatedSize)];
199                         
200                         CompressionCodec codec = Compressions.get(Compressions.LZ4);
201                         
202                         ByteBuffer input = ByteBuffer.wrap(compressed);
203                         ByteBuffer output = ByteBuffer.wrap(result);
204
205                         int decompressedSize = codec.decompressBuffer(input, 0, compressed.length-4, output, 0, result.length);
206                         assert(decompressedSize  == deflatedSize);
207                         
208                         return result;
209                         
210                 }
211
212                 
213         }
214         
215         private static StreamDecompressor decompressor = new StreamDecompressor();
216         
217         @Override
218         public void fromFile(byte[] data_) throws IllegalAcornStateException, AcornAccessVerificationException {
219
220                 try {
221                         
222                         byte[] data = decompressor.decompressBuffer(data_);
223                         
224                         operations = new ArrayList<ClusterUpdateOperation>();
225                         
226                         int offset = 0;
227                         int opLen = Bytes.readLE4(data, offset);
228                         offset += 4;
229                         
230                         for(int i=0;i<opLen;i++) {
231                                 
232                                 int len = Bytes.readLE4(data, offset);
233                                 offset += 4;
234                                 byte[] bytes  = new byte[len];
235                                 System.arraycopy(data, offset, bytes, 0, len);
236                                 offset += len;
237                                 
238                                 ClusterUpdateOperation op = new ClusterUpdateOperation(manager, bytes);
239                                 
240                                 String ccsKey = getKey() + "." + i;
241                                 
242                                 op.ccs = new ClusterChangeSet(ccsKey, op.uid);
243                                 op.chunk = this;
244                                 
245                                 int statementMaskLen = Bytes.readLE4(data, offset);
246                                 offset += 4;
247                                 op.ccs.statementMask = new TByteArrayList(statementMaskLen);
248                                 for(int j=0;j<statementMaskLen;j++)
249                                         op.ccs.statementMask.add(data[offset++]);
250                                 
251                                 int oldValueExLen = Bytes.readLE4(data, offset);
252                                 offset += 4;
253                                 op.ccs.oldValueEx = new TByteArrayList(oldValueExLen);
254                                 for(int j=0;j<oldValueExLen;j++)
255                                         op.ccs.oldValueEx.add(data[offset++]);
256                                 
257                                 int oldValuesSize = Bytes.readLE4(data, offset);
258                                 offset += 4;
259                                 
260                                 op.ccs.oldValues = new ArrayList<>(oldValuesSize);
261                                 for(int j=0;j<oldValuesSize;j++) {
262
263                                         int oldValueSize = Bytes.readLE4(data, offset);
264                                         offset += 4;
265
266                                         if(oldValueSize == -1) {
267                                                 op.ccs.oldValues.add(null);
268                                         } else {
269                                                 byte[] oldValue = new byte[oldValueSize];
270                                                 System.arraycopy(data, offset, oldValue, 0, oldValueSize);
271                                                 offset += oldValueSize;
272                                                 op.ccs.oldValues.add(oldValue);
273                                         }
274
275                                 }
276                                 operations.add(op);
277                         }
278                 } catch (IOException e) {
279                         throw new IllegalAcornStateException(e);
280                 } catch (IllegalAcornStateException | AcornAccessVerificationException e) {
281             throw e;
282         }
283         }
284
285         @Override
286         String getExtension() {
287                 return "stream";
288         }
289         
290         @Override
291         public String toString() {
292                 return  "ClusterUpdateOperationChunk " + getKey();
293         }
294         
295         @Override
296         protected boolean overwrite() {
297                 return false;
298         }
299         
300 }