package fi.vtt.simantics.procore.internal;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.simantics.db.exception.RuntimeDatabaseException;
import org.simantics.db.impl.ClusterTraitsBase;
import org.simantics.db.procore.cluster.ClusterImpl;
import org.simantics.db.procore.cluster.ClusterTraits;
import org.simantics.db.procore.cluster.ClusterTraitsSmall;
import org.simantics.db.service.Bytes;
import org.simantics.db.service.ClusterUID;

import fi.vtt.simantics.procore.DebugPolicy;
import fi.vtt.simantics.procore.internal.ClusterStream.ClusterEnum;
import fi.vtt.simantics.procore.internal.ClusterStream.Data;
import fi.vtt.simantics.procore.internal.ClusterStream.DebugInfo;
import fi.vtt.simantics.procore.internal.ClusterStream.OpEnum;
import fi.vtt.simantics.procore.internal.ClusterStream.StmEnum;
import gnu.trove.map.hash.TIntByteHashMap;
import gnu.trove.map.hash.TLongIntHashMap;

/**
 * Accumulates the change operations of a single cluster into a byte buffer and hands the
 * buffer, prefixed with a fixed header, to {@link GraphSession#updateCluster} when it is
 * flushed.
 *
 * <p>The header holds a 4-byte size, the 4-byte {@link #VERSION} and the cluster UID. The
 * operation bytes that follow are produced by {@link #addChange(Change)}, the
 * {@code setValue} overloads and {@link #modiValue(Change, long, int, byte[], int)}.
 */
public final class ClusterChange {
    public static final int VERSION = 1;
    public static final byte ADD_OPERATION = 2;
    public static final byte REMOVE_OPERATION = 3;
    public static final byte DELETE_OPERATION = 5;
    public static final boolean DEBUG = false;
    public static final boolean DEBUG_STAT = false;
    public static final boolean DEBUG_CCS = false;
    private static DebugInfo sum = new DebugInfo();
    /** Maps a foreign resource key to its 1-based position in this change set; 0 is treated as "not registered". */
    public final TIntByteHashMap foreignTable = new TIntByteHashMap();
    private final DebugInfo info;
    private final GraphSession graphSession;
    public final ClusterUID clusterUID;
    private final int SIZE_OFFSET;
    private final int HEADER_SIZE;
    // How much buffer is used before stream is flushed to server. The bigger the better.
    public static final int MAX_FIXED_BYTES = (1<<15) + (1<<14);
    private static final int MAX_FIXED_OPERATION_SIZE = 17 + 16;
    private static final int MAX_FIXED_OPERATION_SIZE_AND_ROOM_FOR_ERROR = MAX_FIXED_OPERATION_SIZE + 36;
    private int nextSize = MAX_FIXED_BYTES;
    int byteIndex = 0;
    private byte[] bytes = null; // Operation data.
    private final byte[] header;
    private boolean flushed = false;
    private final ClusterStream clusterStream;
    private final ClusterChange2 clusterChange2;
    public ClusterImpl clusterImpl;

    /**
     * Creates the change set for {@code clusterImpl} and pre-fills the message header
     * (size placeholder, version, cluster UID).
     *
     * @param clusterStream the stream this change set belongs to; its graph session is used for flushing
     * @param clusterImpl the cluster whose changes are recorded
     */
    public ClusterChange(ClusterStream clusterStream, ClusterImpl clusterImpl) {
        this.clusterImpl = clusterImpl;
        clusterUID = clusterImpl.getClusterUID();
        long[] longs = new long[ClusterUID.getLongLength()];
        clusterUID.toLong(longs, 0);
        this.graphSession = clusterStream.graphSession;
        info = new DebugInfo();
        HEADER_SIZE = 8 + longs.length * 8;
        header = new byte[HEADER_SIZE];
        SIZE_OFFSET = 0;
        Bytes.writeLE(header, SIZE_OFFSET + 0, 0); // Correct byte vector size is set with setHeaderVectorSize() later.
        Bytes.writeLE(header, SIZE_OFFSET + 4, VERSION);
        // FIXME(review): the text of this file is damaged at this point. The original reads
        //     for (int i=0, offset=8; i>>8); }
        // i.e. everything between the loop condition and the tail of a later method
        // (ending in ">>>8); }") has been lost. The loop and the clusterStream assignment
        // below are a reconstruction inferred from HEADER_SIZE = 8 + longs.length * 8 and
        // from the final field of the same name; verify them against version control.
        // The lost span must also have contained
        //   - the initialization of the final field clusterChange2, and
        //   - setHeaderVectorSize(int), initBuffer(), clear(), checkInitialization()
        //     and printlnd(String),
        // all of which are used below but are not defined in the surviving text.
        for (int i = 0, offset = 8; i < longs.length; ++i, offset += 8)
            Bytes.writeLE(header, offset, longs[i]);
        this.clusterStream = clusterStream;
    }

