1 package fi.vtt.simantics.procore.internal;
3 import java.util.ArrayList;
4 import java.util.Arrays;
7 import org.simantics.db.exception.RuntimeDatabaseException;
8 import org.simantics.db.impl.ClusterTraitsBase;
9 import org.simantics.db.procore.cluster.ClusterImpl;
10 import org.simantics.db.procore.cluster.ClusterTraits;
11 import org.simantics.db.procore.cluster.ClusterTraitsSmall;
12 import org.simantics.db.service.Bytes;
13 import org.simantics.db.service.ClusterUID;
15 import fi.vtt.simantics.procore.DebugPolicy;
16 import fi.vtt.simantics.procore.internal.ClusterStream.ClusterEnum;
17 import fi.vtt.simantics.procore.internal.ClusterStream.Data;
18 import fi.vtt.simantics.procore.internal.ClusterStream.DebugInfo;
19 import fi.vtt.simantics.procore.internal.ClusterStream.OpEnum;
20 import fi.vtt.simantics.procore.internal.ClusterStream.StmEnum;
21 import gnu.trove.map.hash.TIntByteHashMap;
22 import gnu.trove.map.hash.TLongIntHashMap;
/**
 * Accumulates the modifications made to a single cluster into a byte buffer
 * and hands them to the graph session as an update message when flushed.
 */
