X-Git-Url: https://gerrit.simantics.org/r/gitweb?p=simantics%2Fplatform.git;a=blobdiff_plain;f=bundles%2Forg.simantics.db.procore%2Fsrc%2Ffi%2Fvtt%2Fsimantics%2Fprocore%2Finternal%2FClusterChange.java;h=0b17631341b275059d1241ff3ac28413575527b4;hp=4914014a14cf644a0cb9a48bdb10135bbcff8168;hb=fdf0dbac10faf2b717cc0f7a7bd9462b371e208a;hpb=969bd23cab98a79ca9101af33334000879fb60c5 diff --git a/bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/ClusterChange.java b/bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/ClusterChange.java index 4914014a1..0b1763134 100644 --- a/bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/ClusterChange.java +++ b/bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/ClusterChange.java @@ -1,679 +1,687 @@ -package fi.vtt.simantics.procore.internal; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; - -import org.simantics.db.exception.RuntimeDatabaseException; -import org.simantics.db.impl.ClusterTraitsBase; -import org.simantics.db.procore.cluster.ClusterImpl; -import org.simantics.db.procore.cluster.ClusterTraits; -import org.simantics.db.procore.cluster.ClusterTraitsSmall; -import org.simantics.db.service.Bytes; -import org.simantics.db.service.ClusterUID; - -import fi.vtt.simantics.procore.DebugPolicy; -import fi.vtt.simantics.procore.internal.ClusterStream.ClusterEnum; -import fi.vtt.simantics.procore.internal.ClusterStream.Data; -import fi.vtt.simantics.procore.internal.ClusterStream.DebugInfo; -import fi.vtt.simantics.procore.internal.ClusterStream.OpEnum; -import fi.vtt.simantics.procore.internal.ClusterStream.StmEnum; -import gnu.trove.map.hash.TIntByteHashMap; -import gnu.trove.map.hash.TLongIntHashMap; - -public final class ClusterChange { - public static final int VERSION = 1; - public static final byte ADD_OPERATION = 2; - public static final byte REMOVE_OPERATION = 3; - public static final byte DELETE_OPERATION = 5; - - public static final boolean DEBUG = false; - public static final boolean DEBUG_STAT = false; - public static final boolean DEBUG_CCS = false; - - private static DebugInfo sum = new DebugInfo(); - - public final TIntByteHashMap foreignTable = new TIntByteHashMap(); - private final DebugInfo info; - private final GraphSession graphSession; - public final ClusterUID clusterUID; - private final int SIZE_OFFSET; - private final int HEADER_SIZE; - // How much buffer is used before stream is flushed to server. The bigger the better. - public static final int MAX_FIXED_BYTES = (1<<15) + (1<<14); - private static final int MAX_FIXED_OPERATION_SIZE = 17 + 16; - private static final int MAX_FIXED_OPERATION_SIZE_AND_ROOM_FOR_ERROR = MAX_FIXED_OPERATION_SIZE + 36; - private int nextSize = MAX_FIXED_BYTES; - int byteIndex = 0; - private byte[] bytes = null; // Operation data. - private final byte[] header; - private boolean flushed = false; - private final ClusterStream clusterStream; - private final ClusterChange2 clusterChange2; - - public ClusterImpl clusterImpl; - - public ClusterChange(ClusterStream clusterStream, ClusterImpl clusterImpl) { - this.clusterImpl = clusterImpl; - clusterUID = clusterImpl.getClusterUID(); - long[] longs = new long[ClusterUID.getLongLength()]; - clusterUID.toLong(longs, 0); - this.graphSession = clusterStream.graphSession; - info = new DebugInfo(); - HEADER_SIZE = 8 + longs.length * 8; - header = new byte[HEADER_SIZE]; - SIZE_OFFSET = 0; - Bytes.writeLE(header, SIZE_OFFSET + 0, 0); // Correct byte vector size is set with setHeaderVectorSize() later. - Bytes.writeLE(header, SIZE_OFFSET + 4, VERSION); - for (int i=0, offset=8; i>>8); - } - void flushCollect(Change c) { - flushInternal(graphSession, clusterUID); - if (DEBUG) - printlnd("Cluster change data was flushed."); - if (null != c) { - if (DEBUG) - printlnd("Clearing lookup for " + c.toString()); - c.lookup1 = null; - c.lookup2 = null; - } - if (null != clusterImpl) { - clusterImpl.foreignLookup = null; - } - } - - private final boolean checkBufferSpace(Change c) { - clusterStream.changes.checkFlush(); - if(bytes == null) initBuffer(); - if (MAX_FIXED_BYTES - byteIndex > MAX_FIXED_OPERATION_SIZE_AND_ROOM_FOR_ERROR) { - return false; - } - flushCollect(c); - initBuffer(); - return true; - } - - private final void checkBufferSpace(int size) { - if(bytes == null) initBuffer(); - if (bytes.length - byteIndex >= size) - return; - nextSize = Math.max(MAX_FIXED_BYTES, size); - flushInternal(graphSession, clusterUID); - initBuffer(); - } - - final void addChange(Change c) { - checkInitialization(); - checkBufferSpace(c); - byte operation = c.op0; - if(operation == ADD_OPERATION) - addStm(c, StmEnum.Add); - else if (operation == REMOVE_OPERATION) - addStm(c, StmEnum.Remove); - else if (operation == DELETE_OPERATION) { - if (DEBUG) - printlnd("Delete value offset=" + byteIndex + " " + c); - addByte(OpEnum.Delete.getOrMask()); - addShort(ClusterTraitsBase.getResourceIndexFromResourceKeyNoThrow(c.key0)); - } - c.lastArg = 0; - } - - private final void addForeignLong(short index, ClusterUID clusterUID) { - byteIndex = clusterUID.toByte(bytes, byteIndex); - bytes[byteIndex++] = (byte)(index & 0xFF); - bytes[byteIndex++] = (byte)(index >>> 8); - } - - private final ClusterEnum addIndexAndCluster(int key, ClusterUID clusterUID, byte lookIndex, byte[] lookup) { - assert(!clusterUID.equals(ClusterUID.Null)); - short resourceIndex = ClusterTraitsBase.getResourceIndexFromResourceKeyNoThrow(key); - if (clusterUID.equals(this.clusterUID)) { - bytes[byteIndex++] = (byte)(resourceIndex & 0xFF); - bytes[byteIndex++] = (byte)(resourceIndex >>> 8); - return ClusterEnum.Local; - } - - byte foreign = 0; - if(lookIndex > 0) { - if(lookup != null) - foreign = lookup[lookIndex]; - } else { - foreign = foreignTable.get(key); - } - if (0 != foreign) { - if (foreign > 256) - throw new RuntimeDatabaseException("Internal error, contact application support." + - "Too big foreing index=" + foreign + " max=256"); - --foreign; - bytes[byteIndex++] = foreign; - return ClusterEnum.ForeignShort; - } else { - byte position = (byte) (foreignTable.size() + 1); - if(lookup != null) - lookup[lookIndex] = position; - foreignTable.put(key, position); - if (DEBUG_STAT) - info.sForeign = foreignTable.size(); - if (clusterUID.equals(ClusterUID.Null)) - throw new RuntimeDatabaseException("Internal error, contact application support." + - "Cluster unique id not defined for foreing cluster."); - addForeignLong(resourceIndex, clusterUID); - return ClusterEnum.ForeignLong; - } - } - - private final void addByte(byte b) { - bytes[byteIndex++] = b; - } - - private final void addShort(short s) { - bytes[byteIndex++] = (byte)(s & 0xFF); - bytes[byteIndex++] = (byte)(s >>> 8); - } - -// private final void addShort(int s) { -// bytes[byteIndex++] = (byte) (s & 0xFF); -// bytes[byteIndex++] = (byte) ((s >>> 8) & 0xFF); -// } - - private final void addInt(int i) { -// System.err.println("addInt " + i + " " + i); - bytes[byteIndex++] = (byte) (i & 0xFF); - bytes[byteIndex++] = (byte) ((i >>> 8) & 0xFF); - bytes[byteIndex++] = (byte) ((i >>> 16) & 0xFF); - bytes[byteIndex++] = (byte) ((i >>> 24) & 0xFF); - // buffer.asIntBuffer().put(i); - // buffer.position(buffer.position()+4); - } - -// private void addLong6(long l) { -//// System.err.println("addLong " + l); -// bytes[byteIndex++] = (byte) (l & 0xFF); -// bytes[byteIndex++] = (byte) ((l >>> 8) & 0xFF); -// bytes[byteIndex++] = (byte) ((l >>> 16) & 0xFF); -// bytes[byteIndex++] = (byte) ((l >>> 24) & 0xFF); -// bytes[byteIndex++] = (byte) ((l >>> 32) & 0xFF); -// bytes[byteIndex++] = (byte) ((l >>> 40) & 0xFF); -// // buffer.asLongBuffer().put(l); -// // buffer.position(buffer.position() + 6); -// } - - private void addLong7(long l) { - bytes[byteIndex++] = (byte) (l & 0xFF); - bytes[byteIndex++] = (byte) ((l >>> 8) & 0xFF); - bytes[byteIndex++] = (byte) ((l >>> 16) & 0xFF); - bytes[byteIndex++] = (byte) ((l >>> 24) & 0xFF); - bytes[byteIndex++] = (byte) ((l >>> 32) & 0xFF); - bytes[byteIndex++] = (byte) ((l >>> 40) & 0xFF); - bytes[byteIndex++] = (byte) ((l >>> 48) & 0xFF); - // buffer.asLongBuffer().put(l); - // buffer.position(buffer.position() + 7); - } - -// private void addLong(long l) { -// bytes[byteIndex++] = (byte) (l & 0xFF); -// bytes[byteIndex++] = (byte) ((l >>> 8) & 0xFF); -// bytes[byteIndex++] = (byte) ((l >>> 16) & 0xFF); -// bytes[byteIndex++] = (byte) ((l >>> 24) & 0xFF); -// bytes[byteIndex++] = (byte) ((l >>> 32) & 0xFF); -// bytes[byteIndex++] = (byte) ((l >>> 40) & 0xFF); -// bytes[byteIndex++] = (byte) ((l >>> 48) & 0xFF); -// bytes[byteIndex++] = (byte) ((l >>> 56) & 0xFF); -// } - private final byte bufferPop() { - return bytes[--byteIndex]; - } - - final class DebugStm { - StmEnum e; - int r; - int p; - int o; - ClusterUID pc; - ClusterUID oc; - - DebugStm(StmEnum e, int r, int p, ClusterUID pc, int o, ClusterUID oc) { - this.e = e; - this.r = r; - this.p = p; - this.o = o; - this.pc = pc; - this.oc = oc; - } - - @Override - public String toString() { - short ri = ClusterTraitsBase.getResourceIndexFromResourceKeyNoThrow(r); - short pi = ClusterTraitsBase.getResourceIndexFromResourceKeyNoThrow(p); - short oi = ClusterTraitsBase.getResourceIndexFromResourceKeyNoThrow(o); - return "" + e + " rk=" + r + " ri=" + ri + " rc=" + clusterUID - + " pk=" + p + " pi=" + pi + " pc=" + pc - + " ok=" + o + " oi=" + oi + " oc=" + oc; - } - - public String toString2() { - return "" + e + " r=" + r + " rc=" + clusterUID + " p=" + p - + " pc=" + pc + " o=" + o + " oc=" + oc; - } - - public String toString3() { - short ri = ClusterTraitsBase.getResourceIndexFromResourceKeyNoThrow(r); - short pi = ClusterTraitsBase.getResourceIndexFromResourceKeyNoThrow(p); - short oi = ClusterTraitsBase.getResourceIndexFromResourceKeyNoThrow(o); - return "" + e + " ri=" + ri - + " pi=" + pi + " pc=" + pc - + " oi=" + oi + " oc=" + oc; - } - } - - private List debugStms = new ArrayList(); - - @SuppressWarnings("unused") - private final void addStm(Change c, StmEnum stmEnum) { - - if (DEBUG_STAT) - ++info.nStms; - if (DEBUG || DEBUG_CCS) { - DebugStm d = new DebugStm(stmEnum, c.key0, c.key1, c.clusterUID1, c.key2, c.clusterUID2); - if (DEBUG_CCS) - debugStms.add(d); - if (DEBUG) { - printlnd(d.toString3() + " offset=" + byteIndex); - } - } - // int opPos = buffer.position(); - int opPos = byteIndex++; - // buffer.put((byte)0); // operation code - // addByte((byte)0); - - boolean done = true; - - ClusterEnum a = addIndexAndCluster(c.key1, c.clusterUID1, c.lookIndex1, c.lookup1); - byte ab = 0; - - // ForeignShort = byte - // Local = short - // ForeignLong = 8 byte - if (a != ClusterEnum.ForeignShort) { - ab = bufferPop(); - done = false; - } - - ClusterEnum b = addIndexAndCluster(c.key2, c.clusterUID2, c.lookIndex2, c.lookup2); - byte bb = 0; - if (b != ClusterEnum.ForeignShort) { - bb = bufferPop(); - done = false; - } - - int ri = ClusterTraitsBase.getResourceIndexFromResourceKeyNoThrow(c.key0); - if (ClusterTraitsSmall.isIllegalResourceIndex(ri)) - throw new RuntimeDatabaseException("Assertion error. Illegal resource index=" + ri); - bytes[byteIndex++] = (byte)ri; // index low byte - if(!done) { - Data data = ClusterEnum.getData(stmEnum, a, b); - int left = 6 - data.bits; - int op = ri >>> (8 + left); - ri >>>= 8; - ri &= (1 << left) - 1; - if (a != ClusterEnum.ForeignShort) { - ri |= ab << left; - left += 6; - } - if (b != ClusterEnum.ForeignShort) { - ri |= bb << left; - left += 6; - } - switch (data.bytes) { - default: - throw new RuntimeDatabaseException("Assertion error. Illegal number of bytes=" + data.bytes); - case 2: - bytes[byteIndex++] = (byte)(ri & 0xFF); - bytes[byteIndex++] = (byte)((ri >>> 8) & 0xFF); - break; - case 1: - bytes[byteIndex++] = (byte)(ri & 0xFF); - break; - case 0: - break; - } - op |= data.mask; - this.bytes[opPos] = (byte)op; - } else { - if (stmEnum == StmEnum.Add) - bytes[opPos] = (byte)((ri >>> 8) + 64); - else - bytes[opPos] = (byte)((ri >>> 8) + 128); - } - if (DEBUG_STAT) { - if (a == ClusterEnum.Local && b == ClusterEnum.Local) { - ++info.nLocal; - } else if (a == ClusterEnum.Local || b == ClusterEnum.Local) { - ++info.nPartly; - } else { - ++info.nForeign; - } - } - if (foreignTable.size() > 252) - flushInternal(graphSession, clusterUID); - } - - private final int modiValue(int ri, long value_offset, byte[] bytes, int offset, int size) { - if (DEBUG) - printlnd("Modify value ri=" + ri + " vo=" + value_offset + " size=" + size + " total=" + bytes.length); - if (ClusterTraitsBase.isIllegalResourceIndex(ri)) - throw new RuntimeDatabaseException("Assertion error. Illegal resource index=" + ri); - if (value_offset > (1L << 58 - 1)) - throw new RuntimeDatabaseException("Illegal value offset=" - + value_offset); - if (size < 0 || size > MAX_FIXED_BYTES - 1) - throw new RuntimeDatabaseException("Illegal value size=" + size); - if (offset + size > bytes.length) - throw new RuntimeDatabaseException("Illegal value size=" + size); - checkBufferSpace(12 + size); - addByte(OpEnum.Modify.getOrMask()); - ri |= (value_offset >>> 56) << 14; // add top two bits - addShort((short) ri); - value_offset &= (1L << 56) - 1; - addLong7(value_offset); - addShort((short) size); - if (DEBUG) - System.out.println("Modify value fixed part end offset=" + byteIndex); - int copied = Math.min(size, this.bytes.length - byteIndex); - System.arraycopy(bytes, offset, this.bytes, byteIndex, copied); - byteIndex += size; - return copied; - } - - private final void modiValueBig(int ri, long voffset, int left, byte[] bytes, int offset) { - checkBufferSpace(0); - int current = Math.min(this.bytes.length - byteIndex - 12, left); - if(current >= 0) { - int written = modiValue(ri, voffset, bytes, offset, current); - voffset += written; - offset += written; - left -= written; - } - flushInternal(graphSession, clusterUID); - while (left > 0) { - int length = Math.min(left, (1 << 16) - 1); - if (DEBUG) - printlnd("Modify big value ri=" + ri + " vo=" + voffset + " len=" + length); - int psize = length + 12; - setHeaderVectorSize(psize); - byte[] message = new byte[psize+HEADER_SIZE]; - System.arraycopy(header, 0, message, 0, HEADER_SIZE); - int to = HEADER_SIZE; - Bytes.write(message, to++, OpEnum.Modify.getOrMask()); - short index = (short)(ri | (voffset >>> 56)<<14); // add top two bits - Bytes.writeLE(message, to, index); to += 2; - Bytes.writeLE7(message, to, voffset & ((1L << 56) - 1)); to += 7; - Bytes.writeLE(message, to, (short)length); to += 2; - System.arraycopy(bytes, offset, message, to, length); - graphSession.updateCluster(new UpdateClusterFunction(message)); - voffset += length; - offset += length; - left -= length; - } - } - - private final int setValueBig(int ri, byte[] bytes, int length_) { - checkBufferSpace(12); - int sum = 0; - int voffset = 0; - int offset = 0; - int left = length_; - while (left > 0) { - int length = Math.min(left, MAX_FIXED_BYTES - 12 - byteIndex); - if (DEBUG) - printlnd("Set big value ri=" + ri + " vo=" + voffset + " len=" + length); - int written = modiValue(ri, voffset, bytes, offset, length); - sum += written; - voffset += written; - offset += written; - left -= written; - checkBufferSpace(12); - } - return sum; - } - - private final int setValueSmall(int ri, byte[] bytes, int length) { - checkBufferSpace(5 + length); - int pos = byteIndex; - int i = length << 14 | ri; - if (length < 32) { - byte op = (byte) (OpEnum.SetShort.getOrMask() | length >>> 2); - addByte(op); - short s = (short) i; - addShort(s); - } else { - addByte(OpEnum.Set.getOrMask()); - addInt(i); - } - System.arraycopy(bytes, 0, this.bytes, byteIndex, length); - byteIndex += length; - int len = byteIndex - pos; - return len; - } - - final void setValue(short index, byte[] bytes) { - setValue(index, bytes, bytes.length); - } - - final void setValue(short index, byte[] bytes, int length) { - checkInitialization(); - if (ClusterTraitsBase.isIllegalResourceIndex(index)) - throw new RuntimeDatabaseException("Assertion error. Illegal resource index=" + index); - if (DEBUG) - printlnd("Set value ri=" + index - + " len=" + length - + " bytes=" + Arrays.toString(Arrays.copyOfRange(bytes, 0, Math.min(10, length)))); - int len; - /* - * The limit for the cluster stream is (1<18)-1 but this avoids the - * conversion to big cluster. - */ - if (length > ClusterTraitsSmall.VALUE_SIZE_MAX) - len = setValueBig(index, bytes, length); - else - len = setValueSmall(index, bytes, length); - if (DEBUG_STAT) { - ++info.nValues; - info.sValues += len + length; - } - } - - final void setValue(Change c, byte[] bytes, int length) { - short ri = ClusterTraitsBase.getResourceIndexFromResourceKeyNoThrow(c.key0); - setValue(ri, bytes, length); - c.initValue(); - } - - final void modiValue(Change c, long voffset, int length, byte[] bytes, int offset) { - checkInitialization(); - int ri = ClusterTraitsBase.getResourceIndexFromResourceKeyNoThrow(c.key0); - if (DEBUG) - printlnd("Modify value ri=" + ri - + " voff=" + voffset - + " vlen=" + length - + " blen=" + bytes.length - + " boff=" + offset - + " bytes=" + Arrays.toString(Arrays.copyOfRange(bytes, 0, Math.min(10, bytes.length)))); - modiValueBig(ri, voffset, length, bytes, offset); - c.init(); - if (DEBUG_STAT) { - ++info.nValues; - info.sValues += length; - } - } - final void setImmutable(boolean immutable) { - checkInitialization(); - clusterChange2.setImmutable(immutable); - } - final void undoValueEx(int resourceIndex) { - checkInitialization(); - clusterChange2.undoValueEx(resourceIndex); - } - final void setDeleted(boolean deleted) { - checkInitialization(); - clusterChange2.setDeleted(deleted); - } - final void corrupt() { - checkInitialization(); - addByte((byte)0); - } - /** - * @param graphSession - * @param clusterId - * @return true if actually flushed something - */ - final boolean flush(GraphSession graphSession, ClusterUID clusterUID) { - if (byteIndex > 0) { - if(DebugPolicy.REPORT_CLUSTER_STREAM) - System.err.println("Flush cluster change set stream " + this); - setHeaderVectorSize(byteIndex); - byte[] copy = new byte[byteIndex + HEADER_SIZE]; - System.arraycopy(header, 0, copy, 0, HEADER_SIZE); - System.arraycopy(bytes, 0, copy, HEADER_SIZE, byteIndex); - UpdateClusterFunction updateClusterFunction = new UpdateClusterFunction(copy); - if (DEBUG_CCS) { - for (DebugStm stm : debugStms) - printlnd(stm.toString2()); - debugStms.clear(); - } - if (DEBUG_STAT) { - info.tot = updateClusterFunction.operation.length; - printlnd("ReallyFlush: " + info.toString()); - sum.add(info); - printlnd("ReallyFlush sum: " + sum.toString()); - } - // long start = System.nanoTime(); - graphSession.updateCluster(updateClusterFunction); - // long duration = System.nanoTime() - start; - // duration2 += duration; - // System.err.println("updateCluster " + 1e-9*duration); - // System.err.println("updateCluster total " + 1e-9*duration2); - clear(); - clusterChange2.flush(graphSession); - return true; - } else if (clusterChange2.isDirty()) { - clusterChange2.flush(graphSession); - clear(); - return true; - } else if (flushed) { - flushed = false; - return true; - } else { - return true; - } - } - - final void flushInternal(GraphSession graphSession, ClusterUID clusterUID) { - flush(graphSession, clusterUID); - flushed = true; - } - - final class ForeignTable { - private final TLongIntHashMap table = new TLongIntHashMap(); - - private long createKey(short index, long cluster) { - assert (cluster <= (1L << 48) - 1); - return (cluster << 14) | index; - } - - public int get(short index, long cluster) { - int value = table.get(createKey(index, cluster)); - if (DEBUG) - printlnd("ForeignTable get c=" + clusterUID + " i=" - + (value - 1) + " r=" + index + " rc=" + cluster); - return value; - } - - public int put(short index, long cluster, int value) { - if (DEBUG) - printlnd("ForeignTable put c=" + clusterUID + " i=" - + (value - 1) + " r=" + index + " rc=" + cluster); - return table.put(createKey(index, cluster), value); - } - - public int size() { - return table.size(); - } - - public void clear() { - table.clear(); - } - } - - @Override - public int hashCode() { - return 31*clusterImpl.getClusterKey(); - } - - @Override - public boolean equals(Object object) { - if (this == object) - return true; - else if (object == null) - return false; - else if (!(object instanceof ClusterChange)) - return false; - ClusterChange r = (ClusterChange)object; - return r.clusterImpl.getClusterKey() == clusterImpl.getClusterKey(); - } - - -} +package fi.vtt.simantics.procore.internal; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import org.simantics.db.exception.RuntimeDatabaseException; +import org.simantics.db.impl.ClusterTraitsBase; +import org.simantics.db.procore.cluster.ClusterImpl; +import org.simantics.db.procore.cluster.ClusterTraits; +import org.simantics.db.procore.cluster.ClusterTraitsSmall; +import org.simantics.db.service.Bytes; +import org.simantics.db.service.ClusterUID; + +import fi.vtt.simantics.procore.DebugPolicy; +import fi.vtt.simantics.procore.internal.ClusterStream.ClusterEnum; +import fi.vtt.simantics.procore.internal.ClusterStream.Data; +import fi.vtt.simantics.procore.internal.ClusterStream.DebugInfo; +import fi.vtt.simantics.procore.internal.ClusterStream.OpEnum; +import fi.vtt.simantics.procore.internal.ClusterStream.StmEnum; +import gnu.trove.map.hash.TIntByteHashMap; +import gnu.trove.map.hash.TLongIntHashMap; + +public final class ClusterChange { + public static final int VERSION = 1; + public static final byte ADD_OPERATION = 2; + public static final byte REMOVE_OPERATION = 3; + public static final byte DELETE_OPERATION = 5; + + public static final boolean DEBUG = false; + public static final boolean DEBUG_STAT = false; + public static final boolean DEBUG_CCS = false; + + private static DebugInfo sum = new DebugInfo(); + + public final TIntByteHashMap foreignTable = new TIntByteHashMap(); + private final DebugInfo info; + private final GraphSession graphSession; + public final ClusterUID clusterUID; + private final int SIZE_OFFSET; + private final int HEADER_SIZE; + // How much buffer is used before stream is flushed to server. The bigger the better. + public static final int MAX_FIXED_BYTES = (1<<15) + (1<<14); + private static final int MAX_FIXED_OPERATION_SIZE = 17 + 16; + private static final int MAX_FIXED_OPERATION_SIZE_AND_ROOM_FOR_ERROR = MAX_FIXED_OPERATION_SIZE + 36; + private int nextSize = MAX_FIXED_BYTES; + int byteIndex = 0; + private byte[] bytes = null; // Operation data. + private final byte[] header; + private boolean flushed = false; + private final ClusterStream clusterStream; + private final ClusterChange2 clusterChange2; + + public ClusterImpl clusterImpl; + + public ClusterChange(ClusterStream clusterStream, ClusterImpl clusterImpl) { + + this.clusterImpl = clusterImpl; + + clusterUID = clusterImpl.getClusterUID(); + long[] longs = new long[ClusterUID.getLongLength()]; + clusterUID.toLong(longs, 0); + this.graphSession = clusterStream.graphSession; + info = new DebugInfo(); + HEADER_SIZE = 8 + longs.length * 8; + header = new byte[HEADER_SIZE]; + SIZE_OFFSET = 0; + Bytes.writeLE(header, SIZE_OFFSET + 0, 0); // Correct byte vector size is set with setHeaderVectorSize() later. + Bytes.writeLE(header, SIZE_OFFSET + 4, VERSION); + for (int i=0, offset=8; i>>8); + } + void flushCollect(Change c) { + flushInternal(graphSession, clusterUID); + if (DEBUG) + printlnd("Cluster change data was flushed."); + if (null != c) { + if (DEBUG) + printlnd("Clearing lookup for " + c.toString()); + c.lookup1 = null; + c.lookup2 = null; + } + if (null != clusterImpl) { + clusterImpl.foreignLookup = null; + } + } + + private final boolean checkBufferSpace(Change c) { + clusterStream.changes.checkFlush(); + if(bytes == null) initBuffer(); + if (MAX_FIXED_BYTES - byteIndex > MAX_FIXED_OPERATION_SIZE_AND_ROOM_FOR_ERROR) { + return false; + } + flushCollect(c); + initBuffer(); + return true; + } + + private final void checkBufferSpace(int size) { + if(bytes == null) initBuffer(); + if (bytes.length - byteIndex >= size) + return; + nextSize = Math.max(MAX_FIXED_BYTES, size); + flushInternal(graphSession, clusterUID); + initBuffer(); + } + + final void addChange(Change c) { + checkInitialization(); + checkBufferSpace(c); + byte operation = c.op0; + if(operation == ADD_OPERATION) + addStm(c, StmEnum.Add); + else if (operation == REMOVE_OPERATION) + addStm(c, StmEnum.Remove); + else if (operation == DELETE_OPERATION) { + if (DEBUG) + printlnd("Delete value offset=" + byteIndex + " " + c); + addByte(OpEnum.Delete.getOrMask()); + addShort(ClusterTraitsBase.getResourceIndexFromResourceKeyNoThrow(c.key0)); + } + c.lastArg = 0; + } + + private final void addForeignLong(short index, ClusterUID clusterUID) { + byteIndex = clusterUID.toByte(bytes, byteIndex); + bytes[byteIndex++] = (byte)(index & 0xFF); + bytes[byteIndex++] = (byte)(index >>> 8); + } + + private final ClusterEnum addIndexAndCluster(int key, ClusterUID clusterUID, byte lookIndex, byte[] lookup) { + assert(!clusterUID.equals(ClusterUID.Null)); + short resourceIndex = ClusterTraitsBase.getResourceIndexFromResourceKeyNoThrow(key); + if (clusterUID.equals(this.clusterUID)) { + bytes[byteIndex++] = (byte)(resourceIndex & 0xFF); + bytes[byteIndex++] = (byte)(resourceIndex >>> 8); + return ClusterEnum.Local; + } + + byte foreign = 0; + if(lookIndex > 0) { + if(lookup != null) + foreign = lookup[lookIndex]; + } else { + foreign = foreignTable.get(key); + } + if (0 != foreign) { + if (foreign > 256) + throw new RuntimeDatabaseException("Internal error." + + "Too big foreign index=" + foreign + " max=256"); + --foreign; + bytes[byteIndex++] = foreign; + return ClusterEnum.ForeignShort; + } else { + byte position = (byte) (foreignTable.size() + 1); + if(lookup != null) + lookup[lookIndex] = position; + foreignTable.put(key, position); + if (DEBUG_STAT) + info.sForeign = foreignTable.size(); + if (clusterUID.equals(ClusterUID.Null)) + throw new RuntimeDatabaseException("Internal error." + + "Cluster unique id not defined for foreign cluster."); + addForeignLong(resourceIndex, clusterUID); + return ClusterEnum.ForeignLong; + } + } + + private final void addByte(byte b) { + bytes[byteIndex++] = b; + } + + private final void addShort(short s) { + bytes[byteIndex++] = (byte)(s & 0xFF); + bytes[byteIndex++] = (byte)(s >>> 8); + } + +// private final void addShort(int s) { +// bytes[byteIndex++] = (byte) (s & 0xFF); +// bytes[byteIndex++] = (byte) ((s >>> 8) & 0xFF); +// } + + private final void addInt(int i) { +// System.err.println("addInt " + i + " " + i); + bytes[byteIndex++] = (byte) (i & 0xFF); + bytes[byteIndex++] = (byte) ((i >>> 8) & 0xFF); + bytes[byteIndex++] = (byte) ((i >>> 16) & 0xFF); + bytes[byteIndex++] = (byte) ((i >>> 24) & 0xFF); + // buffer.asIntBuffer().put(i); + // buffer.position(buffer.position()+4); + } + +// private void addLong6(long l) { +//// System.err.println("addLong " + l); +// bytes[byteIndex++] = (byte) (l & 0xFF); +// bytes[byteIndex++] = (byte) ((l >>> 8) & 0xFF); +// bytes[byteIndex++] = (byte) ((l >>> 16) & 0xFF); +// bytes[byteIndex++] = (byte) ((l >>> 24) & 0xFF); +// bytes[byteIndex++] = (byte) ((l >>> 32) & 0xFF); +// bytes[byteIndex++] = (byte) ((l >>> 40) & 0xFF); +// // buffer.asLongBuffer().put(l); +// // buffer.position(buffer.position() + 6); +// } + + private void addLong7(long l) { + bytes[byteIndex++] = (byte) (l & 0xFF); + bytes[byteIndex++] = (byte) ((l >>> 8) & 0xFF); + bytes[byteIndex++] = (byte) ((l >>> 16) & 0xFF); + bytes[byteIndex++] = (byte) ((l >>> 24) & 0xFF); + bytes[byteIndex++] = (byte) ((l >>> 32) & 0xFF); + bytes[byteIndex++] = (byte) ((l >>> 40) & 0xFF); + bytes[byteIndex++] = (byte) ((l >>> 48) & 0xFF); + // buffer.asLongBuffer().put(l); + // buffer.position(buffer.position() + 7); + } + +// private void addLong(long l) { +// bytes[byteIndex++] = (byte) (l & 0xFF); +// bytes[byteIndex++] = (byte) ((l >>> 8) & 0xFF); +// bytes[byteIndex++] = (byte) ((l >>> 16) & 0xFF); +// bytes[byteIndex++] = (byte) ((l >>> 24) & 0xFF); +// bytes[byteIndex++] = (byte) ((l >>> 32) & 0xFF); +// bytes[byteIndex++] = (byte) ((l >>> 40) & 0xFF); +// bytes[byteIndex++] = (byte) ((l >>> 48) & 0xFF); +// bytes[byteIndex++] = (byte) ((l >>> 56) & 0xFF); +// } + private final byte bufferPop() { + return bytes[--byteIndex]; + } + + final class DebugStm { + StmEnum e; + int r; + int p; + int o; + ClusterUID pc; + ClusterUID oc; + + DebugStm(StmEnum e, int r, int p, ClusterUID pc, int o, ClusterUID oc) { + this.e = e; + this.r = r; + this.p = p; + this.o = o; + this.pc = pc; + this.oc = oc; + } + + @Override + public String toString() { + short ri = ClusterTraitsBase.getResourceIndexFromResourceKeyNoThrow(r); + short pi = ClusterTraitsBase.getResourceIndexFromResourceKeyNoThrow(p); + short oi = ClusterTraitsBase.getResourceIndexFromResourceKeyNoThrow(o); + return "" + e + " rk=" + r + " ri=" + ri + " rc=" + clusterUID + + " pk=" + p + " pi=" + pi + " pc=" + pc + + " ok=" + o + " oi=" + oi + " oc=" + oc; + } + + public String toString2() { + return "" + e + " r=" + r + " rc=" + clusterUID + " p=" + p + + " pc=" + pc + " o=" + o + " oc=" + oc; + } + + public String toString3() { + short ri = ClusterTraitsBase.getResourceIndexFromResourceKeyNoThrow(r); + short pi = ClusterTraitsBase.getResourceIndexFromResourceKeyNoThrow(p); + short oi = ClusterTraitsBase.getResourceIndexFromResourceKeyNoThrow(o); + return "" + e + " ri=" + ri + + " pi=" + pi + " pc=" + pc + + " oi=" + oi + " oc=" + oc; + } + } + + private List debugStms = new ArrayList(); + + @SuppressWarnings("unused") + private final void addStm(Change c, StmEnum stmEnum) { + + if (DEBUG_STAT) + ++info.nStms; + if (DEBUG || DEBUG_CCS) { + DebugStm d = new DebugStm(stmEnum, c.key0, c.key1, c.clusterUID1, c.key2, c.clusterUID2); + if (DEBUG_CCS) + debugStms.add(d); + if (DEBUG) { + printlnd(d.toString3() + " offset=" + byteIndex); + } + } + // int opPos = buffer.position(); + int opPos = byteIndex++; + // buffer.put((byte)0); // operation code + // addByte((byte)0); + + boolean done = true; + + ClusterEnum a = addIndexAndCluster(c.key1, c.clusterUID1, c.lookIndex1, c.lookup1); + byte ab = 0; + + // ForeignShort = byte + // Local = short + // ForeignLong = 8 byte + if (a != ClusterEnum.ForeignShort) { + ab = bufferPop(); + done = false; + } + + ClusterEnum b = addIndexAndCluster(c.key2, c.clusterUID2, c.lookIndex2, c.lookup2); + byte bb = 0; + if (b != ClusterEnum.ForeignShort) { + bb = bufferPop(); + done = false; + } + + int ri = ClusterTraitsBase.getResourceIndexFromResourceKeyNoThrow(c.key0); + if (ClusterTraitsSmall.isIllegalResourceIndex(ri)) + throw new RuntimeDatabaseException("Assertion error. Illegal resource index=" + ri); + bytes[byteIndex++] = (byte)ri; // index low byte + if(!done) { + Data data = ClusterEnum.getData(stmEnum, a, b); + int left = 6 - data.bits; + int op = ri >>> (8 + left); + ri >>>= 8; + ri &= (1 << left) - 1; + if (a != ClusterEnum.ForeignShort) { + ri |= ab << left; + left += 6; + } + if (b != ClusterEnum.ForeignShort) { + ri |= bb << left; + left += 6; + } + switch (data.bytes) { + default: + throw new RuntimeDatabaseException("Assertion error. Illegal number of bytes=" + data.bytes); + case 2: + bytes[byteIndex++] = (byte)(ri & 0xFF); + bytes[byteIndex++] = (byte)((ri >>> 8) & 0xFF); + break; + case 1: + bytes[byteIndex++] = (byte)(ri & 0xFF); + break; + case 0: + break; + } + op |= data.mask; + this.bytes[opPos] = (byte)op; + } else { + if (stmEnum == StmEnum.Add) + bytes[opPos] = (byte)((ri >>> 8) + 64); + else + bytes[opPos] = (byte)((ri >>> 8) + 128); + } + if (DEBUG_STAT) { + if (a == ClusterEnum.Local && b == ClusterEnum.Local) { + ++info.nLocal; + } else if (a == ClusterEnum.Local || b == ClusterEnum.Local) { + ++info.nPartly; + } else { + ++info.nForeign; + } + } + if (foreignTable.size() > 252) + flushInternal(graphSession, clusterUID); + } + + private final int modiValue(int ri, long value_offset, byte[] bytes, int offset, int size) { + if (DEBUG) + printlnd("Modify value ri=" + ri + " vo=" + value_offset + " size=" + size + " total=" + bytes.length); + if (ClusterTraitsBase.isIllegalResourceIndex(ri)) + throw new RuntimeDatabaseException("Assertion error. Illegal resource index=" + ri); + if (value_offset > (1L << 58 - 1)) + throw new RuntimeDatabaseException("Illegal value offset=" + + value_offset); + if (size < 0 || size > MAX_FIXED_BYTES - 1) + throw new RuntimeDatabaseException("Illegal value size=" + size); + if (offset + size > bytes.length) + throw new RuntimeDatabaseException("Illegal value size=" + size); + checkBufferSpace(12 + size); + addByte(OpEnum.Modify.getOrMask()); + ri |= (value_offset >>> 56) << 14; // add top two bits + addShort((short) ri); + value_offset &= (1L << 56) - 1; + addLong7(value_offset); + addShort((short) size); + if (DEBUG) + System.out.println("Modify value fixed part end offset=" + byteIndex); + int copied = Math.min(size, this.bytes.length - byteIndex); + System.arraycopy(bytes, offset, this.bytes, byteIndex, copied); + byteIndex += size; + return copied; + } + + private final void modiValueBig(int ri, long voffset, int left, byte[] bytes, int offset) { + checkBufferSpace(0); + int current = Math.min(this.bytes.length - byteIndex - 12, left); + if(current >= 0) { + int written = modiValue(ri, voffset, bytes, offset, current); + voffset += written; + offset += written; + left -= written; + } + flushInternal(graphSession, clusterUID); + while (left > 0) { + int length = Math.min(left, (1 << 16) - 1); + if (DEBUG) + printlnd("Modify big value ri=" + ri + " vo=" + voffset + " len=" + length); + int psize = length + 12; + setHeaderVectorSize(psize); + byte[] message = new byte[psize+HEADER_SIZE]; + System.arraycopy(header, 0, message, 0, HEADER_SIZE); + int to = HEADER_SIZE; + Bytes.write(message, to++, OpEnum.Modify.getOrMask()); + short index = (short)(ri | (voffset >>> 56)<<14); // add top two bits + Bytes.writeLE(message, to, index); to += 2; + Bytes.writeLE7(message, to, voffset & ((1L << 56) - 1)); to += 7; + Bytes.writeLE(message, to, (short)length); to += 2; + System.arraycopy(bytes, offset, message, to, length); + graphSession.updateCluster(new UpdateClusterFunction(message)); + voffset += length; + offset += length; + left -= length; + } + } + + private final int setValueBig(int ri, byte[] bytes, int length_) { + checkBufferSpace(12); + int sum = 0; + int voffset = 0; + int offset = 0; + int left = length_; + while (left > 0) { + int length = Math.min(left, MAX_FIXED_BYTES - 12 - byteIndex); + if (DEBUG) + printlnd("Set big value ri=" + ri + " vo=" + voffset + " len=" + length); + int written = modiValue(ri, voffset, bytes, offset, length); + sum += written; + voffset += written; + offset += written; + left -= written; + checkBufferSpace(12); + } + // Possibly truncate file + modiValue(ri, length_, new byte[0], 0, 0); + return sum; + } + + private final int setValueSmall(int ri, byte[] bytes, int length) { + checkBufferSpace(5 + length); + int pos = byteIndex; + int i = length << 14 | ri; + if (length < 32) { + byte op = (byte) (OpEnum.SetShort.getOrMask() | length >>> 2); + addByte(op); + short s = (short) i; + addShort(s); + } else { + addByte(OpEnum.Set.getOrMask()); + addInt(i); + } + System.arraycopy(bytes, 0, this.bytes, byteIndex, length); + byteIndex += length; + int len = byteIndex - pos; + return len; + } + + final void setValue(short index, byte[] bytes) { + setValue(index, bytes, bytes.length); + } + + final void setValue(short index, byte[] bytes, int length) { + checkInitialization(); + if (ClusterTraitsBase.isIllegalResourceIndex(index)) + throw new RuntimeDatabaseException("Assertion error. Illegal resource index=" + index); + if (DEBUG) + printlnd("Set value ri=" + index + + " len=" + length + + " bytes=" + Arrays.toString(Arrays.copyOfRange(bytes, 0, Math.min(10, length)))); + int len; + /* + * The limit for the cluster stream is (1<18)-1 but this avoids the + * conversion to big cluster. + */ + if (length > ClusterTraitsSmall.VALUE_SIZE_MAX) + len = setValueBig(index, bytes, length); + else + len = setValueSmall(index, bytes, length); + if (DEBUG_STAT) { + ++info.nValues; + info.sValues += len + length; + } + } + + final void setValue(Change c, byte[] bytes, int length) { + short ri = ClusterTraitsBase.getResourceIndexFromResourceKeyNoThrow(c.key0); + setValue(ri, bytes, length); + c.initValue(); + } + + final void modiValue(Change c, long voffset, int length, byte[] bytes, int offset) { + checkInitialization(); + int ri = ClusterTraitsBase.getResourceIndexFromResourceKeyNoThrow(c.key0); + if (DEBUG) + printlnd("Modify value ri=" + ri + + " voff=" + voffset + + " vlen=" + length + + " blen=" + bytes.length + + " boff=" + offset + + " bytes=" + Arrays.toString(Arrays.copyOfRange(bytes, 0, Math.min(10, bytes.length)))); + modiValueBig(ri, voffset, length, bytes, offset); + c.init(); + if (DEBUG_STAT) { + ++info.nValues; + info.sValues += length; + } + } + final void setImmutable(boolean immutable) { + checkInitialization(); + clusterChange2.setImmutable(immutable); + } + final void undoValueEx(int resourceIndex) { + checkInitialization(); + clusterChange2.undoValueEx(resourceIndex); + } + final void setDeleted(boolean deleted) { + checkInitialization(); + clusterChange2.setDeleted(deleted); + } + final void corrupt() { + checkInitialization(); + addByte((byte)0); + } + /** + * @param graphSession + * @param clusterId + * @return true if actually flushed something + */ + final boolean flush(GraphSession graphSession, ClusterUID clusterUID) { + if (byteIndex > 0) { + if(DebugPolicy.REPORT_CLUSTER_STREAM) + System.err.println("Flush cluster change set stream " + this); + setHeaderVectorSize(byteIndex); + byte[] copy = new byte[byteIndex + HEADER_SIZE]; + System.arraycopy(header, 0, copy, 0, HEADER_SIZE); + System.arraycopy(bytes, 0, copy, HEADER_SIZE, byteIndex); + UpdateClusterFunction updateClusterFunction = new UpdateClusterFunction(copy); + if (DEBUG_CCS) { + for (DebugStm stm : debugStms) + printlnd(stm.toString2()); + debugStms.clear(); + } + if (DEBUG_STAT) { + info.tot = updateClusterFunction.operation.length; + printlnd("ReallyFlush: " + info.toString()); + sum.add(info); + printlnd("ReallyFlush sum: " + sum.toString()); + } + // long start = System.nanoTime(); + graphSession.updateCluster(updateClusterFunction); + // long duration = System.nanoTime() - start; + // duration2 += duration; + // System.err.println("updateCluster " + 1e-9*duration); + // System.err.println("updateCluster total " + 1e-9*duration2); + clear(); + clusterChange2.flush(graphSession); + return true; + } else if (clusterChange2.isDirty()) { + clusterChange2.flush(graphSession); + clear(); + return true; + } else if (flushed) { + flushed = false; + return true; + } else { + return true; + } + } + + final void flushInternal(GraphSession graphSession, ClusterUID clusterUID) { + flush(graphSession, clusterUID); + flushed = true; + } + + final class ForeignTable { + private final TLongIntHashMap table = new TLongIntHashMap(); + + private long createKey(short index, long cluster) { + assert (cluster <= (1L << 48) - 1); + return (cluster << 14) | index; + } + + public int get(short index, long cluster) { + int value = table.get(createKey(index, cluster)); + if (DEBUG) + printlnd("ForeignTable get c=" + clusterUID + " i=" + + (value - 1) + " r=" + index + " rc=" + cluster); + return value; + } + + public int put(short index, long cluster, int value) { + if (DEBUG) + printlnd("ForeignTable put c=" + clusterUID + " i=" + + (value - 1) + " r=" + index + " rc=" + cluster); + return table.put(createKey(index, cluster), value); + } + + public int size() { + return table.size(); + } + + public void clear() { + table.clear(); + } + } + + @Override + public int hashCode() { + return 31*clusterImpl.getClusterKey(); + } + + @Override + public boolean equals(Object object) { + if (this == object) + return true; + else if (object == null) + return false; + else if (!(object instanceof ClusterChange)) + return false; + ClusterChange r = (ClusterChange)object; + return r.clusterImpl.getClusterKey() == clusterImpl.getClusterKey(); + } + + +}