--- /dev/null
+/*******************************************************************************\r
+ * Copyright (c) 2007, 2010 Association for Decentralized Information Management\r
+ * in Industry THTH ry.\r
+ * All rights reserved. This program and the accompanying materials\r
+ * are made available under the terms of the Eclipse Public License v1.0\r
+ * which accompanies this distribution, and is available at\r
+ * http://www.eclipse.org/legal/epl-v10.html\r
+ *\r
+ * Contributors:\r
+ * VTT Technical Research Centre of Finland - initial API and implementation\r
+ *******************************************************************************/\r
+package fi.vtt.simantics.procore.internal;\r
+\r
+import java.io.File;\r
+import java.io.IOException;\r
+import java.io.RandomAccessFile;\r
+import java.nio.ByteBuffer;\r
+import java.nio.channels.FileChannel;\r
+import java.util.Arrays;\r
+import java.util.Collection;\r
+import java.util.Collections;\r
+import java.util.Iterator;\r
+\r
+import org.simantics.db.ChangeSet;\r
+import org.simantics.db.Resource;\r
+import org.simantics.db.common.internal.config.InternalClientConfig;\r
+import org.simantics.db.exception.RuntimeDatabaseException;\r
+import org.simantics.db.impl.ResourceImpl;\r
+\r
+/**\r
+ * @author Antti Villberg\r
+ * @author Tuukka Lehtonen\r
+ */\r
+public final class ClientChangesImpl implements ChangeSet {\r
+\r
+ private static final boolean DEBUG = false;\r
+\r
+ private static final int BUFFER_SIZE = 1 << 17;\r
+\r
+ private static final byte CLAIM = 1;\r
+ private static final byte DENY = 2;\r
+ private static final byte VALUE = 3;\r
+ private static final byte TERM = 4;\r
+ private static final byte INVALIDATE = 5;\r
+\r
+ private SessionImplSocket session;\r
+\r
+ /**\r
+ * True if enough data has been written to overflow the normal buffer which\r
+ * will result in data being written to disk.\r
+ */\r
+ private boolean overflow = false;\r
+\r
+ private final byte[] readBytes = new byte[BUFFER_SIZE];\r
+ // By default the write buffer will be the exact same as the read buffer\r
+ // since initially were not forced to write changes to the disk.\r
+ private byte[] writeBytes = readBytes;\r
+\r
+ private int readBufferIndex = 0;\r
+ private int writeBufferIndex = 0;\r
+\r
+ /**\r
+ * <code>true</code> if something has been written since last call to\r
+ * {@link #resetRead()}. By default it is <code>true</code> since\r
+ * {@link #resetRead()} must on its first invocation write the {@link #TERM}\r
+ * constant into the stream.\r
+ */\r
+ private boolean wroteSinceLastRead = true;\r
+\r
+ private final ByteBuffer readBuffer = ByteBuffer.wrap(readBytes);\r
+ private ByteBuffer writeBuffer = ByteBuffer.wrap(writeBytes);\r
+\r
+ private File tempFile;\r
+ private RandomAccessFile file;\r
+ private FileChannel channel;\r
+ private long currentPosition = 0L;\r
+ private long readPosition = 0L;\r
+ private long writePosition = 0L;\r
+\r
+ static abstract class NoRemoveIterator<T> implements Iterator<T> {\r
+ @Override\r
+ public void remove() {\r
+ throw new UnsupportedOperationException();\r
+ }\r
+ }\r
+\r
+ /**\r
+ * Base class for all immutable collections returned by\r
+ * {@link ClientChangesImpl}.\r
+ * \r
+ * @param <T>\r
+ */\r
+ static class CollectionBase<T> implements Collection<T> {\r
+\r
+ @Override\r
+ public int size() {\r
+ throw new UnsupportedOperationException();\r
+ }\r
+\r
+ @Override\r
+ public boolean isEmpty() {\r
+ throw new UnsupportedOperationException();\r
+ }\r
+\r
+ @Override\r
+ public boolean contains(Object o) {\r
+ throw new UnsupportedOperationException();\r
+ }\r
+\r
+ @Override\r
+ public Iterator<T> iterator() {\r
+ throw new UnsupportedOperationException();\r
+ }\r
+\r
+ @Override\r
+ public Object[] toArray() {\r
+ throw new UnsupportedOperationException();\r
+ }\r
+\r
+ @Override\r
+ public <TT> TT[] toArray(TT[] a) {\r
+ throw new UnsupportedOperationException();\r
+ }\r
+\r
+ @Override\r
+ public boolean add(T e) {\r
+ throw new UnsupportedOperationException();\r
+ }\r
+\r
+ @Override\r
+ public boolean remove(Object o) {\r
+ throw new UnsupportedOperationException();\r
+ }\r
+\r
+ @Override\r
+ public boolean containsAll(Collection<?> c) {\r
+ throw new UnsupportedOperationException();\r
+ }\r
+\r
+ @Override\r
+ public boolean addAll(Collection<? extends T> c) {\r
+ throw new UnsupportedOperationException();\r
+ }\r
+\r
+ @Override\r
+ public boolean removeAll(Collection<?> c) {\r
+ throw new UnsupportedOperationException();\r
+ }\r
+\r
+ @Override\r
+ public boolean retainAll(Collection<?> c) {\r
+ throw new UnsupportedOperationException();\r
+ }\r
+\r
+ @Override\r
+ public void clear() {\r
+ throw new UnsupportedOperationException();\r
+ }\r
+\r
+ };\r
+\r
+ ClientChangesImpl(SessionImplSocket session) {\r
+ this.session = session;\r
+ }\r
+\r
+ @Override\r
+ public String toString() {\r
+ final int LIMIT = 10000;\r
+ String to = "";\r
+ for (StatementChange s : changedStatements()) {\r
+ String op = s.isClaim() ? "add" : "rem";\r
+ to += "\n" + op + " " + s.getSubject() + " " + s.getPredicate() + " " + s.getObject();\r
+ if (to.length() > LIMIT) {\r
+ to += "... more.\n";\r
+ return to;\r
+ }\r
+ }\r
+ for (Resource r : changedValues()) {\r
+ to += "\nmod " + r; \r
+ if (to.length() > LIMIT) {\r
+ to += "...etc.\n";\r
+ return to;\r
+ }\r
+ }\r
+ return to;\r
+ }\r
+\r
+ private Resource getResource(int id) {\r
+ return session.getResource(id);\r
+ }\r
+\r
+ @Override\r
+ public synchronized Collection<StatementChange> changedStatements() {\r
+\r
+ if (!resetRead())\r
+ return Collections.emptyList();\r
+\r
+ return new CollectionBase<StatementChange>() {\r
+\r
+ StatementChange seek() {\r
+ try {\r
+ while(true) {\r
+ byte method = readByte();\r
+ switch(method) {\r
+ case TERM: {\r
+ return null;\r
+ }\r
+ case CLAIM: {\r
+ Resource s = getResource(readInt());\r
+ Resource p = getResource(readInt());\r
+ Resource o = getResource(readInt());\r
+ return new StatementChangeImpl(s, p, o, true);\r
+ }\r
+ case DENY: {\r
+ Resource s = getResource(readInt());\r
+ Resource p = getResource(readInt());\r
+ Resource o = getResource(readInt());\r
+ return new StatementChangeImpl(s, p, o, false);\r
+ }\r
+ case VALUE:\r
+ case INVALIDATE: {\r
+ // Skipping value opererations.\r
+ getResource(readInt());\r
+ break;\r
+ }\r
+ }\r
+ }\r
+ } catch(Exception e) {\r
+ throw new RuntimeDatabaseException(e);\r
+ }\r
+ }\r
+\r
+ private StatementChange next = seek();\r
+\r
+ @Override\r
+ public Iterator<StatementChange> iterator() {\r
+ return new NoRemoveIterator<StatementChange>() {\r
+ @Override\r
+ public boolean hasNext() {\r
+ return next != null;\r
+ }\r
+ @Override\r
+ public StatementChange next() {\r
+ StatementChange result = next;\r
+ next = seek();\r
+ return result;\r
+ }\r
+ };\r
+ }\r
+\r
+ };\r
+\r
+ }\r
+\r
+ @Override\r
+ public Collection<Resource> changedValues() {\r
+\r
+ if (!resetRead())\r
+ return Collections.emptyList();\r
+\r
+ return new CollectionBase<Resource>() {\r
+\r
+ Resource seek() {\r
+ try {\r
+ while(true) {\r
+ byte method = readByte();\r
+ switch(method) {\r
+ case TERM: {\r
+ return null;\r
+ }\r
+ case CLAIM: {\r
+ getResource(readInt());\r
+ getResource(readInt());\r
+ getResource(readInt());\r
+ break;\r
+ }\r
+ case DENY: {\r
+ getResource(readInt());\r
+ getResource(readInt());\r
+ getResource(readInt());\r
+ break;\r
+ }\r
+ case VALUE:\r
+ case INVALIDATE: {\r
+ return getResource(readInt());\r
+ }\r
+ }\r
+ }\r
+ } catch(Exception e) {\r
+ return null;\r
+ }\r
+ }\r
+\r
+ private Resource next = seek();\r
+\r
+ @Override\r
+ public Iterator<Resource> iterator() {\r
+ return new NoRemoveIterator<Resource>() {\r
+ @Override\r
+ public boolean hasNext() {\r
+ return next != null;\r
+ }\r
+ @Override\r
+ public Resource next() {\r
+ Resource result = next;\r
+ next = seek();\r
+ return result;\r
+ }\r
+ };\r
+ }\r
+\r
+ };\r
+\r
+ }\r
+\r
+ @Override\r
+ public Collection<Resource> changedResources() {\r
+\r
+ if (!resetRead())\r
+ return Collections.emptyList();\r
+\r
+ return new CollectionBase<Resource>() {\r
+\r
+ Resource seek() {\r
+\r
+ try {\r
+ while(true) {\r
+ byte method = readByte();\r
+ switch(method) {\r
+ case TERM: {\r
+ return null;\r
+ }\r
+ case CLAIM: {\r
+ getResource(readInt());\r
+ getResource(readInt());\r
+ getResource(readInt());\r
+ break;\r
+ }\r
+ case DENY: {\r
+ getResource(readInt());\r
+ getResource(readInt());\r
+ getResource(readInt());\r
+ break;\r
+ }\r
+ case VALUE: {\r
+ getResource(readInt());\r
+ break;\r
+ }\r
+ case INVALIDATE: {\r
+ return getResource(readInt());\r
+ }\r
+ }\r
+ }\r
+ } catch(Exception e) {\r
+ return null;\r
+ }\r
+\r
+ }\r
+\r
+ private Resource next = seek();\r
+\r
+ @Override\r
+ public Iterator<Resource> iterator() {\r
+ return new NoRemoveIterator<Resource>() {\r
+ @Override\r
+ public boolean hasNext() {\r
+ return next != null;\r
+ }\r
+ @Override\r
+ public Resource next() {\r
+ Resource result = next;\r
+ next = seek();\r
+ return result;\r
+ }\r
+ };\r
+ }\r
+\r
+ };\r
+\r
+ }\r
+\r
+ @Override\r
+ public boolean isEmpty() {\r
+ if (notEmpty)\r
+ return false;\r
+ return writeBufferIndex == 0 && !overflow;\r
+ }\r
+\r
+ private boolean notEmpty = false;\r
+ void setNotEmpty(boolean notEmpty) {\r
+ this.notEmpty = notEmpty;\r
+ }\r
+\r
+ void claim(Resource s, Resource p, Resource o) {\r
+ writeByte(CLAIM);\r
+ writeInt(session.getId((ResourceImpl) s));\r
+ writeInt(session.getId((ResourceImpl) p));\r
+ writeInt(session.getId((ResourceImpl) o));\r
+ if(DEBUG) System.err.println("claim1 $" + s.getResourceId() + " $" + p.getResourceId() + " $" + o.getResourceId());\r
+ }\r
+\r
+ private int sCache1 = -1;\r
+ private int sCache2 = -1;\r
+ private int sCache3 = -1;\r
+\r
+ final void invalidate(int s) {\r
+ \r
+ if(s == sCache1) {\r
+ return;\r
+ } else if (s == sCache2) {\r
+ sCache2 = sCache1;\r
+ sCache1 = s;\r
+ return;\r
+ } else if (s == sCache3) {\r
+ sCache3 = sCache2;\r
+ sCache2 = sCache1;\r
+ sCache1 = s;\r
+ return;\r
+ }\r
+\r
+ sCache3 = sCache2;\r
+ sCache2 = sCache1;\r
+ sCache1 = s;\r
+ \r
+ writeByte(INVALIDATE);\r
+ writeInt(s);\r
+\r
+ if(DEBUG) System.err.println("invalidate2 " + s);\r
+\r
+ }\r
+\r
+ void claim(int s, int p, int o) {\r
+ writeByte(CLAIM);\r
+ writeInt(s);\r
+ writeInt(p);\r
+ writeInt(o);\r
+ if(DEBUG) System.err.println("claim2 " + s + " " + p + " " + o);\r
+ }\r
+\r
+ void deny(Resource s, Resource p, Resource o) {\r
+ writeByte(DENY);\r
+ writeInt(session.getId((ResourceImpl)s));\r
+ writeInt(session.getId((ResourceImpl)p));\r
+ writeInt(session.getId((ResourceImpl)o));\r
+ if(DEBUG) System.err.println("deny2 " + session.getId((ResourceImpl)s) + " " + session.getId((ResourceImpl)p) + " " + session.getId((ResourceImpl)o));\r
+ }\r
+\r
+ void deny(int s, int p, int o) {\r
+ writeByte(DENY);\r
+ writeInt(s);\r
+ writeInt(p);\r
+ writeInt(o);\r
+ if(DEBUG) System.err.println("deny2 " + s + " " + p + " " + o);\r
+ }\r
+\r
+ void claimValue(Resource r) {\r
+ writeByte(VALUE);\r
+ writeInt(session.getId((ResourceImpl)r));\r
+ if(DEBUG) System.err.println("claimOrDenyValue1 $" + r.getResourceId());\r
+ }\r
+\r
+ void claimValue(int r) {\r
+ writeByte(VALUE);\r
+ writeInt(r);\r
+ if(DEBUG) System.err.println("deny2 " + r);\r
+ }\r
+\r
+ // Streaming\r
+\r
+ private byte readByte() {\r
+ byte result = readBytes[readBufferIndex++]; \r
+ if(readBufferIndex == BUFFER_SIZE) fillReadBuffer();\r
+ return result;\r
+ }\r
+\r
+ private int readInt() {\r
+ if(readBufferIndex < (BUFFER_SIZE-4)) {\r
+ int result = (int) \r
+ ((readBytes[readBufferIndex++] & 0xff) | \r
+ ((readBytes[readBufferIndex++] & 0xff) << 8) | \r
+ ((readBytes[readBufferIndex++] & 0xff) << 16) | \r
+ ((readBytes[readBufferIndex++] & 0xff) << 24)); \r
+ return result;\r
+ } else {\r
+ int has = BUFFER_SIZE-readBufferIndex;\r
+ int result = 0;\r
+ if(has == 0) fillReadBuffer();\r
+ result = (int)(readBytes[readBufferIndex++] & 0xff);\r
+ if(has == 1) fillReadBuffer();\r
+ result |= (int)((readBytes[readBufferIndex++] & 0xff) << 8);\r
+ if(has == 2) fillReadBuffer();\r
+ result |= (int)((readBytes[readBufferIndex++] & 0xff) << 16);\r
+ if(has == 3) fillReadBuffer();\r
+ result |= (int)((readBytes[readBufferIndex++] & 0xff) << 24);\r
+ if(has == 4) fillReadBuffer();\r
+ return result;\r
+ }\r
+ }\r
+\r
+ private void writeByte(int b) {\r
+ writeBytes[writeBufferIndex++] = (byte)b;\r
+ if(writeBufferIndex == BUFFER_SIZE) flushWriteBuffer(BUFFER_SIZE);\r
+ wroteSinceLastRead = true;\r
+ }\r
+\r
+ private void writeInt(int i) {\r
+ if(writeBufferIndex < (BUFFER_SIZE-4)) {\r
+ writeBytes[writeBufferIndex++] = (byte)(i&0xff);\r
+ writeBytes[writeBufferIndex++] = (byte)((i>>>8)&0xff);\r
+ writeBytes[writeBufferIndex++] = (byte)((i>>>16)&0xff);\r
+ writeBytes[writeBufferIndex++] = (byte)((i>>>24)&0xff);\r
+ } else {\r
+ int has = BUFFER_SIZE-writeBufferIndex;\r
+ if(has == 0) flushWriteBuffer(BUFFER_SIZE);\r
+ writeBytes[writeBufferIndex++] = (byte)(i&0xff);\r
+ if(has == 1) flushWriteBuffer(BUFFER_SIZE);\r
+ writeBytes[writeBufferIndex++] = (byte)((i>>>8)&0xff);\r
+ if(has == 2) flushWriteBuffer(BUFFER_SIZE);\r
+ writeBytes[writeBufferIndex++] = (byte)((i>>>16)&0xff);\r
+ if(has == 3) flushWriteBuffer(BUFFER_SIZE);\r
+ writeBytes[writeBufferIndex++] = (byte)((i>>>24)&0xff);\r
+ if(has == 4) flushWriteBuffer(BUFFER_SIZE);\r
+ }\r
+ wroteSinceLastRead = true;\r
+ }\r
+\r
+ private int tryFlushBuffer(int size) throws IOException {\r
+ if (DEBUG)\r
+ System.err.println("tryFlushBuffer(" + size + "): tempFile=" + tempFile);\r
+\r
+ if (tempFile == null) {\r
+ File base = InternalClientConfig.DB_CLIENT_TEMP_DIR;\r
+ base.mkdirs();\r
+ tempFile = File.createTempFile("cset-", ".cs", base);\r
+ file = new RandomAccessFile(tempFile, "rwd");\r
+ channel = file.getChannel();\r
+ if (DEBUG)\r
+ System.err.println("Allocated temporary file for changeset: " + tempFile);\r
+ }\r
+\r
+ int wrote = 0;\r
+ writeBuffer.position(0);\r
+ writeBuffer.limit(size);\r
+ for (; wrote < size;) {\r
+ wrote += channel.write(writeBuffer);\r
+ }\r
+ return wrote;\r
+ }\r
+\r
+ private void flushWriteBuffer(int size) {\r
+ if (DEBUG)\r
+ System.err.println("flushBuffer(" + size + ") overflow=" + overflow);\r
+ if (size == 0)\r
+ return;\r
+\r
+ if (!overflow) {\r
+ // Change set is about to become file-backed.\r
+ // Initialize separate buffer for writing.\r
+ writeBytes = Arrays.copyOf(readBytes, readBytes.length);\r
+ writeBuffer = ByteBuffer.wrap(writeBytes);\r
+ overflow = true;\r
+ }\r
+\r
+ writeBufferIndex = 0;\r
+ int wrote = 0;\r
+ try {\r
+ setFilePosition(writePosition);\r
+ wrote = tryFlushBuffer(size);\r
+ } catch (IOException e) {\r
+ e.printStackTrace();\r
+ throw new RuntimeDatabaseException(e);\r
+ } finally {\r
+ try {\r
+ currentPosition = channel.position();\r
+ writePosition = currentPosition;\r
+ } catch (IOException e) {\r
+ e.printStackTrace();\r
+ }\r
+ if (DEBUG)\r
+ System.err.println("flushWriteBuffer wrote " + wrote + " bytes");\r
+ }\r
+ }\r
+\r
+ private boolean resetRead() {\r
+ if (DEBUG)\r
+ System.err.println("resetRead: overflow=" + overflow + ", file=" + tempFile);\r
+ try {\r
+ // Flush temporary file first if necessary.\r
+ if (wroteSinceLastRead) {\r
+ if (DEBUG)\r
+ System.err.println("\tTerminating write of changeset " + tempFile);\r
+ writeByte(TERM);\r
+ wroteSinceLastRead = false;\r
+ }\r
+ if (overflow) {\r
+ if (DEBUG)\r
+ System.err.println("\tReading changeset from " + tempFile.getAbsolutePath());\r
+ flushWriteBuffer(writeBufferIndex);\r
+ readPosition = 0L;\r
+ setFilePosition(readPosition);\r
+ fillReadBuffer();\r
+ } else {\r
+ readBufferIndex = 0;\r
+ }\r
+ return true;\r
+ } catch (IOException e) {\r
+ e.printStackTrace();\r
+ return false;\r
+ }\r
+ }\r
+\r
+ private void fillReadBuffer() {\r
+ readBufferIndex = 0;\r
+ int read = 0;\r
+ try {\r
+ setFilePosition(readPosition);\r
+ readBuffer.position(0);\r
+ readBuffer.limit(BUFFER_SIZE);\r
+ for (;read < BUFFER_SIZE;) {\r
+ int got = channel.read(readBuffer);\r
+ if (got == -1)\r
+ return;\r
+ read += got;\r
+ }\r
+ } catch (IOException e) {\r
+ e.printStackTrace();\r
+ } finally {\r
+ try {\r
+ currentPosition = channel.position();\r
+ readPosition = currentPosition;\r
+ } catch (IOException e) {\r
+ e.printStackTrace();\r
+ }\r
+ if (DEBUG)\r
+ System.err.println("fillReadBuffer read " + read + " bytes");\r
+ }\r
+ }\r
+\r
+ /**\r
+ * @param newPosition\r
+ * @return <code>true</code> if file position was modified\r
+ * @throws IOException\r
+ */\r
+ private boolean setFilePosition(long newPosition) throws IOException {\r
+ if (channel == null) {\r
+ return false;\r
+ }\r
+ if (newPosition != currentPosition) {\r
+ if (DEBUG)\r
+ System.err.println("setting new file position(" + newPosition + "), old position=" + currentPosition);\r
+ channel.position(newPosition);\r
+ currentPosition = newPosition;\r
+ return true;\r
+ }\r
+ return false;\r
+ }\r
+\r
+ @Override\r
+ protected void finalize() throws Throwable {\r
+ close();\r
+ super.finalize();\r
+ }\r
+\r
+ public void close() {\r
+ session = null;\r
+ channel = null;\r
+ if (file != null) {\r
+ if (DEBUG)\r
+ System.err.println("Closing temporary changeset input stream " + file);\r
+ try {\r
+ file.close();\r
+ } catch (IOException e) {\r
+ } finally {\r
+ file = null;\r
+ }\r
+ }\r
+ if (tempFile != null) {\r
+ if (DEBUG)\r
+ System.err.println("Deleting temporary changeset file " + tempFile);\r
+ tempFile.delete();\r
+ tempFile = null;\r
+ }\r
+ }\r
+\r
+ public static void main(String[] args) {\r
+ ClientChangesImpl ccs = new ClientChangesImpl(null);\r
+ System.out.println("isEmpty=" + ccs.isEmpty());\r
+ System.out.println("clientChanges=" + ccs.toString());\r
+ System.out.println("isEmpty=" + ccs.isEmpty());\r
+ }\r
+\r
+}\r