From 09366c7018d046746832e4e7033ca2588455dac2 Mon Sep 17 00:00:00 2001 From: Tuukka Lehtonen Date: Fri, 16 Feb 2018 15:09:29 +0200 Subject: [PATCH] Dispose ClientChangesImpl ChangeSets to minimize memory footprint This change stems from the fact that org.simantics.db.common.changeset.GenericChangeListener gives ChangeSets as an argument to DB requests which can leave strong references to the ChangeSet instances in the DB QueryProcessor until the DB client garbage collects these requests. Usually DB client request parameters should be immutable, but in this case the request is never listened to for changes so it is OK dispose of the ChangeSet parameter after all DB ChangeListeners are notified. This change adds a Disposable implementation to ClientChangesImpl which minimizes the memory footprint of the class down to 192 bytes compared to ~2^17 bytes. The instances can take up a very considerable amount of memory if not disposed. It has not been uncommon to see multiple gigabytes of memory being spent by these byte[] buffers that are not really used anymore but still referenced. Once more note that this is not a case of memory leakage but simply bad use of memory through over-sized buffers that were GC'ed eventually. refs #6233 Change-Id: I2e96754f106602a4986c37187a9af3bbd64356dc --- .../procore/internal/ClientChangesImpl.java | 188 ++++++++++++++---- .../procore/internal/ClusterTable.java | 1 + .../procore/internal/SessionImplSocket.java | 6 + .../internal/SessionRequestManager.java | 3 + .../vtt/simantics/procore/internal/State.java | 2 + .../procore/internal/UndoRedoSupportImpl.java | 2 + .../src/org/simantics/db/Disposable.java | 10 + 7 files changed, 177 insertions(+), 35 deletions(-) diff --git a/bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/ClientChangesImpl.java b/bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/ClientChangesImpl.java index 000880c3d..69e3d6efd 100644 --- a/bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/ClientChangesImpl.java +++ b/bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/ClientChangesImpl.java @@ -22,6 +22,7 @@ import java.util.Collections; import java.util.Iterator; import org.simantics.db.ChangeSet; +import org.simantics.db.Disposable; import org.simantics.db.Resource; import org.simantics.db.common.internal.config.InternalClientConfig; import org.simantics.db.exception.RuntimeDatabaseException; @@ -31,7 +32,11 @@ import org.simantics.db.impl.ResourceImpl; * @author Antti Villberg * @author Tuukka Lehtonen */ -public final class ClientChangesImpl implements ChangeSet { +public final class ClientChangesImpl implements ChangeSet, Disposable { + + private static final boolean ASSERT_ALIVE = true; + + private static final byte[] EMPTY = {}; private static final boolean DEBUG = false; @@ -43,6 +48,8 @@ public final class ClientChangesImpl implements ChangeSet { private static final byte TERM = 4; private static final byte INVALIDATE = 5; + private boolean disposed = false; + private SessionImplSocket session; /** @@ -51,7 +58,7 @@ public final class ClientChangesImpl implements ChangeSet { */ private boolean overflow = false; - private final byte[] readBytes = new byte[BUFFER_SIZE]; + private byte[] readBytes = new byte[BUFFER_SIZE]; // By default the write buffer will be the exact same as the read buffer // since initially were not forced to write changes to the disk. private byte[] writeBytes = readBytes; @@ -67,7 +74,7 @@ public final class ClientChangesImpl implements ChangeSet { */ private boolean wroteSinceLastRead = true; - private final ByteBuffer readBuffer = ByteBuffer.wrap(readBytes); + private ByteBuffer readBuffer = ByteBuffer.wrap(readBytes); private ByteBuffer writeBuffer = ByteBuffer.wrap(writeBytes); private File tempFile; @@ -161,6 +168,7 @@ public final class ClientChangesImpl implements ChangeSet { ClientChangesImpl(SessionImplSocket session) { this.session = session; + //System.out.println("new ClientChangesImpl@" + System.identityHashCode(this)); } @Override @@ -191,6 +199,8 @@ public final class ClientChangesImpl implements ChangeSet { @Override public synchronized Collection changedStatements() { + if (ASSERT_ALIVE) + assertAlive(); if (!resetRead()) return Collections.emptyList(); @@ -254,6 +264,8 @@ public final class ClientChangesImpl implements ChangeSet { @Override public Collection changedValues() { + if (ASSERT_ALIVE) + assertAlive(); if (!resetRead()) return Collections.emptyList(); @@ -315,6 +327,8 @@ public final class ClientChangesImpl implements ChangeSet { @Override public Collection changedResources() { + if (ASSERT_ALIVE) + assertAlive(); if (!resetRead()) return Collections.emptyList(); @@ -392,6 +406,8 @@ public final class ClientChangesImpl implements ChangeSet { } void claim(Resource s, Resource p, Resource o) { + if (ASSERT_ALIVE) + assertAlive(); writeByte(CLAIM); writeInt(session.getId((ResourceImpl) s)); writeInt(session.getId((ResourceImpl) p)); @@ -404,6 +420,8 @@ public final class ClientChangesImpl implements ChangeSet { private int sCache3 = -1; final void invalidate(int s) { + if (ASSERT_ALIVE) + assertAlive(); if(s == sCache1) { return; @@ -430,6 +448,8 @@ public final class ClientChangesImpl implements ChangeSet { } void claim(int s, int p, int o) { + if (ASSERT_ALIVE) + assertAlive(); writeByte(CLAIM); writeInt(s); writeInt(p); @@ -438,6 +458,8 @@ public final class ClientChangesImpl implements ChangeSet { } void deny(Resource s, Resource p, Resource o) { + if (ASSERT_ALIVE) + assertAlive(); writeByte(DENY); writeInt(session.getId((ResourceImpl)s)); writeInt(session.getId((ResourceImpl)p)); @@ -446,6 +468,8 @@ public final class ClientChangesImpl implements ChangeSet { } void deny(int s, int p, int o) { + if (ASSERT_ALIVE) + assertAlive(); writeByte(DENY); writeInt(s); writeInt(p); @@ -454,12 +478,16 @@ public final class ClientChangesImpl implements ChangeSet { } void claimValue(Resource r) { + if (ASSERT_ALIVE) + assertAlive(); writeByte(VALUE); writeInt(session.getId((ResourceImpl)r)); if(DEBUG) System.err.println("claimOrDenyValue1 $" + r.getResourceId()); } void claimValue(int r) { + if (ASSERT_ALIVE) + assertAlive(); writeByte(VALUE); writeInt(r); if(DEBUG) System.err.println("deny2 " + r); @@ -474,25 +502,61 @@ public final class ClientChangesImpl implements ChangeSet { } private int readInt() { - if(readBufferIndex < (BUFFER_SIZE-4)) { - int result = (int) - ((readBytes[readBufferIndex++] & 0xff) | - ((readBytes[readBufferIndex++] & 0xff) << 8) | - ((readBytes[readBufferIndex++] & 0xff) << 16) | - ((readBytes[readBufferIndex++] & 0xff) << 24)); + int ri = readBufferIndex; + int remaining = BUFFER_SIZE - ri; + int result; + switch (remaining) { + case 0: + fillReadBuffer(); + // Intentional fall-through + default: + result = (int)( + (readBytes[ri] & 0xff) | + ((readBytes[ri+1] & 0xff) << 8) | + ((readBytes[ri+2] & 0xff) << 16) | + ((readBytes[ri+3] & 0xff) << 24)); + readBufferIndex += 4; + return result; + + case 4: + result = (int)( + (readBytes[ri] & 0xff) | + ((readBytes[ri+1] & 0xff) << 8) | + ((readBytes[ri+2] & 0xff) << 16) | + ((readBytes[ri+3] & 0xff) << 24)); + fillReadBuffer(); return result; - } else { - int has = BUFFER_SIZE-readBufferIndex; - int result = 0; - if(has == 0) fillReadBuffer(); - result = (int)(readBytes[readBufferIndex++] & 0xff); - if(has == 1) fillReadBuffer(); - result |= (int)((readBytes[readBufferIndex++] & 0xff) << 8); - if(has == 2) fillReadBuffer(); - result |= (int)((readBytes[readBufferIndex++] & 0xff) << 16); - if(has == 3) fillReadBuffer(); + + case 3: + result = (int)( + (readBytes[ri] & 0xff) | + ((readBytes[ri+1] & 0xff) << 8) | + ((readBytes[ri+2] & 0xff) << 16)); + fillReadBuffer(); result |= (int)((readBytes[readBufferIndex++] & 0xff) << 24); - if(has == 4) fillReadBuffer(); + return result; + + case 2: + result = (int)( + (readBytes[ri] & 0xff) | + ((readBytes[ri+1] & 0xff) << 8)); + fillReadBuffer(); + ri = readBufferIndex; + result |= (int)( + ((readBytes[ri] & 0xff) << 16) | + ((readBytes[ri+1] & 0xff) << 24)); + readBufferIndex += 2; + return result; + + case 1: + result = (int)(readBytes[ri] & 0xff); + fillReadBuffer(); + ri = readBufferIndex; + result |= (int)( + ((readBytes[ri] & 0xff) << 8) | + ((readBytes[ri+1] & 0xff) << 16) | + ((readBytes[ri+2] & 0xff) << 24)); + readBufferIndex += 3; return result; } } @@ -504,22 +568,55 @@ public final class ClientChangesImpl implements ChangeSet { } private void writeInt(int i) { - if(writeBufferIndex < (BUFFER_SIZE-4)) { - writeBytes[writeBufferIndex++] = (byte)(i&0xff); - writeBytes[writeBufferIndex++] = (byte)((i>>>8)&0xff); - writeBytes[writeBufferIndex++] = (byte)((i>>>16)&0xff); + int wi = writeBufferIndex; + int room = BUFFER_SIZE - wi; + switch (room) { + case 0: + flushWriteBuffer(BUFFER_SIZE); + // Intentional fall-through + default: + writeBytes[wi] = (byte)(i&0xff); + writeBytes[wi+1] = (byte)((i>>>8)&0xff); + writeBytes[wi+2] = (byte)((i>>>16)&0xff); + writeBytes[wi+3] = (byte)((i>>>24)&0xff); + writeBufferIndex += 4; + break; + + case 4: + writeBytes[wi] = (byte)(i&0xff); + writeBytes[wi+1] = (byte)((i>>>8)&0xff); + writeBytes[wi+2] = (byte)((i>>>16)&0xff); + writeBytes[wi+3] = (byte)((i>>>24)&0xff); + flushWriteBuffer(BUFFER_SIZE); + break; + + case 3: + writeBytes[wi] = (byte)(i&0xff); + writeBytes[wi+1] = (byte)((i>>>8)&0xff); + writeBytes[wi+2] = (byte)((i>>>16)&0xff); + flushWriteBuffer(BUFFER_SIZE); writeBytes[writeBufferIndex++] = (byte)((i>>>24)&0xff); - } else { - int has = BUFFER_SIZE-writeBufferIndex; - if(has == 0) flushWriteBuffer(BUFFER_SIZE); - writeBytes[writeBufferIndex++] = (byte)(i&0xff); - if(has == 1) flushWriteBuffer(BUFFER_SIZE); - writeBytes[writeBufferIndex++] = (byte)((i>>>8)&0xff); - if(has == 2) flushWriteBuffer(BUFFER_SIZE); - writeBytes[writeBufferIndex++] = (byte)((i>>>16)&0xff); - if(has == 3) flushWriteBuffer(BUFFER_SIZE); - writeBytes[writeBufferIndex++] = (byte)((i>>>24)&0xff); - if(has == 4) flushWriteBuffer(BUFFER_SIZE); + break; + + case 2: + writeBytes[wi] = (byte)(i&0xff); + writeBytes[wi+1] = (byte)((i>>>8)&0xff); + flushWriteBuffer(BUFFER_SIZE); + wi = writeBufferIndex; + writeBytes[wi] = (byte)((i>>>16)&0xff); + writeBytes[wi+1] = (byte)((i>>>24)&0xff); + writeBufferIndex += 2; + break; + + case 1: + writeBytes[wi] = (byte)(i&0xff); + flushWriteBuffer(BUFFER_SIZE); + wi = writeBufferIndex; + writeBytes[wi] = (byte)((i>>>8)&0xff); + writeBytes[wi+1] = (byte)((i>>>16)&0xff); + writeBytes[wi+2] = (byte)((i>>>24)&0xff); + writeBufferIndex += 3; + break; } wroteSinceLastRead = true; } @@ -662,6 +759,10 @@ public final class ClientChangesImpl implements ChangeSet { } public void close() { + if (disposed) + return; + + disposed = true; session = null; channel = null; if (file != null) { @@ -680,6 +781,23 @@ public final class ClientChangesImpl implements ChangeSet { tempFile.delete(); tempFile = null; } + + // Release large buffers to minimize memory consumption + readBytes = EMPTY; + readBuffer = ByteBuffer.wrap(EMPTY); + writeBytes = EMPTY; + writeBuffer = ByteBuffer.wrap(EMPTY); + } + + private void assertAlive() { + if (disposed) + throw new IllegalStateException("ClientChangeImpl@" + System.identityHashCode(this) + " is already disposed"); + } + + @Override + public void dispose() { + //System.out.println("ClientChangesImpl[" + (disposed ? "DISPOSED" : "") + "]@" + System.identityHashCode(this) + ".dispose " + (isEmpty() ? "EMPTY" : "NON-EMPTY")); + close(); } public static void main(String[] args) { diff --git a/bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/ClusterTable.java b/bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/ClusterTable.java index 99aa9004f..2c65ee8e8 100644 --- a/bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/ClusterTable.java +++ b/bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/ClusterTable.java @@ -756,6 +756,7 @@ public final class ClusterTable implements IClusterTable { } finally { if (null != th) session.writeState = null; + cs.dispose(); } } } diff --git a/bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/SessionImplSocket.java b/bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/SessionImplSocket.java index 9a86dc1fa..9290ac99f 100644 --- a/bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/SessionImplSocket.java +++ b/bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/SessionImplSocket.java @@ -34,6 +34,7 @@ import org.simantics.databoard.Bindings; import org.simantics.db.AsyncReadGraph; import org.simantics.db.ChangeSet; import org.simantics.db.DevelopmentKeys; +import org.simantics.db.Disposable; import org.simantics.db.ExternalValueSupport; import org.simantics.db.Metadata; import org.simantics.db.MonitorContext; @@ -429,6 +430,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule try { flushCounter = 0; + Disposable.safeDispose(clientChanges); clientChanges = new ClientChangesImpl(SessionImplSocket.this); VirtualGraph vg = getProvider(request.getProvider()); @@ -555,6 +557,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule fireSessionVariableChange(SessionVariables.QUEUED_WRITES); flushCounter = 0; + Disposable.safeDispose(clientChanges); clientChanges = new ClientChangesImpl(SessionImplSocket.this); VirtualGraph vg = getProvider(request.getProvider()); @@ -677,6 +680,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule fireSessionVariableChange(SessionVariables.QUEUED_WRITES); flushCounter = 0; + Disposable.safeDispose(clientChanges); clientChanges = new ClientChangesImpl(SessionImplSocket.this); acquireWriteOnly(); @@ -1296,6 +1300,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule fireSessionVariableChange(SessionVariables.QUEUED_WRITES); flushCounter = 0; + Disposable.safeDispose(clientChanges); clientChanges = new ClientChangesImpl(SessionImplSocket.this); acquireWriteOnly(); @@ -1381,6 +1386,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule fireSessionVariableChange(SessionVariables.QUEUED_WRITES); flushCounter = 0; + Disposable.safeDispose(clientChanges); clientChanges = new ClientChangesImpl(SessionImplSocket.this); acquireWriteOnly(); diff --git a/bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/SessionRequestManager.java b/bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/SessionRequestManager.java index 0ec6387ae..5bdec52e9 100644 --- a/bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/SessionRequestManager.java +++ b/bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/SessionRequestManager.java @@ -4,6 +4,7 @@ import java.io.IOException; import java.util.Collection; import java.util.LinkedList; +import org.simantics.db.Disposable; import org.simantics.db.Resource; import org.simantics.db.common.utils.Logger; import org.simantics.db.exception.CancelTransactionException; @@ -213,6 +214,7 @@ public class SessionRequestManager { } else { throw new UnsupportedOperationException("delayedWriteState may only exist when request fails."); } + Disposable.safeDispose(session.clientChanges); session.clientChanges = new ClientChangesImpl(session); delayedState.finish(); return; @@ -242,6 +244,7 @@ public class SessionRequestManager { } + Disposable.safeDispose(session.clientChanges); session.clientChanges = new ClientChangesImpl(session); WriteState state = session.writeState; diff --git a/bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/State.java b/bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/State.java index 70e2cb10d..3199901e9 100644 --- a/bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/State.java +++ b/bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/State.java @@ -17,6 +17,7 @@ import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import org.simantics.db.ChangeSet; +import org.simantics.db.Disposable; import org.simantics.db.Operation; import org.simantics.db.VirtualGraph; import org.simantics.db.common.utils.Logger; @@ -331,6 +332,7 @@ class State { // deny can always remove a persistent statement. clusterStream.reallyFlush(); + Disposable.safeDispose(session.clientChanges); session.clientChanges = new ClientChangesImpl(session); diff --git a/bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/UndoRedoSupportImpl.java b/bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/UndoRedoSupportImpl.java index 8b9390410..0f8d9706c 100644 --- a/bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/UndoRedoSupportImpl.java +++ b/bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/UndoRedoSupportImpl.java @@ -106,6 +106,8 @@ public class UndoRedoSupportImpl implements UndoRedoSupport { e.printStackTrace(); Logger.defaultLogError(e); th.throwableSet(e); + } finally { + cs.dispose(); } } }); diff --git a/bundles/org.simantics.db/src/org/simantics/db/Disposable.java b/bundles/org.simantics.db/src/org/simantics/db/Disposable.java index 9da08dd86..a413837fd 100644 --- a/bundles/org.simantics.db/src/org/simantics/db/Disposable.java +++ b/bundles/org.simantics.db/src/org/simantics/db/Disposable.java @@ -27,4 +27,14 @@ public interface Disposable { void dispose(); + static void safeDispose(Disposable d) { + if (d != null) + d.dispose(); + } + + static void safeDispose(Object o) { + if (o instanceof Disposable) + ((Disposable) o).dispose(); + } + } -- 2.47.1