1 package org.simantics.acorn.internal;
3 import java.nio.ByteBuffer;
4 import java.util.ArrayList;
5 import java.util.Arrays;
8 import org.simantics.acorn.internal.ClusterStream.ClusterEnum;
9 import org.simantics.acorn.internal.ClusterStream.Data;
10 import org.simantics.acorn.internal.ClusterStream.DebugInfo;
11 import org.simantics.acorn.internal.ClusterStream.OpEnum;
12 import org.simantics.acorn.internal.ClusterStream.StmEnum;
13 import org.simantics.compressions.Compressions;
14 import org.simantics.db.exception.RuntimeDatabaseException;
15 import org.simantics.db.impl.ClusterTraitsBase;
16 import org.simantics.db.procore.cluster.ClusterTraits;
17 import org.simantics.db.procore.cluster.ClusterTraitsSmall;
18 import org.simantics.db.service.Bytes;
19 import org.simantics.db.service.ClusterUID;
20 import org.simantics.utils.datastructures.Pair;
22 import gnu.trove.map.hash.TIntByteHashMap;
23 import gnu.trove.map.hash.TLongIntHashMap;
/**
 * Records the operations made against one cluster as an encoded byte buffer
 * ({@code bytes}); {@code flush()} packs that buffer and appends it to
 * {@code stream} together with the cluster UID.
 */
