package org.simantics.acorn.internal;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.simantics.acorn.internal.ClusterStream.ClusterEnum;
import org.simantics.acorn.internal.ClusterStream.Data;
import org.simantics.acorn.internal.ClusterStream.DebugInfo;
import org.simantics.acorn.internal.ClusterStream.OpEnum;
import org.simantics.acorn.internal.ClusterStream.StmEnum;
import org.simantics.compressions.Compressions;
import org.simantics.db.exception.RuntimeDatabaseException;
import org.simantics.db.impl.ClusterTraitsBase;
import org.simantics.db.procore.cluster.ClusterTraits;
import org.simantics.db.procore.cluster.ClusterTraitsSmall;
import org.simantics.db.service.Bytes;
import org.simantics.db.service.ClusterUID;
import org.simantics.utils.datastructures.Pair;

import gnu.trove.map.hash.TIntByteHashMap;
import gnu.trove.map.hash.TLongIntHashMap;

/**
 * Accumulates the change operations of one cluster (statement add/remove, resource
 * delete, value set/modify) into a byte buffer and, on {@link #flush()}, wraps the
 * buffer into an LZ4-compressed block that is appended to the shared output stream
 * together with the cluster's {@link ClusterUID}.
 *
 * <p>Two instances are equal when their {@link #clusterUID}s are equal.
 */
public final class ClusterChange {

    public static final int VERSION = 1;
    public static final byte ADD_OPERATION = 2;
    public static final byte REMOVE_OPERATION = 3;
    public static final byte DELETE_OPERATION = 5;

    public static final boolean DEBUG = false;
    public static final boolean DEBUG_STAT = false;
    public static final boolean DEBUG_CCS = false;

    private static DebugInfo sum = new DebugInfo();

    /** Maps a foreign resource key to its 1-based slot in the current buffer (0 = absent). */
    public final TIntByteHashMap foreignTable = new TIntByteHashMap();
    private final DebugInfo info;
    public final ClusterUID clusterUID;
    private final int SIZE_OFFSET;

    /** How much buffer is used before the stream is flushed. The bigger the better. */
    public static final int MAX_FIXED_BYTES = (1 << 15) + (1 << 14);
    private static final int MAX_FIXED_OPERATION_SIZE = 17 + 16;
    private static final int MAX_FIXED_OPERATION_SIZE_AND_ROOM_FOR_ERROR = MAX_FIXED_OPERATION_SIZE + 36;

    /** Size requested for the next buffer allocation; set by {@link #checkBufferSpace(int)}. */
    private int nextSize = MAX_FIXED_BYTES;
    /** Index of the next free byte in {@link #bytes}. */
    int byteIndex = 0;
    /** Operation data. Allocated lazily. */
    private byte[] bytes = null;
    private boolean flushed = false;
    /** Destination for flushed blocks, shared with the creator of this object. */
    private ArrayList<Pair<ClusterUID, byte[]>> stream;

    /**
     * Creates a change collector for one cluster.
     *
     * @param stream     list that receives one (cluster uid, block) pair per {@link #flush()}
     * @param clusterUID identity of the cluster whose changes are collected
     */
    public ClusterChange(ArrayList<Pair<ClusterUID, byte[]>> stream, ClusterUID clusterUID) {
        this.clusterUID = clusterUID;
        // Kept from the original: the converted longs are no longer written anywhere.
        long[] longs = new long[ClusterUID.getLongLength()];
        clusterUID.toLong(longs, 0);
        this.stream = stream;
        info = new DebugInfo();
        SIZE_OFFSET = 0;
    }

    // NOTE(review): in the copy of this file under review, the source text between the
    // constructor and flushCollect(Change) was lost (the same artifact that stripped
    // the generic type arguments). toString(), initBuffer(), clear(),
    // checkInitialization(), printlnd(String) and createResource(short) below are
    // reconstructed from their call sites and from the surviving ">>>8); }" fragment.
    // Confirm them against version control before merging.

    @Override
    public String toString() {
        return super.toString() + " cluster=" + clusterUID + " off=" + byteIndex;
    }

    /**
     * Makes the operation buffer ready for writing from index 0, allocating a new array
     * when none exists or when the current one is smaller than {@link #nextSize}.
     */
    public final void initBuffer() {
        flushed = false;
        if (null == bytes || bytes.length < nextSize) {
            bytes = new byte[nextSize];
            nextSize = MAX_FIXED_BYTES;
        }
        byteIndex = 0;
    }

