-/*******************************************************************************\r
- * Copyright (c) 2007, 2010 Association for Decentralized Information Management\r
- * in Industry THTH ry.\r
- * All rights reserved. This program and the accompanying materials\r
- * are made available under the terms of the Eclipse Public License v1.0\r
- * which accompanies this distribution, and is available at\r
- * http://www.eclipse.org/legal/epl-v10.html\r
- *\r
- * Contributors:\r
- * VTT Technical Research Centre of Finland - initial API and implementation\r
- *******************************************************************************/\r
-package fi.vtt.simantics.procore.internal;\r
-\r
-import java.io.File;\r
-import java.io.IOException;\r
-import java.io.RandomAccessFile;\r
-import java.nio.ByteBuffer;\r
-import java.nio.channels.FileChannel;\r
-import java.util.Arrays;\r
-import java.util.Collection;\r
-import java.util.Collections;\r
-import java.util.Iterator;\r
-\r
-import org.simantics.db.ChangeSet;\r
-import org.simantics.db.Resource;\r
-import org.simantics.db.common.internal.config.InternalClientConfig;\r
-import org.simantics.db.exception.RuntimeDatabaseException;\r
-import org.simantics.db.impl.ResourceImpl;\r
-\r
-/**\r
- * @author Antti Villberg\r
- * @author Tuukka Lehtonen\r
- */\r
-public final class ClientChangesImpl implements ChangeSet {\r
-\r
- private static final boolean DEBUG = false;\r
-\r
- private static final int BUFFER_SIZE = 1 << 17;\r
-\r
- private static final byte CLAIM = 1;\r
- private static final byte DENY = 2;\r
- private static final byte VALUE = 3;\r
- private static final byte TERM = 4;\r
- private static final byte INVALIDATE = 5;\r
-\r
- private SessionImplSocket session;\r
-\r
- /**\r
- * True if enough data has been written to overflow the normal buffer which\r
- * will result in data being written to disk.\r
- */\r
- private boolean overflow = false;\r
-\r
- private final byte[] readBytes = new byte[BUFFER_SIZE];\r
- // By default the write buffer will be the exact same as the read buffer\r
- // since initially were not forced to write changes to the disk.\r
- private byte[] writeBytes = readBytes;\r
-\r
- private int readBufferIndex = 0;\r
- private int writeBufferIndex = 0;\r
-\r
- /**\r
- * <code>true</code> if something has been written since last call to\r
- * {@link #resetRead()}. By default it is <code>true</code> since\r
- * {@link #resetRead()} must on its first invocation write the {@link #TERM}\r
- * constant into the stream.\r
- */\r
- private boolean wroteSinceLastRead = true;\r
-\r
- private final ByteBuffer readBuffer = ByteBuffer.wrap(readBytes);\r
- private ByteBuffer writeBuffer = ByteBuffer.wrap(writeBytes);\r
-\r
- private File tempFile;\r
- private RandomAccessFile file;\r
- private FileChannel channel;\r
- private long currentPosition = 0L;\r
- private long readPosition = 0L;\r
- private long writePosition = 0L;\r
-\r
- static abstract class NoRemoveIterator<T> implements Iterator<T> {\r
- @Override\r
- public void remove() {\r
- throw new UnsupportedOperationException();\r
- }\r
- }\r
-\r
- /**\r
- * Base class for all immutable collections returned by\r
- * {@link ClientChangesImpl}.\r
- * \r
- * @param <T>\r
- */\r
- static class CollectionBase<T> implements Collection<T> {\r
-\r
- @Override\r
- public int size() {\r
- throw new UnsupportedOperationException();\r
- }\r
-\r
- @Override\r
- public boolean isEmpty() {\r
- throw new UnsupportedOperationException();\r
- }\r
-\r
- @Override\r
- public boolean contains(Object o) {\r
- throw new UnsupportedOperationException();\r
- }\r
-\r
- @Override\r
- public Iterator<T> iterator() {\r
- throw new UnsupportedOperationException();\r
- }\r
-\r
- @Override\r
- public Object[] toArray() {\r
- throw new UnsupportedOperationException();\r
- }\r
-\r
- @Override\r
- public <TT> TT[] toArray(TT[] a) {\r
- throw new UnsupportedOperationException();\r
- }\r
-\r
- @Override\r
- public boolean add(T e) {\r
- throw new UnsupportedOperationException();\r
- }\r
-\r
- @Override\r
- public boolean remove(Object o) {\r
- throw new UnsupportedOperationException();\r
- }\r
-\r
- @Override\r
- public boolean containsAll(Collection<?> c) {\r
- throw new UnsupportedOperationException();\r
- }\r
-\r
- @Override\r
- public boolean addAll(Collection<? extends T> c) {\r
- throw new UnsupportedOperationException();\r
- }\r
-\r
- @Override\r
- public boolean removeAll(Collection<?> c) {\r
- throw new UnsupportedOperationException();\r
- }\r
-\r
- @Override\r
- public boolean retainAll(Collection<?> c) {\r
- throw new UnsupportedOperationException();\r
- }\r
-\r
- @Override\r
- public void clear() {\r
- throw new UnsupportedOperationException();\r
- }\r
-\r
- };\r
-\r
- ClientChangesImpl(SessionImplSocket session) {\r
- this.session = session;\r
- }\r
-\r
- @Override\r
- public String toString() {\r
- final int LIMIT = 10000;\r
- String to = "";\r
- for (StatementChange s : changedStatements()) {\r
- String op = s.isClaim() ? "add" : "rem";\r
- to += "\n" + op + " " + s.getSubject() + " " + s.getPredicate() + " " + s.getObject();\r
- if (to.length() > LIMIT) {\r
- to += "... more.\n";\r
- return to;\r
- }\r
- }\r
- for (Resource r : changedValues()) {\r
- to += "\nmod " + r; \r
- if (to.length() > LIMIT) {\r
- to += "...etc.\n";\r
- return to;\r
- }\r
- }\r
- return to;\r
- }\r
-\r
- private Resource getResource(int id) {\r
- return session.getResource(id);\r
- }\r
-\r
- @Override\r
- public synchronized Collection<StatementChange> changedStatements() {\r
-\r
- if (!resetRead())\r
- return Collections.emptyList();\r
-\r
- return new CollectionBase<StatementChange>() {\r
-\r
- StatementChange seek() {\r
- try {\r
- while(true) {\r
- byte method = readByte();\r
- switch(method) {\r
- case TERM: {\r
- return null;\r
- }\r
- case CLAIM: {\r
- Resource s = getResource(readInt());\r
- Resource p = getResource(readInt());\r
- Resource o = getResource(readInt());\r
- return new StatementChangeImpl(s, p, o, true);\r
- }\r
- case DENY: {\r
- Resource s = getResource(readInt());\r
- Resource p = getResource(readInt());\r
- Resource o = getResource(readInt());\r
- return new StatementChangeImpl(s, p, o, false);\r
- }\r
- case VALUE:\r
- case INVALIDATE: {\r
- // Skipping value opererations.\r
- getResource(readInt());\r
- break;\r
- }\r
- }\r
- }\r
- } catch(Exception e) {\r
- throw new RuntimeDatabaseException(e);\r
- }\r
- }\r
-\r
- private StatementChange next = seek();\r
-\r
- @Override\r
- public Iterator<StatementChange> iterator() {\r
- return new NoRemoveIterator<StatementChange>() {\r
- @Override\r
- public boolean hasNext() {\r
- return next != null;\r
- }\r
- @Override\r
- public StatementChange next() {\r
- StatementChange result = next;\r
- next = seek();\r
- return result;\r
- }\r
- };\r
- }\r
-\r
- };\r
-\r
- }\r
-\r
- @Override\r
- public Collection<Resource> changedValues() {\r
-\r
- if (!resetRead())\r
- return Collections.emptyList();\r
-\r
- return new CollectionBase<Resource>() {\r
-\r
- Resource seek() {\r
- try {\r
- while(true) {\r
- byte method = readByte();\r
- switch(method) {\r
- case TERM: {\r
- return null;\r
- }\r
- case CLAIM: {\r
- getResource(readInt());\r
- getResource(readInt());\r
- getResource(readInt());\r
- break;\r
- }\r
- case DENY: {\r
- getResource(readInt());\r
- getResource(readInt());\r
- getResource(readInt());\r
- break;\r
- }\r
- case VALUE:\r
- case INVALIDATE: {\r
- return getResource(readInt());\r
- }\r
- }\r
- }\r
- } catch(Exception e) {\r
- return null;\r
- }\r
- }\r
-\r
- private Resource next = seek();\r
-\r
- @Override\r
- public Iterator<Resource> iterator() {\r
- return new NoRemoveIterator<Resource>() {\r
- @Override\r
- public boolean hasNext() {\r
- return next != null;\r
- }\r
- @Override\r
- public Resource next() {\r
- Resource result = next;\r
- next = seek();\r
- return result;\r
- }\r
- };\r
- }\r
-\r
- };\r
-\r
- }\r
-\r
- @Override\r
- public Collection<Resource> changedResources() {\r
-\r
- if (!resetRead())\r
- return Collections.emptyList();\r
-\r
- return new CollectionBase<Resource>() {\r
-\r
- Resource seek() {\r
-\r
- try {\r
- while(true) {\r
- byte method = readByte();\r
- switch(method) {\r
- case TERM: {\r
- return null;\r
- }\r
- case CLAIM: {\r
- getResource(readInt());\r
- getResource(readInt());\r
- getResource(readInt());\r
- break;\r
- }\r
- case DENY: {\r
- getResource(readInt());\r
- getResource(readInt());\r
- getResource(readInt());\r
- break;\r
- }\r
- case VALUE: {\r
- getResource(readInt());\r
- break;\r
- }\r
- case INVALIDATE: {\r
- return getResource(readInt());\r
- }\r
- }\r
- }\r
- } catch(Exception e) {\r
- return null;\r
- }\r
-\r
- }\r
-\r
- private Resource next = seek();\r
-\r
- @Override\r
- public Iterator<Resource> iterator() {\r
- return new NoRemoveIterator<Resource>() {\r
- @Override\r
- public boolean hasNext() {\r
- return next != null;\r
- }\r
- @Override\r
- public Resource next() {\r
- Resource result = next;\r
- next = seek();\r
- return result;\r
- }\r
- };\r
- }\r
-\r
- };\r
-\r
- }\r
-\r
- @Override\r
- public boolean isEmpty() {\r
- if (notEmpty)\r
- return false;\r
- return writeBufferIndex == 0 && !overflow;\r
- }\r
-\r
- private boolean notEmpty = false;\r
- void setNotEmpty(boolean notEmpty) {\r
- this.notEmpty = notEmpty;\r
- }\r
-\r
- void claim(Resource s, Resource p, Resource o) {\r
- writeByte(CLAIM);\r
- writeInt(session.getId((ResourceImpl) s));\r
- writeInt(session.getId((ResourceImpl) p));\r
- writeInt(session.getId((ResourceImpl) o));\r
- if(DEBUG) System.err.println("claim1 $" + s.getResourceId() + " $" + p.getResourceId() + " $" + o.getResourceId());\r
- }\r
-\r
- private int sCache1 = -1;\r
- private int sCache2 = -1;\r
- private int sCache3 = -1;\r
-\r
- final void invalidate(int s) {\r
- \r
- if(s == sCache1) {\r
- return;\r
- } else if (s == sCache2) {\r
- sCache2 = sCache1;\r
- sCache1 = s;\r
- return;\r
- } else if (s == sCache3) {\r
- sCache3 = sCache2;\r
- sCache2 = sCache1;\r
- sCache1 = s;\r
- return;\r
- }\r
-\r
- sCache3 = sCache2;\r
- sCache2 = sCache1;\r
- sCache1 = s;\r
- \r
- writeByte(INVALIDATE);\r
- writeInt(s);\r
-\r
- if(DEBUG) System.err.println("invalidate2 " + s);\r
-\r
- }\r
-\r
- void claim(int s, int p, int o) {\r
- writeByte(CLAIM);\r
- writeInt(s);\r
- writeInt(p);\r
- writeInt(o);\r
- if(DEBUG) System.err.println("claim2 " + s + " " + p + " " + o);\r
- }\r
-\r
- void deny(Resource s, Resource p, Resource o) {\r
- writeByte(DENY);\r
- writeInt(session.getId((ResourceImpl)s));\r
- writeInt(session.getId((ResourceImpl)p));\r
- writeInt(session.getId((ResourceImpl)o));\r
- if(DEBUG) System.err.println("deny2 " + session.getId((ResourceImpl)s) + " " + session.getId((ResourceImpl)p) + " " + session.getId((ResourceImpl)o));\r
- }\r
-\r
- void deny(int s, int p, int o) {\r
- writeByte(DENY);\r
- writeInt(s);\r
- writeInt(p);\r
- writeInt(o);\r
- if(DEBUG) System.err.println("deny2 " + s + " " + p + " " + o);\r
- }\r
-\r
- void claimValue(Resource r) {\r
- writeByte(VALUE);\r
- writeInt(session.getId((ResourceImpl)r));\r
- if(DEBUG) System.err.println("claimOrDenyValue1 $" + r.getResourceId());\r
- }\r
-\r
- void claimValue(int r) {\r
- writeByte(VALUE);\r
- writeInt(r);\r
- if(DEBUG) System.err.println("deny2 " + r);\r
- }\r
-\r
- // Streaming\r
-\r
- private byte readByte() {\r
- byte result = readBytes[readBufferIndex++]; \r
- if(readBufferIndex == BUFFER_SIZE) fillReadBuffer();\r
- return result;\r
- }\r
-\r
- private int readInt() {\r
- if(readBufferIndex < (BUFFER_SIZE-4)) {\r
- int result = (int) \r
- ((readBytes[readBufferIndex++] & 0xff) | \r
- ((readBytes[readBufferIndex++] & 0xff) << 8) | \r
- ((readBytes[readBufferIndex++] & 0xff) << 16) | \r
- ((readBytes[readBufferIndex++] & 0xff) << 24)); \r
- return result;\r
- } else {\r
- int has = BUFFER_SIZE-readBufferIndex;\r
- int result = 0;\r
- if(has == 0) fillReadBuffer();\r
- result = (int)(readBytes[readBufferIndex++] & 0xff);\r
- if(has == 1) fillReadBuffer();\r
- result |= (int)((readBytes[readBufferIndex++] & 0xff) << 8);\r
- if(has == 2) fillReadBuffer();\r
- result |= (int)((readBytes[readBufferIndex++] & 0xff) << 16);\r
- if(has == 3) fillReadBuffer();\r
- result |= (int)((readBytes[readBufferIndex++] & 0xff) << 24);\r
- if(has == 4) fillReadBuffer();\r
- return result;\r
- }\r
- }\r
-\r
- private void writeByte(int b) {\r
- writeBytes[writeBufferIndex++] = (byte)b;\r
- if(writeBufferIndex == BUFFER_SIZE) flushWriteBuffer(BUFFER_SIZE);\r
- wroteSinceLastRead = true;\r
- }\r
-\r
- private void writeInt(int i) {\r
- if(writeBufferIndex < (BUFFER_SIZE-4)) {\r
- writeBytes[writeBufferIndex++] = (byte)(i&0xff);\r
- writeBytes[writeBufferIndex++] = (byte)((i>>>8)&0xff);\r
- writeBytes[writeBufferIndex++] = (byte)((i>>>16)&0xff);\r
- writeBytes[writeBufferIndex++] = (byte)((i>>>24)&0xff);\r
- } else {\r
- int has = BUFFER_SIZE-writeBufferIndex;\r
- if(has == 0) flushWriteBuffer(BUFFER_SIZE);\r
- writeBytes[writeBufferIndex++] = (byte)(i&0xff);\r
- if(has == 1) flushWriteBuffer(BUFFER_SIZE);\r
- writeBytes[writeBufferIndex++] = (byte)((i>>>8)&0xff);\r
- if(has == 2) flushWriteBuffer(BUFFER_SIZE);\r
- writeBytes[writeBufferIndex++] = (byte)((i>>>16)&0xff);\r
- if(has == 3) flushWriteBuffer(BUFFER_SIZE);\r
- writeBytes[writeBufferIndex++] = (byte)((i>>>24)&0xff);\r
- if(has == 4) flushWriteBuffer(BUFFER_SIZE);\r
- }\r
- wroteSinceLastRead = true;\r
- }\r
-\r
- private int tryFlushBuffer(int size) throws IOException {\r
- if (DEBUG)\r
- System.err.println("tryFlushBuffer(" + size + "): tempFile=" + tempFile);\r
-\r
- if (tempFile == null) {\r
- File base = InternalClientConfig.DB_CLIENT_TEMP_DIR;\r
- base.mkdirs();\r
- tempFile = File.createTempFile("cset-", ".cs", base);\r
- file = new RandomAccessFile(tempFile, "rwd");\r
- channel = file.getChannel();\r
- if (DEBUG)\r
- System.err.println("Allocated temporary file for changeset: " + tempFile);\r
- }\r
-\r
- int wrote = 0;\r
- writeBuffer.position(0);\r
- writeBuffer.limit(size);\r
- for (; wrote < size;) {\r
- wrote += channel.write(writeBuffer);\r
- }\r
- return wrote;\r
- }\r
-\r
- private void flushWriteBuffer(int size) {\r
- if (DEBUG)\r
- System.err.println("flushBuffer(" + size + ") overflow=" + overflow);\r
- if (size == 0)\r
- return;\r
-\r
- if (!overflow) {\r
- // Change set is about to become file-backed.\r
- // Initialize separate buffer for writing.\r
- writeBytes = Arrays.copyOf(readBytes, readBytes.length);\r
- writeBuffer = ByteBuffer.wrap(writeBytes);\r
- overflow = true;\r
- }\r
-\r
- writeBufferIndex = 0;\r
- int wrote = 0;\r
- try {\r
- setFilePosition(writePosition);\r
- wrote = tryFlushBuffer(size);\r
- } catch (IOException e) {\r
- e.printStackTrace();\r
- throw new RuntimeDatabaseException(e);\r
- } finally {\r
- try {\r
- currentPosition = channel.position();\r
- writePosition = currentPosition;\r
- } catch (IOException e) {\r
- e.printStackTrace();\r
- }\r
- if (DEBUG)\r
- System.err.println("flushWriteBuffer wrote " + wrote + " bytes");\r
- }\r
- }\r
-\r
- private boolean resetRead() {\r
- if (DEBUG)\r
- System.err.println("resetRead: overflow=" + overflow + ", file=" + tempFile);\r
- try {\r
- // Flush temporary file first if necessary.\r
- if (wroteSinceLastRead) {\r
- if (DEBUG)\r
- System.err.println("\tTerminating write of changeset " + tempFile);\r
- writeByte(TERM);\r
- wroteSinceLastRead = false;\r
- }\r
- if (overflow) {\r
- if (DEBUG)\r
- System.err.println("\tReading changeset from " + tempFile.getAbsolutePath());\r
- flushWriteBuffer(writeBufferIndex);\r
- readPosition = 0L;\r
- setFilePosition(readPosition);\r
- fillReadBuffer();\r
- } else {\r
- readBufferIndex = 0;\r
- }\r
- return true;\r
- } catch (IOException e) {\r
- e.printStackTrace();\r
- return false;\r
- }\r
- }\r
-\r
- private void fillReadBuffer() {\r
- readBufferIndex = 0;\r
- int read = 0;\r
- try {\r
- setFilePosition(readPosition);\r
- readBuffer.position(0);\r
- readBuffer.limit(BUFFER_SIZE);\r
- for (;read < BUFFER_SIZE;) {\r
- int got = channel.read(readBuffer);\r
- if (got == -1)\r
- return;\r
- read += got;\r
- }\r
- } catch (IOException e) {\r
- e.printStackTrace();\r
- } finally {\r
- try {\r
- currentPosition = channel.position();\r
- readPosition = currentPosition;\r
- } catch (IOException e) {\r
- e.printStackTrace();\r
- }\r
- if (DEBUG)\r
- System.err.println("fillReadBuffer read " + read + " bytes");\r
- }\r
- }\r
-\r
- /**\r
- * @param newPosition\r
- * @return <code>true</code> if file position was modified\r
- * @throws IOException\r
- */\r
- private boolean setFilePosition(long newPosition) throws IOException {\r
- if (channel == null) {\r
- return false;\r
- }\r
- if (newPosition != currentPosition) {\r
- if (DEBUG)\r
- System.err.println("setting new file position(" + newPosition + "), old position=" + currentPosition);\r
- channel.position(newPosition);\r
- currentPosition = newPosition;\r
- return true;\r
- }\r
- return false;\r
- }\r
-\r
- @Override\r
- protected void finalize() throws Throwable {\r
- close();\r
- super.finalize();\r
- }\r
-\r
- public void close() {\r
- session = null;\r
- channel = null;\r
- if (file != null) {\r
- if (DEBUG)\r
- System.err.println("Closing temporary changeset input stream " + file);\r
- try {\r
- file.close();\r
- } catch (IOException e) {\r
- } finally {\r
- file = null;\r
- }\r
- }\r
- if (tempFile != null) {\r
- if (DEBUG)\r
- System.err.println("Deleting temporary changeset file " + tempFile);\r
- tempFile.delete();\r
- tempFile = null;\r
- }\r
- }\r
-\r
- public static void main(String[] args) {\r
- ClientChangesImpl ccs = new ClientChangesImpl(null);\r
- System.out.println("isEmpty=" + ccs.isEmpty());\r
- System.out.println("clientChanges=" + ccs.toString());\r
- System.out.println("isEmpty=" + ccs.isEmpty());\r
- }\r
-\r
-}\r
+/*******************************************************************************
+ * Copyright (c) 2007, 2010 Association for Decentralized Information Management
+ * in Industry THTH ry.
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * which accompanies this distribution, and is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Contributors:
+ * VTT Technical Research Centre of Finland - initial API and implementation
+ *******************************************************************************/
+package fi.vtt.simantics.procore.internal;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+
+import org.simantics.db.ChangeSet;
+import org.simantics.db.Disposable;
+import org.simantics.db.Resource;
+import org.simantics.db.common.internal.config.InternalClientConfig;
+import org.simantics.db.exception.RuntimeDatabaseException;
+import org.simantics.db.impl.ResourceImpl;
+
+/**
+ * @author Antti Villberg
+ * @author Tuukka Lehtonen
+ */
+public final class ClientChangesImpl implements ChangeSet, Disposable {
+
+ private static final boolean ASSERT_ALIVE = true;
+
+ private static final byte[] EMPTY = {};
+
+ private static final boolean DEBUG = false;
+
+ private static final int BUFFER_SIZE = 1 << 17;
+
+ private static final byte CLAIM = 1;
+ private static final byte DENY = 2;
+ private static final byte VALUE = 3;
+ private static final byte TERM = 4;
+ private static final byte INVALIDATE = 5;
+
+ private boolean disposed = false;
+
+ private SessionImplSocket session;
+
+ /**
+ * True if enough data has been written to overflow the normal buffer which
+ * will result in data being written to disk.
+ */
+ private boolean overflow = false;
+
+ private byte[] readBytes = new byte[BUFFER_SIZE];
+ // By default the write buffer will be the exact same as the read buffer
+ // since initially were not forced to write changes to the disk.
+ private byte[] writeBytes = readBytes;
+
+ private int readBufferIndex = 0;
+ private int writeBufferIndex = 0;
+
+ /**
+ * <code>true</code> if something has been written since last call to
+ * {@link #resetRead()}. By default it is <code>true</code> since
+ * {@link #resetRead()} must on its first invocation write the {@link #TERM}
+ * constant into the stream.
+ */
+ private boolean wroteSinceLastRead = true;
+
+ private ByteBuffer readBuffer = ByteBuffer.wrap(readBytes);
+ private ByteBuffer writeBuffer = ByteBuffer.wrap(writeBytes);
+
+ private File tempFile;
+ private RandomAccessFile file;
+ private FileChannel channel;
+ private long currentPosition = 0L;
+ private long readPosition = 0L;
+ private long writePosition = 0L;
+
+ static abstract class NoRemoveIterator<T> implements Iterator<T> {
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+ }
+
+ /**
+ * Base class for all immutable collections returned by
+ * {@link ClientChangesImpl}.
+ *
+ * @param <T>
+ */
+ static class CollectionBase<T> implements Collection<T> {
+
+ @Override
+ public int size() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean isEmpty() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean contains(Object o) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Iterator<T> iterator() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Object[] toArray() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public <TT> TT[] toArray(TT[] a) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean add(T e) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean remove(Object o) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean containsAll(Collection<?> c) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean addAll(Collection<? extends T> c) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean removeAll(Collection<?> c) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean retainAll(Collection<?> c) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void clear() {
+ throw new UnsupportedOperationException();
+ }
+
+ };
+
+ ClientChangesImpl(SessionImplSocket session) {
+ this.session = session;
+ //System.out.println("new ClientChangesImpl@" + System.identityHashCode(this));
+ }
+
+ @Override
+ public String toString() {
+ final int LIMIT = 10000;
+ String to = "";
+ for (StatementChange s : changedStatements()) {
+ String op = s.isClaim() ? "add" : "rem";
+ to += "\n" + op + " " + s.getSubject() + " " + s.getPredicate() + " " + s.getObject();
+ if (to.length() > LIMIT) {
+ to += "... more.\n";
+ return to;
+ }
+ }
+ for (Resource r : changedValues()) {
+ to += "\nmod " + r;
+ if (to.length() > LIMIT) {
+ to += "...etc.\n";
+ return to;
+ }
+ }
+ return to;
+ }
+
+ private Resource getResource(int id) {
+ return session.getResource(id);
+ }
+
+ @Override
+ public synchronized Collection<StatementChange> changedStatements() {
+ if (ASSERT_ALIVE)
+ assertAlive();
+
+ if (!resetRead())
+ return Collections.emptyList();
+
+ return new CollectionBase<StatementChange>() {
+
+ StatementChange seek() {
+ try {
+ while(true) {
+ byte method = readByte();
+ switch(method) {
+ case TERM: {
+ return null;
+ }
+ case CLAIM: {
+ Resource s = getResource(readInt());
+ Resource p = getResource(readInt());
+ Resource o = getResource(readInt());
+ return new StatementChangeImpl(s, p, o, true);
+ }
+ case DENY: {
+ Resource s = getResource(readInt());
+ Resource p = getResource(readInt());
+ Resource o = getResource(readInt());
+ return new StatementChangeImpl(s, p, o, false);
+ }
+ case VALUE:
+ case INVALIDATE: {
+ // Skipping value opererations.
+ getResource(readInt());
+ break;
+ }
+ }
+ }
+ } catch(Exception e) {
+ throw new RuntimeDatabaseException(e);
+ }
+ }
+
+ private StatementChange next = seek();
+
+ @Override
+ public Iterator<StatementChange> iterator() {
+ return new NoRemoveIterator<StatementChange>() {
+ @Override
+ public boolean hasNext() {
+ return next != null;
+ }
+ @Override
+ public StatementChange next() {
+ StatementChange result = next;
+ next = seek();
+ return result;
+ }
+ };
+ }
+
+ };
+
+ }
+
+ @Override
+ public Collection<Resource> changedValues() {
+ if (ASSERT_ALIVE)
+ assertAlive();
+
+ if (!resetRead())
+ return Collections.emptyList();
+
+ return new CollectionBase<Resource>() {
+
+ Resource seek() {
+ try {
+ while(true) {
+ byte method = readByte();
+ switch(method) {
+ case TERM: {
+ return null;
+ }
+ case CLAIM: {
+ getResource(readInt());
+ getResource(readInt());
+ getResource(readInt());
+ break;
+ }
+ case DENY: {
+ getResource(readInt());
+ getResource(readInt());
+ getResource(readInt());
+ break;
+ }
+ case VALUE:
+ case INVALIDATE: {
+ return getResource(readInt());
+ }
+ }
+ }
+ } catch(Exception e) {
+ return null;
+ }
+ }
+
+ private Resource next = seek();
+
+ @Override
+ public Iterator<Resource> iterator() {
+ return new NoRemoveIterator<Resource>() {
+ @Override
+ public boolean hasNext() {
+ return next != null;
+ }
+ @Override
+ public Resource next() {
+ Resource result = next;
+ next = seek();
+ return result;
+ }
+ };
+ }
+
+ };
+
+ }
+
+ @Override
+ public Collection<Resource> changedResources() {
+ if (ASSERT_ALIVE)
+ assertAlive();
+
+ if (!resetRead())
+ return Collections.emptyList();
+
+ return new CollectionBase<Resource>() {
+
+ Resource seek() {
+
+ try {
+ while(true) {
+ byte method = readByte();
+ switch(method) {
+ case TERM: {
+ return null;
+ }
+ case CLAIM: {
+ getResource(readInt());
+ getResource(readInt());
+ getResource(readInt());
+ break;
+ }
+ case DENY: {
+ getResource(readInt());
+ getResource(readInt());
+ getResource(readInt());
+ break;
+ }
+ case VALUE: {
+ getResource(readInt());
+ break;
+ }
+ case INVALIDATE: {
+ return getResource(readInt());
+ }
+ }
+ }
+ } catch(Exception e) {
+ return null;
+ }
+
+ }
+
+ private Resource next = seek();
+
+ @Override
+ public Iterator<Resource> iterator() {
+ return new NoRemoveIterator<Resource>() {
+ @Override
+ public boolean hasNext() {
+ return next != null;
+ }
+ @Override
+ public Resource next() {
+ Resource result = next;
+ next = seek();
+ return result;
+ }
+ };
+ }
+
+ };
+
+ }
+
+ @Override
+ public boolean isEmpty() {
+ if (notEmpty)
+ return false;
+ return writeBufferIndex == 0 && !overflow;
+ }
+
+ private boolean notEmpty = false;
+ void setNotEmpty(boolean notEmpty) {
+ this.notEmpty = notEmpty;
+ }
+
+ void claim(Resource s, Resource p, Resource o) {
+ if (ASSERT_ALIVE)
+ assertAlive();
+ writeByte(CLAIM);
+ writeInt(session.getId((ResourceImpl) s));
+ writeInt(session.getId((ResourceImpl) p));
+ writeInt(session.getId((ResourceImpl) o));
+ if(DEBUG) System.err.println("claim1 $" + s.getResourceId() + " $" + p.getResourceId() + " $" + o.getResourceId());
+ }
+
+ private int sCache1 = -1;
+ private int sCache2 = -1;
+ private int sCache3 = -1;
+
+ final void invalidate(int s) {
+ if (ASSERT_ALIVE)
+ assertAlive();
+
+ if(s == sCache1) {
+ return;
+ } else if (s == sCache2) {
+ sCache2 = sCache1;
+ sCache1 = s;
+ return;
+ } else if (s == sCache3) {
+ sCache3 = sCache2;
+ sCache2 = sCache1;
+ sCache1 = s;
+ return;
+ }
+
+ sCache3 = sCache2;
+ sCache2 = sCache1;
+ sCache1 = s;
+
+ writeByte(INVALIDATE);
+ writeInt(s);
+
+ if(DEBUG) System.err.println("invalidate2 " + s);
+
+ }
+
+ void claim(int s, int p, int o) {
+ if (ASSERT_ALIVE)
+ assertAlive();
+ writeByte(CLAIM);
+ writeInt(s);
+ writeInt(p);
+ writeInt(o);
+ if(DEBUG) System.err.println("claim2 " + s + " " + p + " " + o);
+ }
+
+ void deny(Resource s, Resource p, Resource o) {
+ if (ASSERT_ALIVE)
+ assertAlive();
+ writeByte(DENY);
+ writeInt(session.getId((ResourceImpl)s));
+ writeInt(session.getId((ResourceImpl)p));
+ writeInt(session.getId((ResourceImpl)o));
+ if(DEBUG) System.err.println("deny2 " + session.getId((ResourceImpl)s) + " " + session.getId((ResourceImpl)p) + " " + session.getId((ResourceImpl)o));
+ }
+
+ void deny(int s, int p, int o) {
+ if (ASSERT_ALIVE)
+ assertAlive();
+ writeByte(DENY);
+ writeInt(s);
+ writeInt(p);
+ writeInt(o);
+ if(DEBUG) System.err.println("deny2 " + s + " " + p + " " + o);
+ }
+
+ void claimValue(Resource r) {
+ if (ASSERT_ALIVE)
+ assertAlive();
+ writeByte(VALUE);
+ writeInt(session.getId((ResourceImpl)r));
+ if(DEBUG) System.err.println("claimOrDenyValue1 $" + r.getResourceId());
+ }
+
+ void claimValue(int r) {
+ if (ASSERT_ALIVE)
+ assertAlive();
+ writeByte(VALUE);
+ writeInt(r);
+ if(DEBUG) System.err.println("deny2 " + r);
+ }
+
+ // Streaming
+
+ private byte readByte() {
+ byte result = readBytes[readBufferIndex++];
+ if(readBufferIndex == BUFFER_SIZE) fillReadBuffer();
+ return result;
+ }
+
+ private int readInt() {
+ int ri = readBufferIndex;
+ int remaining = BUFFER_SIZE - ri;
+ int result;
+ switch (remaining) {
+ case 0:
+ fillReadBuffer();
+ // Intentional fall-through
+ default:
+ result = (int)(
+ (readBytes[ri] & 0xff) |
+ ((readBytes[ri+1] & 0xff) << 8) |
+ ((readBytes[ri+2] & 0xff) << 16) |
+ ((readBytes[ri+3] & 0xff) << 24));
+ readBufferIndex += 4;
+ return result;
+
+ case 4:
+ result = (int)(
+ (readBytes[ri] & 0xff) |
+ ((readBytes[ri+1] & 0xff) << 8) |
+ ((readBytes[ri+2] & 0xff) << 16) |
+ ((readBytes[ri+3] & 0xff) << 24));
+ fillReadBuffer();
+ return result;
+
+ case 3:
+ result = (int)(
+ (readBytes[ri] & 0xff) |
+ ((readBytes[ri+1] & 0xff) << 8) |
+ ((readBytes[ri+2] & 0xff) << 16));
+ fillReadBuffer();
+ result |= (int)((readBytes[readBufferIndex++] & 0xff) << 24);
+ return result;
+
+ case 2:
+ result = (int)(
+ (readBytes[ri] & 0xff) |
+ ((readBytes[ri+1] & 0xff) << 8));
+ fillReadBuffer();
+ ri = readBufferIndex;
+ result |= (int)(
+ ((readBytes[ri] & 0xff) << 16) |
+ ((readBytes[ri+1] & 0xff) << 24));
+ readBufferIndex += 2;
+ return result;
+
+ case 1:
+ result = (int)(readBytes[ri] & 0xff);
+ fillReadBuffer();
+ ri = readBufferIndex;
+ result |= (int)(
+ ((readBytes[ri] & 0xff) << 8) |
+ ((readBytes[ri+1] & 0xff) << 16) |
+ ((readBytes[ri+2] & 0xff) << 24));
+ readBufferIndex += 3;
+ return result;
+ }
+ }
+
+ private void writeByte(int b) {
+ writeBytes[writeBufferIndex++] = (byte)b;
+ if(writeBufferIndex == BUFFER_SIZE) flushWriteBuffer(BUFFER_SIZE);
+ wroteSinceLastRead = true;
+ }
+
+ private void writeInt(int i) {
+ int wi = writeBufferIndex;
+ int room = BUFFER_SIZE - wi;
+ switch (room) {
+ case 0:
+ flushWriteBuffer(BUFFER_SIZE);
+ // Intentional fall-through
+ default:
+ writeBytes[wi] = (byte)(i&0xff);
+ writeBytes[wi+1] = (byte)((i>>>8)&0xff);
+ writeBytes[wi+2] = (byte)((i>>>16)&0xff);
+ writeBytes[wi+3] = (byte)((i>>>24)&0xff);
+ writeBufferIndex += 4;
+ break;
+
+ case 4:
+ writeBytes[wi] = (byte)(i&0xff);
+ writeBytes[wi+1] = (byte)((i>>>8)&0xff);
+ writeBytes[wi+2] = (byte)((i>>>16)&0xff);
+ writeBytes[wi+3] = (byte)((i>>>24)&0xff);
+ flushWriteBuffer(BUFFER_SIZE);
+ break;
+
+ case 3:
+ writeBytes[wi] = (byte)(i&0xff);
+ writeBytes[wi+1] = (byte)((i>>>8)&0xff);
+ writeBytes[wi+2] = (byte)((i>>>16)&0xff);
+ flushWriteBuffer(BUFFER_SIZE);
+ writeBytes[writeBufferIndex++] = (byte)((i>>>24)&0xff);
+ break;
+
+ case 2:
+ writeBytes[wi] = (byte)(i&0xff);
+ writeBytes[wi+1] = (byte)((i>>>8)&0xff);
+ flushWriteBuffer(BUFFER_SIZE);
+ wi = writeBufferIndex;
+ writeBytes[wi] = (byte)((i>>>16)&0xff);
+ writeBytes[wi+1] = (byte)((i>>>24)&0xff);
+ writeBufferIndex += 2;
+ break;
+
+ case 1:
+ writeBytes[wi] = (byte)(i&0xff);
+ flushWriteBuffer(BUFFER_SIZE);
+ wi = writeBufferIndex;
+ writeBytes[wi] = (byte)((i>>>8)&0xff);
+ writeBytes[wi+1] = (byte)((i>>>16)&0xff);
+ writeBytes[wi+2] = (byte)((i>>>24)&0xff);
+ writeBufferIndex += 3;
+ break;
+ }
+ wroteSinceLastRead = true;
+ }
+
+ private int tryFlushBuffer(int size) throws IOException {
+ if (DEBUG)
+ System.err.println("tryFlushBuffer(" + size + "): tempFile=" + tempFile);
+
+ if (tempFile == null) {
+ File base = InternalClientConfig.DB_CLIENT_TEMP_DIR;
+ base.mkdirs();
+ tempFile = File.createTempFile("cset-", ".cs", base);
+ file = new RandomAccessFile(tempFile, "rwd");
+ channel = file.getChannel();
+ if (DEBUG)
+ System.err.println("Allocated temporary file for changeset: " + tempFile);
+ }
+
+ int wrote = 0;
+ writeBuffer.position(0);
+ writeBuffer.limit(size);
+ for (; wrote < size;) {
+ wrote += channel.write(writeBuffer);
+ }
+ return wrote;
+ }
+
+ private void flushWriteBuffer(int size) {
+ if (DEBUG)
+ System.err.println("flushBuffer(" + size + ") overflow=" + overflow);
+ if (size == 0)
+ return;
+
+ if (!overflow) {
+ // Change set is about to become file-backed.
+ // Initialize separate buffer for writing.
+ writeBytes = Arrays.copyOf(readBytes, readBytes.length);
+ writeBuffer = ByteBuffer.wrap(writeBytes);
+ overflow = true;
+ }
+
+ writeBufferIndex = 0;
+ int wrote = 0;
+ try {
+ setFilePosition(writePosition);
+ wrote = tryFlushBuffer(size);
+ } catch (IOException e) {
+ e.printStackTrace();
+ throw new RuntimeDatabaseException(e);
+ } finally {
+ try {
+ currentPosition = channel.position();
+ writePosition = currentPosition;
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ if (DEBUG)
+ System.err.println("flushWriteBuffer wrote " + wrote + " bytes");
+ }
+ }
+
+ private boolean resetRead() {
+ if (DEBUG)
+ System.err.println("resetRead: overflow=" + overflow + ", file=" + tempFile);
+ try {
+ // Flush temporary file first if necessary.
+ if (wroteSinceLastRead) {
+ if (DEBUG)
+ System.err.println("\tTerminating write of changeset " + tempFile);
+ writeByte(TERM);
+ wroteSinceLastRead = false;
+ }
+ if (overflow) {
+ if (DEBUG)
+ System.err.println("\tReading changeset from " + tempFile.getAbsolutePath());
+ flushWriteBuffer(writeBufferIndex);
+ readPosition = 0L;
+ setFilePosition(readPosition);
+ fillReadBuffer();
+ } else {
+ readBufferIndex = 0;
+ }
+ return true;
+ } catch (IOException e) {
+ e.printStackTrace();
+ return false;
+ }
+ }
+
+ private void fillReadBuffer() {
+ readBufferIndex = 0;
+ int read = 0;
+ try {
+ setFilePosition(readPosition);
+ readBuffer.position(0);
+ readBuffer.limit(BUFFER_SIZE);
+ for (;read < BUFFER_SIZE;) {
+ int got = channel.read(readBuffer);
+ if (got == -1)
+ return;
+ read += got;
+ }
+ } catch (IOException e) {
+ e.printStackTrace();
+ } finally {
+ try {
+ currentPosition = channel.position();
+ readPosition = currentPosition;
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ if (DEBUG)
+ System.err.println("fillReadBuffer read " + read + " bytes");
+ }
+ }
+
+ /**
+ * @param newPosition
+ * @return <code>true</code> if file position was modified
+ * @throws IOException
+ */
+ private boolean setFilePosition(long newPosition) throws IOException {
+ if (channel == null) {
+ return false;
+ }
+ if (newPosition != currentPosition) {
+ if (DEBUG)
+ System.err.println("setting new file position(" + newPosition + "), old position=" + currentPosition);
+ channel.position(newPosition);
+ currentPosition = newPosition;
+ return true;
+ }
+ return false;
+ }
+
+ @Override
+ protected void finalize() throws Throwable {
+ close();
+ super.finalize();
+ }
+
+ public void close() {
+ if (disposed)
+ return;
+
+ disposed = true;
+ session = null;
+ channel = null;
+ if (file != null) {
+ if (DEBUG)
+ System.err.println("Closing temporary changeset input stream " + file);
+ try {
+ file.close();
+ } catch (IOException e) {
+ } finally {
+ file = null;
+ }
+ }
+ if (tempFile != null) {
+ if (DEBUG)
+ System.err.println("Deleting temporary changeset file " + tempFile);
+ tempFile.delete();
+ tempFile = null;
+ }
+
+ // Release large buffers to minimize memory consumption
+ readBytes = EMPTY;
+ readBuffer = ByteBuffer.wrap(EMPTY);
+ writeBytes = EMPTY;
+ writeBuffer = ByteBuffer.wrap(EMPTY);
+ }
+
+ private void assertAlive() {
+ if (disposed)
+ throw new IllegalStateException("ClientChangeImpl@" + System.identityHashCode(this) + " is already disposed");
+ }
+
+ @Override
+ public void dispose() {
+ //System.out.println("ClientChangesImpl[" + (disposed ? "DISPOSED" : "") + "]@" + System.identityHashCode(this) + ".dispose " + (isEmpty() ? "EMPTY" : "NON-EMPTY"));
+ close();
+ }
+
+ public static void main(String[] args) {
+ ClientChangesImpl ccs = new ClientChangesImpl(null);
+ System.out.println("isEmpty=" + ccs.isEmpty());
+ System.out.println("clientChanges=" + ccs.toString());
+ System.out.println("isEmpty=" + ccs.isEmpty());
+ }
+
+}