26 public final class ClusterChange {
// Format version. In the lines visible here it is only referenced from commented-out header code.
28 public static final int VERSION = 1;
// Operation codes compared against Change.op0 in addChange().
29 public static final byte ADD_OPERATION = 2;
30 public static final byte REMOVE_OPERATION = 3;
31 public static final byte DELETE_OPERATION = 5;
// Diagnostic switches, all off. DEBUG and DEBUG_CCS gate the statement tracing in addStm().
33 public static final boolean DEBUG = false;
34 public static final boolean DEBUG_STAT = false;
35 public static final boolean DEBUG_CCS = false;
// NOTE(review): in the lines visible here this is only referenced from commented-out code.
37 private static DebugInfo sum = new DebugInfo();
// Maps a resource key to the one-byte index assigned to it in addIndexAndCluster().
39 public final TIntByteHashMap foreignTable = new TIntByteHashMap();
// Per-instance counters; sForeign and sValues are updated by this class.
40 private final DebugInfo info;
41 // private final GraphSession graphSession;
// The cluster whose changes this object records.
42 public final ClusterUID clusterUID;
// NOTE(review): only referenced from commented-out header code in the lines visible here.
43 private final int SIZE_OFFSET;
44 // private final int HEADER_SIZE;
45 // How much buffer is used before stream is flushed to server. The bigger the better.
// (1<<15) + (1<<14) = 49152 bytes.
46 public static final int MAX_FIXED_BYTES = (1<<15) + (1<<14);
// 33 bytes; with the extra 36 bytes below this gives the 69 byte headroom tested in checkBufferSpace(Change).
47 private static final int MAX_FIXED_OPERATION_SIZE = 17 + 16;
48 private static final int MAX_FIXED_OPERATION_SIZE_AND_ROOM_FOR_ERROR = MAX_FIXED_OPERATION_SIZE + 36;
// Size used by initBuffer() for the next allocation; checkBufferSpace(int) may raise it.
49 private int nextSize = MAX_FIXED_BYTES;
51 private byte[] bytes = null; // Operation data.
52 // private final byte[] header;
// NOTE(review): no live read of this flag is visible in the lines shown here.
53 private boolean flushed = false;
// Output list; flush() appends a (clusterUID, data) pair to it.
54 private ArrayList<Pair<ClusterUID, byte[]>> stream;
56 // public ClusterImpl clusterImpl;
/**
 * Creates a change recorder for one cluster.
 *
 * @param stream list of (cluster UID, encoded data) pairs; presumably stored in the
 *        {@code stream} field that flush() appends to (the assignment is not among
 *        the lines visible here, confirm)
 * @param clusterUID identifier of the cluster the recorded operations belong to
 */
58 public ClusterChange( ArrayList<Pair<ClusterUID, byte[]>> stream, ClusterUID clusterUID) {
59 this.clusterUID = clusterUID;
// The UID is converted to longs, but the only visible consumer is the commented-out header code below.
60 long[] longs = new long[ClusterUID.getLongLength()];
61 clusterUID.toLong(longs, 0);
63 // this.graphSession = clusterStream.graphSession;
64 info = new DebugInfo();
65 // HEADER_SIZE = 8 + longs.length * 8;
66 // header = new byte[HEADER_SIZE];
68 // Bytes.writeLE(header, SIZE_OFFSET + 0, 0); // Correct byte vector size is set with setHeaderVectorSize() later.
69 // Bytes.writeLE(header, SIZE_OFFSET + 4, VERSION);
70 // for (int i=0, offset=8; i<longs.length; ++i, offset+=8)
71 // Bytes.writeLE(header, offset, longs[i]);
73 // this.clusterStream = clusterStream;
74 // this.clusterChange2 = new ClusterChange2(clusterUID, clusterImpl);
75 // clusterStream.changes.add(this);
78 // private void setHeaderVectorSize(int size) {
80 // throw new RuntimeDatabaseException("Change set size can't be negative.");
81 // int len = size + HEADER_SIZE - SIZE_OFFSET - 4;
82 // Bytes.writeLE(header, SIZE_OFFSET, len);
85 public String toString() {
86 return super.toString() + " cluster=" + clusterUID + " off=" + byteIndex;
/**
 * Allocates a new operation buffer of {@code nextSize} bytes when there is no buffer
 * yet or the current one is smaller than {@code nextSize}; after allocating,
 * {@code nextSize} is set back to {@link #MAX_FIXED_BYTES}.
 */
88 final public void initBuffer() {
90 if (null == bytes || bytes.length < nextSize) {
91 bytes = new byte[nextSize];
92 nextSize = MAX_FIXED_BYTES;
// Presumably resets the recorded state (inferred from the name, confirm).
// The only statements visible here are disabled legacy code that refers to clusterImpl.
96 private final void clear() {
97 // if(clusterImpl != null && clusterImpl.change != null)
98 // clusterImpl.change.init();
106 private final void checkInitialization() {
107 // if (0 == byteIndex)
108 // clusterStream.changes.addChange(this);
110 private final void printlnd(String s) {
111 System.out.println("DEBUG: ClusterChange " + clusterUID + ": " + s);
/**
 * Appends a "new resource" record: the byte 52 followed by the resource index,
 * low byte first.
 *
 * @param index index of the created resource
 * @throws RuntimeDatabaseException if {@code index} is greater than
 *         {@code ClusterTraits.getMaxNumberOfResources()}
 */
113 public final void createResource(short index) {
114 checkInitialization();
116 printlnd("New ri=" + index + " offset=" + byteIndex);
117 if (index > ClusterTraits.getMaxNumberOfResources())
118 throw new RuntimeDatabaseException("Illegal resource index=" + index + ".");
// No Change object is associated with this record, hence null.
119 checkBufferSpace(null);
// NOTE(review): 52 is presumably the operation code for resource creation; no named constant is visible here.
120 bytes[byteIndex++] = (byte)52;
121 bytes[byteIndex++] = (byte)index;
122 bytes[byteIndex++] = (byte)(index>>>8);
124 void flushCollect(Change c) {
125 throw new UnsupportedOperationException();
126 // flushInternal(graphSession, clusterUID);
128 // printlnd("Cluster change data was flushed.");
131 // printlnd("Clearing lookup for " + c.toString());
135 // if (null != clusterImpl) {
136 // clusterImpl.foreignLookup = null;
/**
 * Creates the buffer on first use, then tests whether more than
 * {@code MAX_FIXED_OPERATION_SIZE_AND_ROOM_FOR_ERROR} bytes remain below
 * {@code MAX_FIXED_BYTES}. The branches and the returned values are not among the
 * lines visible here.
 *
 * @param c the change about to be written; {@code null} is passed by createResource()
 */
140 private final boolean checkBufferSpace(Change c) {
141 // clusterStream.changes.checkFlush();
142 if(bytes == null) initBuffer();
143 if (MAX_FIXED_BYTES - byteIndex > MAX_FIXED_OPERATION_SIZE_AND_ROOM_FOR_ERROR) {
/**
 * Creates the buffer on first use and compares the free space against {@code size}.
 * {@code nextSize} is set to the larger of {@code MAX_FIXED_BYTES} and {@code size},
 * so the next buffer allocated by initBuffer() can hold the request; presumably this
 * happens only when the free space is insufficient (the statement under the
 * {@code if} is not visible here, confirm).
 *
 * @param size number of bytes about to be written
 */
151 private final void checkBufferSpace(int size) {
152 if(bytes == null) initBuffer();
153 if (bytes.length - byteIndex >= size)
155 nextSize = Math.max(MAX_FIXED_BYTES, size);
/**
 * Encodes one change according to its operation code {@code c.op0}:
 * ADD_OPERATION and REMOVE_OPERATION are written as statements via addStm();
 * DELETE_OPERATION is written as the Delete operation byte followed by the
 * resource index derived from {@code c.key0}.
 *
 * @param c the change to record
 */
160 public final void addChange(Change c) {
161 checkInitialization();
163 byte operation = c.op0;
164 if(operation == ADD_OPERATION)
165 addStm(c, StmEnum.Add);
166 else if (operation == REMOVE_OPERATION)
167 addStm(c, StmEnum.Remove);
168 else if (operation == DELETE_OPERATION) {
170 printlnd("Delete value offset=" + byteIndex + " " + c);
171 addByte(OpEnum.Delete.getOrMask());
172 addShort(ClusterTraitsBase.getResourceIndexFromResourceKeyNoThrow(c.key0));
177 private final void addForeignLong(short index, ClusterUID clusterUID) {
178 byteIndex = clusterUID.toByte(bytes, byteIndex);
179 bytes[byteIndex++] = (byte)(index & 0xFF);
180 bytes[byteIndex++] = (byte)(index >>> 8);
/**
 * Writes a reference to the resource {@code key} and reports which encoding was used.
 * Three encodings are visible: a resource of this cluster is written as a two byte
 * little-endian index (Local); a foreign resource that already has a table position
 * is written as that single byte (ForeignShort); otherwise a new position is stored
 * in {@code lookup} and {@code foreignTable} and the full UID plus index is written
 * through addForeignLong() (ForeignLong). Several branch conditions are not among
 * the lines visible here.
 *
 * @param key resource key
 * @param clusterUID UID of the cluster owning {@code key}
 * @param lookIndex position inside {@code lookup}
 * @param lookup per-change cache of foreign positions
 * @return the encoding that was written
 */
183 private final ClusterEnum addIndexAndCluster(int key, ClusterUID clusterUID, byte lookIndex, byte[] lookup) {
184 assert(!clusterUID.equals(ClusterUID.Null));
185 short resourceIndex = ClusterTraitsBase.getResourceIndexFromResourceKeyNoThrow(key);
186 if (clusterUID.equals(this.clusterUID)) {
187 bytes[byteIndex++] = (byte)(resourceIndex & 0xFF);
188 bytes[byteIndex++] = (byte)(resourceIndex >>> 8);
189 return ClusterEnum.Local;
195 foreign = lookup[lookIndex];
197 foreign = foreignTable.get(key);
// NOTE(review): the two literals below are concatenated without a separating space, and "foreing" is misspelled.
201 throw new RuntimeDatabaseException("Internal error, contact application support." +
202 "Too big foreing index=" + foreign + " max=256");
204 bytes[byteIndex++] = foreign;
205 return ClusterEnum.ForeignShort;
// NOTE(review): the cast to byte goes negative once the table holds more than 126 entries,
// while addStm() tests foreignTable.size() > 252; confirm that readers treat this byte as unsigned.
207 byte position = (byte) (foreignTable.size() + 1);
209 lookup[lookIndex] = position;
210 foreignTable.put(key, position);
212 info.sForeign = foreignTable.size();
213 if (clusterUID.equals(ClusterUID.Null))
// NOTE(review): same missing space and "foreing" typo as in the message above.
214 throw new RuntimeDatabaseException("Internal error, contact application support." +
215 "Cluster unique id not defined for foreing cluster.");
216 addForeignLong(resourceIndex, clusterUID);
217 return ClusterEnum.ForeignLong;
221 private final void addByte(byte b) {
222 bytes[byteIndex++] = b;
225 private final void addShort(short s) {
226 bytes[byteIndex++] = (byte)(s & 0xFF);
227 bytes[byteIndex++] = (byte)(s >>> 8);
230 // private final void addShort(int s) {
231 // bytes[byteIndex++] = (byte) (s & 0xFF);
232 // bytes[byteIndex++] = (byte) ((s >>> 8) & 0xFF);
235 private final void addInt(int i) {
236 // System.err.println("addInt " + i + " " + i);
237 bytes[byteIndex++] = (byte) (i & 0xFF);
238 bytes[byteIndex++] = (byte) ((i >>> 8) & 0xFF);
239 bytes[byteIndex++] = (byte) ((i >>> 16) & 0xFF);
240 bytes[byteIndex++] = (byte) ((i >>> 24) & 0xFF);
241 // buffer.asIntBuffer().put(i);
242 // buffer.position(buffer.position()+4);
245 // private void addLong6(long l) {
246 //// System.err.println("addLong " + l);
247 // bytes[byteIndex++] = (byte) (l & 0xFF);
248 // bytes[byteIndex++] = (byte) ((l >>> 8) & 0xFF);
249 // bytes[byteIndex++] = (byte) ((l >>> 16) & 0xFF);
250 // bytes[byteIndex++] = (byte) ((l >>> 24) & 0xFF);
251 // bytes[byteIndex++] = (byte) ((l >>> 32) & 0xFF);
252 // bytes[byteIndex++] = (byte) ((l >>> 40) & 0xFF);
253 // // buffer.asLongBuffer().put(l);
254 // // buffer.position(buffer.position() + 6);
257 private void addLong7(long l) {
258 bytes[byteIndex++] = (byte) (l & 0xFF);
259 bytes[byteIndex++] = (byte) ((l >>> 8) & 0xFF);
260 bytes[byteIndex++] = (byte) ((l >>> 16) & 0xFF);
261 bytes[byteIndex++] = (byte) ((l >>> 24) & 0xFF);
262 bytes[byteIndex++] = (byte) ((l >>> 32) & 0xFF);
263 bytes[byteIndex++] = (byte) ((l >>> 40) & 0xFF);
264 bytes[byteIndex++] = (byte) ((l >>> 48) & 0xFF);
265 // buffer.asLongBuffer().put(l);
266 // buffer.position(buffer.position() + 7);
269 // private void addLong(long l) {
270 // bytes[byteIndex++] = (byte) (l & 0xFF);
271 // bytes[byteIndex++] = (byte) ((l >>> 8) & 0xFF);
272 // bytes[byteIndex++] = (byte) ((l >>> 16) & 0xFF);
273 // bytes[byteIndex++] = (byte) ((l >>> 24) & 0xFF);
274 // bytes[byteIndex++] = (byte) ((l >>> 32) & 0xFF);
275 // bytes[byteIndex++] = (byte) ((l >>> 40) & 0xFF);
276 // bytes[byteIndex++] = (byte) ((l >>> 48) & 0xFF);
277 // bytes[byteIndex++] = (byte) ((l >>> 56) & 0xFF);
279 private final byte bufferPop() {
280 return bytes[--byteIndex];
/**
 * Diagnostic snapshot of one statement, built in addStm() when tracing is enabled.
 * It offers three textual renderings that differ in whether resource keys,
 * resource indices, or both are shown.
 */
283 final class DebugStm {
/** Arguments as passed by addStm(): (stmEnum, c.key0, c.key1, c.clusterUID1, c.key2, c.clusterUID2). */
291 DebugStm(StmEnum e, int r, int p, ClusterUID pc, int o, ClusterUID oc) {
/** Renders keys and the resource indices derived from them, plus the cluster UIDs. */
301 public String toString() {
302 short ri = ClusterTraitsBase.getResourceIndexFromResourceKeyNoThrow(r);
303 short pi = ClusterTraitsBase.getResourceIndexFromResourceKeyNoThrow(p);
304 short oi = ClusterTraitsBase.getResourceIndexFromResourceKeyNoThrow(o);
// "rc" is the UID of the enclosing ClusterChange.
305 return "" + e + " rk=" + r + " ri=" + ri + " rc=" + clusterUID
306 + " pk=" + p + " pi=" + pi + " pc=" + pc
307 + " ok=" + o + " oi=" + oi + " oc=" + oc;
/** Renders the raw keys and cluster UIDs only. */
310 public String toString2() {
311 return "" + e + " r=" + r + " rc=" + clusterUID + " p=" + p
312 + " pc=" + pc + " o=" + o + " oc=" + oc;
/** Renders resource indices only; the UID of the enclosing ClusterChange is omitted. */
315 public String toString3() {
316 short ri = ClusterTraitsBase.getResourceIndexFromResourceKeyNoThrow(r);
317 short pi = ClusterTraitsBase.getResourceIndexFromResourceKeyNoThrow(p);
318 short oi = ClusterTraitsBase.getResourceIndexFromResourceKeyNoThrow(o);
319 return "" + e + " ri=" + ri
320 + " pi=" + pi + " pc=" + pc
321 + " oi=" + oi + " oc=" + oc;
// Holds DebugStm entries; NOTE(review): presumably filled by addStm() when tracing is on,
// the add call is not among the lines visible here.
325 private List<DebugStm> debugStms = new ArrayList<DebugStm>();
/**
 * Encodes one statement change. Visible steps: one byte is reserved for the
 * operation code ({@code opPos}), the references for {@code c.key1} and
 * {@code c.key2} are written through addIndexAndCluster(), the low byte of the
 * resource index of {@code c.key0} follows, and finally the reserved operation byte
 * is filled in. Many branch bodies are not among the lines visible here.
 *
 * @param c the change carrying the three keys and their cluster UIDs
 * @param stmEnum whether the statement is added or removed
 * @throws RuntimeDatabaseException if the resource index of {@code c.key0} is illegal
 *         or the encoding table reports an unexpected byte count
 */
327 @SuppressWarnings("unused")
328 private final void addStm(Change c, StmEnum stmEnum) {
332 if (DEBUG || DEBUG_CCS) {
333 DebugStm d = new DebugStm(stmEnum, c.key0, c.key1, c.clusterUID1, c.key2, c.clusterUID2);
337 printlnd(d.toString3() + " offset=" + byteIndex);
340 // int opPos = buffer.position();
// Reserve the operation byte; its value is only known after both references are encoded.
341 int opPos = byteIndex++;
342 // buffer.put((byte)0); // operation code
347 ClusterEnum a = addIndexAndCluster(c.key1, c.clusterUID1, c.lookIndex1, c.lookup1);
350 // ForeignShort = byte
352 // ForeignLong = 8 byte
353 if (a != ClusterEnum.ForeignShort) {
358 ClusterEnum b = addIndexAndCluster(c.key2, c.clusterUID2, c.lookIndex2, c.lookup2);
360 if (b != ClusterEnum.ForeignShort) {
365 int ri = ClusterTraitsBase.getResourceIndexFromResourceKeyNoThrow(c.key0);
366 if (ClusterTraitsSmall.isIllegalResourceIndex(ri))
367 throw new RuntimeDatabaseException("Assertion error. Illegal resource index=" + ri);
368 bytes[byteIndex++] = (byte)ri; // index low byte
// The encoding table yields how many bits and bytes this statement/encoding combination uses.
370 Data data = ClusterEnum.getData(stmEnum, a, b);
371 int left = 6 - data.bits;
372 int op = ri >>> (8 + left);
374 ri &= (1 << left) - 1;
375 if (a != ClusterEnum.ForeignShort) {
379 if (b != ClusterEnum.ForeignShort) {
383 switch (data.bytes) {
385 throw new RuntimeDatabaseException("Assertion error. Illegal number of bytes=" + data.bytes);
387 bytes[byteIndex++] = (byte)(ri & 0xFF);
388 bytes[byteIndex++] = (byte)((ri >>> 8) & 0xFF);
391 bytes[byteIndex++] = (byte)(ri & 0xFF);
397 this.bytes[opPos] = (byte)op;
// Alternative operation byte: high index bits plus 64 for Add, plus 128 otherwise.
399 if (stmEnum == StmEnum.Add)
400 bytes[opPos] = (byte)((ri >>> 8) + 64);
402 bytes[opPos] = (byte)((ri >>> 8) + 128);
405 if (a == ClusterEnum.Local && b == ClusterEnum.Local) {
407 } else if (a == ClusterEnum.Local || b == ClusterEnum.Local) {
// The foreign table is addressed with one byte; 252 is the threshold tested here.
413 if (foreignTable.size() > 252)
415 // throw new UnsupportedOperationException();
416 //flushInternal(graphSession, clusterUID);
419 private final int modiValue(int ri, long value_offset, byte[] bytes, int offset, int size) {
421 printlnd("Modify value ri=" + ri + " vo=" + value_offset + " size=" + size + " total=" + bytes.length);
422 if (ClusterTraitsBase.isIllegalResourceIndex(ri))
423 throw new RuntimeDatabaseException("Assertion error. Illegal resource index=" + ri);
424 if (value_offset > (1L << 58 - 1))
425 throw new RuntimeDatabaseException("Illegal value offset="
427 if (size < 0 || size > MAX_FIXED_BYTES - 1)
428 throw new RuntimeDatabaseException("Illegal value size=" + size);
429 if (offset + size > bytes.length)
430 throw new RuntimeDatabaseException("Illegal value size=" + size);
431 checkBufferSpace(12 + size);
432 addByte(OpEnum.Modify.getOrMask());
433 ri |= (value_offset >>> 56) << 14; // add top two bits
434 addShort((short) ri);
435 value_offset &= (1L << 56) - 1;
436 addLong7(value_offset);
437 addShort((short) size);
439 System.out.println("Modify value fixed part end offset=" + byteIndex);
440 int copied = Math.min(size, this.bytes.length - byteIndex);
441 System.arraycopy(bytes, offset, this.bytes, byteIndex, copied);
446 // private final void modiValueBig(int ri, long voffset, int left, byte[] bytes, int offset) {
447 // checkBufferSpace(0);
448 // int current = Math.min(this.bytes.length - byteIndex - 12, left);
449 // if(current >= 0) {
450 // int written = modiValue(ri, voffset, bytes, offset, current);
451 // voffset += written;
452 // offset += written;
455 //// flushInternal(graphSession, clusterUID);
456 // while (left > 0) {
457 // int length = Math.min(left, (1 << 16) - 1);
459 // printlnd("Modify big value ri=" + ri + " vo=" + voffset + " len=" + length);
460 // int psize = length + 12;
461 //// setHeaderVectorSize(psize);
462 // byte[] message = new byte[psize/*+HEADER_SIZE*/];
463 //// System.arraycopy(header, 0, message, 0, HEADER_SIZE);
465 // Bytes.write(message, to++, OpEnum.Modify.getOrMask());
466 // short index = (short)(ri | (voffset >>> 56)<<14); // add top two bits
467 // Bytes.writeLE(message, to, index); to += 2;
468 // Bytes.writeLE7(message, to, voffset & ((1L << 56) - 1)); to += 7;
469 // Bytes.writeLE(message, to, (short)length); to += 2;
470 // System.arraycopy(bytes, offset, message, to, length);
471 //// graphSession.updateCluster(new UpdateClusterFunction(message));
472 // voffset += length;
/**
 * Writes a value that is too large for setValueSmall(). Each piece is emitted through
 * modiValue() and is limited to {@code MAX_FIXED_BYTES - 12 - byteIndex} bytes; room
 * for the 12 byte fixed part is requested before writing. Presumably this repeats
 * until the whole value is written (the loop is not among the lines visible here).
 *
 * @param ri resource index of the value
 * @param bytes value bytes
 * @param length_ number of bytes of {@code bytes} to write
 */
478 private final int setValueBig(int ri, byte[] bytes, int length_) {
479 checkBufferSpace(12);
485 int length = Math.min(left, MAX_FIXED_BYTES - 12 - byteIndex);
487 printlnd("Set big value ri=" + ri + " vo=" + voffset + " len=" + length);
488 int written = modiValue(ri, voffset, bytes, offset, length);
493 checkBufferSpace(12);
/**
 * Writes a small value inline after requesting {@code 5 + length} bytes of buffer
 * space. Two operation bytes are visible: SetShort merged with {@code length >>> 2},
 * and plain Set; the condition choosing between them is not among the lines visible
 * here. The value bytes are then copied into the buffer.
 *
 * @param ri resource index of the value
 * @param bytes value bytes, copied from offset 0
 * @param length number of bytes to write
 */
498 private final int setValueSmall(int ri, byte[] bytes, int length) {
499 checkBufferSpace(5 + length);
// Length is placed above the low 14 bits, which carry the resource index.
501 int i = length << 14 | ri;
503 byte op = (byte) (OpEnum.SetShort.getOrMask() | length >>> 2);
508 addByte(OpEnum.Set.getOrMask());
511 System.arraycopy(bytes, 0, this.bytes, byteIndex, length);
// Bytes written since "pos"; NOTE(review): presumably the offset at method entry, its declaration is not visible here.
513 int len = byteIndex - pos;
517 final void setValue(short index, byte[] bytes) {
518 setValue(index, bytes, bytes.length);
/**
 * Records a new value for a resource. Values longer than
 * {@code ClusterTraitsSmall.VALUE_SIZE_MAX} go through setValueBig(), all others
 * through setValueSmall(); the statistics counter {@code info.sValues} is then
 * increased by the returned length plus {@code length}.
 *
 * @param index resource index
 * @param bytes value bytes
 * @param length number of bytes of {@code bytes} to write
 * @throws RuntimeDatabaseException if {@code index} is an illegal resource index
 */
521 final public void setValue(short index, byte[] bytes, int length) {
522 checkInitialization();
523 if (ClusterTraitsBase.isIllegalResourceIndex(index))
524 throw new RuntimeDatabaseException("Assertion error. Illegal resource index=" + index);
// The trace shows at most the first ten value bytes.
526 printlnd("Set value ri=" + index
528 + " bytes=" + Arrays.toString(Arrays.copyOfRange(bytes, 0, Math.min(10, length))));
// NOTE(review): "(1<18)-1" in the text below presumably means (1<<18)-1.
531 * The limit for the cluster stream is (1<18)-1 but this avoids the
532 * conversion to big cluster.
534 if (length > ClusterTraitsSmall.VALUE_SIZE_MAX)
535 len = setValueBig(index, bytes, length);
537 len = setValueSmall(index, bytes, length);
540 info.sValues += len + length;
544 // final void setValue(Change c, byte[] bytes, int length) {
545 // short ri = ClusterTraitsBase.getResourceIndexFromResourceKeyNoThrow(c.key0);
546 // setValue(ri, bytes, length);
550 // final void modiValue(Change c, long voffset, int length, byte[] bytes, int offset) {
551 // checkInitialization();
552 // int ri = ClusterTraitsBase.getResourceIndexFromResourceKeyNoThrow(c.key0);
554 // printlnd("Modify value ri=" + ri
555 // + " voff=" + voffset
556 // + " vlen=" + length
557 // + " blen=" + bytes.length
558 // + " boff=" + offset
559 // + " bytes=" + Arrays.toString(Arrays.copyOfRange(bytes, 0, Math.min(10, bytes.length))));
560 // modiValueBig(ri, voffset, length, bytes, offset);
564 // info.sValues += length;
567 final void setImmutable(boolean immutable) {
568 checkInitialization();
569 // clusterChange2.setImmutable(immutable);
571 final void undoValueEx(int resourceIndex) {
572 checkInitialization();
573 // clusterChange2.undoValueEx(resourceIndex);
575 final void setDeleted(boolean deleted) {
576 checkInitialization();
577 // clusterChange2.setDeleted(deleted);
// Runs the initialization hook. NOTE(review): whether anything is written afterwards
// cannot be determined from the lines visible here.
579 final void corrupt() {
580 checkInitialization();
584 public byte[] getBytes() {
585 byte[] copy = new byte[byteIndex];
586 System.arraycopy(bytes, 0, copy, 0, byteIndex);
591 * @param graphSession
593 * @return true if actually flushed something
595 final boolean flush(/*GraphSession graphSession,*/ ClusterUID clusterUID) {
596 throw new UnsupportedOperationException();
597 // if (byteIndex > 0) {
598 // if(DebugPolicy.REPORT_CLUSTER_STREAM)
599 // System.err.println("Flush cluster change set stream " + this);
600 // setHeaderVectorSize(byteIndex);
601 // byte[] copy = new byte[byteIndex + HEADER_SIZE];
602 // System.arraycopy(header, 0, copy, 0, HEADER_SIZE);
603 // System.arraycopy(bytes, 0, copy, HEADER_SIZE, byteIndex);
604 // UpdateClusterFunction updateClusterFunction = new UpdateClusterFunction(copy);
606 // for (DebugStm stm : debugStms)
607 // printlnd(stm.toString2());
608 // debugStms.clear();
611 // info.tot = updateClusterFunction.operation.length;
612 // printlnd("ReallyFlush: " + info.toString());
614 // printlnd("ReallyFlush sum: " + sum.toString());
616 // // long start = System.nanoTime();
617 // graphSession.updateCluster(updateClusterFunction);
618 // // long duration = System.nanoTime() - start;
619 // // duration2 += duration;
620 // // System.err.println("updateCluster " + 1e-9*duration);
621 // // System.err.println("updateCluster total " + 1e-9*duration2);
623 // clusterChange2.flush(graphSession);
625 // } else if (clusterChange2.isDirty()) {
626 // clusterChange2.flush(graphSession);
629 // } else if (flushed) {
637 final void flushInternal(ClusterUID clusterUID) {
638 throw new UnsupportedOperationException();
639 // flush(graphSession, clusterUID);
/**
 * Lookup table keyed by a (resource index, cluster) pair packed into one long.
 */
643 final class ForeignTable {
644 private final TLongIntHashMap table = new TLongIntHashMap();
/**
 * Packs the pair into one key: the cluster value shifted left by 14 bits, OR-ed
 * with the index. The assertion requires the cluster value to fit in 48 bits.
 * NOTE(review): a negative {@code index} would sign-extend and overwrite the
 * cluster bits; presumably indices are always within 14 bits, confirm.
 */
646 private long createKey(short index, long cluster) {
647 assert (cluster <= (1L << 48) - 1);
648 return (cluster << 14) | index;
/** Looks up the value stored for the pair; the trace prints the value minus one. */
651 public int get(short index, long cluster) {
652 int value = table.get(createKey(index, cluster));
654 printlnd("ForeignTable get c=" + clusterUID + " i="
655 + (value - 1) + " r=" + index + " rc=" + cluster);
/** Stores a value for the pair and returns the result of the underlying map's put(). */
659 public int put(short index, long cluster, int value) {
661 printlnd("ForeignTable put c=" + clusterUID + " i="
662 + (value - 1) + " r=" + index + " rc=" + cluster);
663 return table.put(createKey(index, cluster), value);
// Presumably empties the table; the body is not among the lines visible here.
670 public void clear() {
676 public int hashCode() {
677 return 31*clusterUID.hashCode();
// Two ClusterChange objects are equal when their cluster UIDs are equal, which is
// consistent with hashCode(). Null and objects of other types are tested beforehand;
// the results of those branches are not among the lines visible here.
681 public boolean equals(Object object) {
684 else if (object == null)
686 else if (!(object instanceof ClusterChange))
688 ClusterChange r = (ClusterChange)object;
689 return r.clusterUID.equals(clusterUID);
/**
 * Packs the recorded bytes and appends the result to {@code stream}.
 * Raw layout, with the integers written by Bytes.writeLE: the value 1 at offset 0,
 * the 16 UID bytes at offset 4, the block length at offset 20, the block itself at
 * offset 24, and a zero after the block. The raw bytes are LZ4-compressed, and the
 * compressed bytes are taken when they are smaller than the raw bytes (the other
 * branch is not among the lines visible here). The payload is finally wrapped in a
 * 20 byte prefix and a zero trailer and added to {@code stream} with the cluster UID.
 */
692 public void flush() {
696 final ClusterUID cuid = clusterUID;
698 byte[] block = getBytes();
// 24 bytes before the block plus 4 bytes after it.
699 byte[] raw = new byte[block.length + 28];
700 Bytes.writeLE(raw, 0, 1);
701 System.arraycopy(cuid.asBytes(), 0, raw, 4, 16);
702 Bytes.writeLE(raw, 20, block.length);
703 System.arraycopy(block, 0, raw, 24, block.length);
704 Bytes.writeLE(raw, 24+block.length, 0);
706 ByteBuffer rawBB = ByteBuffer.wrap(raw);
// NOTE(review): the output buffer allows for 1/8 growth; confirm this covers the
// compressor's worst-case expansion for small inputs.
707 ByteBuffer outputBB = ByteBuffer.allocate(raw.length + raw.length/8);
708 //outputBB.order(ByteOrder.LITTLE_ENDIAN);
709 int compressedSize = Compressions.get(Compressions.LZ4).compressBuffer(rawBB, 0, raw.length, outputBB, 0);
712 if(compressedSize < raw.length) {
713 data_ = new byte[compressedSize];
714 outputBB.get(data_,0,compressedSize);
// Prefix: two zero ints, the raw length twice, then the payload length; the payload
// starts at offset 20 and is followed by a zero int.
719 byte[] data = new byte[data_.length+24];
720 Bytes.writeLE(data, 0, 0);
721 Bytes.writeLE(data, 4, 0);
722 Bytes.writeLE(data, 8, raw.length);
723 Bytes.writeLE(data, 12, raw.length);
724 Bytes.writeLE(data, 16, data_.length);
725 System.arraycopy(data_, 0, data, 20, data_.length);
726 Bytes.writeLE(data, 20+data_.length, 0);
728 stream.add(Pair.make(clusterUID, data));