package org.simantics.acorn.lru;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.file.Path;
import java.util.ArrayList;

import org.simantics.acorn.ClusterManager;
import org.simantics.acorn.Persistable;
import org.simantics.acorn.internal.ClusterChange;
import org.simantics.acorn.internal.UndoClusterUpdateProcessor;
import org.simantics.compressions.CompressionCodec;
import org.simantics.compressions.Compressions;
import org.simantics.db.exception.DatabaseException;
import org.simantics.db.service.Bytes;
import org.simantics.utils.datastructures.Pair;

import gnu.trove.list.array.TByteArrayList;
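/**
 * An LRU-managed chunk of cluster update operations, persisted to disk as a
 * single LZ4-compressed record with the inflated size appended as a trailing
 * little-endian int (see {@link #toBytes()} and {@link #fromFile(byte[])}).
 *
 * A rough usage sketch (hypothetical driver code, not from this repository):
 * <pre>
 *   ClusterStreamChunk chunk = new ClusterStreamChunk(manager, streamLRU, "chunk-17");
 *   chunk.addOperation(op);   // repeat; isCommitted() flips once size exceeds MAX_CHUNK_SIZE
 *   chunk.commit();
 * </pre>
 */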
public class ClusterStreamChunk extends LRUObject<String, ClusterStreamChunk> implements Persistable {
	// 500KB is a fine chunk
	private static final int MAX_CHUNK_SIZE = 500*1024;

	private final ClusterManager manager;
	private boolean committed = false;

	public int nextToProcess = 0;

	public ArrayList<ClusterUpdateOperation> operations = new ArrayList<>();
	public ClusterStreamChunk(ClusterManager manager, LRU<String, ClusterStreamChunk> LRU, Path readDir, String id, int offset, int length) {
		super(LRU, id, readDir, "clusterStream", offset, length, false, false);
		this.manager = manager;
	}
	public ClusterStreamChunk(ClusterManager manager, LRU<String, ClusterStreamChunk> LRU, String id) {
		super(LRU, id, LRU.getDirectory(), "clusterStream", true, true);
		this.manager = manager;
		LRU.insert(this, accessTime);
	}
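	/**
	 * Builds an undo processor for the change set at the given offset in this
	 * chunk. The persisted version is checked against ClusterChange.VERSION
	 * before the change set is used.
	 */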
	public UndoClusterUpdateProcessor getUndoProcessor(ClusterManager clusters, int chunkOffset, String ccsId) throws DatabaseException {

		if(VERIFY) verifyAccess();

		ClusterUpdateOperation op = operations.get(chunkOffset);
		if(op == null) throw new IllegalStateException("Cluster Update Operation " + ccsId + " was not found.");
		if(op.ccs == null) throw new IllegalStateException("Cluster ChangeSet " + ccsId + " was not found.");

		UndoClusterUpdateProcessor proc = new UndoClusterUpdateProcessor(clusters, this, op.ccs);
		if(proc.version != ClusterChange.VERSION)
			return null;

		// This cluster and CCS can still be under preparation => wait
		clusters.clusterLRU.ensureUpdates(proc.getClusterUID());

		cancelForceResident();

		return proc;
	}
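	/**
	 * Appends an update operation to this chunk. Once a chunk is committed
	 * (explicitly, or implicitly after its size exceeds MAX_CHUNK_SIZE) no
	 * further operations may be added.
	 */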
	public void addOperation(ClusterUpdateOperation op) {
		if(isCommitted())
			throw new IllegalStateException();
		operations.add(op);
		size += op.data.length;
	}
	public byte[] getOperation(int index) {
		return operations.get(index).data;
	}
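	/**
	 * Marks this chunk full. isCommitted() also flips the flag on its own
	 * once the accumulated size exceeds MAX_CHUNK_SIZE.
	 */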
	public void commit() {
		committed = true;
	}
	public boolean isCommitted() {
		if(size > MAX_CHUNK_SIZE) committed = true;
		return committed;
	}
	@Override
	public boolean canBePersisted() {
		if(!super.canBePersisted()) return false;
		if(!isCommitted()) return false;
		for(ClusterUpdateOperation op : operations) {
			if(!op.finished) return false;
		}
		return true;
	}
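	/*
	 * Little-endian byte writers: the least significant byte goes first.
	 * For example, writeLE(bytes, 0x12345678) appends 0x78, 0x56, 0x34, 0x12.
	 */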
	private static void writeLE(TByteArrayList bytes, int value) {
		bytes.add((byte) (value & 0xFF));
		bytes.add((byte) ((value >>> 8) & 0xFF));
		bytes.add((byte) ((value >>> 16) & 0xFF));
		bytes.add((byte) ((value >>> 24) & 0xFF));
	}
	public static void writeLE8(TByteArrayList bytes, long value) {
		bytes.add((byte) (value & 0xFF));
		bytes.add((byte) ((value >>> 8) & 0xFF));
		bytes.add((byte) ((value >>> 16) & 0xFF));
		bytes.add((byte) ((value >>> 24) & 0xFF));
		bytes.add((byte) ((value >>> 32) & 0xFF));
		bytes.add((byte) ((value >>> 40) & 0xFF));
		bytes.add((byte) ((value >>> 48) & 0xFF));
		bytes.add((byte) ((value >>> 56) & 0xFF));
	}
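	/*
	 * On-disk layout produced below (all counts are little-endian ints):
	 *   opCount
	 *   per operation: dataLen, data,
	 *                  statementMaskLen, statementMask,
	 *                  oldValueExLen, oldValueEx,
	 *                  oldValueCount, then per old value: len (-1 for null), bytes
	 * The whole record is LZ4-compressed and the inflated size is appended
	 * as a trailing 4-byte little-endian int.
	 */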
	@Override
	protected Pair<byte[], Integer> toBytes() {

		assert(isCommitted());

		TByteArrayList raw = new TByteArrayList();

		writeLE(raw, operations.size());

		for(ClusterUpdateOperation op : operations) {

			writeLE(raw, op.data.length);
			raw.add(op.data);

			writeLE(raw, op.ccs.statementMask.size());
			raw.add(op.ccs.statementMask.toArray());
			writeLE(raw, op.ccs.oldValueEx.size());
			raw.add(op.ccs.oldValueEx.toArray());
			writeLE(raw, op.ccs.oldValues.size());

			for(byte[] oldValue : op.ccs.oldValues) {
				int len = (oldValue != null ? oldValue.length : -1);
				writeLE(raw, len);
				if(oldValue != null) {
					raw.add(oldValue);
				}
			}
		}

		byte[] raw_ = raw.toArray();
		CompressionCodec codec = Compressions.get(Compressions.LZ4);
		ByteBuffer input = ByteBuffer.wrap(raw_);
		ByteBuffer output = ByteBuffer.allocate(raw_.length + raw_.length/8);
		int compressedSize = codec.compressBuffer(input, 0, raw_.length, output, 0);

		// We append inflated size - cannot prepend since decompression cannot handle offsets in input
		final byte[] rawOutput = new byte[compressedSize+4];
		output.get(rawOutput, 0, compressedSize);
		Bytes.writeLE(rawOutput, compressedSize, raw_.length);

		release();

		return Pair.make(rawOutput, rawOutput.length);
	}
	/*
	 * Drops the per-operation payloads once the serialized form exists.
	 * (The enclosing method was lost in the original; the name release()
	 * and the loop body are assumed.)
	 */
	void release() {
		for(ClusterUpdateOperation op : operations) {
			op.data = null;
		}
	}
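	/*
	 * Decompresses a chunk read back from disk. The inflated size lives in
	 * the last four bytes of the compressed record, so the result buffer can
	 * be allocated exactly before decompression.
	 */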
	static class StreamDecompressor {

//		byte[] decompressBuffer = new byte[1024*1024];

		public synchronized byte[] decompressBuffer(byte[] compressed) throws IOException {

			int inflatedSize = Bytes.readLE4(compressed, compressed.length-4);

			byte[] result = new byte[inflatedSize];

//			if(decompressBuffer.length < inflatedSize)
//				decompressBuffer = new byte[Math.max(3*decompressBuffer.length / 2, inflatedSize)];

			CompressionCodec codec = Compressions.get(Compressions.LZ4);

			ByteBuffer input = ByteBuffer.wrap(compressed);
			ByteBuffer output = ByteBuffer.wrap(result);

			int decompressedSize = codec.decompressBuffer(input, 0, compressed.length-4, output, 0, result.length);
			assert(decompressedSize == inflatedSize);

			return result;
		}

	}
	private static final StreamDecompressor decompressor = new StreamDecompressor();
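	/*
	 * Rebuilds the operation list from a persisted chunk, mirroring the
	 * layout written by toBytes().
	 */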
	@Override
	public void fromFile(byte[] data_) {

		try {

			byte[] data = decompressor.decompressBuffer(data_);

			operations = new ArrayList<>();

			int offset = 0;
			int opLen = Bytes.readLE4(data, offset);
			offset += 4;

			for(int i=0;i<opLen;i++) {

				int len = Bytes.readLE4(data, offset);
				offset += 4;
				byte[] bytes = new byte[len];
				System.arraycopy(data, offset, bytes, 0, len);
				offset += len;

				ClusterUpdateOperation op = new ClusterUpdateOperation(manager, bytes);

				String ccsKey = getKey() + "." + i;
				op.ccs = new ClusterChangeSet(ccsKey, op.uid);

				int statementMaskLen = Bytes.readLE4(data, offset);
				offset += 4;
				op.ccs.statementMask = new TByteArrayList(statementMaskLen);
				for(int j=0;j<statementMaskLen;j++)
					op.ccs.statementMask.add(data[offset++]);

				int oldValueExLen = Bytes.readLE4(data, offset);
				offset += 4;
				op.ccs.oldValueEx = new TByteArrayList(oldValueExLen);
				for(int j=0;j<oldValueExLen;j++)
					op.ccs.oldValueEx.add(data[offset++]);

				int oldValuesSize = Bytes.readLE4(data, offset);
				offset += 4;

				op.ccs.oldValues = new ArrayList<>(oldValuesSize);
				for(int j=0;j<oldValuesSize;j++) {

					int oldValueSize = Bytes.readLE4(data, offset);
					offset += 4;

					if(oldValueSize == -1) {
						op.ccs.oldValues.add(null);
					} else {
						byte[] oldValue = new byte[oldValueSize];
						System.arraycopy(data, offset, oldValue, 0, oldValueSize);
						offset += oldValueSize;
						op.ccs.oldValues.add(oldValue);
					}
				}

				operations.add(op);
			}

		} catch (IOException e) {
			throw new IllegalStateException(e);
		}
	}
	@Override
	String getExtension() {
		// elided in the original; "stream" assumed
		return "stream";
	}
	@Override
	public String toString() {
		return "ClusterUpdateOperationChunk " + getKey();
	}
	@Override
	protected boolean overwrite() {
		// elided in the original; false assumed
		return false;
	}

}