    /**
     * Flushes the buffered data and drops the foreign lookup caches held by the given
     * change and by the cluster.
     *
     * @param c the change whose lookup arrays are cleared; may be {@code null}
     */
    void flushCollect(Change c) {
        flushInternal(graphSession, clusterUID);
        if (DEBUG)
            printlnd("Cluster change data was flushed.");
        if (null != c) {
            if (DEBUG)
                printlnd("Clearing lookup for " + c.toString());
            c.lookup1 = null;
            c.lookup2 = null;
        }
        if (null != clusterImpl) {
            clusterImpl.foreignLookup = null;
        }
    }

    /**
     * Makes sure the buffer has room for one fixed-size operation, flushing first if it
     * does not.
     *
     * @param c the change about to be written; passed on to {@link #flushCollect(Change)}
     * @return {@code true} if the buffer had to be flushed, {@code false} otherwise
     */
    private final boolean checkBufferSpace(Change c) {
        clusterStream.changes.checkFlush();
        if (bytes == null)
            initBuffer();
        if (MAX_FIXED_BYTES - byteIndex > MAX_FIXED_OPERATION_SIZE_AND_ROOM_FOR_ERROR) {
            return false;
        }
        flushCollect(c);
        initBuffer();
        return true;
    }

    /**
     * Makes sure at least {@code size} bytes are free in the buffer. When they are not, the
     * current content is flushed and the next buffer size is set to at least {@code size}.
     */
    private final void checkBufferSpace(int size) {
        if (bytes == null)
            initBuffer();
        if (bytes.length - byteIndex >= size)
            return;
        nextSize = Math.max(MAX_FIXED_BYTES, size);
        flushInternal(graphSession, clusterUID);
        initBuffer();
    }

    /**
     * Encodes one change into the buffer. {@code c.op0} selects between adding a statement,
     * removing a statement and a delete operation; any other value writes nothing.
     * {@code c.lastArg} is reset to 0 in every case.
     */
    final void addChange(Change c) {
        checkInitialization();
        checkBufferSpace(c);
        byte operation = c.op0;
        if (operation == ADD_OPERATION)
            addStm(c, StmEnum.Add);
        else if (operation == REMOVE_OPERATION)
            addStm(c, StmEnum.Remove);
        else if (operation == DELETE_OPERATION) {
            if (DEBUG)
                printlnd("Delete value offset=" + byteIndex + " " + c);
            addByte(OpEnum.Delete.getOrMask());
            addShort(ClusterTraitsBase.getResourceIndexFromResourceKeyNoThrow(c.key0));
        }
        c.lastArg = 0;
    }

    /** Writes the cluster UID bytes followed by the resource index as a little-endian short. */
    private final void addForeignLong(short index, ClusterUID clusterUID) {
        byteIndex = clusterUID.toByte(bytes, byteIndex);
        bytes[byteIndex++] = (byte)(index & 0xFF);
        bytes[byteIndex++] = (byte)(index >>> 8);
    }