24 public final class ClusterChange {
// Version number the constructor writes into the message header.
25 public static final int VERSION = 1;
// Operation codes; addChange() compares them against Change.op0.
26 public static final byte ADD_OPERATION = 2;
27 public static final byte REMOVE_OPERATION = 3;
28 public static final byte DELETE_OPERATION = 5;
// Compile-time debug switches; the debug output in this class goes through printlnd().
30 public static final boolean DEBUG = false;
31 public static final boolean DEBUG_STAT = false;
32 public static final boolean DEBUG_CCS = false;
// Statistics shared by all instances; reported by flush() as "ReallyFlush sum".
34 private static DebugInfo sum = new DebugInfo();
// Maps a resource key to the byte position assigned to it by addIndexAndCluster().
36 public final TIntByteHashMap foreignTable = new TIntByteHashMap();
// Per-instance statistics (sForeign, sValues, tot are updated in this class).
37 private final DebugInfo info;
38 private final GraphSession graphSession;
// UID of the cluster this change set belongs to; also copied into the header.
39 public final ClusterUID clusterUID;
// Offset inside the header at which the size field is written.
40 private final int SIZE_OFFSET;
// Header length in bytes: 8 plus 8 bytes per long of the cluster UID.
41 private final int HEADER_SIZE;
42 // How much buffer is used before stream is flushed to server. The bigger the better.
// Evaluates to 49152 bytes.
43 public static final int MAX_FIXED_BYTES = (1<<15) + (1<<14);
// Space reserved for one operation (33 bytes) and the same plus 36 bytes of slack (69 bytes).
44 private static final int MAX_FIXED_OPERATION_SIZE = 17 + 16;
45 private static final int MAX_FIXED_OPERATION_SIZE_AND_ROOM_FOR_ERROR = MAX_FIXED_OPERATION_SIZE + 36;
// Size initBuffer() uses for the next buffer allocation.
46 private int nextSize = MAX_FIXED_BYTES;
// NOTE(review): the write cursor field byteIndex is used throughout the class but its
// declaration is not visible in this listing (original line 47 is skipped) — confirm.
48 private byte[] bytes = null; // Operation data.
// Pre-built message header that is prepended to the operation data on flush.
49 private final byte[] header;
// Read by flush(); its assignments are not visible in this listing.
50 private boolean flushed = false;
51 private final ClusterStream clusterStream;
// Secondary change set that receives setImmutable/undoValueEx/setDeleted and is flushed along with this one.
52 private final ClusterChange2 clusterChange2;
// Cluster implementation being changed; replaceable through adopt().
54 public ClusterImpl clusterImpl;
/**
 * Creates the change set for the given cluster, builds the message header
 * (size placeholder, VERSION, then the cluster UID as longs starting at byte
 * offset 8) and registers the new instance in clusterStream.changes.
 *
 * @param clusterStream stream owning this change set; its graphSession is reused here
 * @param clusterImpl cluster whose modifications are recorded
 */
56 public ClusterChange(ClusterStream clusterStream, ClusterImpl clusterImpl) {
58 this.clusterImpl = clusterImpl;
// A cluster that is not loaded is only reported: the exception is created to print
// a stack trace, it is not thrown.
60 if (!clusterImpl.isLoaded())
61 new IllegalStateException("Change to proxy cluster " + clusterImpl.getClusterUID()).printStackTrace();
63 clusterUID = clusterImpl.getClusterUID();
64 long[] longs = new long[ClusterUID.getLongLength()];
65 clusterUID.toLong(longs, 0);
66 this.graphSession = clusterStream.graphSession;
67 info = new DebugInfo();
68 HEADER_SIZE = 8 + longs.length * 8;
69 header = new byte[HEADER_SIZE];
// NOTE(review): SIZE_OFFSET is read below but its assignment is not visible in this
// listing (original line 70 is skipped) — confirm against the full file.
71 Bytes.writeLE(header, SIZE_OFFSET + 0, 0); // Correct byte vector size is set with setHeaderVectorSize() later.
72 Bytes.writeLE(header, SIZE_OFFSET + 4, VERSION);
// Cluster UID follows at byte offset 8, eight bytes per long.
73 for (int i=0, offset=8; i<longs.length; ++i, offset+=8)
74 Bytes.writeLE(header, offset, longs[i]);
76 this.clusterStream = clusterStream;
77 this.clusterChange2 = new ClusterChange2(clusterUID);
78 clusterStream.changes.add(this);
81 public void adopt(ClusterImpl impl) {
82 this.clusterImpl = impl;
85 private void setHeaderVectorSize(int size) {
87 throw new RuntimeDatabaseException("Change set size can't be negative.");
88 int len = size + HEADER_SIZE - SIZE_OFFSET - 4;
89 Bytes.writeLE(header, SIZE_OFFSET, len);
92 public String toString() {
93 return super.toString() + " cluster=" + clusterUID + " id=" + clusterImpl.getClusterId() + " off=" + byteIndex;
// Makes sure the operation buffer exists and holds at least nextSize bytes:
// a missing or too small buffer is replaced by a new one of nextSize bytes,
// and nextSize is set back to the default MAX_FIXED_BYTES.
// NOTE(review): original lines 96 and 100-102 are not visible in this listing,
// so further statements (and the exact brace nesting) must be confirmed.
95 private final void initBuffer() {
97 if (null == bytes || bytes.length < nextSize) {
98 bytes = new byte[nextSize];
99 nextSize = MAX_FIXED_BYTES;
// Resets state after the data has been handed over: re-initialises the
// cluster's own change object when there is one and empties foreignTable.
// NOTE(review): original lines 107-112 are not visible in this listing — the
// method presumably continues; confirm against the full file.
103 private final void clear() {
104 if(clusterImpl != null && clusterImpl.change != null)
105 clusterImpl.change.init();
106 foreignTable.clear();
// Registers this change set with clusterStream.changes via addChange(this).
// Called at the start of every public mutation method of this class.
// NOTE(review): original line 114 is not visible in this listing; presumably a
// condition guards the registration — confirm.
113 private final void checkInitialization() {
115 clusterStream.changes.addChange(this);
117 private final void printlnd(String s) {
118 System.out.println("DEBUG: ClusterChange " + clusterUID + ": " + s);
/**
 * Records the creation of a resource: writes the byte 52 followed by the low
 * and then the high byte of the resource index into the operation buffer.
 *
 * @param index resource index inside this cluster
 * @throws RuntimeDatabaseException if index exceeds ClusterTraits.getMaxNumberOfResources()
 */
120 public final void createResource(short index) {
121 checkInitialization();
// NOTE(review): original line 122 is not visible in this listing; the print below
// is presumably guarded by a debug flag — confirm.
123 printlnd("New ri=" + index + " offset=" + byteIndex);
124 if (index > ClusterTraits.getMaxNumberOfResources())
125 throw new RuntimeDatabaseException("Illegal resource index=" + index + ".");
// Make room in the buffer before writing the three bytes of the operation.
126 checkBufferSpace(null);
127 bytes[byteIndex++] = (byte)52;
128 bytes[byteIndex++] = (byte)index;
129 bytes[byteIndex++] = (byte)(index>>>8);
// Flushes this change set through flushInternal() and afterwards drops the
// cluster's foreignLookup reference when clusterImpl is non-null.
// The Change argument is only seen here in a debug message.
// NOTE(review): several original lines (133, 135-136, 138-140, 143-145) are not
// visible in this listing, so conditions around the prints are unknown — confirm.
131 void flushCollect(Change c) {
132 flushInternal(graphSession, clusterUID);
134 printlnd("Cluster change data was flushed.");
137 printlnd("Clearing lookup for " + c.toString());
141 if (null != clusterImpl) {
142 clusterImpl.foreignLookup = null;
// Buffer check used before writing a fixed-size operation: lets the owning
// change collection decide about flushing (checkFlush), allocates the buffer
// lazily, and then tests whether more than
// MAX_FIXED_OPERATION_SIZE_AND_ROOM_FOR_ERROR bytes remain below MAX_FIXED_BYTES.
// NOTE(review): original lines 150-156 are not visible in this listing, so both
// branches and the meaning of the boolean result must be confirmed.
146 private final boolean checkBufferSpace(Change c) {
147 clusterStream.changes.checkFlush();
148 if(bytes == null) initBuffer();
149 if (MAX_FIXED_BYTES - byteIndex > MAX_FIXED_OPERATION_SIZE_AND_ROOM_FOR_ERROR) {
// Buffer check for variable-size data: allocates the buffer lazily and tests
// whether at least `size` bytes are free. When they are not, the next buffer is
// sized to max(MAX_FIXED_BYTES, size) and the current content is flushed.
// NOTE(review): original lines 160 and 163-165 are not visible in this listing;
// presumably the first test returns early and the buffer is re-created after
// the flush — confirm.
157 private final void checkBufferSpace(int size) {
158 if(bytes == null) initBuffer();
159 if (bytes.length - byteIndex >= size)
161 nextSize = Math.max(MAX_FIXED_BYTES, size);
162 flushInternal(graphSession, clusterUID);
// Encodes one Change into the buffer, dispatching on c.op0:
// ADD_OPERATION and REMOVE_OPERATION become statements via addStm(),
// DELETE_OPERATION writes the Delete opcode followed by the 16-bit resource
// index derived from c.key0.
// NOTE(review): original lines 168, 175 and 179-182 are not visible in this
// listing (buffer check, debug guard, closing of the chain) — confirm.
166 final void addChange(Change c) {
167 checkInitialization();
169 byte operation = c.op0;
170 if(operation == ADD_OPERATION)
171 addStm(c, StmEnum.Add);
172 else if (operation == REMOVE_OPERATION)
173 addStm(c, StmEnum.Remove);
174 else if (operation == DELETE_OPERATION) {
176 printlnd("Delete value offset=" + byteIndex + " " + c);
177 addByte(OpEnum.Delete.getOrMask());
178 addShort(ClusterTraitsBase.getResourceIndexFromResourceKeyNoThrow(c.key0));
183 private final void addForeignLong(short index, ClusterUID clusterUID) {
184 byteIndex = clusterUID.toByte(bytes, byteIndex);
185 bytes[byteIndex++] = (byte)(index & 0xFF);
186 bytes[byteIndex++] = (byte)(index >>> 8);
/**
 * Appends the reference to a resource and reports which encoding was used:
 * Local        - resource is in this cluster; two bytes of resource index.
 * ForeignShort - resource already has a foreign table position; one byte.
 * ForeignLong  - resource gets a new position (table size + 1) and is
 *                written in full via addForeignLong().
 *
 * NOTE(review): original lines 196-200, 202, 204-206, 209, 212, 214 and 217
 * are not visible in this listing, so the conditions selecting between the
 * lookup array and foreignTable must be confirmed against the full file.
 *
 * @param key resource key
 * @param clusterUID UID of the cluster owning the resource; must not be ClusterUID.Null
 * @param lookIndex index into lookup for this resource
 * @param lookup per-cluster table of already assigned positions
 * @return the encoding that was written
 */
189 private final ClusterEnum addIndexAndCluster(int key, ClusterUID clusterUID, byte lookIndex, byte[] lookup) {
190 assert(!clusterUID.equals(ClusterUID.Null));
191 short resourceIndex = ClusterTraitsBase.getResourceIndexFromResourceKeyNoThrow(key);
// Same cluster: the index alone identifies the resource.
192 if (clusterUID.equals(this.clusterUID)) {
193 bytes[byteIndex++] = (byte)(resourceIndex & 0xFF);
194 bytes[byteIndex++] = (byte)(resourceIndex >>> 8);
195 return ClusterEnum.Local;
// Foreign resource: look for a position assigned earlier.
201 foreign = lookup[lookIndex];
203 foreign = foreignTable.get(key);
207 throw new RuntimeDatabaseException("Internal error, contact application support." +
208 "Too big foreing index=" + foreign + " max=256");
210 bytes[byteIndex++] = foreign;
211 return ClusterEnum.ForeignShort;
// First occurrence: assign the next position and remember it in both tables.
213 byte position = (byte) (foreignTable.size() + 1);
215 lookup[lookIndex] = position;
216 foreignTable.put(key, position);
218 info.sForeign = foreignTable.size();
219 if (clusterUID.equals(ClusterUID.Null))
220 throw new RuntimeDatabaseException("Internal error, contact application support." +
221 "Cluster unique id not defined for foreing cluster.");
222 addForeignLong(resourceIndex, clusterUID);
223 return ClusterEnum.ForeignLong;
227 private final void addByte(byte b) {
228 bytes[byteIndex++] = b;
231 private final void addShort(short s) {
232 bytes[byteIndex++] = (byte)(s & 0xFF);
233 bytes[byteIndex++] = (byte)(s >>> 8);
236 // private final void addShort(int s) {
237 // bytes[byteIndex++] = (byte) (s & 0xFF);
238 // bytes[byteIndex++] = (byte) ((s >>> 8) & 0xFF);
241 private final void addInt(int i) {
242 // System.err.println("addInt " + i + " " + i);
243 bytes[byteIndex++] = (byte) (i & 0xFF);
244 bytes[byteIndex++] = (byte) ((i >>> 8) & 0xFF);
245 bytes[byteIndex++] = (byte) ((i >>> 16) & 0xFF);
246 bytes[byteIndex++] = (byte) ((i >>> 24) & 0xFF);
247 // buffer.asIntBuffer().put(i);
248 // buffer.position(buffer.position()+4);
251 // private void addLong6(long l) {
252 //// System.err.println("addLong " + l);
253 // bytes[byteIndex++] = (byte) (l & 0xFF);
254 // bytes[byteIndex++] = (byte) ((l >>> 8) & 0xFF);
255 // bytes[byteIndex++] = (byte) ((l >>> 16) & 0xFF);
256 // bytes[byteIndex++] = (byte) ((l >>> 24) & 0xFF);
257 // bytes[byteIndex++] = (byte) ((l >>> 32) & 0xFF);
258 // bytes[byteIndex++] = (byte) ((l >>> 40) & 0xFF);
259 // // buffer.asLongBuffer().put(l);
260 // // buffer.position(buffer.position() + 6);
263 private void addLong7(long l) {
264 bytes[byteIndex++] = (byte) (l & 0xFF);
265 bytes[byteIndex++] = (byte) ((l >>> 8) & 0xFF);
266 bytes[byteIndex++] = (byte) ((l >>> 16) & 0xFF);
267 bytes[byteIndex++] = (byte) ((l >>> 24) & 0xFF);
268 bytes[byteIndex++] = (byte) ((l >>> 32) & 0xFF);
269 bytes[byteIndex++] = (byte) ((l >>> 40) & 0xFF);
270 bytes[byteIndex++] = (byte) ((l >>> 48) & 0xFF);
271 // buffer.asLongBuffer().put(l);
272 // buffer.position(buffer.position() + 7);
275 // private void addLong(long l) {
276 // bytes[byteIndex++] = (byte) (l & 0xFF);
277 // bytes[byteIndex++] = (byte) ((l >>> 8) & 0xFF);
278 // bytes[byteIndex++] = (byte) ((l >>> 16) & 0xFF);
279 // bytes[byteIndex++] = (byte) ((l >>> 24) & 0xFF);
280 // bytes[byteIndex++] = (byte) ((l >>> 32) & 0xFF);
281 // bytes[byteIndex++] = (byte) ((l >>> 40) & 0xFF);
282 // bytes[byteIndex++] = (byte) ((l >>> 48) & 0xFF);
283 // bytes[byteIndex++] = (byte) ((l >>> 56) & 0xFF);
285 private final byte bufferPop() {
286 return bytes[--byteIndex];
// Debug-only record of one statement operation (operation kind, resource,
// predicate and object keys plus the predicate/object cluster UIDs), offering
// three textual renderings.
// NOTE(review): the field declarations and the constructor body (original lines
// 290-296 and 298-305) are not visible in this listing — confirm.
289 final class DebugStm {
297 DebugStm(StmEnum e, int r, int p, ClusterUID pc, int o, ClusterUID oc) {
// Full form: keys and derived resource indexes for resource, predicate and object.
307 public String toString() {
308 short ri = ClusterTraitsBase.getResourceIndexFromResourceKeyNoThrow(r);
309 short pi = ClusterTraitsBase.getResourceIndexFromResourceKeyNoThrow(p);
310 short oi = ClusterTraitsBase.getResourceIndexFromResourceKeyNoThrow(o);
311 return "" + e + " rk=" + r + " ri=" + ri + " rc=" + clusterUID
312 + " pk=" + p + " pi=" + pi + " pc=" + pc
313 + " ok=" + o + " oi=" + oi + " oc=" + oc;
// Key-only form, without the derived resource indexes.
316 public String toString2() {
317 return "" + e + " r=" + r + " rc=" + clusterUID + " p=" + p
318 + " pc=" + pc + " o=" + o + " oc=" + oc;
// Index-only form: derived resource indexes and the foreign cluster UIDs.
321 public String toString3() {
322 short ri = ClusterTraitsBase.getResourceIndexFromResourceKeyNoThrow(r);
323 short pi = ClusterTraitsBase.getResourceIndexFromResourceKeyNoThrow(p);
324 short oi = ClusterTraitsBase.getResourceIndexFromResourceKeyNoThrow(o);
325 return "" + e + " ri=" + ri
326 + " pi=" + pi + " pc=" + pc
327 + " oi=" + oi + " oc=" + oc;
// Debug records of statements; flush() prints each entry with toString2().
// NOTE(review): java.util.List is referenced here but its import is not
// visible in this listing — confirm it exists in the full file.
331 private List<DebugStm> debugStms = new ArrayList<DebugStm>();
/**
 * Encodes one add/remove statement. One byte is reserved for the operation
 * code (opPos), then the predicate (key1) and object (key2) references are
 * written with addIndexAndCluster(), then the low byte of the subject's
 * resource index (from key0). The operation byte is filled in last, once the
 * encodings of predicate and object are known. When foreignTable has grown
 * beyond 252 entries the change set is flushed.
 *
 * NOTE(review): many original lines (335-337, 340-342, 344-345, 349-352,
 * 354-355, 357, 360-363, 365, 367-370, 375, 379, 382-384, 386-388, 390, 392,
 * 395-396, 398-402, 404, 407, 409-410, 412, 414-418, 421) are not visible in
 * this listing. The branch structure around the operation byte therefore
 * cannot be described reliably — confirm against the full file.
 *
 * @param c change holding the statement keys, cluster UIDs and lookup tables
 * @param stmEnum whether the statement is added or removed
 */
333 @SuppressWarnings("unused")
334 private final void addStm(Change c, StmEnum stmEnum) {
// Optional debug record of the statement.
338 if (DEBUG || DEBUG_CCS) {
339 DebugStm d = new DebugStm(stmEnum, c.key0, c.key1, c.clusterUID1, c.key2, c.clusterUID2);
343 printlnd(d.toString3() + " offset=" + byteIndex);
346 // int opPos = buffer.position();
// Reserve the operation byte; it is written at the end of the method.
347 int opPos = byteIndex++;
348 // buffer.put((byte)0); // operation code
// Predicate reference.
353 ClusterEnum a = addIndexAndCluster(c.key1, c.clusterUID1, c.lookIndex1, c.lookup1);
356 // ForeignShort = byte
358 // ForeignLong = 8 byte
359 if (a != ClusterEnum.ForeignShort) {
// Object reference.
364 ClusterEnum b = addIndexAndCluster(c.key2, c.clusterUID2, c.lookIndex2, c.lookup2);
366 if (b != ClusterEnum.ForeignShort) {
// Subject resource index.
371 int ri = ClusterTraitsBase.getResourceIndexFromResourceKeyNoThrow(c.key0);
372 if (ClusterTraitsSmall.isIllegalResourceIndex(ri))
373 throw new RuntimeDatabaseException("Assertion error. Illegal resource index=" + ri);
374 bytes[byteIndex++] = (byte)ri; // index low byte
// data.bits / data.bytes describe how the operation code is laid out for this
// combination of statement kind and reference encodings.
376 Data data = ClusterEnum.getData(stmEnum, a, b);
377 int left = 6 - data.bits;
378 int op = ri >>> (8 + left);
380 ri &= (1 << left) - 1;
381 if (a != ClusterEnum.ForeignShort) {
385 if (b != ClusterEnum.ForeignShort) {
389 switch (data.bytes) {
391 throw new RuntimeDatabaseException("Assertion error. Illegal number of bytes=" + data.bytes);
393 bytes[byteIndex++] = (byte)(ri & 0xFF);
394 bytes[byteIndex++] = (byte)((ri >>> 8) & 0xFF);
397 bytes[byteIndex++] = (byte)(ri & 0xFF);
403 this.bytes[opPos] = (byte)op;
// Alternative operation byte: high index bits plus 64 for Add, plus 128 otherwise.
405 if (stmEnum == StmEnum.Add)
406 bytes[opPos] = (byte)((ri >>> 8) + 64);
408 bytes[opPos] = (byte)((ri >>> 8) + 128);
411 if (a == ClusterEnum.Local && b == ClusterEnum.Local) {
413 } else if (a == ClusterEnum.Local || b == ClusterEnum.Local) {
// Flush when the foreign table has more than 252 entries.
419 if (foreignTable.size() > 252)
420 flushInternal(graphSession, clusterUID);
/**
 * Appends one Modify operation: opcode, 16-bit resource index whose bits 14-15
 * carry bits 56-57 of the value offset, the low 56 bits of the offset, a
 * 16-bit size, and then the value bytes copied from the given array.
 *
 * Note that the parameter {@code bytes} shadows the buffer field, which is
 * therefore accessed as {@code this.bytes} here.
 *
 * NOTE(review): original lines 424, 430, 442 and 446-449 are not visible in
 * this listing, so the debug guards, the cursor update after the copy and the
 * returned value must be confirmed against the full file.
 *
 * @param ri resource index
 * @param value_offset offset inside the value at which the data is written
 * @param bytes source array
 * @param offset start position inside the source array
 * @param size number of bytes to write
 * @throws RuntimeDatabaseException for an illegal index, offset or size
 */
423 private final int modiValue(int ri, long value_offset, byte[] bytes, int offset, int size) {
425 printlnd("Modify value ri=" + ri + " vo=" + value_offset + " size=" + size + " total=" + bytes.length);
426 if (ClusterTraitsBase.isIllegalResourceIndex(ri))
427 throw new RuntimeDatabaseException("Assertion error. Illegal resource index=" + ri);
// NOTE(review): subtraction binds tighter than the shift, so this compares against
// 1L << 57; the encoding below holds 58 offset bits, so (1L << 58) - 1 was
// presumably intended — confirm before changing.
428 if (value_offset > (1L << 58 - 1))
429 throw new RuntimeDatabaseException("Illegal value offset="
431 if (size < 0 || size > MAX_FIXED_BYTES - 1)
432 throw new RuntimeDatabaseException("Illegal value size=" + size);
433 if (offset + size > bytes.length)
434 throw new RuntimeDatabaseException("Illegal value size=" + size);
// 12 bytes of fixed part: opcode (1) + index (2) + offset (7) + size (2).
435 checkBufferSpace(12 + size);
436 addByte(OpEnum.Modify.getOrMask());
437 ri |= (value_offset >>> 56) << 14; // add top two bits
438 addShort((short) ri);
439 value_offset &= (1L << 56) - 1;
440 addLong7(value_offset);
441 addShort((short) size);
443 System.out.println("Modify value fixed part end offset=" + byteIndex);
// Copy no more than what still fits into the buffer.
444 int copied = Math.min(size, this.bytes.length - byteIndex);
445 System.arraycopy(bytes, offset, this.bytes, byteIndex, copied);
/**
 * Writes a value modification that may be larger than the buffer. A first
 * part is written through modiValue() into the current buffer; after a flush,
 * further parts of at most 65535 bytes are sent directly to the graph session,
 * each as its own message consisting of the header followed by a Modify
 * operation (opcode, index with the top offset bits, 7-byte offset, 16-bit
 * length, data).
 *
 * NOTE(review): original lines 451, 453, 455-458, 460, 462 and 475-481 are not
 * visible in this listing, so the loop structure and the bookkeeping of
 * voffset, offset and left must be confirmed against the full file.
 *
 * @param ri resource index
 * @param voffset offset inside the value at which writing starts
 * @param left number of bytes still to write
 * @param bytes source array (shadows the buffer field)
 * @param offset start position inside the source array
 */
450 private final void modiValueBig(int ri, long voffset, int left, byte[] bytes, int offset) {
// As much as fits into the current buffer after the 12-byte fixed part.
452 int current = Math.min(this.bytes.length - byteIndex - 12, left);
454 int written = modiValue(ri, voffset, bytes, offset, current);
459 flushInternal(graphSession, clusterUID);
// The length field is 16 bits wide, hence the 65535 byte limit per message.
461 int length = Math.min(left, (1 << 16) - 1);
463 printlnd("Modify big value ri=" + ri + " vo=" + voffset + " len=" + length);
464 int psize = length + 12;
465 setHeaderVectorSize(psize);
466 byte[] message = new byte[psize+HEADER_SIZE];
467 System.arraycopy(header, 0, message, 0, HEADER_SIZE);
468 int to = HEADER_SIZE;
469 Bytes.write(message, to++, OpEnum.Modify.getOrMask());
470 short index = (short)(ri | (voffset >>> 56)<<14); // add top two bits
471 Bytes.writeLE(message, to, index); to += 2;
472 Bytes.writeLE7(message, to, voffset & ((1L << 56) - 1)); to += 7;
473 Bytes.writeLE(message, to, (short)length); to += 2;
474 System.arraycopy(bytes, offset, message, to, length);
475 graphSession.updateCluster(new UpdateClusterFunction(message));
// Writes a large value in pieces: each piece is limited to what fits below
// MAX_FIXED_BYTES after the 12-byte fixed part and is written with modiValue();
// checkBufferSpace(12) is called before the first and after each piece.
// NOTE(review): original lines 484-488, 490, 493-496 and 498-501 are not
// visible in this listing (declarations of left/voffset/offset, the loop and
// the return value) — confirm against the full file.
482 private final int setValueBig(int ri, byte[] bytes, int length_) {
483 checkBufferSpace(12);
489 int length = Math.min(left, MAX_FIXED_BYTES - 12 - byteIndex);
491 printlnd("Set big value ri=" + ri + " vo=" + voffset + " len=" + length);
492 int written = modiValue(ri, voffset, bytes, offset, length);
497 checkBufferSpace(12);
// Writes a small value directly into the buffer after reserving 5 + length
// bytes. The length is combined with the resource index (length << 14 | ri) and
// two opcode forms appear: SetShort with length bits folded into the opcode, and
// plain Set. The value bytes are copied at the current cursor and the number of
// bytes between `pos` and the cursor is computed as `len`.
// NOTE(review): original lines 504, 506, 508-511, 513-514, 516 and 518-520 are
// not visible in this listing, so the choice between the two opcodes and the
// return value must be confirmed against the full file.
502 private final int setValueSmall(int ri, byte[] bytes, int length) {
503 checkBufferSpace(5 + length);
505 int i = length << 14 | ri;
507 byte op = (byte) (OpEnum.SetShort.getOrMask() | length >>> 2);
512 addByte(OpEnum.Set.getOrMask());
515 System.arraycopy(bytes, 0, this.bytes, byteIndex, length);
517 int len = byteIndex - pos;
521 final void setValue(short index, byte[] bytes) {
522 setValue(index, bytes, bytes.length);
/**
 * Sets the value of a resource to the first {@code length} bytes of the given
 * array. Values longer than ClusterTraitsSmall.VALUE_SIZE_MAX go through
 * setValueBig(), others through setValueSmall(); the statistics counter
 * info.sValues is increased by the returned len plus length.
 *
 * NOTE(review): original lines 529, 531, 533-534, 537, 540, 542-543 and 545-547
 * are not visible in this listing (debug guard, the opening of the block
 * comment below, declaration of len, else branch) — confirm.
 * NOTE(review): "(1<18)-1" in the comment text below presumably means (1<<18)-1.
 *
 * @param index resource index
 * @param bytes source of the new value
 * @param length number of bytes of the value
 * @throws RuntimeDatabaseException if index is an illegal resource index
 */
525 final void setValue(short index, byte[] bytes, int length) {
526 checkInitialization();
527 if (ClusterTraitsBase.isIllegalResourceIndex(index))
528 throw new RuntimeDatabaseException("Assertion error. Illegal resource index=" + index);
530 printlnd("Set value ri=" + index
532 + " bytes=" + Arrays.toString(Arrays.copyOfRange(bytes, 0, Math.min(10, length))));
535 * The limit for the cluster stream is (1<18)-1 but this avoids the
536 * conversion to big cluster.
538 if (length > ClusterTraitsSmall.VALUE_SIZE_MAX)
539 len = setValueBig(index, bytes, length);
541 len = setValueSmall(index, bytes, length);
544 info.sValues += len + length;
548 final void setValue(Change c, byte[] bytes, int length) {
549 short ri = ClusterTraitsBase.getResourceIndexFromResourceKeyNoThrow(c.key0);
550 setValue(ri, bytes, length);
/**
 * Modifies part of the value of the resource identified by {@code c.key0}:
 * delegates to modiValueBig() and adds length to the statistics counter
 * info.sValues.
 *
 * NOTE(review): original lines 557, 559-560, 562 and 565-567 are not visible
 * in this listing (debug guard and parts of the debug message) — confirm.
 *
 * @param c change whose key0 names the resource
 * @param voffset offset inside the value at which writing starts
 * @param length number of bytes to write
 * @param bytes source array
 * @param offset start position inside the source array
 */
554 final void modiValue(Change c, long voffset, int length, byte[] bytes, int offset) {
555 checkInitialization();
556 int ri = ClusterTraitsBase.getResourceIndexFromResourceKeyNoThrow(c.key0);
558 printlnd("Modify value ri=" + ri
561 + " blen=" + bytes.length
563 + " bytes=" + Arrays.toString(Arrays.copyOfRange(bytes, 0, Math.min(10, bytes.length))));
564 modiValueBig(ri, voffset, length, bytes, offset);
568 info.sValues += length;
571 final void setImmutable(boolean immutable) {
572 checkInitialization();
573 clusterChange2.setImmutable(immutable);
575 final void undoValueEx(int resourceIndex) {
576 checkInitialization();
577 clusterChange2.undoValueEx(resourceIndex);
579 final void setDeleted(boolean deleted) {
580 checkInitialization();
581 clusterChange2.setDeleted(deleted);
// Only the initialisation call is visible for this method.
// NOTE(review): original line 585 is not visible in this listing; presumably it
// writes the operation that this method's name refers to — confirm.
583 final void corrupt() {
584 checkInitialization();
// Sends the collected operations to the graph session. When data is sent, the
// header gets the payload size, header and payload are copied into one array,
// that array is passed to graphSession.updateCluster() wrapped in an
// UpdateClusterFunction, and the secondary change set is flushed as well. With
// no data of its own, a dirty secondary change set is still flushed; the last
// visible branch tests the `flushed` flag.
// NOTE(review): original lines 587, 589, 591, 593, 601, 604-606, 609, 611, 618,
// 620, 623-624 and 626-632 are not visible in this listing (comment delimiters,
// the leading condition, debug guards, clear() calls and all return statements)
// — confirm against the full file.
588 * @param graphSession
590 * @return true if actually flushed something
592 final boolean flush(GraphSession graphSession, ClusterUID clusterUID) {
594 if(DebugPolicy.REPORT_CLUSTER_STREAM)
595 System.err.println("Flush cluster change set stream " + this);
// Message = header (with the final size) + the byteIndex bytes written so far.
596 setHeaderVectorSize(byteIndex);
597 byte[] copy = new byte[byteIndex + HEADER_SIZE];
598 System.arraycopy(header, 0, copy, 0, HEADER_SIZE);
599 System.arraycopy(bytes, 0, copy, HEADER_SIZE, byteIndex);
600 UpdateClusterFunction updateClusterFunction = new UpdateClusterFunction(copy);
// Debug output: recorded statements and statistics.
602 for (DebugStm stm : debugStms)
603 printlnd(stm.toString2());
607 info.tot = updateClusterFunction.operation.length;
608 printlnd("ReallyFlush: " + info.toString());
610 printlnd("ReallyFlush sum: " + sum.toString());
612 // long start = System.nanoTime();
613 graphSession.updateCluster(updateClusterFunction);
614 // long duration = System.nanoTime() - start;
615 // duration2 += duration;
616 // System.err.println("updateCluster " + 1e-9*duration);
617 // System.err.println("updateCluster total " + 1e-9*duration2);
619 clusterChange2.flush(graphSession);
621 } else if (clusterChange2.isDirty()) {
622 clusterChange2.flush(graphSession);
625 } else if (flushed) {
// Flush variant used from inside this class (buffer full, foreign table full,
// flushCollect); delegates to flush() and ignores its result.
// NOTE(review): original lines 635-637 are not visible in this listing. The
// `flushed` field is never visibly set to true anywhere, so a statement may be
// missing here — confirm against the full file.
633 final void flushInternal(GraphSession graphSession, ClusterUID clusterUID) {
634 flush(graphSession, clusterUID);
// Lookup table from (resource index, cluster) pairs to int values, keyed by a
// single long built in createKey().
// NOTE(review): original lines 640, 644-645, 648, 651-653, 655, 659-664 and
// 666-670 are not visible in this listing (debug guards, the return of get(),
// possibly further methods, the body of clear()) — confirm.
638 final class ForeignTable {
639 private final TLongIntHashMap table = new TLongIntHashMap();
// Packs the pair into one long: cluster shifted left by 14 bits, OR-ed with index.
// The assert limits cluster to 48 bits.
// NOTE(review): a negative index would sign-extend in the OR — presumably
// indexes are always non-negative; confirm.
641 private long createKey(short index, long cluster) {
642 assert (cluster <= (1L << 48) - 1);
643 return (cluster << 14) | index;
// Looks up the value stored for the pair.
646 public int get(short index, long cluster) {
647 int value = table.get(createKey(index, cluster));
649 printlnd("ForeignTable get c=" + clusterUID + " i="
650 + (value - 1) + " r=" + index + " rc=" + cluster);
// Stores a value for the pair and returns what table.put() returns.
654 public int put(short index, long cluster, int value) {
656 printlnd("ForeignTable put c=" + clusterUID + " i="
657 + (value - 1) + " r=" + index + " rc=" + cluster);
658 return table.put(createKey(index, cluster), value);
665 public void clear() {
671 public int hashCode() {
672 return 31*clusterImpl.getClusterKey();
// Two change sets are equal when they refer to clusters with the same cluster
// key; null and objects of other types are handled by the earlier branches.
// NOTE(review): original lines 677-678, 680 and 682 are not visible in this
// listing (the identity test and the results of the first three branches) —
// confirm against the full file.
676 public boolean equals(Object object) {
679 else if (object == null)
681 else if (!(object instanceof ClusterChange))
683 ClusterChange r = (ClusterChange)object;
684 return r.clusterImpl.getClusterKey() == clusterImpl.getClusterKey();