    /** Forgets all buffered data and all foreign-reference slots. */
    private void clear() {
        foreignTable.clear();
        byteIndex = 0;
        if (DEBUG_STAT)
            info.clear();
    }

    /** Hook called at the start of every mutating operation; currently does nothing. */
    private void checkInitialization() {
    }

    /** Prints a debug line prefixed with this change's cluster uid. */
    private void printlnd(String s) {
        System.out.println("DEBUG: ClusterChange " + clusterUID + ": " + s);
    }

    /**
     * Records the creation of a resource.
     *
     * @param index resource index inside this cluster
     * @throws RuntimeDatabaseException if the index exceeds the cluster's resource limit
     */
    public final void createResource(short index) {
        checkInitialization();
        if (DEBUG)
            printlnd("New ri=" + index + " offset=" + byteIndex);
        if (index > ClusterTraits.getMaxNumberOfResources())
            throw new RuntimeDatabaseException("Illegal resource index=" + index + ".");
        checkBufferSpace(null);
        bytes[byteIndex++] = (byte) 52; // NOTE(review): presumably the "create resource" op code - confirm
        bytes[byteIndex++] = (byte) index;
        bytes[byteIndex++] = (byte) (index >>> 8);
    }

    /**
     * Legacy flush entry point; not supported by this implementation.
     *
     * @throws UnsupportedOperationException always
     */
    void flushCollect(Change c) {
        throw new UnsupportedOperationException();
    }

    /**
     * Ensures there is room for one fixed-size operation, flushing when there is not.
     *
     * @param c unused
     * @return {@code true} if the buffer was flushed, {@code false} otherwise
     */
    private boolean checkBufferSpace(Change c) {
        if (bytes == null)
            initBuffer();
        if (MAX_FIXED_BYTES - byteIndex > MAX_FIXED_OPERATION_SIZE_AND_ROOM_FOR_ERROR)
            return false;
        flush();
        return true;
    }

    /**
     * Ensures at least {@code size} free bytes, flushing and re-initialising the buffer
     * (grown to {@code size} when that exceeds {@link #MAX_FIXED_BYTES}) if necessary.
     */
    private void checkBufferSpace(int size) {
        if (bytes == null)
            initBuffer();
        if (bytes.length - byteIndex >= size)
            return;
        nextSize = Math.max(MAX_FIXED_BYTES, size);
        flush();
        initBuffer();
    }

    /**
     * Encodes one change into the buffer according to {@code c.op0}: a statement add,
     * a statement remove, or a resource delete. Other operation codes write nothing.
     * Resets {@code c.lastArg} to 0 afterwards.
     */
    public final void addChange(Change c) {
        checkInitialization();
        checkBufferSpace(c);
        byte operation = c.op0;
        if (operation == ADD_OPERATION) {
            addStm(c, StmEnum.Add);
        } else if (operation == REMOVE_OPERATION) {
            addStm(c, StmEnum.Remove);
        } else if (operation == DELETE_OPERATION) {
            if (DEBUG)
                printlnd("Delete value offset=" + byteIndex + " " + c);
            addByte(OpEnum.Delete.getOrMask());
            addShort(ClusterTraitsBase.getResourceIndexFromResourceKeyNoThrow(c.key0));
        }
        c.lastArg = 0;
    }

    /** Writes a full foreign reference: the cluster uid bytes followed by the index, little-endian. */
    private void addForeignLong(short index, ClusterUID clusterUID) {
        byteIndex = clusterUID.toByte(bytes, byteIndex);
        bytes[byteIndex++] = (byte) (index & 0xFF);
        bytes[byteIndex++] = (byte) (index >>> 8);
    }

