--- /dev/null
+package org.simantics.acorn.internal;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.simantics.acorn.internal.ClusterStream.ClusterEnum;
+import org.simantics.acorn.internal.ClusterStream.Data;
+import org.simantics.acorn.internal.ClusterStream.DebugInfo;
+import org.simantics.acorn.internal.ClusterStream.OpEnum;
+import org.simantics.acorn.internal.ClusterStream.StmEnum;
+import org.simantics.compressions.Compressions;
+import org.simantics.db.exception.RuntimeDatabaseException;
+import org.simantics.db.impl.ClusterTraitsBase;
+import org.simantics.db.procore.cluster.ClusterTraits;
+import org.simantics.db.procore.cluster.ClusterTraitsSmall;
+import org.simantics.db.service.Bytes;
+import org.simantics.db.service.ClusterUID;
+import org.simantics.utils.datastructures.Pair;
+
+import gnu.trove.map.hash.TIntByteHashMap;
+import gnu.trove.map.hash.TLongIntHashMap;
+
+
+public final class ClusterChange {
+
+ public static final int VERSION = 1;
+ public static final byte ADD_OPERATION = 2;
+ public static final byte REMOVE_OPERATION = 3;
+ public static final byte DELETE_OPERATION = 5;
+
+ public static final boolean DEBUG = false;
+ public static final boolean DEBUG_STAT = false;
+ public static final boolean DEBUG_CCS = false;
+
+ private static DebugInfo sum = new DebugInfo();
+
+ public final TIntByteHashMap foreignTable = new TIntByteHashMap();
+ private final DebugInfo info;
+// private final GraphSession graphSession;
+ public final ClusterUID clusterUID;
+ private final int SIZE_OFFSET;
+// private final int HEADER_SIZE;
+ // How much buffer is used before stream is flushed to server. The bigger the better.
+ public static final int MAX_FIXED_BYTES = (1<<15) + (1<<14);
+ private static final int MAX_FIXED_OPERATION_SIZE = 17 + 16;
+ private static final int MAX_FIXED_OPERATION_SIZE_AND_ROOM_FOR_ERROR = MAX_FIXED_OPERATION_SIZE + 36;
+ private int nextSize = MAX_FIXED_BYTES;
+ int byteIndex = 0;
+ private byte[] bytes = null; // Operation data.
+// private final byte[] header;
+ private boolean flushed = false;
+ private ArrayList<Pair<ClusterUID, byte[]>> stream;
+
+// public ClusterImpl clusterImpl;
+
+ public ClusterChange( ArrayList<Pair<ClusterUID, byte[]>> stream, ClusterUID clusterUID) {
+ this.clusterUID = clusterUID;
+ long[] longs = new long[ClusterUID.getLongLength()];
+ clusterUID.toLong(longs, 0);
+ this.stream = stream;
+// this.graphSession = clusterStream.graphSession;
+ info = new DebugInfo();
+// HEADER_SIZE = 8 + longs.length * 8;
+// header = new byte[HEADER_SIZE];
+ SIZE_OFFSET = 0;
+// Bytes.writeLE(header, SIZE_OFFSET + 0, 0); // Correct byte vector size is set with setHeaderVectorSize() later.
+// Bytes.writeLE(header, SIZE_OFFSET + 4, VERSION);
+// for (int i=0, offset=8; i<longs.length; ++i, offset+=8)
+// Bytes.writeLE(header, offset, longs[i]);
+ //initBuffer();
+// this.clusterStream = clusterStream;
+// this.clusterChange2 = new ClusterChange2(clusterUID, clusterImpl);
+// clusterStream.changes.add(this);
+ }
+
+// private void setHeaderVectorSize(int size) {
+// if (size < 0)
+// throw new RuntimeDatabaseException("Change set size can't be negative.");
+// int len = size + HEADER_SIZE - SIZE_OFFSET - 4;
+// Bytes.writeLE(header, SIZE_OFFSET, len);
+// }
+ @Override
+ public String toString() {
+ return super.toString() + " cluster=" + clusterUID + " off=" + byteIndex;
+ }
+ final public void initBuffer() {
+ flushed = false;
+ if (null == bytes || bytes.length < nextSize) {
+ bytes = new byte[nextSize];
+ nextSize = MAX_FIXED_BYTES;
+ }
+ byteIndex = 0;
+ }
+ private final void clear() {
+// if(clusterImpl != null && clusterImpl.change != null)
+// clusterImpl.change.init();
+ foreignTable.clear();
+ //initBuffer();
+ bytes = null;
+ byteIndex = 0;
+ if (DEBUG_STAT)
+ info.clear();
+ }
+ private final void checkInitialization() {
+// if (0 == byteIndex)
+// clusterStream.changes.addChange(this);
+ }
+ private final void printlnd(String s) {
+ System.out.println("DEBUG: ClusterChange " + clusterUID + ": " + s);
+ }
+ public final void createResource(short index) {
+ checkInitialization();
+ if (DEBUG)
+ printlnd("New ri=" + index + " offset=" + byteIndex);
+ if (index > ClusterTraits.getMaxNumberOfResources())
+ throw new RuntimeDatabaseException("Illegal resource index=" + index + ".");
+ checkBufferSpace(null);
+ bytes[byteIndex++] = (byte)52;
+ bytes[byteIndex++] = (byte)index;
+ bytes[byteIndex++] = (byte)(index>>>8);
+ }
+ void flushCollect(Change c) {
+ throw new UnsupportedOperationException();
+// flushInternal(graphSession, clusterUID);
+// if (DEBUG)
+// printlnd("Cluster change data was flushed.");
+// if (null != c) {
+// if (DEBUG)
+// printlnd("Clearing lookup for " + c.toString());
+// c.lookup1 = null;
+// c.lookup2 = null;
+// }
+// if (null != clusterImpl) {
+// clusterImpl.foreignLookup = null;
+// }
+ }
+
+ private final boolean checkBufferSpace(Change c) {
+// clusterStream.changes.checkFlush();
+ if(bytes == null) initBuffer();
+ if (MAX_FIXED_BYTES - byteIndex > MAX_FIXED_OPERATION_SIZE_AND_ROOM_FOR_ERROR) {
+ return false;
+ }
+ flush();
+// initBuffer();
+ return true;
+ }
+
+ private final void checkBufferSpace(int size) {
+ if(bytes == null) initBuffer();
+ if (bytes.length - byteIndex >= size)
+ return;
+ nextSize = Math.max(MAX_FIXED_BYTES, size);
+ flush();
+ initBuffer();
+ }
+
+ public final void addChange(Change c) {
+ checkInitialization();
+ checkBufferSpace(c);
+ byte operation = c.op0;
+ if(operation == ADD_OPERATION)
+ addStm(c, StmEnum.Add);
+ else if (operation == REMOVE_OPERATION)
+ addStm(c, StmEnum.Remove);
+ else if (operation == DELETE_OPERATION) {
+ if (DEBUG)
+ printlnd("Delete value offset=" + byteIndex + " " + c);
+ addByte(OpEnum.Delete.getOrMask());
+ addShort(ClusterTraitsBase.getResourceIndexFromResourceKeyNoThrow(c.key0));
+ }
+ c.lastArg = 0;
+ }
+
+ private final void addForeignLong(short index, ClusterUID clusterUID) {
+ byteIndex = clusterUID.toByte(bytes, byteIndex);
+ bytes[byteIndex++] = (byte)(index & 0xFF);
+ bytes[byteIndex++] = (byte)(index >>> 8);
+ }
+
+ private final ClusterEnum addIndexAndCluster(int key, ClusterUID clusterUID, byte lookIndex, byte[] lookup) {
+ assert(!clusterUID.equals(ClusterUID.Null));
+ short resourceIndex = ClusterTraitsBase.getResourceIndexFromResourceKeyNoThrow(key);
+ if (clusterUID.equals(this.clusterUID)) {
+ bytes[byteIndex++] = (byte)(resourceIndex & 0xFF);
+ bytes[byteIndex++] = (byte)(resourceIndex >>> 8);
+ return ClusterEnum.Local;
+ }
+
+ byte foreign = 0;
+ if(lookIndex > 0) {
+ if(lookup != null)
+ foreign = lookup[lookIndex];
+ } else {
+ foreign = foreignTable.get(key);
+ }
+ if (0 != foreign) {
+ if (foreign > 256)
+ throw new RuntimeDatabaseException("Internal error, contact application support." +
+ "Too big foreing index=" + foreign + " max=256");
+ --foreign;
+ bytes[byteIndex++] = foreign;
+ return ClusterEnum.ForeignShort;
+ } else {
+ byte position = (byte) (foreignTable.size() + 1);
+ if(lookup != null)
+ lookup[lookIndex] = position;
+ foreignTable.put(key, position);
+ if (DEBUG_STAT)
+ info.sForeign = foreignTable.size();
+ if (clusterUID.equals(ClusterUID.Null))
+ throw new RuntimeDatabaseException("Internal error, contact application support." +
+ "Cluster unique id not defined for foreing cluster.");
+ addForeignLong(resourceIndex, clusterUID);
+ return ClusterEnum.ForeignLong;
+ }
+ }
+
+ private final void addByte(byte b) {
+ bytes[byteIndex++] = b;
+ }
+
+ private final void addShort(short s) {
+ bytes[byteIndex++] = (byte)(s & 0xFF);
+ bytes[byteIndex++] = (byte)(s >>> 8);
+ }
+
+// private final void addShort(int s) {
+// bytes[byteIndex++] = (byte) (s & 0xFF);
+// bytes[byteIndex++] = (byte) ((s >>> 8) & 0xFF);
+// }
+
+ private final void addInt(int i) {
+// System.err.println("addInt " + i + " " + i);
+ bytes[byteIndex++] = (byte) (i & 0xFF);
+ bytes[byteIndex++] = (byte) ((i >>> 8) & 0xFF);
+ bytes[byteIndex++] = (byte) ((i >>> 16) & 0xFF);
+ bytes[byteIndex++] = (byte) ((i >>> 24) & 0xFF);
+ // buffer.asIntBuffer().put(i);
+ // buffer.position(buffer.position()+4);
+ }
+
+// private void addLong6(long l) {
+//// System.err.println("addLong " + l);
+// bytes[byteIndex++] = (byte) (l & 0xFF);
+// bytes[byteIndex++] = (byte) ((l >>> 8) & 0xFF);
+// bytes[byteIndex++] = (byte) ((l >>> 16) & 0xFF);
+// bytes[byteIndex++] = (byte) ((l >>> 24) & 0xFF);
+// bytes[byteIndex++] = (byte) ((l >>> 32) & 0xFF);
+// bytes[byteIndex++] = (byte) ((l >>> 40) & 0xFF);
+// // buffer.asLongBuffer().put(l);
+// // buffer.position(buffer.position() + 6);
+// }
+
+ private void addLong7(long l) {
+ bytes[byteIndex++] = (byte) (l & 0xFF);
+ bytes[byteIndex++] = (byte) ((l >>> 8) & 0xFF);
+ bytes[byteIndex++] = (byte) ((l >>> 16) & 0xFF);
+ bytes[byteIndex++] = (byte) ((l >>> 24) & 0xFF);
+ bytes[byteIndex++] = (byte) ((l >>> 32) & 0xFF);
+ bytes[byteIndex++] = (byte) ((l >>> 40) & 0xFF);
+ bytes[byteIndex++] = (byte) ((l >>> 48) & 0xFF);
+ // buffer.asLongBuffer().put(l);
+ // buffer.position(buffer.position() + 7);
+ }
+
+// private void addLong(long l) {
+// bytes[byteIndex++] = (byte) (l & 0xFF);
+// bytes[byteIndex++] = (byte) ((l >>> 8) & 0xFF);
+// bytes[byteIndex++] = (byte) ((l >>> 16) & 0xFF);
+// bytes[byteIndex++] = (byte) ((l >>> 24) & 0xFF);
+// bytes[byteIndex++] = (byte) ((l >>> 32) & 0xFF);
+// bytes[byteIndex++] = (byte) ((l >>> 40) & 0xFF);
+// bytes[byteIndex++] = (byte) ((l >>> 48) & 0xFF);
+// bytes[byteIndex++] = (byte) ((l >>> 56) & 0xFF);
+// }
+ private final byte bufferPop() {
+ return bytes[--byteIndex];
+ }
+
+ final class DebugStm {
+ StmEnum e;
+ int r;
+ int p;
+ int o;
+ ClusterUID pc;
+ ClusterUID oc;
+
+ DebugStm(StmEnum e, int r, int p, ClusterUID pc, int o, ClusterUID oc) {
+ this.e = e;
+ this.r = r;
+ this.p = p;
+ this.o = o;
+ this.pc = pc;
+ this.oc = oc;
+ }
+
+ @Override
+ public String toString() {
+ short ri = ClusterTraitsBase.getResourceIndexFromResourceKeyNoThrow(r);
+ short pi = ClusterTraitsBase.getResourceIndexFromResourceKeyNoThrow(p);
+ short oi = ClusterTraitsBase.getResourceIndexFromResourceKeyNoThrow(o);
+ return "" + e + " rk=" + r + " ri=" + ri + " rc=" + clusterUID
+ + " pk=" + p + " pi=" + pi + " pc=" + pc
+ + " ok=" + o + " oi=" + oi + " oc=" + oc;
+ }
+
+ public String toString2() {
+ return "" + e + " r=" + r + " rc=" + clusterUID + " p=" + p
+ + " pc=" + pc + " o=" + o + " oc=" + oc;
+ }
+
+ public String toString3() {
+ short ri = ClusterTraitsBase.getResourceIndexFromResourceKeyNoThrow(r);
+ short pi = ClusterTraitsBase.getResourceIndexFromResourceKeyNoThrow(p);
+ short oi = ClusterTraitsBase.getResourceIndexFromResourceKeyNoThrow(o);
+ return "" + e + " ri=" + ri
+ + " pi=" + pi + " pc=" + pc
+ + " oi=" + oi + " oc=" + oc;
+ }
+ }
+
+ private List<DebugStm> debugStms = new ArrayList<DebugStm>();
+
+ @SuppressWarnings("unused")
+ private final void addStm(Change c, StmEnum stmEnum) {
+
+ if (DEBUG_STAT)
+ ++info.nStms;
+ if (DEBUG || DEBUG_CCS) {
+ DebugStm d = new DebugStm(stmEnum, c.key0, c.key1, c.clusterUID1, c.key2, c.clusterUID2);
+ if (DEBUG_CCS)
+ debugStms.add(d);
+ if (DEBUG) {
+ printlnd(d.toString3() + " offset=" + byteIndex);
+ }
+ }
+ // int opPos = buffer.position();
+ int opPos = byteIndex++;
+ // buffer.put((byte)0); // operation code
+ // addByte((byte)0);
+
+ boolean done = true;
+
+ ClusterEnum a = addIndexAndCluster(c.key1, c.clusterUID1, c.lookIndex1, c.lookup1);
+ byte ab = 0;
+
+ // ForeignShort = byte
+ // Local = short
+ // ForeignLong = 8 byte
+ if (a != ClusterEnum.ForeignShort) {
+ ab = bufferPop();
+ done = false;
+ }
+
+ ClusterEnum b = addIndexAndCluster(c.key2, c.clusterUID2, c.lookIndex2, c.lookup2);
+ byte bb = 0;
+ if (b != ClusterEnum.ForeignShort) {
+ bb = bufferPop();
+ done = false;
+ }
+
+ int ri = ClusterTraitsBase.getResourceIndexFromResourceKeyNoThrow(c.key0);
+ if (ClusterTraitsSmall.isIllegalResourceIndex(ri))
+ throw new RuntimeDatabaseException("Assertion error. Illegal resource index=" + ri);
+ bytes[byteIndex++] = (byte)ri; // index low byte
+ if(!done) {
+ Data data = ClusterEnum.getData(stmEnum, a, b);
+ int left = 6 - data.bits;
+ int op = ri >>> (8 + left);
+ ri >>>= 8;
+ ri &= (1 << left) - 1;
+ if (a != ClusterEnum.ForeignShort) {
+ ri |= ab << left;
+ left += 6;
+ }
+ if (b != ClusterEnum.ForeignShort) {
+ ri |= bb << left;
+ left += 6;
+ }
+ switch (data.bytes) {
+ default:
+ throw new RuntimeDatabaseException("Assertion error. Illegal number of bytes=" + data.bytes);
+ case 2:
+ bytes[byteIndex++] = (byte)(ri & 0xFF);
+ bytes[byteIndex++] = (byte)((ri >>> 8) & 0xFF);
+ break;
+ case 1:
+ bytes[byteIndex++] = (byte)(ri & 0xFF);
+ break;
+ case 0:
+ break;
+ }
+ op |= data.mask;
+ this.bytes[opPos] = (byte)op;
+ } else {
+ if (stmEnum == StmEnum.Add)
+ bytes[opPos] = (byte)((ri >>> 8) + 64);
+ else
+ bytes[opPos] = (byte)((ri >>> 8) + 128);
+ }
+ if (DEBUG_STAT) {
+ if (a == ClusterEnum.Local && b == ClusterEnum.Local) {
+ ++info.nLocal;
+ } else if (a == ClusterEnum.Local || b == ClusterEnum.Local) {
+ ++info.nPartly;
+ } else {
+ ++info.nForeign;
+ }
+ }
+ if (foreignTable.size() > 252)
+ flush();
+// throw new UnsupportedOperationException();
+ //flushInternal(graphSession, clusterUID);
+ }
+
+ private final int modiValue(int ri, long value_offset, byte[] bytes, int offset, int size) {
+ if (DEBUG)
+ printlnd("Modify value ri=" + ri + " vo=" + value_offset + " size=" + size + " total=" + bytes.length);
+ if (ClusterTraitsBase.isIllegalResourceIndex(ri))
+ throw new RuntimeDatabaseException("Assertion error. Illegal resource index=" + ri);
+ if (value_offset > (1L << 58 - 1))
+ throw new RuntimeDatabaseException("Illegal value offset="
+ + value_offset);
+ if (size < 0 || size > MAX_FIXED_BYTES - 1)
+ throw new RuntimeDatabaseException("Illegal value size=" + size);
+ if (offset + size > bytes.length)
+ throw new RuntimeDatabaseException("Illegal value size=" + size);
+ checkBufferSpace(12 + size);
+ addByte(OpEnum.Modify.getOrMask());
+ ri |= (value_offset >>> 56) << 14; // add top two bits
+ addShort((short) ri);
+ value_offset &= (1L << 56) - 1;
+ addLong7(value_offset);
+ addShort((short) size);
+ if (DEBUG)
+ System.out.println("Modify value fixed part end offset=" + byteIndex);
+ int copied = Math.min(size, this.bytes.length - byteIndex);
+ System.arraycopy(bytes, offset, this.bytes, byteIndex, copied);
+ byteIndex += size;
+ return copied;
+ }
+
+// private final void modiValueBig(int ri, long voffset, int left, byte[] bytes, int offset) {
+// checkBufferSpace(0);
+// int current = Math.min(this.bytes.length - byteIndex - 12, left);
+// if(current >= 0) {
+// int written = modiValue(ri, voffset, bytes, offset, current);
+// voffset += written;
+// offset += written;
+// left -= written;
+// }
+//// flushInternal(graphSession, clusterUID);
+// while (left > 0) {
+// int length = Math.min(left, (1 << 16) - 1);
+// if (DEBUG)
+// printlnd("Modify big value ri=" + ri + " vo=" + voffset + " len=" + length);
+// int psize = length + 12;
+//// setHeaderVectorSize(psize);
+// byte[] message = new byte[psize/*+HEADER_SIZE*/];
+//// System.arraycopy(header, 0, message, 0, HEADER_SIZE);
+// int to = 0;
+// Bytes.write(message, to++, OpEnum.Modify.getOrMask());
+// short index = (short)(ri | (voffset >>> 56)<<14); // add top two bits
+// Bytes.writeLE(message, to, index); to += 2;
+// Bytes.writeLE7(message, to, voffset & ((1L << 56) - 1)); to += 7;
+// Bytes.writeLE(message, to, (short)length); to += 2;
+// System.arraycopy(bytes, offset, message, to, length);
+//// graphSession.updateCluster(new UpdateClusterFunction(message));
+// voffset += length;
+// offset += length;
+// left -= length;
+// }
+// }
+
+ private final int setValueBig(int ri, byte[] bytes, int length_) {
+ checkBufferSpace(12);
+ int sum = 0;
+ int voffset = 0;
+ int offset = 0;
+ int left = length_;
+ while (left > 0) {
+ int length = Math.min(left, MAX_FIXED_BYTES - 12 - byteIndex);
+ if (DEBUG)
+ printlnd("Set big value ri=" + ri + " vo=" + voffset + " len=" + length);
+ int written = modiValue(ri, voffset, bytes, offset, length);
+ sum += written;
+ voffset += written;
+ offset += written;
+ left -= written;
+ checkBufferSpace(12);
+ }
+ return sum;
+ }
+
+ private final int setValueSmall(int ri, byte[] bytes, int length) {
+ checkBufferSpace(5 + length);
+ int pos = byteIndex;
+ int i = length << 14 | ri;
+ if (length < 32) {
+ byte op = (byte) (OpEnum.SetShort.getOrMask() | length >>> 2);
+ addByte(op);
+ short s = (short) i;
+ addShort(s);
+ } else {
+ addByte(OpEnum.Set.getOrMask());
+ addInt(i);
+ }
+ System.arraycopy(bytes, 0, this.bytes, byteIndex, length);
+ byteIndex += length;
+ int len = byteIndex - pos;
+ return len;
+ }
+
+ final void setValue(short index, byte[] bytes) {
+ setValue(index, bytes, bytes.length);
+ }
+
+ final public void setValue(short index, byte[] bytes, int length) {
+ checkInitialization();
+ if (ClusterTraitsBase.isIllegalResourceIndex(index))
+ throw new RuntimeDatabaseException("Assertion error. Illegal resource index=" + index);
+ if (DEBUG)
+ printlnd("Set value ri=" + index
+ + " len=" + length
+ + " bytes=" + Arrays.toString(Arrays.copyOfRange(bytes, 0, Math.min(10, length))));
+ int len;
+ /*
+ * The limit for the cluster stream is (1<18)-1 but this avoids the
+ * conversion to big cluster.
+ */
+ if (length > ClusterTraitsSmall.VALUE_SIZE_MAX)
+ len = setValueBig(index, bytes, length);
+ else
+ len = setValueSmall(index, bytes, length);
+ if (DEBUG_STAT) {
+ ++info.nValues;
+ info.sValues += len + length;
+ }
+ }
+
+// final void setValue(Change c, byte[] bytes, int length) {
+// short ri = ClusterTraitsBase.getResourceIndexFromResourceKeyNoThrow(c.key0);
+// setValue(ri, bytes, length);
+// c.initValue();
+// }
+
+// final void modiValue(Change c, long voffset, int length, byte[] bytes, int offset) {
+// checkInitialization();
+// int ri = ClusterTraitsBase.getResourceIndexFromResourceKeyNoThrow(c.key0);
+// if (DEBUG)
+// printlnd("Modify value ri=" + ri
+// + " voff=" + voffset
+// + " vlen=" + length
+// + " blen=" + bytes.length
+// + " boff=" + offset
+// + " bytes=" + Arrays.toString(Arrays.copyOfRange(bytes, 0, Math.min(10, bytes.length))));
+// modiValueBig(ri, voffset, length, bytes, offset);
+// c.init();
+// if (DEBUG_STAT) {
+// ++info.nValues;
+// info.sValues += length;
+// }
+// }
+ final void setImmutable(boolean immutable) {
+ checkInitialization();
+// clusterChange2.setImmutable(immutable);
+ }
+ final void undoValueEx(int resourceIndex) {
+ checkInitialization();
+// clusterChange2.undoValueEx(resourceIndex);
+ }
+ final void setDeleted(boolean deleted) {
+ checkInitialization();
+// clusterChange2.setDeleted(deleted);
+ }
+ final void corrupt() {
+ checkInitialization();
+ addByte((byte)0);
+ }
+
+ public byte[] getBytes() {
+ byte[] copy = new byte[byteIndex];
+ System.arraycopy(bytes, 0, copy, 0, byteIndex);
+ return copy;
+ }
+
+ /**
+ * @param graphSession
+ * @param clusterId
+ * @return true if actually flushed something
+ */
+ final boolean flush(/*GraphSession graphSession,*/ ClusterUID clusterUID) {
+ throw new UnsupportedOperationException();
+// if (byteIndex > 0) {
+// if(DebugPolicy.REPORT_CLUSTER_STREAM)
+// System.err.println("Flush cluster change set stream " + this);
+// setHeaderVectorSize(byteIndex);
+// byte[] copy = new byte[byteIndex + HEADER_SIZE];
+// System.arraycopy(header, 0, copy, 0, HEADER_SIZE);
+// System.arraycopy(bytes, 0, copy, HEADER_SIZE, byteIndex);
+// UpdateClusterFunction updateClusterFunction = new UpdateClusterFunction(copy);
+// if (DEBUG_CCS) {
+// for (DebugStm stm : debugStms)
+// printlnd(stm.toString2());
+// debugStms.clear();
+// }
+// if (DEBUG_STAT) {
+// info.tot = updateClusterFunction.operation.length;
+// printlnd("ReallyFlush: " + info.toString());
+// sum.add(info);
+// printlnd("ReallyFlush sum: " + sum.toString());
+// }
+// // long start = System.nanoTime();
+// graphSession.updateCluster(updateClusterFunction);
+// // long duration = System.nanoTime() - start;
+// // duration2 += duration;
+// // System.err.println("updateCluster " + 1e-9*duration);
+// // System.err.println("updateCluster total " + 1e-9*duration2);
+// clear();
+// clusterChange2.flush(graphSession);
+// return true;
+// } else if (clusterChange2.isDirty()) {
+// clusterChange2.flush(graphSession);
+// clear();
+// return true;
+// } else if (flushed) {
+// flushed = false;
+// return true;
+// } else {
+// return true;
+// }
+ }
+
+ final void flushInternal(ClusterUID clusterUID) {
+ throw new UnsupportedOperationException();
+// flush(graphSession, clusterUID);
+// flushed = true;
+ }
+
+ final class ForeignTable {
+ private final TLongIntHashMap table = new TLongIntHashMap();
+
+ private long createKey(short index, long cluster) {
+ assert (cluster <= (1L << 48) - 1);
+ return (cluster << 14) | index;
+ }
+
+ public int get(short index, long cluster) {
+ int value = table.get(createKey(index, cluster));
+ if (DEBUG)
+ printlnd("ForeignTable get c=" + clusterUID + " i="
+ + (value - 1) + " r=" + index + " rc=" + cluster);
+ return value;
+ }
+
+ public int put(short index, long cluster, int value) {
+ if (DEBUG)
+ printlnd("ForeignTable put c=" + clusterUID + " i="
+ + (value - 1) + " r=" + index + " rc=" + cluster);
+ return table.put(createKey(index, cluster), value);
+ }
+
+ public int size() {
+ return table.size();
+ }
+
+ public void clear() {
+ table.clear();
+ }
+ }
+
+ @Override
+ public int hashCode() {
+ return 31*clusterUID.hashCode();
+ }
+
+ @Override
+ public boolean equals(Object object) {
+ if (this == object)
+ return true;
+ else if (object == null)
+ return false;
+ else if (!(object instanceof ClusterChange))
+ return false;
+ ClusterChange r = (ClusterChange)object;
+ return r.clusterUID.equals(clusterUID);
+ }
+
+ public void flush() {
+
+ if(byteIndex > 0) {
+
+ final ClusterUID cuid = clusterUID;
+
+ byte[] block = getBytes();
+ byte[] raw = new byte[block.length + 28];
+ Bytes.writeLE(raw, 0, 1);
+ System.arraycopy(cuid.asBytes(), 0, raw, 4, 16);
+ Bytes.writeLE(raw, 20, block.length);
+ System.arraycopy(block, 0, raw, 24, block.length);
+ Bytes.writeLE(raw, 24+block.length, 0);
+
+ ByteBuffer rawBB = ByteBuffer.wrap(raw);
+ ByteBuffer outputBB = ByteBuffer.allocate(raw.length + raw.length/8);
+ //outputBB.order(ByteOrder.LITTLE_ENDIAN);
+ int compressedSize = Compressions.get(Compressions.LZ4).compressBuffer(rawBB, 0, raw.length, outputBB, 0);
+
+ byte[] data_ = null;
+ if(compressedSize < raw.length) {
+ data_ = new byte[compressedSize];
+ outputBB.get(data_,0,compressedSize);
+ } else {
+ data_ = raw;
+ }
+
+ byte[] data = new byte[data_.length+24];
+ Bytes.writeLE(data, 0, 0);
+ Bytes.writeLE(data, 4, 0);
+ Bytes.writeLE(data, 8, raw.length);
+ Bytes.writeLE(data, 12, raw.length);
+ Bytes.writeLE(data, 16, data_.length);
+ System.arraycopy(data_, 0, data, 20, data_.length);
+ Bytes.writeLE(data, 20+data_.length, 0);
+
+ stream.add(Pair.make(clusterUID, data));
+ clear();
+ initBuffer();
+
+ }
+ }
+
+}