    /**
     * Writes a reference to the resource {@code key} and reports which encoding was used.
     *
     * <ul>
     * <li>{@code Local}: the resource lives in this cluster; its index is written as two bytes.</li>
     * <li>{@code ForeignShort}: the resource is already registered in the foreign table;
     * its zero-based table position is written as one byte.</li>
     * <li>{@code ForeignLong}: first reference to a foreign resource; it is registered and
     * written as cluster UID plus index.</li>
     * </ul>
     *
     * @param key resource key
     * @param clusterUID UID of the cluster owning the resource; must not be {@code ClusterUID.Null}
     * @param lookIndex slot in {@code lookup} to consult when greater than zero; otherwise
     *        {@link #foreignTable} is consulted
     * @param lookup optional per-change cache of foreign table positions; may be {@code null}
     * @throws RuntimeDatabaseException if a new foreign reference has a null cluster UID
     */
    private final ClusterEnum addIndexAndCluster(int key, ClusterUID clusterUID, byte lookIndex, byte[] lookup) {
        assert(!clusterUID.equals(ClusterUID.Null));
        short resourceIndex = ClusterTraitsBase.getResourceIndexFromResourceKeyNoThrow(key);
        if (clusterUID.equals(this.clusterUID)) {
            bytes[byteIndex++] = (byte)(resourceIndex & 0xFF);
            bytes[byteIndex++] = (byte)(resourceIndex >>> 8);
            return ClusterEnum.Local;
        }
        byte foreign = 0;
        if (lookIndex > 0) {
            if (lookup != null)
                foreign = lookup[lookIndex];
        } else {
            foreign = foreignTable.get(key);
        }
        if (0 != foreign) {
            // The former "foreign > 256" guard was removed: a byte can never exceed 127,
            // so the check was unreachable. The table size is bounded in addStm() instead.
            --foreign;
            bytes[byteIndex++] = foreign;
            return ClusterEnum.ForeignShort;
        } else {
            byte position = (byte) (foreignTable.size() + 1);
            if (lookup != null)
                lookup[lookIndex] = position;
            foreignTable.put(key, position);
            if (DEBUG_STAT)
                info.sForeign = foreignTable.size();
            if (clusterUID.equals(ClusterUID.Null))
                throw new RuntimeDatabaseException("Internal error, contact application support."
                        + "Cluster unique id not defined for foreing cluster.");
            addForeignLong(resourceIndex, clusterUID);
            return ClusterEnum.ForeignLong;
        }
    }

    /** Appends one byte to the buffer. */
    private final void addByte(byte b) {
        bytes[byteIndex++] = b;
    }

    /** Appends a short in little-endian byte order. */
    private final void addShort(short s) {
        bytes[byteIndex++] = (byte)(s & 0xFF);
        bytes[byteIndex++] = (byte)(s >>> 8);
    }

    /** Appends an int in little-endian byte order. */
    private final void addInt(int i) {
        bytes[byteIndex++] = (byte) (i & 0xFF);
        bytes[byteIndex++] = (byte) ((i >>> 8) & 0xFF);
        bytes[byteIndex++] = (byte) ((i >>> 16) & 0xFF);
        bytes[byteIndex++] = (byte) ((i >>> 24) & 0xFF);
    }

    /** Appends the low seven bytes of {@code l} in little-endian byte order. */
    private void addLong7(long l) {
        bytes[byteIndex++] = (byte) (l & 0xFF);
        bytes[byteIndex++] = (byte) ((l >>> 8) & 0xFF);
        bytes[byteIndex++] = (byte) ((l >>> 16) & 0xFF);
        bytes[byteIndex++] = (byte) ((l >>> 24) & 0xFF);
        bytes[byteIndex++] = (byte) ((l >>> 32) & 0xFF);
        bytes[byteIndex++] = (byte) ((l >>> 40) & 0xFF);
        bytes[byteIndex++] = (byte) ((l >>> 48) & 0xFF);
    }

    /** Removes and returns the last byte written to the buffer. */
    private final byte bufferPop() {
        return bytes[--byteIndex];
    }

    /** Debug-only snapshot of one statement operation, kept for the DEBUG / DEBUG_CCS printouts. */
    final class DebugStm {
        StmEnum e;
        int r;
        int p;
        int o;
        ClusterUID pc;
        ClusterUID oc;

        DebugStm(StmEnum e, int r, int p, ClusterUID pc, int o, ClusterUID oc) {
            this.e = e;
            this.r = r;
            this.p = p;
            this.o = o;
            this.pc = pc;
            this.oc = oc;
        }

        @Override
        public String toString() {
            short ri = ClusterTraitsBase.getResourceIndexFromResourceKeyNoThrow(r);
            short pi = ClusterTraitsBase.getResourceIndexFromResourceKeyNoThrow(p);
            short oi = ClusterTraitsBase.getResourceIndexFromResourceKeyNoThrow(o);
            return "" + e + " rk=" + r + " ri=" + ri + " rc=" + clusterUID
                    + " pk=" + p + " pi=" + pi + " pc=" + pc
                    + " ok=" + o + " oi=" + oi + " oc=" + oc;
        }

