780ea4e053de836ab5f4f4fa45daf4799eba6581
[simantics/platform.git] / bundles / org.simantics.acorn / src / org / simantics / acorn / internal / ClusterChange.java
1 package org.simantics.acorn.internal;
2
3 import java.nio.ByteBuffer;
4 import java.util.ArrayList;
5 import java.util.Arrays;
6 import java.util.List;
7
8 import org.simantics.acorn.internal.ClusterStream.ClusterEnum;
9 import org.simantics.acorn.internal.ClusterStream.Data;
10 import org.simantics.acorn.internal.ClusterStream.DebugInfo;
11 import org.simantics.acorn.internal.ClusterStream.OpEnum;
12 import org.simantics.acorn.internal.ClusterStream.StmEnum;
13 import org.simantics.compressions.Compressions;
14 import org.simantics.db.exception.RuntimeDatabaseException;
15 import org.simantics.db.impl.ClusterTraitsBase;
16 import org.simantics.db.procore.cluster.ClusterTraits;
17 import org.simantics.db.procore.cluster.ClusterTraitsSmall;
18 import org.simantics.db.service.Bytes;
19 import org.simantics.db.service.ClusterUID;
20 import org.simantics.utils.datastructures.Pair;
21
22 import gnu.trove.map.hash.TIntByteHashMap;
23 import gnu.trove.map.hash.TLongIntHashMap;
24
25
26 public final class ClusterChange {
27         
28     public static final int VERSION = 1;
29     public static final byte ADD_OPERATION = 2;
30     public static final byte REMOVE_OPERATION = 3;
31     public static final byte DELETE_OPERATION = 5;
32
33     public static final boolean DEBUG = false;
34     public static final boolean DEBUG_STAT = false;
35     public static final boolean DEBUG_CCS = false;
36
37     private static DebugInfo sum = new DebugInfo();
38
39     public final TIntByteHashMap foreignTable = new TIntByteHashMap();
40     private final DebugInfo info;
41 //    private final GraphSession graphSession;
42     public final ClusterUID clusterUID;
43     private final int SIZE_OFFSET;
44 //    private final int HEADER_SIZE;
45     // How much buffer is used before stream is flushed to server. The bigger the better.
46     public static final int MAX_FIXED_BYTES = (1<<15) + (1<<14);
47     private static final int MAX_FIXED_OPERATION_SIZE = 17 + 16;
48     private static final int MAX_FIXED_OPERATION_SIZE_AND_ROOM_FOR_ERROR = MAX_FIXED_OPERATION_SIZE + 36;
49     private int nextSize = MAX_FIXED_BYTES;
50     int byteIndex = 0;
51     private byte[] bytes = null; // Operation data.
52 //    private final byte[] header;
53     private boolean flushed = false;
54     private ArrayList<Pair<ClusterUID, byte[]>> stream;
55
56 //    public ClusterImpl clusterImpl;
57
58     public ClusterChange( ArrayList<Pair<ClusterUID, byte[]>>  stream, ClusterUID clusterUID) {
59         this.clusterUID = clusterUID;
60         long[] longs = new long[ClusterUID.getLongLength()];
61         clusterUID.toLong(longs, 0);
62         this.stream = stream;
63 //        this.graphSession = clusterStream.graphSession;
64         info = new DebugInfo();
65 //        HEADER_SIZE = 8 + longs.length * 8;
66 //        header = new byte[HEADER_SIZE];
67         SIZE_OFFSET = 0;
68 //        Bytes.writeLE(header, SIZE_OFFSET + 0, 0); // Correct byte vector size is set with setHeaderVectorSize() later.
69 //        Bytes.writeLE(header, SIZE_OFFSET + 4, VERSION);
70 //        for (int i=0, offset=8; i<longs.length; ++i, offset+=8)
71 //            Bytes.writeLE(header, offset, longs[i]);
72         //initBuffer();
73 //        this.clusterStream = clusterStream;
74 //        this.clusterChange2 = new ClusterChange2(clusterUID, clusterImpl);
75 //        clusterStream.changes.add(this);
76     }
77
78 //    private void setHeaderVectorSize(int size) {
79 //        if (size < 0)
80 //            throw new RuntimeDatabaseException("Change set size can't be negative.");
81 //        int len = size + HEADER_SIZE - SIZE_OFFSET - 4;
82 //        Bytes.writeLE(header, SIZE_OFFSET, len);
83 //    }
84     @Override
85     public String toString() {
86         return super.toString() + " cluster=" + clusterUID + " off=" + byteIndex;
87     }
88     final public void initBuffer() {
89         flushed = false;
90         if (null == bytes || bytes.length < nextSize) {
91             bytes = new byte[nextSize];
92             nextSize = MAX_FIXED_BYTES;
93         }
94         byteIndex = 0;
95     }
96     private final void clear() {
97 //      if(clusterImpl != null && clusterImpl.change != null)
98 //              clusterImpl.change.init();
99         foreignTable.clear();
100         //initBuffer();
101         bytes = null;
102         byteIndex = 0;
103         if (DEBUG_STAT)
104             info.clear();
105     }
106     private final void checkInitialization() {
107 //        if (0 == byteIndex)
108 //            clusterStream.changes.addChange(this);
109     }
110     private final void printlnd(String s) {
111         System.out.println("DEBUG: ClusterChange " + clusterUID + ": " + s);
112     }
113     public final void createResource(short index) {
114         checkInitialization();
115         if (DEBUG)
116             printlnd("New ri=" + index + " offset=" + byteIndex);
117         if (index > ClusterTraits.getMaxNumberOfResources())
118             throw new RuntimeDatabaseException("Illegal resource index=" + index + ".");
119         checkBufferSpace(null);
120         bytes[byteIndex++] = (byte)52;
121         bytes[byteIndex++] = (byte)index;
122         bytes[byteIndex++] = (byte)(index>>>8);
123     }
124     void flushCollect(Change c) {
125         throw new UnsupportedOperationException();
126 //        flushInternal(graphSession, clusterUID);
127 //        if (DEBUG)
128 //            printlnd("Cluster change data was flushed.");
129 //        if (null != c) {
130 //            if (DEBUG)
131 //                printlnd("Clearing lookup for " + c.toString());
132 //            c.lookup1 = null;
133 //            c.lookup2 = null;
134 //        }
135 //        if (null != clusterImpl) {
136 //            clusterImpl.foreignLookup = null;
137 //        }
138     }
139
140     private final boolean checkBufferSpace(Change c) {
141 //        clusterStream.changes.checkFlush();
142         if(bytes == null) initBuffer();
143         if (MAX_FIXED_BYTES - byteIndex > MAX_FIXED_OPERATION_SIZE_AND_ROOM_FOR_ERROR) {
144             return false;
145         }
146         flush();
147 //        initBuffer();
148         return true;
149     }
150
151     private final void checkBufferSpace(int size) {
152         if(bytes == null) initBuffer();
153         if (bytes.length - byteIndex >= size)
154             return;
155         nextSize = Math.max(MAX_FIXED_BYTES, size);
156         flush();
157         initBuffer();
158     }
159
160     public final void addChange(Change c) {
161         checkInitialization();
162         checkBufferSpace(c);
163         byte operation = c.op0;
164         if(operation == ADD_OPERATION)
165             addStm(c, StmEnum.Add);
166         else if (operation == REMOVE_OPERATION)
167             addStm(c, StmEnum.Remove);
168         else if (operation == DELETE_OPERATION) {
169             if (DEBUG)
170                 printlnd("Delete value offset=" + byteIndex + " " + c);
171             addByte(OpEnum.Delete.getOrMask());
172             addShort(ClusterTraitsBase.getResourceIndexFromResourceKeyNoThrow(c.key0));
173         }
174         c.lastArg = 0;
175     }
176
177     private final void addForeignLong(short index, ClusterUID clusterUID) {
178         byteIndex = clusterUID.toByte(bytes, byteIndex);
179         bytes[byteIndex++] = (byte)(index & 0xFF);
180         bytes[byteIndex++] = (byte)(index >>> 8);
181     }
182
183     private final ClusterEnum addIndexAndCluster(int key, ClusterUID clusterUID, byte lookIndex, byte[] lookup) {
184         assert(!clusterUID.equals(ClusterUID.Null));
185         short resourceIndex = ClusterTraitsBase.getResourceIndexFromResourceKeyNoThrow(key);
186         if (clusterUID.equals(this.clusterUID)) {
187             bytes[byteIndex++] = (byte)(resourceIndex & 0xFF);
188             bytes[byteIndex++] = (byte)(resourceIndex >>> 8);
189             return ClusterEnum.Local;
190         }
191
192         byte foreign = 0;
193         if(lookIndex > 0) {
194             if(lookup != null)
195                 foreign = lookup[lookIndex];
196         } else {
197             foreign = foreignTable.get(key);
198         }
199         if (0 != foreign) {
200             if (foreign > 256)
201                 throw new RuntimeDatabaseException("Internal error." +
202                 "Too big foreing index=" + foreign + " max=256");
203             --foreign;
204             bytes[byteIndex++] = foreign;
205             return ClusterEnum.ForeignShort;
206         } else {
207             byte position = (byte) (foreignTable.size() + 1);
208             if(lookup != null)
209                 lookup[lookIndex] = position;
210             foreignTable.put(key, position);
211             if (DEBUG_STAT)
212                 info.sForeign = foreignTable.size();
213             if (clusterUID.equals(ClusterUID.Null))
214                 throw new RuntimeDatabaseException("Internal error." +
215                 "Cluster unique id not defined for foreing cluster.");
216             addForeignLong(resourceIndex, clusterUID);
217             return ClusterEnum.ForeignLong;
218         }
219     }
220
221     private final void addByte(byte b) {
222         bytes[byteIndex++] = b;
223     }
224
225     private final void addShort(short s) {
226         bytes[byteIndex++] = (byte)(s & 0xFF);
227         bytes[byteIndex++] = (byte)(s >>> 8);
228     }
229
230 //    private final void addShort(int s) {
231 //        bytes[byteIndex++] = (byte) (s & 0xFF);
232 //        bytes[byteIndex++] = (byte) ((s >>> 8) & 0xFF);
233 //    }
234
235     private final void addInt(int i) {
236 //        System.err.println("addInt " + i + " " + i);
237         bytes[byteIndex++] = (byte) (i & 0xFF);
238         bytes[byteIndex++] = (byte) ((i >>> 8) & 0xFF);
239         bytes[byteIndex++] = (byte) ((i >>> 16) & 0xFF);
240         bytes[byteIndex++] = (byte) ((i >>> 24) & 0xFF);
241         // buffer.asIntBuffer().put(i);
242         // buffer.position(buffer.position()+4);
243     }
244
245 //    private void addLong6(long l) {
246 ////        System.err.println("addLong " + l);
247 //        bytes[byteIndex++] = (byte) (l & 0xFF);
248 //        bytes[byteIndex++] = (byte) ((l >>> 8) & 0xFF);
249 //        bytes[byteIndex++] = (byte) ((l >>> 16) & 0xFF);
250 //        bytes[byteIndex++] = (byte) ((l >>> 24) & 0xFF);
251 //        bytes[byteIndex++] = (byte) ((l >>> 32) & 0xFF);
252 //        bytes[byteIndex++] = (byte) ((l >>> 40) & 0xFF);
253 //        // buffer.asLongBuffer().put(l);
254 //        // buffer.position(buffer.position() + 6);
255 //    }
256
257     private void addLong7(long l) {
258         bytes[byteIndex++] = (byte) (l & 0xFF);
259         bytes[byteIndex++] = (byte) ((l >>> 8) & 0xFF);
260         bytes[byteIndex++] = (byte) ((l >>> 16) & 0xFF);
261         bytes[byteIndex++] = (byte) ((l >>> 24) & 0xFF);
262         bytes[byteIndex++] = (byte) ((l >>> 32) & 0xFF);
263         bytes[byteIndex++] = (byte) ((l >>> 40) & 0xFF);
264         bytes[byteIndex++] = (byte) ((l >>> 48) & 0xFF);
265         // buffer.asLongBuffer().put(l);
266         // buffer.position(buffer.position() + 7);
267     }
268
269 //    private void addLong(long l) {
270 //        bytes[byteIndex++] = (byte) (l & 0xFF);
271 //        bytes[byteIndex++] = (byte) ((l >>> 8) & 0xFF);
272 //        bytes[byteIndex++] = (byte) ((l >>> 16) & 0xFF);
273 //        bytes[byteIndex++] = (byte) ((l >>> 24) & 0xFF);
274 //        bytes[byteIndex++] = (byte) ((l >>> 32) & 0xFF);
275 //        bytes[byteIndex++] = (byte) ((l >>> 40) & 0xFF);
276 //        bytes[byteIndex++] = (byte) ((l >>> 48) & 0xFF);
277 //        bytes[byteIndex++] = (byte) ((l >>> 56) & 0xFF);
278 //    }
279     private final byte bufferPop() {
280         return bytes[--byteIndex];
281     }
282
283     final class DebugStm {
284         StmEnum e;
285         int r;
286         int p;
287         int o;
288         ClusterUID pc;
289         ClusterUID oc;
290
291         DebugStm(StmEnum e, int r, int p, ClusterUID pc, int o, ClusterUID oc) {
292             this.e = e;
293             this.r = r;
294             this.p = p;
295             this.o = o;
296             this.pc = pc;
297             this.oc = oc;
298         }
299
300         @Override
301         public String toString() {
302             short ri = ClusterTraitsBase.getResourceIndexFromResourceKeyNoThrow(r);
303             short pi = ClusterTraitsBase.getResourceIndexFromResourceKeyNoThrow(p);
304             short oi = ClusterTraitsBase.getResourceIndexFromResourceKeyNoThrow(o);
305             return "" + e + " rk=" + r + " ri=" + ri + " rc=" + clusterUID
306             + " pk=" + p + " pi=" + pi + " pc=" + pc
307             + " ok=" + o + " oi=" + oi + " oc=" + oc;
308         }
309
310         public String toString2() {
311             return "" + e + " r=" + r + " rc=" + clusterUID + " p=" + p
312                     + " pc=" + pc + " o=" + o + " oc=" + oc;
313         }
314
315         public String toString3() {
316             short ri = ClusterTraitsBase.getResourceIndexFromResourceKeyNoThrow(r);
317             short pi = ClusterTraitsBase.getResourceIndexFromResourceKeyNoThrow(p);
318             short oi = ClusterTraitsBase.getResourceIndexFromResourceKeyNoThrow(o);
319             return "" + e + " ri=" + ri
320             + " pi=" + pi + " pc=" + pc
321             + " oi=" + oi + " oc=" + oc;
322         }
323     }
324
325     private List<DebugStm> debugStms = new ArrayList<DebugStm>();
326
327     @SuppressWarnings("unused")
328     private final void addStm(Change c, StmEnum stmEnum) {
329
330         if (DEBUG_STAT)
331             ++info.nStms;
332         if (DEBUG || DEBUG_CCS) {
333             DebugStm d = new DebugStm(stmEnum, c.key0, c.key1, c.clusterUID1, c.key2, c.clusterUID2);
334             if (DEBUG_CCS)
335                 debugStms.add(d);
336             if (DEBUG) {
337                 printlnd(d.toString3() + " offset=" + byteIndex);
338             }
339         }
340         // int opPos = buffer.position();
341         int opPos = byteIndex++;
342         // buffer.put((byte)0); // operation code
343         // addByte((byte)0);
344
345         boolean done = true;
346
347         ClusterEnum a = addIndexAndCluster(c.key1, c.clusterUID1, c.lookIndex1, c.lookup1);
348         byte ab = 0;
349
350         // ForeignShort = byte
351         // Local = short
352         // ForeignLong = 8 byte
353         if (a != ClusterEnum.ForeignShort) {
354             ab = bufferPop();
355             done = false;
356         }
357
358         ClusterEnum b = addIndexAndCluster(c.key2, c.clusterUID2, c.lookIndex2, c.lookup2);
359         byte bb = 0;
360         if (b != ClusterEnum.ForeignShort) {
361             bb = bufferPop();
362             done = false;
363         }
364
365         int ri = ClusterTraitsBase.getResourceIndexFromResourceKeyNoThrow(c.key0);
366         if (ClusterTraitsSmall.isIllegalResourceIndex(ri))
367             throw new RuntimeDatabaseException("Assertion error. Illegal resource index=" + ri);
368         bytes[byteIndex++] = (byte)ri; // index low byte
369         if(!done) {
370             Data data = ClusterEnum.getData(stmEnum, a, b);
371             int left = 6 - data.bits;
372             int op = ri >>> (8 + left);
373             ri >>>= 8;
374             ri &= (1 << left) - 1;
375             if (a != ClusterEnum.ForeignShort) {
376                 ri |= ab << left;
377                 left += 6;
378             }
379             if (b != ClusterEnum.ForeignShort) {
380                 ri |= bb << left;
381                 left += 6;
382             }
383             switch (data.bytes) {
384                 default:
385                     throw new RuntimeDatabaseException("Assertion error. Illegal number of bytes=" + data.bytes);
386                 case 2:
387                     bytes[byteIndex++] = (byte)(ri & 0xFF);
388                     bytes[byteIndex++] = (byte)((ri >>> 8) & 0xFF);
389                     break;
390                 case 1:
391                     bytes[byteIndex++] = (byte)(ri & 0xFF);
392                     break;
393                 case 0:
394                     break;
395             }
396             op |= data.mask;
397             this.bytes[opPos] = (byte)op;
398         } else {
399             if (stmEnum == StmEnum.Add)
400                 bytes[opPos] = (byte)((ri >>> 8) + 64);
401             else
402                 bytes[opPos] = (byte)((ri >>> 8) + 128);
403         }
404         if (DEBUG_STAT) {
405             if (a == ClusterEnum.Local && b == ClusterEnum.Local) {
406                 ++info.nLocal;
407             } else if (a == ClusterEnum.Local || b == ClusterEnum.Local) {
408                 ++info.nPartly;
409             } else {
410                 ++info.nForeign;
411             }
412         }
413         if (foreignTable.size() > 252)
414                 flush();
415 //              throw new UnsupportedOperationException();
416             //flushInternal(graphSession, clusterUID);
417     }
418
419     private final int modiValue(int ri, long value_offset, byte[] bytes, int offset, int size) {
420         if (DEBUG)
421             printlnd("Modify value ri=" + ri + " vo=" + value_offset + " size=" + size + " total=" + bytes.length);
422         if (ClusterTraitsBase.isIllegalResourceIndex(ri))
423             throw new RuntimeDatabaseException("Assertion error. Illegal resource index=" + ri);
424         if (value_offset > (1L << 58 - 1))
425             throw new RuntimeDatabaseException("Illegal value offset="
426                     + value_offset);
427         if (size < 0 || size > MAX_FIXED_BYTES - 1)
428             throw new RuntimeDatabaseException("Illegal value size=" + size);
429         if (offset + size > bytes.length)
430             throw new RuntimeDatabaseException("Illegal value size=" + size);
431         checkBufferSpace(12 + size);
432         addByte(OpEnum.Modify.getOrMask());
433         ri |= (value_offset >>> 56) << 14; // add top two bits
434         addShort((short) ri);
435         value_offset &= (1L << 56) - 1;
436         addLong7(value_offset);
437         addShort((short) size);
438         if (DEBUG)
439             System.out.println("Modify value fixed part end offset=" + byteIndex);
440         int copied = Math.min(size, this.bytes.length - byteIndex);
441         System.arraycopy(bytes, offset, this.bytes, byteIndex, copied);
442         byteIndex += size;
443         return copied;
444     }
445
446 //    private final void modiValueBig(int ri, long voffset, int left, byte[] bytes, int offset) {
447 //        checkBufferSpace(0);
448 //        int current = Math.min(this.bytes.length - byteIndex - 12, left);
449 //        if(current >= 0) {
450 //            int written = modiValue(ri, voffset, bytes, offset, current);
451 //            voffset += written;
452 //            offset += written;
453 //            left -= written;
454 //        }
455 ////        flushInternal(graphSession, clusterUID);
456 //        while (left > 0) {
457 //            int length = Math.min(left, (1 << 16) - 1);
458 //            if (DEBUG)
459 //                printlnd("Modify big value ri=" + ri + " vo=" + voffset + " len=" + length);
460 //            int psize = length + 12;
461 ////            setHeaderVectorSize(psize);
462 //            byte[] message = new byte[psize/*+HEADER_SIZE*/];
463 ////            System.arraycopy(header, 0, message, 0, HEADER_SIZE);
464 //            int to = 0;
465 //            Bytes.write(message, to++, OpEnum.Modify.getOrMask());
466 //            short index = (short)(ri | (voffset >>> 56)<<14); // add top two bits
467 //            Bytes.writeLE(message, to, index); to += 2;
468 //            Bytes.writeLE7(message, to, voffset & ((1L << 56) - 1)); to += 7;
469 //            Bytes.writeLE(message, to, (short)length); to += 2;
470 //            System.arraycopy(bytes, offset, message, to, length);
471 ////            graphSession.updateCluster(new UpdateClusterFunction(message));
472 //            voffset += length;
473 //            offset += length;
474 //            left -= length;
475 //        }
476 //    }
477
478     private final int setValueBig(int ri, byte[] bytes, int length_) {
479         checkBufferSpace(12);
480         int sum = 0;
481         int voffset = 0;
482         int offset = 0;
483         int left = length_;
484         while (left > 0) {
485             int length = Math.min(left, MAX_FIXED_BYTES - 12 - byteIndex);
486             if (DEBUG)
487                 printlnd("Set big value ri=" + ri + " vo=" + voffset + " len=" + length);
488             int written = modiValue(ri, voffset, bytes, offset, length);
489             sum += written;
490             voffset += written;
491             offset += written;
492             left -= written;
493             checkBufferSpace(12);
494         }
495         return sum;
496     }
497
498     private final int setValueSmall(int ri, byte[] bytes, int length) {
499         checkBufferSpace(5 + length);
500         int pos = byteIndex;
501         int i = length << 14 | ri;
502         if (length < 32) {
503             byte op = (byte) (OpEnum.SetShort.getOrMask() | length >>> 2);
504             addByte(op);
505             short s = (short) i;
506             addShort(s);
507         } else {
508             addByte(OpEnum.Set.getOrMask());
509             addInt(i);
510         }
511         System.arraycopy(bytes, 0, this.bytes, byteIndex, length);
512         byteIndex += length;
513         int len = byteIndex - pos;
514         return len;
515     }
516
517     final void setValue(short index, byte[] bytes) {
518         setValue(index, bytes, bytes.length);
519     }
520
521     final public void setValue(short index, byte[] bytes, int length) {
522         checkInitialization();
523         if (ClusterTraitsBase.isIllegalResourceIndex(index))
524             throw new RuntimeDatabaseException("Assertion error. Illegal resource index=" + index);
525         if (DEBUG)
526             printlnd("Set value ri=" + index
527                     + " len=" + length
528                     + " bytes=" + Arrays.toString(Arrays.copyOfRange(bytes, 0, Math.min(10, length))));
529         int len;
530         /*
531          * The limit for the cluster stream is (1<18)-1 but this avoids the
532          * conversion to big cluster.
533          */
534         if (length > ClusterTraitsSmall.VALUE_SIZE_MAX)
535             len = setValueBig(index, bytes, length);
536         else
537             len = setValueSmall(index, bytes, length);
538         if (DEBUG_STAT) {
539             ++info.nValues;
540             info.sValues += len + length;
541         }
542     }
543
544 //    final void setValue(Change c, byte[] bytes, int length) {
545 //        short ri = ClusterTraitsBase.getResourceIndexFromResourceKeyNoThrow(c.key0);
546 //        setValue(ri, bytes, length);
547 //        c.initValue();
548 //    }
549
550 //    final void modiValue(Change c, long voffset, int length, byte[] bytes, int offset) {
551 //        checkInitialization();
552 //        int ri = ClusterTraitsBase.getResourceIndexFromResourceKeyNoThrow(c.key0);
553 //        if (DEBUG)
554 //            printlnd("Modify value ri=" + ri
555 //                    + " voff=" + voffset
556 //                    + " vlen=" + length
557 //                    + " blen=" + bytes.length
558 //                    + " boff=" + offset
559 //                    + " bytes=" + Arrays.toString(Arrays.copyOfRange(bytes, 0, Math.min(10, bytes.length))));
560 //        modiValueBig(ri, voffset, length, bytes, offset);
561 //        c.init();
562 //        if (DEBUG_STAT) {
563 //            ++info.nValues;
564 //            info.sValues += length;
565 //        }
566 //    }
567     final void setImmutable(boolean immutable) {
568         checkInitialization();
569 //        clusterChange2.setImmutable(immutable);
570     }
571     final void undoValueEx(int resourceIndex) {
572         checkInitialization();
573 //        clusterChange2.undoValueEx(resourceIndex);
574     }
575     final void setDeleted(boolean deleted) {
576         checkInitialization();
577 //        clusterChange2.setDeleted(deleted);
578     }
579     final void corrupt() {
580         checkInitialization();
581         addByte((byte)0);
582     }
583     
584     public byte[] getBytes() {
585         byte[] copy = new byte[byteIndex];
586         System.arraycopy(bytes, 0, copy, 0, byteIndex);
587         return copy;
588     }
589     
590     /**
591      * @param graphSession
592      * @param clusterId
593      * @return true if actually flushed something
594      */
595     final boolean flush(/*GraphSession graphSession,*/ ClusterUID clusterUID) {
596         throw new UnsupportedOperationException();
597 //        if (byteIndex > 0) {
598 //            if(DebugPolicy.REPORT_CLUSTER_STREAM)
599 //                System.err.println("Flush cluster change set stream " + this);
600 //            setHeaderVectorSize(byteIndex);
601 //            byte[] copy = new byte[byteIndex + HEADER_SIZE];
602 //            System.arraycopy(header, 0, copy, 0, HEADER_SIZE);
603 //            System.arraycopy(bytes, 0, copy, HEADER_SIZE, byteIndex);
604 //            UpdateClusterFunction updateClusterFunction = new UpdateClusterFunction(copy);
605 //            if (DEBUG_CCS) {
606 //                for (DebugStm stm : debugStms)
607 //                    printlnd(stm.toString2());
608 //                debugStms.clear();
609 //            }
610 //            if (DEBUG_STAT) {
611 //                info.tot = updateClusterFunction.operation.length;
612 //                printlnd("ReallyFlush: " + info.toString());
613 //                sum.add(info);
614 //                printlnd("ReallyFlush sum: " + sum.toString());
615 //            }
616 //            // long start = System.nanoTime();
617 //            graphSession.updateCluster(updateClusterFunction);
618 //            // long duration = System.nanoTime() - start;
619 //            // duration2 += duration;
620 //            // System.err.println("updateCluster " + 1e-9*duration);
621 //            // System.err.println("updateCluster total " + 1e-9*duration2);
622 //            clear();
623 //            clusterChange2.flush(graphSession);
624 //            return true;
625 //        } else if (clusterChange2.isDirty()) {
626 //            clusterChange2.flush(graphSession);
627 //            clear();
628 //            return true;
629 //        } else if (flushed) {
630 //            flushed = false;
631 //            return true;
632 //        } else {
633 //            return true;
634 //        }
635     }
636
637     final void flushInternal(ClusterUID clusterUID) {
638         throw new UnsupportedOperationException();
639 //        flush(graphSession, clusterUID);
640 //        flushed = true;
641     }
642
643     final class ForeignTable {
644         private final TLongIntHashMap table = new TLongIntHashMap();
645
646         private long createKey(short index, long cluster) {
647             assert (cluster <= (1L << 48) - 1);
648             return (cluster << 14) | index;
649         }
650
651         public int get(short index, long cluster) {
652             int value = table.get(createKey(index, cluster));
653             if (DEBUG)
654                 printlnd("ForeignTable get c=" + clusterUID + " i="
655                         + (value - 1) + " r=" + index + " rc=" + cluster);
656             return value;
657         }
658
659         public int put(short index, long cluster, int value) {
660             if (DEBUG)
661                 printlnd("ForeignTable put c=" + clusterUID + " i="
662                         + (value - 1) + " r=" + index + " rc=" + cluster);
663             return table.put(createKey(index, cluster), value);
664         }
665
666         public int size() {
667             return table.size();
668         }
669
670         public void clear() {
671             table.clear();
672         }
673     }
674
675     @Override
676     public int hashCode() {
677         return 31*clusterUID.hashCode();
678     }
679
680     @Override
681     public boolean equals(Object object) {
682         if (this == object)
683             return true;
684         else if (object == null)
685             return false;
686         else if (!(object instanceof ClusterChange))
687             return false;
688         ClusterChange r = (ClusterChange)object;
689         return r.clusterUID.equals(clusterUID);
690     }
691     
692     public void flush() {
693         
694         if(byteIndex > 0) {
695                 
696                 final ClusterUID cuid = clusterUID;
697                         
698                         byte[] block = getBytes();
699                         byte[] raw = new byte[block.length + 28];
700                         Bytes.writeLE(raw, 0, 1);
701                         System.arraycopy(cuid.asBytes(), 0, raw, 4, 16);
702                         Bytes.writeLE(raw, 20, block.length);
703                         System.arraycopy(block, 0, raw, 24, block.length);
704                         Bytes.writeLE(raw, 24+block.length, 0);
705
706                         ByteBuffer rawBB = ByteBuffer.wrap(raw);
707                         ByteBuffer outputBB = ByteBuffer.allocate(raw.length + raw.length/8);
708                         //outputBB.order(ByteOrder.LITTLE_ENDIAN);
709                         int compressedSize = Compressions.get(Compressions.LZ4).compressBuffer(rawBB, 0, raw.length, outputBB, 0);
710
711                         byte[] data_ = null;
712                         if(compressedSize < raw.length) {
713                                 data_ = new byte[compressedSize];
714                                 outputBB.get(data_,0,compressedSize);   
715                         } else {
716                                 data_ = raw;
717                         }
718
719                         byte[] data = new byte[data_.length+24];
720                         Bytes.writeLE(data, 0, 0);
721                         Bytes.writeLE(data, 4, 0);
722                         Bytes.writeLE(data, 8, raw.length);
723                         Bytes.writeLE(data, 12, raw.length);
724                         Bytes.writeLE(data, 16, data_.length);
725                         System.arraycopy(data_, 0, data, 20, data_.length);
726                         Bytes.writeLE(data, 20+data_.length, 0);
727                         
728                         stream.add(Pair.make(clusterUID, data));
729                         clear();
730                         initBuffer();
731                 
732         }
733     }
734     
735 }