X-Git-Url: https://gerrit.simantics.org/r/gitweb?p=simantics%2Fplatform.git;a=blobdiff_plain;f=bundles%2Forg.simantics.db.procore%2Fsrc%2Ffi%2Fvtt%2Fsimantics%2Fprocore%2Finternal%2FClientChangesImpl.java;h=000880c3d1d1bf3ff00f5b409ee69bd657a3c2b0;hp=9c51e267d3af422fe0e8876fb3274c47221fa29e;hb=782dfcb9fc5ce3db46598f660abf642b9c8849b7;hpb=969bd23cab98a79ca9101af33334000879fb60c5 diff --git a/bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/ClientChangesImpl.java b/bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/ClientChangesImpl.java index 9c51e267d..000880c3d 100644 --- a/bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/ClientChangesImpl.java +++ b/bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/ClientChangesImpl.java @@ -1,692 +1,692 @@ -/******************************************************************************* - * Copyright (c) 2007, 2010 Association for Decentralized Information Management - * in Industry THTH ry. - * All rights reserved. This program and the accompanying materials - * are made available under the terms of the Eclipse Public License v1.0 - * which accompanies this distribution, and is available at - * http://www.eclipse.org/legal/epl-v10.html - * - * Contributors: - * VTT Technical Research Centre of Finland - initial API and implementation - *******************************************************************************/ -package fi.vtt.simantics.procore.internal; - -import java.io.File; -import java.io.IOException; -import java.io.RandomAccessFile; -import java.nio.ByteBuffer; -import java.nio.channels.FileChannel; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.Iterator; - -import org.simantics.db.ChangeSet; -import org.simantics.db.Resource; -import org.simantics.db.common.internal.config.InternalClientConfig; -import org.simantics.db.exception.RuntimeDatabaseException; -import org.simantics.db.impl.ResourceImpl; - -/** - * @author Antti Villberg - * @author Tuukka Lehtonen - */ -public final class ClientChangesImpl implements ChangeSet { - - private static final boolean DEBUG = false; - - private static final int BUFFER_SIZE = 1 << 17; - - private static final byte CLAIM = 1; - private static final byte DENY = 2; - private static final byte VALUE = 3; - private static final byte TERM = 4; - private static final byte INVALIDATE = 5; - - private SessionImplSocket session; - - /** - * True if enough data has been written to overflow the normal buffer which - * will result in data being written to disk. - */ - private boolean overflow = false; - - private final byte[] readBytes = new byte[BUFFER_SIZE]; - // By default the write buffer will be the exact same as the read buffer - // since initially were not forced to write changes to the disk. - private byte[] writeBytes = readBytes; - - private int readBufferIndex = 0; - private int writeBufferIndex = 0; - - /** - * true if something has been written since last call to - * {@link #resetRead()}. By default it is true since - * {@link #resetRead()} must on its first invocation write the {@link #TERM} - * constant into the stream. - */ - private boolean wroteSinceLastRead = true; - - private final ByteBuffer readBuffer = ByteBuffer.wrap(readBytes); - private ByteBuffer writeBuffer = ByteBuffer.wrap(writeBytes); - - private File tempFile; - private RandomAccessFile file; - private FileChannel channel; - private long currentPosition = 0L; - private long readPosition = 0L; - private long writePosition = 0L; - - static abstract class NoRemoveIterator implements Iterator { - @Override - public void remove() { - throw new UnsupportedOperationException(); - } - } - - /** - * Base class for all immutable collections returned by - * {@link ClientChangesImpl}. - * - * @param - */ - static class CollectionBase implements Collection { - - @Override - public int size() { - throw new UnsupportedOperationException(); - } - - @Override - public boolean isEmpty() { - throw new UnsupportedOperationException(); - } - - @Override - public boolean contains(Object o) { - throw new UnsupportedOperationException(); - } - - @Override - public Iterator iterator() { - throw new UnsupportedOperationException(); - } - - @Override - public Object[] toArray() { - throw new UnsupportedOperationException(); - } - - @Override - public TT[] toArray(TT[] a) { - throw new UnsupportedOperationException(); - } - - @Override - public boolean add(T e) { - throw new UnsupportedOperationException(); - } - - @Override - public boolean remove(Object o) { - throw new UnsupportedOperationException(); - } - - @Override - public boolean containsAll(Collection c) { - throw new UnsupportedOperationException(); - } - - @Override - public boolean addAll(Collection c) { - throw new UnsupportedOperationException(); - } - - @Override - public boolean removeAll(Collection c) { - throw new UnsupportedOperationException(); - } - - @Override - public boolean retainAll(Collection c) { - throw new UnsupportedOperationException(); - } - - @Override - public void clear() { - throw new UnsupportedOperationException(); - } - - }; - - ClientChangesImpl(SessionImplSocket session) { - this.session = session; - } - - @Override - public String toString() { - final int LIMIT = 10000; - String to = ""; - for (StatementChange s : changedStatements()) { - String op = s.isClaim() ? "add" : "rem"; - to += "\n" + op + " " + s.getSubject() + " " + s.getPredicate() + " " + s.getObject(); - if (to.length() > LIMIT) { - to += "... more.\n"; - return to; - } - } - for (Resource r : changedValues()) { - to += "\nmod " + r; - if (to.length() > LIMIT) { - to += "...etc.\n"; - return to; - } - } - return to; - } - - private Resource getResource(int id) { - return session.getResource(id); - } - - @Override - public synchronized Collection changedStatements() { - - if (!resetRead()) - return Collections.emptyList(); - - return new CollectionBase() { - - StatementChange seek() { - try { - while(true) { - byte method = readByte(); - switch(method) { - case TERM: { - return null; - } - case CLAIM: { - Resource s = getResource(readInt()); - Resource p = getResource(readInt()); - Resource o = getResource(readInt()); - return new StatementChangeImpl(s, p, o, true); - } - case DENY: { - Resource s = getResource(readInt()); - Resource p = getResource(readInt()); - Resource o = getResource(readInt()); - return new StatementChangeImpl(s, p, o, false); - } - case VALUE: - case INVALIDATE: { - // Skipping value opererations. - getResource(readInt()); - break; - } - } - } - } catch(Exception e) { - throw new RuntimeDatabaseException(e); - } - } - - private StatementChange next = seek(); - - @Override - public Iterator iterator() { - return new NoRemoveIterator() { - @Override - public boolean hasNext() { - return next != null; - } - @Override - public StatementChange next() { - StatementChange result = next; - next = seek(); - return result; - } - }; - } - - }; - - } - - @Override - public Collection changedValues() { - - if (!resetRead()) - return Collections.emptyList(); - - return new CollectionBase() { - - Resource seek() { - try { - while(true) { - byte method = readByte(); - switch(method) { - case TERM: { - return null; - } - case CLAIM: { - getResource(readInt()); - getResource(readInt()); - getResource(readInt()); - break; - } - case DENY: { - getResource(readInt()); - getResource(readInt()); - getResource(readInt()); - break; - } - case VALUE: - case INVALIDATE: { - return getResource(readInt()); - } - } - } - } catch(Exception e) { - return null; - } - } - - private Resource next = seek(); - - @Override - public Iterator iterator() { - return new NoRemoveIterator() { - @Override - public boolean hasNext() { - return next != null; - } - @Override - public Resource next() { - Resource result = next; - next = seek(); - return result; - } - }; - } - - }; - - } - - @Override - public Collection changedResources() { - - if (!resetRead()) - return Collections.emptyList(); - - return new CollectionBase() { - - Resource seek() { - - try { - while(true) { - byte method = readByte(); - switch(method) { - case TERM: { - return null; - } - case CLAIM: { - getResource(readInt()); - getResource(readInt()); - getResource(readInt()); - break; - } - case DENY: { - getResource(readInt()); - getResource(readInt()); - getResource(readInt()); - break; - } - case VALUE: { - getResource(readInt()); - break; - } - case INVALIDATE: { - return getResource(readInt()); - } - } - } - } catch(Exception e) { - return null; - } - - } - - private Resource next = seek(); - - @Override - public Iterator iterator() { - return new NoRemoveIterator() { - @Override - public boolean hasNext() { - return next != null; - } - @Override - public Resource next() { - Resource result = next; - next = seek(); - return result; - } - }; - } - - }; - - } - - @Override - public boolean isEmpty() { - if (notEmpty) - return false; - return writeBufferIndex == 0 && !overflow; - } - - private boolean notEmpty = false; - void setNotEmpty(boolean notEmpty) { - this.notEmpty = notEmpty; - } - - void claim(Resource s, Resource p, Resource o) { - writeByte(CLAIM); - writeInt(session.getId((ResourceImpl) s)); - writeInt(session.getId((ResourceImpl) p)); - writeInt(session.getId((ResourceImpl) o)); - if(DEBUG) System.err.println("claim1 $" + s.getResourceId() + " $" + p.getResourceId() + " $" + o.getResourceId()); - } - - private int sCache1 = -1; - private int sCache2 = -1; - private int sCache3 = -1; - - final void invalidate(int s) { - - if(s == sCache1) { - return; - } else if (s == sCache2) { - sCache2 = sCache1; - sCache1 = s; - return; - } else if (s == sCache3) { - sCache3 = sCache2; - sCache2 = sCache1; - sCache1 = s; - return; - } - - sCache3 = sCache2; - sCache2 = sCache1; - sCache1 = s; - - writeByte(INVALIDATE); - writeInt(s); - - if(DEBUG) System.err.println("invalidate2 " + s); - - } - - void claim(int s, int p, int o) { - writeByte(CLAIM); - writeInt(s); - writeInt(p); - writeInt(o); - if(DEBUG) System.err.println("claim2 " + s + " " + p + " " + o); - } - - void deny(Resource s, Resource p, Resource o) { - writeByte(DENY); - writeInt(session.getId((ResourceImpl)s)); - writeInt(session.getId((ResourceImpl)p)); - writeInt(session.getId((ResourceImpl)o)); - if(DEBUG) System.err.println("deny2 " + session.getId((ResourceImpl)s) + " " + session.getId((ResourceImpl)p) + " " + session.getId((ResourceImpl)o)); - } - - void deny(int s, int p, int o) { - writeByte(DENY); - writeInt(s); - writeInt(p); - writeInt(o); - if(DEBUG) System.err.println("deny2 " + s + " " + p + " " + o); - } - - void claimValue(Resource r) { - writeByte(VALUE); - writeInt(session.getId((ResourceImpl)r)); - if(DEBUG) System.err.println("claimOrDenyValue1 $" + r.getResourceId()); - } - - void claimValue(int r) { - writeByte(VALUE); - writeInt(r); - if(DEBUG) System.err.println("deny2 " + r); - } - - // Streaming - - private byte readByte() { - byte result = readBytes[readBufferIndex++]; - if(readBufferIndex == BUFFER_SIZE) fillReadBuffer(); - return result; - } - - private int readInt() { - if(readBufferIndex < (BUFFER_SIZE-4)) { - int result = (int) - ((readBytes[readBufferIndex++] & 0xff) | - ((readBytes[readBufferIndex++] & 0xff) << 8) | - ((readBytes[readBufferIndex++] & 0xff) << 16) | - ((readBytes[readBufferIndex++] & 0xff) << 24)); - return result; - } else { - int has = BUFFER_SIZE-readBufferIndex; - int result = 0; - if(has == 0) fillReadBuffer(); - result = (int)(readBytes[readBufferIndex++] & 0xff); - if(has == 1) fillReadBuffer(); - result |= (int)((readBytes[readBufferIndex++] & 0xff) << 8); - if(has == 2) fillReadBuffer(); - result |= (int)((readBytes[readBufferIndex++] & 0xff) << 16); - if(has == 3) fillReadBuffer(); - result |= (int)((readBytes[readBufferIndex++] & 0xff) << 24); - if(has == 4) fillReadBuffer(); - return result; - } - } - - private void writeByte(int b) { - writeBytes[writeBufferIndex++] = (byte)b; - if(writeBufferIndex == BUFFER_SIZE) flushWriteBuffer(BUFFER_SIZE); - wroteSinceLastRead = true; - } - - private void writeInt(int i) { - if(writeBufferIndex < (BUFFER_SIZE-4)) { - writeBytes[writeBufferIndex++] = (byte)(i&0xff); - writeBytes[writeBufferIndex++] = (byte)((i>>>8)&0xff); - writeBytes[writeBufferIndex++] = (byte)((i>>>16)&0xff); - writeBytes[writeBufferIndex++] = (byte)((i>>>24)&0xff); - } else { - int has = BUFFER_SIZE-writeBufferIndex; - if(has == 0) flushWriteBuffer(BUFFER_SIZE); - writeBytes[writeBufferIndex++] = (byte)(i&0xff); - if(has == 1) flushWriteBuffer(BUFFER_SIZE); - writeBytes[writeBufferIndex++] = (byte)((i>>>8)&0xff); - if(has == 2) flushWriteBuffer(BUFFER_SIZE); - writeBytes[writeBufferIndex++] = (byte)((i>>>16)&0xff); - if(has == 3) flushWriteBuffer(BUFFER_SIZE); - writeBytes[writeBufferIndex++] = (byte)((i>>>24)&0xff); - if(has == 4) flushWriteBuffer(BUFFER_SIZE); - } - wroteSinceLastRead = true; - } - - private int tryFlushBuffer(int size) throws IOException { - if (DEBUG) - System.err.println("tryFlushBuffer(" + size + "): tempFile=" + tempFile); - - if (tempFile == null) { - File base = InternalClientConfig.DB_CLIENT_TEMP_DIR; - base.mkdirs(); - tempFile = File.createTempFile("cset-", ".cs", base); - file = new RandomAccessFile(tempFile, "rwd"); - channel = file.getChannel(); - if (DEBUG) - System.err.println("Allocated temporary file for changeset: " + tempFile); - } - - int wrote = 0; - writeBuffer.position(0); - writeBuffer.limit(size); - for (; wrote < size;) { - wrote += channel.write(writeBuffer); - } - return wrote; - } - - private void flushWriteBuffer(int size) { - if (DEBUG) - System.err.println("flushBuffer(" + size + ") overflow=" + overflow); - if (size == 0) - return; - - if (!overflow) { - // Change set is about to become file-backed. - // Initialize separate buffer for writing. - writeBytes = Arrays.copyOf(readBytes, readBytes.length); - writeBuffer = ByteBuffer.wrap(writeBytes); - overflow = true; - } - - writeBufferIndex = 0; - int wrote = 0; - try { - setFilePosition(writePosition); - wrote = tryFlushBuffer(size); - } catch (IOException e) { - e.printStackTrace(); - throw new RuntimeDatabaseException(e); - } finally { - try { - currentPosition = channel.position(); - writePosition = currentPosition; - } catch (IOException e) { - e.printStackTrace(); - } - if (DEBUG) - System.err.println("flushWriteBuffer wrote " + wrote + " bytes"); - } - } - - private boolean resetRead() { - if (DEBUG) - System.err.println("resetRead: overflow=" + overflow + ", file=" + tempFile); - try { - // Flush temporary file first if necessary. - if (wroteSinceLastRead) { - if (DEBUG) - System.err.println("\tTerminating write of changeset " + tempFile); - writeByte(TERM); - wroteSinceLastRead = false; - } - if (overflow) { - if (DEBUG) - System.err.println("\tReading changeset from " + tempFile.getAbsolutePath()); - flushWriteBuffer(writeBufferIndex); - readPosition = 0L; - setFilePosition(readPosition); - fillReadBuffer(); - } else { - readBufferIndex = 0; - } - return true; - } catch (IOException e) { - e.printStackTrace(); - return false; - } - } - - private void fillReadBuffer() { - readBufferIndex = 0; - int read = 0; - try { - setFilePosition(readPosition); - readBuffer.position(0); - readBuffer.limit(BUFFER_SIZE); - for (;read < BUFFER_SIZE;) { - int got = channel.read(readBuffer); - if (got == -1) - return; - read += got; - } - } catch (IOException e) { - e.printStackTrace(); - } finally { - try { - currentPosition = channel.position(); - readPosition = currentPosition; - } catch (IOException e) { - e.printStackTrace(); - } - if (DEBUG) - System.err.println("fillReadBuffer read " + read + " bytes"); - } - } - - /** - * @param newPosition - * @return true if file position was modified - * @throws IOException - */ - private boolean setFilePosition(long newPosition) throws IOException { - if (channel == null) { - return false; - } - if (newPosition != currentPosition) { - if (DEBUG) - System.err.println("setting new file position(" + newPosition + "), old position=" + currentPosition); - channel.position(newPosition); - currentPosition = newPosition; - return true; - } - return false; - } - - @Override - protected void finalize() throws Throwable { - close(); - super.finalize(); - } - - public void close() { - session = null; - channel = null; - if (file != null) { - if (DEBUG) - System.err.println("Closing temporary changeset input stream " + file); - try { - file.close(); - } catch (IOException e) { - } finally { - file = null; - } - } - if (tempFile != null) { - if (DEBUG) - System.err.println("Deleting temporary changeset file " + tempFile); - tempFile.delete(); - tempFile = null; - } - } - - public static void main(String[] args) { - ClientChangesImpl ccs = new ClientChangesImpl(null); - System.out.println("isEmpty=" + ccs.isEmpty()); - System.out.println("clientChanges=" + ccs.toString()); - System.out.println("isEmpty=" + ccs.isEmpty()); - } - -} +/******************************************************************************* + * Copyright (c) 2007, 2010 Association for Decentralized Information Management + * in Industry THTH ry. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * VTT Technical Research Centre of Finland - initial API and implementation + *******************************************************************************/ +package fi.vtt.simantics.procore.internal; + +import java.io.File; +import java.io.IOException; +import java.io.RandomAccessFile; +import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; + +import org.simantics.db.ChangeSet; +import org.simantics.db.Resource; +import org.simantics.db.common.internal.config.InternalClientConfig; +import org.simantics.db.exception.RuntimeDatabaseException; +import org.simantics.db.impl.ResourceImpl; + +/** + * @author Antti Villberg + * @author Tuukka Lehtonen + */ +public final class ClientChangesImpl implements ChangeSet { + + private static final boolean DEBUG = false; + + private static final int BUFFER_SIZE = 1 << 17; + + private static final byte CLAIM = 1; + private static final byte DENY = 2; + private static final byte VALUE = 3; + private static final byte TERM = 4; + private static final byte INVALIDATE = 5; + + private SessionImplSocket session; + + /** + * True if enough data has been written to overflow the normal buffer which + * will result in data being written to disk. + */ + private boolean overflow = false; + + private final byte[] readBytes = new byte[BUFFER_SIZE]; + // By default the write buffer will be the exact same as the read buffer + // since initially were not forced to write changes to the disk. + private byte[] writeBytes = readBytes; + + private int readBufferIndex = 0; + private int writeBufferIndex = 0; + + /** + * true if something has been written since last call to + * {@link #resetRead()}. By default it is true since + * {@link #resetRead()} must on its first invocation write the {@link #TERM} + * constant into the stream. + */ + private boolean wroteSinceLastRead = true; + + private final ByteBuffer readBuffer = ByteBuffer.wrap(readBytes); + private ByteBuffer writeBuffer = ByteBuffer.wrap(writeBytes); + + private File tempFile; + private RandomAccessFile file; + private FileChannel channel; + private long currentPosition = 0L; + private long readPosition = 0L; + private long writePosition = 0L; + + static abstract class NoRemoveIterator implements Iterator { + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + } + + /** + * Base class for all immutable collections returned by + * {@link ClientChangesImpl}. + * + * @param + */ + static class CollectionBase implements Collection { + + @Override + public int size() { + throw new UnsupportedOperationException(); + } + + @Override + public boolean isEmpty() { + throw new UnsupportedOperationException(); + } + + @Override + public boolean contains(Object o) { + throw new UnsupportedOperationException(); + } + + @Override + public Iterator iterator() { + throw new UnsupportedOperationException(); + } + + @Override + public Object[] toArray() { + throw new UnsupportedOperationException(); + } + + @Override + public TT[] toArray(TT[] a) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean add(T e) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean remove(Object o) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean containsAll(Collection c) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean addAll(Collection c) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean removeAll(Collection c) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean retainAll(Collection c) { + throw new UnsupportedOperationException(); + } + + @Override + public void clear() { + throw new UnsupportedOperationException(); + } + + }; + + ClientChangesImpl(SessionImplSocket session) { + this.session = session; + } + + @Override + public String toString() { + final int LIMIT = 10000; + String to = ""; + for (StatementChange s : changedStatements()) { + String op = s.isClaim() ? "add" : "rem"; + to += "\n" + op + " " + s.getSubject() + " " + s.getPredicate() + " " + s.getObject(); + if (to.length() > LIMIT) { + to += "... more.\n"; + return to; + } + } + for (Resource r : changedValues()) { + to += "\nmod " + r; + if (to.length() > LIMIT) { + to += "...etc.\n"; + return to; + } + } + return to; + } + + private Resource getResource(int id) { + return session.getResource(id); + } + + @Override + public synchronized Collection changedStatements() { + + if (!resetRead()) + return Collections.emptyList(); + + return new CollectionBase() { + + StatementChange seek() { + try { + while(true) { + byte method = readByte(); + switch(method) { + case TERM: { + return null; + } + case CLAIM: { + Resource s = getResource(readInt()); + Resource p = getResource(readInt()); + Resource o = getResource(readInt()); + return new StatementChangeImpl(s, p, o, true); + } + case DENY: { + Resource s = getResource(readInt()); + Resource p = getResource(readInt()); + Resource o = getResource(readInt()); + return new StatementChangeImpl(s, p, o, false); + } + case VALUE: + case INVALIDATE: { + // Skipping value opererations. + getResource(readInt()); + break; + } + } + } + } catch(Exception e) { + throw new RuntimeDatabaseException(e); + } + } + + private StatementChange next = seek(); + + @Override + public Iterator iterator() { + return new NoRemoveIterator() { + @Override + public boolean hasNext() { + return next != null; + } + @Override + public StatementChange next() { + StatementChange result = next; + next = seek(); + return result; + } + }; + } + + }; + + } + + @Override + public Collection changedValues() { + + if (!resetRead()) + return Collections.emptyList(); + + return new CollectionBase() { + + Resource seek() { + try { + while(true) { + byte method = readByte(); + switch(method) { + case TERM: { + return null; + } + case CLAIM: { + getResource(readInt()); + getResource(readInt()); + getResource(readInt()); + break; + } + case DENY: { + getResource(readInt()); + getResource(readInt()); + getResource(readInt()); + break; + } + case VALUE: + case INVALIDATE: { + return getResource(readInt()); + } + } + } + } catch(Exception e) { + return null; + } + } + + private Resource next = seek(); + + @Override + public Iterator iterator() { + return new NoRemoveIterator() { + @Override + public boolean hasNext() { + return next != null; + } + @Override + public Resource next() { + Resource result = next; + next = seek(); + return result; + } + }; + } + + }; + + } + + @Override + public Collection changedResources() { + + if (!resetRead()) + return Collections.emptyList(); + + return new CollectionBase() { + + Resource seek() { + + try { + while(true) { + byte method = readByte(); + switch(method) { + case TERM: { + return null; + } + case CLAIM: { + getResource(readInt()); + getResource(readInt()); + getResource(readInt()); + break; + } + case DENY: { + getResource(readInt()); + getResource(readInt()); + getResource(readInt()); + break; + } + case VALUE: { + getResource(readInt()); + break; + } + case INVALIDATE: { + return getResource(readInt()); + } + } + } + } catch(Exception e) { + return null; + } + + } + + private Resource next = seek(); + + @Override + public Iterator iterator() { + return new NoRemoveIterator() { + @Override + public boolean hasNext() { + return next != null; + } + @Override + public Resource next() { + Resource result = next; + next = seek(); + return result; + } + }; + } + + }; + + } + + @Override + public boolean isEmpty() { + if (notEmpty) + return false; + return writeBufferIndex == 0 && !overflow; + } + + private boolean notEmpty = false; + void setNotEmpty(boolean notEmpty) { + this.notEmpty = notEmpty; + } + + void claim(Resource s, Resource p, Resource o) { + writeByte(CLAIM); + writeInt(session.getId((ResourceImpl) s)); + writeInt(session.getId((ResourceImpl) p)); + writeInt(session.getId((ResourceImpl) o)); + if(DEBUG) System.err.println("claim1 $" + s.getResourceId() + " $" + p.getResourceId() + " $" + o.getResourceId()); + } + + private int sCache1 = -1; + private int sCache2 = -1; + private int sCache3 = -1; + + final void invalidate(int s) { + + if(s == sCache1) { + return; + } else if (s == sCache2) { + sCache2 = sCache1; + sCache1 = s; + return; + } else if (s == sCache3) { + sCache3 = sCache2; + sCache2 = sCache1; + sCache1 = s; + return; + } + + sCache3 = sCache2; + sCache2 = sCache1; + sCache1 = s; + + writeByte(INVALIDATE); + writeInt(s); + + if(DEBUG) System.err.println("invalidate2 " + s); + + } + + void claim(int s, int p, int o) { + writeByte(CLAIM); + writeInt(s); + writeInt(p); + writeInt(o); + if(DEBUG) System.err.println("claim2 " + s + " " + p + " " + o); + } + + void deny(Resource s, Resource p, Resource o) { + writeByte(DENY); + writeInt(session.getId((ResourceImpl)s)); + writeInt(session.getId((ResourceImpl)p)); + writeInt(session.getId((ResourceImpl)o)); + if(DEBUG) System.err.println("deny2 " + session.getId((ResourceImpl)s) + " " + session.getId((ResourceImpl)p) + " " + session.getId((ResourceImpl)o)); + } + + void deny(int s, int p, int o) { + writeByte(DENY); + writeInt(s); + writeInt(p); + writeInt(o); + if(DEBUG) System.err.println("deny2 " + s + " " + p + " " + o); + } + + void claimValue(Resource r) { + writeByte(VALUE); + writeInt(session.getId((ResourceImpl)r)); + if(DEBUG) System.err.println("claimOrDenyValue1 $" + r.getResourceId()); + } + + void claimValue(int r) { + writeByte(VALUE); + writeInt(r); + if(DEBUG) System.err.println("deny2 " + r); + } + + // Streaming + + private byte readByte() { + byte result = readBytes[readBufferIndex++]; + if(readBufferIndex == BUFFER_SIZE) fillReadBuffer(); + return result; + } + + private int readInt() { + if(readBufferIndex < (BUFFER_SIZE-4)) { + int result = (int) + ((readBytes[readBufferIndex++] & 0xff) | + ((readBytes[readBufferIndex++] & 0xff) << 8) | + ((readBytes[readBufferIndex++] & 0xff) << 16) | + ((readBytes[readBufferIndex++] & 0xff) << 24)); + return result; + } else { + int has = BUFFER_SIZE-readBufferIndex; + int result = 0; + if(has == 0) fillReadBuffer(); + result = (int)(readBytes[readBufferIndex++] & 0xff); + if(has == 1) fillReadBuffer(); + result |= (int)((readBytes[readBufferIndex++] & 0xff) << 8); + if(has == 2) fillReadBuffer(); + result |= (int)((readBytes[readBufferIndex++] & 0xff) << 16); + if(has == 3) fillReadBuffer(); + result |= (int)((readBytes[readBufferIndex++] & 0xff) << 24); + if(has == 4) fillReadBuffer(); + return result; + } + } + + private void writeByte(int b) { + writeBytes[writeBufferIndex++] = (byte)b; + if(writeBufferIndex == BUFFER_SIZE) flushWriteBuffer(BUFFER_SIZE); + wroteSinceLastRead = true; + } + + private void writeInt(int i) { + if(writeBufferIndex < (BUFFER_SIZE-4)) { + writeBytes[writeBufferIndex++] = (byte)(i&0xff); + writeBytes[writeBufferIndex++] = (byte)((i>>>8)&0xff); + writeBytes[writeBufferIndex++] = (byte)((i>>>16)&0xff); + writeBytes[writeBufferIndex++] = (byte)((i>>>24)&0xff); + } else { + int has = BUFFER_SIZE-writeBufferIndex; + if(has == 0) flushWriteBuffer(BUFFER_SIZE); + writeBytes[writeBufferIndex++] = (byte)(i&0xff); + if(has == 1) flushWriteBuffer(BUFFER_SIZE); + writeBytes[writeBufferIndex++] = (byte)((i>>>8)&0xff); + if(has == 2) flushWriteBuffer(BUFFER_SIZE); + writeBytes[writeBufferIndex++] = (byte)((i>>>16)&0xff); + if(has == 3) flushWriteBuffer(BUFFER_SIZE); + writeBytes[writeBufferIndex++] = (byte)((i>>>24)&0xff); + if(has == 4) flushWriteBuffer(BUFFER_SIZE); + } + wroteSinceLastRead = true; + } + + private int tryFlushBuffer(int size) throws IOException { + if (DEBUG) + System.err.println("tryFlushBuffer(" + size + "): tempFile=" + tempFile); + + if (tempFile == null) { + File base = InternalClientConfig.DB_CLIENT_TEMP_DIR; + base.mkdirs(); + tempFile = File.createTempFile("cset-", ".cs", base); + file = new RandomAccessFile(tempFile, "rwd"); + channel = file.getChannel(); + if (DEBUG) + System.err.println("Allocated temporary file for changeset: " + tempFile); + } + + int wrote = 0; + writeBuffer.position(0); + writeBuffer.limit(size); + for (; wrote < size;) { + wrote += channel.write(writeBuffer); + } + return wrote; + } + + private void flushWriteBuffer(int size) { + if (DEBUG) + System.err.println("flushBuffer(" + size + ") overflow=" + overflow); + if (size == 0) + return; + + if (!overflow) { + // Change set is about to become file-backed. + // Initialize separate buffer for writing. + writeBytes = Arrays.copyOf(readBytes, readBytes.length); + writeBuffer = ByteBuffer.wrap(writeBytes); + overflow = true; + } + + writeBufferIndex = 0; + int wrote = 0; + try { + setFilePosition(writePosition); + wrote = tryFlushBuffer(size); + } catch (IOException e) { + e.printStackTrace(); + throw new RuntimeDatabaseException(e); + } finally { + try { + currentPosition = channel.position(); + writePosition = currentPosition; + } catch (IOException e) { + e.printStackTrace(); + } + if (DEBUG) + System.err.println("flushWriteBuffer wrote " + wrote + " bytes"); + } + } + + private boolean resetRead() { + if (DEBUG) + System.err.println("resetRead: overflow=" + overflow + ", file=" + tempFile); + try { + // Flush temporary file first if necessary. + if (wroteSinceLastRead) { + if (DEBUG) + System.err.println("\tTerminating write of changeset " + tempFile); + writeByte(TERM); + wroteSinceLastRead = false; + } + if (overflow) { + if (DEBUG) + System.err.println("\tReading changeset from " + tempFile.getAbsolutePath()); + flushWriteBuffer(writeBufferIndex); + readPosition = 0L; + setFilePosition(readPosition); + fillReadBuffer(); + } else { + readBufferIndex = 0; + } + return true; + } catch (IOException e) { + e.printStackTrace(); + return false; + } + } + + private void fillReadBuffer() { + readBufferIndex = 0; + int read = 0; + try { + setFilePosition(readPosition); + readBuffer.position(0); + readBuffer.limit(BUFFER_SIZE); + for (;read < BUFFER_SIZE;) { + int got = channel.read(readBuffer); + if (got == -1) + return; + read += got; + } + } catch (IOException e) { + e.printStackTrace(); + } finally { + try { + currentPosition = channel.position(); + readPosition = currentPosition; + } catch (IOException e) { + e.printStackTrace(); + } + if (DEBUG) + System.err.println("fillReadBuffer read " + read + " bytes"); + } + } + + /** + * @param newPosition + * @return true if file position was modified + * @throws IOException + */ + private boolean setFilePosition(long newPosition) throws IOException { + if (channel == null) { + return false; + } + if (newPosition != currentPosition) { + if (DEBUG) + System.err.println("setting new file position(" + newPosition + "), old position=" + currentPosition); + channel.position(newPosition); + currentPosition = newPosition; + return true; + } + return false; + } + + @Override + protected void finalize() throws Throwable { + close(); + super.finalize(); + } + + public void close() { + session = null; + channel = null; + if (file != null) { + if (DEBUG) + System.err.println("Closing temporary changeset input stream " + file); + try { + file.close(); + } catch (IOException e) { + } finally { + file = null; + } + } + if (tempFile != null) { + if (DEBUG) + System.err.println("Deleting temporary changeset file " + tempFile); + tempFile.delete(); + tempFile = null; + } + } + + public static void main(String[] args) { + ClientChangesImpl ccs = new ClientChangesImpl(null); + System.out.println("isEmpty=" + ccs.isEmpty()); + System.out.println("clientChanges=" + ccs.toString()); + System.out.println("isEmpty=" + ccs.isEmpty()); + } + +}