X-Git-Url: https://gerrit.simantics.org/r/gitweb?p=simantics%2Fplatform.git;a=blobdiff_plain;f=bundles%2Forg.simantics.acorn%2Fsrc%2Forg%2Fsimantics%2Facorn%2Finternal%2FClusterChange.java;fp=bundles%2Forg.simantics.acorn%2Fsrc%2Forg%2Fsimantics%2Facorn%2Finternal%2FClusterChange.java;h=b1fbb5d9cd66ebc0674fa0c5bf276bc38384bd21;hp=0000000000000000000000000000000000000000;hb=a0687ce02bac73aad9e0d7ddc85625016604f0db;hpb=be5eb160a811a04ecd6364ebb58f24e8218d3f9c diff --git a/bundles/org.simantics.acorn/src/org/simantics/acorn/internal/ClusterChange.java b/bundles/org.simantics.acorn/src/org/simantics/acorn/internal/ClusterChange.java new file mode 100644 index 000000000..b1fbb5d9c --- /dev/null +++ b/bundles/org.simantics.acorn/src/org/simantics/acorn/internal/ClusterChange.java @@ -0,0 +1,735 @@ +package org.simantics.acorn.internal; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import org.simantics.acorn.internal.ClusterStream.ClusterEnum; +import org.simantics.acorn.internal.ClusterStream.Data; +import org.simantics.acorn.internal.ClusterStream.DebugInfo; +import org.simantics.acorn.internal.ClusterStream.OpEnum; +import org.simantics.acorn.internal.ClusterStream.StmEnum; +import org.simantics.compressions.Compressions; +import org.simantics.db.exception.RuntimeDatabaseException; +import org.simantics.db.impl.ClusterTraitsBase; +import org.simantics.db.procore.cluster.ClusterTraits; +import org.simantics.db.procore.cluster.ClusterTraitsSmall; +import org.simantics.db.service.Bytes; +import org.simantics.db.service.ClusterUID; +import org.simantics.utils.datastructures.Pair; + +import gnu.trove.map.hash.TIntByteHashMap; +import gnu.trove.map.hash.TLongIntHashMap; + + +public final class ClusterChange { + + public static final int VERSION = 1; + public static final byte ADD_OPERATION = 2; + public static final byte REMOVE_OPERATION = 3; + public static final byte DELETE_OPERATION = 5; + + public static final boolean DEBUG = false; + public static final boolean DEBUG_STAT = false; + public static final boolean DEBUG_CCS = false; + + private static DebugInfo sum = new DebugInfo(); + + public final TIntByteHashMap foreignTable = new TIntByteHashMap(); + private final DebugInfo info; +// private final GraphSession graphSession; + public final ClusterUID clusterUID; + private final int SIZE_OFFSET; +// private final int HEADER_SIZE; + // How much buffer is used before stream is flushed to server. The bigger the better. + public static final int MAX_FIXED_BYTES = (1<<15) + (1<<14); + private static final int MAX_FIXED_OPERATION_SIZE = 17 + 16; + private static final int MAX_FIXED_OPERATION_SIZE_AND_ROOM_FOR_ERROR = MAX_FIXED_OPERATION_SIZE + 36; + private int nextSize = MAX_FIXED_BYTES; + int byteIndex = 0; + private byte[] bytes = null; // Operation data. +// private final byte[] header; + private boolean flushed = false; + private ArrayList> stream; + +// public ClusterImpl clusterImpl; + + public ClusterChange( ArrayList> stream, ClusterUID clusterUID) { + this.clusterUID = clusterUID; + long[] longs = new long[ClusterUID.getLongLength()]; + clusterUID.toLong(longs, 0); + this.stream = stream; +// this.graphSession = clusterStream.graphSession; + info = new DebugInfo(); +// HEADER_SIZE = 8 + longs.length * 8; +// header = new byte[HEADER_SIZE]; + SIZE_OFFSET = 0; +// Bytes.writeLE(header, SIZE_OFFSET + 0, 0); // Correct byte vector size is set with setHeaderVectorSize() later. +// Bytes.writeLE(header, SIZE_OFFSET + 4, VERSION); +// for (int i=0, offset=8; i>>8); + } + void flushCollect(Change c) { + throw new UnsupportedOperationException(); +// flushInternal(graphSession, clusterUID); +// if (DEBUG) +// printlnd("Cluster change data was flushed."); +// if (null != c) { +// if (DEBUG) +// printlnd("Clearing lookup for " + c.toString()); +// c.lookup1 = null; +// c.lookup2 = null; +// } +// if (null != clusterImpl) { +// clusterImpl.foreignLookup = null; +// } + } + + private final boolean checkBufferSpace(Change c) { +// clusterStream.changes.checkFlush(); + if(bytes == null) initBuffer(); + if (MAX_FIXED_BYTES - byteIndex > MAX_FIXED_OPERATION_SIZE_AND_ROOM_FOR_ERROR) { + return false; + } + flush(); +// initBuffer(); + return true; + } + + private final void checkBufferSpace(int size) { + if(bytes == null) initBuffer(); + if (bytes.length - byteIndex >= size) + return; + nextSize = Math.max(MAX_FIXED_BYTES, size); + flush(); + initBuffer(); + } + + public final void addChange(Change c) { + checkInitialization(); + checkBufferSpace(c); + byte operation = c.op0; + if(operation == ADD_OPERATION) + addStm(c, StmEnum.Add); + else if (operation == REMOVE_OPERATION) + addStm(c, StmEnum.Remove); + else if (operation == DELETE_OPERATION) { + if (DEBUG) + printlnd("Delete value offset=" + byteIndex + " " + c); + addByte(OpEnum.Delete.getOrMask()); + addShort(ClusterTraitsBase.getResourceIndexFromResourceKeyNoThrow(c.key0)); + } + c.lastArg = 0; + } + + private final void addForeignLong(short index, ClusterUID clusterUID) { + byteIndex = clusterUID.toByte(bytes, byteIndex); + bytes[byteIndex++] = (byte)(index & 0xFF); + bytes[byteIndex++] = (byte)(index >>> 8); + } + + private final ClusterEnum addIndexAndCluster(int key, ClusterUID clusterUID, byte lookIndex, byte[] lookup) { + assert(!clusterUID.equals(ClusterUID.Null)); + short resourceIndex = ClusterTraitsBase.getResourceIndexFromResourceKeyNoThrow(key); + if (clusterUID.equals(this.clusterUID)) { + bytes[byteIndex++] = (byte)(resourceIndex & 0xFF); + bytes[byteIndex++] = (byte)(resourceIndex >>> 8); + return ClusterEnum.Local; + } + + byte foreign = 0; + if(lookIndex > 0) { + if(lookup != null) + foreign = lookup[lookIndex]; + } else { + foreign = foreignTable.get(key); + } + if (0 != foreign) { + if (foreign > 256) + throw new RuntimeDatabaseException("Internal error, contact application support." + + "Too big foreing index=" + foreign + " max=256"); + --foreign; + bytes[byteIndex++] = foreign; + return ClusterEnum.ForeignShort; + } else { + byte position = (byte) (foreignTable.size() + 1); + if(lookup != null) + lookup[lookIndex] = position; + foreignTable.put(key, position); + if (DEBUG_STAT) + info.sForeign = foreignTable.size(); + if (clusterUID.equals(ClusterUID.Null)) + throw new RuntimeDatabaseException("Internal error, contact application support." + + "Cluster unique id not defined for foreing cluster."); + addForeignLong(resourceIndex, clusterUID); + return ClusterEnum.ForeignLong; + } + } + + private final void addByte(byte b) { + bytes[byteIndex++] = b; + } + + private final void addShort(short s) { + bytes[byteIndex++] = (byte)(s & 0xFF); + bytes[byteIndex++] = (byte)(s >>> 8); + } + +// private final void addShort(int s) { +// bytes[byteIndex++] = (byte) (s & 0xFF); +// bytes[byteIndex++] = (byte) ((s >>> 8) & 0xFF); +// } + + private final void addInt(int i) { +// System.err.println("addInt " + i + " " + i); + bytes[byteIndex++] = (byte) (i & 0xFF); + bytes[byteIndex++] = (byte) ((i >>> 8) & 0xFF); + bytes[byteIndex++] = (byte) ((i >>> 16) & 0xFF); + bytes[byteIndex++] = (byte) ((i >>> 24) & 0xFF); + // buffer.asIntBuffer().put(i); + // buffer.position(buffer.position()+4); + } + +// private void addLong6(long l) { +//// System.err.println("addLong " + l); +// bytes[byteIndex++] = (byte) (l & 0xFF); +// bytes[byteIndex++] = (byte) ((l >>> 8) & 0xFF); +// bytes[byteIndex++] = (byte) ((l >>> 16) & 0xFF); +// bytes[byteIndex++] = (byte) ((l >>> 24) & 0xFF); +// bytes[byteIndex++] = (byte) ((l >>> 32) & 0xFF); +// bytes[byteIndex++] = (byte) ((l >>> 40) & 0xFF); +// // buffer.asLongBuffer().put(l); +// // buffer.position(buffer.position() + 6); +// } + + private void addLong7(long l) { + bytes[byteIndex++] = (byte) (l & 0xFF); + bytes[byteIndex++] = (byte) ((l >>> 8) & 0xFF); + bytes[byteIndex++] = (byte) ((l >>> 16) & 0xFF); + bytes[byteIndex++] = (byte) ((l >>> 24) & 0xFF); + bytes[byteIndex++] = (byte) ((l >>> 32) & 0xFF); + bytes[byteIndex++] = (byte) ((l >>> 40) & 0xFF); + bytes[byteIndex++] = (byte) ((l >>> 48) & 0xFF); + // buffer.asLongBuffer().put(l); + // buffer.position(buffer.position() + 7); + } + +// private void addLong(long l) { +// bytes[byteIndex++] = (byte) (l & 0xFF); +// bytes[byteIndex++] = (byte) ((l >>> 8) & 0xFF); +// bytes[byteIndex++] = (byte) ((l >>> 16) & 0xFF); +// bytes[byteIndex++] = (byte) ((l >>> 24) & 0xFF); +// bytes[byteIndex++] = (byte) ((l >>> 32) & 0xFF); +// bytes[byteIndex++] = (byte) ((l >>> 40) & 0xFF); +// bytes[byteIndex++] = (byte) ((l >>> 48) & 0xFF); +// bytes[byteIndex++] = (byte) ((l >>> 56) & 0xFF); +// } + private final byte bufferPop() { + return bytes[--byteIndex]; + } + + final class DebugStm { + StmEnum e; + int r; + int p; + int o; + ClusterUID pc; + ClusterUID oc; + + DebugStm(StmEnum e, int r, int p, ClusterUID pc, int o, ClusterUID oc) { + this.e = e; + this.r = r; + this.p = p; + this.o = o; + this.pc = pc; + this.oc = oc; + } + + @Override + public String toString() { + short ri = ClusterTraitsBase.getResourceIndexFromResourceKeyNoThrow(r); + short pi = ClusterTraitsBase.getResourceIndexFromResourceKeyNoThrow(p); + short oi = ClusterTraitsBase.getResourceIndexFromResourceKeyNoThrow(o); + return "" + e + " rk=" + r + " ri=" + ri + " rc=" + clusterUID + + " pk=" + p + " pi=" + pi + " pc=" + pc + + " ok=" + o + " oi=" + oi + " oc=" + oc; + } + + public String toString2() { + return "" + e + " r=" + r + " rc=" + clusterUID + " p=" + p + + " pc=" + pc + " o=" + o + " oc=" + oc; + } + + public String toString3() { + short ri = ClusterTraitsBase.getResourceIndexFromResourceKeyNoThrow(r); + short pi = ClusterTraitsBase.getResourceIndexFromResourceKeyNoThrow(p); + short oi = ClusterTraitsBase.getResourceIndexFromResourceKeyNoThrow(o); + return "" + e + " ri=" + ri + + " pi=" + pi + " pc=" + pc + + " oi=" + oi + " oc=" + oc; + } + } + + private List debugStms = new ArrayList(); + + @SuppressWarnings("unused") + private final void addStm(Change c, StmEnum stmEnum) { + + if (DEBUG_STAT) + ++info.nStms; + if (DEBUG || DEBUG_CCS) { + DebugStm d = new DebugStm(stmEnum, c.key0, c.key1, c.clusterUID1, c.key2, c.clusterUID2); + if (DEBUG_CCS) + debugStms.add(d); + if (DEBUG) { + printlnd(d.toString3() + " offset=" + byteIndex); + } + } + // int opPos = buffer.position(); + int opPos = byteIndex++; + // buffer.put((byte)0); // operation code + // addByte((byte)0); + + boolean done = true; + + ClusterEnum a = addIndexAndCluster(c.key1, c.clusterUID1, c.lookIndex1, c.lookup1); + byte ab = 0; + + // ForeignShort = byte + // Local = short + // ForeignLong = 8 byte + if (a != ClusterEnum.ForeignShort) { + ab = bufferPop(); + done = false; + } + + ClusterEnum b = addIndexAndCluster(c.key2, c.clusterUID2, c.lookIndex2, c.lookup2); + byte bb = 0; + if (b != ClusterEnum.ForeignShort) { + bb = bufferPop(); + done = false; + } + + int ri = ClusterTraitsBase.getResourceIndexFromResourceKeyNoThrow(c.key0); + if (ClusterTraitsSmall.isIllegalResourceIndex(ri)) + throw new RuntimeDatabaseException("Assertion error. Illegal resource index=" + ri); + bytes[byteIndex++] = (byte)ri; // index low byte + if(!done) { + Data data = ClusterEnum.getData(stmEnum, a, b); + int left = 6 - data.bits; + int op = ri >>> (8 + left); + ri >>>= 8; + ri &= (1 << left) - 1; + if (a != ClusterEnum.ForeignShort) { + ri |= ab << left; + left += 6; + } + if (b != ClusterEnum.ForeignShort) { + ri |= bb << left; + left += 6; + } + switch (data.bytes) { + default: + throw new RuntimeDatabaseException("Assertion error. Illegal number of bytes=" + data.bytes); + case 2: + bytes[byteIndex++] = (byte)(ri & 0xFF); + bytes[byteIndex++] = (byte)((ri >>> 8) & 0xFF); + break; + case 1: + bytes[byteIndex++] = (byte)(ri & 0xFF); + break; + case 0: + break; + } + op |= data.mask; + this.bytes[opPos] = (byte)op; + } else { + if (stmEnum == StmEnum.Add) + bytes[opPos] = (byte)((ri >>> 8) + 64); + else + bytes[opPos] = (byte)((ri >>> 8) + 128); + } + if (DEBUG_STAT) { + if (a == ClusterEnum.Local && b == ClusterEnum.Local) { + ++info.nLocal; + } else if (a == ClusterEnum.Local || b == ClusterEnum.Local) { + ++info.nPartly; + } else { + ++info.nForeign; + } + } + if (foreignTable.size() > 252) + flush(); +// throw new UnsupportedOperationException(); + //flushInternal(graphSession, clusterUID); + } + + private final int modiValue(int ri, long value_offset, byte[] bytes, int offset, int size) { + if (DEBUG) + printlnd("Modify value ri=" + ri + " vo=" + value_offset + " size=" + size + " total=" + bytes.length); + if (ClusterTraitsBase.isIllegalResourceIndex(ri)) + throw new RuntimeDatabaseException("Assertion error. Illegal resource index=" + ri); + if (value_offset > (1L << 58 - 1)) + throw new RuntimeDatabaseException("Illegal value offset=" + + value_offset); + if (size < 0 || size > MAX_FIXED_BYTES - 1) + throw new RuntimeDatabaseException("Illegal value size=" + size); + if (offset + size > bytes.length) + throw new RuntimeDatabaseException("Illegal value size=" + size); + checkBufferSpace(12 + size); + addByte(OpEnum.Modify.getOrMask()); + ri |= (value_offset >>> 56) << 14; // add top two bits + addShort((short) ri); + value_offset &= (1L << 56) - 1; + addLong7(value_offset); + addShort((short) size); + if (DEBUG) + System.out.println("Modify value fixed part end offset=" + byteIndex); + int copied = Math.min(size, this.bytes.length - byteIndex); + System.arraycopy(bytes, offset, this.bytes, byteIndex, copied); + byteIndex += size; + return copied; + } + +// private final void modiValueBig(int ri, long voffset, int left, byte[] bytes, int offset) { +// checkBufferSpace(0); +// int current = Math.min(this.bytes.length - byteIndex - 12, left); +// if(current >= 0) { +// int written = modiValue(ri, voffset, bytes, offset, current); +// voffset += written; +// offset += written; +// left -= written; +// } +//// flushInternal(graphSession, clusterUID); +// while (left > 0) { +// int length = Math.min(left, (1 << 16) - 1); +// if (DEBUG) +// printlnd("Modify big value ri=" + ri + " vo=" + voffset + " len=" + length); +// int psize = length + 12; +//// setHeaderVectorSize(psize); +// byte[] message = new byte[psize/*+HEADER_SIZE*/]; +//// System.arraycopy(header, 0, message, 0, HEADER_SIZE); +// int to = 0; +// Bytes.write(message, to++, OpEnum.Modify.getOrMask()); +// short index = (short)(ri | (voffset >>> 56)<<14); // add top two bits +// Bytes.writeLE(message, to, index); to += 2; +// Bytes.writeLE7(message, to, voffset & ((1L << 56) - 1)); to += 7; +// Bytes.writeLE(message, to, (short)length); to += 2; +// System.arraycopy(bytes, offset, message, to, length); +//// graphSession.updateCluster(new UpdateClusterFunction(message)); +// voffset += length; +// offset += length; +// left -= length; +// } +// } + + private final int setValueBig(int ri, byte[] bytes, int length_) { + checkBufferSpace(12); + int sum = 0; + int voffset = 0; + int offset = 0; + int left = length_; + while (left > 0) { + int length = Math.min(left, MAX_FIXED_BYTES - 12 - byteIndex); + if (DEBUG) + printlnd("Set big value ri=" + ri + " vo=" + voffset + " len=" + length); + int written = modiValue(ri, voffset, bytes, offset, length); + sum += written; + voffset += written; + offset += written; + left -= written; + checkBufferSpace(12); + } + return sum; + } + + private final int setValueSmall(int ri, byte[] bytes, int length) { + checkBufferSpace(5 + length); + int pos = byteIndex; + int i = length << 14 | ri; + if (length < 32) { + byte op = (byte) (OpEnum.SetShort.getOrMask() | length >>> 2); + addByte(op); + short s = (short) i; + addShort(s); + } else { + addByte(OpEnum.Set.getOrMask()); + addInt(i); + } + System.arraycopy(bytes, 0, this.bytes, byteIndex, length); + byteIndex += length; + int len = byteIndex - pos; + return len; + } + + final void setValue(short index, byte[] bytes) { + setValue(index, bytes, bytes.length); + } + + final public void setValue(short index, byte[] bytes, int length) { + checkInitialization(); + if (ClusterTraitsBase.isIllegalResourceIndex(index)) + throw new RuntimeDatabaseException("Assertion error. Illegal resource index=" + index); + if (DEBUG) + printlnd("Set value ri=" + index + + " len=" + length + + " bytes=" + Arrays.toString(Arrays.copyOfRange(bytes, 0, Math.min(10, length)))); + int len; + /* + * The limit for the cluster stream is (1<18)-1 but this avoids the + * conversion to big cluster. + */ + if (length > ClusterTraitsSmall.VALUE_SIZE_MAX) + len = setValueBig(index, bytes, length); + else + len = setValueSmall(index, bytes, length); + if (DEBUG_STAT) { + ++info.nValues; + info.sValues += len + length; + } + } + +// final void setValue(Change c, byte[] bytes, int length) { +// short ri = ClusterTraitsBase.getResourceIndexFromResourceKeyNoThrow(c.key0); +// setValue(ri, bytes, length); +// c.initValue(); +// } + +// final void modiValue(Change c, long voffset, int length, byte[] bytes, int offset) { +// checkInitialization(); +// int ri = ClusterTraitsBase.getResourceIndexFromResourceKeyNoThrow(c.key0); +// if (DEBUG) +// printlnd("Modify value ri=" + ri +// + " voff=" + voffset +// + " vlen=" + length +// + " blen=" + bytes.length +// + " boff=" + offset +// + " bytes=" + Arrays.toString(Arrays.copyOfRange(bytes, 0, Math.min(10, bytes.length)))); +// modiValueBig(ri, voffset, length, bytes, offset); +// c.init(); +// if (DEBUG_STAT) { +// ++info.nValues; +// info.sValues += length; +// } +// } + final void setImmutable(boolean immutable) { + checkInitialization(); +// clusterChange2.setImmutable(immutable); + } + final void undoValueEx(int resourceIndex) { + checkInitialization(); +// clusterChange2.undoValueEx(resourceIndex); + } + final void setDeleted(boolean deleted) { + checkInitialization(); +// clusterChange2.setDeleted(deleted); + } + final void corrupt() { + checkInitialization(); + addByte((byte)0); + } + + public byte[] getBytes() { + byte[] copy = new byte[byteIndex]; + System.arraycopy(bytes, 0, copy, 0, byteIndex); + return copy; + } + + /** + * @param graphSession + * @param clusterId + * @return true if actually flushed something + */ + final boolean flush(/*GraphSession graphSession,*/ ClusterUID clusterUID) { + throw new UnsupportedOperationException(); +// if (byteIndex > 0) { +// if(DebugPolicy.REPORT_CLUSTER_STREAM) +// System.err.println("Flush cluster change set stream " + this); +// setHeaderVectorSize(byteIndex); +// byte[] copy = new byte[byteIndex + HEADER_SIZE]; +// System.arraycopy(header, 0, copy, 0, HEADER_SIZE); +// System.arraycopy(bytes, 0, copy, HEADER_SIZE, byteIndex); +// UpdateClusterFunction updateClusterFunction = new UpdateClusterFunction(copy); +// if (DEBUG_CCS) { +// for (DebugStm stm : debugStms) +// printlnd(stm.toString2()); +// debugStms.clear(); +// } +// if (DEBUG_STAT) { +// info.tot = updateClusterFunction.operation.length; +// printlnd("ReallyFlush: " + info.toString()); +// sum.add(info); +// printlnd("ReallyFlush sum: " + sum.toString()); +// } +// // long start = System.nanoTime(); +// graphSession.updateCluster(updateClusterFunction); +// // long duration = System.nanoTime() - start; +// // duration2 += duration; +// // System.err.println("updateCluster " + 1e-9*duration); +// // System.err.println("updateCluster total " + 1e-9*duration2); +// clear(); +// clusterChange2.flush(graphSession); +// return true; +// } else if (clusterChange2.isDirty()) { +// clusterChange2.flush(graphSession); +// clear(); +// return true; +// } else if (flushed) { +// flushed = false; +// return true; +// } else { +// return true; +// } + } + + final void flushInternal(ClusterUID clusterUID) { + throw new UnsupportedOperationException(); +// flush(graphSession, clusterUID); +// flushed = true; + } + + final class ForeignTable { + private final TLongIntHashMap table = new TLongIntHashMap(); + + private long createKey(short index, long cluster) { + assert (cluster <= (1L << 48) - 1); + return (cluster << 14) | index; + } + + public int get(short index, long cluster) { + int value = table.get(createKey(index, cluster)); + if (DEBUG) + printlnd("ForeignTable get c=" + clusterUID + " i=" + + (value - 1) + " r=" + index + " rc=" + cluster); + return value; + } + + public int put(short index, long cluster, int value) { + if (DEBUG) + printlnd("ForeignTable put c=" + clusterUID + " i=" + + (value - 1) + " r=" + index + " rc=" + cluster); + return table.put(createKey(index, cluster), value); + } + + public int size() { + return table.size(); + } + + public void clear() { + table.clear(); + } + } + + @Override + public int hashCode() { + return 31*clusterUID.hashCode(); + } + + @Override + public boolean equals(Object object) { + if (this == object) + return true; + else if (object == null) + return false; + else if (!(object instanceof ClusterChange)) + return false; + ClusterChange r = (ClusterChange)object; + return r.clusterUID.equals(clusterUID); + } + + public void flush() { + + if(byteIndex > 0) { + + final ClusterUID cuid = clusterUID; + + byte[] block = getBytes(); + byte[] raw = new byte[block.length + 28]; + Bytes.writeLE(raw, 0, 1); + System.arraycopy(cuid.asBytes(), 0, raw, 4, 16); + Bytes.writeLE(raw, 20, block.length); + System.arraycopy(block, 0, raw, 24, block.length); + Bytes.writeLE(raw, 24+block.length, 0); + + ByteBuffer rawBB = ByteBuffer.wrap(raw); + ByteBuffer outputBB = ByteBuffer.allocate(raw.length + raw.length/8); + //outputBB.order(ByteOrder.LITTLE_ENDIAN); + int compressedSize = Compressions.get(Compressions.LZ4).compressBuffer(rawBB, 0, raw.length, outputBB, 0); + + byte[] data_ = null; + if(compressedSize < raw.length) { + data_ = new byte[compressedSize]; + outputBB.get(data_,0,compressedSize); + } else { + data_ = raw; + } + + byte[] data = new byte[data_.length+24]; + Bytes.writeLE(data, 0, 0); + Bytes.writeLE(data, 4, 0); + Bytes.writeLE(data, 8, raw.length); + Bytes.writeLE(data, 12, raw.length); + Bytes.writeLE(data, 16, data_.length); + System.arraycopy(data_, 0, data, 20, data_.length); + Bytes.writeLE(data, 20+data_.length, 0); + + stream.add(Pair.make(clusterUID, data)); + clear(); + initBuffer(); + + } + } + +}