]> gerrit.simantics Code Review - simantics/platform.git/blobdiff - bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/ClientChangesImpl.java
Declare asyncCount volatile since it is accessed from multiple threads
[simantics/platform.git] / bundles / org.simantics.db.procore / src / fi / vtt / simantics / procore / internal / ClientChangesImpl.java
index 9c51e267d3af422fe0e8876fb3274c47221fa29e..000880c3d1d1bf3ff00f5b409ee69bd657a3c2b0 100644 (file)
-/*******************************************************************************\r
- * Copyright (c) 2007, 2010 Association for Decentralized Information Management\r
- * in Industry THTH ry.\r
- * All rights reserved. This program and the accompanying materials\r
- * are made available under the terms of the Eclipse Public License v1.0\r
- * which accompanies this distribution, and is available at\r
- * http://www.eclipse.org/legal/epl-v10.html\r
- *\r
- * Contributors:\r
- *     VTT Technical Research Centre of Finland - initial API and implementation\r
- *******************************************************************************/\r
-package fi.vtt.simantics.procore.internal;\r
-\r
-import java.io.File;\r
-import java.io.IOException;\r
-import java.io.RandomAccessFile;\r
-import java.nio.ByteBuffer;\r
-import java.nio.channels.FileChannel;\r
-import java.util.Arrays;\r
-import java.util.Collection;\r
-import java.util.Collections;\r
-import java.util.Iterator;\r
-\r
-import org.simantics.db.ChangeSet;\r
-import org.simantics.db.Resource;\r
-import org.simantics.db.common.internal.config.InternalClientConfig;\r
-import org.simantics.db.exception.RuntimeDatabaseException;\r
-import org.simantics.db.impl.ResourceImpl;\r
-\r
-/**\r
- * @author Antti Villberg\r
- * @author Tuukka Lehtonen\r
- */\r
-public final class ClientChangesImpl implements ChangeSet {\r
-\r
-    private static final boolean DEBUG = false;\r
-\r
-    private static final int BUFFER_SIZE = 1 << 17;\r
-\r
-    private static final byte CLAIM = 1;\r
-    private static final byte DENY = 2;\r
-    private static final byte VALUE = 3;\r
-    private static final byte TERM = 4;\r
-    private static final byte INVALIDATE = 5;\r
-\r
-    private SessionImplSocket session;\r
-\r
-    /**\r
-     * True if enough data has been written to overflow the normal buffer which\r
-     * will result in data being written to disk.\r
-     */\r
-    private boolean overflow = false;\r
-\r
-    private final byte[] readBytes = new byte[BUFFER_SIZE];\r
-    // By default the write buffer will be the exact same as the read buffer\r
-    // since initially were not forced to write changes to the disk.\r
-    private byte[] writeBytes = readBytes;\r
-\r
-    private int readBufferIndex = 0;\r
-    private int writeBufferIndex = 0;\r
-\r
-    /**\r
-     * <code>true</code> if something has been written since last call to\r
-     * {@link #resetRead()}. By default it is <code>true</code> since\r
-     * {@link #resetRead()} must on its first invocation write the {@link #TERM}\r
-     * constant into the stream.\r
-     */\r
-    private boolean wroteSinceLastRead = true;\r
-\r
-    private final ByteBuffer readBuffer = ByteBuffer.wrap(readBytes);\r
-    private ByteBuffer writeBuffer = ByteBuffer.wrap(writeBytes);\r
-\r
-    private File tempFile;\r
-    private RandomAccessFile file;\r
-    private FileChannel channel;\r
-    private long currentPosition = 0L;\r
-    private long readPosition = 0L;\r
-    private long writePosition = 0L;\r
-\r
-    static abstract class NoRemoveIterator<T> implements Iterator<T> {\r
-        @Override\r
-        public void remove() {\r
-            throw new UnsupportedOperationException();\r
-        }\r
-    }\r
-\r
-    /**\r
-     * Base class for all immutable collections returned by\r
-     * {@link ClientChangesImpl}.\r
-     * \r
-     * @param <T>\r
-     */\r
-    static class CollectionBase<T> implements Collection<T> {\r
-\r
-        @Override\r
-        public int size() {\r
-            throw new UnsupportedOperationException();\r
-        }\r
-\r
-        @Override\r
-        public boolean isEmpty() {\r
-            throw new UnsupportedOperationException();\r
-        }\r
-\r
-        @Override\r
-        public boolean contains(Object o) {\r
-            throw new UnsupportedOperationException();\r
-        }\r
-\r
-        @Override\r
-        public Iterator<T> iterator() {\r
-            throw new UnsupportedOperationException();\r
-        }\r
-\r
-        @Override\r
-        public Object[] toArray() {\r
-            throw new UnsupportedOperationException();\r
-        }\r
-\r
-        @Override\r
-        public <TT> TT[] toArray(TT[] a) {\r
-            throw new UnsupportedOperationException();\r
-        }\r
-\r
-        @Override\r
-        public boolean add(T e) {\r
-            throw new UnsupportedOperationException();\r
-        }\r
-\r
-        @Override\r
-        public boolean remove(Object o) {\r
-            throw new UnsupportedOperationException();\r
-        }\r
-\r
-        @Override\r
-        public boolean containsAll(Collection<?> c) {\r
-            throw new UnsupportedOperationException();\r
-        }\r
-\r
-        @Override\r
-        public boolean addAll(Collection<? extends T> c) {\r
-            throw new UnsupportedOperationException();\r
-        }\r
-\r
-        @Override\r
-        public boolean removeAll(Collection<?> c) {\r
-            throw new UnsupportedOperationException();\r
-        }\r
-\r
-        @Override\r
-        public boolean retainAll(Collection<?> c) {\r
-            throw new UnsupportedOperationException();\r
-        }\r
-\r
-        @Override\r
-        public void clear() {\r
-            throw new UnsupportedOperationException();\r
-        }\r
-\r
-    };\r
-\r
-    ClientChangesImpl(SessionImplSocket session) {\r
-        this.session = session;\r
-    }\r
-\r
-    @Override\r
-    public String toString() {\r
-        final int LIMIT = 10000;\r
-        String to = "";\r
-        for (StatementChange s : changedStatements()) {\r
-            String op = s.isClaim() ? "add" : "rem";\r
-            to += "\n" + op + " " + s.getSubject() + " " + s.getPredicate() + " " + s.getObject();\r
-            if (to.length() > LIMIT) {\r
-                to += "... more.\n";\r
-                return to;\r
-            }\r
-        }\r
-        for (Resource r : changedValues()) {\r
-            to += "\nmod " + r; \r
-            if (to.length() > LIMIT) {\r
-                to += "...etc.\n";\r
-                return to;\r
-            }\r
-        }\r
-        return to;\r
-    }\r
-\r
-    private Resource getResource(int id) {\r
-        return session.getResource(id);\r
-    }\r
-\r
-    @Override\r
-    public synchronized Collection<StatementChange> changedStatements() {\r
-\r
-        if (!resetRead())\r
-            return Collections.emptyList();\r
-\r
-        return new CollectionBase<StatementChange>() {\r
-\r
-            StatementChange seek() {\r
-                try {\r
-                    while(true) {\r
-                        byte method = readByte();\r
-                        switch(method) {\r
-                        case TERM: {\r
-                            return null;\r
-                        }\r
-                        case CLAIM: {\r
-                            Resource s = getResource(readInt());\r
-                            Resource p = getResource(readInt());\r
-                            Resource o = getResource(readInt());\r
-                            return new StatementChangeImpl(s, p, o, true);\r
-                        }\r
-                        case DENY: {\r
-                            Resource s = getResource(readInt());\r
-                            Resource p = getResource(readInt());\r
-                            Resource o = getResource(readInt());\r
-                            return new StatementChangeImpl(s, p, o, false);\r
-                        }\r
-                        case VALUE:\r
-                        case INVALIDATE: {\r
-                            // Skipping value opererations.\r
-                            getResource(readInt());\r
-                            break;\r
-                        }\r
-                        }\r
-                    }\r
-                } catch(Exception e) {\r
-                    throw new RuntimeDatabaseException(e);\r
-                }\r
-            }\r
-\r
-            private StatementChange next = seek();\r
-\r
-            @Override\r
-            public Iterator<StatementChange> iterator() {\r
-                return new NoRemoveIterator<StatementChange>() {\r
-                    @Override\r
-                    public boolean hasNext() {\r
-                        return next != null;\r
-                    }\r
-                    @Override\r
-                    public StatementChange next() {\r
-                        StatementChange result = next;\r
-                        next = seek();\r
-                        return result;\r
-                    }\r
-                };\r
-            }\r
-\r
-        };\r
-\r
-    }\r
-\r
-    @Override\r
-    public Collection<Resource> changedValues() {\r
-\r
-        if (!resetRead())\r
-            return Collections.emptyList();\r
-\r
-        return new CollectionBase<Resource>() {\r
-\r
-            Resource seek() {\r
-                try {\r
-                    while(true) {\r
-                        byte method = readByte();\r
-                        switch(method) {\r
-                        case TERM: {\r
-                            return null;\r
-                        }\r
-                        case CLAIM: {\r
-                            getResource(readInt());\r
-                            getResource(readInt());\r
-                            getResource(readInt());\r
-                            break;\r
-                        }\r
-                        case DENY: {\r
-                            getResource(readInt());\r
-                            getResource(readInt());\r
-                            getResource(readInt());\r
-                            break;\r
-                        }\r
-                        case VALUE:\r
-                        case INVALIDATE: {\r
-                            return getResource(readInt());\r
-                        }\r
-                        }\r
-                    }\r
-                } catch(Exception e) {\r
-                    return null;\r
-                }\r
-            }\r
-\r
-            private Resource next = seek();\r
-\r
-            @Override\r
-            public Iterator<Resource> iterator() {\r
-                return new NoRemoveIterator<Resource>() {\r
-                    @Override\r
-                    public boolean hasNext() {\r
-                        return next != null;\r
-                    }\r
-                    @Override\r
-                    public Resource next() {\r
-                        Resource result = next;\r
-                        next = seek();\r
-                        return result;\r
-                    }\r
-                };\r
-            }\r
-\r
-        };\r
-\r
-    }\r
-\r
-    @Override\r
-    public Collection<Resource> changedResources() {\r
-\r
-        if (!resetRead())\r
-            return Collections.emptyList();\r
-\r
-        return new CollectionBase<Resource>() {\r
-\r
-            Resource seek() {\r
-\r
-                try {\r
-                    while(true) {\r
-                        byte method = readByte();\r
-                        switch(method) {\r
-                        case TERM: {\r
-                            return null;\r
-                        }\r
-                        case CLAIM: {\r
-                            getResource(readInt());\r
-                            getResource(readInt());\r
-                            getResource(readInt());\r
-                            break;\r
-                        }\r
-                        case DENY: {\r
-                            getResource(readInt());\r
-                            getResource(readInt());\r
-                            getResource(readInt());\r
-                            break;\r
-                        }\r
-                        case VALUE: {\r
-                            getResource(readInt());\r
-                            break;\r
-                        }\r
-                        case INVALIDATE: {\r
-                            return getResource(readInt());\r
-                        }\r
-                        }\r
-                    }\r
-                } catch(Exception e) {\r
-                    return null;\r
-                }\r
-\r
-            }\r
-\r
-            private Resource next = seek();\r
-\r
-            @Override\r
-            public Iterator<Resource> iterator() {\r
-                return new NoRemoveIterator<Resource>() {\r
-                    @Override\r
-                    public boolean hasNext() {\r
-                        return next != null;\r
-                    }\r
-                    @Override\r
-                    public Resource next() {\r
-                        Resource result = next;\r
-                        next = seek();\r
-                        return result;\r
-                    }\r
-                };\r
-            }\r
-\r
-        };\r
-\r
-    }\r
-\r
-    @Override\r
-    public boolean isEmpty() {\r
-        if (notEmpty)\r
-            return false;\r
-        return writeBufferIndex == 0 && !overflow;\r
-    }\r
-\r
-    private boolean notEmpty = false;\r
-    void setNotEmpty(boolean notEmpty) {\r
-        this.notEmpty = notEmpty;\r
-    }\r
-\r
-    void claim(Resource s, Resource p, Resource o) {\r
-        writeByte(CLAIM);\r
-        writeInt(session.getId((ResourceImpl) s));\r
-        writeInt(session.getId((ResourceImpl) p));\r
-        writeInt(session.getId((ResourceImpl) o));\r
-        if(DEBUG) System.err.println("claim1 $" + s.getResourceId() + " $" + p.getResourceId() + " $" + o.getResourceId());\r
-    }\r
-\r
-    private int sCache1 = -1;\r
-    private int sCache2 = -1;\r
-    private int sCache3 = -1;\r
-\r
-    final void invalidate(int s) {\r
-       \r
-       if(s == sCache1) {\r
-               return;\r
-       } else if (s == sCache2) {\r
-               sCache2 = sCache1;\r
-               sCache1 = s;\r
-               return;\r
-       } else if (s == sCache3) {\r
-               sCache3 = sCache2;\r
-               sCache2 = sCache1;\r
-               sCache1 = s;\r
-               return;\r
-       }\r
-\r
-       sCache3 = sCache2;\r
-       sCache2 = sCache1;\r
-       sCache1 = s;\r
-       \r
-        writeByte(INVALIDATE);\r
-        writeInt(s);\r
-\r
-        if(DEBUG) System.err.println("invalidate2 " + s);\r
-\r
-    }\r
-\r
-    void claim(int s, int p, int o) {\r
-        writeByte(CLAIM);\r
-        writeInt(s);\r
-        writeInt(p);\r
-        writeInt(o);\r
-        if(DEBUG) System.err.println("claim2 " + s + " " + p + " " + o);\r
-    }\r
-\r
-    void deny(Resource s, Resource p, Resource o) {\r
-        writeByte(DENY);\r
-        writeInt(session.getId((ResourceImpl)s));\r
-        writeInt(session.getId((ResourceImpl)p));\r
-        writeInt(session.getId((ResourceImpl)o));\r
-        if(DEBUG) System.err.println("deny2 " + session.getId((ResourceImpl)s) + " " + session.getId((ResourceImpl)p) + " " + session.getId((ResourceImpl)o));\r
-    }\r
-\r
-    void deny(int s, int p, int o) {\r
-        writeByte(DENY);\r
-        writeInt(s);\r
-        writeInt(p);\r
-        writeInt(o);\r
-        if(DEBUG) System.err.println("deny2 " + s + " " + p + " " + o);\r
-    }\r
-\r
-    void claimValue(Resource r) {\r
-        writeByte(VALUE);\r
-        writeInt(session.getId((ResourceImpl)r));\r
-        if(DEBUG) System.err.println("claimOrDenyValue1 $" + r.getResourceId());\r
-    }\r
-\r
-    void claimValue(int r) {\r
-        writeByte(VALUE);\r
-        writeInt(r);\r
-        if(DEBUG) System.err.println("deny2 " + r);\r
-    }\r
-\r
-    // Streaming\r
-\r
-    private byte readByte() {\r
-        byte result = readBytes[readBufferIndex++]; \r
-        if(readBufferIndex == BUFFER_SIZE) fillReadBuffer();\r
-        return result;\r
-    }\r
-\r
-       private int readInt() {\r
-        if(readBufferIndex < (BUFFER_SIZE-4)) {\r
-            int result = (int) \r
-                ((readBytes[readBufferIndex++] & 0xff) | \r
-                ((readBytes[readBufferIndex++] & 0xff) << 8) | \r
-                ((readBytes[readBufferIndex++] & 0xff) << 16) | \r
-                ((readBytes[readBufferIndex++] & 0xff) << 24)); \r
-            return result;\r
-        } else {\r
-            int has = BUFFER_SIZE-readBufferIndex;\r
-            int result = 0;\r
-            if(has == 0) fillReadBuffer();\r
-            result = (int)(readBytes[readBufferIndex++] & 0xff);\r
-            if(has == 1) fillReadBuffer();\r
-            result |= (int)((readBytes[readBufferIndex++] & 0xff) << 8);\r
-            if(has == 2) fillReadBuffer();\r
-            result |= (int)((readBytes[readBufferIndex++] & 0xff) << 16);\r
-            if(has == 3) fillReadBuffer();\r
-            result |= (int)((readBytes[readBufferIndex++] & 0xff) << 24);\r
-            if(has == 4) fillReadBuffer();\r
-            return result;\r
-        }\r
-    }\r
-\r
-    private void writeByte(int b) {\r
-        writeBytes[writeBufferIndex++] = (byte)b;\r
-        if(writeBufferIndex == BUFFER_SIZE) flushWriteBuffer(BUFFER_SIZE);\r
-        wroteSinceLastRead = true;\r
-    }\r
-\r
-    private void writeInt(int i) {\r
-        if(writeBufferIndex < (BUFFER_SIZE-4)) {\r
-            writeBytes[writeBufferIndex++] = (byte)(i&0xff);\r
-            writeBytes[writeBufferIndex++] = (byte)((i>>>8)&0xff);\r
-            writeBytes[writeBufferIndex++] = (byte)((i>>>16)&0xff);\r
-            writeBytes[writeBufferIndex++] = (byte)((i>>>24)&0xff);\r
-        } else {\r
-            int has = BUFFER_SIZE-writeBufferIndex;\r
-            if(has == 0) flushWriteBuffer(BUFFER_SIZE);\r
-            writeBytes[writeBufferIndex++] = (byte)(i&0xff);\r
-            if(has == 1) flushWriteBuffer(BUFFER_SIZE);\r
-            writeBytes[writeBufferIndex++] = (byte)((i>>>8)&0xff);\r
-            if(has == 2) flushWriteBuffer(BUFFER_SIZE);\r
-            writeBytes[writeBufferIndex++] = (byte)((i>>>16)&0xff);\r
-            if(has == 3) flushWriteBuffer(BUFFER_SIZE);\r
-            writeBytes[writeBufferIndex++] = (byte)((i>>>24)&0xff);\r
-            if(has == 4) flushWriteBuffer(BUFFER_SIZE);\r
-        }\r
-        wroteSinceLastRead = true;\r
-    }\r
-\r
-    private int tryFlushBuffer(int size) throws IOException {\r
-        if (DEBUG)\r
-            System.err.println("tryFlushBuffer(" + size + "): tempFile=" + tempFile);\r
-\r
-        if (tempFile == null) {\r
-            File base = InternalClientConfig.DB_CLIENT_TEMP_DIR;\r
-            base.mkdirs();\r
-            tempFile = File.createTempFile("cset-", ".cs", base);\r
-            file = new RandomAccessFile(tempFile, "rwd");\r
-            channel = file.getChannel();\r
-            if (DEBUG)\r
-                System.err.println("Allocated temporary file for changeset: " + tempFile);\r
-        }\r
-\r
-        int wrote = 0;\r
-        writeBuffer.position(0);\r
-        writeBuffer.limit(size);\r
-        for (; wrote < size;) {\r
-            wrote += channel.write(writeBuffer);\r
-        }\r
-        return wrote;\r
-    }\r
-\r
-    private void flushWriteBuffer(int size) {\r
-        if (DEBUG)\r
-            System.err.println("flushBuffer(" + size + ") overflow=" + overflow);\r
-        if (size == 0)\r
-            return;\r
-\r
-        if (!overflow) {\r
-            // Change set is about to become file-backed.\r
-            // Initialize separate buffer for writing.\r
-            writeBytes = Arrays.copyOf(readBytes, readBytes.length);\r
-            writeBuffer = ByteBuffer.wrap(writeBytes);\r
-            overflow = true;\r
-        }\r
-\r
-        writeBufferIndex = 0;\r
-        int wrote = 0;\r
-        try {\r
-            setFilePosition(writePosition);\r
-            wrote = tryFlushBuffer(size);\r
-        } catch (IOException e) {\r
-            e.printStackTrace();\r
-            throw new RuntimeDatabaseException(e);\r
-        } finally {\r
-            try {\r
-                currentPosition = channel.position();\r
-                writePosition = currentPosition;\r
-            } catch (IOException e) {\r
-                e.printStackTrace();\r
-            }\r
-            if (DEBUG)\r
-                System.err.println("flushWriteBuffer wrote " + wrote + " bytes");\r
-        }\r
-    }\r
-\r
-    private boolean resetRead() {\r
-        if (DEBUG)\r
-            System.err.println("resetRead: overflow=" + overflow + ", file=" + tempFile);\r
-        try {\r
-            // Flush temporary file first if necessary.\r
-            if (wroteSinceLastRead) {\r
-                if (DEBUG)\r
-                    System.err.println("\tTerminating write of changeset " + tempFile);\r
-                writeByte(TERM);\r
-                wroteSinceLastRead = false;\r
-            }\r
-            if (overflow) {\r
-                if (DEBUG)\r
-                    System.err.println("\tReading changeset from " + tempFile.getAbsolutePath());\r
-                flushWriteBuffer(writeBufferIndex);\r
-                readPosition = 0L;\r
-                setFilePosition(readPosition);\r
-                fillReadBuffer();\r
-            } else {\r
-                readBufferIndex = 0;\r
-            }\r
-            return true;\r
-        } catch (IOException e) {\r
-            e.printStackTrace();\r
-            return false;\r
-        }\r
-    }\r
-\r
-    private void fillReadBuffer() {\r
-        readBufferIndex = 0;\r
-        int read = 0;\r
-        try {\r
-            setFilePosition(readPosition);\r
-            readBuffer.position(0);\r
-            readBuffer.limit(BUFFER_SIZE);\r
-            for (;read < BUFFER_SIZE;) {\r
-                int got = channel.read(readBuffer);\r
-                if (got == -1)\r
-                    return;\r
-                read += got;\r
-            }\r
-        } catch (IOException e) {\r
-            e.printStackTrace();\r
-        } finally {\r
-            try {\r
-                currentPosition = channel.position();\r
-                readPosition = currentPosition;\r
-            } catch (IOException e) {\r
-                e.printStackTrace();\r
-            }\r
-            if (DEBUG)\r
-                System.err.println("fillReadBuffer read " + read + " bytes");\r
-        }\r
-    }\r
-\r
-    /**\r
-     * @param newPosition\r
-     * @return <code>true</code> if file position was modified\r
-     * @throws IOException\r
-     */\r
-    private boolean setFilePosition(long newPosition) throws IOException {\r
-        if (channel == null) {\r
-            return false;\r
-        }\r
-        if (newPosition != currentPosition) {\r
-            if (DEBUG)\r
-                System.err.println("setting new file position(" + newPosition + "), old position=" + currentPosition);\r
-            channel.position(newPosition);\r
-            currentPosition = newPosition;\r
-            return true;\r
-        }\r
-        return false;\r
-    }\r
-\r
-    @Override\r
-    protected void finalize() throws Throwable {\r
-        close();\r
-        super.finalize();\r
-    }\r
-\r
-    public void close() {\r
-        session = null;\r
-        channel = null;\r
-        if (file != null) {\r
-            if (DEBUG)\r
-                System.err.println("Closing temporary changeset input stream " + file);\r
-            try {\r
-                file.close();\r
-            } catch (IOException e) {\r
-            } finally {\r
-                file = null;\r
-            }\r
-        }\r
-        if (tempFile != null) {\r
-            if (DEBUG)\r
-                System.err.println("Deleting temporary changeset file " + tempFile);\r
-            tempFile.delete();\r
-            tempFile = null;\r
-        }\r
-    }\r
-\r
-    public static void main(String[] args) {\r
-        ClientChangesImpl ccs = new ClientChangesImpl(null);\r
-        System.out.println("isEmpty=" + ccs.isEmpty());\r
-        System.out.println("clientChanges=" + ccs.toString());\r
-        System.out.println("isEmpty=" + ccs.isEmpty());\r
-    }\r
-\r
-}\r
+/*******************************************************************************
+ * Copyright (c) 2007, 2010 Association for Decentralized Information Management
+ * in Industry THTH ry.
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * which accompanies this distribution, and is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Contributors:
+ *     VTT Technical Research Centre of Finland - initial API and implementation
+ *******************************************************************************/
+package fi.vtt.simantics.procore.internal;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+
+import org.simantics.db.ChangeSet;
+import org.simantics.db.Resource;
+import org.simantics.db.common.internal.config.InternalClientConfig;
+import org.simantics.db.exception.RuntimeDatabaseException;
+import org.simantics.db.impl.ResourceImpl;
+
+/**
+ * @author Antti Villberg
+ * @author Tuukka Lehtonen
+ */
+public final class ClientChangesImpl implements ChangeSet {
+
+    private static final boolean DEBUG = false;
+
+    private static final int BUFFER_SIZE = 1 << 17;
+
+    private static final byte CLAIM = 1;
+    private static final byte DENY = 2;
+    private static final byte VALUE = 3;
+    private static final byte TERM = 4;
+    private static final byte INVALIDATE = 5;
+
+    private SessionImplSocket session;
+
+    /**
+     * True if enough data has been written to overflow the normal buffer which
+     * will result in data being written to disk.
+     */
+    private boolean overflow = false;
+
+    private final byte[] readBytes = new byte[BUFFER_SIZE];
+    // By default the write buffer will be the exact same as the read buffer
+    // since initially were not forced to write changes to the disk.
+    private byte[] writeBytes = readBytes;
+
+    private int readBufferIndex = 0;
+    private int writeBufferIndex = 0;
+
+    /**
+     * <code>true</code> if something has been written since last call to
+     * {@link #resetRead()}. By default it is <code>true</code> since
+     * {@link #resetRead()} must on its first invocation write the {@link #TERM}
+     * constant into the stream.
+     */
+    private boolean wroteSinceLastRead = true;
+
+    private final ByteBuffer readBuffer = ByteBuffer.wrap(readBytes);
+    private ByteBuffer writeBuffer = ByteBuffer.wrap(writeBytes);
+
+    private File tempFile;
+    private RandomAccessFile file;
+    private FileChannel channel;
+    private long currentPosition = 0L;
+    private long readPosition = 0L;
+    private long writePosition = 0L;
+
+    static abstract class NoRemoveIterator<T> implements Iterator<T> {
+        @Override
+        public void remove() {
+            throw new UnsupportedOperationException();
+        }
+    }
+
+    /**
+     * Base class for all immutable collections returned by
+     * {@link ClientChangesImpl}.
+     * 
+     * @param <T>
+     */
+    static class CollectionBase<T> implements Collection<T> {
+
+        @Override
+        public int size() {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public boolean isEmpty() {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public boolean contains(Object o) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public Iterator<T> iterator() {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public Object[] toArray() {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public <TT> TT[] toArray(TT[] a) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public boolean add(T e) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public boolean remove(Object o) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public boolean containsAll(Collection<?> c) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public boolean addAll(Collection<? extends T> c) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public boolean removeAll(Collection<?> c) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public boolean retainAll(Collection<?> c) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public void clear() {
+            throw new UnsupportedOperationException();
+        }
+
+    };
+
+    ClientChangesImpl(SessionImplSocket session) {
+        this.session = session;
+    }
+
+    @Override
+    public String toString() {
+        final int LIMIT = 10000;
+        String to = "";
+        for (StatementChange s : changedStatements()) {
+            String op = s.isClaim() ? "add" : "rem";
+            to += "\n" + op + " " + s.getSubject() + " " + s.getPredicate() + " " + s.getObject();
+            if (to.length() > LIMIT) {
+                to += "... more.\n";
+                return to;
+            }
+        }
+        for (Resource r : changedValues()) {
+            to += "\nmod " + r; 
+            if (to.length() > LIMIT) {
+                to += "...etc.\n";
+                return to;
+            }
+        }
+        return to;
+    }
+
+    private Resource getResource(int id) {
+        return session.getResource(id);
+    }
+
+    @Override
+    public synchronized Collection<StatementChange> changedStatements() {
+
+        if (!resetRead())
+            return Collections.emptyList();
+
+        return new CollectionBase<StatementChange>() {
+
+            StatementChange seek() {
+                try {
+                    while(true) {
+                        byte method = readByte();
+                        switch(method) {
+                        case TERM: {
+                            return null;
+                        }
+                        case CLAIM: {
+                            Resource s = getResource(readInt());
+                            Resource p = getResource(readInt());
+                            Resource o = getResource(readInt());
+                            return new StatementChangeImpl(s, p, o, true);
+                        }
+                        case DENY: {
+                            Resource s = getResource(readInt());
+                            Resource p = getResource(readInt());
+                            Resource o = getResource(readInt());
+                            return new StatementChangeImpl(s, p, o, false);
+                        }
+                        case VALUE:
+                        case INVALIDATE: {
+                            // Skipping value opererations.
+                            getResource(readInt());
+                            break;
+                        }
+                        }
+                    }
+                } catch(Exception e) {
+                    throw new RuntimeDatabaseException(e);
+                }
+            }
+
+            private StatementChange next = seek();
+
+            @Override
+            public Iterator<StatementChange> iterator() {
+                return new NoRemoveIterator<StatementChange>() {
+                    @Override
+                    public boolean hasNext() {
+                        return next != null;
+                    }
+                    @Override
+                    public StatementChange next() {
+                        StatementChange result = next;
+                        next = seek();
+                        return result;
+                    }
+                };
+            }
+
+        };
+
+    }
+
+    @Override
+    public Collection<Resource> changedValues() {
+
+        if (!resetRead())
+            return Collections.emptyList();
+
+        return new CollectionBase<Resource>() {
+
+            Resource seek() {
+                try {
+                    while(true) {
+                        byte method = readByte();
+                        switch(method) {
+                        case TERM: {
+                            return null;
+                        }
+                        case CLAIM: {
+                            getResource(readInt());
+                            getResource(readInt());
+                            getResource(readInt());
+                            break;
+                        }
+                        case DENY: {
+                            getResource(readInt());
+                            getResource(readInt());
+                            getResource(readInt());
+                            break;
+                        }
+                        case VALUE:
+                        case INVALIDATE: {
+                            return getResource(readInt());
+                        }
+                        }
+                    }
+                } catch(Exception e) {
+                    return null;
+                }
+            }
+
+            private Resource next = seek();
+
+            @Override
+            public Iterator<Resource> iterator() {
+                return new NoRemoveIterator<Resource>() {
+                    @Override
+                    public boolean hasNext() {
+                        return next != null;
+                    }
+                    @Override
+                    public Resource next() {
+                        Resource result = next;
+                        next = seek();
+                        return result;
+                    }
+                };
+            }
+
+        };
+
+    }
+
+    @Override
+    public Collection<Resource> changedResources() {
+
+        if (!resetRead())
+            return Collections.emptyList();
+
+        return new CollectionBase<Resource>() {
+
+            Resource seek() {
+
+                try {
+                    while(true) {
+                        byte method = readByte();
+                        switch(method) {
+                        case TERM: {
+                            return null;
+                        }
+                        case CLAIM: {
+                            getResource(readInt());
+                            getResource(readInt());
+                            getResource(readInt());
+                            break;
+                        }
+                        case DENY: {
+                            getResource(readInt());
+                            getResource(readInt());
+                            getResource(readInt());
+                            break;
+                        }
+                        case VALUE: {
+                            getResource(readInt());
+                            break;
+                        }
+                        case INVALIDATE: {
+                            return getResource(readInt());
+                        }
+                        }
+                    }
+                } catch(Exception e) {
+                    return null;
+                }
+
+            }
+
+            private Resource next = seek();
+
+            @Override
+            public Iterator<Resource> iterator() {
+                return new NoRemoveIterator<Resource>() {
+                    @Override
+                    public boolean hasNext() {
+                        return next != null;
+                    }
+                    @Override
+                    public Resource next() {
+                        Resource result = next;
+                        next = seek();
+                        return result;
+                    }
+                };
+            }
+
+        };
+
+    }
+
+    @Override
+    public boolean isEmpty() {
+        if (notEmpty)
+            return false;
+        return writeBufferIndex == 0 && !overflow;
+    }
+
+    private boolean notEmpty = false;
+    void setNotEmpty(boolean notEmpty) {
+        this.notEmpty = notEmpty;
+    }
+
+    void claim(Resource s, Resource p, Resource o) {
+        writeByte(CLAIM);
+        writeInt(session.getId((ResourceImpl) s));
+        writeInt(session.getId((ResourceImpl) p));
+        writeInt(session.getId((ResourceImpl) o));
+        if(DEBUG) System.err.println("claim1 $" + s.getResourceId() + " $" + p.getResourceId() + " $" + o.getResourceId());
+    }
+
+    private int sCache1 = -1;
+    private int sCache2 = -1;
+    private int sCache3 = -1;
+
+    final void invalidate(int s) {
+       
+       if(s == sCache1) {
+               return;
+       } else if (s == sCache2) {
+               sCache2 = sCache1;
+               sCache1 = s;
+               return;
+       } else if (s == sCache3) {
+               sCache3 = sCache2;
+               sCache2 = sCache1;
+               sCache1 = s;
+               return;
+       }
+
+       sCache3 = sCache2;
+       sCache2 = sCache1;
+       sCache1 = s;
+       
+        writeByte(INVALIDATE);
+        writeInt(s);
+
+        if(DEBUG) System.err.println("invalidate2 " + s);
+
+    }
+
+    void claim(int s, int p, int o) {
+        writeByte(CLAIM);
+        writeInt(s);
+        writeInt(p);
+        writeInt(o);
+        if(DEBUG) System.err.println("claim2 " + s + " " + p + " " + o);
+    }
+
+    void deny(Resource s, Resource p, Resource o) {
+        writeByte(DENY);
+        writeInt(session.getId((ResourceImpl)s));
+        writeInt(session.getId((ResourceImpl)p));
+        writeInt(session.getId((ResourceImpl)o));
+        if(DEBUG) System.err.println("deny2 " + session.getId((ResourceImpl)s) + " " + session.getId((ResourceImpl)p) + " " + session.getId((ResourceImpl)o));
+    }
+
+    void deny(int s, int p, int o) {
+        writeByte(DENY);
+        writeInt(s);
+        writeInt(p);
+        writeInt(o);
+        if(DEBUG) System.err.println("deny2 " + s + " " + p + " " + o);
+    }
+
+    void claimValue(Resource r) {
+        writeByte(VALUE);
+        writeInt(session.getId((ResourceImpl)r));
+        if(DEBUG) System.err.println("claimOrDenyValue1 $" + r.getResourceId());
+    }
+
+    void claimValue(int r) {
+        writeByte(VALUE);
+        writeInt(r);
+        if(DEBUG) System.err.println("deny2 " + r);
+    }
+
+    // Streaming
+
+    private byte readByte() {
+        byte result = readBytes[readBufferIndex++]; 
+        if(readBufferIndex == BUFFER_SIZE) fillReadBuffer();
+        return result;
+    }
+
+       private int readInt() {
+        if(readBufferIndex < (BUFFER_SIZE-4)) {
+            int result = (int) 
+                ((readBytes[readBufferIndex++] & 0xff) | 
+                ((readBytes[readBufferIndex++] & 0xff) << 8) | 
+                ((readBytes[readBufferIndex++] & 0xff) << 16) | 
+                ((readBytes[readBufferIndex++] & 0xff) << 24)); 
+            return result;
+        } else {
+            int has = BUFFER_SIZE-readBufferIndex;
+            int result = 0;
+            if(has == 0) fillReadBuffer();
+            result = (int)(readBytes[readBufferIndex++] & 0xff);
+            if(has == 1) fillReadBuffer();
+            result |= (int)((readBytes[readBufferIndex++] & 0xff) << 8);
+            if(has == 2) fillReadBuffer();
+            result |= (int)((readBytes[readBufferIndex++] & 0xff) << 16);
+            if(has == 3) fillReadBuffer();
+            result |= (int)((readBytes[readBufferIndex++] & 0xff) << 24);
+            if(has == 4) fillReadBuffer();
+            return result;
+        }
+    }
+
+    private void writeByte(int b) {
+        writeBytes[writeBufferIndex++] = (byte)b;
+        if(writeBufferIndex == BUFFER_SIZE) flushWriteBuffer(BUFFER_SIZE);
+        wroteSinceLastRead = true;
+    }
+
+    private void writeInt(int i) {
+        if(writeBufferIndex < (BUFFER_SIZE-4)) {
+            writeBytes[writeBufferIndex++] = (byte)(i&0xff);
+            writeBytes[writeBufferIndex++] = (byte)((i>>>8)&0xff);
+            writeBytes[writeBufferIndex++] = (byte)((i>>>16)&0xff);
+            writeBytes[writeBufferIndex++] = (byte)((i>>>24)&0xff);
+        } else {
+            int has = BUFFER_SIZE-writeBufferIndex;
+            if(has == 0) flushWriteBuffer(BUFFER_SIZE);
+            writeBytes[writeBufferIndex++] = (byte)(i&0xff);
+            if(has == 1) flushWriteBuffer(BUFFER_SIZE);
+            writeBytes[writeBufferIndex++] = (byte)((i>>>8)&0xff);
+            if(has == 2) flushWriteBuffer(BUFFER_SIZE);
+            writeBytes[writeBufferIndex++] = (byte)((i>>>16)&0xff);
+            if(has == 3) flushWriteBuffer(BUFFER_SIZE);
+            writeBytes[writeBufferIndex++] = (byte)((i>>>24)&0xff);
+            if(has == 4) flushWriteBuffer(BUFFER_SIZE);
+        }
+        wroteSinceLastRead = true;
+    }
+
+    private int tryFlushBuffer(int size) throws IOException {
+        if (DEBUG)
+            System.err.println("tryFlushBuffer(" + size + "): tempFile=" + tempFile);
+
+        if (tempFile == null) {
+            File base = InternalClientConfig.DB_CLIENT_TEMP_DIR;
+            base.mkdirs();
+            tempFile = File.createTempFile("cset-", ".cs", base);
+            file = new RandomAccessFile(tempFile, "rwd");
+            channel = file.getChannel();
+            if (DEBUG)
+                System.err.println("Allocated temporary file for changeset: " + tempFile);
+        }
+
+        int wrote = 0;
+        writeBuffer.position(0);
+        writeBuffer.limit(size);
+        for (; wrote < size;) {
+            wrote += channel.write(writeBuffer);
+        }
+        return wrote;
+    }
+
+    private void flushWriteBuffer(int size) {
+        if (DEBUG)
+            System.err.println("flushBuffer(" + size + ") overflow=" + overflow);
+        if (size == 0)
+            return;
+
+        if (!overflow) {
+            // Change set is about to become file-backed.
+            // Initialize separate buffer for writing.
+            writeBytes = Arrays.copyOf(readBytes, readBytes.length);
+            writeBuffer = ByteBuffer.wrap(writeBytes);
+            overflow = true;
+        }
+
+        writeBufferIndex = 0;
+        int wrote = 0;
+        try {
+            setFilePosition(writePosition);
+            wrote = tryFlushBuffer(size);
+        } catch (IOException e) {
+            e.printStackTrace();
+            throw new RuntimeDatabaseException(e);
+        } finally {
+            try {
+                currentPosition = channel.position();
+                writePosition = currentPosition;
+            } catch (IOException e) {
+                e.printStackTrace();
+            }
+            if (DEBUG)
+                System.err.println("flushWriteBuffer wrote " + wrote + " bytes");
+        }
+    }
+
+    private boolean resetRead() {
+        if (DEBUG)
+            System.err.println("resetRead: overflow=" + overflow + ", file=" + tempFile);
+        try {
+            // Flush temporary file first if necessary.
+            if (wroteSinceLastRead) {
+                if (DEBUG)
+                    System.err.println("\tTerminating write of changeset " + tempFile);
+                writeByte(TERM);
+                wroteSinceLastRead = false;
+            }
+            if (overflow) {
+                if (DEBUG)
+                    System.err.println("\tReading changeset from " + tempFile.getAbsolutePath());
+                flushWriteBuffer(writeBufferIndex);
+                readPosition = 0L;
+                setFilePosition(readPosition);
+                fillReadBuffer();
+            } else {
+                readBufferIndex = 0;
+            }
+            return true;
+        } catch (IOException e) {
+            e.printStackTrace();
+            return false;
+        }
+    }
+
+    private void fillReadBuffer() {
+        readBufferIndex = 0;
+        int read = 0;
+        try {
+            setFilePosition(readPosition);
+            readBuffer.position(0);
+            readBuffer.limit(BUFFER_SIZE);
+            for (;read < BUFFER_SIZE;) {
+                int got = channel.read(readBuffer);
+                if (got == -1)
+                    return;
+                read += got;
+            }
+        } catch (IOException e) {
+            e.printStackTrace();
+        } finally {
+            try {
+                currentPosition = channel.position();
+                readPosition = currentPosition;
+            } catch (IOException e) {
+                e.printStackTrace();
+            }
+            if (DEBUG)
+                System.err.println("fillReadBuffer read " + read + " bytes");
+        }
+    }
+
+    /**
+     * @param newPosition
+     * @return <code>true</code> if file position was modified
+     * @throws IOException
+     */
+    private boolean setFilePosition(long newPosition) throws IOException {
+        if (channel == null) {
+            return false;
+        }
+        if (newPosition != currentPosition) {
+            if (DEBUG)
+                System.err.println("setting new file position(" + newPosition + "), old position=" + currentPosition);
+            channel.position(newPosition);
+            currentPosition = newPosition;
+            return true;
+        }
+        return false;
+    }
+
+    @Override
+    protected void finalize() throws Throwable {
+        close();
+        super.finalize();
+    }
+
+    public void close() {
+        session = null;
+        channel = null;
+        if (file != null) {
+            if (DEBUG)
+                System.err.println("Closing temporary changeset input stream " + file);
+            try {
+                file.close();
+            } catch (IOException e) {
+            } finally {
+                file = null;
+            }
+        }
+        if (tempFile != null) {
+            if (DEBUG)
+                System.err.println("Deleting temporary changeset file " + tempFile);
+            tempFile.delete();
+            tempFile = null;
+        }
+    }
+
+    public static void main(String[] args) {
+        ClientChangesImpl ccs = new ClientChangesImpl(null);
+        System.out.println("isEmpty=" + ccs.isEmpty());
+        System.out.println("clientChanges=" + ccs.toString());
+        System.out.println("isEmpty=" + ccs.isEmpty());
+    }
+
+}