gerrit.simantics Code Review - simantics/platform.git/blob - bundles/org.simantics.acorn/src/org/simantics/acorn/lru/ClusterStreamChunk.java
Merge "Replace embedded pdfbox jars with external plug-ins."
[simantics/platform.git] / bundles / org.simantics.acorn / src / org / simantics / acorn / lru / ClusterStreamChunk.java
1 package org.simantics.acorn.lru;
2
3 import java.io.IOException;
4 import java.nio.ByteBuffer;
5 import java.nio.file.Path;
6 import java.util.ArrayList;
7
8 import org.simantics.acorn.ClusterManager;
9 import org.simantics.acorn.Persistable;
10 import org.simantics.acorn.internal.ClusterChange;
11 import org.simantics.acorn.internal.UndoClusterUpdateProcessor;
12 import org.simantics.compressions.CompressionCodec;
13 import org.simantics.compressions.Compressions;
14 import org.simantics.db.exception.DatabaseException;
15 import org.simantics.db.service.Bytes;
16 import org.simantics.utils.datastructures.Pair;
17
18 import gnu.trove.list.array.TByteArrayList;
19
20 public class ClusterStreamChunk extends LRUObject<String, ClusterStreamChunk> implements Persistable {
21
22         // 500KB is a fine chunk
23         private static int MAX_CHUNK_SIZE = 500*1024;
24
25         int size = 0;
26         final private ClusterManager manager;
27         private boolean committed = false;
28         
29         public int nextToProcess = 0;
30         
31         public ArrayList<ClusterUpdateOperation> operations = new ArrayList<ClusterUpdateOperation>();
32         
33         // Stub
34         public ClusterStreamChunk(ClusterManager manager, LRU<String, ClusterStreamChunk> LRU, Path readDir, String id, int offset, int length) {
35                 super(LRU, id, readDir, "clusterStream", offset, length, false, false);
36                 this.manager = manager;
37                 LRU.map(this);
38         }
39         
40         // Creation
41         public ClusterStreamChunk(ClusterManager manager, LRU<String, ClusterStreamChunk> LRU, String id) {
42                 super(LRU, id, LRU.getDirectory(), "clusterStream", true, true);
43                 this.manager = manager;
44                 LRU.insert(this, accessTime);
45         }
46         
47         public UndoClusterUpdateProcessor getUndoProcessor(ClusterManager clusters, int chunkOffset, String ccsId) throws DatabaseException {
48                 
49                 if(VERIFY) verifyAccess();
50                 
51                 makeResident(true);
52
53                 ClusterUpdateOperation op = operations.get(chunkOffset);
54                 if(op == null) throw new IllegalStateException("Cluster Update Operation " + ccsId + " was not found.");
55                 if(op.ccs == null) throw new IllegalStateException("Cluster ChangeSet " + ccsId + " was not found.");
56
57                 UndoClusterUpdateProcessor proc = new UndoClusterUpdateProcessor(clusters, this, op.ccs);
58                 if(proc.version != ClusterChange.VERSION)
59                         return null;
60
61                 // This cluster and CCS can still be under preparation => wait
62                 clusters.clusterLRU.ensureUpdates(proc.getClusterUID());
63
64                 proc.process();
65
66                 cancelForceResident();
67                 
68                 return proc;
69                 
70         }
71         
72         public void addOperation(ClusterUpdateOperation op) {
73                 if(committed)
74                         throw new IllegalStateException();
75                 operations.add(op);
76                 size += op.data.length;
77 //              if(isCommitted()) {
78 //                      LRU.refresh(this);
79 //              }
80         }
81         
82         public byte[] getOperation(int index) {
83                 return operations.get(index).data;
84         }
85         
86         public void commit() {
87                 committed = true;
88         }
89         
90         public boolean isCommitted() {
91                 if(size > MAX_CHUNK_SIZE) committed = true;
92                 return committed;
93         }
94         
95         @Override
96         public boolean canBePersisted() {
97                 if(!super.canBePersisted()) return false;
98                 if(!isCommitted()) return false;
99                 for(ClusterUpdateOperation op : operations) {
100                         if(!op.finished) return false;
101                 }
102                 return true;
103         }
104
105         private static void writeLE(TByteArrayList bytes, int value) {
106                 
107                 bytes.add( (byte) (value & 0xFF));
108                 bytes.add((byte) ((value >>> 8) & 0xFF));
109                 bytes.add((byte) ((value >>> 16) & 0xFF));
110                 bytes.add((byte) ((value >>> 24) & 0xFF));
111
112         }
113
114         final public static void writeLE8(TByteArrayList bytes, long value) {
115                 
116                 bytes.add( (byte) (value & 0xFF));
117                 bytes.add((byte) ((value >>> 8) & 0xFF));
118                 bytes.add((byte) ((value >>> 16) & 0xFF));
119                 bytes.add((byte) ((value >>> 24) & 0xFF));
120                 bytes.add((byte) ((value >>> 32) & 0xFF));
121                 bytes.add((byte) ((value >>> 40) & 0xFF));
122                 bytes.add((byte) ((value >>> 48) & 0xFF));
123                 bytes.add((byte) ((value >>> 56) & 0xFF));
124                 
125         }
126         
127         @Override
128         protected Pair<byte[], Integer> toBytes() {
129                 
130                 assert(isCommitted());
131                 
132                 TByteArrayList raw = new TByteArrayList();
133                 
134                 writeLE(raw, operations.size());
135                 
136                 for(ClusterUpdateOperation op : operations) {
137                         
138                         writeLE(raw, op.data.length);
139                         raw.add(op.data);
140                         op.data = null;
141                         
142                         writeLE(raw, op.ccs.statementMask.size());
143                         raw.add(op.ccs.statementMask.toArray());
144                         writeLE(raw, op.ccs.oldValueEx.size());
145                         raw.add(op.ccs.oldValueEx.toArray());
146                         writeLE(raw, op.ccs.oldValues.size());
147                         
148                         for(byte[] oldValue : op.ccs.oldValues) {
149                                 int len = (oldValue != null ? oldValue.length : -1);
150                                 writeLE(raw, len);
151                                 if(oldValue != null) {
152                                         raw.add(oldValue);
153                                 }
154                         }
155                         
156                 }
157                 
158                 byte[] raw_ = raw.toArray();
159                 CompressionCodec codec = Compressions.get(Compressions.LZ4);
160                 ByteBuffer input = ByteBuffer.wrap(raw_);
161                 ByteBuffer output = ByteBuffer.allocate(raw_.length + raw_.length/8);
162                 int compressedSize = codec.compressBuffer(input, 0, raw_.length, output, 0);
163
164                 // We append inflated size - cannot prepend since decompression cannot handle offsets in input
165                 final byte[] rawOutput = new byte[compressedSize+4];
166                 output.get(rawOutput,0,compressedSize);
167                 Bytes.writeLE(rawOutput, compressedSize, raw_.length);
168
169                 release();
170                 
171                 return Pair.make(rawOutput, rawOutput.length);
172                 
173         }
174         
175         @Override
176         void release() {
177                 
178                 for(ClusterUpdateOperation op : operations) {
179                         op.data = null;
180                         op.ccs = null;
181                 }
182
183         }
184
185         static class StreamDecompressor {
186                 
187 //              byte[] decompressBuffer = new byte[1024*1024];
188
189                 public synchronized byte[] decompressBuffer(byte[] compressed) throws IOException {
190                         
191                         int deflatedSize = Bytes.readLE4(compressed, compressed.length-4);
192                         
193                         byte[] result = new byte[deflatedSize];
194                         
195 //                      if(decompressBuffer.length < deflatedSize)
196 //                              decompressBuffer = new byte[Math.max(3*decompressBuffer.length / 2, deflatedSize)];
197                         
198                         CompressionCodec codec = Compressions.get(Compressions.LZ4);
199                         
200                         ByteBuffer input = ByteBuffer.wrap(compressed);
201                         ByteBuffer output = ByteBuffer.wrap(result);
202
203                         int decompressedSize = codec.decompressBuffer(input, 0, compressed.length-4, output, 0, result.length);
204                         assert(decompressedSize  == deflatedSize);
205                         
206                         return result;
207                         
208                 }
209
210                 
211         }
212         
213         private static StreamDecompressor decompressor = new StreamDecompressor();
214         
215         @Override
216         public void fromFile(byte[] data_) {
217
218                 try {
219                         
220                         byte[] data = decompressor.decompressBuffer(data_);
221                         
222                         operations = new ArrayList<ClusterUpdateOperation>();
223                         
224                         int offset = 0;
225                         int opLen = Bytes.readLE4(data, offset);
226                         offset += 4;
227                         
228                         for(int i=0;i<opLen;i++) {
229                                 
230                                 int len = Bytes.readLE4(data, offset);
231                                 offset += 4;
232                                 byte[] bytes  = new byte[len];
233                                 System.arraycopy(data, offset, bytes, 0, len);
234                                 offset += len;
235                                 
236                                 ClusterUpdateOperation op = new ClusterUpdateOperation(manager, bytes);
237                                 
238                                 String ccsKey = getKey() + "." + i;
239                                 
240                                 op.ccs = new ClusterChangeSet(ccsKey, op.uid);
241                                 op.chunk = this;
242                                 
243                                 int statementMaskLen = Bytes.readLE4(data, offset);
244                                 offset += 4;
245                                 op.ccs.statementMask = new TByteArrayList(statementMaskLen);
246                                 for(int j=0;j<statementMaskLen;j++)
247                                         op.ccs.statementMask.add(data[offset++]);
248                                 
249                                 int oldValueExLen = Bytes.readLE4(data, offset);
250                                 offset += 4;
251                                 op.ccs.oldValueEx = new TByteArrayList(oldValueExLen);
252                                 for(int j=0;j<oldValueExLen;j++)
253                                         op.ccs.oldValueEx.add(data[offset++]);
254                                 
255                                 int oldValuesSize = Bytes.readLE4(data, offset);
256                                 offset += 4;
257                                 
258                                 op.ccs.oldValues = new ArrayList<>(oldValuesSize);
259                                 for(int j=0;j<oldValuesSize;j++) {
260
261                                         int oldValueSize = Bytes.readLE4(data, offset);
262                                         offset += 4;
263
264                                         if(oldValueSize == -1) {
265                                                 op.ccs.oldValues.add(null);
266                                         } else {
267                                                 byte[] oldValue = new byte[oldValueSize];
268                                                 System.arraycopy(data, offset, oldValue, 0, oldValueSize);
269                                                 offset += oldValueSize;
270                                                 op.ccs.oldValues.add(oldValue);
271                                         }
272
273                                 }
274                                 
275                                 operations.add(op);
276                                 
277                         }
278                         
279                 } catch (IOException e) {
280                         
281                         throw new IllegalStateException(e);
282                         
283                 }
284                 
285         }
286
287         @Override
288         String getExtension() {
289                 return "stream";
290         }
291         
292         @Override
293         public String toString() {
294                 return  "ClusterUpdateOperationChunk " + getKey();
295         }
296         
297         @Override
298         protected boolean overwrite() {
299                 return false;
300         }
301         
302 }