        public String toString2() {
            return "" + e + " r=" + r + " rc=" + clusterUID + " p=" + p
                    + " pc=" + pc + " o=" + o + " oc=" + oc;
        }

        public String toString3() {
            short ri = ClusterTraitsBase.getResourceIndexFromResourceKeyNoThrow(r);
            short pi = ClusterTraitsBase.getResourceIndexFromResourceKeyNoThrow(p);
            short oi = ClusterTraitsBase.getResourceIndexFromResourceKeyNoThrow(o);
            return "" + e + " ri=" + ri
                    + " pi=" + pi + " pc=" + pc
                    + " oi=" + oi + " oc=" + oc;
        }
    }

    // Element type restored: flush() iterates this list as DebugStm, which a raw List cannot satisfy.
    private final List<DebugStm> debugStms = new ArrayList<DebugStm>();

    /**
     * Encodes an add or remove statement operation for {@code c}.
     *
     * <p>One byte is reserved for the operation code, then the predicate and object
     * references are written. For every reference that is not {@code ForeignShort} the last
     * byte written is popped again and packed, together with the high bits of the subject
     * index, into the operation code and up to two extra bytes as described by
     * {@code ClusterEnum.getData}.
     *
     * @throws RuntimeDatabaseException if the subject index is illegal or the encoding
     *         table reports an unsupported byte count
     */
    @SuppressWarnings("unused")
    private final void addStm(Change c, StmEnum stmEnum) {
        if (DEBUG_STAT)
            ++info.nStms;
        if (DEBUG || DEBUG_CCS) {
            DebugStm d = new DebugStm(stmEnum, c.key0, c.key1, c.clusterUID1, c.key2, c.clusterUID2);
            if (DEBUG_CCS)
                debugStms.add(d);
            if (DEBUG) {
                printlnd(d.toString3() + " offset=" + byteIndex);
            }
        }
        // Reserve one byte for the operation code; it is filled in at the end.
        int opPos = byteIndex++;
        boolean done = true;
        ClusterEnum a = addIndexAndCluster(c.key1, c.clusterUID1, c.lookIndex1, c.lookup1);
        byte ab = 0;
        // ForeignShort = byte
        // Local = short
        // ForeignLong = 8 byte
        if (a != ClusterEnum.ForeignShort) {
            ab = bufferPop();
            done = false;
        }
        ClusterEnum b = addIndexAndCluster(c.key2, c.clusterUID2, c.lookIndex2, c.lookup2);
        byte bb = 0;
        if (b != ClusterEnum.ForeignShort) {
            bb = bufferPop();
            done = false;
        }
        int ri = ClusterTraitsBase.getResourceIndexFromResourceKeyNoThrow(c.key0);
        if (ClusterTraitsSmall.isIllegalResourceIndex(ri))
            throw new RuntimeDatabaseException("Assertion error. Illegal resource index=" + ri);
        bytes[byteIndex++] = (byte)ri; // index low byte
        if (!done) {
            Data data = ClusterEnum.getData(stmEnum, a, b);
            int left = 6 - data.bits;
            int op = ri >>> (8 + left);
            ri >>>= 8;
            ri &= (1 << left) - 1;
            if (a != ClusterEnum.ForeignShort) {
                ri |= ab << left;
                left += 6;
            }
            if (b != ClusterEnum.ForeignShort) {
                ri |= bb << left;
                left += 6;
            }
            switch (data.bytes) {
                default:
                    throw new RuntimeDatabaseException("Assertion error. Illegal number of bytes=" + data.bytes);
                case 2:
                    bytes[byteIndex++] = (byte)(ri & 0xFF);
                    bytes[byteIndex++] = (byte)((ri >>> 8) & 0xFF);
                    break;
                case 1:
                    bytes[byteIndex++] = (byte)(ri & 0xFF);
                    break;
                case 0:
                    break;
            }
            op |= data.mask;
            this.bytes[opPos] = (byte)op;
        } else {
            // Both references were short foreign references: the operation code carries
            // the high byte of the subject index plus 64 for Add, 128 otherwise.
            if (stmEnum == StmEnum.Add)
                bytes[opPos] = (byte)((ri >>> 8) + 64);
            else
                bytes[opPos] = (byte)((ri >>> 8) + 128);
        }
        if (DEBUG_STAT) {
            if (a == ClusterEnum.Local && b == ClusterEnum.Local) {
                ++info.nLocal;
            } else if (a == ClusterEnum.Local || b == ClusterEnum.Local) {
                ++info.nPartly;
            } else {
                ++info.nForeign;
            }
        }
        // Foreign table positions are stored in a single byte, so flush before it overflows.
        if (foreignTable.size() > 252)
            flushInternal(graphSession, clusterUID);
    }

