Dispose ClientChangesImpl ChangeSets to minimize memory footprint
[simantics/platform.git] / bundles / org.simantics.db.procore / src / fi / vtt / simantics / procore / internal / ClientChangesImpl.java
1 /*******************************************************************************
2  * Copyright (c) 2007, 2010 Association for Decentralized Information Management
3  * in Industry THTH ry.
4  * All rights reserved. This program and the accompanying materials
5  * are made available under the terms of the Eclipse Public License v1.0
6  * which accompanies this distribution, and is available at
7  * http://www.eclipse.org/legal/epl-v10.html
8  *
9  * Contributors:
10  *     VTT Technical Research Centre of Finland - initial API and implementation
11  *******************************************************************************/
12 package fi.vtt.simantics.procore.internal;
13
14 import java.io.File;
15 import java.io.IOException;
16 import java.io.RandomAccessFile;
17 import java.nio.ByteBuffer;
18 import java.nio.channels.FileChannel;
19 import java.util.Arrays;
20 import java.util.Collection;
21 import java.util.Collections;
22 import java.util.Iterator;
23
24 import org.simantics.db.ChangeSet;
25 import org.simantics.db.Disposable;
26 import org.simantics.db.Resource;
27 import org.simantics.db.common.internal.config.InternalClientConfig;
28 import org.simantics.db.exception.RuntimeDatabaseException;
29 import org.simantics.db.impl.ResourceImpl;
30
31 /**
32  * @author Antti Villberg
33  * @author Tuukka Lehtonen
34  */
35 public final class ClientChangesImpl implements ChangeSet, Disposable {
36
37     private static final boolean ASSERT_ALIVE = true; 
38
39     private static final byte[] EMPTY = {};
40
41     private static final boolean DEBUG = false;
42
43     private static final int BUFFER_SIZE = 1 << 17;
44
45     private static final byte CLAIM = 1;
46     private static final byte DENY = 2;
47     private static final byte VALUE = 3;
48     private static final byte TERM = 4;
49     private static final byte INVALIDATE = 5;
50
51     private boolean disposed = false;
52
53     private SessionImplSocket session;
54
55     /**
56      * True if enough data has been written to overflow the normal buffer which
57      * will result in data being written to disk.
58      */
59     private boolean overflow = false;
60
61     private byte[] readBytes = new byte[BUFFER_SIZE];
62     // By default the write buffer will be the exact same as the read buffer
63     // since initially were not forced to write changes to the disk.
64     private byte[] writeBytes = readBytes;
65
66     private int readBufferIndex = 0;
67     private int writeBufferIndex = 0;
68
69     /**
70      * <code>true</code> if something has been written since last call to
71      * {@link #resetRead()}. By default it is <code>true</code> since
72      * {@link #resetRead()} must on its first invocation write the {@link #TERM}
73      * constant into the stream.
74      */
75     private boolean wroteSinceLastRead = true;
76
77     private ByteBuffer readBuffer = ByteBuffer.wrap(readBytes);
78     private ByteBuffer writeBuffer = ByteBuffer.wrap(writeBytes);
79
80     private File tempFile;
81     private RandomAccessFile file;
82     private FileChannel channel;
83     private long currentPosition = 0L;
84     private long readPosition = 0L;
85     private long writePosition = 0L;
86
87     static abstract class NoRemoveIterator<T> implements Iterator<T> {
88         @Override
89         public void remove() {
90             throw new UnsupportedOperationException();
91         }
92     }
93
94     /**
95      * Base class for all immutable collections returned by
96      * {@link ClientChangesImpl}.
97      * 
98      * @param <T>
99      */
100     static class CollectionBase<T> implements Collection<T> {
101
102         @Override
103         public int size() {
104             throw new UnsupportedOperationException();
105         }
106
107         @Override
108         public boolean isEmpty() {
109             throw new UnsupportedOperationException();
110         }
111
112         @Override
113         public boolean contains(Object o) {
114             throw new UnsupportedOperationException();
115         }
116
117         @Override
118         public Iterator<T> iterator() {
119             throw new UnsupportedOperationException();
120         }
121
122         @Override
123         public Object[] toArray() {
124             throw new UnsupportedOperationException();
125         }
126
127         @Override
128         public <TT> TT[] toArray(TT[] a) {
129             throw new UnsupportedOperationException();
130         }
131
132         @Override
133         public boolean add(T e) {
134             throw new UnsupportedOperationException();
135         }
136
137         @Override
138         public boolean remove(Object o) {
139             throw new UnsupportedOperationException();
140         }
141
142         @Override
143         public boolean containsAll(Collection<?> c) {
144             throw new UnsupportedOperationException();
145         }
146
147         @Override
148         public boolean addAll(Collection<? extends T> c) {
149             throw new UnsupportedOperationException();
150         }
151
152         @Override
153         public boolean removeAll(Collection<?> c) {
154             throw new UnsupportedOperationException();
155         }
156
157         @Override
158         public boolean retainAll(Collection<?> c) {
159             throw new UnsupportedOperationException();
160         }
161
162         @Override
163         public void clear() {
164             throw new UnsupportedOperationException();
165         }
166
167     };
168
169     ClientChangesImpl(SessionImplSocket session) {
170         this.session = session;
171         //System.out.println("new ClientChangesImpl@" + System.identityHashCode(this));
172     }
173
174     @Override
175     public String toString() {
176         final int LIMIT = 10000;
177         String to = "";
178         for (StatementChange s : changedStatements()) {
179             String op = s.isClaim() ? "add" : "rem";
180             to += "\n" + op + " " + s.getSubject() + " " + s.getPredicate() + " " + s.getObject();
181             if (to.length() > LIMIT) {
182                 to += "... more.\n";
183                 return to;
184             }
185         }
186         for (Resource r : changedValues()) {
187             to += "\nmod " + r; 
188             if (to.length() > LIMIT) {
189                 to += "...etc.\n";
190                 return to;
191             }
192         }
193         return to;
194     }
195
196     private Resource getResource(int id) {
197         return session.getResource(id);
198     }
199
200     @Override
201     public synchronized Collection<StatementChange> changedStatements() {
202         if (ASSERT_ALIVE)
203             assertAlive();
204
205         if (!resetRead())
206             return Collections.emptyList();
207
208         return new CollectionBase<StatementChange>() {
209
210             StatementChange seek() {
211                 try {
212                     while(true) {
213                         byte method = readByte();
214                         switch(method) {
215                         case TERM: {
216                             return null;
217                         }
218                         case CLAIM: {
219                             Resource s = getResource(readInt());
220                             Resource p = getResource(readInt());
221                             Resource o = getResource(readInt());
222                             return new StatementChangeImpl(s, p, o, true);
223                         }
224                         case DENY: {
225                             Resource s = getResource(readInt());
226                             Resource p = getResource(readInt());
227                             Resource o = getResource(readInt());
228                             return new StatementChangeImpl(s, p, o, false);
229                         }
230                         case VALUE:
231                         case INVALIDATE: {
232                             // Skipping value opererations.
233                             getResource(readInt());
234                             break;
235                         }
236                         }
237                     }
238                 } catch(Exception e) {
239                     throw new RuntimeDatabaseException(e);
240                 }
241             }
242
243             private StatementChange next = seek();
244
245             @Override
246             public Iterator<StatementChange> iterator() {
247                 return new NoRemoveIterator<StatementChange>() {
248                     @Override
249                     public boolean hasNext() {
250                         return next != null;
251                     }
252                     @Override
253                     public StatementChange next() {
254                         StatementChange result = next;
255                         next = seek();
256                         return result;
257                     }
258                 };
259             }
260
261         };
262
263     }
264
265     @Override
266     public Collection<Resource> changedValues() {
267         if (ASSERT_ALIVE)
268             assertAlive();
269
270         if (!resetRead())
271             return Collections.emptyList();
272
273         return new CollectionBase<Resource>() {
274
275             Resource seek() {
276                 try {
277                     while(true) {
278                         byte method = readByte();
279                         switch(method) {
280                         case TERM: {
281                             return null;
282                         }
283                         case CLAIM: {
284                             getResource(readInt());
285                             getResource(readInt());
286                             getResource(readInt());
287                             break;
288                         }
289                         case DENY: {
290                             getResource(readInt());
291                             getResource(readInt());
292                             getResource(readInt());
293                             break;
294                         }
295                         case VALUE:
296                         case INVALIDATE: {
297                             return getResource(readInt());
298                         }
299                         }
300                     }
301                 } catch(Exception e) {
302                     return null;
303                 }
304             }
305
306             private Resource next = seek();
307
308             @Override
309             public Iterator<Resource> iterator() {
310                 return new NoRemoveIterator<Resource>() {
311                     @Override
312                     public boolean hasNext() {
313                         return next != null;
314                     }
315                     @Override
316                     public Resource next() {
317                         Resource result = next;
318                         next = seek();
319                         return result;
320                     }
321                 };
322             }
323
324         };
325
326     }
327
328     @Override
329     public Collection<Resource> changedResources() {
330         if (ASSERT_ALIVE)
331             assertAlive();
332
333         if (!resetRead())
334             return Collections.emptyList();
335
336         return new CollectionBase<Resource>() {
337
338             Resource seek() {
339
340                 try {
341                     while(true) {
342                         byte method = readByte();
343                         switch(method) {
344                         case TERM: {
345                             return null;
346                         }
347                         case CLAIM: {
348                             getResource(readInt());
349                             getResource(readInt());
350                             getResource(readInt());
351                             break;
352                         }
353                         case DENY: {
354                             getResource(readInt());
355                             getResource(readInt());
356                             getResource(readInt());
357                             break;
358                         }
359                         case VALUE: {
360                             getResource(readInt());
361                             break;
362                         }
363                         case INVALIDATE: {
364                             return getResource(readInt());
365                         }
366                         }
367                     }
368                 } catch(Exception e) {
369                     return null;
370                 }
371
372             }
373
374             private Resource next = seek();
375
376             @Override
377             public Iterator<Resource> iterator() {
378                 return new NoRemoveIterator<Resource>() {
379                     @Override
380                     public boolean hasNext() {
381                         return next != null;
382                     }
383                     @Override
384                     public Resource next() {
385                         Resource result = next;
386                         next = seek();
387                         return result;
388                     }
389                 };
390             }
391
392         };
393
394     }
395
396     @Override
397     public boolean isEmpty() {
398         if (notEmpty)
399             return false;
400         return writeBufferIndex == 0 && !overflow;
401     }
402
403     private boolean notEmpty = false;
404     void setNotEmpty(boolean notEmpty) {
405         this.notEmpty = notEmpty;
406     }
407
408     void claim(Resource s, Resource p, Resource o) {
409         if (ASSERT_ALIVE)
410             assertAlive();
411         writeByte(CLAIM);
412         writeInt(session.getId((ResourceImpl) s));
413         writeInt(session.getId((ResourceImpl) p));
414         writeInt(session.getId((ResourceImpl) o));
415         if(DEBUG) System.err.println("claim1 $" + s.getResourceId() + " $" + p.getResourceId() + " $" + o.getResourceId());
416     }
417
418     private int sCache1 = -1;
419     private int sCache2 = -1;
420     private int sCache3 = -1;
421
422     final void invalidate(int s) {
423         if (ASSERT_ALIVE)
424             assertAlive();
425         
426         if(s == sCache1) {
427                 return;
428         } else if (s == sCache2) {
429                 sCache2 = sCache1;
430                 sCache1 = s;
431                 return;
432         } else if (s == sCache3) {
433                 sCache3 = sCache2;
434                 sCache2 = sCache1;
435                 sCache1 = s;
436                 return;
437         }
438
439         sCache3 = sCache2;
440         sCache2 = sCache1;
441         sCache1 = s;
442         
443         writeByte(INVALIDATE);
444         writeInt(s);
445
446         if(DEBUG) System.err.println("invalidate2 " + s);
447
448     }
449
450     void claim(int s, int p, int o) {
451         if (ASSERT_ALIVE)
452             assertAlive();
453         writeByte(CLAIM);
454         writeInt(s);
455         writeInt(p);
456         writeInt(o);
457         if(DEBUG) System.err.println("claim2 " + s + " " + p + " " + o);
458     }
459
460     void deny(Resource s, Resource p, Resource o) {
461         if (ASSERT_ALIVE)
462             assertAlive();
463         writeByte(DENY);
464         writeInt(session.getId((ResourceImpl)s));
465         writeInt(session.getId((ResourceImpl)p));
466         writeInt(session.getId((ResourceImpl)o));
467         if(DEBUG) System.err.println("deny2 " + session.getId((ResourceImpl)s) + " " + session.getId((ResourceImpl)p) + " " + session.getId((ResourceImpl)o));
468     }
469
470     void deny(int s, int p, int o) {
471         if (ASSERT_ALIVE)
472             assertAlive();
473         writeByte(DENY);
474         writeInt(s);
475         writeInt(p);
476         writeInt(o);
477         if(DEBUG) System.err.println("deny2 " + s + " " + p + " " + o);
478     }
479
480     void claimValue(Resource r) {
481         if (ASSERT_ALIVE)
482             assertAlive();
483         writeByte(VALUE);
484         writeInt(session.getId((ResourceImpl)r));
485         if(DEBUG) System.err.println("claimOrDenyValue1 $" + r.getResourceId());
486     }
487
488     void claimValue(int r) {
489         if (ASSERT_ALIVE)
490             assertAlive();
491         writeByte(VALUE);
492         writeInt(r);
493         if(DEBUG) System.err.println("deny2 " + r);
494     }
495
496     // Streaming
497
498     private byte readByte() {
499         byte result = readBytes[readBufferIndex++]; 
500         if(readBufferIndex == BUFFER_SIZE) fillReadBuffer();
501         return result;
502     }
503
504         private int readInt() {
505         int ri = readBufferIndex; 
506         int remaining = BUFFER_SIZE - ri;
507         int result;
508         switch (remaining) {
509         case 0:
510             fillReadBuffer();
511             // Intentional fall-through
512         default:
513             result = (int)(
514                      (readBytes[ri] & 0xff) | 
515                     ((readBytes[ri+1] & 0xff) << 8) | 
516                     ((readBytes[ri+2] & 0xff) << 16) | 
517                     ((readBytes[ri+3] & 0xff) << 24));
518             readBufferIndex += 4;
519             return result;
520
521         case 4:
522             result = (int)(
523                      (readBytes[ri] & 0xff) | 
524                     ((readBytes[ri+1] & 0xff) << 8) | 
525                     ((readBytes[ri+2] & 0xff) << 16) | 
526                     ((readBytes[ri+3] & 0xff) << 24));
527             fillReadBuffer();
528             return result;
529
530         case 3:
531             result = (int)(
532                      (readBytes[ri] & 0xff) | 
533                     ((readBytes[ri+1] & 0xff) << 8) | 
534                     ((readBytes[ri+2] & 0xff) << 16)); 
535             fillReadBuffer();
536             result |= (int)((readBytes[readBufferIndex++] & 0xff) << 24);
537             return result;
538
539         case 2:
540             result = (int)(
541                     (readBytes[ri] & 0xff) | 
542                    ((readBytes[ri+1] & 0xff) << 8)); 
543             fillReadBuffer();
544             ri = readBufferIndex;
545             result |= (int)(
546                    ((readBytes[ri] & 0xff) << 16) | 
547                    ((readBytes[ri+1] & 0xff) << 24)); 
548             readBufferIndex += 2;
549             return result;
550
551         case 1:
552             result = (int)(readBytes[ri] & 0xff); 
553             fillReadBuffer();
554             ri = readBufferIndex;
555             result |= (int)( 
556                     ((readBytes[ri] & 0xff) << 8) | 
557                     ((readBytes[ri+1] & 0xff) << 16) | 
558                     ((readBytes[ri+2] & 0xff) << 24));
559             readBufferIndex += 3;
560             return result;
561         }
562     }
563
564     private void writeByte(int b) {
565         writeBytes[writeBufferIndex++] = (byte)b;
566         if(writeBufferIndex == BUFFER_SIZE) flushWriteBuffer(BUFFER_SIZE);
567         wroteSinceLastRead = true;
568     }
569
570     private void writeInt(int i) {
571         int wi = writeBufferIndex;
572         int room = BUFFER_SIZE - wi;
573         switch (room) {
574         case 0:
575             flushWriteBuffer(BUFFER_SIZE);
576             // Intentional fall-through
577         default:
578             writeBytes[wi]   = (byte)(i&0xff);
579             writeBytes[wi+1] = (byte)((i>>>8)&0xff);
580             writeBytes[wi+2] = (byte)((i>>>16)&0xff);
581             writeBytes[wi+3] = (byte)((i>>>24)&0xff);
582             writeBufferIndex += 4;
583             break;
584
585         case 4:
586             writeBytes[wi]   = (byte)(i&0xff);
587             writeBytes[wi+1] = (byte)((i>>>8)&0xff);
588             writeBytes[wi+2] = (byte)((i>>>16)&0xff);
589             writeBytes[wi+3] = (byte)((i>>>24)&0xff);
590             flushWriteBuffer(BUFFER_SIZE);
591             break;
592
593         case 3:
594             writeBytes[wi]   = (byte)(i&0xff);
595             writeBytes[wi+1] = (byte)((i>>>8)&0xff);
596             writeBytes[wi+2] = (byte)((i>>>16)&0xff);
597             flushWriteBuffer(BUFFER_SIZE);
598             writeBytes[writeBufferIndex++] = (byte)((i>>>24)&0xff);
599             break;
600
601         case 2:
602             writeBytes[wi]   = (byte)(i&0xff);
603             writeBytes[wi+1] = (byte)((i>>>8)&0xff);
604             flushWriteBuffer(BUFFER_SIZE);
605             wi = writeBufferIndex;
606             writeBytes[wi]   = (byte)((i>>>16)&0xff);
607             writeBytes[wi+1] = (byte)((i>>>24)&0xff);
608             writeBufferIndex += 2;
609             break;
610
611         case 1:
612             writeBytes[wi]   = (byte)(i&0xff);
613             flushWriteBuffer(BUFFER_SIZE);
614             wi = writeBufferIndex;
615             writeBytes[wi]   = (byte)((i>>>8)&0xff);
616             writeBytes[wi+1] = (byte)((i>>>16)&0xff);
617             writeBytes[wi+2] = (byte)((i>>>24)&0xff);
618             writeBufferIndex += 3;
619             break;
620         }
621         wroteSinceLastRead = true;
622     }
623
624     private int tryFlushBuffer(int size) throws IOException {
625         if (DEBUG)
626             System.err.println("tryFlushBuffer(" + size + "): tempFile=" + tempFile);
627
628         if (tempFile == null) {
629             File base = InternalClientConfig.DB_CLIENT_TEMP_DIR;
630             base.mkdirs();
631             tempFile = File.createTempFile("cset-", ".cs", base);
632             file = new RandomAccessFile(tempFile, "rwd");
633             channel = file.getChannel();
634             if (DEBUG)
635                 System.err.println("Allocated temporary file for changeset: " + tempFile);
636         }
637
638         int wrote = 0;
639         writeBuffer.position(0);
640         writeBuffer.limit(size);
641         for (; wrote < size;) {
642             wrote += channel.write(writeBuffer);
643         }
644         return wrote;
645     }
646
647     private void flushWriteBuffer(int size) {
648         if (DEBUG)
649             System.err.println("flushBuffer(" + size + ") overflow=" + overflow);
650         if (size == 0)
651             return;
652
653         if (!overflow) {
654             // Change set is about to become file-backed.
655             // Initialize separate buffer for writing.
656             writeBytes = Arrays.copyOf(readBytes, readBytes.length);
657             writeBuffer = ByteBuffer.wrap(writeBytes);
658             overflow = true;
659         }
660
661         writeBufferIndex = 0;
662         int wrote = 0;
663         try {
664             setFilePosition(writePosition);
665             wrote = tryFlushBuffer(size);
666         } catch (IOException e) {
667             e.printStackTrace();
668             throw new RuntimeDatabaseException(e);
669         } finally {
670             try {
671                 currentPosition = channel.position();
672                 writePosition = currentPosition;
673             } catch (IOException e) {
674                 e.printStackTrace();
675             }
676             if (DEBUG)
677                 System.err.println("flushWriteBuffer wrote " + wrote + " bytes");
678         }
679     }
680
681     private boolean resetRead() {
682         if (DEBUG)
683             System.err.println("resetRead: overflow=" + overflow + ", file=" + tempFile);
684         try {
685             // Flush temporary file first if necessary.
686             if (wroteSinceLastRead) {
687                 if (DEBUG)
688                     System.err.println("\tTerminating write of changeset " + tempFile);
689                 writeByte(TERM);
690                 wroteSinceLastRead = false;
691             }
692             if (overflow) {
693                 if (DEBUG)
694                     System.err.println("\tReading changeset from " + tempFile.getAbsolutePath());
695                 flushWriteBuffer(writeBufferIndex);
696                 readPosition = 0L;
697                 setFilePosition(readPosition);
698                 fillReadBuffer();
699             } else {
700                 readBufferIndex = 0;
701             }
702             return true;
703         } catch (IOException e) {
704             e.printStackTrace();
705             return false;
706         }
707     }
708
709     private void fillReadBuffer() {
710         readBufferIndex = 0;
711         int read = 0;
712         try {
713             setFilePosition(readPosition);
714             readBuffer.position(0);
715             readBuffer.limit(BUFFER_SIZE);
716             for (;read < BUFFER_SIZE;) {
717                 int got = channel.read(readBuffer);
718                 if (got == -1)
719                     return;
720                 read += got;
721             }
722         } catch (IOException e) {
723             e.printStackTrace();
724         } finally {
725             try {
726                 currentPosition = channel.position();
727                 readPosition = currentPosition;
728             } catch (IOException e) {
729                 e.printStackTrace();
730             }
731             if (DEBUG)
732                 System.err.println("fillReadBuffer read " + read + " bytes");
733         }
734     }
735
736     /**
737      * @param newPosition
738      * @return <code>true</code> if file position was modified
739      * @throws IOException
740      */
741     private boolean setFilePosition(long newPosition) throws IOException {
742         if (channel == null) {
743             return false;
744         }
745         if (newPosition != currentPosition) {
746             if (DEBUG)
747                 System.err.println("setting new file position(" + newPosition + "), old position=" + currentPosition);
748             channel.position(newPosition);
749             currentPosition = newPosition;
750             return true;
751         }
752         return false;
753     }
754
755     @Override
756     protected void finalize() throws Throwable {
757         close();
758         super.finalize();
759     }
760
761     public void close() {
762         if (disposed)
763             return;
764
765         disposed = true;
766         session = null;
767         channel = null;
768         if (file != null) {
769             if (DEBUG)
770                 System.err.println("Closing temporary changeset input stream " + file);
771             try {
772                 file.close();
773             } catch (IOException e) {
774             } finally {
775                 file = null;
776             }
777         }
778         if (tempFile != null) {
779             if (DEBUG)
780                 System.err.println("Deleting temporary changeset file " + tempFile);
781             tempFile.delete();
782             tempFile = null;
783         }
784
785         // Release large buffers to minimize memory consumption
786         readBytes = EMPTY;
787         readBuffer = ByteBuffer.wrap(EMPTY);
788         writeBytes = EMPTY;
789         writeBuffer = ByteBuffer.wrap(EMPTY);
790     }
791
792     private void assertAlive() {
793         if (disposed)
794             throw new IllegalStateException("ClientChangeImpl@" + System.identityHashCode(this) + " is already disposed");
795     }
796
797     @Override
798     public void dispose() {
799         //System.out.println("ClientChangesImpl[" + (disposed ? "DISPOSED" : "") + "]@" + System.identityHashCode(this) + ".dispose " + (isEmpty() ? "EMPTY" : "NON-EMPTY"));
800         close();
801     }
802
803     public static void main(String[] args) {
804         ClientChangesImpl ccs = new ClientChangesImpl(null);
805         System.out.println("isEmpty=" + ccs.isEmpty());
806         System.out.println("clientChanges=" + ccs.toString());
807         System.out.println("isEmpty=" + ccs.isEmpty());
808     }
809
810 }