    /**
     * Writes a reference to the resource {@code key} living in {@code clusterUID}.
     *
     * <ul>
     * <li>Same cluster as this change: two index bytes, returns {@code Local}.</li>
     * <li>Foreign resource already given a slot: one byte (slot - 1), returns {@code ForeignShort}.</li>
     * <li>Otherwise: assigns the next slot, writes uid + index, returns {@code ForeignLong}.</li>
     * </ul>
     *
     * @param lookIndex when positive, the slot is read from {@code lookup[lookIndex]}
     *                  instead of {@link #foreignTable}
     * @param lookup    optional per-change slot cache; updated when a new slot is assigned
     */
    private ClusterEnum addIndexAndCluster(int key, ClusterUID clusterUID, byte lookIndex, byte[] lookup) {
        assert (!clusterUID.equals(ClusterUID.Null));
        short resourceIndex = ClusterTraitsBase.getResourceIndexFromResourceKeyNoThrow(key);
        if (clusterUID.equals(this.clusterUID)) {
            bytes[byteIndex++] = (byte) (resourceIndex & 0xFF);
            bytes[byteIndex++] = (byte) (resourceIndex >>> 8);
            return ClusterEnum.Local;
        }

        byte foreign = 0;
        if (lookIndex > 0) {
            if (lookup != null)
                foreign = lookup[lookIndex];
        } else {
            foreign = foreignTable.get(key);
        }

        if (0 != foreign) {
            // The slot is stored in a Java byte, so slots above 127 are held as negative
            // values here; only the low 8 bits are written. A range check against 256
            // can never fire on a byte and has therefore been dropped.
            --foreign;
            bytes[byteIndex++] = foreign;
            return ClusterEnum.ForeignShort;
        }

        byte position = (byte) (foreignTable.size() + 1);
        if (lookup != null)
            lookup[lookIndex] = position;
        foreignTable.put(key, position);
        if (DEBUG_STAT)
            info.sForeign = foreignTable.size();
        if (clusterUID.equals(ClusterUID.Null))
            throw new RuntimeDatabaseException("Internal error, contact application support."
                    + "Cluster unique id not defined for foreing cluster.");
        addForeignLong(resourceIndex, clusterUID);
        return ClusterEnum.ForeignLong;
    }

    private void addByte(byte b) {
        bytes[byteIndex++] = b;
    }

    /** Appends {@code s} as two bytes, low byte first. */
    private void addShort(short s) {
        bytes[byteIndex++] = (byte) (s & 0xFF);
        bytes[byteIndex++] = (byte) (s >>> 8);
    }

    /** Appends {@code i} as four bytes, low byte first. */
    private void addInt(int i) {
        bytes[byteIndex++] = (byte) (i & 0xFF);
        bytes[byteIndex++] = (byte) ((i >>> 8) & 0xFF);
        bytes[byteIndex++] = (byte) ((i >>> 16) & 0xFF);
        bytes[byteIndex++] = (byte) ((i >>> 24) & 0xFF);
    }

    /** Appends the low 56 bits of {@code l} as seven bytes, low byte first. */
    private void addLong7(long l) {
        bytes[byteIndex++] = (byte) (l & 0xFF);
        bytes[byteIndex++] = (byte) ((l >>> 8) & 0xFF);
        bytes[byteIndex++] = (byte) ((l >>> 16) & 0xFF);
        bytes[byteIndex++] = (byte) ((l >>> 24) & 0xFF);
        bytes[byteIndex++] = (byte) ((l >>> 32) & 0xFF);
        bytes[byteIndex++] = (byte) ((l >>> 40) & 0xFF);
        bytes[byteIndex++] = (byte) ((l >>> 48) & 0xFF);
    }

    /** Removes and returns the most recently written byte. */
    private byte bufferPop() {
        return bytes[--byteIndex];
    }

    /** Debug-only snapshot of one statement operation. */
    final class DebugStm {
        StmEnum e;
        int r;
        int p;
        int o;
        ClusterUID pc;
        ClusterUID oc;

        DebugStm(StmEnum e, int r, int p, ClusterUID pc, int o, ClusterUID oc) {
            this.e = e;
            this.r = r;
            this.p = p;
            this.o = o;
            this.pc = pc;
            this.oc = oc;
        }

        @Override
        public String toString() {
            short ri = ClusterTraitsBase.getResourceIndexFromResourceKeyNoThrow(r);
            short pi = ClusterTraitsBase.getResourceIndexFromResourceKeyNoThrow(p);
            short oi = ClusterTraitsBase.getResourceIndexFromResourceKeyNoThrow(o);
            return "" + e + " rk=" + r + " ri=" + ri + " rc=" + clusterUID
                    + " pk=" + p + " pi=" + pi + " pc=" + pc
                    + " ok=" + o + " oi=" + oi + " oc=" + oc;
        }

        public String toString2() {
            return "" + e + " r=" + r + " rc=" + clusterUID + " p=" + p
                    + " pc=" + pc + " o=" + o + " oc=" + oc;
        }

        public String toString3() {
            short ri = ClusterTraitsBase.getResourceIndexFromResourceKeyNoThrow(r);
            short pi = ClusterTraitsBase.getResourceIndexFromResourceKeyNoThrow(p);
            short oi = ClusterTraitsBase.getResourceIndexFromResourceKeyNoThrow(o);
            return "" + e + " ri=" + ri
                    + " pi=" + pi + " pc=" + pc
                    + " oi=" + oi + " oc=" + oc;
        }
    }

