package org.simantics.acorn.lru;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.file.Path;
import java.util.ArrayList;

import org.simantics.acorn.ClusterManager;
import org.simantics.acorn.Persistable;
import org.simantics.acorn.exception.AcornAccessVerificationException;
import org.simantics.acorn.exception.IllegalAcornStateException;
import org.simantics.acorn.internal.ClusterChangeSet;
import org.simantics.acorn.internal.UndoClusterUpdateProcessor;
import org.simantics.compressions.CompressionCodec;
import org.simantics.compressions.Compressions;
import org.simantics.db.exception.DatabaseException;
import org.simantics.db.service.Bytes;
import org.simantics.utils.datastructures.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import gnu.trove.list.array.TByteArrayList;

public class ClusterStreamChunk extends LRUObject implements Persistable {

	private static final Logger LOGGER = LoggerFactory.getLogger(ClusterStreamChunk.class);

	// 500KB is a fine chunk
	private static final int MAX_CHUNK_SIZE = 500*1024;

	int size = 0;
	final private ClusterManager manager;
	private boolean committed = false;
	public int nextToProcess = 0;

	public ArrayList<ClusterUpdateOperation> operations = new ArrayList<>();

	// Stub
	public ClusterStreamChunk(ClusterManager manager, LRU<String, ClusterStreamChunk> LRU, Path readDir, String id, int offset, int length) throws AcornAccessVerificationException {
		super(LRU, id, readDir, "clusterStream", offset, length, false, false);
		this.manager = manager;
		LRU.map(this);
	}

	// Creation
	public ClusterStreamChunk(ClusterManager manager, LRU<String, ClusterStreamChunk> LRU, String id) throws AcornAccessVerificationException {
		super(LRU, id, LRU.getDirectory(), "clusterStream", true, true);
		this.manager = manager;
		LRU.insert(this, accessTime);
	}

	public UndoClusterUpdateProcessor getUndoProcessor(ClusterManager clusters, int chunkOffset, String ccsId) throws DatabaseException {

		if(VERIFY) verifyAccess();

		makeResident(true);

		ClusterUpdateOperation op = operations.get(chunkOffset);
		if(op == null)
			throw new IllegalAcornStateException("Cluster update operation " + ccsId + " was not found.");
		if(op.ccs == null)
			throw new IllegalAcornStateException("Cluster change set " + ccsId + " was not found.");

		UndoClusterUpdateProcessor proc = new UndoClusterUpdateProcessor(clusters, this, op.ccs);

		// This cluster and CCS can still be under preparation => wait
		clusters.clusterLRU.ensureUpdates(proc.getClusterUID());

		proc.process();

		cancelForceResident();

		return proc;
	}

	public void addOperation(ClusterUpdateOperation op) throws IllegalAcornStateException {
		if(committed)
			throw new IllegalAcornStateException("Cannot add operation " + op + " to " + this + " because the chunk is already committed");
		operations.add(op);
		size += op.data.length;
//		if(isCommitted()) {
//			LRU.refresh(this);
//		}
	}

	public byte[] getOperation(int index) {
		return operations.get(index).data;
	}

	public void commit() {
		committed = true;
	}

	public boolean isCommitted() {
		// A chunk that has grown beyond MAX_CHUNK_SIZE is implicitly committed
		if(size > MAX_CHUNK_SIZE)
			committed = true;
		return committed;
	}

	@Override
	public boolean canBePersisted() throws AcornAccessVerificationException {
		if(!super.canBePersisted()) return false;
		if(!isCommitted()) return false;
		for(ClusterUpdateOperation op : operations) {
			if(!op.finished) return false;
		}
		return true;
	}

	// Appends a 32-bit value in little-endian byte order
	private static void writeLE(TByteArrayList bytes, int value) {
		bytes.add((byte) (value & 0xFF));
		bytes.add((byte) ((value >>> 8) & 0xFF));
		bytes.add((byte) ((value >>> 16) & 0xFF));
		bytes.add((byte) ((value >>> 24) & 0xFF));
	}

	// Appends a 64-bit value in little-endian byte order
	public static void writeLE8(TByteArrayList bytes, long value) {
		bytes.add((byte) (value & 0xFF));
		bytes.add((byte) ((value >>> 8) & 0xFF));
		bytes.add((byte) ((value >>> 16) & 0xFF));
		bytes.add((byte) ((value >>> 24) & 0xFF));
		bytes.add((byte) ((value >>> 32) & 0xFF));
		bytes.add((byte) ((value >>> 40) & 0xFF));
		bytes.add((byte) ((value >>> 48) & 0xFF));
		bytes.add((byte) ((value >>> 56) & 0xFF));
	}
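	/*
	 * On-disk layout of a chunk body (before LZ4 compression), as produced by
	 * toBytes() below and consumed by fromFile(). All integers are 4-byte
	 * little-endian; this sketch is inferred from the two methods and is
	 * descriptive only, not a normative format specification:
	 *
	 *   opCount
	 *   repeated opCount times:
	 *     dataLen, data[dataLen]
	 *     statementMaskLen, statementMask[statementMaskLen]
	 *     oldValueExLen, oldValueEx[oldValueExLen]
	 *     oldValueCount
	 *     repeated oldValueCount times:
	 *       oldValueLen (-1 encodes a null entry), oldValue[oldValueLen]
	 *
	 * The compressed stream is followed by a trailing 4-byte little-endian
	 * inflated size; it is appended rather than prepended because the
	 * decompressor cannot handle offsets in its input buffer.
	 */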
	@Override
	protected Pair<byte[], Integer> toBytes() {

		assert(isCommitted());

		TByteArrayList raw = new TByteArrayList();

		writeLE(raw, operations.size());

		for(ClusterUpdateOperation op : operations) {

			writeLE(raw, op.data.length);
			raw.add(op.data);
			op.data = null;

			writeLE(raw, op.ccs.statementMask.size());
			raw.add(op.ccs.statementMask.toArray());
			writeLE(raw, op.ccs.oldValueEx.size());
			raw.add(op.ccs.oldValueEx.toArray());
			writeLE(raw, op.ccs.oldValues.size());

			for(byte[] oldValue : op.ccs.oldValues) {
				int len = (oldValue != null ? oldValue.length : -1);
				writeLE(raw, len);
				if(oldValue != null) {
					raw.add(oldValue);
				}
			}
		}

		byte[] raw_ = raw.toArray();

		CompressionCodec codec = Compressions.get(Compressions.LZ4);
		ByteBuffer input = ByteBuffer.wrap(raw_);
		ByteBuffer output = ByteBuffer.allocate(raw_.length + raw_.length/8);
		int compressedSize = codec.compressBuffer(input, 0, raw_.length, output, 0);

		// We append the inflated size - it cannot be prepended since decompression cannot handle offsets in the input
		final byte[] rawOutput = new byte[compressedSize+4];
		output.get(rawOutput, 0, compressedSize);
		Bytes.writeLE(rawOutput, compressedSize, raw_.length);

		release();

		return Pair.make(rawOutput, rawOutput.length);
	}

	@Override
	void release() {
		for(ClusterUpdateOperation op : operations) {
			op.data = null;
			op.ccs = null;
		}
	}

	static class StreamDecompressor {

//		byte[] decompressBuffer = new byte[1024*1024];

		public synchronized byte[] decompressBuffer(byte[] compressed) throws IOException {

			// The inflated size is stored in the last four bytes of the compressed data
			int deflatedSize = Bytes.readLE4(compressed, compressed.length-4);

			byte[] result = new byte[deflatedSize];

//			if(decompressBuffer.length < deflatedSize)
//				decompressBuffer = new byte[Math.max(3*decompressBuffer.length / 2, deflatedSize)];

			CompressionCodec codec = Compressions.get(Compressions.LZ4);

			ByteBuffer input = ByteBuffer.wrap(compressed);
			ByteBuffer output = ByteBuffer.wrap(result);

			int decompressedSize = codec.decompressBuffer(input, 0, compressed.length-4, output, 0, result.length);
			assert(decompressedSize == deflatedSize);

			return result;
		}
	}

	private static StreamDecompressor decompressor = new StreamDecompressor();

	@Override
	public void fromFile(byte[] data_) throws IllegalAcornStateException, AcornAccessVerificationException {

		try {

			byte[] data = decompressor.decompressBuffer(data_);

			operations = new ArrayList<>();

			int offset = 0;
			int opLen = Bytes.readLE4(data, offset);
			offset += 4;

			for(int i=0;i<opLen;i++) {

				int len = Bytes.readLE4(data, offset);
				offset += 4;
				byte[] bytes = new byte[len];
				System.arraycopy(data, offset, bytes, 0, len);
				offset += len;

				ClusterUpdateOperation op = new ClusterUpdateOperation(manager, bytes);

				// Each operation's change set is keyed by the chunk key and its index within the chunk
				String ccsKey = getKey() + "." + i;

				op.ccs = new ClusterChangeSet(ccsKey, op.uid);
				op.chunk = this;

				int statementMaskLen = Bytes.readLE4(data, offset);
				offset += 4;
				op.ccs.statementMask = new TByteArrayList(statementMaskLen);
				for(int j=0;j<statementMaskLen;j++)
					op.ccs.statementMask.add(data[offset++]);

				int oldValueExLen = Bytes.readLE4(data, offset);
				offset += 4;
				op.ccs.oldValueEx = new TByteArrayList(oldValueExLen);
				for(int j=0;j<oldValueExLen;j++)
					op.ccs.oldValueEx.add(data[offset++]);

				int oldValuesSize = Bytes.readLE4(data, offset);
				offset += 4;

				op.ccs.oldValues = new ArrayList<>(oldValuesSize);
				for(int j=0;j<oldValuesSize;j++) {

					int oldValueSize = Bytes.readLE4(data, offset);
					offset += 4;

					// A length of -1 marks a null old value (see toBytes above)
					if(oldValueSize == -1) {
						op.ccs.oldValues.add(null);
					} else {
						byte[] oldValue = new byte[oldValueSize];
						System.arraycopy(data, offset, oldValue, 0, oldValueSize);
						offset += oldValueSize;
						op.ccs.oldValues.add(oldValue);
					}
				}

				operations.add(op);
			}

		} catch (IOException e) {
			throw new IllegalAcornStateException(e);
		}
	}

}
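/*
 * A minimal usage sketch (not part of the class above): how a chunk is filled,
 * committed, and persisted. The manager, streamLRU and op instances are
 * assumed to come from the surrounding Acorn machinery; the names here are
 * illustrative only.
 *
 *   ClusterStreamChunk chunk = new ClusterStreamChunk(manager, streamLRU, "chunk-id");
 *   chunk.addOperation(op);   // accumulates op.data and grows chunk.size
 *   chunk.commit();           // or let size > MAX_CHUNK_SIZE auto-commit via isCommitted()
 *
 * Once every operation is finished, canBePersisted() returns true and the LRU
 * may call toBytes() to compress the chunk to disk; fromFile() restores it,
 * rebuilding each operation's change set under the key getKey() + "." + i.
 */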