X-Git-Url: https://gerrit.simantics.org/r/gitweb?a=blobdiff_plain;ds=inline;f=bundles%2Forg.simantics.db.procore%2Fsrc%2Ffi%2Fvtt%2Fsimantics%2Fprocore%2Finternal%2FClientChangesImpl.java;h=000880c3d1d1bf3ff00f5b409ee69bd657a3c2b0;hb=e19c37f84fd1ce2d946578f7c05f3e45444ba67a;hp=9c51e267d3af422fe0e8876fb3274c47221fa29e;hpb=969bd23cab98a79ca9101af33334000879fb60c5;p=simantics%2Fplatform.git
diff --git a/bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/ClientChangesImpl.java b/bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/ClientChangesImpl.java
index 9c51e267d..000880c3d 100644
--- a/bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/ClientChangesImpl.java
+++ b/bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/ClientChangesImpl.java
@@ -1,692 +1,692 @@
-/*******************************************************************************
- * Copyright (c) 2007, 2010 Association for Decentralized Information Management
- * in Industry THTH ry.
- * All rights reserved. This program and the accompanying materials
- * are made available under the terms of the Eclipse Public License v1.0
- * which accompanies this distribution, and is available at
- * http://www.eclipse.org/legal/epl-v10.html
- *
- * Contributors:
- * VTT Technical Research Centre of Finland - initial API and implementation
- *******************************************************************************/
-package fi.vtt.simantics.procore.internal;
-
-import java.io.File;
-import java.io.IOException;
-import java.io.RandomAccessFile;
-import java.nio.ByteBuffer;
-import java.nio.channels.FileChannel;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Iterator;
-
-import org.simantics.db.ChangeSet;
-import org.simantics.db.Resource;
-import org.simantics.db.common.internal.config.InternalClientConfig;
-import org.simantics.db.exception.RuntimeDatabaseException;
-import org.simantics.db.impl.ResourceImpl;
-
-/**
- * @author Antti Villberg
- * @author Tuukka Lehtonen
- */
-public final class ClientChangesImpl implements ChangeSet {
-
- private static final boolean DEBUG = false;
-
- private static final int BUFFER_SIZE = 1 << 17;
-
- private static final byte CLAIM = 1;
- private static final byte DENY = 2;
- private static final byte VALUE = 3;
- private static final byte TERM = 4;
- private static final byte INVALIDATE = 5;
-
- private SessionImplSocket session;
-
- /**
- * True if enough data has been written to overflow the normal buffer which
- * will result in data being written to disk.
- */
- private boolean overflow = false;
-
- private final byte[] readBytes = new byte[BUFFER_SIZE];
- // By default the write buffer will be the exact same as the read buffer
- // since initially were not forced to write changes to the disk.
- private byte[] writeBytes = readBytes;
-
- private int readBufferIndex = 0;
- private int writeBufferIndex = 0;
-
- /**
- * true
if something has been written since last call to
- * {@link #resetRead()}. By default it is true
since
- * {@link #resetRead()} must on its first invocation write the {@link #TERM}
- * constant into the stream.
- */
- private boolean wroteSinceLastRead = true;
-
- private final ByteBuffer readBuffer = ByteBuffer.wrap(readBytes);
- private ByteBuffer writeBuffer = ByteBuffer.wrap(writeBytes);
-
- private File tempFile;
- private RandomAccessFile file;
- private FileChannel channel;
- private long currentPosition = 0L;
- private long readPosition = 0L;
- private long writePosition = 0L;
-
- static abstract class NoRemoveIterator implements Iterator {
- @Override
- public void remove() {
- throw new UnsupportedOperationException();
- }
- }
-
- /**
- * Base class for all immutable collections returned by
- * {@link ClientChangesImpl}.
- *
- * @param
- */
- static class CollectionBase implements Collection {
-
- @Override
- public int size() {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public boolean isEmpty() {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public boolean contains(Object o) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public Iterator iterator() {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public Object[] toArray() {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public TT[] toArray(TT[] a) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public boolean add(T e) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public boolean remove(Object o) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public boolean containsAll(Collection> c) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public boolean addAll(Collection extends T> c) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public boolean removeAll(Collection> c) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public boolean retainAll(Collection> c) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void clear() {
- throw new UnsupportedOperationException();
- }
-
- };
-
- ClientChangesImpl(SessionImplSocket session) {
- this.session = session;
- }
-
- @Override
- public String toString() {
- final int LIMIT = 10000;
- String to = "";
- for (StatementChange s : changedStatements()) {
- String op = s.isClaim() ? "add" : "rem";
- to += "\n" + op + " " + s.getSubject() + " " + s.getPredicate() + " " + s.getObject();
- if (to.length() > LIMIT) {
- to += "... more.\n";
- return to;
- }
- }
- for (Resource r : changedValues()) {
- to += "\nmod " + r;
- if (to.length() > LIMIT) {
- to += "...etc.\n";
- return to;
- }
- }
- return to;
- }
-
- private Resource getResource(int id) {
- return session.getResource(id);
- }
-
- @Override
- public synchronized Collection changedStatements() {
-
- if (!resetRead())
- return Collections.emptyList();
-
- return new CollectionBase() {
-
- StatementChange seek() {
- try {
- while(true) {
- byte method = readByte();
- switch(method) {
- case TERM: {
- return null;
- }
- case CLAIM: {
- Resource s = getResource(readInt());
- Resource p = getResource(readInt());
- Resource o = getResource(readInt());
- return new StatementChangeImpl(s, p, o, true);
- }
- case DENY: {
- Resource s = getResource(readInt());
- Resource p = getResource(readInt());
- Resource o = getResource(readInt());
- return new StatementChangeImpl(s, p, o, false);
- }
- case VALUE:
- case INVALIDATE: {
- // Skipping value opererations.
- getResource(readInt());
- break;
- }
- }
- }
- } catch(Exception e) {
- throw new RuntimeDatabaseException(e);
- }
- }
-
- private StatementChange next = seek();
-
- @Override
- public Iterator iterator() {
- return new NoRemoveIterator() {
- @Override
- public boolean hasNext() {
- return next != null;
- }
- @Override
- public StatementChange next() {
- StatementChange result = next;
- next = seek();
- return result;
- }
- };
- }
-
- };
-
- }
-
- @Override
- public Collection changedValues() {
-
- if (!resetRead())
- return Collections.emptyList();
-
- return new CollectionBase() {
-
- Resource seek() {
- try {
- while(true) {
- byte method = readByte();
- switch(method) {
- case TERM: {
- return null;
- }
- case CLAIM: {
- getResource(readInt());
- getResource(readInt());
- getResource(readInt());
- break;
- }
- case DENY: {
- getResource(readInt());
- getResource(readInt());
- getResource(readInt());
- break;
- }
- case VALUE:
- case INVALIDATE: {
- return getResource(readInt());
- }
- }
- }
- } catch(Exception e) {
- return null;
- }
- }
-
- private Resource next = seek();
-
- @Override
- public Iterator iterator() {
- return new NoRemoveIterator() {
- @Override
- public boolean hasNext() {
- return next != null;
- }
- @Override
- public Resource next() {
- Resource result = next;
- next = seek();
- return result;
- }
- };
- }
-
- };
-
- }
-
- @Override
- public Collection changedResources() {
-
- if (!resetRead())
- return Collections.emptyList();
-
- return new CollectionBase() {
-
- Resource seek() {
-
- try {
- while(true) {
- byte method = readByte();
- switch(method) {
- case TERM: {
- return null;
- }
- case CLAIM: {
- getResource(readInt());
- getResource(readInt());
- getResource(readInt());
- break;
- }
- case DENY: {
- getResource(readInt());
- getResource(readInt());
- getResource(readInt());
- break;
- }
- case VALUE: {
- getResource(readInt());
- break;
- }
- case INVALIDATE: {
- return getResource(readInt());
- }
- }
- }
- } catch(Exception e) {
- return null;
- }
-
- }
-
- private Resource next = seek();
-
- @Override
- public Iterator iterator() {
- return new NoRemoveIterator() {
- @Override
- public boolean hasNext() {
- return next != null;
- }
- @Override
- public Resource next() {
- Resource result = next;
- next = seek();
- return result;
- }
- };
- }
-
- };
-
- }
-
- @Override
- public boolean isEmpty() {
- if (notEmpty)
- return false;
- return writeBufferIndex == 0 && !overflow;
- }
-
- private boolean notEmpty = false;
- void setNotEmpty(boolean notEmpty) {
- this.notEmpty = notEmpty;
- }
-
- void claim(Resource s, Resource p, Resource o) {
- writeByte(CLAIM);
- writeInt(session.getId((ResourceImpl) s));
- writeInt(session.getId((ResourceImpl) p));
- writeInt(session.getId((ResourceImpl) o));
- if(DEBUG) System.err.println("claim1 $" + s.getResourceId() + " $" + p.getResourceId() + " $" + o.getResourceId());
- }
-
- private int sCache1 = -1;
- private int sCache2 = -1;
- private int sCache3 = -1;
-
- final void invalidate(int s) {
-
- if(s == sCache1) {
- return;
- } else if (s == sCache2) {
- sCache2 = sCache1;
- sCache1 = s;
- return;
- } else if (s == sCache3) {
- sCache3 = sCache2;
- sCache2 = sCache1;
- sCache1 = s;
- return;
- }
-
- sCache3 = sCache2;
- sCache2 = sCache1;
- sCache1 = s;
-
- writeByte(INVALIDATE);
- writeInt(s);
-
- if(DEBUG) System.err.println("invalidate2 " + s);
-
- }
-
- void claim(int s, int p, int o) {
- writeByte(CLAIM);
- writeInt(s);
- writeInt(p);
- writeInt(o);
- if(DEBUG) System.err.println("claim2 " + s + " " + p + " " + o);
- }
-
- void deny(Resource s, Resource p, Resource o) {
- writeByte(DENY);
- writeInt(session.getId((ResourceImpl)s));
- writeInt(session.getId((ResourceImpl)p));
- writeInt(session.getId((ResourceImpl)o));
- if(DEBUG) System.err.println("deny2 " + session.getId((ResourceImpl)s) + " " + session.getId((ResourceImpl)p) + " " + session.getId((ResourceImpl)o));
- }
-
- void deny(int s, int p, int o) {
- writeByte(DENY);
- writeInt(s);
- writeInt(p);
- writeInt(o);
- if(DEBUG) System.err.println("deny2 " + s + " " + p + " " + o);
- }
-
- void claimValue(Resource r) {
- writeByte(VALUE);
- writeInt(session.getId((ResourceImpl)r));
- if(DEBUG) System.err.println("claimOrDenyValue1 $" + r.getResourceId());
- }
-
- void claimValue(int r) {
- writeByte(VALUE);
- writeInt(r);
- if(DEBUG) System.err.println("deny2 " + r);
- }
-
- // Streaming
-
- private byte readByte() {
- byte result = readBytes[readBufferIndex++];
- if(readBufferIndex == BUFFER_SIZE) fillReadBuffer();
- return result;
- }
-
- private int readInt() {
- if(readBufferIndex < (BUFFER_SIZE-4)) {
- int result = (int)
- ((readBytes[readBufferIndex++] & 0xff) |
- ((readBytes[readBufferIndex++] & 0xff) << 8) |
- ((readBytes[readBufferIndex++] & 0xff) << 16) |
- ((readBytes[readBufferIndex++] & 0xff) << 24));
- return result;
- } else {
- int has = BUFFER_SIZE-readBufferIndex;
- int result = 0;
- if(has == 0) fillReadBuffer();
- result = (int)(readBytes[readBufferIndex++] & 0xff);
- if(has == 1) fillReadBuffer();
- result |= (int)((readBytes[readBufferIndex++] & 0xff) << 8);
- if(has == 2) fillReadBuffer();
- result |= (int)((readBytes[readBufferIndex++] & 0xff) << 16);
- if(has == 3) fillReadBuffer();
- result |= (int)((readBytes[readBufferIndex++] & 0xff) << 24);
- if(has == 4) fillReadBuffer();
- return result;
- }
- }
-
- private void writeByte(int b) {
- writeBytes[writeBufferIndex++] = (byte)b;
- if(writeBufferIndex == BUFFER_SIZE) flushWriteBuffer(BUFFER_SIZE);
- wroteSinceLastRead = true;
- }
-
- private void writeInt(int i) {
- if(writeBufferIndex < (BUFFER_SIZE-4)) {
- writeBytes[writeBufferIndex++] = (byte)(i&0xff);
- writeBytes[writeBufferIndex++] = (byte)((i>>>8)&0xff);
- writeBytes[writeBufferIndex++] = (byte)((i>>>16)&0xff);
- writeBytes[writeBufferIndex++] = (byte)((i>>>24)&0xff);
- } else {
- int has = BUFFER_SIZE-writeBufferIndex;
- if(has == 0) flushWriteBuffer(BUFFER_SIZE);
- writeBytes[writeBufferIndex++] = (byte)(i&0xff);
- if(has == 1) flushWriteBuffer(BUFFER_SIZE);
- writeBytes[writeBufferIndex++] = (byte)((i>>>8)&0xff);
- if(has == 2) flushWriteBuffer(BUFFER_SIZE);
- writeBytes[writeBufferIndex++] = (byte)((i>>>16)&0xff);
- if(has == 3) flushWriteBuffer(BUFFER_SIZE);
- writeBytes[writeBufferIndex++] = (byte)((i>>>24)&0xff);
- if(has == 4) flushWriteBuffer(BUFFER_SIZE);
- }
- wroteSinceLastRead = true;
- }
-
- private int tryFlushBuffer(int size) throws IOException {
- if (DEBUG)
- System.err.println("tryFlushBuffer(" + size + "): tempFile=" + tempFile);
-
- if (tempFile == null) {
- File base = InternalClientConfig.DB_CLIENT_TEMP_DIR;
- base.mkdirs();
- tempFile = File.createTempFile("cset-", ".cs", base);
- file = new RandomAccessFile(tempFile, "rwd");
- channel = file.getChannel();
- if (DEBUG)
- System.err.println("Allocated temporary file for changeset: " + tempFile);
- }
-
- int wrote = 0;
- writeBuffer.position(0);
- writeBuffer.limit(size);
- for (; wrote < size;) {
- wrote += channel.write(writeBuffer);
- }
- return wrote;
- }
-
- private void flushWriteBuffer(int size) {
- if (DEBUG)
- System.err.println("flushBuffer(" + size + ") overflow=" + overflow);
- if (size == 0)
- return;
-
- if (!overflow) {
- // Change set is about to become file-backed.
- // Initialize separate buffer for writing.
- writeBytes = Arrays.copyOf(readBytes, readBytes.length);
- writeBuffer = ByteBuffer.wrap(writeBytes);
- overflow = true;
- }
-
- writeBufferIndex = 0;
- int wrote = 0;
- try {
- setFilePosition(writePosition);
- wrote = tryFlushBuffer(size);
- } catch (IOException e) {
- e.printStackTrace();
- throw new RuntimeDatabaseException(e);
- } finally {
- try {
- currentPosition = channel.position();
- writePosition = currentPosition;
- } catch (IOException e) {
- e.printStackTrace();
- }
- if (DEBUG)
- System.err.println("flushWriteBuffer wrote " + wrote + " bytes");
- }
- }
-
- private boolean resetRead() {
- if (DEBUG)
- System.err.println("resetRead: overflow=" + overflow + ", file=" + tempFile);
- try {
- // Flush temporary file first if necessary.
- if (wroteSinceLastRead) {
- if (DEBUG)
- System.err.println("\tTerminating write of changeset " + tempFile);
- writeByte(TERM);
- wroteSinceLastRead = false;
- }
- if (overflow) {
- if (DEBUG)
- System.err.println("\tReading changeset from " + tempFile.getAbsolutePath());
- flushWriteBuffer(writeBufferIndex);
- readPosition = 0L;
- setFilePosition(readPosition);
- fillReadBuffer();
- } else {
- readBufferIndex = 0;
- }
- return true;
- } catch (IOException e) {
- e.printStackTrace();
- return false;
- }
- }
-
- private void fillReadBuffer() {
- readBufferIndex = 0;
- int read = 0;
- try {
- setFilePosition(readPosition);
- readBuffer.position(0);
- readBuffer.limit(BUFFER_SIZE);
- for (;read < BUFFER_SIZE;) {
- int got = channel.read(readBuffer);
- if (got == -1)
- return;
- read += got;
- }
- } catch (IOException e) {
- e.printStackTrace();
- } finally {
- try {
- currentPosition = channel.position();
- readPosition = currentPosition;
- } catch (IOException e) {
- e.printStackTrace();
- }
- if (DEBUG)
- System.err.println("fillReadBuffer read " + read + " bytes");
- }
- }
-
- /**
- * @param newPosition
- * @return true
if file position was modified
- * @throws IOException
- */
- private boolean setFilePosition(long newPosition) throws IOException {
- if (channel == null) {
- return false;
- }
- if (newPosition != currentPosition) {
- if (DEBUG)
- System.err.println("setting new file position(" + newPosition + "), old position=" + currentPosition);
- channel.position(newPosition);
- currentPosition = newPosition;
- return true;
- }
- return false;
- }
-
- @Override
- protected void finalize() throws Throwable {
- close();
- super.finalize();
- }
-
- public void close() {
- session = null;
- channel = null;
- if (file != null) {
- if (DEBUG)
- System.err.println("Closing temporary changeset input stream " + file);
- try {
- file.close();
- } catch (IOException e) {
- } finally {
- file = null;
- }
- }
- if (tempFile != null) {
- if (DEBUG)
- System.err.println("Deleting temporary changeset file " + tempFile);
- tempFile.delete();
- tempFile = null;
- }
- }
-
- public static void main(String[] args) {
- ClientChangesImpl ccs = new ClientChangesImpl(null);
- System.out.println("isEmpty=" + ccs.isEmpty());
- System.out.println("clientChanges=" + ccs.toString());
- System.out.println("isEmpty=" + ccs.isEmpty());
- }
-
-}
+/*******************************************************************************
+ * Copyright (c) 2007, 2010 Association for Decentralized Information Management
+ * in Industry THTH ry.
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * which accompanies this distribution, and is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Contributors:
+ * VTT Technical Research Centre of Finland - initial API and implementation
+ *******************************************************************************/
+package fi.vtt.simantics.procore.internal;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+
+import org.simantics.db.ChangeSet;
+import org.simantics.db.Resource;
+import org.simantics.db.common.internal.config.InternalClientConfig;
+import org.simantics.db.exception.RuntimeDatabaseException;
+import org.simantics.db.impl.ResourceImpl;
+
+/**
+ * @author Antti Villberg
+ * @author Tuukka Lehtonen
+ */
+public final class ClientChangesImpl implements ChangeSet {
+
+ private static final boolean DEBUG = false;
+
+ private static final int BUFFER_SIZE = 1 << 17;
+
+ private static final byte CLAIM = 1;
+ private static final byte DENY = 2;
+ private static final byte VALUE = 3;
+ private static final byte TERM = 4;
+ private static final byte INVALIDATE = 5;
+
+ private SessionImplSocket session;
+
+ /**
+ * True if enough data has been written to overflow the normal buffer which
+ * will result in data being written to disk.
+ */
+ private boolean overflow = false;
+
+ private final byte[] readBytes = new byte[BUFFER_SIZE];
+ // By default the write buffer will be the exact same as the read buffer
+ // since initially were not forced to write changes to the disk.
+ private byte[] writeBytes = readBytes;
+
+ private int readBufferIndex = 0;
+ private int writeBufferIndex = 0;
+
+ /**
+ * true
if something has been written since last call to
+ * {@link #resetRead()}. By default it is true
since
+ * {@link #resetRead()} must on its first invocation write the {@link #TERM}
+ * constant into the stream.
+ */
+ private boolean wroteSinceLastRead = true;
+
+ private final ByteBuffer readBuffer = ByteBuffer.wrap(readBytes);
+ private ByteBuffer writeBuffer = ByteBuffer.wrap(writeBytes);
+
+ private File tempFile;
+ private RandomAccessFile file;
+ private FileChannel channel;
+ private long currentPosition = 0L;
+ private long readPosition = 0L;
+ private long writePosition = 0L;
+
+ static abstract class NoRemoveIterator implements Iterator {
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+ }
+
+ /**
+ * Base class for all immutable collections returned by
+ * {@link ClientChangesImpl}.
+ *
+ * @param
+ */
+ static class CollectionBase implements Collection {
+
+ @Override
+ public int size() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean isEmpty() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean contains(Object o) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Iterator iterator() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Object[] toArray() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public TT[] toArray(TT[] a) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean add(T e) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean remove(Object o) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean containsAll(Collection> c) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean addAll(Collection extends T> c) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean removeAll(Collection> c) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean retainAll(Collection> c) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void clear() {
+ throw new UnsupportedOperationException();
+ }
+
+ };
+
+ ClientChangesImpl(SessionImplSocket session) {
+ this.session = session;
+ }
+
+ @Override
+ public String toString() {
+ final int LIMIT = 10000;
+ String to = "";
+ for (StatementChange s : changedStatements()) {
+ String op = s.isClaim() ? "add" : "rem";
+ to += "\n" + op + " " + s.getSubject() + " " + s.getPredicate() + " " + s.getObject();
+ if (to.length() > LIMIT) {
+ to += "... more.\n";
+ return to;
+ }
+ }
+ for (Resource r : changedValues()) {
+ to += "\nmod " + r;
+ if (to.length() > LIMIT) {
+ to += "...etc.\n";
+ return to;
+ }
+ }
+ return to;
+ }
+
+ private Resource getResource(int id) {
+ return session.getResource(id);
+ }
+
+ @Override
+ public synchronized Collection changedStatements() {
+
+ if (!resetRead())
+ return Collections.emptyList();
+
+ return new CollectionBase() {
+
+ StatementChange seek() {
+ try {
+ while(true) {
+ byte method = readByte();
+ switch(method) {
+ case TERM: {
+ return null;
+ }
+ case CLAIM: {
+ Resource s = getResource(readInt());
+ Resource p = getResource(readInt());
+ Resource o = getResource(readInt());
+ return new StatementChangeImpl(s, p, o, true);
+ }
+ case DENY: {
+ Resource s = getResource(readInt());
+ Resource p = getResource(readInt());
+ Resource o = getResource(readInt());
+ return new StatementChangeImpl(s, p, o, false);
+ }
+ case VALUE:
+ case INVALIDATE: {
+ // Skipping value opererations.
+ getResource(readInt());
+ break;
+ }
+ }
+ }
+ } catch(Exception e) {
+ throw new RuntimeDatabaseException(e);
+ }
+ }
+
+ private StatementChange next = seek();
+
+ @Override
+ public Iterator iterator() {
+ return new NoRemoveIterator() {
+ @Override
+ public boolean hasNext() {
+ return next != null;
+ }
+ @Override
+ public StatementChange next() {
+ StatementChange result = next;
+ next = seek();
+ return result;
+ }
+ };
+ }
+
+ };
+
+ }
+
+ @Override
+ public Collection changedValues() {
+
+ if (!resetRead())
+ return Collections.emptyList();
+
+ return new CollectionBase() {
+
+ Resource seek() {
+ try {
+ while(true) {
+ byte method = readByte();
+ switch(method) {
+ case TERM: {
+ return null;
+ }
+ case CLAIM: {
+ getResource(readInt());
+ getResource(readInt());
+ getResource(readInt());
+ break;
+ }
+ case DENY: {
+ getResource(readInt());
+ getResource(readInt());
+ getResource(readInt());
+ break;
+ }
+ case VALUE:
+ case INVALIDATE: {
+ return getResource(readInt());
+ }
+ }
+ }
+ } catch(Exception e) {
+ return null;
+ }
+ }
+
+ private Resource next = seek();
+
+ @Override
+ public Iterator iterator() {
+ return new NoRemoveIterator() {
+ @Override
+ public boolean hasNext() {
+ return next != null;
+ }
+ @Override
+ public Resource next() {
+ Resource result = next;
+ next = seek();
+ return result;
+ }
+ };
+ }
+
+ };
+
+ }
+
+ @Override
+ public Collection changedResources() {
+
+ if (!resetRead())
+ return Collections.emptyList();
+
+ return new CollectionBase() {
+
+ Resource seek() {
+
+ try {
+ while(true) {
+ byte method = readByte();
+ switch(method) {
+ case TERM: {
+ return null;
+ }
+ case CLAIM: {
+ getResource(readInt());
+ getResource(readInt());
+ getResource(readInt());
+ break;
+ }
+ case DENY: {
+ getResource(readInt());
+ getResource(readInt());
+ getResource(readInt());
+ break;
+ }
+ case VALUE: {
+ getResource(readInt());
+ break;
+ }
+ case INVALIDATE: {
+ return getResource(readInt());
+ }
+ }
+ }
+ } catch(Exception e) {
+ return null;
+ }
+
+ }
+
+ private Resource next = seek();
+
+ @Override
+ public Iterator iterator() {
+ return new NoRemoveIterator() {
+ @Override
+ public boolean hasNext() {
+ return next != null;
+ }
+ @Override
+ public Resource next() {
+ Resource result = next;
+ next = seek();
+ return result;
+ }
+ };
+ }
+
+ };
+
+ }
+
+ @Override
+ public boolean isEmpty() {
+ if (notEmpty)
+ return false;
+ return writeBufferIndex == 0 && !overflow;
+ }
+
+ private boolean notEmpty = false;
+ void setNotEmpty(boolean notEmpty) {
+ this.notEmpty = notEmpty;
+ }
+
+ void claim(Resource s, Resource p, Resource o) {
+ writeByte(CLAIM);
+ writeInt(session.getId((ResourceImpl) s));
+ writeInt(session.getId((ResourceImpl) p));
+ writeInt(session.getId((ResourceImpl) o));
+ if(DEBUG) System.err.println("claim1 $" + s.getResourceId() + " $" + p.getResourceId() + " $" + o.getResourceId());
+ }
+
+ private int sCache1 = -1;
+ private int sCache2 = -1;
+ private int sCache3 = -1;
+
+ final void invalidate(int s) {
+
+ if(s == sCache1) {
+ return;
+ } else if (s == sCache2) {
+ sCache2 = sCache1;
+ sCache1 = s;
+ return;
+ } else if (s == sCache3) {
+ sCache3 = sCache2;
+ sCache2 = sCache1;
+ sCache1 = s;
+ return;
+ }
+
+ sCache3 = sCache2;
+ sCache2 = sCache1;
+ sCache1 = s;
+
+ writeByte(INVALIDATE);
+ writeInt(s);
+
+ if(DEBUG) System.err.println("invalidate2 " + s);
+
+ }
+
+ void claim(int s, int p, int o) {
+ writeByte(CLAIM);
+ writeInt(s);
+ writeInt(p);
+ writeInt(o);
+ if(DEBUG) System.err.println("claim2 " + s + " " + p + " " + o);
+ }
+
+ void deny(Resource s, Resource p, Resource o) {
+ writeByte(DENY);
+ writeInt(session.getId((ResourceImpl)s));
+ writeInt(session.getId((ResourceImpl)p));
+ writeInt(session.getId((ResourceImpl)o));
+ if(DEBUG) System.err.println("deny2 " + session.getId((ResourceImpl)s) + " " + session.getId((ResourceImpl)p) + " " + session.getId((ResourceImpl)o));
+ }
+
+ void deny(int s, int p, int o) {
+ writeByte(DENY);
+ writeInt(s);
+ writeInt(p);
+ writeInt(o);
+ if(DEBUG) System.err.println("deny2 " + s + " " + p + " " + o);
+ }
+
+ void claimValue(Resource r) {
+ writeByte(VALUE);
+ writeInt(session.getId((ResourceImpl)r));
+ if(DEBUG) System.err.println("claimOrDenyValue1 $" + r.getResourceId());
+ }
+
+ void claimValue(int r) {
+ writeByte(VALUE);
+ writeInt(r);
+ if(DEBUG) System.err.println("deny2 " + r);
+ }
+
+ // Streaming
+
+ private byte readByte() {
+ byte result = readBytes[readBufferIndex++];
+ if(readBufferIndex == BUFFER_SIZE) fillReadBuffer();
+ return result;
+ }
+
+ private int readInt() {
+ if(readBufferIndex < (BUFFER_SIZE-4)) {
+ int result = (int)
+ ((readBytes[readBufferIndex++] & 0xff) |
+ ((readBytes[readBufferIndex++] & 0xff) << 8) |
+ ((readBytes[readBufferIndex++] & 0xff) << 16) |
+ ((readBytes[readBufferIndex++] & 0xff) << 24));
+ return result;
+ } else {
+ int has = BUFFER_SIZE-readBufferIndex;
+ int result = 0;
+ if(has == 0) fillReadBuffer();
+ result = (int)(readBytes[readBufferIndex++] & 0xff);
+ if(has == 1) fillReadBuffer();
+ result |= (int)((readBytes[readBufferIndex++] & 0xff) << 8);
+ if(has == 2) fillReadBuffer();
+ result |= (int)((readBytes[readBufferIndex++] & 0xff) << 16);
+ if(has == 3) fillReadBuffer();
+ result |= (int)((readBytes[readBufferIndex++] & 0xff) << 24);
+ if(has == 4) fillReadBuffer();
+ return result;
+ }
+ }
+
+ private void writeByte(int b) {
+ writeBytes[writeBufferIndex++] = (byte)b;
+ if(writeBufferIndex == BUFFER_SIZE) flushWriteBuffer(BUFFER_SIZE);
+ wroteSinceLastRead = true;
+ }
+
+ private void writeInt(int i) {
+ if(writeBufferIndex < (BUFFER_SIZE-4)) {
+ writeBytes[writeBufferIndex++] = (byte)(i&0xff);
+ writeBytes[writeBufferIndex++] = (byte)((i>>>8)&0xff);
+ writeBytes[writeBufferIndex++] = (byte)((i>>>16)&0xff);
+ writeBytes[writeBufferIndex++] = (byte)((i>>>24)&0xff);
+ } else {
+ int has = BUFFER_SIZE-writeBufferIndex;
+ if(has == 0) flushWriteBuffer(BUFFER_SIZE);
+ writeBytes[writeBufferIndex++] = (byte)(i&0xff);
+ if(has == 1) flushWriteBuffer(BUFFER_SIZE);
+ writeBytes[writeBufferIndex++] = (byte)((i>>>8)&0xff);
+ if(has == 2) flushWriteBuffer(BUFFER_SIZE);
+ writeBytes[writeBufferIndex++] = (byte)((i>>>16)&0xff);
+ if(has == 3) flushWriteBuffer(BUFFER_SIZE);
+ writeBytes[writeBufferIndex++] = (byte)((i>>>24)&0xff);
+ if(has == 4) flushWriteBuffer(BUFFER_SIZE);
+ }
+ wroteSinceLastRead = true;
+ }
+
+ private int tryFlushBuffer(int size) throws IOException {
+ if (DEBUG)
+ System.err.println("tryFlushBuffer(" + size + "): tempFile=" + tempFile);
+
+ if (tempFile == null) {
+ File base = InternalClientConfig.DB_CLIENT_TEMP_DIR;
+ base.mkdirs();
+ tempFile = File.createTempFile("cset-", ".cs", base);
+ file = new RandomAccessFile(tempFile, "rwd");
+ channel = file.getChannel();
+ if (DEBUG)
+ System.err.println("Allocated temporary file for changeset: " + tempFile);
+ }
+
+ int wrote = 0;
+ writeBuffer.position(0);
+ writeBuffer.limit(size);
+ for (; wrote < size;) {
+ wrote += channel.write(writeBuffer);
+ }
+ return wrote;
+ }
+
+ private void flushWriteBuffer(int size) {
+ if (DEBUG)
+ System.err.println("flushBuffer(" + size + ") overflow=" + overflow);
+ if (size == 0)
+ return;
+
+ if (!overflow) {
+ // Change set is about to become file-backed.
+ // Initialize separate buffer for writing.
+ writeBytes = Arrays.copyOf(readBytes, readBytes.length);
+ writeBuffer = ByteBuffer.wrap(writeBytes);
+ overflow = true;
+ }
+
+ writeBufferIndex = 0;
+ int wrote = 0;
+ try {
+ setFilePosition(writePosition);
+ wrote = tryFlushBuffer(size);
+ } catch (IOException e) {
+ e.printStackTrace();
+ throw new RuntimeDatabaseException(e);
+ } finally {
+ try {
+ currentPosition = channel.position();
+ writePosition = currentPosition;
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ if (DEBUG)
+ System.err.println("flushWriteBuffer wrote " + wrote + " bytes");
+ }
+ }
+
+ private boolean resetRead() {
+ if (DEBUG)
+ System.err.println("resetRead: overflow=" + overflow + ", file=" + tempFile);
+ try {
+ // Flush temporary file first if necessary.
+ if (wroteSinceLastRead) {
+ if (DEBUG)
+ System.err.println("\tTerminating write of changeset " + tempFile);
+ writeByte(TERM);
+ wroteSinceLastRead = false;
+ }
+ if (overflow) {
+ if (DEBUG)
+ System.err.println("\tReading changeset from " + tempFile.getAbsolutePath());
+ flushWriteBuffer(writeBufferIndex);
+ readPosition = 0L;
+ setFilePosition(readPosition);
+ fillReadBuffer();
+ } else {
+ readBufferIndex = 0;
+ }
+ return true;
+ } catch (IOException e) {
+ e.printStackTrace();
+ return false;
+ }
+ }
+
+ private void fillReadBuffer() {
+ readBufferIndex = 0;
+ int read = 0;
+ try {
+ setFilePosition(readPosition);
+ readBuffer.position(0);
+ readBuffer.limit(BUFFER_SIZE);
+ for (;read < BUFFER_SIZE;) {
+ int got = channel.read(readBuffer);
+ if (got == -1)
+ return;
+ read += got;
+ }
+ } catch (IOException e) {
+ e.printStackTrace();
+ } finally {
+ try {
+ currentPosition = channel.position();
+ readPosition = currentPosition;
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ if (DEBUG)
+ System.err.println("fillReadBuffer read " + read + " bytes");
+ }
+ }
+
+ /**
+ * @param newPosition
+ * @return true
if file position was modified
+ * @throws IOException
+ */
+ private boolean setFilePosition(long newPosition) throws IOException {
+ if (channel == null) {
+ return false;
+ }
+ if (newPosition != currentPosition) {
+ if (DEBUG)
+ System.err.println("setting new file position(" + newPosition + "), old position=" + currentPosition);
+ channel.position(newPosition);
+ currentPosition = newPosition;
+ return true;
+ }
+ return false;
+ }
+
+ @Override
+ protected void finalize() throws Throwable {
+ close();
+ super.finalize();
+ }
+
+ public void close() {
+ session = null;
+ channel = null;
+ if (file != null) {
+ if (DEBUG)
+ System.err.println("Closing temporary changeset input stream " + file);
+ try {
+ file.close();
+ } catch (IOException e) {
+ } finally {
+ file = null;
+ }
+ }
+ if (tempFile != null) {
+ if (DEBUG)
+ System.err.println("Deleting temporary changeset file " + tempFile);
+ tempFile.delete();
+ tempFile = null;
+ }
+ }
+
+ public static void main(String[] args) {
+ ClientChangesImpl ccs = new ClientChangesImpl(null);
+ System.out.println("isEmpty=" + ccs.isEmpty());
+ System.out.println("clientChanges=" + ccs.toString());
+ System.out.println("isEmpty=" + ccs.isEmpty());
+ }
+
+}