import java.util.Iterator;
import org.simantics.db.ChangeSet;
+import org.simantics.db.Disposable;
import org.simantics.db.Resource;
import org.simantics.db.common.internal.config.InternalClientConfig;
import org.simantics.db.exception.RuntimeDatabaseException;
* @author Antti Villberg
* @author Tuukka Lehtonen
*/
-public final class ClientChangesImpl implements ChangeSet {
+public final class ClientChangesImpl implements ChangeSet, Disposable {
+
+ private static final boolean ASSERT_ALIVE = true;
+
+ private static final byte[] EMPTY = {};
private static final boolean DEBUG = false;
private static final byte TERM = 4;
private static final byte INVALIDATE = 5;
+ private boolean disposed = false;
+
private SessionImplSocket session;
/**
*/
private boolean overflow = false;
- private final byte[] readBytes = new byte[BUFFER_SIZE];
+ private byte[] readBytes = new byte[BUFFER_SIZE];
// By default the write buffer will be the exact same as the read buffer
// since initially were not forced to write changes to the disk.
private byte[] writeBytes = readBytes;
*/
private boolean wroteSinceLastRead = true;
- private final ByteBuffer readBuffer = ByteBuffer.wrap(readBytes);
+ private ByteBuffer readBuffer = ByteBuffer.wrap(readBytes);
private ByteBuffer writeBuffer = ByteBuffer.wrap(writeBytes);
private File tempFile;
ClientChangesImpl(SessionImplSocket session) {
this.session = session;
+ //System.out.println("new ClientChangesImpl@" + System.identityHashCode(this));
}
@Override
@Override
public synchronized Collection<StatementChange> changedStatements() {
+ if (ASSERT_ALIVE)
+ assertAlive();
if (!resetRead())
return Collections.emptyList();
@Override
public Collection<Resource> changedValues() {
+ if (ASSERT_ALIVE)
+ assertAlive();
if (!resetRead())
return Collections.emptyList();
@Override
public Collection<Resource> changedResources() {
+ if (ASSERT_ALIVE)
+ assertAlive();
if (!resetRead())
return Collections.emptyList();
}
void claim(Resource s, Resource p, Resource o) {
+ if (ASSERT_ALIVE)
+ assertAlive();
writeByte(CLAIM);
writeInt(session.getId((ResourceImpl) s));
writeInt(session.getId((ResourceImpl) p));
private int sCache3 = -1;
final void invalidate(int s) {
+ if (ASSERT_ALIVE)
+ assertAlive();
if(s == sCache1) {
return;
}
void claim(int s, int p, int o) {
+ if (ASSERT_ALIVE)
+ assertAlive();
writeByte(CLAIM);
writeInt(s);
writeInt(p);
}
void deny(Resource s, Resource p, Resource o) {
+ if (ASSERT_ALIVE)
+ assertAlive();
writeByte(DENY);
writeInt(session.getId((ResourceImpl)s));
writeInt(session.getId((ResourceImpl)p));
}
void deny(int s, int p, int o) {
+ if (ASSERT_ALIVE)
+ assertAlive();
writeByte(DENY);
writeInt(s);
writeInt(p);
}
void claimValue(Resource r) {
+ if (ASSERT_ALIVE)
+ assertAlive();
writeByte(VALUE);
writeInt(session.getId((ResourceImpl)r));
if(DEBUG) System.err.println("claimOrDenyValue1 $" + r.getResourceId());
}
void claimValue(int r) {
+ if (ASSERT_ALIVE)
+ assertAlive();
writeByte(VALUE);
writeInt(r);
if(DEBUG) System.err.println("deny2 " + r);
}
private int readInt() {
- if(readBufferIndex < (BUFFER_SIZE-4)) {
- int result = (int)
- ((readBytes[readBufferIndex++] & 0xff) |
- ((readBytes[readBufferIndex++] & 0xff) << 8) |
- ((readBytes[readBufferIndex++] & 0xff) << 16) |
- ((readBytes[readBufferIndex++] & 0xff) << 24));
+ int ri = readBufferIndex;
+ int remaining = BUFFER_SIZE - ri;
+ int result;
+ switch (remaining) {
+ case 0:
+ fillReadBuffer();
+ // Intentional fall-through
+ default:
+ result = (int)(
+ (readBytes[ri] & 0xff) |
+ ((readBytes[ri+1] & 0xff) << 8) |
+ ((readBytes[ri+2] & 0xff) << 16) |
+ ((readBytes[ri+3] & 0xff) << 24));
+ readBufferIndex += 4;
+ return result;
+
+ case 4:
+ result = (int)(
+ (readBytes[ri] & 0xff) |
+ ((readBytes[ri+1] & 0xff) << 8) |
+ ((readBytes[ri+2] & 0xff) << 16) |
+ ((readBytes[ri+3] & 0xff) << 24));
+ fillReadBuffer();
return result;
- } else {
- int has = BUFFER_SIZE-readBufferIndex;
- int result = 0;
- if(has == 0) fillReadBuffer();
- result = (int)(readBytes[readBufferIndex++] & 0xff);
- if(has == 1) fillReadBuffer();
- result |= (int)((readBytes[readBufferIndex++] & 0xff) << 8);
- if(has == 2) fillReadBuffer();
- result |= (int)((readBytes[readBufferIndex++] & 0xff) << 16);
- if(has == 3) fillReadBuffer();
+
+ case 3:
+ result = (int)(
+ (readBytes[ri] & 0xff) |
+ ((readBytes[ri+1] & 0xff) << 8) |
+ ((readBytes[ri+2] & 0xff) << 16));
+ fillReadBuffer();
result |= (int)((readBytes[readBufferIndex++] & 0xff) << 24);
- if(has == 4) fillReadBuffer();
+ return result;
+
+ case 2:
+ result = (int)(
+ (readBytes[ri] & 0xff) |
+ ((readBytes[ri+1] & 0xff) << 8));
+ fillReadBuffer();
+ ri = readBufferIndex;
+ result |= (int)(
+ ((readBytes[ri] & 0xff) << 16) |
+ ((readBytes[ri+1] & 0xff) << 24));
+ readBufferIndex += 2;
+ return result;
+
+ case 1:
+ result = (int)(readBytes[ri] & 0xff);
+ fillReadBuffer();
+ ri = readBufferIndex;
+ result |= (int)(
+ ((readBytes[ri] & 0xff) << 8) |
+ ((readBytes[ri+1] & 0xff) << 16) |
+ ((readBytes[ri+2] & 0xff) << 24));
+ readBufferIndex += 3;
return result;
}
}
}
private void writeInt(int i) {
- if(writeBufferIndex < (BUFFER_SIZE-4)) {
- writeBytes[writeBufferIndex++] = (byte)(i&0xff);
- writeBytes[writeBufferIndex++] = (byte)((i>>>8)&0xff);
- writeBytes[writeBufferIndex++] = (byte)((i>>>16)&0xff);
+ int wi = writeBufferIndex;
+ int room = BUFFER_SIZE - wi;
+ switch (room) {
+ case 0:
+ flushWriteBuffer(BUFFER_SIZE);
+ // Intentional fall-through
+ default:
+ writeBytes[wi] = (byte)(i&0xff);
+ writeBytes[wi+1] = (byte)((i>>>8)&0xff);
+ writeBytes[wi+2] = (byte)((i>>>16)&0xff);
+ writeBytes[wi+3] = (byte)((i>>>24)&0xff);
+ writeBufferIndex += 4;
+ break;
+
+ case 4:
+ writeBytes[wi] = (byte)(i&0xff);
+ writeBytes[wi+1] = (byte)((i>>>8)&0xff);
+ writeBytes[wi+2] = (byte)((i>>>16)&0xff);
+ writeBytes[wi+3] = (byte)((i>>>24)&0xff);
+ flushWriteBuffer(BUFFER_SIZE);
+ break;
+
+ case 3:
+ writeBytes[wi] = (byte)(i&0xff);
+ writeBytes[wi+1] = (byte)((i>>>8)&0xff);
+ writeBytes[wi+2] = (byte)((i>>>16)&0xff);
+ flushWriteBuffer(BUFFER_SIZE);
writeBytes[writeBufferIndex++] = (byte)((i>>>24)&0xff);
- } else {
- int has = BUFFER_SIZE-writeBufferIndex;
- if(has == 0) flushWriteBuffer(BUFFER_SIZE);
- writeBytes[writeBufferIndex++] = (byte)(i&0xff);
- if(has == 1) flushWriteBuffer(BUFFER_SIZE);
- writeBytes[writeBufferIndex++] = (byte)((i>>>8)&0xff);
- if(has == 2) flushWriteBuffer(BUFFER_SIZE);
- writeBytes[writeBufferIndex++] = (byte)((i>>>16)&0xff);
- if(has == 3) flushWriteBuffer(BUFFER_SIZE);
- writeBytes[writeBufferIndex++] = (byte)((i>>>24)&0xff);
- if(has == 4) flushWriteBuffer(BUFFER_SIZE);
+ break;
+
+ case 2:
+ writeBytes[wi] = (byte)(i&0xff);
+ writeBytes[wi+1] = (byte)((i>>>8)&0xff);
+ flushWriteBuffer(BUFFER_SIZE);
+ wi = writeBufferIndex;
+ writeBytes[wi] = (byte)((i>>>16)&0xff);
+ writeBytes[wi+1] = (byte)((i>>>24)&0xff);
+ writeBufferIndex += 2;
+ break;
+
+ case 1:
+ writeBytes[wi] = (byte)(i&0xff);
+ flushWriteBuffer(BUFFER_SIZE);
+ wi = writeBufferIndex;
+ writeBytes[wi] = (byte)((i>>>8)&0xff);
+ writeBytes[wi+1] = (byte)((i>>>16)&0xff);
+ writeBytes[wi+2] = (byte)((i>>>24)&0xff);
+ writeBufferIndex += 3;
+ break;
}
wroteSinceLastRead = true;
}
}
public void close() {
+ if (disposed)
+ return;
+
+ disposed = true;
session = null;
channel = null;
if (file != null) {
tempFile.delete();
tempFile = null;
}
+
+ // Release large buffers to minimize memory consumption
+ readBytes = EMPTY;
+ readBuffer = ByteBuffer.wrap(EMPTY);
+ writeBytes = EMPTY;
+ writeBuffer = ByteBuffer.wrap(EMPTY);
+ }
+
+ private void assertAlive() {
+ if (disposed)
+ throw new IllegalStateException("ClientChangeImpl@" + System.identityHashCode(this) + " is already disposed");
+ }
+
+ @Override
+ public void dispose() {
+ //System.out.println("ClientChangesImpl[" + (disposed ? "DISPOSED" : "") + "]@" + System.identityHashCode(this) + ".dispose " + (isEmpty() ? "EMPTY" : "NON-EMPTY"));
+ close();
}
public static void main(String[] args) {