package org.simantics.acorn.lru;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.file.Path;
import java.util.ArrayList;

import org.simantics.acorn.ClusterManager;
import org.simantics.acorn.Persistable;
import org.simantics.acorn.exception.AcornAccessVerificationException;
import org.simantics.acorn.exception.IllegalAcornStateException;
import org.simantics.acorn.internal.ClusterChange;
import org.simantics.acorn.internal.UndoClusterUpdateProcessor;
import org.simantics.compressions.CompressionCodec;
import org.simantics.compressions.Compressions;
import org.simantics.db.exception.DatabaseException;
import org.simantics.db.service.Bytes;
import org.simantics.utils.datastructures.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import gnu.trove.list.array.TByteArrayList;

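/**
 * One LRU-managed, persistable chunk of the cluster update stream. Update
 * operations are appended until the chunk is committed (explicitly, or
 * implicitly once it grows past MAX_CHUNK_SIZE); a committed chunk can then
 * be persisted to disk as a single LZ4-compressed record.
 */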
public class ClusterStreamChunk extends LRUObject<String, ClusterStreamChunk> implements Persistable {

    private static final Logger LOGGER = LoggerFactory.getLogger(ClusterStreamChunk.class);

    // 500 KB is a fine chunk size
    private static final int MAX_CHUNK_SIZE = 500*1024;

    // Total number of operation data bytes accumulated so far
    int size = 0;
    private final ClusterManager manager;
    private boolean committed = false;

    // Index of the next operation in this chunk still to be processed
    public int nextToProcess = 0;

    public ArrayList<ClusterUpdateOperation> operations = new ArrayList<ClusterUpdateOperation>();

    // Load constructor: maps an already-persisted chunk residing in a file on disk
    public ClusterStreamChunk(ClusterManager manager, LRU<String, ClusterStreamChunk> LRU, Path readDir, String id, int offset, int length) throws AcornAccessVerificationException {
        super(LRU, manager.getFileCache(), id, readDir, "clusterStream", offset, length, false, false);
        this.manager = manager;
        LRU.map(this);
    }

    // Creation constructor: a fresh chunk that immediately enters the LRU
    public ClusterStreamChunk(ClusterManager manager, LRU<String, ClusterStreamChunk> LRU, String id) throws AcornAccessVerificationException {
        super(LRU, manager.getFileCache(), id, LRU.getDirectory(), "clusterStream", true, true);
        this.manager = manager;
        LRU.insert(this, accessTime);
    }

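    /**
     * Looks up the update operation at the given offset within this chunk and
     * wraps its change set in an UndoClusterUpdateProcessor. Returns null when
     * the change set was written with an incompatible ClusterChange version.
     */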
    public UndoClusterUpdateProcessor getUndoProcessor(ClusterManager clusters, int chunkOffset, String ccsId) throws DatabaseException {

        if(VERIFY) verifyAccess();

        // Pin the chunk in memory while the undo processor reads it
        makeResident(true);

        ClusterUpdateOperation op = operations.get(chunkOffset);
        if(op == null) throw new IllegalAcornStateException("Cluster Update Operation " + ccsId + " was not found.");
        if(op.ccs == null) throw new IllegalAcornStateException("Cluster ChangeSet " + ccsId + " was not found.");

        UndoClusterUpdateProcessor proc = new UndoClusterUpdateProcessor(clusters, this, op.ccs);
        if(proc.version != ClusterChange.VERSION)
            return null;

        // This cluster and CCS can still be under preparation => wait
        clusters.clusterLRU.ensureUpdates(proc.getClusterUID());

        proc.process();

        cancelForceResident();

        return proc;
    }

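    /**
     * Appends a pending update operation. Only legal while the chunk is still
     * open: a committed chunk is immutable.
     */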
    public void addOperation(ClusterUpdateOperation op) throws IllegalAcornStateException {
        if(committed)
            throw new IllegalAcornStateException("Cannot add operation " + op + " to " + this + " if committed == true");
        operations.add(op);
        size += op.data.length;
    }

    public byte[] getOperation(int index) {
        return operations.get(index).data;
    }

    public void commit() {
        committed = true;
    }

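    /**
     * A chunk also commits itself implicitly: once the accumulated operation
     * data exceeds MAX_CHUNK_SIZE this reports true, which is what bounds the
     * size of individual stream chunks.
     */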
    public boolean isCommitted() {
        if(size > MAX_CHUNK_SIZE) committed = true;
        return committed;
    }

    @Override
    public boolean canBePersisted() throws AcornAccessVerificationException {
        if(!super.canBePersisted()) return false;
        if(!isCommitted()) return false;
        for(ClusterUpdateOperation op : operations) {
            if(!op.finished) return false;
        }
        return true;
    }

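    // Little-endian integer writers for the serialization below; fromFile()
    // reads these back with Bytes.readLE4.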
    private static void writeLE(TByteArrayList bytes, int value) {
        bytes.add((byte) (value & 0xFF));
        bytes.add((byte) ((value >>> 8) & 0xFF));
        bytes.add((byte) ((value >>> 16) & 0xFF));
        bytes.add((byte) ((value >>> 24) & 0xFF));
    }

    public static void writeLE8(TByteArrayList bytes, long value) {
        bytes.add((byte) (value & 0xFF));
        bytes.add((byte) ((value >>> 8) & 0xFF));
        bytes.add((byte) ((value >>> 16) & 0xFF));
        bytes.add((byte) ((value >>> 24) & 0xFF));
        bytes.add((byte) ((value >>> 32) & 0xFF));
        bytes.add((byte) ((value >>> 40) & 0xFF));
        bytes.add((byte) ((value >>> 48) & 0xFF));
        bytes.add((byte) ((value >>> 56) & 0xFF));
    }

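    /**
     * Serializes this chunk for persistence. Layout, all integers little-endian:
     * operation count; then per operation: data length + data, statement mask
     * length + bytes, oldValueEx length + bytes, old value count, and per old
     * value its length (-1 encodes null) + bytes. The whole record is
     * LZ4-compressed, and the inflated size is appended as the trailing four
     * bytes because decompression must read its input from offset zero.
     */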
    @Override
    protected Pair<byte[], Integer> toBytes() {

        assert(isCommitted());

        TByteArrayList raw = new TByteArrayList();

        writeLE(raw, operations.size());

        for(ClusterUpdateOperation op : operations) {

            writeLE(raw, op.data.length);
            raw.add(op.data);

            writeLE(raw, op.ccs.statementMask.size());
            raw.add(op.ccs.statementMask.toArray());
            writeLE(raw, op.ccs.oldValueEx.size());
            raw.add(op.ccs.oldValueEx.toArray());
            writeLE(raw, op.ccs.oldValues.size());

            for(byte[] oldValue : op.ccs.oldValues) {
                int len = (oldValue != null ? oldValue.length : -1);
                writeLE(raw, len);
                if(oldValue != null) {
                    raw.add(oldValue);
                }
            }

        }

        byte[] raw_ = raw.toArray();
        CompressionCodec codec = Compressions.get(Compressions.LZ4);
        ByteBuffer input = ByteBuffer.wrap(raw_);
        ByteBuffer output = ByteBuffer.allocate(raw_.length + raw_.length/8);
        int compressedSize = codec.compressBuffer(input, 0, raw_.length, output, 0);

        // We append the inflated size - cannot prepend, since decompression cannot handle offsets in its input
        final byte[] rawOutput = new byte[compressedSize+4];
        output.get(rawOutput, 0, compressedSize);
        Bytes.writeLE(rawOutput, compressedSize, raw_.length);

        // Operation payloads are no longer needed in memory once serialized
        release();

        return Pair.make(rawOutput, rawOutput.length);
    }

    @Override
    void release() {
        // Drop operation payloads; they can be re-read from disk via fromFile()
        for(ClusterUpdateOperation op : operations) {
            op.data = null;
        }
    }

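    /**
     * Decompresses persisted chunk records. The inflated size is read from
     * the trailing four bytes that toBytes() appended.
     */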
    static class StreamDecompressor {

//        byte[] decompressBuffer = new byte[1024*1024];

        public synchronized byte[] decompressBuffer(byte[] compressed) throws IOException {

            int deflatedSize = Bytes.readLE4(compressed, compressed.length-4);

            byte[] result = new byte[deflatedSize];

//            if(decompressBuffer.length < deflatedSize)
//                decompressBuffer = new byte[Math.max(3*decompressBuffer.length / 2, deflatedSize)];

            CompressionCodec codec = Compressions.get(Compressions.LZ4);

            ByteBuffer input = ByteBuffer.wrap(compressed);
            ByteBuffer output = ByteBuffer.wrap(result);

            int decompressedSize = codec.decompressBuffer(input, 0, compressed.length-4, output, 0, result.length);
            assert(decompressedSize == deflatedSize);

            return result;
        }

    }

    private static final StreamDecompressor decompressor = new StreamDecompressor();

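    /**
     * Rebuilds the operation list from a persisted record: the inverse of
     * toBytes(). Change sets are re-keyed as "<chunk key>.<operation index>".
     */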
    @Override
    public void fromFile(byte[] data_) throws IllegalAcornStateException, AcornAccessVerificationException {

        try {

            byte[] data = decompressor.decompressBuffer(data_);

            operations = new ArrayList<ClusterUpdateOperation>();

            int offset = 0;
            int opLen = Bytes.readLE4(data, offset);
            offset += 4;

            for(int i=0;i<opLen;i++) {

                int len = Bytes.readLE4(data, offset);
                offset += 4;
                byte[] bytes = new byte[len];
                System.arraycopy(data, offset, bytes, 0, len);
                offset += len;

                ClusterUpdateOperation op = new ClusterUpdateOperation(manager, bytes);

                String ccsKey = getKey() + "." + i;
                op.ccs = new ClusterChangeSet(ccsKey, op.uid);

                int statementMaskLen = Bytes.readLE4(data, offset);
                offset += 4;
                op.ccs.statementMask = new TByteArrayList(statementMaskLen);
                for(int j=0;j<statementMaskLen;j++)
                    op.ccs.statementMask.add(data[offset++]);

                int oldValueExLen = Bytes.readLE4(data, offset);
                offset += 4;
                op.ccs.oldValueEx = new TByteArrayList(oldValueExLen);
                for(int j=0;j<oldValueExLen;j++)
                    op.ccs.oldValueEx.add(data[offset++]);

                int oldValuesSize = Bytes.readLE4(data, offset);
                offset += 4;

                op.ccs.oldValues = new ArrayList<>(oldValuesSize);
                for(int j=0;j<oldValuesSize;j++) {

                    int oldValueSize = Bytes.readLE4(data, offset);
                    offset += 4;

                    if(oldValueSize == -1) {
                        // -1 encodes a null old value
                        op.ccs.oldValues.add(null);
                    } else {
                        byte[] oldValue = new byte[oldValueSize];
                        System.arraycopy(data, offset, oldValue, 0, oldValueSize);
                        offset += oldValueSize;
                        op.ccs.oldValues.add(oldValue);
                    }

                }

                operations.add(op);

            }

        } catch (IOException e) {
            throw new IllegalAcornStateException(e);
        } catch (IllegalAcornStateException | AcornAccessVerificationException e) {
            throw e;
        }
    }

    @Override
    String getExtension() {
        return "stream";
    }

    @Override
    public String toString() {
        return "ClusterUpdateOperationChunk " + getKey();
    }

    @Override
    protected boolean overwrite() {
        return false;
    }

    @Override
    public Logger getLogger() {
        return LOGGER;
    }

}