1 package org.simantics.acorn.lru;
3 import java.io.IOException;
4 import java.nio.ByteBuffer;
5 import java.nio.file.Path;
6 import java.util.ArrayList;
8 import org.simantics.acorn.ClusterManager;
9 import org.simantics.acorn.Persistable;
10 import org.simantics.acorn.exception.AcornAccessVerificationException;
11 import org.simantics.acorn.exception.IllegalAcornStateException;
12 import org.simantics.acorn.internal.UndoClusterUpdateProcessor;
13 import org.simantics.compressions.CompressionCodec;
14 import org.simantics.compressions.Compressions;
15 import org.simantics.db.exception.DatabaseException;
16 import org.simantics.db.service.Bytes;
17 import org.simantics.utils.datastructures.Pair;
18 import org.slf4j.Logger;
19 import org.slf4j.LoggerFactory;
21 import gnu.trove.list.array.TByteArrayList;
// One chunk (~500KB, see MAX_CHUNK_SIZE) of streamed cluster update
// operations. Managed by the LRU cache machinery (LRUObject) and persistable
// to disk as an LZ4-compressed byte stream (see toBytes/fromFile below).
// NOTE(review): this view of the file is elided; lines between some members
// are not visible here.
23 public class ClusterStreamChunk extends LRUObject<String, ClusterStreamChunk> implements Persistable {
25 private static final Logger LOGGER = LoggerFactory.getLogger(ClusterStreamChunk.class);
26 // 500KB is a fine chunk
// Soft size limit: once accumulated size exceeds this, isCommitted() flips
// the chunk to committed (see isCommitted below).
27 private static int MAX_CHUNK_SIZE = 500*1024;
// Owning cluster manager; supplies the file cache and decodes operations.
30 final private ClusterManager manager;
// True once the chunk is full; addOperation rejects further additions.
31 private boolean committed = false;
// Index of the next operation to process -- presumably advanced by a
// consumer elsewhere in the file (not visible in this view; confirm).
33 public int nextToProcess = 0;
// Update operations accumulated in this chunk, in arrival order.
35 public ArrayList<ClusterUpdateOperation> operations = new ArrayList<ClusterUpdateOperation>();
// Constructs a chunk backed by an existing persisted file region
// (offset/length within readDir); used when reloading a chunk from disk.
// The trailing false,false flags are passed to LRUObject -- semantics not
// visible in this view.
38 public ClusterStreamChunk(ClusterManager manager, LRU<String, ClusterStreamChunk> LRU, Path readDir, String id, int offset, int length) throws AcornAccessVerificationException {
39 super(LRU, manager.getFileCache(), id, readDir, "clusterStream", offset, length, false, false);
40 this.manager = manager;
// Constructs a fresh in-memory chunk (no backing file region yet) and
// registers it with the LRU cache immediately.
45 public ClusterStreamChunk(ClusterManager manager, LRU<String, ClusterStreamChunk> LRU, String id) throws AcornAccessVerificationException {
46 super(LRU, manager.getFileCache(), id, LRU.getDirectory(), "clusterStream", true, true);
47 this.manager = manager;
48 LRU.insert(this, accessTime);
// Builds an undo processor for the operation at chunkOffset. Fails fast if
// the operation or its change set is missing, then waits for any pending
// cluster updates before the processor is used.
// NOTE(review): several lines of this method (including its return) are
// elided in this view.
51 public UndoClusterUpdateProcessor getUndoProcessor(ClusterManager clusters, int chunkOffset, String ccsId) throws DatabaseException {
53 if(VERIFY) verifyAccess();
57 ClusterUpdateOperation op = operations.get(chunkOffset);
58 if(op == null) throw new IllegalAcornStateException("Cluster Update Operation " + ccsId + " was not found.");
59 if(op.ccs == null) throw new IllegalAcornStateException("Cluster ChangeSet " + ccsId + " was not found.");
61 UndoClusterUpdateProcessor proc = new UndoClusterUpdateProcessor(clusters, this, op.ccs);
63 // This cluster and CCS can still be under preparation => wait
64 clusters.clusterLRU.ensureUpdates(proc.getClusterUID());
// Release the residency pin so the chunk may be evicted again.
68 cancelForceResident();
// Appends an update operation to this chunk and grows the tracked size by
// the operation's payload length. Rejected once the chunk is committed.
// NOTE(review): the guard condition preceding the throw is elided in this
// view.
74 public void addOperation(ClusterUpdateOperation op) throws IllegalAcornStateException {
76 throw new IllegalAcornStateException("Cannot add operation " + op + " to " + this + " if commited == true");
78 size += op.data.length;
79 // if(isCommitted()) {
// Returns the raw serialized bytes of the operation at the given index.
84 public byte[] getOperation(int index) {
85 return operations.get(index).data;
// Marks this chunk committed (method body elided in this view).
88 public void commit() {
// A chunk auto-commits once its accumulated size exceeds MAX_CHUNK_SIZE;
// the return statement is elided in this view (presumably returns
// 'committed').
92 public boolean isCommitted() {
93 if(size > MAX_CHUNK_SIZE) committed = true;
// A chunk may be persisted only when the LRU superclass allows it, the
// chunk is committed, and every contained operation has finished.
// NOTE(review): the final 'return true' path is elided in this view.
98 public boolean canBePersisted() throws AcornAccessVerificationException {
99 if(!super.canBePersisted()) return false;
100 if(!isCommitted()) return false;
101 for(ClusterUpdateOperation op : operations) {
102 if(!op.finished) return false;
// Appends a 32-bit int to 'bytes' in little-endian byte order
// (least-significant byte first).
107 private static void writeLE(TByteArrayList bytes, int value) {
109 bytes.add( (byte) (value & 0xFF));
110 bytes.add((byte) ((value >>> 8) & 0xFF));
111 bytes.add((byte) ((value >>> 16) & 0xFF));
112 bytes.add((byte) ((value >>> 24) & 0xFF));
// Appends a 64-bit long to 'bytes' in little-endian byte order
// (least-significant byte first).
116 final public static void writeLE8(TByteArrayList bytes, long value) {
118 bytes.add( (byte) (value & 0xFF));
119 bytes.add((byte) ((value >>> 8) & 0xFF));
120 bytes.add((byte) ((value >>> 16) & 0xFF));
121 bytes.add((byte) ((value >>> 24) & 0xFF));
122 bytes.add((byte) ((value >>> 32) & 0xFF));
123 bytes.add((byte) ((value >>> 40) & 0xFF));
124 bytes.add((byte) ((value >>> 48) & 0xFF));
125 bytes.add((byte) ((value >>> 56) & 0xFF));
// Serializes all operations and their change sets into one little-endian
// byte stream, then LZ4-compresses it. Layout per operation: data length +
// data, statement mask length + bytes, oldValueEx length + bytes, old value
// count, then each old value as length + bytes (length -1 encodes null; see
// fromFile for the inverse). The uncompressed size is appended as a trailing
// LE32 so the reader can size its output buffer.
// NOTE(review): several lines of this method are elided in this view.
130 protected Pair<byte[], Integer> toBytes() {
132 assert(isCommitted());
134 TByteArrayList raw = new TByteArrayList();
// Operation count first, so fromFile knows how many records follow.
136 writeLE(raw, operations.size());
138 for(ClusterUpdateOperation op : operations) {
140 writeLE(raw, op.data.length);
144 writeLE(raw, op.ccs.statementMask.size());
145 raw.add(op.ccs.statementMask.toArray());
146 writeLE(raw, op.ccs.oldValueEx.size());
147 raw.add(op.ccs.oldValueEx.toArray());
148 writeLE(raw, op.ccs.oldValues.size());
150 for(byte[] oldValue : op.ccs.oldValues) {
// -1 length marks a null old value.
151 int len = (oldValue != null ? oldValue.length : -1);
153 if(oldValue != null) {
160 byte[] raw_ = raw.toArray();
161 CompressionCodec codec = Compressions.get(Compressions.LZ4);
162 ByteBuffer input = ByteBuffer.wrap(raw_);
// Output sized with 12.5% headroom over the input for incompressible data.
163 ByteBuffer output = ByteBuffer.allocate(raw_.length + raw_.length/8);
164 int compressedSize = codec.compressBuffer(input, 0, raw_.length, output, 0);
166 // We append inflated size - cannot prepend since decompression cannot handle offsets in input
167 final byte[] rawOutput = new byte[compressedSize+4];
168 output.get(rawOutput,0,compressedSize);
169 Bytes.writeLE(rawOutput, compressedSize, raw_.length);
173 return Pair.make(rawOutput, rawOutput.length);
// NOTE(review): loop header from a method whose surrounding lines are
// elided in this view -- iterates all operations held by this chunk.
180 for(ClusterUpdateOperation op : operations) {
// Helper that LZ4-decompresses a chunk payload produced by toBytes().
// The method is synchronized because a single shared static instance
// (see 'decompressor' below) is used.
187 static class StreamDecompressor {
189 // byte[] decompressBuffer = new byte[1024*1024];
// Decompresses 'compressed'; the trailing 4 bytes hold the inflated size
// as LE32 (written by toBytes). Returns a newly allocated result buffer.
191 public synchronized byte[] decompressBuffer(byte[] compressed) throws IOException {
193 int deflatedSize = Bytes.readLE4(compressed, compressed.length-4);
195 byte[] result = new byte[deflatedSize];
197 // if(decompressBuffer.length < deflatedSize)
198 // decompressBuffer = new byte[Math.max(3*decompressBuffer.length / 2, deflatedSize)];
200 CompressionCodec codec = Compressions.get(Compressions.LZ4);
202 ByteBuffer input = ByteBuffer.wrap(compressed);
203 ByteBuffer output = ByteBuffer.wrap(result);
// Decompress everything except the trailing 4-byte size word.
205 int decompressedSize = codec.decompressBuffer(input, 0, compressed.length-4, output, 0, result.length);
206 assert(decompressedSize == deflatedSize);
// Shared decompressor instance; safe because decompressBuffer is
// synchronized.
215 private static StreamDecompressor decompressor = new StreamDecompressor();
// Reconstructs the operations list from a persisted, compressed payload --
// the inverse of toBytes(). Reads the operation count, then for each
// operation: its data bytes, statement mask, oldValueEx list, and old
// values (length -1 encodes a null entry). Each operation's change set is
// keyed as "<chunkKey>.<index>".
// NOTE(review): several lines of this method (offset advances, loop/brace
// closers, rethrow in the final catch) are elided in this view.
218 public void fromFile(byte[] data_) throws IllegalAcornStateException, AcornAccessVerificationException {
222 byte[] data = decompressor.decompressBuffer(data_);
224 operations = new ArrayList<ClusterUpdateOperation>();
227 int opLen = Bytes.readLE4(data, offset);
230 for(int i=0;i<opLen;i++) {
232 int len = Bytes.readLE4(data, offset);
234 byte[] bytes = new byte[len];
235 System.arraycopy(data, offset, bytes, 0, len);
238 ClusterUpdateOperation op = new ClusterUpdateOperation(manager, bytes);
// Change-set key mirrors the one used when the chunk was built.
240 String ccsKey = getKey() + "." + i;
242 op.ccs = new ClusterChangeSet(ccsKey, op.uid);
245 int statementMaskLen = Bytes.readLE4(data, offset);
247 op.ccs.statementMask = new TByteArrayList(statementMaskLen);
248 for(int j=0;j<statementMaskLen;j++)
249 op.ccs.statementMask.add(data[offset++]);
251 int oldValueExLen = Bytes.readLE4(data, offset);
253 op.ccs.oldValueEx = new TByteArrayList(oldValueExLen);
254 for(int j=0;j<oldValueExLen;j++)
255 op.ccs.oldValueEx.add(data[offset++]);
257 int oldValuesSize = Bytes.readLE4(data, offset);
260 op.ccs.oldValues = new ArrayList<>(oldValuesSize);
261 for(int j=0;j<oldValuesSize;j++) {
263 int oldValueSize = Bytes.readLE4(data, offset);
// -1 length marks a null old value (see toBytes).
266 if(oldValueSize == -1) {
267 op.ccs.oldValues.add(null);
269 byte[] oldValue = new byte[oldValueSize];
270 System.arraycopy(data, offset, oldValue, 0, oldValueSize);
271 offset += oldValueSize;
272 op.ccs.oldValues.add(oldValue);
// Wrap I/O failures in the Acorn state exception expected by callers.
278 } catch (IOException e) {
279 throw new IllegalAcornStateException(e);
280 } catch (IllegalAcornStateException | AcornAccessVerificationException e) {
// Returns the file extension used for persisted chunks (body elided in
// this view).
286 String getExtension() {
// Human-readable identity for logging; includes the chunk's LRU key.
291 public String toString() {
292 return "ClusterUpdateOperationChunk " + getKey();
// LRUObject hook controlling whether persisted data may be overwritten
// (body elided in this view).
296 protected boolean overwrite() {
// Exposes this class's SLF4J logger to the LRU framework (body elided in
// this view; presumably returns LOGGER).
301 public Logger getLogger() {