000880c3d1d1bf3ff00f5b409ee69bd657a3c2b0
[simantics/platform.git] / bundles / org.simantics.db.procore / src / fi / vtt / simantics / procore / internal / ClientChangesImpl.java
1 /*******************************************************************************
2  * Copyright (c) 2007, 2010 Association for Decentralized Information Management
3  * in Industry THTH ry.
4  * All rights reserved. This program and the accompanying materials
5  * are made available under the terms of the Eclipse Public License v1.0
6  * which accompanies this distribution, and is available at
7  * http://www.eclipse.org/legal/epl-v10.html
8  *
9  * Contributors:
10  *     VTT Technical Research Centre of Finland - initial API and implementation
11  *******************************************************************************/
12 package fi.vtt.simantics.procore.internal;
13
14 import java.io.File;
15 import java.io.IOException;
16 import java.io.RandomAccessFile;
17 import java.nio.ByteBuffer;
18 import java.nio.channels.FileChannel;
19 import java.util.Arrays;
20 import java.util.Collection;
21 import java.util.Collections;
22 import java.util.Iterator;
23
24 import org.simantics.db.ChangeSet;
25 import org.simantics.db.Resource;
26 import org.simantics.db.common.internal.config.InternalClientConfig;
27 import org.simantics.db.exception.RuntimeDatabaseException;
28 import org.simantics.db.impl.ResourceImpl;
29
30 /**
31  * @author Antti Villberg
32  * @author Tuukka Lehtonen
33  */
34 public final class ClientChangesImpl implements ChangeSet {
35
36     private static final boolean DEBUG = false;
37
38     private static final int BUFFER_SIZE = 1 << 17;
39
40     private static final byte CLAIM = 1;
41     private static final byte DENY = 2;
42     private static final byte VALUE = 3;
43     private static final byte TERM = 4;
44     private static final byte INVALIDATE = 5;
45
46     private SessionImplSocket session;
47
48     /**
49      * True if enough data has been written to overflow the normal buffer which
50      * will result in data being written to disk.
51      */
52     private boolean overflow = false;
53
54     private final byte[] readBytes = new byte[BUFFER_SIZE];
55     // By default the write buffer will be the exact same as the read buffer
56     // since initially were not forced to write changes to the disk.
57     private byte[] writeBytes = readBytes;
58
59     private int readBufferIndex = 0;
60     private int writeBufferIndex = 0;
61
62     /**
63      * <code>true</code> if something has been written since last call to
64      * {@link #resetRead()}. By default it is <code>true</code> since
65      * {@link #resetRead()} must on its first invocation write the {@link #TERM}
66      * constant into the stream.
67      */
68     private boolean wroteSinceLastRead = true;
69
70     private final ByteBuffer readBuffer = ByteBuffer.wrap(readBytes);
71     private ByteBuffer writeBuffer = ByteBuffer.wrap(writeBytes);
72
73     private File tempFile;
74     private RandomAccessFile file;
75     private FileChannel channel;
76     private long currentPosition = 0L;
77     private long readPosition = 0L;
78     private long writePosition = 0L;
79
80     static abstract class NoRemoveIterator<T> implements Iterator<T> {
81         @Override
82         public void remove() {
83             throw new UnsupportedOperationException();
84         }
85     }
86
87     /**
88      * Base class for all immutable collections returned by
89      * {@link ClientChangesImpl}.
90      * 
91      * @param <T>
92      */
93     static class CollectionBase<T> implements Collection<T> {
94
95         @Override
96         public int size() {
97             throw new UnsupportedOperationException();
98         }
99
100         @Override
101         public boolean isEmpty() {
102             throw new UnsupportedOperationException();
103         }
104
105         @Override
106         public boolean contains(Object o) {
107             throw new UnsupportedOperationException();
108         }
109
110         @Override
111         public Iterator<T> iterator() {
112             throw new UnsupportedOperationException();
113         }
114
115         @Override
116         public Object[] toArray() {
117             throw new UnsupportedOperationException();
118         }
119
120         @Override
121         public <TT> TT[] toArray(TT[] a) {
122             throw new UnsupportedOperationException();
123         }
124
125         @Override
126         public boolean add(T e) {
127             throw new UnsupportedOperationException();
128         }
129
130         @Override
131         public boolean remove(Object o) {
132             throw new UnsupportedOperationException();
133         }
134
135         @Override
136         public boolean containsAll(Collection<?> c) {
137             throw new UnsupportedOperationException();
138         }
139
140         @Override
141         public boolean addAll(Collection<? extends T> c) {
142             throw new UnsupportedOperationException();
143         }
144
145         @Override
146         public boolean removeAll(Collection<?> c) {
147             throw new UnsupportedOperationException();
148         }
149
150         @Override
151         public boolean retainAll(Collection<?> c) {
152             throw new UnsupportedOperationException();
153         }
154
155         @Override
156         public void clear() {
157             throw new UnsupportedOperationException();
158         }
159
160     };
161
162     ClientChangesImpl(SessionImplSocket session) {
163         this.session = session;
164     }
165
166     @Override
167     public String toString() {
168         final int LIMIT = 10000;
169         String to = "";
170         for (StatementChange s : changedStatements()) {
171             String op = s.isClaim() ? "add" : "rem";
172             to += "\n" + op + " " + s.getSubject() + " " + s.getPredicate() + " " + s.getObject();
173             if (to.length() > LIMIT) {
174                 to += "... more.\n";
175                 return to;
176             }
177         }
178         for (Resource r : changedValues()) {
179             to += "\nmod " + r; 
180             if (to.length() > LIMIT) {
181                 to += "...etc.\n";
182                 return to;
183             }
184         }
185         return to;
186     }
187
188     private Resource getResource(int id) {
189         return session.getResource(id);
190     }
191
192     @Override
193     public synchronized Collection<StatementChange> changedStatements() {
194
195         if (!resetRead())
196             return Collections.emptyList();
197
198         return new CollectionBase<StatementChange>() {
199
200             StatementChange seek() {
201                 try {
202                     while(true) {
203                         byte method = readByte();
204                         switch(method) {
205                         case TERM: {
206                             return null;
207                         }
208                         case CLAIM: {
209                             Resource s = getResource(readInt());
210                             Resource p = getResource(readInt());
211                             Resource o = getResource(readInt());
212                             return new StatementChangeImpl(s, p, o, true);
213                         }
214                         case DENY: {
215                             Resource s = getResource(readInt());
216                             Resource p = getResource(readInt());
217                             Resource o = getResource(readInt());
218                             return new StatementChangeImpl(s, p, o, false);
219                         }
220                         case VALUE:
221                         case INVALIDATE: {
222                             // Skipping value opererations.
223                             getResource(readInt());
224                             break;
225                         }
226                         }
227                     }
228                 } catch(Exception e) {
229                     throw new RuntimeDatabaseException(e);
230                 }
231             }
232
233             private StatementChange next = seek();
234
235             @Override
236             public Iterator<StatementChange> iterator() {
237                 return new NoRemoveIterator<StatementChange>() {
238                     @Override
239                     public boolean hasNext() {
240                         return next != null;
241                     }
242                     @Override
243                     public StatementChange next() {
244                         StatementChange result = next;
245                         next = seek();
246                         return result;
247                     }
248                 };
249             }
250
251         };
252
253     }
254
255     @Override
256     public Collection<Resource> changedValues() {
257
258         if (!resetRead())
259             return Collections.emptyList();
260
261         return new CollectionBase<Resource>() {
262
263             Resource seek() {
264                 try {
265                     while(true) {
266                         byte method = readByte();
267                         switch(method) {
268                         case TERM: {
269                             return null;
270                         }
271                         case CLAIM: {
272                             getResource(readInt());
273                             getResource(readInt());
274                             getResource(readInt());
275                             break;
276                         }
277                         case DENY: {
278                             getResource(readInt());
279                             getResource(readInt());
280                             getResource(readInt());
281                             break;
282                         }
283                         case VALUE:
284                         case INVALIDATE: {
285                             return getResource(readInt());
286                         }
287                         }
288                     }
289                 } catch(Exception e) {
290                     return null;
291                 }
292             }
293
294             private Resource next = seek();
295
296             @Override
297             public Iterator<Resource> iterator() {
298                 return new NoRemoveIterator<Resource>() {
299                     @Override
300                     public boolean hasNext() {
301                         return next != null;
302                     }
303                     @Override
304                     public Resource next() {
305                         Resource result = next;
306                         next = seek();
307                         return result;
308                     }
309                 };
310             }
311
312         };
313
314     }
315
316     @Override
317     public Collection<Resource> changedResources() {
318
319         if (!resetRead())
320             return Collections.emptyList();
321
322         return new CollectionBase<Resource>() {
323
324             Resource seek() {
325
326                 try {
327                     while(true) {
328                         byte method = readByte();
329                         switch(method) {
330                         case TERM: {
331                             return null;
332                         }
333                         case CLAIM: {
334                             getResource(readInt());
335                             getResource(readInt());
336                             getResource(readInt());
337                             break;
338                         }
339                         case DENY: {
340                             getResource(readInt());
341                             getResource(readInt());
342                             getResource(readInt());
343                             break;
344                         }
345                         case VALUE: {
346                             getResource(readInt());
347                             break;
348                         }
349                         case INVALIDATE: {
350                             return getResource(readInt());
351                         }
352                         }
353                     }
354                 } catch(Exception e) {
355                     return null;
356                 }
357
358             }
359
360             private Resource next = seek();
361
362             @Override
363             public Iterator<Resource> iterator() {
364                 return new NoRemoveIterator<Resource>() {
365                     @Override
366                     public boolean hasNext() {
367                         return next != null;
368                     }
369                     @Override
370                     public Resource next() {
371                         Resource result = next;
372                         next = seek();
373                         return result;
374                     }
375                 };
376             }
377
378         };
379
380     }
381
382     @Override
383     public boolean isEmpty() {
384         if (notEmpty)
385             return false;
386         return writeBufferIndex == 0 && !overflow;
387     }
388
389     private boolean notEmpty = false;
390     void setNotEmpty(boolean notEmpty) {
391         this.notEmpty = notEmpty;
392     }
393
394     void claim(Resource s, Resource p, Resource o) {
395         writeByte(CLAIM);
396         writeInt(session.getId((ResourceImpl) s));
397         writeInt(session.getId((ResourceImpl) p));
398         writeInt(session.getId((ResourceImpl) o));
399         if(DEBUG) System.err.println("claim1 $" + s.getResourceId() + " $" + p.getResourceId() + " $" + o.getResourceId());
400     }
401
402     private int sCache1 = -1;
403     private int sCache2 = -1;
404     private int sCache3 = -1;
405
406     final void invalidate(int s) {
407         
408         if(s == sCache1) {
409                 return;
410         } else if (s == sCache2) {
411                 sCache2 = sCache1;
412                 sCache1 = s;
413                 return;
414         } else if (s == sCache3) {
415                 sCache3 = sCache2;
416                 sCache2 = sCache1;
417                 sCache1 = s;
418                 return;
419         }
420
421         sCache3 = sCache2;
422         sCache2 = sCache1;
423         sCache1 = s;
424         
425         writeByte(INVALIDATE);
426         writeInt(s);
427
428         if(DEBUG) System.err.println("invalidate2 " + s);
429
430     }
431
432     void claim(int s, int p, int o) {
433         writeByte(CLAIM);
434         writeInt(s);
435         writeInt(p);
436         writeInt(o);
437         if(DEBUG) System.err.println("claim2 " + s + " " + p + " " + o);
438     }
439
440     void deny(Resource s, Resource p, Resource o) {
441         writeByte(DENY);
442         writeInt(session.getId((ResourceImpl)s));
443         writeInt(session.getId((ResourceImpl)p));
444         writeInt(session.getId((ResourceImpl)o));
445         if(DEBUG) System.err.println("deny2 " + session.getId((ResourceImpl)s) + " " + session.getId((ResourceImpl)p) + " " + session.getId((ResourceImpl)o));
446     }
447
448     void deny(int s, int p, int o) {
449         writeByte(DENY);
450         writeInt(s);
451         writeInt(p);
452         writeInt(o);
453         if(DEBUG) System.err.println("deny2 " + s + " " + p + " " + o);
454     }
455
456     void claimValue(Resource r) {
457         writeByte(VALUE);
458         writeInt(session.getId((ResourceImpl)r));
459         if(DEBUG) System.err.println("claimOrDenyValue1 $" + r.getResourceId());
460     }
461
462     void claimValue(int r) {
463         writeByte(VALUE);
464         writeInt(r);
465         if(DEBUG) System.err.println("deny2 " + r);
466     }
467
468     // Streaming
469
470     private byte readByte() {
471         byte result = readBytes[readBufferIndex++]; 
472         if(readBufferIndex == BUFFER_SIZE) fillReadBuffer();
473         return result;
474     }
475
476         private int readInt() {
477         if(readBufferIndex < (BUFFER_SIZE-4)) {
478             int result = (int) 
479                 ((readBytes[readBufferIndex++] & 0xff) | 
480                 ((readBytes[readBufferIndex++] & 0xff) << 8) | 
481                 ((readBytes[readBufferIndex++] & 0xff) << 16) | 
482                 ((readBytes[readBufferIndex++] & 0xff) << 24)); 
483             return result;
484         } else {
485             int has = BUFFER_SIZE-readBufferIndex;
486             int result = 0;
487             if(has == 0) fillReadBuffer();
488             result = (int)(readBytes[readBufferIndex++] & 0xff);
489             if(has == 1) fillReadBuffer();
490             result |= (int)((readBytes[readBufferIndex++] & 0xff) << 8);
491             if(has == 2) fillReadBuffer();
492             result |= (int)((readBytes[readBufferIndex++] & 0xff) << 16);
493             if(has == 3) fillReadBuffer();
494             result |= (int)((readBytes[readBufferIndex++] & 0xff) << 24);
495             if(has == 4) fillReadBuffer();
496             return result;
497         }
498     }
499
500     private void writeByte(int b) {
501         writeBytes[writeBufferIndex++] = (byte)b;
502         if(writeBufferIndex == BUFFER_SIZE) flushWriteBuffer(BUFFER_SIZE);
503         wroteSinceLastRead = true;
504     }
505
506     private void writeInt(int i) {
507         if(writeBufferIndex < (BUFFER_SIZE-4)) {
508             writeBytes[writeBufferIndex++] = (byte)(i&0xff);
509             writeBytes[writeBufferIndex++] = (byte)((i>>>8)&0xff);
510             writeBytes[writeBufferIndex++] = (byte)((i>>>16)&0xff);
511             writeBytes[writeBufferIndex++] = (byte)((i>>>24)&0xff);
512         } else {
513             int has = BUFFER_SIZE-writeBufferIndex;
514             if(has == 0) flushWriteBuffer(BUFFER_SIZE);
515             writeBytes[writeBufferIndex++] = (byte)(i&0xff);
516             if(has == 1) flushWriteBuffer(BUFFER_SIZE);
517             writeBytes[writeBufferIndex++] = (byte)((i>>>8)&0xff);
518             if(has == 2) flushWriteBuffer(BUFFER_SIZE);
519             writeBytes[writeBufferIndex++] = (byte)((i>>>16)&0xff);
520             if(has == 3) flushWriteBuffer(BUFFER_SIZE);
521             writeBytes[writeBufferIndex++] = (byte)((i>>>24)&0xff);
522             if(has == 4) flushWriteBuffer(BUFFER_SIZE);
523         }
524         wroteSinceLastRead = true;
525     }
526
527     private int tryFlushBuffer(int size) throws IOException {
528         if (DEBUG)
529             System.err.println("tryFlushBuffer(" + size + "): tempFile=" + tempFile);
530
531         if (tempFile == null) {
532             File base = InternalClientConfig.DB_CLIENT_TEMP_DIR;
533             base.mkdirs();
534             tempFile = File.createTempFile("cset-", ".cs", base);
535             file = new RandomAccessFile(tempFile, "rwd");
536             channel = file.getChannel();
537             if (DEBUG)
538                 System.err.println("Allocated temporary file for changeset: " + tempFile);
539         }
540
541         int wrote = 0;
542         writeBuffer.position(0);
543         writeBuffer.limit(size);
544         for (; wrote < size;) {
545             wrote += channel.write(writeBuffer);
546         }
547         return wrote;
548     }
549
550     private void flushWriteBuffer(int size) {
551         if (DEBUG)
552             System.err.println("flushBuffer(" + size + ") overflow=" + overflow);
553         if (size == 0)
554             return;
555
556         if (!overflow) {
557             // Change set is about to become file-backed.
558             // Initialize separate buffer for writing.
559             writeBytes = Arrays.copyOf(readBytes, readBytes.length);
560             writeBuffer = ByteBuffer.wrap(writeBytes);
561             overflow = true;
562         }
563
564         writeBufferIndex = 0;
565         int wrote = 0;
566         try {
567             setFilePosition(writePosition);
568             wrote = tryFlushBuffer(size);
569         } catch (IOException e) {
570             e.printStackTrace();
571             throw new RuntimeDatabaseException(e);
572         } finally {
573             try {
574                 currentPosition = channel.position();
575                 writePosition = currentPosition;
576             } catch (IOException e) {
577                 e.printStackTrace();
578             }
579             if (DEBUG)
580                 System.err.println("flushWriteBuffer wrote " + wrote + " bytes");
581         }
582     }
583
584     private boolean resetRead() {
585         if (DEBUG)
586             System.err.println("resetRead: overflow=" + overflow + ", file=" + tempFile);
587         try {
588             // Flush temporary file first if necessary.
589             if (wroteSinceLastRead) {
590                 if (DEBUG)
591                     System.err.println("\tTerminating write of changeset " + tempFile);
592                 writeByte(TERM);
593                 wroteSinceLastRead = false;
594             }
595             if (overflow) {
596                 if (DEBUG)
597                     System.err.println("\tReading changeset from " + tempFile.getAbsolutePath());
598                 flushWriteBuffer(writeBufferIndex);
599                 readPosition = 0L;
600                 setFilePosition(readPosition);
601                 fillReadBuffer();
602             } else {
603                 readBufferIndex = 0;
604             }
605             return true;
606         } catch (IOException e) {
607             e.printStackTrace();
608             return false;
609         }
610     }
611
612     private void fillReadBuffer() {
613         readBufferIndex = 0;
614         int read = 0;
615         try {
616             setFilePosition(readPosition);
617             readBuffer.position(0);
618             readBuffer.limit(BUFFER_SIZE);
619             for (;read < BUFFER_SIZE;) {
620                 int got = channel.read(readBuffer);
621                 if (got == -1)
622                     return;
623                 read += got;
624             }
625         } catch (IOException e) {
626             e.printStackTrace();
627         } finally {
628             try {
629                 currentPosition = channel.position();
630                 readPosition = currentPosition;
631             } catch (IOException e) {
632                 e.printStackTrace();
633             }
634             if (DEBUG)
635                 System.err.println("fillReadBuffer read " + read + " bytes");
636         }
637     }
638
639     /**
640      * @param newPosition
641      * @return <code>true</code> if file position was modified
642      * @throws IOException
643      */
644     private boolean setFilePosition(long newPosition) throws IOException {
645         if (channel == null) {
646             return false;
647         }
648         if (newPosition != currentPosition) {
649             if (DEBUG)
650                 System.err.println("setting new file position(" + newPosition + "), old position=" + currentPosition);
651             channel.position(newPosition);
652             currentPosition = newPosition;
653             return true;
654         }
655         return false;
656     }
657
658     @Override
659     protected void finalize() throws Throwable {
660         close();
661         super.finalize();
662     }
663
664     public void close() {
665         session = null;
666         channel = null;
667         if (file != null) {
668             if (DEBUG)
669                 System.err.println("Closing temporary changeset input stream " + file);
670             try {
671                 file.close();
672             } catch (IOException e) {
673             } finally {
674                 file = null;
675             }
676         }
677         if (tempFile != null) {
678             if (DEBUG)
679                 System.err.println("Deleting temporary changeset file " + tempFile);
680             tempFile.delete();
681             tempFile = null;
682         }
683     }
684
685     public static void main(String[] args) {
686         ClientChangesImpl ccs = new ClientChangesImpl(null);
687         System.out.println("isEmpty=" + ccs.isEmpty());
688         System.out.println("clientChanges=" + ccs.toString());
689         System.out.println("isEmpty=" + ccs.isEmpty());
690     }
691
692 }