]> gerrit.simantics Code Review - simantics/platform.git/blob - bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/ClusterChange.java
Fixed multiple issues causing dangling references to discarded queries
[simantics/platform.git] / bundles / org.simantics.db.procore / src / fi / vtt / simantics / procore / internal / ClusterChange.java
1 package fi.vtt.simantics.procore.internal;
2
3 import java.util.ArrayList;
4 import java.util.Arrays;
5 import java.util.List;
6
7 import org.simantics.db.exception.RuntimeDatabaseException;
8 import org.simantics.db.impl.ClusterTraitsBase;
9 import org.simantics.db.procore.cluster.ClusterImpl;
10 import org.simantics.db.procore.cluster.ClusterTraits;
11 import org.simantics.db.procore.cluster.ClusterTraitsSmall;
12 import org.simantics.db.service.Bytes;
13 import org.simantics.db.service.ClusterUID;
14
15 import fi.vtt.simantics.procore.DebugPolicy;
16 import fi.vtt.simantics.procore.internal.ClusterStream.ClusterEnum;
17 import fi.vtt.simantics.procore.internal.ClusterStream.Data;
18 import fi.vtt.simantics.procore.internal.ClusterStream.DebugInfo;
19 import fi.vtt.simantics.procore.internal.ClusterStream.OpEnum;
20 import fi.vtt.simantics.procore.internal.ClusterStream.StmEnum;
21 import gnu.trove.map.hash.TIntByteHashMap;
22 import gnu.trove.map.hash.TLongIntHashMap;
23
24 public final class ClusterChange {
25     public static final int VERSION = 1;
26     public static final byte ADD_OPERATION = 2;
27     public static final byte REMOVE_OPERATION = 3;
28     public static final byte DELETE_OPERATION = 5;
29
30     public static final boolean DEBUG = false;
31     public static final boolean DEBUG_STAT = false;
32     public static final boolean DEBUG_CCS = false;
33
34     private static DebugInfo sum = new DebugInfo();
35
36     public final TIntByteHashMap foreignTable = new TIntByteHashMap();
37     private final DebugInfo info;
38     private final GraphSession graphSession;
39     public final ClusterUID clusterUID;
40     private final int SIZE_OFFSET;
41     private final int HEADER_SIZE;
42     // How much buffer is used before stream is flushed to server. The bigger the better.
43     public static final int MAX_FIXED_BYTES = (1<<15) + (1<<14);
44     private static final int MAX_FIXED_OPERATION_SIZE = 17 + 16;
45     private static final int MAX_FIXED_OPERATION_SIZE_AND_ROOM_FOR_ERROR = MAX_FIXED_OPERATION_SIZE + 36;
46     private int nextSize = MAX_FIXED_BYTES;
47     int byteIndex = 0;
48     private byte[] bytes = null; // Operation data.
49     private final byte[] header;
50     private boolean flushed = false;
51     private final ClusterStream clusterStream;
52     private final ClusterChange2 clusterChange2;
53
54     public ClusterImpl clusterImpl;
55
56     public ClusterChange(ClusterStream clusterStream, ClusterImpl clusterImpl) {
57         
58         this.clusterImpl = clusterImpl;
59
60         clusterUID = clusterImpl.getClusterUID();
61         long[] longs = new long[ClusterUID.getLongLength()];
62         clusterUID.toLong(longs, 0);
63         this.graphSession = clusterStream.graphSession;
64         info = new DebugInfo();
65         HEADER_SIZE = 8 + longs.length * 8;
66         header = new byte[HEADER_SIZE];
67         SIZE_OFFSET = 0;
68         Bytes.writeLE(header, SIZE_OFFSET + 0, 0); // Correct byte vector size is set with setHeaderVectorSize() later.
69         Bytes.writeLE(header, SIZE_OFFSET + 4, VERSION);
70         for (int i=0, offset=8; i<longs.length; ++i, offset+=8)
71             Bytes.writeLE(header, offset, longs[i]);
72         //initBuffer();
73         this.clusterStream = clusterStream;
74         this.clusterChange2 = new ClusterChange2(clusterUID);
75         clusterStream.changes.add(this);
76     }
77     
78     public void adopt(ClusterImpl impl) {
79         this.clusterImpl = impl;
80     }
81
82     private void setHeaderVectorSize(int size) {
83         if (size < 0)
84             throw new RuntimeDatabaseException("Change set size can't be negative.");
85         int len = size + HEADER_SIZE - SIZE_OFFSET - 4;
86         Bytes.writeLE(header, SIZE_OFFSET, len);
87     }
88     @Override
89     public String toString() {
90         return super.toString() + " cluster=" + clusterUID + " id=" + clusterImpl.getClusterId() + " off=" + byteIndex;
91     }
92     private final void initBuffer() {
93         flushed = false;
94         if (null == bytes || bytes.length < nextSize) {
95             bytes = new byte[nextSize];
96             nextSize = MAX_FIXED_BYTES;
97         }
98         byteIndex = 0;
99     }
100     private final void clear() {
101         if(clusterImpl != null && clusterImpl.change != null)
102                 clusterImpl.change.init();
103         foreignTable.clear();
104         //initBuffer();
105         bytes = null;
106         byteIndex = 0;
107         if (DEBUG_STAT)
108             info.clear();
109     }
110     private final void checkInitialization() {
111         if (0 == byteIndex)
112             clusterStream.changes.addChange(this);
113     }
114     private final void printlnd(String s) {
115         System.out.println("DEBUG: ClusterChange " + clusterUID + ": " + s);
116     }
117     public final void createResource(short index) {
118         checkInitialization();
119         if (DEBUG)
120             printlnd("New ri=" + index + " offset=" + byteIndex);
121         if (index > ClusterTraits.getMaxNumberOfResources())
122             throw new RuntimeDatabaseException("Illegal resource index=" + index + ".");
123         checkBufferSpace(null);
124         bytes[byteIndex++] = (byte)52;
125         bytes[byteIndex++] = (byte)index;
126         bytes[byteIndex++] = (byte)(index>>>8);
127     }
128     void flushCollect(Change c) {
129         flushInternal(graphSession, clusterUID);
130         if (DEBUG)
131             printlnd("Cluster change data was flushed.");
132         if (null != c) {
133             if (DEBUG)
134                 printlnd("Clearing lookup for " + c.toString());
135             c.lookup1 = null;
136             c.lookup2 = null;
137         }
138         if (null != clusterImpl) {
139             clusterImpl.foreignLookup = null;
140         }
141     }
142
143     private final boolean checkBufferSpace(Change c) {
144         clusterStream.changes.checkFlush();
145         if(bytes == null) initBuffer();
146         if (MAX_FIXED_BYTES - byteIndex > MAX_FIXED_OPERATION_SIZE_AND_ROOM_FOR_ERROR) {
147             return false;
148         }
149         flushCollect(c);
150         initBuffer();
151         return true;
152     }
153
154     private final void checkBufferSpace(int size) {
155         if(bytes == null) initBuffer();
156         if (bytes.length - byteIndex >= size)
157             return;
158         nextSize = Math.max(MAX_FIXED_BYTES, size);
159         flushInternal(graphSession, clusterUID);
160         initBuffer();
161     }
162
163     final void addChange(Change c) {
164         checkInitialization();
165         checkBufferSpace(c);
166         byte operation = c.op0;
167         if(operation == ADD_OPERATION)
168             addStm(c, StmEnum.Add);
169         else if (operation == REMOVE_OPERATION)
170             addStm(c, StmEnum.Remove);
171         else if (operation == DELETE_OPERATION) {
172             if (DEBUG)
173                 printlnd("Delete value offset=" + byteIndex + " " + c);
174             addByte(OpEnum.Delete.getOrMask());
175             addShort(ClusterTraitsBase.getResourceIndexFromResourceKeyNoThrow(c.key0));
176         }
177         c.lastArg = 0;
178     }
179
180     private final void addForeignLong(short index, ClusterUID clusterUID) {
181         byteIndex = clusterUID.toByte(bytes, byteIndex);
182         bytes[byteIndex++] = (byte)(index & 0xFF);
183         bytes[byteIndex++] = (byte)(index >>> 8);
184     }
185
186     private final ClusterEnum addIndexAndCluster(int key, ClusterUID clusterUID, byte lookIndex, byte[] lookup) {
187         assert(!clusterUID.equals(ClusterUID.Null));
188         short resourceIndex = ClusterTraitsBase.getResourceIndexFromResourceKeyNoThrow(key);
189         if (clusterUID.equals(this.clusterUID)) {
190             bytes[byteIndex++] = (byte)(resourceIndex & 0xFF);
191             bytes[byteIndex++] = (byte)(resourceIndex >>> 8);
192             return ClusterEnum.Local;
193         }
194
195         byte foreign = 0;
196         if(lookIndex > 0) {
197             if(lookup != null)
198                 foreign = lookup[lookIndex];
199         } else {
200             foreign = foreignTable.get(key);
201         }
202         if (0 != foreign) {
203             if (foreign > 256)
204                 throw new RuntimeDatabaseException("Internal error." +
205                 "Too big foreign index=" + foreign + " max=256");
206             --foreign;
207             bytes[byteIndex++] = foreign;
208             return ClusterEnum.ForeignShort;
209         } else {
210             byte position = (byte) (foreignTable.size() + 1);
211             if(lookup != null)
212                 lookup[lookIndex] = position;
213             foreignTable.put(key, position);
214             if (DEBUG_STAT)
215                 info.sForeign = foreignTable.size();
216             if (clusterUID.equals(ClusterUID.Null))
217                 throw new RuntimeDatabaseException("Internal error." +
218                 "Cluster unique id not defined for foreign cluster.");
219             addForeignLong(resourceIndex, clusterUID);
220             return ClusterEnum.ForeignLong;
221         }
222     }
223
224     private final void addByte(byte b) {
225         bytes[byteIndex++] = b;
226     }
227
228     private final void addShort(short s) {
229         bytes[byteIndex++] = (byte)(s & 0xFF);
230         bytes[byteIndex++] = (byte)(s >>> 8);
231     }
232
233 //    private final void addShort(int s) {
234 //        bytes[byteIndex++] = (byte) (s & 0xFF);
235 //        bytes[byteIndex++] = (byte) ((s >>> 8) & 0xFF);
236 //    }
237
238     private final void addInt(int i) {
239 //        System.err.println("addInt " + i + " " + i);
240         bytes[byteIndex++] = (byte) (i & 0xFF);
241         bytes[byteIndex++] = (byte) ((i >>> 8) & 0xFF);
242         bytes[byteIndex++] = (byte) ((i >>> 16) & 0xFF);
243         bytes[byteIndex++] = (byte) ((i >>> 24) & 0xFF);
244         // buffer.asIntBuffer().put(i);
245         // buffer.position(buffer.position()+4);
246     }
247
248 //    private void addLong6(long l) {
249 ////        System.err.println("addLong " + l);
250 //        bytes[byteIndex++] = (byte) (l & 0xFF);
251 //        bytes[byteIndex++] = (byte) ((l >>> 8) & 0xFF);
252 //        bytes[byteIndex++] = (byte) ((l >>> 16) & 0xFF);
253 //        bytes[byteIndex++] = (byte) ((l >>> 24) & 0xFF);
254 //        bytes[byteIndex++] = (byte) ((l >>> 32) & 0xFF);
255 //        bytes[byteIndex++] = (byte) ((l >>> 40) & 0xFF);
256 //        // buffer.asLongBuffer().put(l);
257 //        // buffer.position(buffer.position() + 6);
258 //    }
259
260     private void addLong7(long l) {
261         bytes[byteIndex++] = (byte) (l & 0xFF);
262         bytes[byteIndex++] = (byte) ((l >>> 8) & 0xFF);
263         bytes[byteIndex++] = (byte) ((l >>> 16) & 0xFF);
264         bytes[byteIndex++] = (byte) ((l >>> 24) & 0xFF);
265         bytes[byteIndex++] = (byte) ((l >>> 32) & 0xFF);
266         bytes[byteIndex++] = (byte) ((l >>> 40) & 0xFF);
267         bytes[byteIndex++] = (byte) ((l >>> 48) & 0xFF);
268         // buffer.asLongBuffer().put(l);
269         // buffer.position(buffer.position() + 7);
270     }
271
272 //    private void addLong(long l) {
273 //        bytes[byteIndex++] = (byte) (l & 0xFF);
274 //        bytes[byteIndex++] = (byte) ((l >>> 8) & 0xFF);
275 //        bytes[byteIndex++] = (byte) ((l >>> 16) & 0xFF);
276 //        bytes[byteIndex++] = (byte) ((l >>> 24) & 0xFF);
277 //        bytes[byteIndex++] = (byte) ((l >>> 32) & 0xFF);
278 //        bytes[byteIndex++] = (byte) ((l >>> 40) & 0xFF);
279 //        bytes[byteIndex++] = (byte) ((l >>> 48) & 0xFF);
280 //        bytes[byteIndex++] = (byte) ((l >>> 56) & 0xFF);
281 //    }
282     private final byte bufferPop() {
283         return bytes[--byteIndex];
284     }
285
286     final class DebugStm {
287         StmEnum e;
288         int r;
289         int p;
290         int o;
291         ClusterUID pc;
292         ClusterUID oc;
293
294         DebugStm(StmEnum e, int r, int p, ClusterUID pc, int o, ClusterUID oc) {
295             this.e = e;
296             this.r = r;
297             this.p = p;
298             this.o = o;
299             this.pc = pc;
300             this.oc = oc;
301         }
302
303         @Override
304         public String toString() {
305             short ri = ClusterTraitsBase.getResourceIndexFromResourceKeyNoThrow(r);
306             short pi = ClusterTraitsBase.getResourceIndexFromResourceKeyNoThrow(p);
307             short oi = ClusterTraitsBase.getResourceIndexFromResourceKeyNoThrow(o);
308             return "" + e + " rk=" + r + " ri=" + ri + " rc=" + clusterUID
309             + " pk=" + p + " pi=" + pi + " pc=" + pc
310             + " ok=" + o + " oi=" + oi + " oc=" + oc;
311         }
312
313         public String toString2() {
314             return "" + e + " r=" + r + " rc=" + clusterUID + " p=" + p
315                     + " pc=" + pc + " o=" + o + " oc=" + oc;
316         }
317
318         public String toString3() {
319             short ri = ClusterTraitsBase.getResourceIndexFromResourceKeyNoThrow(r);
320             short pi = ClusterTraitsBase.getResourceIndexFromResourceKeyNoThrow(p);
321             short oi = ClusterTraitsBase.getResourceIndexFromResourceKeyNoThrow(o);
322             return "" + e + " ri=" + ri
323             + " pi=" + pi + " pc=" + pc
324             + " oi=" + oi + " oc=" + oc;
325         }
326     }
327
328     private List<DebugStm> debugStms = new ArrayList<DebugStm>();
329
330     @SuppressWarnings("unused")
331     private final void addStm(Change c, StmEnum stmEnum) {
332
333         if (DEBUG_STAT)
334             ++info.nStms;
335         if (DEBUG || DEBUG_CCS) {
336             DebugStm d = new DebugStm(stmEnum, c.key0, c.key1, c.clusterUID1, c.key2, c.clusterUID2);
337             if (DEBUG_CCS)
338                 debugStms.add(d);
339             if (DEBUG) {
340                 printlnd(d.toString3() + " offset=" + byteIndex);
341             }
342         }
343         // int opPos = buffer.position();
344         int opPos = byteIndex++;
345         // buffer.put((byte)0); // operation code
346         // addByte((byte)0);
347
348         boolean done = true;
349
350         ClusterEnum a = addIndexAndCluster(c.key1, c.clusterUID1, c.lookIndex1, c.lookup1);
351         byte ab = 0;
352
353         // ForeignShort = byte
354         // Local = short
355         // ForeignLong = 8 byte
356         if (a != ClusterEnum.ForeignShort) {
357             ab = bufferPop();
358             done = false;
359         }
360
361         ClusterEnum b = addIndexAndCluster(c.key2, c.clusterUID2, c.lookIndex2, c.lookup2);
362         byte bb = 0;
363         if (b != ClusterEnum.ForeignShort) {
364             bb = bufferPop();
365             done = false;
366         }
367
368         int ri = ClusterTraitsBase.getResourceIndexFromResourceKeyNoThrow(c.key0);
369         if (ClusterTraitsSmall.isIllegalResourceIndex(ri))
370             throw new RuntimeDatabaseException("Assertion error. Illegal resource index=" + ri);
371         bytes[byteIndex++] = (byte)ri; // index low byte
372         if(!done) {
373             Data data = ClusterEnum.getData(stmEnum, a, b);
374             int left = 6 - data.bits;
375             int op = ri >>> (8 + left);
376             ri >>>= 8;
377             ri &= (1 << left) - 1;
378             if (a != ClusterEnum.ForeignShort) {
379                 ri |= ab << left;
380                 left += 6;
381             }
382             if (b != ClusterEnum.ForeignShort) {
383                 ri |= bb << left;
384                 left += 6;
385             }
386             switch (data.bytes) {
387                 default:
388                     throw new RuntimeDatabaseException("Assertion error. Illegal number of bytes=" + data.bytes);
389                 case 2:
390                     bytes[byteIndex++] = (byte)(ri & 0xFF);
391                     bytes[byteIndex++] = (byte)((ri >>> 8) & 0xFF);
392                     break;
393                 case 1:
394                     bytes[byteIndex++] = (byte)(ri & 0xFF);
395                     break;
396                 case 0:
397                     break;
398             }
399             op |= data.mask;
400             this.bytes[opPos] = (byte)op;
401         } else {
402             if (stmEnum == StmEnum.Add)
403                 bytes[opPos] = (byte)((ri >>> 8) + 64);
404             else
405                 bytes[opPos] = (byte)((ri >>> 8) + 128);
406         }
407         if (DEBUG_STAT) {
408             if (a == ClusterEnum.Local && b == ClusterEnum.Local) {
409                 ++info.nLocal;
410             } else if (a == ClusterEnum.Local || b == ClusterEnum.Local) {
411                 ++info.nPartly;
412             } else {
413                 ++info.nForeign;
414             }
415         }
416         if (foreignTable.size() > 252)
417             flushInternal(graphSession, clusterUID);
418     }
419
420     private final int modiValue(int ri, long value_offset, byte[] bytes, int offset, int size) {
421         if (DEBUG)
422             printlnd("Modify value ri=" + ri + " vo=" + value_offset + " size=" + size + " total=" + bytes.length);
423         if (ClusterTraitsBase.isIllegalResourceIndex(ri))
424             throw new RuntimeDatabaseException("Assertion error. Illegal resource index=" + ri);
425         if (value_offset > (1L << 58 - 1))
426             throw new RuntimeDatabaseException("Illegal value offset="
427                     + value_offset);
428         if (size < 0 || size > MAX_FIXED_BYTES - 1)
429             throw new RuntimeDatabaseException("Illegal value size=" + size);
430         if (offset + size > bytes.length)
431             throw new RuntimeDatabaseException("Illegal value size=" + size);
432         checkBufferSpace(12 + size);
433         addByte(OpEnum.Modify.getOrMask());
434         ri |= (value_offset >>> 56) << 14; // add top two bits
435         addShort((short) ri);
436         value_offset &= (1L << 56) - 1;
437         addLong7(value_offset);
438         addShort((short) size);
439         if (DEBUG)
440             System.out.println("Modify value fixed part end offset=" + byteIndex);
441         int copied = Math.min(size, this.bytes.length - byteIndex);
442         System.arraycopy(bytes, offset, this.bytes, byteIndex, copied);
443         byteIndex += size;
444         return copied;
445     }
446
447     private final void modiValueBig(int ri, long voffset, int left, byte[] bytes, int offset) {
448         checkBufferSpace(0);
449         int current = Math.min(this.bytes.length - byteIndex - 12, left);
450         if(current >= 0) {
451             int written = modiValue(ri, voffset, bytes, offset, current);
452             voffset += written;
453             offset += written;
454             left -= written;
455         }
456         flushInternal(graphSession, clusterUID);
457         while (left > 0) {
458             int length = Math.min(left, (1 << 16) - 1);
459             if (DEBUG)
460                 printlnd("Modify big value ri=" + ri + " vo=" + voffset + " len=" + length);
461             int psize = length + 12;
462             setHeaderVectorSize(psize);
463             byte[] message = new byte[psize+HEADER_SIZE];
464             System.arraycopy(header, 0, message, 0, HEADER_SIZE);
465             int to = HEADER_SIZE;
466             Bytes.write(message, to++, OpEnum.Modify.getOrMask());
467             short index = (short)(ri | (voffset >>> 56)<<14); // add top two bits
468             Bytes.writeLE(message, to, index); to += 2;
469             Bytes.writeLE7(message, to, voffset & ((1L << 56) - 1)); to += 7;
470             Bytes.writeLE(message, to, (short)length); to += 2;
471             System.arraycopy(bytes, offset, message, to, length);
472             graphSession.updateCluster(new UpdateClusterFunction(message));
473             voffset += length;
474             offset += length;
475             left -= length;
476         }
477     }
478
479     private final int setValueBig(int ri, byte[] bytes, int length_) {
480         checkBufferSpace(12);
481         int sum = 0;
482         int voffset = 0;
483         int offset = 0;
484         int left = length_;
485         while (left > 0) {
486             int length = Math.min(left, MAX_FIXED_BYTES - 12 - byteIndex);
487             if (DEBUG)
488                 printlnd("Set big value ri=" + ri + " vo=" + voffset + " len=" + length);
489             int written = modiValue(ri, voffset, bytes, offset, length);
490             sum += written;
491             voffset += written;
492             offset += written;
493             left -= written;
494             checkBufferSpace(12);
495         }
496         // Possibly truncate file
497         modiValue(ri, length_, new byte[0], 0, 0);
498         return sum;
499     }
500
501     private final int setValueSmall(int ri, byte[] bytes, int length) {
502         checkBufferSpace(5 + length);
503         int pos = byteIndex;
504         int i = length << 14 | ri;
505         if (length < 32) {
506             byte op = (byte) (OpEnum.SetShort.getOrMask() | length >>> 2);
507             addByte(op);
508             short s = (short) i;
509             addShort(s);
510         } else {
511             addByte(OpEnum.Set.getOrMask());
512             addInt(i);
513         }
514         System.arraycopy(bytes, 0, this.bytes, byteIndex, length);
515         byteIndex += length;
516         int len = byteIndex - pos;
517         return len;
518     }
519
520     final void setValue(short index, byte[] bytes) {
521         setValue(index, bytes, bytes.length);
522     }
523
524     final void setValue(short index, byte[] bytes, int length) {
525         checkInitialization();
526         if (ClusterTraitsBase.isIllegalResourceIndex(index))
527             throw new RuntimeDatabaseException("Assertion error. Illegal resource index=" + index);
528         if (DEBUG)
529             printlnd("Set value ri=" + index
530                     + " len=" + length
531                     + " bytes=" + Arrays.toString(Arrays.copyOfRange(bytes, 0, Math.min(10, length))));
532         int len;
533         /*
534          * The limit for the cluster stream is (1<18)-1 but this avoids the
535          * conversion to big cluster.
536          */
537         if (length > ClusterTraitsSmall.VALUE_SIZE_MAX)
538             len = setValueBig(index, bytes, length);
539         else
540             len = setValueSmall(index, bytes, length);
541         if (DEBUG_STAT) {
542             ++info.nValues;
543             info.sValues += len + length;
544         }
545     }
546
547     final void setValue(Change c, byte[] bytes, int length) {
548         short ri = ClusterTraitsBase.getResourceIndexFromResourceKeyNoThrow(c.key0);
549         setValue(ri, bytes, length);
550         c.initValue();
551     }
552
553     final void modiValue(Change c, long voffset, int length, byte[] bytes, int offset) {
554         checkInitialization();
555         int ri = ClusterTraitsBase.getResourceIndexFromResourceKeyNoThrow(c.key0);
556         if (DEBUG)
557             printlnd("Modify value ri=" + ri
558                     + " voff=" + voffset
559                     + " vlen=" + length
560                     + " blen=" + bytes.length
561                     + " boff=" + offset
562                     + " bytes=" + Arrays.toString(Arrays.copyOfRange(bytes, 0, Math.min(10, bytes.length))));
563         modiValueBig(ri, voffset, length, bytes, offset);
564         c.init();
565         if (DEBUG_STAT) {
566             ++info.nValues;
567             info.sValues += length;
568         }
569     }
570     final void setImmutable(boolean immutable) {
571         checkInitialization();
572         clusterChange2.setImmutable(immutable);
573     }
574     final void undoValueEx(int resourceIndex) {
575         checkInitialization();
576         clusterChange2.undoValueEx(resourceIndex);
577     }
578     final void setDeleted(boolean deleted) {
579         checkInitialization();
580         clusterChange2.setDeleted(deleted);
581     }
582     final void corrupt() {
583         checkInitialization();
584         addByte((byte)0);
585     }
586     /**
587      * @param graphSession
588      * @param clusterId
589      * @return true if actually flushed something
590      */
591     final boolean flush(GraphSession graphSession, ClusterUID clusterUID) {
592         if (byteIndex > 0) {
593             if(DebugPolicy.REPORT_CLUSTER_STREAM)
594                 System.err.println("Flush cluster change set stream " + this);
595             setHeaderVectorSize(byteIndex);
596             byte[] copy = new byte[byteIndex + HEADER_SIZE];
597             System.arraycopy(header, 0, copy, 0, HEADER_SIZE);
598             System.arraycopy(bytes, 0, copy, HEADER_SIZE, byteIndex);
599             UpdateClusterFunction updateClusterFunction = new UpdateClusterFunction(copy);
600             if (DEBUG_CCS) {
601                 for (DebugStm stm : debugStms)
602                     printlnd(stm.toString2());
603                 debugStms.clear();
604             }
605             if (DEBUG_STAT) {
606                 info.tot = updateClusterFunction.operation.length;
607                 printlnd("ReallyFlush: " + info.toString());
608                 sum.add(info);
609                 printlnd("ReallyFlush sum: " + sum.toString());
610             }
611             // long start = System.nanoTime();
612             graphSession.updateCluster(updateClusterFunction);
613             // long duration = System.nanoTime() - start;
614             // duration2 += duration;
615             // System.err.println("updateCluster " + 1e-9*duration);
616             // System.err.println("updateCluster total " + 1e-9*duration2);
617             clear();
618             clusterChange2.flush(graphSession);
619             return true;
620         } else if (clusterChange2.isDirty()) {
621             clusterChange2.flush(graphSession);
622             clear();
623             return true;
624         } else if (flushed) {
625             flushed = false;
626             return true;
627         } else {
628             return true;
629         }
630     }
631
632     final void flushInternal(GraphSession graphSession, ClusterUID clusterUID) {
633         flush(graphSession, clusterUID);
634         flushed = true;
635     }
636
637     final class ForeignTable {
638         private final TLongIntHashMap table = new TLongIntHashMap();
639
640         private long createKey(short index, long cluster) {
641             assert (cluster <= (1L << 48) - 1);
642             return (cluster << 14) | index;
643         }
644
645         public int get(short index, long cluster) {
646             int value = table.get(createKey(index, cluster));
647             if (DEBUG)
648                 printlnd("ForeignTable get c=" + clusterUID + " i="
649                         + (value - 1) + " r=" + index + " rc=" + cluster);
650             return value;
651         }
652
653         public int put(short index, long cluster, int value) {
654             if (DEBUG)
655                 printlnd("ForeignTable put c=" + clusterUID + " i="
656                         + (value - 1) + " r=" + index + " rc=" + cluster);
657             return table.put(createKey(index, cluster), value);
658         }
659
660         public int size() {
661             return table.size();
662         }
663
664         public void clear() {
665             table.clear();
666         }
667     }
668
669     @Override
670     public int hashCode() {
671         return 31*clusterImpl.getClusterKey();
672     }
673
674     @Override
675     public boolean equals(Object object) {
676         if (this == object)
677             return true;
678         else if (object == null)
679             return false;
680         else if (!(object instanceof ClusterChange))
681             return false;
682         ClusterChange r = (ClusterChange)object;
683         return r.clusterImpl.getClusterKey() == clusterImpl.getClusterKey();
684     }
685
686
687 }