    private List<DebugStm> debugStms = new ArrayList<DebugStm>();

    /**
     * Encodes one statement add/remove.
     *
     * <p>Layout: one op byte (reserved first, filled in last), the predicate reference,
     * the object reference, the low byte of the subject index, then 0-2 bytes packing
     * the remaining subject-index bits with the high bytes popped off the non-short
     * references. Flushes when more than 252 foreign slots are in use.
     *
     * @throws RuntimeDatabaseException on an illegal subject index or an unexpected
     *                                  packed-byte count
     */
    @SuppressWarnings("unused")
    private void addStm(Change c, StmEnum stmEnum) {
        if (DEBUG_STAT)
            ++info.nStms;
        if (DEBUG || DEBUG_CCS) {
            DebugStm d = new DebugStm(stmEnum, c.key0, c.key1, c.clusterUID1, c.key2, c.clusterUID2);
            if (DEBUG_CCS)
                debugStms.add(d);
            if (DEBUG) {
                printlnd(d.toString3() + " offset=" + byteIndex);
            }
        }

        int opPos = byteIndex++; // reserve the operation code byte
        boolean done = true;

        // ForeignShort = 1 byte, Local = 2 bytes, ForeignLong = uid + 2 bytes.
        // For the latter two the last byte (index high byte) is popped and packed below.
        ClusterEnum a = addIndexAndCluster(c.key1, c.clusterUID1, c.lookIndex1, c.lookup1);
        byte ab = 0;
        if (a != ClusterEnum.ForeignShort) {
            ab = bufferPop();
            done = false;
        }
        ClusterEnum b = addIndexAndCluster(c.key2, c.clusterUID2, c.lookIndex2, c.lookup2);
        byte bb = 0;
        if (b != ClusterEnum.ForeignShort) {
            bb = bufferPop();
            done = false;
        }

        int ri = ClusterTraitsBase.getResourceIndexFromResourceKeyNoThrow(c.key0);
        if (ClusterTraitsSmall.isIllegalResourceIndex(ri))
            throw new RuntimeDatabaseException("Assertion error. Illegal resource index=" + ri);
        bytes[byteIndex++] = (byte) ri; // index low byte

        if (!done) {
            Data data = ClusterEnum.getData(stmEnum, a, b);
            int left = 6 - data.bits;
            int op = ri >>> (8 + left);
            ri >>>= 8;
            ri &= (1 << left) - 1;
            if (a != ClusterEnum.ForeignShort) {
                ri |= ab << left;
                left += 6;
            }
            if (b != ClusterEnum.ForeignShort) {
                ri |= bb << left;
                left += 6;
            }
            switch (data.bytes) {
            default:
                throw new RuntimeDatabaseException("Assertion error. Illegal number of bytes=" + data.bytes);
            case 2:
                bytes[byteIndex++] = (byte) (ri & 0xFF);
                bytes[byteIndex++] = (byte) ((ri >>> 8) & 0xFF);
                break;
            case 1:
                bytes[byteIndex++] = (byte) (ri & 0xFF);
                break;
            case 0:
                break;
            }
            op |= data.mask;
            this.bytes[opPos] = (byte) op;
        } else {
            if (stmEnum == StmEnum.Add)
                bytes[opPos] = (byte) ((ri >>> 8) + 64);
            else
                bytes[opPos] = (byte) ((ri >>> 8) + 128);
        }

        if (DEBUG_STAT) {
            if (a == ClusterEnum.Local && b == ClusterEnum.Local) {
                ++info.nLocal;
            } else if (a == ClusterEnum.Local || b == ClusterEnum.Local) {
                ++info.nPartly;
            } else {
                ++info.nForeign;
            }
        }
        if (foreignTable.size() > 252)
            flush();
    }

