DB client state gets corrupted
[simantics/platform.git] / bundles / org.simantics.db.procore / src / fi / vtt / simantics / procore / internal / ClusterChange.java
1 package fi.vtt.simantics.procore.internal;
2
3 import java.util.ArrayList;
4 import java.util.Arrays;
5 import java.util.List;
6
7 import org.simantics.db.exception.RuntimeDatabaseException;
8 import org.simantics.db.impl.ClusterTraitsBase;
9 import org.simantics.db.procore.cluster.ClusterImpl;
10 import org.simantics.db.procore.cluster.ClusterTraits;
11 import org.simantics.db.procore.cluster.ClusterTraitsSmall;
12 import org.simantics.db.service.Bytes;
13 import org.simantics.db.service.ClusterUID;
14
15 import fi.vtt.simantics.procore.DebugPolicy;
16 import fi.vtt.simantics.procore.internal.ClusterStream.ClusterEnum;
17 import fi.vtt.simantics.procore.internal.ClusterStream.Data;
18 import fi.vtt.simantics.procore.internal.ClusterStream.DebugInfo;
19 import fi.vtt.simantics.procore.internal.ClusterStream.OpEnum;
20 import fi.vtt.simantics.procore.internal.ClusterStream.StmEnum;
21 import gnu.trove.map.hash.TIntByteHashMap;
22 import gnu.trove.map.hash.TLongIntHashMap;
23
24 public final class ClusterChange {
25     public static final int VERSION = 1;
26     public static final byte ADD_OPERATION = 2;
27     public static final byte REMOVE_OPERATION = 3;
28     public static final byte DELETE_OPERATION = 5;
29
30     public static final boolean DEBUG = false;
31     public static final boolean DEBUG_STAT = false;
32     public static final boolean DEBUG_CCS = false;
33
34     private static DebugInfo sum = new DebugInfo();
35
36     public final TIntByteHashMap foreignTable = new TIntByteHashMap();
37     private final DebugInfo info;
38     private final GraphSession graphSession;
39     public final ClusterUID clusterUID;
40     private final int SIZE_OFFSET;
41     private final int HEADER_SIZE;
42     // How much buffer is used before stream is flushed to server. The bigger the better.
43     public static final int MAX_FIXED_BYTES = (1<<15) + (1<<14);
44     private static final int MAX_FIXED_OPERATION_SIZE = 17 + 16;
45     private static final int MAX_FIXED_OPERATION_SIZE_AND_ROOM_FOR_ERROR = MAX_FIXED_OPERATION_SIZE + 36;
46     private int nextSize = MAX_FIXED_BYTES;
47     int byteIndex = 0;
48     private byte[] bytes = null; // Operation data.
49     private final byte[] header;
50     private boolean flushed = false;
51     private final ClusterStream clusterStream;
52     private final ClusterChange2 clusterChange2;
53
54     public ClusterImpl clusterImpl;
55
56     public ClusterChange(ClusterStream clusterStream, ClusterImpl clusterImpl) {
57         this.clusterImpl = clusterImpl;
58         clusterUID = clusterImpl.getClusterUID();
59         long[] longs = new long[ClusterUID.getLongLength()];
60         clusterUID.toLong(longs, 0);
61         this.graphSession = clusterStream.graphSession;
62         info = new DebugInfo();
63         HEADER_SIZE = 8 + longs.length * 8;
64         header = new byte[HEADER_SIZE];
65         SIZE_OFFSET = 0;
66         Bytes.writeLE(header, SIZE_OFFSET + 0, 0); // Correct byte vector size is set with setHeaderVectorSize() later.
67         Bytes.writeLE(header, SIZE_OFFSET + 4, VERSION);
68         for (int i=0, offset=8; i<longs.length; ++i, offset+=8)
69             Bytes.writeLE(header, offset, longs[i]);
70         //initBuffer();
71         this.clusterStream = clusterStream;
72         this.clusterChange2 = new ClusterChange2(clusterUID, clusterImpl);
73         clusterStream.changes.add(this);
74     }
75
76     private void setHeaderVectorSize(int size) {
77         if (size < 0)
78             throw new RuntimeDatabaseException("Change set size can't be negative.");
79         int len = size + HEADER_SIZE - SIZE_OFFSET - 4;
80         Bytes.writeLE(header, SIZE_OFFSET, len);
81     }
82     @Override
83     public String toString() {
84         return super.toString() + " cluster=" + clusterImpl.getClusterUID() + " id=" + clusterImpl.getClusterId() + " off=" + byteIndex;
85     }
86     private final void initBuffer() {
87         flushed = false;
88         if (null == bytes || bytes.length < nextSize) {
89             bytes = new byte[nextSize];
90             nextSize = MAX_FIXED_BYTES;
91         }
92         byteIndex = 0;
93     }
94     private final void clear() {
95         if(clusterImpl != null && clusterImpl.change != null)
96                 clusterImpl.change.init();
97         foreignTable.clear();
98         //initBuffer();
99         bytes = null;
100         byteIndex = 0;
101         if (DEBUG_STAT)
102             info.clear();
103     }
104     private final void checkInitialization() {
105         if (0 == byteIndex)
106             clusterStream.changes.addChange(this);
107     }
108     private final void printlnd(String s) {
109         System.out.println("DEBUG: ClusterChange " + clusterUID + ": " + s);
110     }
111     public final void createResource(short index) {
112         checkInitialization();
113         if (DEBUG)
114             printlnd("New ri=" + index + " offset=" + byteIndex);
115         if (index > ClusterTraits.getMaxNumberOfResources())
116             throw new RuntimeDatabaseException("Illegal resource index=" + index + ".");
117         checkBufferSpace(null);
118         bytes[byteIndex++] = (byte)52;
119         bytes[byteIndex++] = (byte)index;
120         bytes[byteIndex++] = (byte)(index>>>8);
121     }
122     void flushCollect(Change c) {
123         flushInternal(graphSession, clusterUID);
124         if (DEBUG)
125             printlnd("Cluster change data was flushed.");
126         if (null != c) {
127             if (DEBUG)
128                 printlnd("Clearing lookup for " + c.toString());
129             c.lookup1 = null;
130             c.lookup2 = null;
131         }
132         if (null != clusterImpl) {
133             clusterImpl.foreignLookup = null;
134         }
135     }
136
137     private final boolean checkBufferSpace(Change c) {
138         clusterStream.changes.checkFlush();
139         if(bytes == null) initBuffer();
140         if (MAX_FIXED_BYTES - byteIndex > MAX_FIXED_OPERATION_SIZE_AND_ROOM_FOR_ERROR) {
141             return false;
142         }
143         flushCollect(c);
144         initBuffer();
145         return true;
146     }
147
148     private final void checkBufferSpace(int size) {
149         if(bytes == null) initBuffer();
150         if (bytes.length - byteIndex >= size)
151             return;
152         nextSize = Math.max(MAX_FIXED_BYTES, size);
153         flushInternal(graphSession, clusterUID);
154         initBuffer();
155     }
156
157     final void addChange(Change c) {
158         checkInitialization();
159         checkBufferSpace(c);
160         byte operation = c.op0;
161         if(operation == ADD_OPERATION)
162             addStm(c, StmEnum.Add);
163         else if (operation == REMOVE_OPERATION)
164             addStm(c, StmEnum.Remove);
165         else if (operation == DELETE_OPERATION) {
166             if (DEBUG)
167                 printlnd("Delete value offset=" + byteIndex + " " + c);
168             addByte(OpEnum.Delete.getOrMask());
169             addShort(ClusterTraitsBase.getResourceIndexFromResourceKeyNoThrow(c.key0));
170         }
171         c.lastArg = 0;
172     }
173
174     private final void addForeignLong(short index, ClusterUID clusterUID) {
175         byteIndex = clusterUID.toByte(bytes, byteIndex);
176         bytes[byteIndex++] = (byte)(index & 0xFF);
177         bytes[byteIndex++] = (byte)(index >>> 8);
178     }
179
180     private final ClusterEnum addIndexAndCluster(int key, ClusterUID clusterUID, byte lookIndex, byte[] lookup) {
181         assert(!clusterUID.equals(ClusterUID.Null));
182         short resourceIndex = ClusterTraitsBase.getResourceIndexFromResourceKeyNoThrow(key);
183         if (clusterUID.equals(this.clusterUID)) {
184             bytes[byteIndex++] = (byte)(resourceIndex & 0xFF);
185             bytes[byteIndex++] = (byte)(resourceIndex >>> 8);
186             return ClusterEnum.Local;
187         }
188
189         byte foreign = 0;
190         if(lookIndex > 0) {
191             if(lookup != null)
192                 foreign = lookup[lookIndex];
193         } else {
194             foreign = foreignTable.get(key);
195         }
196         if (0 != foreign) {
197             if (foreign > 256)
198                 throw new RuntimeDatabaseException("Internal error, contact application support." +
199                 "Too big foreing index=" + foreign + " max=256");
200             --foreign;
201             bytes[byteIndex++] = foreign;
202             return ClusterEnum.ForeignShort;
203         } else {
204             byte position = (byte) (foreignTable.size() + 1);
205             if(lookup != null)
206                 lookup[lookIndex] = position;
207             foreignTable.put(key, position);
208             if (DEBUG_STAT)
209                 info.sForeign = foreignTable.size();
210             if (clusterUID.equals(ClusterUID.Null))
211                 throw new RuntimeDatabaseException("Internal error, contact application support." +
212                 "Cluster unique id not defined for foreing cluster.");
213             addForeignLong(resourceIndex, clusterUID);
214             return ClusterEnum.ForeignLong;
215         }
216     }
217
218     private final void addByte(byte b) {
219         bytes[byteIndex++] = b;
220     }
221
222     private final void addShort(short s) {
223         bytes[byteIndex++] = (byte)(s & 0xFF);
224         bytes[byteIndex++] = (byte)(s >>> 8);
225     }
226
227 //    private final void addShort(int s) {
228 //        bytes[byteIndex++] = (byte) (s & 0xFF);
229 //        bytes[byteIndex++] = (byte) ((s >>> 8) & 0xFF);
230 //    }
231
232     private final void addInt(int i) {
233 //        System.err.println("addInt " + i + " " + i);
234         bytes[byteIndex++] = (byte) (i & 0xFF);
235         bytes[byteIndex++] = (byte) ((i >>> 8) & 0xFF);
236         bytes[byteIndex++] = (byte) ((i >>> 16) & 0xFF);
237         bytes[byteIndex++] = (byte) ((i >>> 24) & 0xFF);
238         // buffer.asIntBuffer().put(i);
239         // buffer.position(buffer.position()+4);
240     }
241
242 //    private void addLong6(long l) {
243 ////        System.err.println("addLong " + l);
244 //        bytes[byteIndex++] = (byte) (l & 0xFF);
245 //        bytes[byteIndex++] = (byte) ((l >>> 8) & 0xFF);
246 //        bytes[byteIndex++] = (byte) ((l >>> 16) & 0xFF);
247 //        bytes[byteIndex++] = (byte) ((l >>> 24) & 0xFF);
248 //        bytes[byteIndex++] = (byte) ((l >>> 32) & 0xFF);
249 //        bytes[byteIndex++] = (byte) ((l >>> 40) & 0xFF);
250 //        // buffer.asLongBuffer().put(l);
251 //        // buffer.position(buffer.position() + 6);
252 //    }
253
254     private void addLong7(long l) {
255         bytes[byteIndex++] = (byte) (l & 0xFF);
256         bytes[byteIndex++] = (byte) ((l >>> 8) & 0xFF);
257         bytes[byteIndex++] = (byte) ((l >>> 16) & 0xFF);
258         bytes[byteIndex++] = (byte) ((l >>> 24) & 0xFF);
259         bytes[byteIndex++] = (byte) ((l >>> 32) & 0xFF);
260         bytes[byteIndex++] = (byte) ((l >>> 40) & 0xFF);
261         bytes[byteIndex++] = (byte) ((l >>> 48) & 0xFF);
262         // buffer.asLongBuffer().put(l);
263         // buffer.position(buffer.position() + 7);
264     }
265
266 //    private void addLong(long l) {
267 //        bytes[byteIndex++] = (byte) (l & 0xFF);
268 //        bytes[byteIndex++] = (byte) ((l >>> 8) & 0xFF);
269 //        bytes[byteIndex++] = (byte) ((l >>> 16) & 0xFF);
270 //        bytes[byteIndex++] = (byte) ((l >>> 24) & 0xFF);
271 //        bytes[byteIndex++] = (byte) ((l >>> 32) & 0xFF);
272 //        bytes[byteIndex++] = (byte) ((l >>> 40) & 0xFF);
273 //        bytes[byteIndex++] = (byte) ((l >>> 48) & 0xFF);
274 //        bytes[byteIndex++] = (byte) ((l >>> 56) & 0xFF);
275 //    }
276     private final byte bufferPop() {
277         return bytes[--byteIndex];
278     }
279
280     final class DebugStm {
281         StmEnum e;
282         int r;
283         int p;
284         int o;
285         ClusterUID pc;
286         ClusterUID oc;
287
288         DebugStm(StmEnum e, int r, int p, ClusterUID pc, int o, ClusterUID oc) {
289             this.e = e;
290             this.r = r;
291             this.p = p;
292             this.o = o;
293             this.pc = pc;
294             this.oc = oc;
295         }
296
297         @Override
298         public String toString() {
299             short ri = ClusterTraitsBase.getResourceIndexFromResourceKeyNoThrow(r);
300             short pi = ClusterTraitsBase.getResourceIndexFromResourceKeyNoThrow(p);
301             short oi = ClusterTraitsBase.getResourceIndexFromResourceKeyNoThrow(o);
302             return "" + e + " rk=" + r + " ri=" + ri + " rc=" + clusterUID
303             + " pk=" + p + " pi=" + pi + " pc=" + pc
304             + " ok=" + o + " oi=" + oi + " oc=" + oc;
305         }
306
307         public String toString2() {
308             return "" + e + " r=" + r + " rc=" + clusterUID + " p=" + p
309                     + " pc=" + pc + " o=" + o + " oc=" + oc;
310         }
311
312         public String toString3() {
313             short ri = ClusterTraitsBase.getResourceIndexFromResourceKeyNoThrow(r);
314             short pi = ClusterTraitsBase.getResourceIndexFromResourceKeyNoThrow(p);
315             short oi = ClusterTraitsBase.getResourceIndexFromResourceKeyNoThrow(o);
316             return "" + e + " ri=" + ri
317             + " pi=" + pi + " pc=" + pc
318             + " oi=" + oi + " oc=" + oc;
319         }
320     }
321
322     private List<DebugStm> debugStms = new ArrayList<DebugStm>();
323
324     @SuppressWarnings("unused")
325     private final void addStm(Change c, StmEnum stmEnum) {
326
327         if (DEBUG_STAT)
328             ++info.nStms;
329         if (DEBUG || DEBUG_CCS) {
330             DebugStm d = new DebugStm(stmEnum, c.key0, c.key1, c.clusterUID1, c.key2, c.clusterUID2);
331             if (DEBUG_CCS)
332                 debugStms.add(d);
333             if (DEBUG) {
334                 printlnd(d.toString3() + " offset=" + byteIndex);
335             }
336         }
337         // int opPos = buffer.position();
338         int opPos = byteIndex++;
339         // buffer.put((byte)0); // operation code
340         // addByte((byte)0);
341
342         boolean done = true;
343
344         ClusterEnum a = addIndexAndCluster(c.key1, c.clusterUID1, c.lookIndex1, c.lookup1);
345         byte ab = 0;
346
347         // ForeignShort = byte
348         // Local = short
349         // ForeignLong = 8 byte
350         if (a != ClusterEnum.ForeignShort) {
351             ab = bufferPop();
352             done = false;
353         }
354
355         ClusterEnum b = addIndexAndCluster(c.key2, c.clusterUID2, c.lookIndex2, c.lookup2);
356         byte bb = 0;
357         if (b != ClusterEnum.ForeignShort) {
358             bb = bufferPop();
359             done = false;
360         }
361
362         int ri = ClusterTraitsBase.getResourceIndexFromResourceKeyNoThrow(c.key0);
363         if (ClusterTraitsSmall.isIllegalResourceIndex(ri))
364             throw new RuntimeDatabaseException("Assertion error. Illegal resource index=" + ri);
365         bytes[byteIndex++] = (byte)ri; // index low byte
366         if(!done) {
367             Data data = ClusterEnum.getData(stmEnum, a, b);
368             int left = 6 - data.bits;
369             int op = ri >>> (8 + left);
370             ri >>>= 8;
371             ri &= (1 << left) - 1;
372             if (a != ClusterEnum.ForeignShort) {
373                 ri |= ab << left;
374                 left += 6;
375             }
376             if (b != ClusterEnum.ForeignShort) {
377                 ri |= bb << left;
378                 left += 6;
379             }
380             switch (data.bytes) {
381                 default:
382                     throw new RuntimeDatabaseException("Assertion error. Illegal number of bytes=" + data.bytes);
383                 case 2:
384                     bytes[byteIndex++] = (byte)(ri & 0xFF);
385                     bytes[byteIndex++] = (byte)((ri >>> 8) & 0xFF);
386                     break;
387                 case 1:
388                     bytes[byteIndex++] = (byte)(ri & 0xFF);
389                     break;
390                 case 0:
391                     break;
392             }
393             op |= data.mask;
394             this.bytes[opPos] = (byte)op;
395         } else {
396             if (stmEnum == StmEnum.Add)
397                 bytes[opPos] = (byte)((ri >>> 8) + 64);
398             else
399                 bytes[opPos] = (byte)((ri >>> 8) + 128);
400         }
401         if (DEBUG_STAT) {
402             if (a == ClusterEnum.Local && b == ClusterEnum.Local) {
403                 ++info.nLocal;
404             } else if (a == ClusterEnum.Local || b == ClusterEnum.Local) {
405                 ++info.nPartly;
406             } else {
407                 ++info.nForeign;
408             }
409         }
410         if (foreignTable.size() > 252)
411             flushInternal(graphSession, clusterUID);
412     }
413
414     private final int modiValue(int ri, long value_offset, byte[] bytes, int offset, int size) {
415         if (DEBUG)
416             printlnd("Modify value ri=" + ri + " vo=" + value_offset + " size=" + size + " total=" + bytes.length);
417         if (ClusterTraitsBase.isIllegalResourceIndex(ri))
418             throw new RuntimeDatabaseException("Assertion error. Illegal resource index=" + ri);
419         if (value_offset > (1L << 58 - 1))
420             throw new RuntimeDatabaseException("Illegal value offset="
421                     + value_offset);
422         if (size < 0 || size > MAX_FIXED_BYTES - 1)
423             throw new RuntimeDatabaseException("Illegal value size=" + size);
424         if (offset + size > bytes.length)
425             throw new RuntimeDatabaseException("Illegal value size=" + size);
426         checkBufferSpace(12 + size);
427         addByte(OpEnum.Modify.getOrMask());
428         ri |= (value_offset >>> 56) << 14; // add top two bits
429         addShort((short) ri);
430         value_offset &= (1L << 56) - 1;
431         addLong7(value_offset);
432         addShort((short) size);
433         if (DEBUG)
434             System.out.println("Modify value fixed part end offset=" + byteIndex);
435         int copied = Math.min(size, this.bytes.length - byteIndex);
436         System.arraycopy(bytes, offset, this.bytes, byteIndex, copied);
437         byteIndex += size;
438         return copied;
439     }
440
441     private final void modiValueBig(int ri, long voffset, int left, byte[] bytes, int offset) {
442         checkBufferSpace(0);
443         int current = Math.min(this.bytes.length - byteIndex - 12, left);
444         if(current >= 0) {
445             int written = modiValue(ri, voffset, bytes, offset, current);
446             voffset += written;
447             offset += written;
448             left -= written;
449         }
450         flushInternal(graphSession, clusterUID);
451         while (left > 0) {
452             int length = Math.min(left, (1 << 16) - 1);
453             if (DEBUG)
454                 printlnd("Modify big value ri=" + ri + " vo=" + voffset + " len=" + length);
455             int psize = length + 12;
456             setHeaderVectorSize(psize);
457             byte[] message = new byte[psize+HEADER_SIZE];
458             System.arraycopy(header, 0, message, 0, HEADER_SIZE);
459             int to = HEADER_SIZE;
460             Bytes.write(message, to++, OpEnum.Modify.getOrMask());
461             short index = (short)(ri | (voffset >>> 56)<<14); // add top two bits
462             Bytes.writeLE(message, to, index); to += 2;
463             Bytes.writeLE7(message, to, voffset & ((1L << 56) - 1)); to += 7;
464             Bytes.writeLE(message, to, (short)length); to += 2;
465             System.arraycopy(bytes, offset, message, to, length);
466             graphSession.updateCluster(new UpdateClusterFunction(message));
467             voffset += length;
468             offset += length;
469             left -= length;
470         }
471     }
472
473     private final int setValueBig(int ri, byte[] bytes, int length_) {
474         checkBufferSpace(12);
475         int sum = 0;
476         int voffset = 0;
477         int offset = 0;
478         int left = length_;
479         while (left > 0) {
480             int length = Math.min(left, MAX_FIXED_BYTES - 12 - byteIndex);
481             if (DEBUG)
482                 printlnd("Set big value ri=" + ri + " vo=" + voffset + " len=" + length);
483             int written = modiValue(ri, voffset, bytes, offset, length);
484             sum += written;
485             voffset += written;
486             offset += written;
487             left -= written;
488             checkBufferSpace(12);
489         }
490         return sum;
491     }
492
493     private final int setValueSmall(int ri, byte[] bytes, int length) {
494         checkBufferSpace(5 + length);
495         int pos = byteIndex;
496         int i = length << 14 | ri;
497         if (length < 32) {
498             byte op = (byte) (OpEnum.SetShort.getOrMask() | length >>> 2);
499             addByte(op);
500             short s = (short) i;
501             addShort(s);
502         } else {
503             addByte(OpEnum.Set.getOrMask());
504             addInt(i);
505         }
506         System.arraycopy(bytes, 0, this.bytes, byteIndex, length);
507         byteIndex += length;
508         int len = byteIndex - pos;
509         return len;
510     }
511
512     final void setValue(short index, byte[] bytes) {
513         setValue(index, bytes, bytes.length);
514     }
515
516     final void setValue(short index, byte[] bytes, int length) {
517         checkInitialization();
518         if (ClusterTraitsBase.isIllegalResourceIndex(index))
519             throw new RuntimeDatabaseException("Assertion error. Illegal resource index=" + index);
520         if (DEBUG)
521             printlnd("Set value ri=" + index
522                     + " len=" + length
523                     + " bytes=" + Arrays.toString(Arrays.copyOfRange(bytes, 0, Math.min(10, length))));
524         int len;
525         /*
526          * The limit for the cluster stream is (1<18)-1 but this avoids the
527          * conversion to big cluster.
528          */
529         if (length > ClusterTraitsSmall.VALUE_SIZE_MAX)
530             len = setValueBig(index, bytes, length);
531         else
532             len = setValueSmall(index, bytes, length);
533         if (DEBUG_STAT) {
534             ++info.nValues;
535             info.sValues += len + length;
536         }
537     }
538
539     final void setValue(Change c, byte[] bytes, int length) {
540         short ri = ClusterTraitsBase.getResourceIndexFromResourceKeyNoThrow(c.key0);
541         setValue(ri, bytes, length);
542         c.initValue();
543     }
544
545     final void modiValue(Change c, long voffset, int length, byte[] bytes, int offset) {
546         checkInitialization();
547         int ri = ClusterTraitsBase.getResourceIndexFromResourceKeyNoThrow(c.key0);
548         if (DEBUG)
549             printlnd("Modify value ri=" + ri
550                     + " voff=" + voffset
551                     + " vlen=" + length
552                     + " blen=" + bytes.length
553                     + " boff=" + offset
554                     + " bytes=" + Arrays.toString(Arrays.copyOfRange(bytes, 0, Math.min(10, bytes.length))));
555         modiValueBig(ri, voffset, length, bytes, offset);
556         c.init();
557         if (DEBUG_STAT) {
558             ++info.nValues;
559             info.sValues += length;
560         }
561     }
562     final void setImmutable(boolean immutable) {
563         checkInitialization();
564         clusterChange2.setImmutable(immutable);
565     }
566     final void undoValueEx(int resourceIndex) {
567         checkInitialization();
568         clusterChange2.undoValueEx(resourceIndex);
569     }
570     final void setDeleted(boolean deleted) {
571         checkInitialization();
572         clusterChange2.setDeleted(deleted);
573     }
574     final void corrupt() {
575         checkInitialization();
576         addByte((byte)0);
577     }
578     /**
579      * @param graphSession
580      * @param clusterId
581      * @return true if actually flushed something
582      */
583     final boolean flush(GraphSession graphSession, ClusterUID clusterUID) {
584         if (byteIndex > 0) {
585             if(DebugPolicy.REPORT_CLUSTER_STREAM)
586                 System.err.println("Flush cluster change set stream " + this);
587             setHeaderVectorSize(byteIndex);
588             byte[] copy = new byte[byteIndex + HEADER_SIZE];
589             System.arraycopy(header, 0, copy, 0, HEADER_SIZE);
590             System.arraycopy(bytes, 0, copy, HEADER_SIZE, byteIndex);
591             UpdateClusterFunction updateClusterFunction = new UpdateClusterFunction(copy);
592             if (DEBUG_CCS) {
593                 for (DebugStm stm : debugStms)
594                     printlnd(stm.toString2());
595                 debugStms.clear();
596             }
597             if (DEBUG_STAT) {
598                 info.tot = updateClusterFunction.operation.length;
599                 printlnd("ReallyFlush: " + info.toString());
600                 sum.add(info);
601                 printlnd("ReallyFlush sum: " + sum.toString());
602             }
603             // long start = System.nanoTime();
604             graphSession.updateCluster(updateClusterFunction);
605             // long duration = System.nanoTime() - start;
606             // duration2 += duration;
607             // System.err.println("updateCluster " + 1e-9*duration);
608             // System.err.println("updateCluster total " + 1e-9*duration2);
609             clear();
610             clusterChange2.flush(graphSession);
611             return true;
612         } else if (clusterChange2.isDirty()) {
613             clusterChange2.flush(graphSession);
614             clear();
615             return true;
616         } else if (flushed) {
617             flushed = false;
618             return true;
619         } else {
620             return true;
621         }
622     }
623
624     final void flushInternal(GraphSession graphSession, ClusterUID clusterUID) {
625         flush(graphSession, clusterUID);
626         flushed = true;
627     }
628
629     final class ForeignTable {
630         private final TLongIntHashMap table = new TLongIntHashMap();
631
632         private long createKey(short index, long cluster) {
633             assert (cluster <= (1L << 48) - 1);
634             return (cluster << 14) | index;
635         }
636
637         public int get(short index, long cluster) {
638             int value = table.get(createKey(index, cluster));
639             if (DEBUG)
640                 printlnd("ForeignTable get c=" + clusterUID + " i="
641                         + (value - 1) + " r=" + index + " rc=" + cluster);
642             return value;
643         }
644
645         public int put(short index, long cluster, int value) {
646             if (DEBUG)
647                 printlnd("ForeignTable put c=" + clusterUID + " i="
648                         + (value - 1) + " r=" + index + " rc=" + cluster);
649             return table.put(createKey(index, cluster), value);
650         }
651
652         public int size() {
653             return table.size();
654         }
655
656         public void clear() {
657             table.clear();
658         }
659     }
660
661     @Override
662     public int hashCode() {
663         return 31*clusterImpl.getClusterKey();
664     }
665
666     @Override
667     public boolean equals(Object object) {
668         if (this == object)
669             return true;
670         else if (object == null)
671             return false;
672         else if (!(object instanceof ClusterChange))
673             return false;
674         ClusterChange r = (ClusterChange)object;
675         return r.clusterImpl.getClusterKey() == clusterImpl.getClusterKey();
676     }
677
678
679 }