    /**
     * Writes one Modify operation (op byte, index short with the two top offset bits,
     * 7-byte offset, size short) followed by {@code size} payload bytes.
     *
     * @param ri resource index
     * @param value_offset offset inside the value; at most 58 bits are encodable
     * @param bytes source of the payload
     * @param offset first payload byte in {@code bytes}
     * @param size number of payload bytes
     * @return number of payload bytes copied into the buffer
     * @throws RuntimeDatabaseException on an illegal index, offset or size
     */
    private final int modiValue(int ri, long value_offset, byte[] bytes, int offset, int size) {
        if (DEBUG)
            printlnd("Modify value ri=" + ri + " vo=" + value_offset + " size=" + size + " total=" + bytes.length);
        if (ClusterTraitsBase.isIllegalResourceIndex(ri))
            throw new RuntimeDatabaseException("Assertion error. Illegal resource index=" + ri);
        // Fixed precedence: "1L << 58 - 1" parses as "1L << 57". The encoding below carries
        // 56 + 2 = 58 bits, so the real upper bound is 2^58 - 1.
        if (value_offset > (1L << 58) - 1)
            throw new RuntimeDatabaseException("Illegal value offset=" + value_offset);
        if (size < 0 || size > MAX_FIXED_BYTES - 1)
            throw new RuntimeDatabaseException("Illegal value size=" + size);
        if (offset + size > bytes.length)
            throw new RuntimeDatabaseException("Illegal value size=" + size);
        checkBufferSpace(12 + size);
        addByte(OpEnum.Modify.getOrMask());
        ri |= (value_offset >>> 56) << 14; // add top two bits
        addShort((short) ri);
        value_offset &= (1L << 56) - 1;
        addLong7(value_offset);
        addShort((short) size);
        if (DEBUG)
            System.out.println("Modify value fixed part end offset=" + byteIndex);
        int copied = Math.min(size, this.bytes.length - byteIndex);
        System.arraycopy(bytes, offset, this.bytes, byteIndex, copied);
        byteIndex += size;
        return copied;
    }

    /**
     * Writes a modification of arbitrary length. Whatever fits into the current buffer is
     * written through {@link #modiValue(int, long, byte[], int, int)} and flushed; the
     * remainder is sent directly to the graph session in separate messages of at most
     * 65535 payload bytes each.
     */
    private final void modiValueBig(int ri, long voffset, int left, byte[] bytes, int offset) {
        checkBufferSpace(0);
        int current = Math.min(this.bytes.length - byteIndex - 12, left);
        if (current >= 0) {
            int written = modiValue(ri, voffset, bytes, offset, current);
            voffset += written;
            offset += written;
            left -= written;
        }
        flushInternal(graphSession, clusterUID);
        while (left > 0) {
            int length = Math.min(left, (1 << 16) - 1);
            if (DEBUG)
                printlnd("Modify big value ri=" + ri + " vo=" + voffset + " len=" + length);
            int psize = length + 12;
            setHeaderVectorSize(psize);
            byte[] message = new byte[psize+HEADER_SIZE];
            System.arraycopy(header, 0, message, 0, HEADER_SIZE);
            int to = HEADER_SIZE;
            Bytes.write(message, to++, OpEnum.Modify.getOrMask());
            short index = (short)(ri | (voffset >>> 56)<<14); // add top two bits
            Bytes.writeLE(message, to, index);
            to += 2;
            Bytes.writeLE7(message, to, voffset & ((1L << 56) - 1));
            to += 7;
            Bytes.writeLE(message, to, (short)length);
            to += 2;
            System.arraycopy(bytes, offset, message, to, length);
            graphSession.updateCluster(new UpdateClusterFunction(message));
            voffset += length;
            offset += length;
            left -= length;
        }
    }