    /**
     * Appends a "modify value" operation: op byte, resource index (with the top two
     * offset bits in bits 14-15), 56 low offset bits, 16-bit size, then the payload.
     *
     * @param ri           resource index
     * @param value_offset offset inside the value; must fit in 58 bits
     * @param bytes        source of the payload
     * @param offset       start of the payload inside {@code bytes}
     * @param size         payload length, 0..{@code MAX_FIXED_BYTES - 1}
     * @return number of payload bytes copied
     * @throws RuntimeDatabaseException if any argument is out of range
     */
    private int modiValue(int ri, long value_offset, byte[] bytes, int offset, int size) {
        if (DEBUG)
            printlnd("Modify value ri=" + ri + " vo=" + value_offset + " size=" + size + " total=" + bytes.length);
        if (ClusterTraitsBase.isIllegalResourceIndex(ri))
            throw new RuntimeDatabaseException("Assertion error. Illegal resource index=" + ri);
        // Shift binds looser than subtraction in Java, so the bound must be parenthesised;
        // "1L << 58 - 1" would evaluate to 1L << 57. Negative offsets would set stray
        // high bits in the encoded index and are rejected as well.
        if (value_offset < 0 || value_offset > (1L << 58) - 1)
            throw new RuntimeDatabaseException("Illegal value offset=" + value_offset);
        if (size < 0 || size > MAX_FIXED_BYTES - 1)
            throw new RuntimeDatabaseException("Illegal value size=" + size);
        if (offset + size > bytes.length)
            throw new RuntimeDatabaseException("Illegal value size=" + size);

        checkBufferSpace(12 + size);
        addByte(OpEnum.Modify.getOrMask());
        ri |= (value_offset >>> 56) << 14; // add top two bits
        addShort((short) ri);
        value_offset &= (1L << 56) - 1;
        addLong7(value_offset);
        addShort((short) size);
        if (DEBUG)
            System.out.println("Modify value fixed part end offset=" + byteIndex);
        int copied = Math.min(size, this.bytes.length - byteIndex);
        System.arraycopy(bytes, offset, this.bytes, byteIndex, copied);
        byteIndex += size;
        return copied;
    }

    /**
     * Writes a large value as a sequence of "modify value" operations, each sized to
     * what is left in the current buffer.
     *
     * @return total number of payload bytes written
     */
    private int setValueBig(int ri, byte[] bytes, int length_) {
        checkBufferSpace(12);
        int total = 0;
        int voffset = 0;
        int offset = 0;
        int left = length_;
        while (left > 0) {
            int length = Math.min(left, MAX_FIXED_BYTES - 12 - byteIndex);
            if (DEBUG)
                printlnd("Set big value ri=" + ri + " vo=" + voffset + " len=" + length);
            int written = modiValue(ri, voffset, bytes, offset, length);
            total += written;
            voffset += written;
            offset += written;
            left -= written;
            checkBufferSpace(12);
        }
        return total;
    }

    /**
     * Writes a value with a single "set" operation. Lengths below 32 use the compact
     * {@code SetShort} form (3 header bytes); longer ones use {@code Set} (5 header bytes).
     *
     * @return number of bytes appended to the buffer, header included
     */
    private int setValueSmall(int ri, byte[] bytes, int length) {
        checkBufferSpace(5 + length);
        int pos = byteIndex;
        int i = (length << 14) | ri;
        if (length < 32) {
            byte op = (byte) (OpEnum.SetShort.getOrMask() | (length >>> 2));
            addByte(op);
            short s = (short) i;
            addShort(s);
        } else {
            addByte(OpEnum.Set.getOrMask());
            addInt(i);
        }
        System.arraycopy(bytes, 0, this.bytes, byteIndex, length);
        byteIndex += length;
        return byteIndex - pos;
    }

    /** Sets the value of resource {@code index} to the whole of {@code bytes}. */
    final void setValue(short index, byte[] bytes) {
        setValue(index, bytes, bytes.length);
    }

    /**
     * Sets the value of resource {@code index} to the first {@code length} bytes of
     * {@code bytes}.
     *
     * @throws RuntimeDatabaseException if {@code index} is not a legal resource index
     */
    final public void setValue(short index, byte[] bytes, int length) {
        checkInitialization();
        if (ClusterTraitsBase.isIllegalResourceIndex(index))
            throw new RuntimeDatabaseException("Assertion error. Illegal resource index=" + index);
        if (DEBUG)
            printlnd("Set value ri=" + index
                    + " len=" + length
                    + " bytes=" + Arrays.toString(Arrays.copyOfRange(bytes, 0, Math.min(10, length))));
        int len;
        /*
         * The limit for the cluster stream is (1 << 18) - 1 but this avoids the
         * conversion to big cluster.
         */
        if (length > ClusterTraitsSmall.VALUE_SIZE_MAX)
            len = setValueBig(index, bytes, length);
        else
            len = setValueSmall(index, bytes, length);
        if (DEBUG_STAT) {
            ++info.nValues;
            info.sValues += len + length;
        }
    }

    /** Currently records nothing. */
    final void setImmutable(boolean immutable) {
        checkInitialization();
    }

