1 /*******************************************************************************
2 * Copyright (c) 2007, 2010 Association for Decentralized Information Management
4 * All rights reserved. This program and the accompanying materials
5 * are made available under the terms of the Eclipse Public License v1.0
6 * which accompanies this distribution, and is available at
7 * http://www.eclipse.org/legal/epl-v10.html
10 * VTT Technical Research Centre of Finland - initial API and implementation
11 *******************************************************************************/
12 package fi.vtt.simantics.procore.internal;
15 import java.io.IOException;
16 import java.io.RandomAccessFile;
17 import java.nio.ByteBuffer;
18 import java.nio.channels.FileChannel;
19 import java.util.Arrays;
20 import java.util.Collection;
21 import java.util.Collections;
22 import java.util.Iterator;
24 import org.simantics.db.ChangeSet;
25 import org.simantics.db.Disposable;
26 import org.simantics.db.Resource;
27 import org.simantics.db.common.internal.config.InternalClientConfig;
28 import org.simantics.db.exception.RuntimeDatabaseException;
29 import org.simantics.db.impl.ResourceImpl;
32 * @author Antti Villberg
33 * @author Tuukka Lehtonen
35 public final class ClientChangesImpl implements ChangeSet, Disposable {
37 private static final boolean ASSERT_ALIVE = true;
39 private static final byte[] EMPTY = {};
41 private static final boolean DEBUG = false;
43 private static final int BUFFER_SIZE = 1 << 17;
45 private static final byte CLAIM = 1;
46 private static final byte DENY = 2;
47 private static final byte VALUE = 3;
48 private static final byte TERM = 4;
49 private static final byte INVALIDATE = 5;
51 private boolean disposed = false;
53 private SessionImplSocket session;
56 * True if enough data has been written to overflow the normal buffer which
57 * will result in data being written to disk.
59 private boolean overflow = false;
61 private byte[] readBytes = new byte[BUFFER_SIZE];
62 // By default the write buffer will be the exact same as the read buffer
63 // since initially were not forced to write changes to the disk.
64 private byte[] writeBytes = readBytes;
66 private int readBufferIndex = 0;
67 private int writeBufferIndex = 0;
70 * <code>true</code> if something has been written since last call to
71 * {@link #resetRead()}. By default it is <code>true</code> since
72 * {@link #resetRead()} must on its first invocation write the {@link #TERM}
73 * constant into the stream.
75 private boolean wroteSinceLastRead = true;
77 private ByteBuffer readBuffer = ByteBuffer.wrap(readBytes);
78 private ByteBuffer writeBuffer = ByteBuffer.wrap(writeBytes);
80 private File tempFile;
81 private RandomAccessFile file;
82 private FileChannel channel;
83 private long currentPosition = 0L;
84 private long readPosition = 0L;
85 private long writePosition = 0L;
87 static abstract class NoRemoveIterator<T> implements Iterator<T> {
89 public void remove() {
90 throw new UnsupportedOperationException();
95 * Base class for all immutable collections returned by
96 * {@link ClientChangesImpl}.
100 static class CollectionBase<T> implements Collection<T> {
104 throw new UnsupportedOperationException();
108 public boolean isEmpty() {
109 throw new UnsupportedOperationException();
113 public boolean contains(Object o) {
114 throw new UnsupportedOperationException();
118 public Iterator<T> iterator() {
119 throw new UnsupportedOperationException();
123 public Object[] toArray() {
124 throw new UnsupportedOperationException();
128 public <TT> TT[] toArray(TT[] a) {
129 throw new UnsupportedOperationException();
133 public boolean add(T e) {
134 throw new UnsupportedOperationException();
138 public boolean remove(Object o) {
139 throw new UnsupportedOperationException();
143 public boolean containsAll(Collection<?> c) {
144 throw new UnsupportedOperationException();
148 public boolean addAll(Collection<? extends T> c) {
149 throw new UnsupportedOperationException();
153 public boolean removeAll(Collection<?> c) {
154 throw new UnsupportedOperationException();
158 public boolean retainAll(Collection<?> c) {
159 throw new UnsupportedOperationException();
163 public void clear() {
164 throw new UnsupportedOperationException();
169 ClientChangesImpl(SessionImplSocket session) {
170 this.session = session;
171 //System.out.println("new ClientChangesImpl@" + System.identityHashCode(this));
175 public String toString() {
176 final int LIMIT = 10000;
178 for (StatementChange s : changedStatements()) {
179 String op = s.isClaim() ? "add" : "rem";
180 to += "\n" + op + " " + s.getSubject() + " " + s.getPredicate() + " " + s.getObject();
181 if (to.length() > LIMIT) {
186 for (Resource r : changedValues()) {
188 if (to.length() > LIMIT) {
196 private Resource getResource(int id) {
197 return session.getResource(id);
201 public synchronized Collection<StatementChange> changedStatements() {
206 return Collections.emptyList();
208 return new CollectionBase<StatementChange>() {
210 StatementChange seek() {
213 byte method = readByte();
219 Resource s = getResource(readInt());
220 Resource p = getResource(readInt());
221 Resource o = getResource(readInt());
222 return new StatementChangeImpl(s, p, o, true);
225 Resource s = getResource(readInt());
226 Resource p = getResource(readInt());
227 Resource o = getResource(readInt());
228 return new StatementChangeImpl(s, p, o, false);
232 // Skipping value opererations.
233 getResource(readInt());
238 } catch(Exception e) {
239 throw new RuntimeDatabaseException(e);
243 private StatementChange next = seek();
246 public Iterator<StatementChange> iterator() {
247 return new NoRemoveIterator<StatementChange>() {
249 public boolean hasNext() {
253 public StatementChange next() {
254 StatementChange result = next;
266 public Collection<Resource> changedValues() {
271 return Collections.emptyList();
273 return new CollectionBase<Resource>() {
278 byte method = readByte();
284 getResource(readInt());
285 getResource(readInt());
286 getResource(readInt());
290 getResource(readInt());
291 getResource(readInt());
292 getResource(readInt());
297 return getResource(readInt());
301 } catch(Exception e) {
306 private Resource next = seek();
309 public Iterator<Resource> iterator() {
310 return new NoRemoveIterator<Resource>() {
312 public boolean hasNext() {
316 public Resource next() {
317 Resource result = next;
329 public Collection<Resource> changedResources() {
334 return Collections.emptyList();
336 return new CollectionBase<Resource>() {
342 byte method = readByte();
348 getResource(readInt());
349 getResource(readInt());
350 getResource(readInt());
354 getResource(readInt());
355 getResource(readInt());
356 getResource(readInt());
360 getResource(readInt());
364 return getResource(readInt());
368 } catch(Exception e) {
374 private Resource next = seek();
377 public Iterator<Resource> iterator() {
378 return new NoRemoveIterator<Resource>() {
380 public boolean hasNext() {
384 public Resource next() {
385 Resource result = next;
397 public boolean isEmpty() {
400 return writeBufferIndex == 0 && !overflow;
403 private boolean notEmpty = false;
404 void setNotEmpty(boolean notEmpty) {
405 this.notEmpty = notEmpty;
408 void claim(Resource s, Resource p, Resource o) {
412 writeInt(session.getId((ResourceImpl) s));
413 writeInt(session.getId((ResourceImpl) p));
414 writeInt(session.getId((ResourceImpl) o));
415 if(DEBUG) System.err.println("claim1 $" + s.getResourceId() + " $" + p.getResourceId() + " $" + o.getResourceId());
418 private int sCache1 = -1;
419 private int sCache2 = -1;
420 private int sCache3 = -1;
422 final void invalidate(int s) {
428 } else if (s == sCache2) {
432 } else if (s == sCache3) {
443 writeByte(INVALIDATE);
446 if(DEBUG) System.err.println("invalidate2 " + s);
450 void claim(int s, int p, int o) {
457 if(DEBUG) System.err.println("claim2 " + s + " " + p + " " + o);
460 void deny(Resource s, Resource p, Resource o) {
464 writeInt(session.getId((ResourceImpl)s));
465 writeInt(session.getId((ResourceImpl)p));
466 writeInt(session.getId((ResourceImpl)o));
467 if(DEBUG) System.err.println("deny2 " + session.getId((ResourceImpl)s) + " " + session.getId((ResourceImpl)p) + " " + session.getId((ResourceImpl)o));
470 void deny(int s, int p, int o) {
477 if(DEBUG) System.err.println("deny2 " + s + " " + p + " " + o);
480 void claimValue(Resource r) {
484 writeInt(session.getId((ResourceImpl)r));
485 if(DEBUG) System.err.println("claimOrDenyValue1 $" + r.getResourceId());
488 void claimValue(int r) {
493 if(DEBUG) System.err.println("deny2 " + r);
498 private byte readByte() {
499 byte result = readBytes[readBufferIndex++];
500 if(readBufferIndex == BUFFER_SIZE) fillReadBuffer();
504 private int readInt() {
505 int ri = readBufferIndex;
506 int remaining = BUFFER_SIZE - ri;
511 // Intentional fall-through
514 (readBytes[ri] & 0xff) |
515 ((readBytes[ri+1] & 0xff) << 8) |
516 ((readBytes[ri+2] & 0xff) << 16) |
517 ((readBytes[ri+3] & 0xff) << 24));
518 readBufferIndex += 4;
523 (readBytes[ri] & 0xff) |
524 ((readBytes[ri+1] & 0xff) << 8) |
525 ((readBytes[ri+2] & 0xff) << 16) |
526 ((readBytes[ri+3] & 0xff) << 24));
532 (readBytes[ri] & 0xff) |
533 ((readBytes[ri+1] & 0xff) << 8) |
534 ((readBytes[ri+2] & 0xff) << 16));
536 result |= (int)((readBytes[readBufferIndex++] & 0xff) << 24);
541 (readBytes[ri] & 0xff) |
542 ((readBytes[ri+1] & 0xff) << 8));
544 ri = readBufferIndex;
546 ((readBytes[ri] & 0xff) << 16) |
547 ((readBytes[ri+1] & 0xff) << 24));
548 readBufferIndex += 2;
552 result = (int)(readBytes[ri] & 0xff);
554 ri = readBufferIndex;
556 ((readBytes[ri] & 0xff) << 8) |
557 ((readBytes[ri+1] & 0xff) << 16) |
558 ((readBytes[ri+2] & 0xff) << 24));
559 readBufferIndex += 3;
564 private void writeByte(int b) {
565 writeBytes[writeBufferIndex++] = (byte)b;
566 if(writeBufferIndex == BUFFER_SIZE) flushWriteBuffer(BUFFER_SIZE);
567 wroteSinceLastRead = true;
570 private void writeInt(int i) {
571 int wi = writeBufferIndex;
572 int room = BUFFER_SIZE - wi;
575 flushWriteBuffer(BUFFER_SIZE);
576 // Intentional fall-through
578 writeBytes[wi] = (byte)(i&0xff);
579 writeBytes[wi+1] = (byte)((i>>>8)&0xff);
580 writeBytes[wi+2] = (byte)((i>>>16)&0xff);
581 writeBytes[wi+3] = (byte)((i>>>24)&0xff);
582 writeBufferIndex += 4;
586 writeBytes[wi] = (byte)(i&0xff);
587 writeBytes[wi+1] = (byte)((i>>>8)&0xff);
588 writeBytes[wi+2] = (byte)((i>>>16)&0xff);
589 writeBytes[wi+3] = (byte)((i>>>24)&0xff);
590 flushWriteBuffer(BUFFER_SIZE);
594 writeBytes[wi] = (byte)(i&0xff);
595 writeBytes[wi+1] = (byte)((i>>>8)&0xff);
596 writeBytes[wi+2] = (byte)((i>>>16)&0xff);
597 flushWriteBuffer(BUFFER_SIZE);
598 writeBytes[writeBufferIndex++] = (byte)((i>>>24)&0xff);
602 writeBytes[wi] = (byte)(i&0xff);
603 writeBytes[wi+1] = (byte)((i>>>8)&0xff);
604 flushWriteBuffer(BUFFER_SIZE);
605 wi = writeBufferIndex;
606 writeBytes[wi] = (byte)((i>>>16)&0xff);
607 writeBytes[wi+1] = (byte)((i>>>24)&0xff);
608 writeBufferIndex += 2;
612 writeBytes[wi] = (byte)(i&0xff);
613 flushWriteBuffer(BUFFER_SIZE);
614 wi = writeBufferIndex;
615 writeBytes[wi] = (byte)((i>>>8)&0xff);
616 writeBytes[wi+1] = (byte)((i>>>16)&0xff);
617 writeBytes[wi+2] = (byte)((i>>>24)&0xff);
618 writeBufferIndex += 3;
621 wroteSinceLastRead = true;
624 private int tryFlushBuffer(int size) throws IOException {
626 System.err.println("tryFlushBuffer(" + size + "): tempFile=" + tempFile);
628 if (tempFile == null) {
629 File base = InternalClientConfig.DB_CLIENT_TEMP_DIR;
631 tempFile = File.createTempFile("cset-", ".cs", base);
632 file = new RandomAccessFile(tempFile, "rwd");
633 channel = file.getChannel();
635 System.err.println("Allocated temporary file for changeset: " + tempFile);
639 writeBuffer.position(0);
640 writeBuffer.limit(size);
641 for (; wrote < size;) {
642 wrote += channel.write(writeBuffer);
647 private void flushWriteBuffer(int size) {
649 System.err.println("flushBuffer(" + size + ") overflow=" + overflow);
654 // Change set is about to become file-backed.
655 // Initialize separate buffer for writing.
656 writeBytes = Arrays.copyOf(readBytes, readBytes.length);
657 writeBuffer = ByteBuffer.wrap(writeBytes);
661 writeBufferIndex = 0;
664 setFilePosition(writePosition);
665 wrote = tryFlushBuffer(size);
666 } catch (IOException e) {
668 throw new RuntimeDatabaseException(e);
671 currentPosition = channel.position();
672 writePosition = currentPosition;
673 } catch (IOException e) {
677 System.err.println("flushWriteBuffer wrote " + wrote + " bytes");
681 private boolean resetRead() {
683 System.err.println("resetRead: overflow=" + overflow + ", file=" + tempFile);
685 // Flush temporary file first if necessary.
686 if (wroteSinceLastRead) {
688 System.err.println("\tTerminating write of changeset " + tempFile);
690 wroteSinceLastRead = false;
694 System.err.println("\tReading changeset from " + tempFile.getAbsolutePath());
695 flushWriteBuffer(writeBufferIndex);
697 setFilePosition(readPosition);
703 } catch (IOException e) {
709 private void fillReadBuffer() {
713 setFilePosition(readPosition);
714 readBuffer.position(0);
715 readBuffer.limit(BUFFER_SIZE);
716 for (;read < BUFFER_SIZE;) {
717 int got = channel.read(readBuffer);
722 } catch (IOException e) {
726 currentPosition = channel.position();
727 readPosition = currentPosition;
728 } catch (IOException e) {
732 System.err.println("fillReadBuffer read " + read + " bytes");
738 * @return <code>true</code> if file position was modified
739 * @throws IOException
741 private boolean setFilePosition(long newPosition) throws IOException {
742 if (channel == null) {
745 if (newPosition != currentPosition) {
747 System.err.println("setting new file position(" + newPosition + "), old position=" + currentPosition);
748 channel.position(newPosition);
749 currentPosition = newPosition;
756 protected void finalize() throws Throwable {
761 public void close() {
770 System.err.println("Closing temporary changeset input stream " + file);
773 } catch (IOException e) {
778 if (tempFile != null) {
780 System.err.println("Deleting temporary changeset file " + tempFile);
785 // Release large buffers to minimize memory consumption
787 readBuffer = ByteBuffer.wrap(EMPTY);
789 writeBuffer = ByteBuffer.wrap(EMPTY);
792 private void assertAlive() {
794 throw new IllegalStateException("ClientChangeImpl@" + System.identityHashCode(this) + " is already disposed");
798 public void dispose() {
799 //System.out.println("ClientChangesImpl[" + (disposed ? "DISPOSED" : "") + "]@" + System.identityHashCode(this) + ".dispose " + (isEmpty() ? "EMPTY" : "NON-EMPTY"));
803 public static void main(String[] args) {
804 ClientChangesImpl ccs = new ClientChangesImpl(null);
805 System.out.println("isEmpty=" + ccs.isEmpty());
806 System.out.println("clientChanges=" + ccs.toString());
807 System.out.println("isEmpty=" + ccs.isEmpty());