/******************************************************************************* * Copyright (c) 2007, 2010 Association for Decentralized Information Management * in Industry THTH ry. * All rights reserved. This program and the accompanying materials * are made available under the terms of the Eclipse Public License v1.0 * which accompanies this distribution, and is available at * http://www.eclipse.org/legal/epl-v10.html * * Contributors: * VTT Technical Research Centre of Finland - initial API and implementation *******************************************************************************/ package fi.vtt.simantics.procore.internal; import java.io.File; import java.io.IOException; import java.io.RandomAccessFile; import java.nio.ByteBuffer; import java.nio.channels.FileChannel; import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.Iterator; import org.simantics.db.ChangeSet; import org.simantics.db.Resource; import org.simantics.db.common.internal.config.InternalClientConfig; import org.simantics.db.exception.RuntimeDatabaseException; import org.simantics.db.impl.ResourceImpl; /** * @author Antti Villberg * @author Tuukka Lehtonen */ public final class ClientChangesImpl implements ChangeSet { private static final boolean DEBUG = false; private static final int BUFFER_SIZE = 1 << 17; private static final byte CLAIM = 1; private static final byte DENY = 2; private static final byte VALUE = 3; private static final byte TERM = 4; private static final byte INVALIDATE = 5; private SessionImplSocket session; /** * True if enough data has been written to overflow the normal buffer which * will result in data being written to disk. */ private boolean overflow = false; private final byte[] readBytes = new byte[BUFFER_SIZE]; // By default the write buffer will be the exact same as the read buffer // since initially were not forced to write changes to the disk. private byte[] writeBytes = readBytes; private int readBufferIndex = 0; private int writeBufferIndex = 0; /** * true if something has been written since last call to * {@link #resetRead()}. By default it is true since * {@link #resetRead()} must on its first invocation write the {@link #TERM} * constant into the stream. */ private boolean wroteSinceLastRead = true; private final ByteBuffer readBuffer = ByteBuffer.wrap(readBytes); private ByteBuffer writeBuffer = ByteBuffer.wrap(writeBytes); private File tempFile; private RandomAccessFile file; private FileChannel channel; private long currentPosition = 0L; private long readPosition = 0L; private long writePosition = 0L; static abstract class NoRemoveIterator implements Iterator { @Override public void remove() { throw new UnsupportedOperationException(); } } /** * Base class for all immutable collections returned by * {@link ClientChangesImpl}. * * @param */ static class CollectionBase implements Collection { @Override public int size() { throw new UnsupportedOperationException(); } @Override public boolean isEmpty() { throw new UnsupportedOperationException(); } @Override public boolean contains(Object o) { throw new UnsupportedOperationException(); } @Override public Iterator iterator() { throw new UnsupportedOperationException(); } @Override public Object[] toArray() { throw new UnsupportedOperationException(); } @Override public TT[] toArray(TT[] a) { throw new UnsupportedOperationException(); } @Override public boolean add(T e) { throw new UnsupportedOperationException(); } @Override public boolean remove(Object o) { throw new UnsupportedOperationException(); } @Override public boolean containsAll(Collection c) { throw new UnsupportedOperationException(); } @Override public boolean addAll(Collection c) { throw new UnsupportedOperationException(); } @Override public boolean removeAll(Collection c) { throw new UnsupportedOperationException(); } @Override public boolean retainAll(Collection c) { throw new UnsupportedOperationException(); } @Override public void clear() { throw new UnsupportedOperationException(); } }; ClientChangesImpl(SessionImplSocket session) { this.session = session; } @Override public String toString() { final int LIMIT = 10000; String to = ""; for (StatementChange s : changedStatements()) { String op = s.isClaim() ? "add" : "rem"; to += "\n" + op + " " + s.getSubject() + " " + s.getPredicate() + " " + s.getObject(); if (to.length() > LIMIT) { to += "... more.\n"; return to; } } for (Resource r : changedValues()) { to += "\nmod " + r; if (to.length() > LIMIT) { to += "...etc.\n"; return to; } } return to; } private Resource getResource(int id) { return session.getResource(id); } @Override public synchronized Collection changedStatements() { if (!resetRead()) return Collections.emptyList(); return new CollectionBase() { StatementChange seek() { try { while(true) { byte method = readByte(); switch(method) { case TERM: { return null; } case CLAIM: { Resource s = getResource(readInt()); Resource p = getResource(readInt()); Resource o = getResource(readInt()); return new StatementChangeImpl(s, p, o, true); } case DENY: { Resource s = getResource(readInt()); Resource p = getResource(readInt()); Resource o = getResource(readInt()); return new StatementChangeImpl(s, p, o, false); } case VALUE: case INVALIDATE: { // Skipping value opererations. getResource(readInt()); break; } } } } catch(Exception e) { throw new RuntimeDatabaseException(e); } } private StatementChange next = seek(); @Override public Iterator iterator() { return new NoRemoveIterator() { @Override public boolean hasNext() { return next != null; } @Override public StatementChange next() { StatementChange result = next; next = seek(); return result; } }; } }; } @Override public Collection changedValues() { if (!resetRead()) return Collections.emptyList(); return new CollectionBase() { Resource seek() { try { while(true) { byte method = readByte(); switch(method) { case TERM: { return null; } case CLAIM: { getResource(readInt()); getResource(readInt()); getResource(readInt()); break; } case DENY: { getResource(readInt()); getResource(readInt()); getResource(readInt()); break; } case VALUE: case INVALIDATE: { return getResource(readInt()); } } } } catch(Exception e) { return null; } } private Resource next = seek(); @Override public Iterator iterator() { return new NoRemoveIterator() { @Override public boolean hasNext() { return next != null; } @Override public Resource next() { Resource result = next; next = seek(); return result; } }; } }; } @Override public Collection changedResources() { if (!resetRead()) return Collections.emptyList(); return new CollectionBase() { Resource seek() { try { while(true) { byte method = readByte(); switch(method) { case TERM: { return null; } case CLAIM: { getResource(readInt()); getResource(readInt()); getResource(readInt()); break; } case DENY: { getResource(readInt()); getResource(readInt()); getResource(readInt()); break; } case VALUE: { getResource(readInt()); break; } case INVALIDATE: { return getResource(readInt()); } } } } catch(Exception e) { return null; } } private Resource next = seek(); @Override public Iterator iterator() { return new NoRemoveIterator() { @Override public boolean hasNext() { return next != null; } @Override public Resource next() { Resource result = next; next = seek(); return result; } }; } }; } @Override public boolean isEmpty() { if (notEmpty) return false; return writeBufferIndex == 0 && !overflow; } private boolean notEmpty = false; void setNotEmpty(boolean notEmpty) { this.notEmpty = notEmpty; } void claim(Resource s, Resource p, Resource o) { writeByte(CLAIM); writeInt(session.getId((ResourceImpl) s)); writeInt(session.getId((ResourceImpl) p)); writeInt(session.getId((ResourceImpl) o)); if(DEBUG) System.err.println("claim1 $" + s.getResourceId() + " $" + p.getResourceId() + " $" + o.getResourceId()); } private int sCache1 = -1; private int sCache2 = -1; private int sCache3 = -1; final void invalidate(int s) { if(s == sCache1) { return; } else if (s == sCache2) { sCache2 = sCache1; sCache1 = s; return; } else if (s == sCache3) { sCache3 = sCache2; sCache2 = sCache1; sCache1 = s; return; } sCache3 = sCache2; sCache2 = sCache1; sCache1 = s; writeByte(INVALIDATE); writeInt(s); if(DEBUG) System.err.println("invalidate2 " + s); } void claim(int s, int p, int o) { writeByte(CLAIM); writeInt(s); writeInt(p); writeInt(o); if(DEBUG) System.err.println("claim2 " + s + " " + p + " " + o); } void deny(Resource s, Resource p, Resource o) { writeByte(DENY); writeInt(session.getId((ResourceImpl)s)); writeInt(session.getId((ResourceImpl)p)); writeInt(session.getId((ResourceImpl)o)); if(DEBUG) System.err.println("deny2 " + session.getId((ResourceImpl)s) + " " + session.getId((ResourceImpl)p) + " " + session.getId((ResourceImpl)o)); } void deny(int s, int p, int o) { writeByte(DENY); writeInt(s); writeInt(p); writeInt(o); if(DEBUG) System.err.println("deny2 " + s + " " + p + " " + o); } void claimValue(Resource r) { writeByte(VALUE); writeInt(session.getId((ResourceImpl)r)); if(DEBUG) System.err.println("claimOrDenyValue1 $" + r.getResourceId()); } void claimValue(int r) { writeByte(VALUE); writeInt(r); if(DEBUG) System.err.println("deny2 " + r); } // Streaming private byte readByte() { byte result = readBytes[readBufferIndex++]; if(readBufferIndex == BUFFER_SIZE) fillReadBuffer(); return result; } private int readInt() { if(readBufferIndex < (BUFFER_SIZE-4)) { int result = (int) ((readBytes[readBufferIndex++] & 0xff) | ((readBytes[readBufferIndex++] & 0xff) << 8) | ((readBytes[readBufferIndex++] & 0xff) << 16) | ((readBytes[readBufferIndex++] & 0xff) << 24)); return result; } else { int has = BUFFER_SIZE-readBufferIndex; int result = 0; if(has == 0) fillReadBuffer(); result = (int)(readBytes[readBufferIndex++] & 0xff); if(has == 1) fillReadBuffer(); result |= (int)((readBytes[readBufferIndex++] & 0xff) << 8); if(has == 2) fillReadBuffer(); result |= (int)((readBytes[readBufferIndex++] & 0xff) << 16); if(has == 3) fillReadBuffer(); result |= (int)((readBytes[readBufferIndex++] & 0xff) << 24); if(has == 4) fillReadBuffer(); return result; } } private void writeByte(int b) { writeBytes[writeBufferIndex++] = (byte)b; if(writeBufferIndex == BUFFER_SIZE) flushWriteBuffer(BUFFER_SIZE); wroteSinceLastRead = true; } private void writeInt(int i) { if(writeBufferIndex < (BUFFER_SIZE-4)) { writeBytes[writeBufferIndex++] = (byte)(i&0xff); writeBytes[writeBufferIndex++] = (byte)((i>>>8)&0xff); writeBytes[writeBufferIndex++] = (byte)((i>>>16)&0xff); writeBytes[writeBufferIndex++] = (byte)((i>>>24)&0xff); } else { int has = BUFFER_SIZE-writeBufferIndex; if(has == 0) flushWriteBuffer(BUFFER_SIZE); writeBytes[writeBufferIndex++] = (byte)(i&0xff); if(has == 1) flushWriteBuffer(BUFFER_SIZE); writeBytes[writeBufferIndex++] = (byte)((i>>>8)&0xff); if(has == 2) flushWriteBuffer(BUFFER_SIZE); writeBytes[writeBufferIndex++] = (byte)((i>>>16)&0xff); if(has == 3) flushWriteBuffer(BUFFER_SIZE); writeBytes[writeBufferIndex++] = (byte)((i>>>24)&0xff); if(has == 4) flushWriteBuffer(BUFFER_SIZE); } wroteSinceLastRead = true; } private int tryFlushBuffer(int size) throws IOException { if (DEBUG) System.err.println("tryFlushBuffer(" + size + "): tempFile=" + tempFile); if (tempFile == null) { File base = InternalClientConfig.DB_CLIENT_TEMP_DIR; base.mkdirs(); tempFile = File.createTempFile("cset-", ".cs", base); file = new RandomAccessFile(tempFile, "rwd"); channel = file.getChannel(); if (DEBUG) System.err.println("Allocated temporary file for changeset: " + tempFile); } int wrote = 0; writeBuffer.position(0); writeBuffer.limit(size); for (; wrote < size;) { wrote += channel.write(writeBuffer); } return wrote; } private void flushWriteBuffer(int size) { if (DEBUG) System.err.println("flushBuffer(" + size + ") overflow=" + overflow); if (size == 0) return; if (!overflow) { // Change set is about to become file-backed. // Initialize separate buffer for writing. writeBytes = Arrays.copyOf(readBytes, readBytes.length); writeBuffer = ByteBuffer.wrap(writeBytes); overflow = true; } writeBufferIndex = 0; int wrote = 0; try { setFilePosition(writePosition); wrote = tryFlushBuffer(size); } catch (IOException e) { e.printStackTrace(); throw new RuntimeDatabaseException(e); } finally { try { currentPosition = channel.position(); writePosition = currentPosition; } catch (IOException e) { e.printStackTrace(); } if (DEBUG) System.err.println("flushWriteBuffer wrote " + wrote + " bytes"); } } private boolean resetRead() { if (DEBUG) System.err.println("resetRead: overflow=" + overflow + ", file=" + tempFile); try { // Flush temporary file first if necessary. if (wroteSinceLastRead) { if (DEBUG) System.err.println("\tTerminating write of changeset " + tempFile); writeByte(TERM); wroteSinceLastRead = false; } if (overflow) { if (DEBUG) System.err.println("\tReading changeset from " + tempFile.getAbsolutePath()); flushWriteBuffer(writeBufferIndex); readPosition = 0L; setFilePosition(readPosition); fillReadBuffer(); } else { readBufferIndex = 0; } return true; } catch (IOException e) { e.printStackTrace(); return false; } } private void fillReadBuffer() { readBufferIndex = 0; int read = 0; try { setFilePosition(readPosition); readBuffer.position(0); readBuffer.limit(BUFFER_SIZE); for (;read < BUFFER_SIZE;) { int got = channel.read(readBuffer); if (got == -1) return; read += got; } } catch (IOException e) { e.printStackTrace(); } finally { try { currentPosition = channel.position(); readPosition = currentPosition; } catch (IOException e) { e.printStackTrace(); } if (DEBUG) System.err.println("fillReadBuffer read " + read + " bytes"); } } /** * @param newPosition * @return true if file position was modified * @throws IOException */ private boolean setFilePosition(long newPosition) throws IOException { if (channel == null) { return false; } if (newPosition != currentPosition) { if (DEBUG) System.err.println("setting new file position(" + newPosition + "), old position=" + currentPosition); channel.position(newPosition); currentPosition = newPosition; return true; } return false; } @Override protected void finalize() throws Throwable { close(); super.finalize(); } public void close() { session = null; channel = null; if (file != null) { if (DEBUG) System.err.println("Closing temporary changeset input stream " + file); try { file.close(); } catch (IOException e) { } finally { file = null; } } if (tempFile != null) { if (DEBUG) System.err.println("Deleting temporary changeset file " + tempFile); tempFile.delete(); tempFile = null; } } public static void main(String[] args) { ClientChangesImpl ccs = new ClientChangesImpl(null); System.out.println("isEmpty=" + ccs.isEmpty()); System.out.println("clientChanges=" + ccs.toString()); System.out.println("isEmpty=" + ccs.isEmpty()); } }