    /**
     * Writes a value as a sequence of Modify operations, each sized to the space left in
     * the buffer.
     *
     * @return total number of payload bytes written
     */
    private final int setValueBig(int ri, byte[] bytes, int length_) {
        checkBufferSpace(12);
        int sum = 0;
        int voffset = 0;
        int offset = 0;
        int left = length_;
        while (left > 0) {
            int length = Math.min(left, MAX_FIXED_BYTES - 12 - byteIndex);
            if (DEBUG)
                printlnd("Set big value ri=" + ri + " vo=" + voffset + " len=" + length);
            int written = modiValue(ri, voffset, bytes, offset, length);
            sum += written;
            voffset += written;
            offset += written;
            left -= written;
            checkBufferSpace(12);
        }
        return sum;
    }

    /**
     * Writes a value with a single SetShort (length below 32) or Set operation, with the
     * length packed above the 14 index bits.
     *
     * @return number of bytes added to the buffer, operation header included
     */
    private final int setValueSmall(int ri, byte[] bytes, int length) {
        checkBufferSpace(5 + length);
        int pos = byteIndex;
        int i = (length << 14) | ri;
        if (length < 32) {
            byte op = (byte) (OpEnum.SetShort.getOrMask() | (length >>> 2));
            addByte(op);
            short s = (short) i;
            addShort(s);
        } else {
            addByte(OpEnum.Set.getOrMask());
            addInt(i);
        }
        System.arraycopy(bytes, 0, this.bytes, byteIndex, length);
        byteIndex += length;
        int len = byteIndex - pos;
        return len;
    }

    /** Sets the value of the resource at {@code index} to the whole of {@code bytes}. */
    final void setValue(short index, byte[] bytes) {
        setValue(index, bytes, bytes.length);
    }

    /**
     * Sets the value of the resource at {@code index} to the first {@code length} bytes of
     * {@code bytes}.
     *
     * @throws RuntimeDatabaseException if {@code index} is an illegal resource index
     */
    final void setValue(short index, byte[] bytes, int length) {
        checkInitialization();
        if (ClusterTraitsBase.isIllegalResourceIndex(index))
            throw new RuntimeDatabaseException("Assertion error. Illegal resource index=" + index);
        if (DEBUG)
            printlnd("Set value ri=" + index
                    + " len=" + length
                    + " bytes=" + Arrays.toString(Arrays.copyOfRange(bytes, 0, Math.min(10, length))));
        int len;
        /*
         * The limit for the cluster stream is (1 << 18) - 1 but this avoids the
         * conversion to big cluster.
         */
        if (length > ClusterTraitsSmall.VALUE_SIZE_MAX)
            len = setValueBig(index, bytes, length);
        else
            len = setValueSmall(index, bytes, length);
        if (DEBUG_STAT) {
            ++info.nValues;
            info.sValues += len + length;
        }
    }

    /** Sets the value of the resource identified by {@code c.key0}, then calls {@code c.initValue()}. */
    final void setValue(Change c, byte[] bytes, int length) {
        short ri = ClusterTraitsBase.getResourceIndexFromResourceKeyNoThrow(c.key0);
        setValue(ri, bytes, length);
        c.initValue();
    }

    /**
     * Modifies {@code length} bytes of the value of the resource identified by
     * {@code c.key0}, starting at {@code voffset}, then calls {@code c.init()}.
     *
     * @param voffset offset inside the stored value
     * @param length number of bytes to write
     * @param bytes source of the new bytes
     * @param offset first byte to take from {@code bytes}
     */
    final void modiValue(Change c, long voffset, int length, byte[] bytes, int offset) {
        checkInitialization();
        int ri = ClusterTraitsBase.getResourceIndexFromResourceKeyNoThrow(c.key0);
        if (DEBUG)
            printlnd("Modify value ri=" + ri
                    + " voff=" + voffset
                    + " vlen=" + length
                    + " blen=" + bytes.length
                    + " boff=" + offset
                    + " bytes=" + Arrays.toString(Arrays.copyOfRange(bytes, 0, Math.min(10, bytes.length))));
        modiValueBig(ri, voffset, length, bytes, offset);
        c.init();
        if (DEBUG_STAT) {
            ++info.nValues;
            info.sValues += length;
        }
    }

