/*******************************************************************************
* Copyright (c) 2007, 2010 Association for Decentralized Information Management
* in Industry THTH ry.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/legal/epl-v10.html
*
* Contributors:
* VTT Technical Research Centre of Finland - initial API and implementation
*******************************************************************************/
package fi.vtt.simantics.procore.internal;
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import org.simantics.db.ChangeSet;
import org.simantics.db.Disposable;
import org.simantics.db.Resource;
import org.simantics.db.common.internal.config.InternalClientConfig;
import org.simantics.db.exception.RuntimeDatabaseException;
import org.simantics.db.impl.ResourceImpl;
/**
* @author Antti Villberg
* @author Tuukka Lehtonen
*/
public final class ClientChangesImpl implements ChangeSet, Disposable {
private static final boolean ASSERT_ALIVE = true;
private static final byte[] EMPTY = {};
private static final boolean DEBUG = false;
private static final int BUFFER_SIZE = 1 << 17;
private static final byte CLAIM = 1;
private static final byte DENY = 2;
private static final byte VALUE = 3;
private static final byte TERM = 4;
private static final byte INVALIDATE = 5;
private boolean disposed = false;
private SessionImplSocket session;
/**
* True if enough data has been written to overflow the normal buffer which
* will result in data being written to disk.
*/
private boolean overflow = false;
private byte[] readBytes = new byte[BUFFER_SIZE];
// By default the write buffer will be the exact same as the read buffer
// since initially were not forced to write changes to the disk.
private byte[] writeBytes = readBytes;
private int readBufferIndex = 0;
private int writeBufferIndex = 0;
/**
* true
if something has been written since last call to
* {@link #resetRead()}. By default it is true
since
* {@link #resetRead()} must on its first invocation write the {@link #TERM}
* constant into the stream.
*/
private boolean wroteSinceLastRead = true;
private ByteBuffer readBuffer = ByteBuffer.wrap(readBytes);
private ByteBuffer writeBuffer = ByteBuffer.wrap(writeBytes);
private File tempFile;
private RandomAccessFile file;
private FileChannel channel;
private long currentPosition = 0L;
private long readPosition = 0L;
private long writePosition = 0L;
static abstract class NoRemoveIterator implements Iterator {
@Override
public void remove() {
throw new UnsupportedOperationException();
}
}
/**
* Base class for all immutable collections returned by
* {@link ClientChangesImpl}.
*
* @param
*/
static class CollectionBase implements Collection {
@Override
public int size() {
throw new UnsupportedOperationException();
}
@Override
public boolean isEmpty() {
throw new UnsupportedOperationException();
}
@Override
public boolean contains(Object o) {
throw new UnsupportedOperationException();
}
@Override
public Iterator iterator() {
throw new UnsupportedOperationException();
}
@Override
public Object[] toArray() {
throw new UnsupportedOperationException();
}
@Override
public TT[] toArray(TT[] a) {
throw new UnsupportedOperationException();
}
@Override
public boolean add(T e) {
throw new UnsupportedOperationException();
}
@Override
public boolean remove(Object o) {
throw new UnsupportedOperationException();
}
@Override
public boolean containsAll(Collection> c) {
throw new UnsupportedOperationException();
}
@Override
public boolean addAll(Collection extends T> c) {
throw new UnsupportedOperationException();
}
@Override
public boolean removeAll(Collection> c) {
throw new UnsupportedOperationException();
}
@Override
public boolean retainAll(Collection> c) {
throw new UnsupportedOperationException();
}
@Override
public void clear() {
throw new UnsupportedOperationException();
}
};
ClientChangesImpl(SessionImplSocket session) {
this.session = session;
//System.out.println("new ClientChangesImpl@" + System.identityHashCode(this));
}
@Override
public String toString() {
final int LIMIT = 10000;
String to = "";
for (StatementChange s : changedStatements()) {
String op = s.isClaim() ? "add" : "rem";
to += "\n" + op + " " + s.getSubject() + " " + s.getPredicate() + " " + s.getObject();
if (to.length() > LIMIT) {
to += "... more.\n";
return to;
}
}
for (Resource r : changedValues()) {
to += "\nmod " + r;
if (to.length() > LIMIT) {
to += "...etc.\n";
return to;
}
}
return to;
}
private Resource getResource(int id) {
return session.getResource(id);
}
@Override
public synchronized Collection changedStatements() {
if (ASSERT_ALIVE)
assertAlive();
if (!resetRead())
return Collections.emptyList();
return new CollectionBase() {
StatementChange seek() {
try {
while(true) {
byte method = readByte();
switch(method) {
case TERM: {
return null;
}
case CLAIM: {
Resource s = getResource(readInt());
Resource p = getResource(readInt());
Resource o = getResource(readInt());
return new StatementChangeImpl(s, p, o, true);
}
case DENY: {
Resource s = getResource(readInt());
Resource p = getResource(readInt());
Resource o = getResource(readInt());
return new StatementChangeImpl(s, p, o, false);
}
case VALUE:
case INVALIDATE: {
// Skipping value opererations.
getResource(readInt());
break;
}
}
}
} catch(Exception e) {
throw new RuntimeDatabaseException(e);
}
}
private StatementChange next = seek();
@Override
public Iterator iterator() {
return new NoRemoveIterator() {
@Override
public boolean hasNext() {
return next != null;
}
@Override
public StatementChange next() {
StatementChange result = next;
next = seek();
return result;
}
};
}
};
}
@Override
public Collection changedValues() {
if (ASSERT_ALIVE)
assertAlive();
if (!resetRead())
return Collections.emptyList();
return new CollectionBase() {
Resource seek() {
try {
while(true) {
byte method = readByte();
switch(method) {
case TERM: {
return null;
}
case CLAIM: {
getResource(readInt());
getResource(readInt());
getResource(readInt());
break;
}
case DENY: {
getResource(readInt());
getResource(readInt());
getResource(readInt());
break;
}
case VALUE:
case INVALIDATE: {
return getResource(readInt());
}
}
}
} catch(Exception e) {
return null;
}
}
private Resource next = seek();
@Override
public Iterator iterator() {
return new NoRemoveIterator() {
@Override
public boolean hasNext() {
return next != null;
}
@Override
public Resource next() {
Resource result = next;
next = seek();
return result;
}
};
}
};
}
@Override
public Collection changedResources() {
if (ASSERT_ALIVE)
assertAlive();
if (!resetRead())
return Collections.emptyList();
return new CollectionBase() {
Resource seek() {
try {
while(true) {
byte method = readByte();
switch(method) {
case TERM: {
return null;
}
case CLAIM: {
getResource(readInt());
getResource(readInt());
getResource(readInt());
break;
}
case DENY: {
getResource(readInt());
getResource(readInt());
getResource(readInt());
break;
}
case VALUE: {
getResource(readInt());
break;
}
case INVALIDATE: {
return getResource(readInt());
}
}
}
} catch(Exception e) {
return null;
}
}
private Resource next = seek();
@Override
public Iterator iterator() {
return new NoRemoveIterator() {
@Override
public boolean hasNext() {
return next != null;
}
@Override
public Resource next() {
Resource result = next;
next = seek();
return result;
}
};
}
};
}
@Override
public boolean isEmpty() {
if (notEmpty)
return false;
return writeBufferIndex == 0 && !overflow;
}
private boolean notEmpty = false;
void setNotEmpty(boolean notEmpty) {
this.notEmpty = notEmpty;
}
void claim(Resource s, Resource p, Resource o) {
if (ASSERT_ALIVE)
assertAlive();
writeByte(CLAIM);
writeInt(session.getId((ResourceImpl) s));
writeInt(session.getId((ResourceImpl) p));
writeInt(session.getId((ResourceImpl) o));
if(DEBUG) System.err.println("claim1 $" + s.getResourceId() + " $" + p.getResourceId() + " $" + o.getResourceId());
}
private int sCache1 = -1;
private int sCache2 = -1;
private int sCache3 = -1;
final void invalidate(int s) {
if (ASSERT_ALIVE)
assertAlive();
if(s == sCache1) {
return;
} else if (s == sCache2) {
sCache2 = sCache1;
sCache1 = s;
return;
} else if (s == sCache3) {
sCache3 = sCache2;
sCache2 = sCache1;
sCache1 = s;
return;
}
sCache3 = sCache2;
sCache2 = sCache1;
sCache1 = s;
writeByte(INVALIDATE);
writeInt(s);
if(DEBUG) System.err.println("invalidate2 " + s);
}
void claim(int s, int p, int o) {
if (ASSERT_ALIVE)
assertAlive();
writeByte(CLAIM);
writeInt(s);
writeInt(p);
writeInt(o);
if(DEBUG) System.err.println("claim2 " + s + " " + p + " " + o);
}
void deny(Resource s, Resource p, Resource o) {
if (ASSERT_ALIVE)
assertAlive();
writeByte(DENY);
writeInt(session.getId((ResourceImpl)s));
writeInt(session.getId((ResourceImpl)p));
writeInt(session.getId((ResourceImpl)o));
if(DEBUG) System.err.println("deny2 " + session.getId((ResourceImpl)s) + " " + session.getId((ResourceImpl)p) + " " + session.getId((ResourceImpl)o));
}
void deny(int s, int p, int o) {
if (ASSERT_ALIVE)
assertAlive();
writeByte(DENY);
writeInt(s);
writeInt(p);
writeInt(o);
if(DEBUG) System.err.println("deny2 " + s + " " + p + " " + o);
}
void claimValue(Resource r) {
if (ASSERT_ALIVE)
assertAlive();
writeByte(VALUE);
writeInt(session.getId((ResourceImpl)r));
if(DEBUG) System.err.println("claimOrDenyValue1 $" + r.getResourceId());
}
void claimValue(int r) {
if (ASSERT_ALIVE)
assertAlive();
writeByte(VALUE);
writeInt(r);
if(DEBUG) System.err.println("deny2 " + r);
}
// Streaming
private byte readByte() {
byte result = readBytes[readBufferIndex++];
if(readBufferIndex == BUFFER_SIZE) fillReadBuffer();
return result;
}
private int readInt() {
int ri = readBufferIndex;
int remaining = BUFFER_SIZE - ri;
int result;
switch (remaining) {
case 0:
fillReadBuffer();
// Intentional fall-through
default:
result = (int)(
(readBytes[ri] & 0xff) |
((readBytes[ri+1] & 0xff) << 8) |
((readBytes[ri+2] & 0xff) << 16) |
((readBytes[ri+3] & 0xff) << 24));
readBufferIndex += 4;
return result;
case 4:
result = (int)(
(readBytes[ri] & 0xff) |
((readBytes[ri+1] & 0xff) << 8) |
((readBytes[ri+2] & 0xff) << 16) |
((readBytes[ri+3] & 0xff) << 24));
fillReadBuffer();
return result;
case 3:
result = (int)(
(readBytes[ri] & 0xff) |
((readBytes[ri+1] & 0xff) << 8) |
((readBytes[ri+2] & 0xff) << 16));
fillReadBuffer();
result |= (int)((readBytes[readBufferIndex++] & 0xff) << 24);
return result;
case 2:
result = (int)(
(readBytes[ri] & 0xff) |
((readBytes[ri+1] & 0xff) << 8));
fillReadBuffer();
ri = readBufferIndex;
result |= (int)(
((readBytes[ri] & 0xff) << 16) |
((readBytes[ri+1] & 0xff) << 24));
readBufferIndex += 2;
return result;
case 1:
result = (int)(readBytes[ri] & 0xff);
fillReadBuffer();
ri = readBufferIndex;
result |= (int)(
((readBytes[ri] & 0xff) << 8) |
((readBytes[ri+1] & 0xff) << 16) |
((readBytes[ri+2] & 0xff) << 24));
readBufferIndex += 3;
return result;
}
}
private void writeByte(int b) {
writeBytes[writeBufferIndex++] = (byte)b;
if(writeBufferIndex == BUFFER_SIZE) flushWriteBuffer(BUFFER_SIZE);
wroteSinceLastRead = true;
}
private void writeInt(int i) {
int wi = writeBufferIndex;
int room = BUFFER_SIZE - wi;
switch (room) {
case 0:
flushWriteBuffer(BUFFER_SIZE);
// Intentional fall-through
default:
writeBytes[wi] = (byte)(i&0xff);
writeBytes[wi+1] = (byte)((i>>>8)&0xff);
writeBytes[wi+2] = (byte)((i>>>16)&0xff);
writeBytes[wi+3] = (byte)((i>>>24)&0xff);
writeBufferIndex += 4;
break;
case 4:
writeBytes[wi] = (byte)(i&0xff);
writeBytes[wi+1] = (byte)((i>>>8)&0xff);
writeBytes[wi+2] = (byte)((i>>>16)&0xff);
writeBytes[wi+3] = (byte)((i>>>24)&0xff);
flushWriteBuffer(BUFFER_SIZE);
break;
case 3:
writeBytes[wi] = (byte)(i&0xff);
writeBytes[wi+1] = (byte)((i>>>8)&0xff);
writeBytes[wi+2] = (byte)((i>>>16)&0xff);
flushWriteBuffer(BUFFER_SIZE);
writeBytes[writeBufferIndex++] = (byte)((i>>>24)&0xff);
break;
case 2:
writeBytes[wi] = (byte)(i&0xff);
writeBytes[wi+1] = (byte)((i>>>8)&0xff);
flushWriteBuffer(BUFFER_SIZE);
wi = writeBufferIndex;
writeBytes[wi] = (byte)((i>>>16)&0xff);
writeBytes[wi+1] = (byte)((i>>>24)&0xff);
writeBufferIndex += 2;
break;
case 1:
writeBytes[wi] = (byte)(i&0xff);
flushWriteBuffer(BUFFER_SIZE);
wi = writeBufferIndex;
writeBytes[wi] = (byte)((i>>>8)&0xff);
writeBytes[wi+1] = (byte)((i>>>16)&0xff);
writeBytes[wi+2] = (byte)((i>>>24)&0xff);
writeBufferIndex += 3;
break;
}
wroteSinceLastRead = true;
}
private int tryFlushBuffer(int size) throws IOException {
if (DEBUG)
System.err.println("tryFlushBuffer(" + size + "): tempFile=" + tempFile);
if (tempFile == null) {
File base = InternalClientConfig.DB_CLIENT_TEMP_DIR;
base.mkdirs();
tempFile = File.createTempFile("cset-", ".cs", base);
file = new RandomAccessFile(tempFile, "rwd");
channel = file.getChannel();
if (DEBUG)
System.err.println("Allocated temporary file for changeset: " + tempFile);
}
int wrote = 0;
writeBuffer.position(0);
writeBuffer.limit(size);
for (; wrote < size;) {
wrote += channel.write(writeBuffer);
}
return wrote;
}
private void flushWriteBuffer(int size) {
if (DEBUG)
System.err.println("flushBuffer(" + size + ") overflow=" + overflow);
if (size == 0)
return;
if (!overflow) {
// Change set is about to become file-backed.
// Initialize separate buffer for writing.
writeBytes = Arrays.copyOf(readBytes, readBytes.length);
writeBuffer = ByteBuffer.wrap(writeBytes);
overflow = true;
}
writeBufferIndex = 0;
int wrote = 0;
try {
setFilePosition(writePosition);
wrote = tryFlushBuffer(size);
} catch (IOException e) {
e.printStackTrace();
throw new RuntimeDatabaseException(e);
} finally {
try {
currentPosition = channel.position();
writePosition = currentPosition;
} catch (IOException e) {
e.printStackTrace();
}
if (DEBUG)
System.err.println("flushWriteBuffer wrote " + wrote + " bytes");
}
}
private boolean resetRead() {
if (DEBUG)
System.err.println("resetRead: overflow=" + overflow + ", file=" + tempFile);
try {
// Flush temporary file first if necessary.
if (wroteSinceLastRead) {
if (DEBUG)
System.err.println("\tTerminating write of changeset " + tempFile);
writeByte(TERM);
wroteSinceLastRead = false;
}
if (overflow) {
if (DEBUG)
System.err.println("\tReading changeset from " + tempFile.getAbsolutePath());
flushWriteBuffer(writeBufferIndex);
readPosition = 0L;
setFilePosition(readPosition);
fillReadBuffer();
} else {
readBufferIndex = 0;
}
return true;
} catch (IOException e) {
e.printStackTrace();
return false;
}
}
private void fillReadBuffer() {
readBufferIndex = 0;
int read = 0;
try {
setFilePosition(readPosition);
readBuffer.position(0);
readBuffer.limit(BUFFER_SIZE);
for (;read < BUFFER_SIZE;) {
int got = channel.read(readBuffer);
if (got == -1)
return;
read += got;
}
} catch (IOException e) {
e.printStackTrace();
} finally {
try {
currentPosition = channel.position();
readPosition = currentPosition;
} catch (IOException e) {
e.printStackTrace();
}
if (DEBUG)
System.err.println("fillReadBuffer read " + read + " bytes");
}
}
/**
* @param newPosition
* @return true
if file position was modified
* @throws IOException
*/
private boolean setFilePosition(long newPosition) throws IOException {
if (channel == null) {
return false;
}
if (newPosition != currentPosition) {
if (DEBUG)
System.err.println("setting new file position(" + newPosition + "), old position=" + currentPosition);
channel.position(newPosition);
currentPosition = newPosition;
return true;
}
return false;
}
@Override
protected void finalize() throws Throwable {
close();
super.finalize();
}
public void close() {
if (disposed)
return;
disposed = true;
session = null;
channel = null;
if (file != null) {
if (DEBUG)
System.err.println("Closing temporary changeset input stream " + file);
try {
file.close();
} catch (IOException e) {
} finally {
file = null;
}
}
if (tempFile != null) {
if (DEBUG)
System.err.println("Deleting temporary changeset file " + tempFile);
tempFile.delete();
tempFile = null;
}
// Release large buffers to minimize memory consumption
readBytes = EMPTY;
readBuffer = ByteBuffer.wrap(EMPTY);
writeBytes = EMPTY;
writeBuffer = ByteBuffer.wrap(EMPTY);
}
private void assertAlive() {
if (disposed)
throw new IllegalStateException("ClientChangeImpl@" + System.identityHashCode(this) + " is already disposed");
}
@Override
public void dispose() {
//System.out.println("ClientChangesImpl[" + (disposed ? "DISPOSED" : "") + "]@" + System.identityHashCode(this) + ".dispose " + (isEmpty() ? "EMPTY" : "NON-EMPTY"));
close();
}
public static void main(String[] args) {
ClientChangesImpl ccs = new ClientChangesImpl(null);
System.out.println("isEmpty=" + ccs.isEmpty());
System.out.println("clientChanges=" + ccs.toString());
System.out.println("isEmpty=" + ccs.isEmpty());
}
}