X-Git-Url: https://gerrit.simantics.org/r/gitweb?p=simantics%2Fplatform.git;a=blobdiff_plain;f=bundles%2Forg.simantics.db.procore%2Fsrc%2Ffi%2Fvtt%2Fsimantics%2Fprocore%2Finternal%2FClientChangesImpl.java;h=69e3d6efdca6456d2aacf8a6cda1b9701f0e45aa;hp=000880c3d1d1bf3ff00f5b409ee69bd657a3c2b0;hb=09366c7018d046746832e4e7033ca2588455dac2;hpb=7fa60c7f3e3a98026b4657afec5f6c02ed958657 diff --git a/bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/ClientChangesImpl.java b/bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/ClientChangesImpl.java index 000880c3d..69e3d6efd 100644 --- a/bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/ClientChangesImpl.java +++ b/bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/ClientChangesImpl.java @@ -22,6 +22,7 @@ import java.util.Collections; import java.util.Iterator; import org.simantics.db.ChangeSet; +import org.simantics.db.Disposable; import org.simantics.db.Resource; import org.simantics.db.common.internal.config.InternalClientConfig; import org.simantics.db.exception.RuntimeDatabaseException; @@ -31,7 +32,11 @@ import org.simantics.db.impl.ResourceImpl; * @author Antti Villberg * @author Tuukka Lehtonen */ -public final class ClientChangesImpl implements ChangeSet { +public final class ClientChangesImpl implements ChangeSet, Disposable { + + private static final boolean ASSERT_ALIVE = true; + + private static final byte[] EMPTY = {}; private static final boolean DEBUG = false; @@ -43,6 +48,8 @@ public final class ClientChangesImpl implements ChangeSet { private static final byte TERM = 4; private static final byte INVALIDATE = 5; + private boolean disposed = false; + private SessionImplSocket session; /** @@ -51,7 +58,7 @@ public final class ClientChangesImpl implements ChangeSet { */ private boolean overflow = false; - private final byte[] readBytes = new byte[BUFFER_SIZE]; + private byte[] readBytes = new byte[BUFFER_SIZE]; // By default the write buffer will be the exact same as the read buffer // since initially were not forced to write changes to the disk. private byte[] writeBytes = readBytes; @@ -67,7 +74,7 @@ public final class ClientChangesImpl implements ChangeSet { */ private boolean wroteSinceLastRead = true; - private final ByteBuffer readBuffer = ByteBuffer.wrap(readBytes); + private ByteBuffer readBuffer = ByteBuffer.wrap(readBytes); private ByteBuffer writeBuffer = ByteBuffer.wrap(writeBytes); private File tempFile; @@ -161,6 +168,7 @@ public final class ClientChangesImpl implements ChangeSet { ClientChangesImpl(SessionImplSocket session) { this.session = session; + //System.out.println("new ClientChangesImpl@" + System.identityHashCode(this)); } @Override @@ -191,6 +199,8 @@ public final class ClientChangesImpl implements ChangeSet { @Override public synchronized Collection changedStatements() { + if (ASSERT_ALIVE) + assertAlive(); if (!resetRead()) return Collections.emptyList(); @@ -254,6 +264,8 @@ public final class ClientChangesImpl implements ChangeSet { @Override public Collection changedValues() { + if (ASSERT_ALIVE) + assertAlive(); if (!resetRead()) return Collections.emptyList(); @@ -315,6 +327,8 @@ public final class ClientChangesImpl implements ChangeSet { @Override public Collection changedResources() { + if (ASSERT_ALIVE) + assertAlive(); if (!resetRead()) return Collections.emptyList(); @@ -392,6 +406,8 @@ public final class ClientChangesImpl implements ChangeSet { } void claim(Resource s, Resource p, Resource o) { + if (ASSERT_ALIVE) + assertAlive(); writeByte(CLAIM); writeInt(session.getId((ResourceImpl) s)); writeInt(session.getId((ResourceImpl) p)); @@ -404,6 +420,8 @@ public final class ClientChangesImpl implements ChangeSet { private int sCache3 = -1; final void invalidate(int s) { + if (ASSERT_ALIVE) + assertAlive(); if(s == sCache1) { return; @@ -430,6 +448,8 @@ public final class ClientChangesImpl implements ChangeSet { } void claim(int s, int p, int o) { + if (ASSERT_ALIVE) + assertAlive(); writeByte(CLAIM); writeInt(s); writeInt(p); @@ -438,6 +458,8 @@ public final class ClientChangesImpl implements ChangeSet { } void deny(Resource s, Resource p, Resource o) { + if (ASSERT_ALIVE) + assertAlive(); writeByte(DENY); writeInt(session.getId((ResourceImpl)s)); writeInt(session.getId((ResourceImpl)p)); @@ -446,6 +468,8 @@ public final class ClientChangesImpl implements ChangeSet { } void deny(int s, int p, int o) { + if (ASSERT_ALIVE) + assertAlive(); writeByte(DENY); writeInt(s); writeInt(p); @@ -454,12 +478,16 @@ public final class ClientChangesImpl implements ChangeSet { } void claimValue(Resource r) { + if (ASSERT_ALIVE) + assertAlive(); writeByte(VALUE); writeInt(session.getId((ResourceImpl)r)); if(DEBUG) System.err.println("claimOrDenyValue1 $" + r.getResourceId()); } void claimValue(int r) { + if (ASSERT_ALIVE) + assertAlive(); writeByte(VALUE); writeInt(r); if(DEBUG) System.err.println("deny2 " + r); @@ -474,25 +502,61 @@ public final class ClientChangesImpl implements ChangeSet { } private int readInt() { - if(readBufferIndex < (BUFFER_SIZE-4)) { - int result = (int) - ((readBytes[readBufferIndex++] & 0xff) | - ((readBytes[readBufferIndex++] & 0xff) << 8) | - ((readBytes[readBufferIndex++] & 0xff) << 16) | - ((readBytes[readBufferIndex++] & 0xff) << 24)); + int ri = readBufferIndex; + int remaining = BUFFER_SIZE - ri; + int result; + switch (remaining) { + case 0: + fillReadBuffer(); + // Intentional fall-through + default: + result = (int)( + (readBytes[ri] & 0xff) | + ((readBytes[ri+1] & 0xff) << 8) | + ((readBytes[ri+2] & 0xff) << 16) | + ((readBytes[ri+3] & 0xff) << 24)); + readBufferIndex += 4; + return result; + + case 4: + result = (int)( + (readBytes[ri] & 0xff) | + ((readBytes[ri+1] & 0xff) << 8) | + ((readBytes[ri+2] & 0xff) << 16) | + ((readBytes[ri+3] & 0xff) << 24)); + fillReadBuffer(); return result; - } else { - int has = BUFFER_SIZE-readBufferIndex; - int result = 0; - if(has == 0) fillReadBuffer(); - result = (int)(readBytes[readBufferIndex++] & 0xff); - if(has == 1) fillReadBuffer(); - result |= (int)((readBytes[readBufferIndex++] & 0xff) << 8); - if(has == 2) fillReadBuffer(); - result |= (int)((readBytes[readBufferIndex++] & 0xff) << 16); - if(has == 3) fillReadBuffer(); + + case 3: + result = (int)( + (readBytes[ri] & 0xff) | + ((readBytes[ri+1] & 0xff) << 8) | + ((readBytes[ri+2] & 0xff) << 16)); + fillReadBuffer(); result |= (int)((readBytes[readBufferIndex++] & 0xff) << 24); - if(has == 4) fillReadBuffer(); + return result; + + case 2: + result = (int)( + (readBytes[ri] & 0xff) | + ((readBytes[ri+1] & 0xff) << 8)); + fillReadBuffer(); + ri = readBufferIndex; + result |= (int)( + ((readBytes[ri] & 0xff) << 16) | + ((readBytes[ri+1] & 0xff) << 24)); + readBufferIndex += 2; + return result; + + case 1: + result = (int)(readBytes[ri] & 0xff); + fillReadBuffer(); + ri = readBufferIndex; + result |= (int)( + ((readBytes[ri] & 0xff) << 8) | + ((readBytes[ri+1] & 0xff) << 16) | + ((readBytes[ri+2] & 0xff) << 24)); + readBufferIndex += 3; return result; } } @@ -504,22 +568,55 @@ public final class ClientChangesImpl implements ChangeSet { } private void writeInt(int i) { - if(writeBufferIndex < (BUFFER_SIZE-4)) { - writeBytes[writeBufferIndex++] = (byte)(i&0xff); - writeBytes[writeBufferIndex++] = (byte)((i>>>8)&0xff); - writeBytes[writeBufferIndex++] = (byte)((i>>>16)&0xff); + int wi = writeBufferIndex; + int room = BUFFER_SIZE - wi; + switch (room) { + case 0: + flushWriteBuffer(BUFFER_SIZE); + // Intentional fall-through + default: + writeBytes[wi] = (byte)(i&0xff); + writeBytes[wi+1] = (byte)((i>>>8)&0xff); + writeBytes[wi+2] = (byte)((i>>>16)&0xff); + writeBytes[wi+3] = (byte)((i>>>24)&0xff); + writeBufferIndex += 4; + break; + + case 4: + writeBytes[wi] = (byte)(i&0xff); + writeBytes[wi+1] = (byte)((i>>>8)&0xff); + writeBytes[wi+2] = (byte)((i>>>16)&0xff); + writeBytes[wi+3] = (byte)((i>>>24)&0xff); + flushWriteBuffer(BUFFER_SIZE); + break; + + case 3: + writeBytes[wi] = (byte)(i&0xff); + writeBytes[wi+1] = (byte)((i>>>8)&0xff); + writeBytes[wi+2] = (byte)((i>>>16)&0xff); + flushWriteBuffer(BUFFER_SIZE); writeBytes[writeBufferIndex++] = (byte)((i>>>24)&0xff); - } else { - int has = BUFFER_SIZE-writeBufferIndex; - if(has == 0) flushWriteBuffer(BUFFER_SIZE); - writeBytes[writeBufferIndex++] = (byte)(i&0xff); - if(has == 1) flushWriteBuffer(BUFFER_SIZE); - writeBytes[writeBufferIndex++] = (byte)((i>>>8)&0xff); - if(has == 2) flushWriteBuffer(BUFFER_SIZE); - writeBytes[writeBufferIndex++] = (byte)((i>>>16)&0xff); - if(has == 3) flushWriteBuffer(BUFFER_SIZE); - writeBytes[writeBufferIndex++] = (byte)((i>>>24)&0xff); - if(has == 4) flushWriteBuffer(BUFFER_SIZE); + break; + + case 2: + writeBytes[wi] = (byte)(i&0xff); + writeBytes[wi+1] = (byte)((i>>>8)&0xff); + flushWriteBuffer(BUFFER_SIZE); + wi = writeBufferIndex; + writeBytes[wi] = (byte)((i>>>16)&0xff); + writeBytes[wi+1] = (byte)((i>>>24)&0xff); + writeBufferIndex += 2; + break; + + case 1: + writeBytes[wi] = (byte)(i&0xff); + flushWriteBuffer(BUFFER_SIZE); + wi = writeBufferIndex; + writeBytes[wi] = (byte)((i>>>8)&0xff); + writeBytes[wi+1] = (byte)((i>>>16)&0xff); + writeBytes[wi+2] = (byte)((i>>>24)&0xff); + writeBufferIndex += 3; + break; } wroteSinceLastRead = true; } @@ -662,6 +759,10 @@ public final class ClientChangesImpl implements ChangeSet { } public void close() { + if (disposed) + return; + + disposed = true; session = null; channel = null; if (file != null) { @@ -680,6 +781,23 @@ public final class ClientChangesImpl implements ChangeSet { tempFile.delete(); tempFile = null; } + + // Release large buffers to minimize memory consumption + readBytes = EMPTY; + readBuffer = ByteBuffer.wrap(EMPTY); + writeBytes = EMPTY; + writeBuffer = ByteBuffer.wrap(EMPTY); + } + + private void assertAlive() { + if (disposed) + throw new IllegalStateException("ClientChangeImpl@" + System.identityHashCode(this) + " is already disposed"); + } + + @Override + public void dispose() { + //System.out.println("ClientChangesImpl[" + (disposed ? "DISPOSED" : "") + "]@" + System.identityHashCode(this) + ".dispose " + (isEmpty() ? "EMPTY" : "NON-EMPTY")); + close(); } public static void main(String[] args) {