Dispose ClientChangesImpl ChangeSets to minimize memory footprint 74/1474/7
authorTuukka Lehtonen <tuukka.lehtonen@semantum.fi>
Fri, 16 Feb 2018 13:09:29 +0000 (15:09 +0200)
committerTuukka Lehtonen <tuukka.lehtonen@semantum.fi>
Tue, 20 Feb 2018 14:01:50 +0000 (16:01 +0200)
This change stems from the fact that
org.simantics.db.common.changeset.GenericChangeListener gives ChangeSets
as an argument to DB requests which can leave strong references to the
ChangeSet instances in the DB QueryProcessor until the DB client
garbage collects these requests. Usually DB client request parameters
should be immutable, but in this case the request is never listened to
for changes so it is OK dispose of the ChangeSet parameter after all
DB ChangeListeners are notified.

This change adds a Disposable implementation to ClientChangesImpl which
minimizes the memory footprint of the class down to 192 bytes compared
to ~2^17 bytes. The instances can take up a very considerable amount of
memory if not disposed. It has not been uncommon to see multiple
gigabytes of memory being spent by these byte[] buffers that are not
really used anymore but still referenced.

Once more note that this is not a case of memory leakage but simply bad
use of memory through over-sized buffers that were GC'ed eventually.

refs #6233

Change-Id: I2e96754f106602a4986c37187a9af3bbd64356dc

bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/ClientChangesImpl.java
bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/ClusterTable.java
bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/SessionImplSocket.java
bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/SessionRequestManager.java
bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/State.java
bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/UndoRedoSupportImpl.java
bundles/org.simantics.db/src/org/simantics/db/Disposable.java

index 000880c3d1d1bf3ff00f5b409ee69bd657a3c2b0..69e3d6efdca6456d2aacf8a6cda1b9701f0e45aa 100644 (file)
@@ -22,6 +22,7 @@ import java.util.Collections;
 import java.util.Iterator;
 
 import org.simantics.db.ChangeSet;
+import org.simantics.db.Disposable;
 import org.simantics.db.Resource;
 import org.simantics.db.common.internal.config.InternalClientConfig;
 import org.simantics.db.exception.RuntimeDatabaseException;
@@ -31,7 +32,11 @@ import org.simantics.db.impl.ResourceImpl;
  * @author Antti Villberg
  * @author Tuukka Lehtonen
  */