    /** Currently records nothing. */
    final void undoValueEx(int resourceIndex) {
        checkInitialization();
    }

    /** Currently records nothing. */
    final void setDeleted(boolean deleted) {
        checkInitialization();
    }

    /** Appends a single zero byte to the buffer. */
    final void corrupt() {
        checkInitialization();
        addByte((byte) 0);
    }

    /**
     * @return a copy of the bytes written so far; an empty array when nothing has been
     *         written (including before the buffer has been allocated)
     */
    public byte[] getBytes() {
        if (bytes == null)
            return new byte[0];
        return Arrays.copyOf(bytes, byteIndex);
    }

    /**
     * Legacy flush entry point; not supported by this implementation. Use {@link #flush()}.
     *
     * @param clusterUID unused
     * @return never returns normally
     * @throws UnsupportedOperationException always
     */
    final boolean flush(ClusterUID clusterUID) {
        throw new UnsupportedOperationException();
    }

    /**
     * Legacy flush entry point; not supported by this implementation.
     *
     * @throws UnsupportedOperationException always
     */
    final void flushInternal(ClusterUID clusterUID) {
        throw new UnsupportedOperationException();
    }

    /** Table keyed by (cluster, resource index) pairs packed into one long. */
    final class ForeignTable {
        private final TLongIntHashMap table = new TLongIntHashMap();

        private long createKey(short index, long cluster) {
            assert (cluster <= (1L << 48) - 1);
            return (cluster << 14) | index;
        }

        public int get(short index, long cluster) {
            int value = table.get(createKey(index, cluster));
            if (DEBUG)
                printlnd("ForeignTable get c=" + clusterUID + " i="
                        + (value - 1) + " r=" + index + " rc=" + cluster);
            return value;
        }

        public int put(short index, long cluster, int value) {
            if (DEBUG)
                printlnd("ForeignTable put c=" + clusterUID + " i="
                        + (value - 1) + " r=" + index + " rc=" + cluster);
            return table.put(createKey(index, cluster), value);
        }

        public int size() {
            return table.size();
        }

        public void clear() {
            table.clear();
        }
    }

    @Override
    public int hashCode() {
        return 31 * clusterUID.hashCode();
    }

    @Override
    public boolean equals(Object object) {
        if (this == object)
            return true;
        else if (object == null)
            return false;
        else if (!(object instanceof ClusterChange))
            return false;
        ClusterChange r = (ClusterChange) object;
        return r.clusterUID.equals(clusterUID);
    }

    /**
     * If anything has been buffered, packages it and appends it to the output stream,
     * then resets this object for further writes.
     *
     * <p>Inner block ({@code raw}): int 1, 16 uid bytes, int payload length, payload,
     * int 0. The inner block is LZ4-compressed and the compressed form is used only
     * when it is smaller. Outer block ({@code data}): int 0, int 0, raw length twice,
     * stored length, stored bytes, int 0. All ints are written with
     * {@code Bytes.writeLE}.
     */
    public void flush() {
        if (byteIndex > 0) {
            final ClusterUID cuid = clusterUID;

            byte[] block = getBytes();
            byte[] raw = new byte[block.length + 28];
            Bytes.writeLE(raw, 0, 1);
            System.arraycopy(cuid.asBytes(), 0, raw, 4, 16);
            Bytes.writeLE(raw, 20, block.length);
            System.arraycopy(block, 0, raw, 24, block.length);
            Bytes.writeLE(raw, 24 + block.length, 0);

            ByteBuffer rawBB = ByteBuffer.wrap(raw);
            ByteBuffer outputBB = ByteBuffer.allocate(raw.length + raw.length / 8);
            int compressedSize = Compressions.get(Compressions.LZ4).compressBuffer(rawBB, 0, raw.length, outputBB, 0);

            byte[] data_ = null;
            if (compressedSize < raw.length) {
                data_ = new byte[compressedSize];
                outputBB.get(data_, 0, compressedSize);
            } else {
                data_ = raw;
            }

            byte[] data = new byte[data_.length + 24];
            Bytes.writeLE(data, 0, 0);
            Bytes.writeLE(data, 4, 0);
            Bytes.writeLE(data, 8, raw.length);
            Bytes.writeLE(data, 12, raw.length);
            Bytes.writeLE(data, 16, data_.length);
            System.arraycopy(data_, 0, data, 20, data_.length);
            Bytes.writeLE(data, 20 + data_.length, 0);

            stream.add(Pair.make(clusterUID, data));
            clear();
            initBuffer();
        }
    }
}