-package fi.vtt.simantics.procore.internal;\r
-\r
-import java.util.ArrayList;\r
-import java.util.Arrays;\r
-import java.util.List;\r
-\r
-import org.simantics.db.exception.RuntimeDatabaseException;\r
-import org.simantics.db.impl.ClusterTraitsBase;\r
-import org.simantics.db.procore.cluster.ClusterImpl;\r
-import org.simantics.db.procore.cluster.ClusterTraits;\r
-import org.simantics.db.procore.cluster.ClusterTraitsSmall;\r
-import org.simantics.db.service.Bytes;\r
-import org.simantics.db.service.ClusterUID;\r
-\r
-import fi.vtt.simantics.procore.DebugPolicy;\r
-import fi.vtt.simantics.procore.internal.ClusterStream.ClusterEnum;\r
-import fi.vtt.simantics.procore.internal.ClusterStream.Data;\r
-import fi.vtt.simantics.procore.internal.ClusterStream.DebugInfo;\r
-import fi.vtt.simantics.procore.internal.ClusterStream.OpEnum;\r
-import fi.vtt.simantics.procore.internal.ClusterStream.StmEnum;\r
-import gnu.trove.map.hash.TIntByteHashMap;\r
-import gnu.trove.map.hash.TLongIntHashMap;\r
-\r
-public final class ClusterChange {\r
- public static final int VERSION = 1;\r
- public static final byte ADD_OPERATION = 2;\r
- public static final byte REMOVE_OPERATION = 3;\r
- public static final byte DELETE_OPERATION = 5;\r
-\r
- public static final boolean DEBUG = false;\r
- public static final boolean DEBUG_STAT = false;\r
- public static final boolean DEBUG_CCS = false;\r
-\r
- private static DebugInfo sum = new DebugInfo();\r
-\r
- public final TIntByteHashMap foreignTable = new TIntByteHashMap();\r
- private final DebugInfo info;\r
- private final GraphSession graphSession;\r
- public final ClusterUID clusterUID;\r
- private final int SIZE_OFFSET;\r
- private final int HEADER_SIZE;\r
- // How much buffer is used before stream is flushed to server. The bigger the better.\r
- public static final int MAX_FIXED_BYTES = (1<<15) + (1<<14);\r
- private static final int MAX_FIXED_OPERATION_SIZE = 17 + 16;\r
- private static final int MAX_FIXED_OPERATION_SIZE_AND_ROOM_FOR_ERROR = MAX_FIXED_OPERATION_SIZE + 36;\r
- private int nextSize = MAX_FIXED_BYTES;\r
- int byteIndex = 0;\r
- private byte[] bytes = null; // Operation data.\r
- private final byte[] header;\r
- private boolean flushed = false;\r
- private final ClusterStream clusterStream;\r
- private final ClusterChange2 clusterChange2;\r
-\r
- public ClusterImpl clusterImpl;\r
-\r
- public ClusterChange(ClusterStream clusterStream, ClusterImpl clusterImpl) {\r
- this.clusterImpl = clusterImpl;\r
- clusterUID = clusterImpl.getClusterUID();\r
- long[] longs = new long[ClusterUID.getLongLength()];\r
- clusterUID.toLong(longs, 0);\r
- this.graphSession = clusterStream.graphSession;\r
- info = new DebugInfo();\r
- HEADER_SIZE = 8 + longs.length * 8;\r
- header = new byte[HEADER_SIZE];\r
- SIZE_OFFSET = 0;\r
- Bytes.writeLE(header, SIZE_OFFSET + 0, 0); // Correct byte vector size is set with setHeaderVectorSize() later.\r
- Bytes.writeLE(header, SIZE_OFFSET + 4, VERSION);\r
- for (int i=0, offset=8; i<longs.length; ++i, offset+=8)\r
- Bytes.writeLE(header, offset, longs[i]);\r
- //initBuffer();\r
- this.clusterStream = clusterStream;\r
- this.clusterChange2 = new ClusterChange2(clusterUID, clusterImpl);\r
- clusterStream.changes.add(this);\r
- }\r
-\r
- private void setHeaderVectorSize(int size) {\r
- if (size < 0)\r
- throw new RuntimeDatabaseException("Change set size can't be negative.");\r
- int len = size + HEADER_SIZE - SIZE_OFFSET - 4;\r
- Bytes.writeLE(header, SIZE_OFFSET, len);\r
- }\r
- @Override\r
- public String toString() {\r
- return super.toString() + " cluster=" + clusterImpl.getClusterUID() + " id=" + clusterImpl.getClusterId() + " off=" + byteIndex;\r
- }\r
- private final void initBuffer() {\r
- flushed = false;\r
- if (null == bytes || bytes.length < nextSize) {\r
- bytes = new byte[nextSize];\r
- nextSize = MAX_FIXED_BYTES;\r
- }\r
- byteIndex = 0;\r
- }\r
- private final void clear() {\r
- if(clusterImpl != null && clusterImpl.change != null)\r
- clusterImpl.change.init();\r
- foreignTable.clear();\r
- //initBuffer();\r
- bytes = null;\r
- byteIndex = 0;\r
- if (DEBUG_STAT)\r
- info.clear();\r
- }\r
- private final void checkInitialization() {\r
- if (0 == byteIndex)\r
- clusterStream.changes.addChange(this);\r
- }\r
- private final void printlnd(String s) {\r
- System.out.println("DEBUG: ClusterChange " + clusterUID + ": " + s);\r
- }\r
- public final void createResource(short index) {\r
- checkInitialization();\r
- if (DEBUG)\r
- printlnd("New ri=" + index + " offset=" + byteIndex);\r
- if (index > ClusterTraits.getMaxNumberOfResources())\r
- throw new RuntimeDatabaseException("Illegal resource index=" + index + ".");\r
- checkBufferSpace(null);\r
- bytes[byteIndex++] = (byte)52;\r
- bytes[byteIndex++] = (byte)index;\r
- bytes[byteIndex++] = (byte)(index>>>8);\r
- }\r
- void flushCollect(Change c) {\r
- flushInternal(graphSession, clusterUID);\r
- if (DEBUG)\r
- printlnd("Cluster change data was flushed.");\r
- if (null != c) {\r
- if (DEBUG)\r
- printlnd("Clearing lookup for " + c.toString());\r
- c.lookup1 = null;\r
- c.lookup2 = null;\r
- }\r
- if (null != clusterImpl) {\r
- clusterImpl.foreignLookup = null;\r
- }\r
- }\r
-\r
- private final boolean checkBufferSpace(Change c) {\r
- clusterStream.changes.checkFlush();\r
- if(bytes == null) initBuffer();\r
- if (MAX_FIXED_BYTES - byteIndex > MAX_FIXED_OPERATION_SIZE_AND_ROOM_FOR_ERROR) {\r
- return false;\r
- }\r
- flushCollect(c);\r
- initBuffer();\r
- return true;\r
- }\r
-\r
- private final void checkBufferSpace(int size) {\r
- if(bytes == null) initBuffer();\r
- if (bytes.length - byteIndex >= size)\r
- return;\r
- nextSize = Math.max(MAX_FIXED_BYTES, size);\r
- flushInternal(graphSession, clusterUID);\r
- initBuffer();\r
- }\r
-\r
- final void addChange(Change c) {\r
- checkInitialization();\r
- checkBufferSpace(c);\r
- byte operation = c.op0;\r
- if(operation == ADD_OPERATION)\r
- addStm(c, StmEnum.Add);\r
- else if (operation == REMOVE_OPERATION)\r
- addStm(c, StmEnum.Remove);\r
- else if (operation == DELETE_OPERATION) {\r
- if (DEBUG)\r
- printlnd("Delete value offset=" + byteIndex + " " + c);\r
- addByte(OpEnum.Delete.getOrMask());\r
- addShort(ClusterTraitsBase.getResourceIndexFromResourceKeyNoThrow(c.key0));\r
- }\r
- c.lastArg = 0;\r
- }\r
-\r
- private final void addForeignLong(short index, ClusterUID clusterUID) {\r
- byteIndex = clusterUID.toByte(bytes, byteIndex);\r
- bytes[byteIndex++] = (byte)(index & 0xFF);\r
- bytes[byteIndex++] = (byte)(index >>> 8);\r
- }\r
-\r
- private final ClusterEnum addIndexAndCluster(int key, ClusterUID clusterUID, byte lookIndex, byte[] lookup) {\r
- assert(!clusterUID.equals(ClusterUID.Null));\r
- short resourceIndex = ClusterTraitsBase.getResourceIndexFromResourceKeyNoThrow(key);\r
- if (clusterUID.equals(this.clusterUID)) {\r
- bytes[byteIndex++] = (byte)(resourceIndex & 0xFF);\r
- bytes[byteIndex++] = (byte)(resourceIndex >>> 8);\r
- return ClusterEnum.Local;\r
- }\r
-\r
- byte foreign = 0;\r
- if(lookIndex > 0) {\r
- if(lookup != null)\r
- foreign = lookup[lookIndex];\r
- } else {\r
- foreign = foreignTable.get(key);\r
- }\r
- if (0 != foreign) {\r
- if (foreign > 256)\r
- throw new RuntimeDatabaseException("Internal error, contact application support." +\r
- "Too big foreing index=" + foreign + " max=256");\r
- --foreign;\r
- bytes[byteIndex++] = foreign;\r
- return ClusterEnum.ForeignShort;\r
- } else {\r
- byte position = (byte) (foreignTable.size() + 1);\r
- if(lookup != null)\r
- lookup[lookIndex] = position;\r
- foreignTable.put(key, position);\r
- if (DEBUG_STAT)\r
- info.sForeign = foreignTable.size();\r
- if (clusterUID.equals(ClusterUID.Null))\r
- throw new RuntimeDatabaseException("Internal error, contact application support." +\r
- "Cluster unique id not defined for foreing cluster.");\r
- addForeignLong(resourceIndex, clusterUID);\r
- return ClusterEnum.ForeignLong;\r
- }\r
- }\r
-\r
- private final void addByte(byte b) {\r
- bytes[byteIndex++] = b;\r
- }\r
-\r
- private final void addShort(short s) {\r
- bytes[byteIndex++] = (byte)(s & 0xFF);\r
- bytes[byteIndex++] = (byte)(s >>> 8);\r
- }\r
-\r
-// private final void addShort(int s) {\r
-// bytes[byteIndex++] = (byte) (s & 0xFF);\r
-// bytes[byteIndex++] = (byte) ((s >>> 8) & 0xFF);\r
-// }\r
-\r
- private final void addInt(int i) {\r
-// System.err.println("addInt " + i + " " + i);\r
- bytes[byteIndex++] = (byte) (i & 0xFF);\r
- bytes[byteIndex++] = (byte) ((i >>> 8) & 0xFF);\r
- bytes[byteIndex++] = (byte) ((i >>> 16) & 0xFF);\r
- bytes[byteIndex++] = (byte) ((i >>> 24) & 0xFF);\r
- // buffer.asIntBuffer().put(i);\r
- // buffer.position(buffer.position()+4);\r
- }\r
-\r
-// private void addLong6(long l) {\r
-//// System.err.println("addLong " + l);\r
-// bytes[byteIndex++] = (byte) (l & 0xFF);\r
-// bytes[byteIndex++] = (byte) ((l >>> 8) & 0xFF);\r
-// bytes[byteIndex++] = (byte) ((l >>> 16) & 0xFF);\r
-// bytes[byteIndex++] = (byte) ((l >>> 24) & 0xFF);\r
-// bytes[byteIndex++] = (byte) ((l >>> 32) & 0xFF);\r
-// bytes[byteIndex++] = (byte) ((l >>> 40) & 0xFF);\r
-// // buffer.asLongBuffer().put(l);\r
-// // buffer.position(buffer.position() + 6);\r
-// }\r
-\r
- private void addLong7(long l) {\r
- bytes[byteIndex++] = (byte) (l & 0xFF);\r
- bytes[byteIndex++] = (byte) ((l >>> 8) & 0xFF);\r
- bytes[byteIndex++] = (byte) ((l >>> 16) & 0xFF);\r
- bytes[byteIndex++] = (byte) ((l >>> 24) & 0xFF);\r
- bytes[byteIndex++] = (byte) ((l >>> 32) & 0xFF);\r
- bytes[byteIndex++] = (byte) ((l >>> 40) & 0xFF);\r
- bytes[byteIndex++] = (byte) ((l >>> 48) & 0xFF);\r
- // buffer.asLongBuffer().put(l);\r
- // buffer.position(buffer.position() + 7);\r
- }\r
-\r
-// private void addLong(long l) {\r
-// bytes[byteIndex++] = (byte) (l & 0xFF);\r
-// bytes[byteIndex++] = (byte) ((l >>> 8) & 0xFF);\r
-// bytes[byteIndex++] = (byte) ((l >>> 16) & 0xFF);\r
-// bytes[byteIndex++] = (byte) ((l >>> 24) & 0xFF);\r
-// bytes[byteIndex++] = (byte) ((l >>> 32) & 0xFF);\r
-// bytes[byteIndex++] = (byte) ((l >>> 40) & 0xFF);\r
-// bytes[byteIndex++] = (byte) ((l >>> 48) & 0xFF);\r
-// bytes[byteIndex++] = (byte) ((l >>> 56) & 0xFF);\r
-// }\r
- private final byte bufferPop() {\r
- return bytes[--byteIndex];\r
- }\r
-\r
- final class DebugStm {\r
- StmEnum e;\r
- int r;\r
- int p;\r
- int o;\r
- ClusterUID pc;\r
- ClusterUID oc;\r
-\r
- DebugStm(StmEnum e, int r, int p, ClusterUID pc, int o, ClusterUID oc) {\r
- this.e = e;\r
- this.r = r;\r
- this.p = p;\r
- this.o = o;\r
- this.pc = pc;\r
- this.oc = oc;\r
- }\r
-\r
- @Override\r
- public String toString() {\r
- short ri = ClusterTraitsBase.getResourceIndexFromResourceKeyNoThrow(r);\r
- short pi = ClusterTraitsBase.getResourceIndexFromResourceKeyNoThrow(p);\r
- short oi = ClusterTraitsBase.getResourceIndexFromResourceKeyNoThrow(o);\r
- return "" + e + " rk=" + r + " ri=" + ri + " rc=" + clusterUID\r
- + " pk=" + p + " pi=" + pi + " pc=" + pc\r
- + " ok=" + o + " oi=" + oi + " oc=" + oc;\r
- }\r
-\r
- public String toString2() {\r
- return "" + e + " r=" + r + " rc=" + clusterUID + " p=" + p\r
- + " pc=" + pc + " o=" + o + " oc=" + oc;\r
- }\r
-\r
- public String toString3() {\r
- short ri = ClusterTraitsBase.getResourceIndexFromResourceKeyNoThrow(r);\r
- short pi = ClusterTraitsBase.getResourceIndexFromResourceKeyNoThrow(p);\r
- short oi = ClusterTraitsBase.getResourceIndexFromResourceKeyNoThrow(o);\r
- return "" + e + " ri=" + ri\r
- + " pi=" + pi + " pc=" + pc\r
- + " oi=" + oi + " oc=" + oc;\r
- }\r
- }\r
-\r
- private List<DebugStm> debugStms = new ArrayList<DebugStm>();\r
-\r
- @SuppressWarnings("unused")\r
- private final void addStm(Change c, StmEnum stmEnum) {\r
-\r
- if (DEBUG_STAT)\r
- ++info.nStms;\r
- if (DEBUG || DEBUG_CCS) {\r
- DebugStm d = new DebugStm(stmEnum, c.key0, c.key1, c.clusterUID1, c.key2, c.clusterUID2);\r
- if (DEBUG_CCS)\r
- debugStms.add(d);\r
- if (DEBUG) {\r
- printlnd(d.toString3() + " offset=" + byteIndex);\r
- }\r
- }\r
- // int opPos = buffer.position();\r
- int opPos = byteIndex++;\r
- // buffer.put((byte)0); // operation code\r
- // addByte((byte)0);\r
-\r
- boolean done = true;\r
-\r
- ClusterEnum a = addIndexAndCluster(c.key1, c.clusterUID1, c.lookIndex1, c.lookup1);\r
- byte ab = 0;\r
-\r
- // ForeignShort = byte\r
- // Local = short\r
- // ForeignLong = 8 byte\r
- if (a != ClusterEnum.ForeignShort) {\r
- ab = bufferPop();\r
- done = false;\r
- }\r
-\r
- ClusterEnum b = addIndexAndCluster(c.key2, c.clusterUID2, c.lookIndex2, c.lookup2);\r
- byte bb = 0;\r
- if (b != ClusterEnum.ForeignShort) {\r
- bb = bufferPop();\r
- done = false;\r
- }\r
-\r
- int ri = ClusterTraitsBase.getResourceIndexFromResourceKeyNoThrow(c.key0);\r
- if (ClusterTraitsSmall.isIllegalResourceIndex(ri))\r
- throw new RuntimeDatabaseException("Assertion error. Illegal resource index=" + ri);\r
- bytes[byteIndex++] = (byte)ri; // index low byte\r
- if(!done) {\r
- Data data = ClusterEnum.getData(stmEnum, a, b);\r
- int left = 6 - data.bits;\r
- int op = ri >>> (8 + left);\r
- ri >>>= 8;\r
- ri &= (1 << left) - 1;\r
- if (a != ClusterEnum.ForeignShort) {\r
- ri |= ab << left;\r
- left += 6;\r
- }\r
- if (b != ClusterEnum.ForeignShort) {\r
- ri |= bb << left;\r
- left += 6;\r
- }\r
- switch (data.bytes) {\r
- default:\r
- throw new RuntimeDatabaseException("Assertion error. Illegal number of bytes=" + data.bytes);\r
- case 2:\r
- bytes[byteIndex++] = (byte)(ri & 0xFF);\r
- bytes[byteIndex++] = (byte)((ri >>> 8) & 0xFF);\r
- break;\r
- case 1:\r
- bytes[byteIndex++] = (byte)(ri & 0xFF);\r
- break;\r
- case 0:\r
- break;\r
- }\r
- op |= data.mask;\r
- this.bytes[opPos] = (byte)op;\r
- } else {\r
- if (stmEnum == StmEnum.Add)\r
- bytes[opPos] = (byte)((ri >>> 8) + 64);\r
- else\r
- bytes[opPos] = (byte)((ri >>> 8) + 128);\r
- }\r
- if (DEBUG_STAT) {\r
- if (a == ClusterEnum.Local && b == ClusterEnum.Local) {\r
- ++info.nLocal;\r
- } else if (a == ClusterEnum.Local || b == ClusterEnum.Local) {\r
- ++info.nPartly;\r
- } else {\r
- ++info.nForeign;\r
- }\r
- }\r
- if (foreignTable.size() > 252)\r
- flushInternal(graphSession, clusterUID);\r
- }\r
-\r
- private final int modiValue(int ri, long value_offset, byte[] bytes, int offset, int size) {\r
- if (DEBUG)\r
- printlnd("Modify value ri=" + ri + " vo=" + value_offset + " size=" + size + " total=" + bytes.length);\r
- if (ClusterTraitsBase.isIllegalResourceIndex(ri))\r
- throw new RuntimeDatabaseException("Assertion error. Illegal resource index=" + ri);\r
- if (value_offset > (1L << 58 - 1))\r
- throw new RuntimeDatabaseException("Illegal value offset="\r
- + value_offset);\r
- if (size < 0 || size > MAX_FIXED_BYTES - 1)\r
- throw new RuntimeDatabaseException("Illegal value size=" + size);\r
- if (offset + size > bytes.length)\r
- throw new RuntimeDatabaseException("Illegal value size=" + size);\r
- checkBufferSpace(12 + size);\r
- addByte(OpEnum.Modify.getOrMask());\r
- ri |= (value_offset >>> 56) << 14; // add top two bits\r
- addShort((short) ri);\r
- value_offset &= (1L << 56) - 1;\r
- addLong7(value_offset);\r
- addShort((short) size);\r
- if (DEBUG)\r
- System.out.println("Modify value fixed part end offset=" + byteIndex);\r
- int copied = Math.min(size, this.bytes.length - byteIndex);\r
- System.arraycopy(bytes, offset, this.bytes, byteIndex, copied);\r
- byteIndex += size;\r
- return copied;\r
- }\r
-\r
- private final void modiValueBig(int ri, long voffset, int left, byte[] bytes, int offset) {\r
- checkBufferSpace(0);\r
- int current = Math.min(this.bytes.length - byteIndex - 12, left);\r
- if(current >= 0) {\r
- int written = modiValue(ri, voffset, bytes, offset, current);\r
- voffset += written;\r
- offset += written;\r
- left -= written;\r
- }\r
- flushInternal(graphSession, clusterUID);\r
- while (left > 0) {\r
- int length = Math.min(left, (1 << 16) - 1);\r
- if (DEBUG)\r
- printlnd("Modify big value ri=" + ri + " vo=" + voffset + " len=" + length);\r
- int psize = length + 12;\r
- setHeaderVectorSize(psize);\r
- byte[] message = new byte[psize+HEADER_SIZE];\r
- System.arraycopy(header, 0, message, 0, HEADER_SIZE);\r
- int to = HEADER_SIZE;\r
- Bytes.write(message, to++, OpEnum.Modify.getOrMask());\r
- short index = (short)(ri | (voffset >>> 56)<<14); // add top two bits\r
- Bytes.writeLE(message, to, index); to += 2;\r
- Bytes.writeLE7(message, to, voffset & ((1L << 56) - 1)); to += 7;\r
- Bytes.writeLE(message, to, (short)length); to += 2;\r
- System.arraycopy(bytes, offset, message, to, length);\r
- graphSession.updateCluster(new UpdateClusterFunction(message));\r
- voffset += length;\r
- offset += length;\r
- left -= length;\r
- }\r
- }\r
-\r
- private final int setValueBig(int ri, byte[] bytes, int length_) {\r
- checkBufferSpace(12);\r
- int sum = 0;\r
- int voffset = 0;\r
- int offset = 0;\r
- int left = length_;\r
- while (left > 0) {\r
- int length = Math.min(left, MAX_FIXED_BYTES - 12 - byteIndex);\r
- if (DEBUG)\r
- printlnd("Set big value ri=" + ri + " vo=" + voffset + " len=" + length);\r
- int written = modiValue(ri, voffset, bytes, offset, length);\r
- sum += written;\r
- voffset += written;\r
- offset += written;\r
- left -= written;\r
- checkBufferSpace(12);\r
- }\r
- return sum;\r
- }\r
-\r
- private final int setValueSmall(int ri, byte[] bytes, int length) {\r
- checkBufferSpace(5 + length);\r
- int pos = byteIndex;\r
- int i = length << 14 | ri;\r
- if (length < 32) {\r
- byte op = (byte) (OpEnum.SetShort.getOrMask() | length >>> 2);\r
- addByte(op);\r
- short s = (short) i;\r
- addShort(s);\r
- } else {\r
- addByte(OpEnum.Set.getOrMask());\r
- addInt(i);\r
- }\r
- System.arraycopy(bytes, 0, this.bytes, byteIndex, length);\r
- byteIndex += length;\r
- int len = byteIndex - pos;\r
- return len;\r
- }\r
-\r
- final void setValue(short index, byte[] bytes) {\r
- setValue(index, bytes, bytes.length);\r
- }\r
-\r
- final void setValue(short index, byte[] bytes, int length) {\r
- checkInitialization();\r
- if (ClusterTraitsBase.isIllegalResourceIndex(index))\r
- throw new RuntimeDatabaseException("Assertion error. Illegal resource index=" + index);\r
- if (DEBUG)\r
- printlnd("Set value ri=" + index\r
- + " len=" + length\r
- + " bytes=" + Arrays.toString(Arrays.copyOfRange(bytes, 0, Math.min(10, length))));\r
- int len;\r
- /*\r
- * The limit for the cluster stream is (1<18)-1 but this avoids the\r
- * conversion to big cluster.\r
- */\r
- if (length > ClusterTraitsSmall.VALUE_SIZE_MAX)\r
- len = setValueBig(index, bytes, length);\r
- else\r
- len = setValueSmall(index, bytes, length);\r
- if (DEBUG_STAT) {\r
- ++info.nValues;\r
- info.sValues += len + length;\r
- }\r
- }\r
-\r
- final void setValue(Change c, byte[] bytes, int length) {\r
- short ri = ClusterTraitsBase.getResourceIndexFromResourceKeyNoThrow(c.key0);\r
- setValue(ri, bytes, length);\r
- c.initValue();\r
- }\r
-\r
- final void modiValue(Change c, long voffset, int length, byte[] bytes, int offset) {\r
- checkInitialization();\r
- int ri = ClusterTraitsBase.getResourceIndexFromResourceKeyNoThrow(c.key0);\r
- if (DEBUG)\r
- printlnd("Modify value ri=" + ri\r
- + " voff=" + voffset\r
- + " vlen=" + length\r
- + " blen=" + bytes.length\r
- + " boff=" + offset\r
- + " bytes=" + Arrays.toString(Arrays.copyOfRange(bytes, 0, Math.min(10, bytes.length))));\r
- modiValueBig(ri, voffset, length, bytes, offset);\r
- c.init();\r
- if (DEBUG_STAT) {\r
- ++info.nValues;\r
- info.sValues += length;\r
- }\r
- }\r
- final void setImmutable(boolean immutable) {\r
- checkInitialization();\r
- clusterChange2.setImmutable(immutable);\r
- }\r
- final void undoValueEx(int resourceIndex) {\r
- checkInitialization();\r
- clusterChange2.undoValueEx(resourceIndex);\r
- }\r
- final void setDeleted(boolean deleted) {\r
- checkInitialization();\r
- clusterChange2.setDeleted(deleted);\r
- }\r
- final void corrupt() {\r
- checkInitialization();\r
- addByte((byte)0);\r
- }\r
- /**\r
- * @param graphSession\r
- * @param clusterId\r
- * @return true if actually flushed something\r
- */\r
- final boolean flush(GraphSession graphSession, ClusterUID clusterUID) {\r
- if (byteIndex > 0) {\r
- if(DebugPolicy.REPORT_CLUSTER_STREAM)\r
- System.err.println("Flush cluster change set stream " + this);\r
- setHeaderVectorSize(byteIndex);\r
- byte[] copy = new byte[byteIndex + HEADER_SIZE];\r
- System.arraycopy(header, 0, copy, 0, HEADER_SIZE);\r
- System.arraycopy(bytes, 0, copy, HEADER_SIZE, byteIndex);\r
- UpdateClusterFunction updateClusterFunction = new UpdateClusterFunction(copy);\r
- if (DEBUG_CCS) {\r
- for (DebugStm stm : debugStms)\r
- printlnd(stm.toString2());\r
- debugStms.clear();\r
- }\r
- if (DEBUG_STAT) {\r
- info.tot = updateClusterFunction.operation.length;\r
- printlnd("ReallyFlush: " + info.toString());\r
- sum.add(info);\r
- printlnd("ReallyFlush sum: " + sum.toString());\r
- }\r
- // long start = System.nanoTime();\r
- graphSession.updateCluster(updateClusterFunction);\r
- // long duration = System.nanoTime() - start;\r
- // duration2 += duration;\r
- // System.err.println("updateCluster " + 1e-9*duration);\r
- // System.err.println("updateCluster total " + 1e-9*duration2);\r
- clear();\r
- clusterChange2.flush(graphSession);\r
- return true;\r
- } else if (clusterChange2.isDirty()) {\r
- clusterChange2.flush(graphSession);\r
- clear();\r
- return true;\r
- } else if (flushed) {\r
- flushed = false;\r
- return true;\r
- } else {\r
- return true;\r
- }\r
- }\r
-\r
- final void flushInternal(GraphSession graphSession, ClusterUID clusterUID) {\r
- flush(graphSession, clusterUID);\r
- flushed = true;\r
- }\r
-\r
- final class ForeignTable {\r
- private final TLongIntHashMap table = new TLongIntHashMap();\r
-\r
- private long createKey(short index, long cluster) {\r
- assert (cluster <= (1L << 48) - 1);\r
- return (cluster << 14) | index;\r
- }\r
-\r
- public int get(short index, long cluster) {\r
- int value = table.get(createKey(index, cluster));\r
- if (DEBUG)\r
- printlnd("ForeignTable get c=" + clusterUID + " i="\r
- + (value - 1) + " r=" + index + " rc=" + cluster);\r
- return value;\r
- }\r
-\r
- public int put(short index, long cluster, int value) {\r
- if (DEBUG)\r
- printlnd("ForeignTable put c=" + clusterUID + " i="\r
- + (value - 1) + " r=" + index + " rc=" + cluster);\r
- return table.put(createKey(index, cluster), value);\r
- }\r
-\r
- public int size() {\r
- return table.size();\r
- }\r
-\r
- public void clear() {\r
- table.clear();\r
- }\r
- }\r
-\r
- @Override\r
- public int hashCode() {\r
- return 31*clusterImpl.getClusterKey();\r
- }\r
-\r
- @Override\r
- public boolean equals(Object object) {\r
- if (this == object)\r
- return true;\r
- else if (object == null)\r
- return false;\r
- else if (!(object instanceof ClusterChange))\r
- return false;\r
- ClusterChange r = (ClusterChange)object;\r
- return r.clusterImpl.getClusterKey() == clusterImpl.getClusterKey();\r
- }\r
-\r
-\r
-}\r
+package fi.vtt.simantics.procore.internal;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.simantics.db.exception.RuntimeDatabaseException;
+import org.simantics.db.impl.ClusterTraitsBase;
+import org.simantics.db.procore.cluster.ClusterImpl;
+import org.simantics.db.procore.cluster.ClusterTraits;
+import org.simantics.db.procore.cluster.ClusterTraitsSmall;
+import org.simantics.db.service.Bytes;
+import org.simantics.db.service.ClusterUID;
+
+import fi.vtt.simantics.procore.DebugPolicy;
+import fi.vtt.simantics.procore.internal.ClusterStream.ClusterEnum;
+import fi.vtt.simantics.procore.internal.ClusterStream.Data;
+import fi.vtt.simantics.procore.internal.ClusterStream.DebugInfo;
+import fi.vtt.simantics.procore.internal.ClusterStream.OpEnum;
+import fi.vtt.simantics.procore.internal.ClusterStream.StmEnum;
+import gnu.trove.map.hash.TIntByteHashMap;
+import gnu.trove.map.hash.TLongIntHashMap;
+
+public final class ClusterChange {
+ public static final int VERSION = 1;
+ public static final byte ADD_OPERATION = 2;
+ public static final byte REMOVE_OPERATION = 3;
+ public static final byte DELETE_OPERATION = 5;
+
+ public static final boolean DEBUG = false;
+ public static final boolean DEBUG_STAT = false;
+ public static final boolean DEBUG_CCS = false;
+
+ private static DebugInfo sum = new DebugInfo();
+
+ public final TIntByteHashMap foreignTable = new TIntByteHashMap();
+ private final DebugInfo info;
+ private final GraphSession graphSession;
+ public final ClusterUID clusterUID;
+ private final int SIZE_OFFSET;
+ private final int HEADER_SIZE;
+ // How much buffer is used before stream is flushed to server. The bigger the better.
+ public static final int MAX_FIXED_BYTES = (1<<15) + (1<<14);
+ private static final int MAX_FIXED_OPERATION_SIZE = 17 + 16;
+ private static final int MAX_FIXED_OPERATION_SIZE_AND_ROOM_FOR_ERROR = MAX_FIXED_OPERATION_SIZE + 36;
+ private int nextSize = MAX_FIXED_BYTES;
+ int byteIndex = 0;
+ private byte[] bytes = null; // Operation data.
+ private final byte[] header;
+ private boolean flushed = false;
+ private final ClusterStream clusterStream;
+ private final ClusterChange2 clusterChange2;
+
+ public ClusterImpl clusterImpl;
+
+ public ClusterChange(ClusterStream clusterStream, ClusterImpl clusterImpl) {
+
+ this.clusterImpl = clusterImpl;
+
+ clusterUID = clusterImpl.getClusterUID();
+ long[] longs = new long[ClusterUID.getLongLength()];
+ clusterUID.toLong(longs, 0);
+ this.graphSession = clusterStream.graphSession;
+ info = new DebugInfo();
+ HEADER_SIZE = 8 + longs.length * 8;
+ header = new byte[HEADER_SIZE];
+ SIZE_OFFSET = 0;
+ Bytes.writeLE(header, SIZE_OFFSET + 0, 0); // Correct byte vector size is set with setHeaderVectorSize() later.
+ Bytes.writeLE(header, SIZE_OFFSET + 4, VERSION);
+ for (int i=0, offset=8; i<longs.length; ++i, offset+=8)
+ Bytes.writeLE(header, offset, longs[i]);
+ //initBuffer();
+ this.clusterStream = clusterStream;
+ this.clusterChange2 = new ClusterChange2(clusterUID);
+ clusterStream.changes.add(this);
+ }
+
+ public void adopt(ClusterImpl impl) {
+ this.clusterImpl = impl;
+ }
+
+ private void setHeaderVectorSize(int size) {
+ if (size < 0)
+ throw new RuntimeDatabaseException("Change set size can't be negative.");
+ int len = size + HEADER_SIZE - SIZE_OFFSET - 4;
+ Bytes.writeLE(header, SIZE_OFFSET, len);
+ }
+ @Override
+ public String toString() {
+ return super.toString() + " cluster=" + clusterUID + " id=" + clusterImpl.getClusterId() + " off=" + byteIndex;
+ }
+ private final void initBuffer() {
+ flushed = false;
+ if (null == bytes || bytes.length < nextSize) {
+ bytes = new byte[nextSize];
+ nextSize = MAX_FIXED_BYTES;
+ }
+ byteIndex = 0;
+ }
+ private final void clear() {
+ if(clusterImpl != null && clusterImpl.change != null)
+ clusterImpl.change.init();
+ foreignTable.clear();
+ //initBuffer();
+ bytes = null;
+ byteIndex = 0;
+ if (DEBUG_STAT)
+ info.clear();
+ }
+ private final void checkInitialization() {
+ if (0 == byteIndex)
+ clusterStream.changes.addChange(this);
+ }
+ private final void printlnd(String s) {
+ System.out.println("DEBUG: ClusterChange " + clusterUID + ": " + s);
+ }
+ public final void createResource(short index) {
+ checkInitialization();
+ if (DEBUG)
+ printlnd("New ri=" + index + " offset=" + byteIndex);
+ if (index > ClusterTraits.getMaxNumberOfResources())
+ throw new RuntimeDatabaseException("Illegal resource index=" + index + ".");
+ checkBufferSpace(null);
+ bytes[byteIndex++] = (byte)52;
+ bytes[byteIndex++] = (byte)index;
+ bytes[byteIndex++] = (byte)(index>>>8);
+ }
+ void flushCollect(Change c) {
+ flushInternal(graphSession, clusterUID);
+ if (DEBUG)
+ printlnd("Cluster change data was flushed.");
+ if (null != c) {
+ if (DEBUG)
+ printlnd("Clearing lookup for " + c.toString());
+ c.lookup1 = null;
+ c.lookup2 = null;
+ }
+ if (null != clusterImpl) {
+ clusterImpl.foreignLookup = null;
+ }
+ }
+
+ private final boolean checkBufferSpace(Change c) {
+ clusterStream.changes.checkFlush();
+ if(bytes == null) initBuffer();
+ if (MAX_FIXED_BYTES - byteIndex > MAX_FIXED_OPERATION_SIZE_AND_ROOM_FOR_ERROR) {
+ return false;
+ }
+ flushCollect(c);
+ initBuffer();
+ return true;
+ }
+
+ private final void checkBufferSpace(int size) {
+ if(bytes == null) initBuffer();
+ if (bytes.length - byteIndex >= size)
+ return;
+ nextSize = Math.max(MAX_FIXED_BYTES, size);
+ flushInternal(graphSession, clusterUID);
+ initBuffer();
+ }
+
+ final void addChange(Change c) {
+ checkInitialization();
+ checkBufferSpace(c);
+ byte operation = c.op0;
+ if(operation == ADD_OPERATION)
+ addStm(c, StmEnum.Add);
+ else if (operation == REMOVE_OPERATION)
+ addStm(c, StmEnum.Remove);
+ else if (operation == DELETE_OPERATION) {
+ if (DEBUG)
+ printlnd("Delete value offset=" + byteIndex + " " + c);
+ addByte(OpEnum.Delete.getOrMask());
+ addShort(ClusterTraitsBase.getResourceIndexFromResourceKeyNoThrow(c.key0));
+ }
+ c.lastArg = 0;
+ }
+
+ private final void addForeignLong(short index, ClusterUID clusterUID) {
+ byteIndex = clusterUID.toByte(bytes, byteIndex);
+ bytes[byteIndex++] = (byte)(index & 0xFF);
+ bytes[byteIndex++] = (byte)(index >>> 8);
+ }
+
+ private final ClusterEnum addIndexAndCluster(int key, ClusterUID clusterUID, byte lookIndex, byte[] lookup) {
+ assert(!clusterUID.equals(ClusterUID.Null));
+ short resourceIndex = ClusterTraitsBase.getResourceIndexFromResourceKeyNoThrow(key);
+ if (clusterUID.equals(this.clusterUID)) {
+ bytes[byteIndex++] = (byte)(resourceIndex & 0xFF);
+ bytes[byteIndex++] = (byte)(resourceIndex >>> 8);
+ return ClusterEnum.Local;
+ }
+
+ byte foreign = 0;
+ if(lookIndex > 0) {
+ if(lookup != null)
+ foreign = lookup[lookIndex];
+ } else {
+ foreign = foreignTable.get(key);
+ }
+ if (0 != foreign) {
+ if (foreign > 256)
+ throw new RuntimeDatabaseException("Internal error." +
+ "Too big foreign index=" + foreign + " max=256");
+ --foreign;
+ bytes[byteIndex++] = foreign;
+ return ClusterEnum.ForeignShort;
+ } else {
+ byte position = (byte) (foreignTable.size() + 1);
+ if(lookup != null)
+ lookup[lookIndex] = position;
+ foreignTable.put(key, position);
+ if (DEBUG_STAT)
+ info.sForeign = foreignTable.size();
+ if (clusterUID.equals(ClusterUID.Null))
+ throw new RuntimeDatabaseException("Internal error." +
+ "Cluster unique id not defined for foreign cluster.");
+ addForeignLong(resourceIndex, clusterUID);
+ return ClusterEnum.ForeignLong;
+ }
+ }
+
+ private final void addByte(byte b) {
+ bytes[byteIndex++] = b;
+ }
+
+ private final void addShort(short s) {
+ bytes[byteIndex++] = (byte)(s & 0xFF);
+ bytes[byteIndex++] = (byte)(s >>> 8);
+ }
+
+// private final void addShort(int s) {
+// bytes[byteIndex++] = (byte) (s & 0xFF);
+// bytes[byteIndex++] = (byte) ((s >>> 8) & 0xFF);
+// }
+
+ private final void addInt(int i) {
+// System.err.println("addInt " + i + " " + i);
+ bytes[byteIndex++] = (byte) (i & 0xFF);
+ bytes[byteIndex++] = (byte) ((i >>> 8) & 0xFF);
+ bytes[byteIndex++] = (byte) ((i >>> 16) & 0xFF);
+ bytes[byteIndex++] = (byte) ((i >>> 24) & 0xFF);
+ // buffer.asIntBuffer().put(i);
+ // buffer.position(buffer.position()+4);
+ }
+
+// private void addLong6(long l) {
+//// System.err.println("addLong " + l);
+// bytes[byteIndex++] = (byte) (l & 0xFF);
+// bytes[byteIndex++] = (byte) ((l >>> 8) & 0xFF);
+// bytes[byteIndex++] = (byte) ((l >>> 16) & 0xFF);
+// bytes[byteIndex++] = (byte) ((l >>> 24) & 0xFF);
+// bytes[byteIndex++] = (byte) ((l >>> 32) & 0xFF);
+// bytes[byteIndex++] = (byte) ((l >>> 40) & 0xFF);
+// // buffer.asLongBuffer().put(l);
+// // buffer.position(buffer.position() + 6);
+// }
+
+ private void addLong7(long l) {
+ bytes[byteIndex++] = (byte) (l & 0xFF);
+ bytes[byteIndex++] = (byte) ((l >>> 8) & 0xFF);
+ bytes[byteIndex++] = (byte) ((l >>> 16) & 0xFF);
+ bytes[byteIndex++] = (byte) ((l >>> 24) & 0xFF);
+ bytes[byteIndex++] = (byte) ((l >>> 32) & 0xFF);
+ bytes[byteIndex++] = (byte) ((l >>> 40) & 0xFF);
+ bytes[byteIndex++] = (byte) ((l >>> 48) & 0xFF);
+ // buffer.asLongBuffer().put(l);
+ // buffer.position(buffer.position() + 7);
+ }
+
+// private void addLong(long l) {
+// bytes[byteIndex++] = (byte) (l & 0xFF);
+// bytes[byteIndex++] = (byte) ((l >>> 8) & 0xFF);
+// bytes[byteIndex++] = (byte) ((l >>> 16) & 0xFF);
+// bytes[byteIndex++] = (byte) ((l >>> 24) & 0xFF);
+// bytes[byteIndex++] = (byte) ((l >>> 32) & 0xFF);
+// bytes[byteIndex++] = (byte) ((l >>> 40) & 0xFF);
+// bytes[byteIndex++] = (byte) ((l >>> 48) & 0xFF);
+// bytes[byteIndex++] = (byte) ((l >>> 56) & 0xFF);
+// }
+ private final byte bufferPop() {
+ return bytes[--byteIndex];
+ }
+
+ final class DebugStm {
+ StmEnum e;
+ int r;
+ int p;
+ int o;
+ ClusterUID pc;
+ ClusterUID oc;
+
+ DebugStm(StmEnum e, int r, int p, ClusterUID pc, int o, ClusterUID oc) {
+ this.e = e;
+ this.r = r;
+ this.p = p;
+ this.o = o;
+ this.pc = pc;
+ this.oc = oc;
+ }
+
+ @Override
+ public String toString() {
+ short ri = ClusterTraitsBase.getResourceIndexFromResourceKeyNoThrow(r);
+ short pi = ClusterTraitsBase.getResourceIndexFromResourceKeyNoThrow(p);
+ short oi = ClusterTraitsBase.getResourceIndexFromResourceKeyNoThrow(o);
+ return "" + e + " rk=" + r + " ri=" + ri + " rc=" + clusterUID
+ + " pk=" + p + " pi=" + pi + " pc=" + pc
+ + " ok=" + o + " oi=" + oi + " oc=" + oc;
+ }
+
+ public String toString2() {
+ return "" + e + " r=" + r + " rc=" + clusterUID + " p=" + p
+ + " pc=" + pc + " o=" + o + " oc=" + oc;
+ }
+
+ public String toString3() {
+ short ri = ClusterTraitsBase.getResourceIndexFromResourceKeyNoThrow(r);
+ short pi = ClusterTraitsBase.getResourceIndexFromResourceKeyNoThrow(p);
+ short oi = ClusterTraitsBase.getResourceIndexFromResourceKeyNoThrow(o);
+ return "" + e + " ri=" + ri
+ + " pi=" + pi + " pc=" + pc
+ + " oi=" + oi + " oc=" + oc;
+ }
+ }
+
+ private List<DebugStm> debugStms = new ArrayList<DebugStm>();
+
+ @SuppressWarnings("unused")
+ private final void addStm(Change c, StmEnum stmEnum) {
+
+ if (DEBUG_STAT)
+ ++info.nStms;
+ if (DEBUG || DEBUG_CCS) {
+ DebugStm d = new DebugStm(stmEnum, c.key0, c.key1, c.clusterUID1, c.key2, c.clusterUID2);
+ if (DEBUG_CCS)
+ debugStms.add(d);
+ if (DEBUG) {
+ printlnd(d.toString3() + " offset=" + byteIndex);
+ }
+ }
+ // int opPos = buffer.position();
+ int opPos = byteIndex++;
+ // buffer.put((byte)0); // operation code
+ // addByte((byte)0);
+
+ boolean done = true;
+
+ ClusterEnum a = addIndexAndCluster(c.key1, c.clusterUID1, c.lookIndex1, c.lookup1);
+ byte ab = 0;
+
+ // ForeignShort = byte
+ // Local = short
+ // ForeignLong = 8 byte
+ if (a != ClusterEnum.ForeignShort) {
+ ab = bufferPop();
+ done = false;
+ }
+
+ ClusterEnum b = addIndexAndCluster(c.key2, c.clusterUID2, c.lookIndex2, c.lookup2);
+ byte bb = 0;
+ if (b != ClusterEnum.ForeignShort) {
+ bb = bufferPop();
+ done = false;
+ }
+
+ int ri = ClusterTraitsBase.getResourceIndexFromResourceKeyNoThrow(c.key0);
+ if (ClusterTraitsSmall.isIllegalResourceIndex(ri))
+ throw new RuntimeDatabaseException("Assertion error. Illegal resource index=" + ri);
+ bytes[byteIndex++] = (byte)ri; // index low byte
+ if(!done) {
+ Data data = ClusterEnum.getData(stmEnum, a, b);
+ int left = 6 - data.bits;
+ int op = ri >>> (8 + left);
+ ri >>>= 8;
+ ri &= (1 << left) - 1;
+ if (a != ClusterEnum.ForeignShort) {
+ ri |= ab << left;
+ left += 6;
+ }
+ if (b != ClusterEnum.ForeignShort) {
+ ri |= bb << left;
+ left += 6;
+ }
+ switch (data.bytes) {
+ default:
+ throw new RuntimeDatabaseException("Assertion error. Illegal number of bytes=" + data.bytes);
+ case 2:
+ bytes[byteIndex++] = (byte)(ri & 0xFF);
+ bytes[byteIndex++] = (byte)((ri >>> 8) & 0xFF);
+ break;
+ case 1:
+ bytes[byteIndex++] = (byte)(ri & 0xFF);
+ break;
+ case 0:
+ break;
+ }
+ op |= data.mask;
+ this.bytes[opPos] = (byte)op;
+ } else {
+ if (stmEnum == StmEnum.Add)
+ bytes[opPos] = (byte)((ri >>> 8) + 64);
+ else
+ bytes[opPos] = (byte)((ri >>> 8) + 128);
+ }
+ if (DEBUG_STAT) {
+ if (a == ClusterEnum.Local && b == ClusterEnum.Local) {
+ ++info.nLocal;
+ } else if (a == ClusterEnum.Local || b == ClusterEnum.Local) {
+ ++info.nPartly;
+ } else {
+ ++info.nForeign;
+ }
+ }
+ if (foreignTable.size() > 252)
+ flushInternal(graphSession, clusterUID);
+ }
+
+ private final int modiValue(int ri, long value_offset, byte[] bytes, int offset, int size) {
+ if (DEBUG)
+ printlnd("Modify value ri=" + ri + " vo=" + value_offset + " size=" + size + " total=" + bytes.length);
+ if (ClusterTraitsBase.isIllegalResourceIndex(ri))
+ throw new RuntimeDatabaseException("Assertion error. Illegal resource index=" + ri);
+ if (value_offset > (1L << 58 - 1))
+ throw new RuntimeDatabaseException("Illegal value offset="
+ + value_offset);
+ if (size < 0 || size > MAX_FIXED_BYTES - 1)
+ throw new RuntimeDatabaseException("Illegal value size=" + size);
+ if (offset + size > bytes.length)
+ throw new RuntimeDatabaseException("Illegal value size=" + size);
+ checkBufferSpace(12 + size);
+ addByte(OpEnum.Modify.getOrMask());
+ ri |= (value_offset >>> 56) << 14; // add top two bits
+ addShort((short) ri);
+ value_offset &= (1L << 56) - 1;
+ addLong7(value_offset);
+ addShort((short) size);
+ if (DEBUG)
+ System.out.println("Modify value fixed part end offset=" + byteIndex);
+ int copied = Math.min(size, this.bytes.length - byteIndex);
+ System.arraycopy(bytes, offset, this.bytes, byteIndex, copied);
+ byteIndex += size;
+ return copied;
+ }
+
+ private final void modiValueBig(int ri, long voffset, int left, byte[] bytes, int offset) {
+ checkBufferSpace(0);
+ int current = Math.min(this.bytes.length - byteIndex - 12, left);
+ if(current >= 0) {
+ int written = modiValue(ri, voffset, bytes, offset, current);
+ voffset += written;
+ offset += written;
+ left -= written;
+ }
+ flushInternal(graphSession, clusterUID);
+ while (left > 0) {
+ int length = Math.min(left, (1 << 16) - 1);
+ if (DEBUG)
+ printlnd("Modify big value ri=" + ri + " vo=" + voffset + " len=" + length);
+ int psize = length + 12;
+ setHeaderVectorSize(psize);
+ byte[] message = new byte[psize+HEADER_SIZE];
+ System.arraycopy(header, 0, message, 0, HEADER_SIZE);
+ int to = HEADER_SIZE;
+ Bytes.write(message, to++, OpEnum.Modify.getOrMask());
+ short index = (short)(ri | (voffset >>> 56)<<14); // add top two bits
+ Bytes.writeLE(message, to, index); to += 2;
+ Bytes.writeLE7(message, to, voffset & ((1L << 56) - 1)); to += 7;
+ Bytes.writeLE(message, to, (short)length); to += 2;
+ System.arraycopy(bytes, offset, message, to, length);
+ graphSession.updateCluster(new UpdateClusterFunction(message));
+ voffset += length;
+ offset += length;
+ left -= length;
+ }
+ }
+
+ private final int setValueBig(int ri, byte[] bytes, int length_) {
+ checkBufferSpace(12);
+ int sum = 0;
+ int voffset = 0;
+ int offset = 0;
+ int left = length_;
+ while (left > 0) {
+ int length = Math.min(left, MAX_FIXED_BYTES - 12 - byteIndex);
+ if (DEBUG)
+ printlnd("Set big value ri=" + ri + " vo=" + voffset + " len=" + length);
+ int written = modiValue(ri, voffset, bytes, offset, length);
+ sum += written;
+ voffset += written;
+ offset += written;
+ left -= written;
+ checkBufferSpace(12);
+ }
+ // Possibly truncate file
+ modiValue(ri, length_, new byte[0], 0, 0);
+ return sum;
+ }
+
+ private final int setValueSmall(int ri, byte[] bytes, int length) {
+ checkBufferSpace(5 + length);
+ int pos = byteIndex;
+ int i = length << 14 | ri;
+ if (length < 32) {
+ byte op = (byte) (OpEnum.SetShort.getOrMask() | length >>> 2);
+ addByte(op);
+ short s = (short) i;
+ addShort(s);
+ } else {
+ addByte(OpEnum.Set.getOrMask());
+ addInt(i);
+ }
+ System.arraycopy(bytes, 0, this.bytes, byteIndex, length);
+ byteIndex += length;
+ int len = byteIndex - pos;
+ return len;
+ }
+
+ final void setValue(short index, byte[] bytes) {
+ setValue(index, bytes, bytes.length);
+ }
+
+ final void setValue(short index, byte[] bytes, int length) {
+ checkInitialization();
+ if (ClusterTraitsBase.isIllegalResourceIndex(index))
+ throw new RuntimeDatabaseException("Assertion error. Illegal resource index=" + index);
+ if (DEBUG)
+ printlnd("Set value ri=" + index
+ + " len=" + length
+ + " bytes=" + Arrays.toString(Arrays.copyOfRange(bytes, 0, Math.min(10, length))));
+ int len;
+ /*
+ * The limit for the cluster stream is (1<18)-1 but this avoids the
+ * conversion to big cluster.
+ */
+ if (length > ClusterTraitsSmall.VALUE_SIZE_MAX)
+ len = setValueBig(index, bytes, length);
+ else
+ len = setValueSmall(index, bytes, length);
+ if (DEBUG_STAT) {
+ ++info.nValues;
+ info.sValues += len + length;
+ }
+ }
+
+ final void setValue(Change c, byte[] bytes, int length) {
+ short ri = ClusterTraitsBase.getResourceIndexFromResourceKeyNoThrow(c.key0);
+ setValue(ri, bytes, length);
+ c.initValue();
+ }
+
+ final void modiValue(Change c, long voffset, int length, byte[] bytes, int offset) {
+ checkInitialization();
+ int ri = ClusterTraitsBase.getResourceIndexFromResourceKeyNoThrow(c.key0);
+ if (DEBUG)
+ printlnd("Modify value ri=" + ri
+ + " voff=" + voffset
+ + " vlen=" + length
+ + " blen=" + bytes.length
+ + " boff=" + offset
+ + " bytes=" + Arrays.toString(Arrays.copyOfRange(bytes, 0, Math.min(10, bytes.length))));
+ modiValueBig(ri, voffset, length, bytes, offset);
+ c.init();
+ if (DEBUG_STAT) {
+ ++info.nValues;
+ info.sValues += length;
+ }
+ }
+ final void setImmutable(boolean immutable) {
+ checkInitialization();
+ clusterChange2.setImmutable(immutable);
+ }
+ final void undoValueEx(int resourceIndex) {
+ checkInitialization();
+ clusterChange2.undoValueEx(resourceIndex);
+ }
+ final void setDeleted(boolean deleted) {
+ checkInitialization();
+ clusterChange2.setDeleted(deleted);
+ }
+ final void corrupt() {
+ checkInitialization();
+ addByte((byte)0);
+ }
+ /**
+ * @param graphSession
+ * @param clusterId
+ * @return true if actually flushed something
+ */
+ final boolean flush(GraphSession graphSession, ClusterUID clusterUID) {
+ if (byteIndex > 0) {
+ if(DebugPolicy.REPORT_CLUSTER_STREAM)
+ System.err.println("Flush cluster change set stream " + this);
+ setHeaderVectorSize(byteIndex);
+ byte[] copy = new byte[byteIndex + HEADER_SIZE];
+ System.arraycopy(header, 0, copy, 0, HEADER_SIZE);
+ System.arraycopy(bytes, 0, copy, HEADER_SIZE, byteIndex);
+ UpdateClusterFunction updateClusterFunction = new UpdateClusterFunction(copy);
+ if (DEBUG_CCS) {
+ for (DebugStm stm : debugStms)
+ printlnd(stm.toString2());
+ debugStms.clear();
+ }
+ if (DEBUG_STAT) {
+ info.tot = updateClusterFunction.operation.length;
+ printlnd("ReallyFlush: " + info.toString());
+ sum.add(info);
+ printlnd("ReallyFlush sum: " + sum.toString());
+ }
+ // long start = System.nanoTime();
+ graphSession.updateCluster(updateClusterFunction);
+ // long duration = System.nanoTime() - start;
+ // duration2 += duration;
+ // System.err.println("updateCluster " + 1e-9*duration);
+ // System.err.println("updateCluster total " + 1e-9*duration2);
+ clear();
+ clusterChange2.flush(graphSession);
+ return true;
+ } else if (clusterChange2.isDirty()) {
+ clusterChange2.flush(graphSession);
+ clear();
+ return true;
+ } else if (flushed) {
+ flushed = false;
+ return true;
+ } else {
+ return true;
+ }
+ }
+
+ final void flushInternal(GraphSession graphSession, ClusterUID clusterUID) {
+ flush(graphSession, clusterUID);
+ flushed = true;
+ }
+
+ final class ForeignTable {
+ private final TLongIntHashMap table = new TLongIntHashMap();
+
+ private long createKey(short index, long cluster) {
+ assert (cluster <= (1L << 48) - 1);
+ return (cluster << 14) | index;
+ }
+
+ public int get(short index, long cluster) {
+ int value = table.get(createKey(index, cluster));
+ if (DEBUG)
+ printlnd("ForeignTable get c=" + clusterUID + " i="
+ + (value - 1) + " r=" + index + " rc=" + cluster);
+ return value;
+ }
+
+ public int put(short index, long cluster, int value) {
+ if (DEBUG)
+ printlnd("ForeignTable put c=" + clusterUID + " i="
+ + (value - 1) + " r=" + index + " rc=" + cluster);
+ return table.put(createKey(index, cluster), value);
+ }
+
+ public int size() {
+ return table.size();
+ }
+
+ public void clear() {
+ table.clear();
+ }
+ }
+
+ @Override
+ public int hashCode() {
+ return 31*clusterImpl.getClusterKey();
+ }
+
+ @Override
+ public boolean equals(Object object) {
+ if (this == object)
+ return true;
+ else if (object == null)
+ return false;
+ else if (!(object instanceof ClusterChange))
+ return false;
+ ClusterChange r = (ClusterChange)object;
+ return r.clusterImpl.getClusterKey() == clusterImpl.getClusterKey();
+ }
+
+
+}