]> gerrit.simantics Code Review - simantics/platform.git/blob - bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/ClientChangesImpl.java
Merge commit '31664b6'
[simantics/platform.git] / bundles / org.simantics.db.procore / src / fi / vtt / simantics / procore / internal / ClientChangesImpl.java
1 /*******************************************************************************\r
2  * Copyright (c) 2007, 2010 Association for Decentralized Information Management\r
3  * in Industry THTH ry.\r
4  * All rights reserved. This program and the accompanying materials\r
5  * are made available under the terms of the Eclipse Public License v1.0\r
6  * which accompanies this distribution, and is available at\r
7  * http://www.eclipse.org/legal/epl-v10.html\r
8  *\r
9  * Contributors:\r
10  *     VTT Technical Research Centre of Finland - initial API and implementation\r
11  *******************************************************************************/\r
12 package fi.vtt.simantics.procore.internal;\r
13 \r
14 import java.io.File;\r
15 import java.io.IOException;\r
16 import java.io.RandomAccessFile;\r
17 import java.nio.ByteBuffer;\r
18 import java.nio.channels.FileChannel;\r
19 import java.util.Arrays;\r
20 import java.util.Collection;\r
21 import java.util.Collections;\r
22 import java.util.Iterator;\r
23 \r
24 import org.simantics.db.ChangeSet;\r
25 import org.simantics.db.Resource;\r
26 import org.simantics.db.common.internal.config.InternalClientConfig;\r
27 import org.simantics.db.exception.RuntimeDatabaseException;\r
28 import org.simantics.db.impl.ResourceImpl;\r
29 \r
30 /**\r
31  * @author Antti Villberg\r
32  * @author Tuukka Lehtonen\r
33  */\r
34 public final class ClientChangesImpl implements ChangeSet {\r
35 \r
36     private static final boolean DEBUG = false;\r
37 \r
38     private static final int BUFFER_SIZE = 1 << 17;\r
39 \r
40     private static final byte CLAIM = 1;\r
41     private static final byte DENY = 2;\r
42     private static final byte VALUE = 3;\r
43     private static final byte TERM = 4;\r
44     private static final byte INVALIDATE = 5;\r
45 \r
46     private SessionImplSocket session;\r
47 \r
48     /**\r
49      * True if enough data has been written to overflow the normal buffer which\r
50      * will result in data being written to disk.\r
51      */\r
52     private boolean overflow = false;\r
53 \r
54     private final byte[] readBytes = new byte[BUFFER_SIZE];\r
55     // By default the write buffer will be the exact same as the read buffer\r
56     // since initially were not forced to write changes to the disk.\r
57     private byte[] writeBytes = readBytes;\r
58 \r
59     private int readBufferIndex = 0;\r
60     private int writeBufferIndex = 0;\r
61 \r
62     /**\r
63      * <code>true</code> if something has been written since last call to\r
64      * {@link #resetRead()}. By default it is <code>true</code> since\r
65      * {@link #resetRead()} must on its first invocation write the {@link #TERM}\r
66      * constant into the stream.\r
67      */\r
68     private boolean wroteSinceLastRead = true;\r
69 \r
70     private final ByteBuffer readBuffer = ByteBuffer.wrap(readBytes);\r
71     private ByteBuffer writeBuffer = ByteBuffer.wrap(writeBytes);\r
72 \r
73     private File tempFile;\r
74     private RandomAccessFile file;\r
75     private FileChannel channel;\r
76     private long currentPosition = 0L;\r
77     private long readPosition = 0L;\r
78     private long writePosition = 0L;\r
79 \r
80     static abstract class NoRemoveIterator<T> implements Iterator<T> {\r
81         @Override\r
82         public void remove() {\r
83             throw new UnsupportedOperationException();\r
84         }\r
85     }\r
86 \r
87     /**\r
88      * Base class for all immutable collections returned by\r
89      * {@link ClientChangesImpl}.\r
90      * \r
91      * @param <T>\r
92      */\r
93     static class CollectionBase<T> implements Collection<T> {\r
94 \r
95         @Override\r
96         public int size() {\r
97             throw new UnsupportedOperationException();\r
98         }\r
99 \r
100         @Override\r
101         public boolean isEmpty() {\r
102             throw new UnsupportedOperationException();\r
103         }\r
104 \r
105         @Override\r
106         public boolean contains(Object o) {\r
107             throw new UnsupportedOperationException();\r
108         }\r
109 \r
110         @Override\r
111         public Iterator<T> iterator() {\r
112             throw new UnsupportedOperationException();\r
113         }\r
114 \r
115         @Override\r
116         public Object[] toArray() {\r
117             throw new UnsupportedOperationException();\r
118         }\r
119 \r
120         @Override\r
121         public <TT> TT[] toArray(TT[] a) {\r
122             throw new UnsupportedOperationException();\r
123         }\r
124 \r
125         @Override\r
126         public boolean add(T e) {\r
127             throw new UnsupportedOperationException();\r
128         }\r
129 \r
130         @Override\r
131         public boolean remove(Object o) {\r
132             throw new UnsupportedOperationException();\r
133         }\r
134 \r
135         @Override\r
136         public boolean containsAll(Collection<?> c) {\r
137             throw new UnsupportedOperationException();\r
138         }\r
139 \r
140         @Override\r
141         public boolean addAll(Collection<? extends T> c) {\r
142             throw new UnsupportedOperationException();\r
143         }\r
144 \r
145         @Override\r
146         public boolean removeAll(Collection<?> c) {\r
147             throw new UnsupportedOperationException();\r
148         }\r
149 \r
150         @Override\r
151         public boolean retainAll(Collection<?> c) {\r
152             throw new UnsupportedOperationException();\r
153         }\r
154 \r
155         @Override\r
156         public void clear() {\r
157             throw new UnsupportedOperationException();\r
158         }\r
159 \r
160     };\r
161 \r
162     ClientChangesImpl(SessionImplSocket session) {\r
163         this.session = session;\r
164     }\r
165 \r
166     @Override\r
167     public String toString() {\r
168         final int LIMIT = 10000;\r
169         String to = "";\r
170         for (StatementChange s : changedStatements()) {\r
171             String op = s.isClaim() ? "add" : "rem";\r
172             to += "\n" + op + " " + s.getSubject() + " " + s.getPredicate() + " " + s.getObject();\r
173             if (to.length() > LIMIT) {\r
174                 to += "... more.\n";\r
175                 return to;\r
176             }\r
177         }\r
178         for (Resource r : changedValues()) {\r
179             to += "\nmod " + r; \r
180             if (to.length() > LIMIT) {\r
181                 to += "...etc.\n";\r
182                 return to;\r
183             }\r
184         }\r
185         return to;\r
186     }\r
187 \r
188     private Resource getResource(int id) {\r
189         return session.getResource(id);\r
190     }\r
191 \r
192     @Override\r
193     public synchronized Collection<StatementChange> changedStatements() {\r
194 \r
195         if (!resetRead())\r
196             return Collections.emptyList();\r
197 \r
198         return new CollectionBase<StatementChange>() {\r
199 \r
200             StatementChange seek() {\r
201                 try {\r
202                     while(true) {\r
203                         byte method = readByte();\r
204                         switch(method) {\r
205                         case TERM: {\r
206                             return null;\r
207                         }\r
208                         case CLAIM: {\r
209                             Resource s = getResource(readInt());\r
210                             Resource p = getResource(readInt());\r
211                             Resource o = getResource(readInt());\r
212                             return new StatementChangeImpl(s, p, o, true);\r
213                         }\r
214                         case DENY: {\r
215                             Resource s = getResource(readInt());\r
216                             Resource p = getResource(readInt());\r
217                             Resource o = getResource(readInt());\r
218                             return new StatementChangeImpl(s, p, o, false);\r
219                         }\r
220                         case VALUE:\r
221                         case INVALIDATE: {\r
222                             // Skipping value opererations.\r
223                             getResource(readInt());\r
224                             break;\r
225                         }\r
226                         }\r
227                     }\r
228                 } catch(Exception e) {\r
229                     throw new RuntimeDatabaseException(e);\r
230                 }\r
231             }\r
232 \r
233             private StatementChange next = seek();\r
234 \r
235             @Override\r
236             public Iterator<StatementChange> iterator() {\r
237                 return new NoRemoveIterator<StatementChange>() {\r
238                     @Override\r
239                     public boolean hasNext() {\r
240                         return next != null;\r
241                     }\r
242                     @Override\r
243                     public StatementChange next() {\r
244                         StatementChange result = next;\r
245                         next = seek();\r
246                         return result;\r
247                     }\r
248                 };\r
249             }\r
250 \r
251         };\r
252 \r
253     }\r
254 \r
255     @Override\r
256     public Collection<Resource> changedValues() {\r
257 \r
258         if (!resetRead())\r
259             return Collections.emptyList();\r
260 \r
261         return new CollectionBase<Resource>() {\r
262 \r
263             Resource seek() {\r
264                 try {\r
265                     while(true) {\r
266                         byte method = readByte();\r
267                         switch(method) {\r
268                         case TERM: {\r
269                             return null;\r
270                         }\r
271                         case CLAIM: {\r
272                             getResource(readInt());\r
273                             getResource(readInt());\r
274                             getResource(readInt());\r
275                             break;\r
276                         }\r
277                         case DENY: {\r
278                             getResource(readInt());\r
279                             getResource(readInt());\r
280                             getResource(readInt());\r
281                             break;\r
282                         }\r
283                         case VALUE:\r
284                         case INVALIDATE: {\r
285                             return getResource(readInt());\r
286                         }\r
287                         }\r
288                     }\r
289                 } catch(Exception e) {\r
290                     return null;\r
291                 }\r
292             }\r
293 \r
294             private Resource next = seek();\r
295 \r
296             @Override\r
297             public Iterator<Resource> iterator() {\r
298                 return new NoRemoveIterator<Resource>() {\r
299                     @Override\r
300                     public boolean hasNext() {\r
301                         return next != null;\r
302                     }\r
303                     @Override\r
304                     public Resource next() {\r
305                         Resource result = next;\r
306                         next = seek();\r
307                         return result;\r
308                     }\r
309                 };\r
310             }\r
311 \r
312         };\r
313 \r
314     }\r
315 \r
316     @Override\r
317     public Collection<Resource> changedResources() {\r
318 \r
319         if (!resetRead())\r
320             return Collections.emptyList();\r
321 \r
322         return new CollectionBase<Resource>() {\r
323 \r
324             Resource seek() {\r
325 \r
326                 try {\r
327                     while(true) {\r
328                         byte method = readByte();\r
329                         switch(method) {\r
330                         case TERM: {\r
331                             return null;\r
332                         }\r
333                         case CLAIM: {\r
334                             getResource(readInt());\r
335                             getResource(readInt());\r
336                             getResource(readInt());\r
337                             break;\r
338                         }\r
339                         case DENY: {\r
340                             getResource(readInt());\r
341                             getResource(readInt());\r
342                             getResource(readInt());\r
343                             break;\r
344                         }\r
345                         case VALUE: {\r
346                             getResource(readInt());\r
347                             break;\r
348                         }\r
349                         case INVALIDATE: {\r
350                             return getResource(readInt());\r
351                         }\r
352                         }\r
353                     }\r
354                 } catch(Exception e) {\r
355                     return null;\r
356                 }\r
357 \r
358             }\r
359 \r
360             private Resource next = seek();\r
361 \r
362             @Override\r
363             public Iterator<Resource> iterator() {\r
364                 return new NoRemoveIterator<Resource>() {\r
365                     @Override\r
366                     public boolean hasNext() {\r
367                         return next != null;\r
368                     }\r
369                     @Override\r
370                     public Resource next() {\r
371                         Resource result = next;\r
372                         next = seek();\r
373                         return result;\r
374                     }\r
375                 };\r
376             }\r
377 \r
378         };\r
379 \r
380     }\r
381 \r
382     @Override\r
383     public boolean isEmpty() {\r
384         if (notEmpty)\r
385             return false;\r
386         return writeBufferIndex == 0 && !overflow;\r
387     }\r
388 \r
389     private boolean notEmpty = false;\r
390     void setNotEmpty(boolean notEmpty) {\r
391         this.notEmpty = notEmpty;\r
392     }\r
393 \r
394     void claim(Resource s, Resource p, Resource o) {\r
395         writeByte(CLAIM);\r
396         writeInt(session.getId((ResourceImpl) s));\r
397         writeInt(session.getId((ResourceImpl) p));\r
398         writeInt(session.getId((ResourceImpl) o));\r
399         if(DEBUG) System.err.println("claim1 $" + s.getResourceId() + " $" + p.getResourceId() + " $" + o.getResourceId());\r
400     }\r
401 \r
402     private int sCache1 = -1;\r
403     private int sCache2 = -1;\r
404     private int sCache3 = -1;\r
405 \r
406     final void invalidate(int s) {\r
407         \r
408         if(s == sCache1) {\r
409                 return;\r
410         } else if (s == sCache2) {\r
411                 sCache2 = sCache1;\r
412                 sCache1 = s;\r
413                 return;\r
414         } else if (s == sCache3) {\r
415                 sCache3 = sCache2;\r
416                 sCache2 = sCache1;\r
417                 sCache1 = s;\r
418                 return;\r
419         }\r
420 \r
421         sCache3 = sCache2;\r
422         sCache2 = sCache1;\r
423         sCache1 = s;\r
424         \r
425         writeByte(INVALIDATE);\r
426         writeInt(s);\r
427 \r
428         if(DEBUG) System.err.println("invalidate2 " + s);\r
429 \r
430     }\r
431 \r
432     void claim(int s, int p, int o) {\r
433         writeByte(CLAIM);\r
434         writeInt(s);\r
435         writeInt(p);\r
436         writeInt(o);\r
437         if(DEBUG) System.err.println("claim2 " + s + " " + p + " " + o);\r
438     }\r
439 \r
440     void deny(Resource s, Resource p, Resource o) {\r
441         writeByte(DENY);\r
442         writeInt(session.getId((ResourceImpl)s));\r
443         writeInt(session.getId((ResourceImpl)p));\r
444         writeInt(session.getId((ResourceImpl)o));\r
445         if(DEBUG) System.err.println("deny2 " + session.getId((ResourceImpl)s) + " " + session.getId((ResourceImpl)p) + " " + session.getId((ResourceImpl)o));\r
446     }\r
447 \r
448     void deny(int s, int p, int o) {\r
449         writeByte(DENY);\r
450         writeInt(s);\r
451         writeInt(p);\r
452         writeInt(o);\r
453         if(DEBUG) System.err.println("deny2 " + s + " " + p + " " + o);\r
454     }\r
455 \r
456     void claimValue(Resource r) {\r
457         writeByte(VALUE);\r
458         writeInt(session.getId((ResourceImpl)r));\r
459         if(DEBUG) System.err.println("claimOrDenyValue1 $" + r.getResourceId());\r
460     }\r
461 \r
462     void claimValue(int r) {\r
463         writeByte(VALUE);\r
464         writeInt(r);\r
465         if(DEBUG) System.err.println("deny2 " + r);\r
466     }\r
467 \r
468     // Streaming\r
469 \r
470     private byte readByte() {\r
471         byte result = readBytes[readBufferIndex++]; \r
472         if(readBufferIndex == BUFFER_SIZE) fillReadBuffer();\r
473         return result;\r
474     }\r
475 \r
476         private int readInt() {\r
477         if(readBufferIndex < (BUFFER_SIZE-4)) {\r
478             int result = (int) \r
479                 ((readBytes[readBufferIndex++] & 0xff) | \r
480                 ((readBytes[readBufferIndex++] & 0xff) << 8) | \r
481                 ((readBytes[readBufferIndex++] & 0xff) << 16) | \r
482                 ((readBytes[readBufferIndex++] & 0xff) << 24)); \r
483             return result;\r
484         } else {\r
485             int has = BUFFER_SIZE-readBufferIndex;\r
486             int result = 0;\r
487             if(has == 0) fillReadBuffer();\r
488             result = (int)(readBytes[readBufferIndex++] & 0xff);\r
489             if(has == 1) fillReadBuffer();\r
490             result |= (int)((readBytes[readBufferIndex++] & 0xff) << 8);\r
491             if(has == 2) fillReadBuffer();\r
492             result |= (int)((readBytes[readBufferIndex++] & 0xff) << 16);\r
493             if(has == 3) fillReadBuffer();\r
494             result |= (int)((readBytes[readBufferIndex++] & 0xff) << 24);\r
495             if(has == 4) fillReadBuffer();\r
496             return result;\r
497         }\r
498     }\r
499 \r
500     private void writeByte(int b) {\r
501         writeBytes[writeBufferIndex++] = (byte)b;\r
502         if(writeBufferIndex == BUFFER_SIZE) flushWriteBuffer(BUFFER_SIZE);\r
503         wroteSinceLastRead = true;\r
504     }\r
505 \r
506     private void writeInt(int i) {\r
507         if(writeBufferIndex < (BUFFER_SIZE-4)) {\r
508             writeBytes[writeBufferIndex++] = (byte)(i&0xff);\r
509             writeBytes[writeBufferIndex++] = (byte)((i>>>8)&0xff);\r
510             writeBytes[writeBufferIndex++] = (byte)((i>>>16)&0xff);\r
511             writeBytes[writeBufferIndex++] = (byte)((i>>>24)&0xff);\r
512         } else {\r
513             int has = BUFFER_SIZE-writeBufferIndex;\r
514             if(has == 0) flushWriteBuffer(BUFFER_SIZE);\r
515             writeBytes[writeBufferIndex++] = (byte)(i&0xff);\r
516             if(has == 1) flushWriteBuffer(BUFFER_SIZE);\r
517             writeBytes[writeBufferIndex++] = (byte)((i>>>8)&0xff);\r
518             if(has == 2) flushWriteBuffer(BUFFER_SIZE);\r
519             writeBytes[writeBufferIndex++] = (byte)((i>>>16)&0xff);\r
520             if(has == 3) flushWriteBuffer(BUFFER_SIZE);\r
521             writeBytes[writeBufferIndex++] = (byte)((i>>>24)&0xff);\r
522             if(has == 4) flushWriteBuffer(BUFFER_SIZE);\r
523         }\r
524         wroteSinceLastRead = true;\r
525     }\r
526 \r
527     private int tryFlushBuffer(int size) throws IOException {\r
528         if (DEBUG)\r
529             System.err.println("tryFlushBuffer(" + size + "): tempFile=" + tempFile);\r
530 \r
531         if (tempFile == null) {\r
532             File base = InternalClientConfig.DB_CLIENT_TEMP_DIR;\r
533             base.mkdirs();\r
534             tempFile = File.createTempFile("cset-", ".cs", base);\r
535             file = new RandomAccessFile(tempFile, "rwd");\r
536             channel = file.getChannel();\r
537             if (DEBUG)\r
538                 System.err.println("Allocated temporary file for changeset: " + tempFile);\r
539         }\r
540 \r
541         int wrote = 0;\r
542         writeBuffer.position(0);\r
543         writeBuffer.limit(size);\r
544         for (; wrote < size;) {\r
545             wrote += channel.write(writeBuffer);\r
546         }\r
547         return wrote;\r
548     }\r
549 \r
550     private void flushWriteBuffer(int size) {\r
551         if (DEBUG)\r
552             System.err.println("flushBuffer(" + size + ") overflow=" + overflow);\r
553         if (size == 0)\r
554             return;\r
555 \r
556         if (!overflow) {\r
557             // Change set is about to become file-backed.\r
558             // Initialize separate buffer for writing.\r
559             writeBytes = Arrays.copyOf(readBytes, readBytes.length);\r
560             writeBuffer = ByteBuffer.wrap(writeBytes);\r
561             overflow = true;\r
562         }\r
563 \r
564         writeBufferIndex = 0;\r
565         int wrote = 0;\r
566         try {\r
567             setFilePosition(writePosition);\r
568             wrote = tryFlushBuffer(size);\r
569         } catch (IOException e) {\r
570             e.printStackTrace();\r
571             throw new RuntimeDatabaseException(e);\r
572         } finally {\r
573             try {\r
574                 currentPosition = channel.position();\r
575                 writePosition = currentPosition;\r
576             } catch (IOException e) {\r
577                 e.printStackTrace();\r
578             }\r
579             if (DEBUG)\r
580                 System.err.println("flushWriteBuffer wrote " + wrote + " bytes");\r
581         }\r
582     }\r
583 \r
584     private boolean resetRead() {\r
585         if (DEBUG)\r
586             System.err.println("resetRead: overflow=" + overflow + ", file=" + tempFile);\r
587         try {\r
588             // Flush temporary file first if necessary.\r
589             if (wroteSinceLastRead) {\r
590                 if (DEBUG)\r
591                     System.err.println("\tTerminating write of changeset " + tempFile);\r
592                 writeByte(TERM);\r
593                 wroteSinceLastRead = false;\r
594             }\r
595             if (overflow) {\r
596                 if (DEBUG)\r
597                     System.err.println("\tReading changeset from " + tempFile.getAbsolutePath());\r
598                 flushWriteBuffer(writeBufferIndex);\r
599                 readPosition = 0L;\r
600                 setFilePosition(readPosition);\r
601                 fillReadBuffer();\r
602             } else {\r
603                 readBufferIndex = 0;\r
604             }\r
605             return true;\r
606         } catch (IOException e) {\r
607             e.printStackTrace();\r
608             return false;\r
609         }\r
610     }\r
611 \r
612     private void fillReadBuffer() {\r
613         readBufferIndex = 0;\r
614         int read = 0;\r
615         try {\r
616             setFilePosition(readPosition);\r
617             readBuffer.position(0);\r
618             readBuffer.limit(BUFFER_SIZE);\r
619             for (;read < BUFFER_SIZE;) {\r
620                 int got = channel.read(readBuffer);\r
621                 if (got == -1)\r
622                     return;\r
623                 read += got;\r
624             }\r
625         } catch (IOException e) {\r
626             e.printStackTrace();\r
627         } finally {\r
628             try {\r
629                 currentPosition = channel.position();\r
630                 readPosition = currentPosition;\r
631             } catch (IOException e) {\r
632                 e.printStackTrace();\r
633             }\r
634             if (DEBUG)\r
635                 System.err.println("fillReadBuffer read " + read + " bytes");\r
636         }\r
637     }\r
638 \r
639     /**\r
640      * @param newPosition\r
641      * @return <code>true</code> if file position was modified\r
642      * @throws IOException\r
643      */\r
644     private boolean setFilePosition(long newPosition) throws IOException {\r
645         if (channel == null) {\r
646             return false;\r
647         }\r
648         if (newPosition != currentPosition) {\r
649             if (DEBUG)\r
650                 System.err.println("setting new file position(" + newPosition + "), old position=" + currentPosition);\r
651             channel.position(newPosition);\r
652             currentPosition = newPosition;\r
653             return true;\r
654         }\r
655         return false;\r
656     }\r
657 \r
658     @Override\r
659     protected void finalize() throws Throwable {\r
660         close();\r
661         super.finalize();\r
662     }\r
663 \r
664     public void close() {\r
665         session = null;\r
666         channel = null;\r
667         if (file != null) {\r
668             if (DEBUG)\r
669                 System.err.println("Closing temporary changeset input stream " + file);\r
670             try {\r
671                 file.close();\r
672             } catch (IOException e) {\r
673             } finally {\r
674                 file = null;\r
675             }\r
676         }\r
677         if (tempFile != null) {\r
678             if (DEBUG)\r
679                 System.err.println("Deleting temporary changeset file " + tempFile);\r
680             tempFile.delete();\r
681             tempFile = null;\r
682         }\r
683     }\r
684 \r
685     public static void main(String[] args) {\r
686         ClientChangesImpl ccs = new ClientChangesImpl(null);\r
687         System.out.println("isEmpty=" + ccs.isEmpty());\r
688         System.out.println("clientChanges=" + ccs.toString());\r
689         System.out.println("isEmpty=" + ccs.isEmpty());\r
690     }\r
691 \r
692 }\r