-public final class ClientChangesImpl implements ChangeSet {
+public final class ClientChangesImpl implements ChangeSet, Disposable {
+
+    private static final boolean ASSERT_ALIVE = true; 
+
+    private static final byte[] EMPTY = {};
 
     private static final boolean DEBUG = false;
 
@@ -43,6 +48,8 @@ public final class ClientChangesImpl implements ChangeSet {
     private static final byte TERM = 4;
     private static final byte INVALIDATE = 5;
 
+    private boolean disposed = false;
+
     private SessionImplSocket session;
 
     /**
@@ -51,7 +58,7 @@ public final class ClientChangesImpl implements ChangeSet {
      */
     private boolean overflow = false;
 
-    private final byte[] readBytes = new byte[BUFFER_SIZE];
+    private byte[] readBytes = new byte[BUFFER_SIZE];
     // By default the write buffer will be the exact same as the read buffer
     // since initially were not forced to write changes to the disk.
     private byte[] writeBytes = readBytes;
@@ -67,7 +74,7 @@ public final class ClientChangesImpl implements ChangeSet {
      */
     private boolean wroteSinceLastRead = true;
 
-    private final ByteBuffer readBuffer = ByteBuffer.wrap(readBytes);
+    private ByteBuffer readBuffer = ByteBuffer.wrap(readBytes);
     private ByteBuffer writeBuffer = ByteBuffer.wrap(writeBytes);
 
     private File tempFile;
@@ -161,6 +168,7 @@ public final class ClientChangesImpl implements ChangeSet {
 
     ClientChangesImpl(SessionImplSocket session) {
         this.session = session;
+        //System.out.println("new ClientChangesImpl@" + System.identityHashCode(this));
     }
 
     @Override
@@ -191,6 +199,8 @@ public final class ClientChangesImpl implements ChangeSet {
 
     @Override
     public synchronized Collection<StatementChange> changedStatements() {
+        if (ASSERT_ALIVE)
+            assertAlive();
 
         if (!resetRead())
             return Collections.emptyList();
@@ -254,6 +264,8 @@ public final class ClientChangesImpl implements ChangeSet {
 
     @Override
     public Collection<Resource> changedValues() {
+        if (ASSERT_ALIVE)
+            assertAlive();
 
         if (!resetRead())
             return Collections.emptyList();
@@ -315,6 +327,8 @@ public final class ClientChangesImpl implements ChangeSet {
 
     @Override
     public Collection<Resource> changedResources() {
+        if (ASSERT_ALIVE)
+            assertAlive();
 
         if (!resetRead())
             return Collections.emptyList();
@@ -392,6 +406,8 @@ public final class ClientChangesImpl implements ChangeSet {
     }
 
     void claim(Resource s, Resource p, Resource o) {
+        if (ASSERT_ALIVE)
+            assertAlive();
         writeByte(CLAIM);
         writeInt(session.getId((ResourceImpl) s));
         writeInt(session.getId((ResourceImpl) p));
@@ -404,6 +420,8 @@ public final class ClientChangesImpl implements ChangeSet {
     private int sCache3 = -1;
 
     final void invalidate(int s) {
+        if (ASSERT_ALIVE)
+            assertAlive();
        
        if(s == sCache1) {
                return;
@@ -430,6 +448,8 @@ public final class ClientChangesImpl implements ChangeSet {
     }
 
     void claim(int s, int p, int o) {
+        if (ASSERT_ALIVE)
+            assertAlive();
         writeByte(CLAIM);
         writeInt(s);
         writeInt(p);
@@ -438,6 +458,8 @@ public final class ClientChangesImpl implements ChangeSet {
     }
 
     void deny(Resource s, Resource p, Resource o) {
+        if (ASSERT_ALIVE)
+            assertAlive();
         writeByte(DENY);
         writeInt(session.getId((ResourceImpl)s));
         writeInt(session.getId((ResourceImpl)p));
@@ -446,6 +468,8 @@ public final class ClientChangesImpl implements ChangeSet {
     }
 
     void deny(int s, int p, int o) {
+        if (ASSERT_ALIVE)
+            assertAlive();
         writeByte(DENY);
         writeInt(s);
         writeInt(p);
@@ -454,12 +478,16 @@ public final class ClientChangesImpl implements ChangeSet {
     }
 
     void claimValue(Resource r) {
+        if (ASSERT_ALIVE)
+            assertAlive();
         writeByte(VALUE);
         writeInt(session.getId((ResourceImpl)r));
         if(DEBUG) System.err.println("claimOrDenyValue1 $" + r.getResourceId());
     }
 
     void claimValue(int r) {
+        if (ASSERT_ALIVE)
+            assertAlive();
         writeByte(VALUE);
         writeInt(r);
         if(DEBUG) System.err.println("deny2 " + r);
@@ -474,25 +502,61 @@ public final class ClientChangesImpl implements ChangeSet {
     }
 
        private int readInt() {
-        if(readBufferIndex < (BUFFER_SIZE-4)) {
-            int result = (int) 
-                ((readBytes[readBufferIndex++] & 0xff) | 
-                ((readBytes[readBufferIndex++] & 0xff) << 8) | 
-                ((readBytes[readBufferIndex++] & 0xff) << 16) | 
-                ((readBytes[readBufferIndex++] & 0xff) << 24)); 
+        int ri = readBufferIndex; 
+        int remaining = BUFFER_SIZE - ri;
+        int result;
+        switch (remaining) {
+        case 0:
+            fillReadBuffer();
+            // Intentional fall-through
+        default:
+            result = (int)(
+                     (readBytes[ri] & 0xff) | 
+                    ((readBytes[ri+1] & 0xff) << 8) | 
+                    ((readBytes[ri+2] & 0xff) << 16) | 
+                    ((readBytes[ri+3] & 0xff) << 24));
+            readBufferIndex += 4;
+            return result;
+
+        case 4:
+            result = (int)(
+                     (readBytes[ri] & 0xff) | 
+                    ((readBytes[ri+1] & 0xff) << 8) | 
+                    ((readBytes[ri+2] & 0xff) << 16) | 
+                    ((readBytes[ri+3] & 0xff) << 24));
+            fillReadBuffer();
             return result;
-        } else {
-            int has = BUFFER_SIZE-readBufferIndex;
-            int result = 0;
-            if(has == 0) fillReadBuffer();
-            result = (int)(readBytes[readBufferIndex++] & 0xff);
-            if(has == 1) fillReadBuffer();
-            result |= (int)((readBytes[readBufferIndex++] & 0xff) << 8);
-            if(has == 2) fillReadBuffer();
-            result |= (int)((readBytes[readBufferIndex++] & 0xff) << 16);
-            if(has == 3) fillReadBuffer();
+
+        case 3:
+            result = (int)(
+                     (readBytes[ri] & 0xff) | 
+                    ((readBytes[ri+1] & 0xff) << 8) | 
+                    ((readBytes[ri+2] & 0xff) << 16)); 
+            fillReadBuffer();
             result |= (int)((readBytes[readBufferIndex++] & 0xff) << 24);
-            if(has == 4) fillReadBuffer();
+            return result;
+
+        case 2:
+            result = (int)(
+                    (readBytes[ri] & 0xff) | 
+                   ((readBytes[ri+1] & 0xff) << 8)); 
+            fillReadBuffer();
+            ri = readBufferIndex;
+            result |= (int)(
+                   ((readBytes[ri] & 0xff) << 16) | 
+                   ((readBytes[ri+1] & 0xff) << 24)); 
+            readBufferIndex += 2;
+            return result;
+
+        case 1:
+            result = (int)(readBytes[ri] & 0xff); 
+            fillReadBuffer();
+            ri = readBufferIndex;
+            result |= (int)( 
+                    ((readBytes[ri] & 0xff) << 8) | 
+                    ((readBytes[ri+1] & 0xff) << 16) | 
+                    ((readBytes[ri+2] & 0xff) << 24));
+            readBufferIndex += 3;
             return result;
         }
     }
@@ -504,22 +568,55 @@ public final class ClientChangesImpl implements ChangeSet {
     }
 
     private void writeInt(int i) {
-        if(writeBufferIndex < (BUFFER_SIZE-4)) {
-            writeBytes[writeBufferIndex++] = (byte)(i&0xff);
-            writeBytes[writeBufferIndex++] = (byte)((i>>>8)&0xff);
-            writeBytes[writeBufferIndex++] = (byte)((i>>>16)&0xff);
+        int wi = writeBufferIndex;
+        int room = BUFFER_SIZE - wi;
+        switch (room) {
+        case 0:
+            flushWriteBuffer(BUFFER_SIZE);
+            // Intentional fall-through
+        default:
+            writeBytes[wi]   = (byte)(i&0xff);
+            writeBytes[wi+1] = (byte)((i>>>8)&0xff);
+            writeBytes[wi+2] = (byte)((i>>>16)&0xff);
+            writeBytes[wi+3] = (byte)((i>>>24)&0xff);
+            writeBufferIndex += 4;
+            break;
+
+        case 4:
+            writeBytes[wi]   = (byte)(i&0xff);
+            writeBytes[wi+1] = (byte)((i>>>8)&0xff);
+            writeBytes[wi+2] = (byte)((i>>>16)&0xff);
+            writeBytes[wi+3] = (byte)((i>>>24)&0xff);
+            flushWriteBuffer(BUFFER_SIZE);
+            break;
+
+        case 3:
+            writeBytes[wi]   = (byte)(i&0xff);
+            writeBytes[wi+1] = (byte)((i>>>8)&0xff);
+            writeBytes[wi+2] = (byte)((i>>>16)&0xff);
+            flushWriteBuffer(BUFFER_SIZE);
             writeBytes[writeBufferIndex++] = (byte)((i>>>24)&0xff);
-        } else {
-            int has = BUFFER_SIZE-writeBufferIndex;
-            if(has == 0) flushWriteBuffer(BUFFER_SIZE);
-            writeBytes[writeBufferIndex++] = (byte)(i&0xff);
-            if(has == 1) flushWriteBuffer(BUFFER_SIZE);
-            writeBytes[writeBufferIndex++] = (byte)((i>>>8)&0xff);
-            if(has == 2) flushWriteBuffer(BUFFER_SIZE);
-            writeBytes[writeBufferIndex++] = (byte)((i>>>16)&0xff);
-            if(has == 3) flushWriteBuffer(BUFFER_SIZE);
-            writeBytes[writeBufferIndex++] = (byte)((i>>>24)&0xff);
-            if(has == 4) flushWriteBuffer(BUFFER_SIZE);
+            break;
+
+        case 2:
+            writeBytes[wi]   = (byte)(i&0xff);
+            writeBytes[wi+1] = (byte)((i>>>8)&0xff);
+            flushWriteBuffer(BUFFER_SIZE);
+            wi = writeBufferIndex;
+            writeBytes[wi]   = (byte)((i>>>16)&0xff);
+            writeBytes[wi+1] = (byte)((i>>>24)&0xff);
+            writeBufferIndex += 2;
+            break;
+
+        case 1:
+            writeBytes[wi]   = (byte)(i&0xff);
+            flushWriteBuffer(BUFFER_SIZE);
+            wi = writeBufferIndex;
+            writeBytes[wi]   = (byte)((i>>>8)&0xff);
+            writeBytes[wi+1] = (byte)((i>>>16)&0xff);
+            writeBytes[wi+2] = (byte)((i>>>24)&0xff);
+            writeBufferIndex += 3;
+            break;
         }
         wroteSinceLastRead = true;
     }
@@ -662,6 +759,10 @@ public final class ClientChangesImpl implements ChangeSet {
     }
 
     public void close() {
+        if (disposed)
+            return;
+
+        disposed = true;
         session = null;
         channel = null;
         if (file != null) {
@@ -680,6 +781,23 @@ public final class ClientChangesImpl implements ChangeSet {
             tempFile.delete();
             tempFile = null;
         }
+
+        // Release large buffers to minimize memory consumption
+        readBytes = EMPTY;
+        readBuffer = ByteBuffer.wrap(EMPTY);
+        writeBytes = EMPTY;
+        writeBuffer = ByteBuffer.wrap(EMPTY);
+    }
+
+    private void assertAlive() {
+        if (disposed)
+            throw new IllegalStateException("ClientChangeImpl@" + System.identityHashCode(this) + " is already disposed");
+    }
+
+    @Override
+    public void dispose() {
+        //System.out.println("ClientChangesImpl[" + (disposed ? "DISPOSED" : "") + "]@" + System.identityHashCode(this) + ".dispose " + (isEmpty() ? "EMPTY" : "NON-EMPTY"));
+        close();
     }
 
     public static void main(String[] args) {
index 99aa9004f34d610f7a66636a47a9659d254566ee..2c65ee8e85088f6f1f051de7b5b851733a345c54 100644 (file)
@@ -756,6 +756,7 @@ public final class ClusterTable implements IClusterTable {
                 } finally {
                     if (null != th)
                         session.writeState = null;
+                    cs.dispose();
                 }
             }
         }
index 9a86dc1fa2ae813b0b3a137b51a9063546af27af..9290ac99fdd98a5792a542a0b51b7309941f1704 100644 (file)
@@ -34,6 +34,7 @@ import org.simantics.databoard.Bindings;
 import org.simantics.db.AsyncReadGraph;
 import org.simantics.db.ChangeSet;
 import org.simantics.db.DevelopmentKeys;
+import org.simantics.db.Disposable;
 import org.simantics.db.ExternalValueSupport;
 import org.simantics.db.Metadata;
 import org.simantics.db.MonitorContext;
@@ -429,6 +430,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule
                 try {
 
                     flushCounter = 0;
+                    Disposable.safeDispose(clientChanges);
                     clientChanges = new ClientChangesImpl(SessionImplSocket.this);
 
                     VirtualGraph vg = getProvider(request.getProvider());
@@ -555,6 +557,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule
                 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
 
                 flushCounter = 0;
+                Disposable.safeDispose(clientChanges);
                 clientChanges = new ClientChangesImpl(SessionImplSocket.this);
 
                 VirtualGraph vg = getProvider(request.getProvider());
@@ -677,6 +680,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule
                 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
 
                 flushCounter = 0;
+                Disposable.safeDispose(clientChanges);
                 clientChanges = new ClientChangesImpl(SessionImplSocket.this);
 
                 acquireWriteOnly();
@@ -1296,6 +1300,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule
             fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
 
             flushCounter = 0;
+            Disposable.safeDispose(clientChanges);
             clientChanges = new ClientChangesImpl(SessionImplSocket.this);
 
             acquireWriteOnly();
@@ -1381,6 +1386,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule
                         fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
 
                         flushCounter = 0;
+                        Disposable.safeDispose(clientChanges);
                         clientChanges = new ClientChangesImpl(SessionImplSocket.this);
 
                         acquireWriteOnly();
index 0ec6387ae941b0e9cb9c6358282c3b3849ff9d0d..5bdec52e99cfcb3cdb71010584250904e65ad9ac 100644 (file)
@@ -4,6 +4,7 @@ import java.io.IOException;
 import java.util.Collection;
 import java.util.LinkedList;
 
+import org.simantics.db.Disposable;
 import org.simantics.db.Resource;
 import org.simantics.db.common.utils.Logger;
 import org.simantics.db.exception.CancelTransactionException;
@@ -213,6 +214,7 @@ public class SessionRequestManager {
                        } else {
                            throw new UnsupportedOperationException("delayedWriteState may only exist when request fails.");
                        }
+                       Disposable.safeDispose(session.clientChanges);
                        session.clientChanges = new ClientChangesImpl(session);
                                        delayedState.finish();
                                        return;
@@ -242,6 +244,7 @@ public class SessionRequestManager {
 
                                }
 
+                               Disposable.safeDispose(session.clientChanges);
                                session.clientChanges = new ClientChangesImpl(session);
 
                                WriteState<?> state = session.writeState;
index 70e2cb10dd603122da7737bab85a2ea88bdc631a..3199901e90e191962c5815b6bde1623e39cda4f8 100644 (file)
@@ -17,6 +17,7 @@ import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
 import org.simantics.db.ChangeSet;
+import org.simantics.db.Disposable;
 import org.simantics.db.Operation;
 import org.simantics.db.VirtualGraph;
 import org.simantics.db.common.utils.Logger;
@@ -331,6 +332,7 @@ class State {
                 // deny can always remove a persistent statement. 
                 clusterStream.reallyFlush();
 
+                Disposable.safeDispose(session.clientChanges);
                 session.clientChanges = new ClientChangesImpl(session);
 
 
index 8b939041031671de5ea875e1cd1f1793464bc44e..0f8d9706c6814d089120f71996a5ebcd10fd3b62 100644 (file)
@@ -106,6 +106,8 @@ public class UndoRedoSupportImpl implements UndoRedoSupport {
                         e.printStackTrace();
                     Logger.defaultLogError(e);
                     th.throwableSet(e);
+                } finally {
+                    cs.dispose();
                 }
             }
         });
index 9da08dd86c980a3c13ad7547bb56697edc8c68ea..a413837fd3ed4310d32f57b78c39dd6d550f0ea6 100644 (file)
@@ -27,4 +27,14 @@ public interface Disposable {
 
     void dispose();
 
+    static void safeDispose(Disposable d) {
+        if (d != null)
+            d.dispose();
+    }
+
+    static void safeDispose(Object o) {
+        if (o instanceof Disposable)
+            ((Disposable) o).dispose();
+    }
+
 }