    /** Forwards the immutable flag to the secondary change set. */
    final void setImmutable(boolean immutable) {
        checkInitialization();
        clusterChange2.setImmutable(immutable);
    }

    /** Forwards the undo request for {@code resourceIndex} to the secondary change set. */
    final void undoValueEx(int resourceIndex) {
        checkInitialization();
        clusterChange2.undoValueEx(resourceIndex);
    }

    /** Forwards the deleted flag to the secondary change set. */
    final void setDeleted(boolean deleted) {
        checkInitialization();
        clusterChange2.setDeleted(deleted);
    }

    /** Appends a single zero byte to the stream. */
    final void corrupt() {
        checkInitialization();
        addByte((byte)0);
    }

    /**
     * Sends the buffered operations (header plus data) to the graph session and flushes
     * the secondary change set.
     *
     * <p>NOTE(review): the last branch returns {@code true} even though nothing was
     * flushed, which contradicts the documented return value. Behavior is left unchanged
     * because callers are not visible here; confirm the intent.
     *
     * @param graphSession session that receives the update
     * @param clusterUID not used by this method
     * @return true if actually flushed something
     */
    final boolean flush(GraphSession graphSession, ClusterUID clusterUID) {
        if (byteIndex > 0) {
            if (DebugPolicy.REPORT_CLUSTER_STREAM)
                System.err.println("Flush cluster change set stream " + this);
            setHeaderVectorSize(byteIndex);
            byte[] copy = new byte[byteIndex + HEADER_SIZE];
            System.arraycopy(header, 0, copy, 0, HEADER_SIZE);
            System.arraycopy(bytes, 0, copy, HEADER_SIZE, byteIndex);
            UpdateClusterFunction updateClusterFunction = new UpdateClusterFunction(copy);
            if (DEBUG_CCS) {
                for (DebugStm stm : debugStms)
                    printlnd(stm.toString2());
                debugStms.clear();
            }
            if (DEBUG_STAT) {
                info.tot = updateClusterFunction.operation.length;
                printlnd("ReallyFlush: " + info.toString());
                sum.add(info);
                printlnd("ReallyFlush sum: " + sum.toString());
            }
            graphSession.updateCluster(updateClusterFunction);
            clear();
            clusterChange2.flush(graphSession);
            return true;
        } else if (clusterChange2.isDirty()) {
            clusterChange2.flush(graphSession);
            clear();
            return true;
        } else if (flushed) {
            flushed = false;
            return true;
        } else {
            return true;
        }
    }

    /** Flushes and then marks this change set as flushed. */
    final void flushInternal(GraphSession graphSession, ClusterUID clusterUID) {
        flush(graphSession, clusterUID);
        flushed = true;
    }

    /** Table keyed by a (cluster, resource index) pair packed into one long. */
    final class ForeignTable {
        private final TLongIntHashMap table = new TLongIntHashMap();

        /** Packs the pair as {@code (cluster << 14) | index}. */
        private long createKey(short index, long cluster) {
            assert (cluster <= (1L << 48) - 1);
            return (cluster << 14) | index;
        }

        public int get(short index, long cluster) {
            int value = table.get(createKey(index, cluster));
            if (DEBUG)
                printlnd("ForeignTable get c=" + clusterUID + " i="
                        + (value - 1) + " r=" + index + " rc=" + cluster);
            return value;
        }

        public int put(short index, long cluster, int value) {
            if (DEBUG)
                printlnd("ForeignTable put c=" + clusterUID + " i="
                        + (value - 1) + " r=" + index + " rc=" + cluster);
            return table.put(createKey(index, cluster), value);
        }

        public int size() {
            return table.size();
        }

        public void clear() {
            table.clear();
        }
    }

    /** Hash derived from the cluster key, consistent with {@link #equals(Object)}. */
    @Override
    public int hashCode() {
        return 31*clusterImpl.getClusterKey();
    }

    /** Two change sets are equal when their clusters have the same cluster key. */
    @Override
    public boolean equals(Object object) {
        if (this == object)
            return true;
        else if (object == null)
            return false;
        else if (!(object instanceof ClusterChange))
            return false;
        ClusterChange r = (ClusterChange)object;
        return r.clusterImpl.getClusterKey() == clusterImpl.getClusterKey();
    }
}