]> gerrit.simantics Code Review - simantics/platform.git/blobdiff - bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/ClientChangesImpl.java
Migrated source code from Simantics SVN
[simantics/platform.git] / bundles / org.simantics.db.procore / src / fi / vtt / simantics / procore / internal / ClientChangesImpl.java
diff --git a/bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/ClientChangesImpl.java b/bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/ClientChangesImpl.java
new file mode 100644 (file)
index 0000000..9c51e26
--- /dev/null
@@ -0,0 +1,692 @@
+/*******************************************************************************\r
+ * Copyright (c) 2007, 2010 Association for Decentralized Information Management\r
+ * in Industry THTH ry.\r
+ * All rights reserved. This program and the accompanying materials\r
+ * are made available under the terms of the Eclipse Public License v1.0\r
+ * which accompanies this distribution, and is available at\r
+ * http://www.eclipse.org/legal/epl-v10.html\r
+ *\r
+ * Contributors:\r
+ *     VTT Technical Research Centre of Finland - initial API and implementation\r
+ *******************************************************************************/\r
+package fi.vtt.simantics.procore.internal;\r
+\r
+import java.io.File;\r
+import java.io.IOException;\r
+import java.io.RandomAccessFile;\r
+import java.nio.ByteBuffer;\r
+import java.nio.channels.FileChannel;\r
+import java.util.Arrays;\r
+import java.util.Collection;\r
+import java.util.Collections;\r
+import java.util.Iterator;\r
+\r
+import org.simantics.db.ChangeSet;\r
+import org.simantics.db.Resource;\r
+import org.simantics.db.common.internal.config.InternalClientConfig;\r
+import org.simantics.db.exception.RuntimeDatabaseException;\r
+import org.simantics.db.impl.ResourceImpl;\r
+\r
+/**\r
+ * @author Antti Villberg\r
+ * @author Tuukka Lehtonen\r
+ */\r
+public final class ClientChangesImpl implements ChangeSet {\r
+\r
+    private static final boolean DEBUG = false;\r
+\r
+    private static final int BUFFER_SIZE = 1 << 17;\r
+\r
+    private static final byte CLAIM = 1;\r
+    private static final byte DENY = 2;\r
+    private static final byte VALUE = 3;\r
+    private static final byte TERM = 4;\r
+    private static final byte INVALIDATE = 5;\r
+\r
+    private SessionImplSocket session;\r
+\r
+    /**\r
+     * True if enough data has been written to overflow the normal buffer which\r
+     * will result in data being written to disk.\r
+     */\r
+    private boolean overflow = false;\r
+\r
+    private final byte[] readBytes = new byte[BUFFER_SIZE];\r
+    // By default the write buffer will be the exact same as the read buffer\r
+    // since initially were not forced to write changes to the disk.\r
+    private byte[] writeBytes = readBytes;\r
+\r
+    private int readBufferIndex = 0;\r
+    private int writeBufferIndex = 0;\r
+\r
+    /**\r
+     * <code>true</code> if something has been written since last call to\r
+     * {@link #resetRead()}. By default it is <code>true</code> since\r
+     * {@link #resetRead()} must on its first invocation write the {@link #TERM}\r
+     * constant into the stream.\r
+     */\r
+    private boolean wroteSinceLastRead = true;\r
+\r
+    private final ByteBuffer readBuffer = ByteBuffer.wrap(readBytes);\r
+    private ByteBuffer writeBuffer = ByteBuffer.wrap(writeBytes);\r
+\r
+    private File tempFile;\r
+    private RandomAccessFile file;\r
+    private FileChannel channel;\r
+    private long currentPosition = 0L;\r
+    private long readPosition = 0L;\r
+    private long writePosition = 0L;\r
+\r
+    static abstract class NoRemoveIterator<T> implements Iterator<T> {\r
+        @Override\r
+        public void remove() {\r
+            throw new UnsupportedOperationException();\r
+        }\r
+    }\r
+\r
+    /**\r
+     * Base class for all immutable collections returned by\r
+     * {@link ClientChangesImpl}.\r
+     * \r
+     * @param <T>\r
+     */\r
+    static class CollectionBase<T> implements Collection<T> {\r
+\r
+        @Override\r
+        public int size() {\r
+            throw new UnsupportedOperationException();\r
+        }\r
+\r
+        @Override\r
+        public boolean isEmpty() {\r
+            throw new UnsupportedOperationException();\r
+        }\r
+\r
+        @Override\r
+        public boolean contains(Object o) {\r
+            throw new UnsupportedOperationException();\r
+        }\r
+\r
+        @Override\r
+        public Iterator<T> iterator() {\r
+            throw new UnsupportedOperationException();\r
+        }\r
+\r
+        @Override\r
+        public Object[] toArray() {\r
+            throw new UnsupportedOperationException();\r
+        }\r
+\r
+        @Override\r
+        public <TT> TT[] toArray(TT[] a) {\r
+            throw new UnsupportedOperationException();\r
+        }\r
+\r
+        @Override\r
+        public boolean add(T e) {\r
+            throw new UnsupportedOperationException();\r
+        }\r
+\r
+        @Override\r
+        public boolean remove(Object o) {\r
+            throw new UnsupportedOperationException();\r
+        }\r
+\r
+        @Override\r
+        public boolean containsAll(Collection<?> c) {\r
+            throw new UnsupportedOperationException();\r
+        }\r
+\r
+        @Override\r
+        public boolean addAll(Collection<? extends T> c) {\r
+            throw new UnsupportedOperationException();\r
+        }\r
+\r
+        @Override\r
+        public boolean removeAll(Collection<?> c) {\r
+            throw new UnsupportedOperationException();\r
+        }\r
+\r
+        @Override\r
+        public boolean retainAll(Collection<?> c) {\r
+            throw new UnsupportedOperationException();\r
+        }\r
+\r
+        @Override\r
+        public void clear() {\r
+            throw new UnsupportedOperationException();\r
+        }\r
+\r
+    };\r
+\r
+    ClientChangesImpl(SessionImplSocket session) {\r
+        this.session = session;\r
+    }\r
+\r
+    @Override\r
+    public String toString() {\r
+        final int LIMIT = 10000;\r
+        String to = "";\r
+        for (StatementChange s : changedStatements()) {\r
+            String op = s.isClaim() ? "add" : "rem";\r
+            to += "\n" + op + " " + s.getSubject() + " " + s.getPredicate() + " " + s.getObject();\r
+            if (to.length() > LIMIT) {\r
+                to += "... more.\n";\r
+                return to;\r
+            }\r
+        }\r
+        for (Resource r : changedValues()) {\r
+            to += "\nmod " + r; \r
+            if (to.length() > LIMIT) {\r
+                to += "...etc.\n";\r
+                return to;\r
+            }\r
+        }\r
+        return to;\r
+    }\r
+\r
+    private Resource getResource(int id) {\r
+        return session.getResource(id);\r
+    }\r
+\r
+    @Override\r
+    public synchronized Collection<StatementChange> changedStatements() {\r
+\r
+        if (!resetRead())\r
+            return Collections.emptyList();\r
+\r
+        return new CollectionBase<StatementChange>() {\r
+\r
+            StatementChange seek() {\r
+                try {\r
+                    while(true) {\r
+                        byte method = readByte();\r
+                        switch(method) {\r
+                        case TERM: {\r
+                            return null;\r
+                        }\r
+                        case CLAIM: {\r
+                            Resource s = getResource(readInt());\r
+                            Resource p = getResource(readInt());\r
+                            Resource o = getResource(readInt());\r
+                            return new StatementChangeImpl(s, p, o, true);\r
+                        }\r
+                        case DENY: {\r
+                            Resource s = getResource(readInt());\r
+                            Resource p = getResource(readInt());\r
+                            Resource o = getResource(readInt());\r
+                            return new StatementChangeImpl(s, p, o, false);\r
+                        }\r
+                        case VALUE:\r
+                        case INVALIDATE: {\r
+                            // Skipping value opererations.\r
+                            getResource(readInt());\r
+                            break;\r
+                        }\r
+                        }\r
+                    }\r
+                } catch(Exception e) {\r
+                    throw new RuntimeDatabaseException(e);\r
+                }\r
+            }\r
+\r
+            private StatementChange next = seek();\r
+\r
+            @Override\r
+            public Iterator<StatementChange> iterator() {\r
+                return new NoRemoveIterator<StatementChange>() {\r
+                    @Override\r
+                    public boolean hasNext() {\r
+                        return next != null;\r
+                    }\r
+                    @Override\r
+                    public StatementChange next() {\r
+                        StatementChange result = next;\r
+                        next = seek();\r
+                        return result;\r
+                    }\r
+                };\r
+            }\r
+\r
+        };\r
+\r
+    }\r
+\r
+    @Override\r
+    public Collection<Resource> changedValues() {\r
+\r
+        if (!resetRead())\r
+            return Collections.emptyList();\r
+\r
+        return new CollectionBase<Resource>() {\r
+\r
+            Resource seek() {\r
+                try {\r
+                    while(true) {\r
+                        byte method = readByte();\r
+                        switch(method) {\r
+                        case TERM: {\r
+                            return null;\r
+                        }\r
+                        case CLAIM: {\r
+                            getResource(readInt());\r
+                            getResource(readInt());\r
+                            getResource(readInt());\r
+                            break;\r
+                        }\r
+                        case DENY: {\r
+                            getResource(readInt());\r
+                            getResource(readInt());\r
+                            getResource(readInt());\r
+                            break;\r
+                        }\r
+                        case VALUE:\r
+                        case INVALIDATE: {\r
+                            return getResource(readInt());\r
+                        }\r
+                        }\r
+                    }\r
+                } catch(Exception e) {\r
+                    return null;\r
+                }\r
+            }\r
+\r
+            private Resource next = seek();\r
+\r
+            @Override\r
+            public Iterator<Resource> iterator() {\r
+                return new NoRemoveIterator<Resource>() {\r
+                    @Override\r
+                    public boolean hasNext() {\r
+                        return next != null;\r
+                    }\r
+                    @Override\r
+                    public Resource next() {\r
+                        Resource result = next;\r
+                        next = seek();\r
+                        return result;\r
+                    }\r
+                };\r
+            }\r
+\r
+        };\r
+\r
+    }\r
+\r
+    @Override\r
+    public Collection<Resource> changedResources() {\r
+\r
+        if (!resetRead())\r
+            return Collections.emptyList();\r
+\r
+        return new CollectionBase<Resource>() {\r
+\r
+            Resource seek() {\r
+\r
+                try {\r
+                    while(true) {\r
+                        byte method = readByte();\r
+                        switch(method) {\r
+                        case TERM: {\r
+                            return null;\r
+                        }\r
+                        case CLAIM: {\r
+                            getResource(readInt());\r
+                            getResource(readInt());\r
+                            getResource(readInt());\r
+                            break;\r
+                        }\r
+                        case DENY: {\r
+                            getResource(readInt());\r
+                            getResource(readInt());\r
+                            getResource(readInt());\r
+                            break;\r
+                        }\r
+                        case VALUE: {\r
+                            getResource(readInt());\r
+                            break;\r
+                        }\r
+                        case INVALIDATE: {\r
+                            return getResource(readInt());\r
+                        }\r
+                        }\r
+                    }\r
+                } catch(Exception e) {\r
+                    return null;\r
+                }\r
+\r
+            }\r
+\r
+            private Resource next = seek();\r
+\r
+            @Override\r
+            public Iterator<Resource> iterator() {\r
+                return new NoRemoveIterator<Resource>() {\r
+                    @Override\r
+                    public boolean hasNext() {\r
+                        return next != null;\r
+                    }\r
+                    @Override\r
+                    public Resource next() {\r
+                        Resource result = next;\r
+                        next = seek();\r
+                        return result;\r
+                    }\r
+                };\r
+            }\r
+\r
+        };\r
+\r
+    }\r
+\r
+    @Override\r
+    public boolean isEmpty() {\r
+        if (notEmpty)\r
+            return false;\r
+        return writeBufferIndex == 0 && !overflow;\r
+    }\r
+\r
+    private boolean notEmpty = false;\r
+    void setNotEmpty(boolean notEmpty) {\r
+        this.notEmpty = notEmpty;\r
+    }\r
+\r
+    void claim(Resource s, Resource p, Resource o) {\r
+        writeByte(CLAIM);\r
+        writeInt(session.getId((ResourceImpl) s));\r
+        writeInt(session.getId((ResourceImpl) p));\r
+        writeInt(session.getId((ResourceImpl) o));\r
+        if(DEBUG) System.err.println("claim1 $" + s.getResourceId() + " $" + p.getResourceId() + " $" + o.getResourceId());\r
+    }\r
+\r
+    private int sCache1 = -1;\r
+    private int sCache2 = -1;\r
+    private int sCache3 = -1;\r
+\r
+    final void invalidate(int s) {\r
+       \r
+       if(s == sCache1) {\r
+               return;\r
+       } else if (s == sCache2) {\r
+               sCache2 = sCache1;\r
+               sCache1 = s;\r
+               return;\r
+       } else if (s == sCache3) {\r
+               sCache3 = sCache2;\r
+               sCache2 = sCache1;\r
+               sCache1 = s;\r
+               return;\r
+       }\r
+\r
+       sCache3 = sCache2;\r
+       sCache2 = sCache1;\r
+       sCache1 = s;\r
+       \r
+        writeByte(INVALIDATE);\r
+        writeInt(s);\r
+\r
+        if(DEBUG) System.err.println("invalidate2 " + s);\r
+\r
+    }\r
+\r
+    void claim(int s, int p, int o) {\r
+        writeByte(CLAIM);\r
+        writeInt(s);\r
+        writeInt(p);\r
+        writeInt(o);\r
+        if(DEBUG) System.err.println("claim2 " + s + " " + p + " " + o);\r
+    }\r
+\r
+    void deny(Resource s, Resource p, Resource o) {\r
+        writeByte(DENY);\r
+        writeInt(session.getId((ResourceImpl)s));\r
+        writeInt(session.getId((ResourceImpl)p));\r
+        writeInt(session.getId((ResourceImpl)o));\r
+        if(DEBUG) System.err.println("deny2 " + session.getId((ResourceImpl)s) + " " + session.getId((ResourceImpl)p) + " " + session.getId((ResourceImpl)o));\r
+    }\r
+\r
+    void deny(int s, int p, int o) {\r
+        writeByte(DENY);\r
+        writeInt(s);\r
+        writeInt(p);\r
+        writeInt(o);\r
+        if(DEBUG) System.err.println("deny2 " + s + " " + p + " " + o);\r
+    }\r
+\r
+    void claimValue(Resource r) {\r
+        writeByte(VALUE);\r
+        writeInt(session.getId((ResourceImpl)r));\r
+        if(DEBUG) System.err.println("claimOrDenyValue1 $" + r.getResourceId());\r
+    }\r
+\r
+    void claimValue(int r) {\r
+        writeByte(VALUE);\r
+        writeInt(r);\r
+        if(DEBUG) System.err.println("deny2 " + r);\r
+    }\r
+\r
+    // Streaming\r
+\r
+    private byte readByte() {\r
+        byte result = readBytes[readBufferIndex++]; \r
+        if(readBufferIndex == BUFFER_SIZE) fillReadBuffer();\r
+        return result;\r
+    }\r
+\r
+       private int readInt() {\r
+        if(readBufferIndex < (BUFFER_SIZE-4)) {\r
+            int result = (int) \r
+                ((readBytes[readBufferIndex++] & 0xff) | \r
+                ((readBytes[readBufferIndex++] & 0xff) << 8) | \r
+                ((readBytes[readBufferIndex++] & 0xff) << 16) | \r
+                ((readBytes[readBufferIndex++] & 0xff) << 24)); \r
+            return result;\r
+        } else {\r
+            int has = BUFFER_SIZE-readBufferIndex;\r
+            int result = 0;\r
+            if(has == 0) fillReadBuffer();\r
+            result = (int)(readBytes[readBufferIndex++] & 0xff);\r
+            if(has == 1) fillReadBuffer();\r
+            result |= (int)((readBytes[readBufferIndex++] & 0xff) << 8);\r
+            if(has == 2) fillReadBuffer();\r
+            result |= (int)((readBytes[readBufferIndex++] & 0xff) << 16);\r
+            if(has == 3) fillReadBuffer();\r
+            result |= (int)((readBytes[readBufferIndex++] & 0xff) << 24);\r
+            if(has == 4) fillReadBuffer();\r
+            return result;\r
+        }\r
+    }\r
+\r
+    private void writeByte(int b) {\r
+        writeBytes[writeBufferIndex++] = (byte)b;\r
+        if(writeBufferIndex == BUFFER_SIZE) flushWriteBuffer(BUFFER_SIZE);\r
+        wroteSinceLastRead = true;\r
+    }\r
+\r
+    private void writeInt(int i) {\r
+        if(writeBufferIndex < (BUFFER_SIZE-4)) {\r
+            writeBytes[writeBufferIndex++] = (byte)(i&0xff);\r
+            writeBytes[writeBufferIndex++] = (byte)((i>>>8)&0xff);\r
+            writeBytes[writeBufferIndex++] = (byte)((i>>>16)&0xff);\r
+            writeBytes[writeBufferIndex++] = (byte)((i>>>24)&0xff);\r
+        } else {\r
+            int has = BUFFER_SIZE-writeBufferIndex;\r
+            if(has == 0) flushWriteBuffer(BUFFER_SIZE);\r
+            writeBytes[writeBufferIndex++] = (byte)(i&0xff);\r
+            if(has == 1) flushWriteBuffer(BUFFER_SIZE);\r
+            writeBytes[writeBufferIndex++] = (byte)((i>>>8)&0xff);\r
+            if(has == 2) flushWriteBuffer(BUFFER_SIZE);\r
+            writeBytes[writeBufferIndex++] = (byte)((i>>>16)&0xff);\r
+            if(has == 3) flushWriteBuffer(BUFFER_SIZE);\r
+            writeBytes[writeBufferIndex++] = (byte)((i>>>24)&0xff);\r
+            if(has == 4) flushWriteBuffer(BUFFER_SIZE);\r
+        }\r
+        wroteSinceLastRead = true;\r
+    }\r
+\r
+    private int tryFlushBuffer(int size) throws IOException {\r
+        if (DEBUG)\r
+            System.err.println("tryFlushBuffer(" + size + "): tempFile=" + tempFile);\r
+\r
+        if (tempFile == null) {\r
+            File base = InternalClientConfig.DB_CLIENT_TEMP_DIR;\r
+            base.mkdirs();\r
+            tempFile = File.createTempFile("cset-", ".cs", base);\r
+            file = new RandomAccessFile(tempFile, "rwd");\r
+            channel = file.getChannel();\r
+            if (DEBUG)\r
+                System.err.println("Allocated temporary file for changeset: " + tempFile);\r
+        }\r
+\r
+        int wrote = 0;\r
+        writeBuffer.position(0);\r
+        writeBuffer.limit(size);\r
+        for (; wrote < size;) {\r
+            wrote += channel.write(writeBuffer);\r
+        }\r
+        return wrote;\r
+    }\r
+\r
+    private void flushWriteBuffer(int size) {\r
+        if (DEBUG)\r
+            System.err.println("flushBuffer(" + size + ") overflow=" + overflow);\r
+        if (size == 0)\r
+            return;\r
+\r
+        if (!overflow) {\r
+            // Change set is about to become file-backed.\r
+            // Initialize separate buffer for writing.\r
+            writeBytes = Arrays.copyOf(readBytes, readBytes.length);\r
+            writeBuffer = ByteBuffer.wrap(writeBytes);\r
+            overflow = true;\r
+        }\r
+\r
+        writeBufferIndex = 0;\r
+        int wrote = 0;\r
+        try {\r
+            setFilePosition(writePosition);\r
+            wrote = tryFlushBuffer(size);\r
+        } catch (IOException e) {\r
+            e.printStackTrace();\r
+            throw new RuntimeDatabaseException(e);\r
+        } finally {\r
+            try {\r
+                currentPosition = channel.position();\r
+                writePosition = currentPosition;\r
+            } catch (IOException e) {\r
+                e.printStackTrace();\r
+            }\r
+            if (DEBUG)\r
+                System.err.println("flushWriteBuffer wrote " + wrote + " bytes");\r
+        }\r
+    }\r
+\r
+    private boolean resetRead() {\r
+        if (DEBUG)\r
+            System.err.println("resetRead: overflow=" + overflow + ", file=" + tempFile);\r
+        try {\r
+            // Flush temporary file first if necessary.\r
+            if (wroteSinceLastRead) {\r
+                if (DEBUG)\r
+                    System.err.println("\tTerminating write of changeset " + tempFile);\r
+                writeByte(TERM);\r
+                wroteSinceLastRead = false;\r
+            }\r
+            if (overflow) {\r
+                if (DEBUG)\r
+                    System.err.println("\tReading changeset from " + tempFile.getAbsolutePath());\r
+                flushWriteBuffer(writeBufferIndex);\r
+                readPosition = 0L;\r
+                setFilePosition(readPosition);\r
+                fillReadBuffer();\r
+            } else {\r
+                readBufferIndex = 0;\r
+            }\r
+            return true;\r
+        } catch (IOException e) {\r
+            e.printStackTrace();\r
+            return false;\r
+        }\r
+    }\r
+\r
+    private void fillReadBuffer() {\r
+        readBufferIndex = 0;\r
+        int read = 0;\r
+        try {\r
+            setFilePosition(readPosition);\r
+            readBuffer.position(0);\r
+            readBuffer.limit(BUFFER_SIZE);\r
+            for (;read < BUFFER_SIZE;) {\r
+                int got = channel.read(readBuffer);\r
+                if (got == -1)\r
+                    return;\r
+                read += got;\r
+            }\r
+        } catch (IOException e) {\r
+            e.printStackTrace();\r
+        } finally {\r
+            try {\r
+                currentPosition = channel.position();\r
+                readPosition = currentPosition;\r
+            } catch (IOException e) {\r
+                e.printStackTrace();\r
+            }\r
+            if (DEBUG)\r
+                System.err.println("fillReadBuffer read " + read + " bytes");\r
+        }\r
+    }\r
+\r
+    /**\r
+     * @param newPosition\r
+     * @return <code>true</code> if file position was modified\r
+     * @throws IOException\r
+     */\r
+    private boolean setFilePosition(long newPosition) throws IOException {\r
+        if (channel == null) {\r
+            return false;\r
+        }\r
+        if (newPosition != currentPosition) {\r
+            if (DEBUG)\r
+                System.err.println("setting new file position(" + newPosition + "), old position=" + currentPosition);\r
+            channel.position(newPosition);\r
+            currentPosition = newPosition;\r
+            return true;\r
+        }\r
+        return false;\r
+    }\r
+\r
+    @Override\r
+    protected void finalize() throws Throwable {\r
+        close();\r
+        super.finalize();\r
+    }\r
+\r
+    public void close() {\r
+        session = null;\r
+        channel = null;\r
+        if (file != null) {\r
+            if (DEBUG)\r
+                System.err.println("Closing temporary changeset input stream " + file);\r
+            try {\r
+                file.close();\r
+            } catch (IOException e) {\r
+            } finally {\r
+                file = null;\r
+            }\r
+        }\r
+        if (tempFile != null) {\r
+            if (DEBUG)\r
+                System.err.println("Deleting temporary changeset file " + tempFile);\r
+            tempFile.delete();\r
+            tempFile = null;\r
+        }\r
+    }\r
+\r
+    public static void main(String[] args) {\r
+        ClientChangesImpl ccs = new ClientChangesImpl(null);\r
+        System.out.println("isEmpty=" + ccs.isEmpty());\r
+        System.out.println("clientChanges=" + ccs.toString());\r
+        System.out.println("isEmpty=" + ccs.isEmpty());\r
+    }\r
+\r
+}\r