X-Git-Url: https://gerrit.simantics.org/r/gitweb?p=simantics%2Fplatform.git;a=blobdiff_plain;f=bundles%2Forg.simantics.db.procore%2Fsrc%2Ffi%2Fvtt%2Fsimantics%2Fprocore%2Finternal%2FClusterTable.java;h=6cab018a6f4beadde5859f94fc86de4b75638a92;hp=40b2530f2a569a4dedfdb7295095895871074f1d;hb=4aba159170fc72d39c2f930ea224aa71f4cdc2e7;hpb=fe1a2f532761669e67da4db4ae15096ced8a04db diff --git a/bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/ClusterTable.java b/bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/ClusterTable.java index 40b2530f2..6cab018a6 100644 --- a/bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/ClusterTable.java +++ b/bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/ClusterTable.java @@ -18,13 +18,15 @@ import java.util.HashMap; import java.util.Map; import java.util.TreeMap; import java.util.concurrent.Semaphore; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Consumer; import org.simantics.databoard.Bindings; import org.simantics.db.ClusterCreator; import org.simantics.db.Database; +import org.simantics.db.Database.Session.ClusterChanges; import org.simantics.db.DevelopmentKeys; import org.simantics.db.SessionVariables; -import org.simantics.db.Database.Session.ClusterChanges; import org.simantics.db.common.utils.Logger; import org.simantics.db.exception.ClusterDoesNotExistException; import org.simantics.db.exception.DatabaseException; @@ -46,7 +48,6 @@ import org.simantics.db.service.ClusterCollectorPolicy; import org.simantics.db.service.ClusterCollectorPolicy.CollectorCluster; import org.simantics.db.service.ClusterUID; import org.simantics.utils.Development; -import org.simantics.utils.datastructures.Callback; import fi.vtt.simantics.procore.DebugPolicy; import fi.vtt.simantics.procore.internal.ClusterControlImpl.ClusterStateImpl; @@ -60,10 +61,12 @@ import gnu.trove.set.hash.TIntHashSet; public final class ClusterTable implements IClusterTable { + private static final boolean VALIDATE_SIZE = false; + int maximumBytes = 128 * 1024 * 1024; int limit = (int)(0.8*(double)maximumBytes); - long timeCounter = 0; + private final AtomicLong timeCounter = new AtomicLong(1); final private SessionImplSocket sessionImpl; final private ArrayList writeOnlyClusters = new ArrayList(); @@ -91,6 +94,10 @@ public final class ClusterTable implements IClusterTable { public long getClusterId() { return clusterId; } + @Override + public String toString() { + return "CID " + clusterId; + } } private class Clusters { // This makes sure that non-null values from hashMap.get can be trusted (no unsynchronized rehashes occur) @@ -128,7 +135,7 @@ public final class ClusterTable implements IClusterTable { int clusterKey = hashMap.size(); ClusterSmall sentinel = new ClusterSmall(clusterUID, clusterKey, ClusterTable.this, sessionImpl.clusterTranslator); if (sentinel.clusterId != sentinel.clusterUID.second) - throw new RuntimeDatabaseException("ClusterTable corrupted. Contact application support."); + throw new RuntimeDatabaseException("ClusterTable corrupted."); create(sentinel); return sentinel; } @@ -137,25 +144,25 @@ public final class ClusterTable implements IClusterTable { create(proxy); if (null != old) { if (old.clusterKey != proxy.clusterKey) - throw new RuntimeDatabaseException("ClusterTable corrupted. Contact application support."); + throw new RuntimeDatabaseException("ClusterTable corrupted."); if (old.clusterId != proxy.clusterId) - throw new RuntimeDatabaseException("ClusterTable corrupted. Contact application support."); + throw new RuntimeDatabaseException("ClusterTable corrupted."); if (!old.clusterUID.equals(proxy.clusterUID)) - throw new RuntimeDatabaseException("ClusterTable corrupted. Contact application support."); + throw new RuntimeDatabaseException("ClusterTable corrupted."); } } private ClusterSmall freeProxy(ClusterImpl proxy) { ClusterImpl clusterImpl = hashMap.get(proxy.clusterId); if (null == clusterImpl) - throw new RuntimeDatabaseException("ClusterTable corrupted. Contact application support."); + throw new RuntimeDatabaseException("ClusterTable corrupted."); // ClusterUID clusterUID = ClusterUID.make(0, proxy.clusterId); // ClusterImpl clusterImpl2 = clusterU2I.get(clusterUID ); // if (clusterImpl != clusterImpl2) -// throw new RuntimeDatabaseException("ClusterTable corrupted. Contact application support."); +// throw new RuntimeDatabaseException("ClusterTable corrupted."); if (proxy.clusterId != clusterImpl.clusterId) - throw new RuntimeDatabaseException("ClusterTable corrupted. Contact application support."); + throw new RuntimeDatabaseException("ClusterTable corrupted."); if (proxy.clusterKey != clusterImpl.clusterKey) - throw new RuntimeDatabaseException("ClusterTable corrupted. Contact application support."); + throw new RuntimeDatabaseException("ClusterTable corrupted."); ClusterSmall sentinel = new ClusterSmall(makeClusterUID(proxy.clusterId) , proxy.clusterKey, ClusterTable.this, sessionImpl.clusterTranslator); return (ClusterSmall)create(sentinel); } @@ -269,27 +276,48 @@ public final class ClusterTable implements IClusterTable { writeOnlyClusters.add(proxy); return clusters.create(proxy); } else { + //printMaps("makeCluster"); ClusterImpl cluster = ClusterImpl.make(clusterUID, clusterKey, sessionImpl.clusterTranslator); clusters.create(cluster); if (!cluster.isLoaded()) Logger.defaultLogError(new Exception("Bug in ClusterTable.makeCluster(long, boolean), cluster not loaded")); importanceMap.put(cluster.getImportance(), new ImportanceEntry(cluster)); + if (VALIDATE_SIZE) + validateSize("makeCluster"); if(collectorPolicy != null) collectorPolicy.added(cluster); return cluster; } } - synchronized void replaceCluster(ClusterI cluster) { + synchronized void replaceCluster(ClusterI cluster_) { + ClusterImpl cluster = (ClusterImpl) cluster_; checkCollect(); int clusterKey = cluster.getClusterKey(); - ClusterI existing = clusterArray[clusterKey]; + ClusterImpl existing = (ClusterImpl) clusterArray[clusterKey]; if (existing.hasVirtual()) cluster.markVirtual(); + if (existing.cc != null) { + if (existing.isLoaded()) { + // This shall be promoted to actual exception in the future - + // for now, minimal changes + new Exception("Trying to replace cluster with pending changes " + existing.getClusterUID()) + .printStackTrace(); + } else { + // Adopt changes to loaded cluster + cluster.cc = existing.cc; + cluster.cc.adopt(cluster); + cluster.foreignLookup = existing.foreignLookup; + cluster.change = existing.change; + } + } + importanceMap.remove(existing.getImportance()); if (collectorPolicy != null) collectorPolicy.removed((ClusterImpl)existing); + //System.out.println("ClusterTable.replace(" + existing + " (I=" + existing.getImportance() + ") => " + cluster + " (I=" + cluster.getImportance() + ")"); + clusters.replace((ClusterImpl)cluster); if (!cluster.isLoaded()) Logger.defaultLogError(new Exception("Bug in ClusterTable.replaceCluster(ClusterI), cluster not loaded")); @@ -297,10 +325,15 @@ public final class ClusterTable implements IClusterTable { importanceMap.put(cluster.getImportance(), new ImportanceEntry((ClusterImpl)cluster)); if(collectorPolicy != null) collectorPolicy.added((ClusterImpl)cluster); - if (!dirtySizeInBytes && !existing.isLoaded()) { - sizeInBytes += cluster.getCachedSize(); + if (!dirtySizeInBytes) { + if (existing != cluster) { + adjustCachedSize(-existing.getCachedSize(), existing); + } + // This will update sizeInBytes through adjustCachedSize + cluster.getCachedSize(); } - + if (VALIDATE_SIZE) + validateSize("replaceCluster"); } synchronized void release(CollectorCluster cluster) { @@ -309,11 +342,14 @@ public final class ClusterTable implements IClusterTable { } synchronized void release(long clusterId) { + //System.out.println("ClusterTable.release(" + clusterId + "): " + sizeInBytes); + //validateSize(); ClusterImpl clusterImpl = clusters.getClusterByClusterId(clusterId); if (null == clusterImpl) return; if(!clusterImpl.isLoaded() || clusterImpl.isEmpty()) return; + //printMaps("release"); clusters.freeProxy(clusterImpl); importanceMap.remove(clusterImpl.getImportance()); if (collectorPolicy != null) @@ -321,8 +357,10 @@ public final class ClusterTable implements IClusterTable { if (sessionImpl.writeState != null) sessionImpl.clusterStream.flush(clusterImpl.clusterUID); if (!dirtySizeInBytes) { - sizeInBytes -= clusterImpl.getCachedSize(); + adjustCachedSize(-clusterImpl.getCachedSize(), clusterImpl); } + if (VALIDATE_SIZE) + validateSize("release"); } synchronized void compact(long id) { @@ -529,17 +567,19 @@ public final class ClusterTable implements IClusterTable { ClusterImpl getNewResourceCluster(ClusterSupport cs, GraphSession graphSession, boolean writeOnly) throws DatabaseException { + ClusterImpl result = null; if (Constants.NewClusterId == newResourceClusterId) { newResourceClusterId = graphSession.newClusterId(); - return getClusterByClusterIdOrMake(newResourceClusterId, writeOnly); + result = getClusterByClusterIdOrMake(newResourceClusterId, writeOnly); } else { ClusterImpl cluster = getClusterByClusterIdOrThrow(newResourceClusterId); if (cluster.getNumberOfResources(cs) >= CLUSTER_FILL_SIZE) { newResourceClusterId = graphSession.newClusterId(); cluster = getClusterByClusterIdOrMake(newResourceClusterId, writeOnly); } - return cluster; + result = cluster; } + return ensureLoaded(result); } void flushCluster(GraphSession graphSession) { @@ -555,7 +595,7 @@ public final class ClusterTable implements IClusterTable { void removeWriteOnlyClusters() { for (ClusterI proxy : writeOnlyClusters) { if (!(proxy instanceof ClusterImpl)) - throw new RuntimeDatabaseException("ClusterTable corrupted. Contact application support."); + throw new RuntimeDatabaseException("ClusterTable corrupted."); clusters.freeProxy((ClusterImpl)proxy); } writeOnlyClusters.clear(); @@ -564,7 +604,6 @@ public final class ClusterTable implements IClusterTable { public boolean execute(int clusterKey) { ClusterImpl proxy = clusterArray[clusterKey]; ClusterUID clusterUID = proxy.getClusterUID(); - System.err.println("writeOnlyInvalidate " + clusterUID); clusters.freeProxy(proxy); return true; } @@ -677,6 +716,7 @@ public final class ClusterTable implements IClusterTable { ClientChangesImpl cs = new ClientChangesImpl(session); if (session.clientChanges == null) session.clientChanges = cs; + //printMaps("refresh"); for (int i=0; i T getClusterByResourceKey(final int resourceKey) { - int clusterKey = ClusterTraitsBase.getClusterKeyFromResourceKeyNoThrow(resourceKey); - if (ClusterTraitsBase.isVirtualClusterKey(clusterKey)) - throw new RuntimeException("Tried to get a persistent cluster for a virtual resource."); - ClusterI c = clusterArray[clusterKey]; - if (c == null) - return null; - if (c.isLoaded()) { - if ((counter++ & 4095) == 0) - refreshImportance((ClusterImpl) c); - return (T) c; - } - if (!(c instanceof ClusterSmall)) { - Logger.defaultLogError("Proxy must be instance of ClusterSmall"); - return null; - } + private T ensureLoaded(T c) { ClusterI cluster; - ClusterSmall cs = (ClusterSmall) c; + ClusterImpl cs = (ClusterImpl) c; try { if(DebugPolicy.REPORT_CLUSTER_LOADING) { long start = System.nanoTime(); @@ -861,13 +892,32 @@ public final class ClusterTable implements IClusterTable { Logger.defaultLogError(e); if (DebugPolicy.REPORT_CLUSTER_EVENTS) e.printStackTrace(); - String msg = "Failed to load cluster " + cs.getClusterUID() + " for resource key " + resourceKey - + " resourceId=" + (((cs.getClusterId() << 16 + (resourceKey & 65535)))); + String msg = "Failed to load cluster " + cs.getClusterUID();// + " resourceId=" + (((cs.getClusterId() << 16 + (resourceKey & 65535)))); // TODO: this jams the system => needs refactoring. throw new RuntimeDatabaseException(msg, e); } return (T) cluster; } + + @SuppressWarnings("unchecked") + public synchronized final T getClusterByResourceKey(final int resourceKey) { + int clusterKey = ClusterTraitsBase.getClusterKeyFromResourceKeyNoThrow(resourceKey); + if (ClusterTraitsBase.isVirtualClusterKey(clusterKey)) + throw new RuntimeException("Tried to get a persistent cluster for a virtual resource."); + ClusterI c = clusterArray[clusterKey]; + if (c == null) + return null; + if (c.isLoaded()) { + if ((counter++ & 4095) == 0) + refreshImportance((ClusterImpl) c); + return (T) c; + } + if (!(c instanceof ClusterSmall)) { + Logger.defaultLogError("Proxy must be instance of ClusterSmall"); + return null; + } + return ensureLoaded((T)c); + } @SuppressWarnings("unchecked") final T checkedGetClusterByResourceKey(final int resourceKey) { @@ -936,12 +986,9 @@ public final class ClusterTable implements IClusterTable { final Semaphore s = new Semaphore(0); final DatabaseException[] ex = new DatabaseException[1]; - load2(clusterId, clusterKey, new Callback() { - @Override - public void run(DatabaseException e) { - ex[0] = e; - s.release(); - } + load2(clusterId, clusterKey, e -> { + ex[0] = e; + s.release(); }); try { s.acquire(); @@ -1014,7 +1061,7 @@ public final class ClusterTable implements IClusterTable { } } - public synchronized void load2(long clusterId, int clusterKey, final Callback runnable) { + public synchronized void load2(long clusterId, int clusterKey, final Consumer runnable) { assert (Constants.ReservedClusterId != clusterId); @@ -1056,7 +1103,7 @@ public final class ClusterTable implements IClusterTable { e = new DatabaseException("Load cluster failed.", t); } if (null == cluster) { - runnable.run(e); + runnable.accept(e); return; } @@ -1065,7 +1112,7 @@ public final class ClusterTable implements IClusterTable { // Can not be called with null argument. replaceCluster(cluster); sessionImpl.onClusterLoaded(clusterId); - runnable.run(null); + runnable.accept(null); } @@ -1117,7 +1164,7 @@ public final class ClusterTable implements IClusterTable { } public long timeCounter() { - return timeCounter++; + return timeCounter.getAndIncrement(); } public boolean hasVirtual(int index) { @@ -1157,4 +1204,74 @@ public final class ClusterTable implements IClusterTable { return getClusterKeyByClusterUIDOrMakeProxy(ClusterUID.make(first, second)); } + public void adjustCachedSize(long l, ClusterI cluster) { + if (l != 0) { + //System.out.println("ClusterTable: adjusting cluster table cached size by " + l + ": " + sizeInBytes + " -> " + // + (sizeInBytes + l) + ", for cluster " + cluster.getClusterId() + " (" + cluster + ")"); + sizeInBytes += l; + } + } + + private void validateSize(String place) { + if (!VALIDATE_SIZE) + return; + + int ims = importanceMap.size(); + int ihms = countImportantClusters(); + +// System.out.format("[ClusterTable.%s] Validating: byteSize=%d (%d MB), hashMap=%d/%d, importanceMap=%d%n", +// place, sizeInBytes, sizeInBytes / (1024*1024), ihms, clusters.hashMap.size(), ims); + + int i = clusterArray.length; + long size = 0; + for (int j = 0; j < i; ++j) { + ClusterI c = clusterArray[j]; + if (c == null) + continue; + size += c.getCachedSize(); + } + if (sizeInBytes != size) { + if (!dirtySizeInBytes) + System.out.println("BUG: CACHED CLUSTER SIZE DIFFERS FROM CALCULATED: " + sizeInBytes + " != " + size + ", delta = " + (sizeInBytes - size)); + //else System.out.println("\"BUG?\": SIZES DIFFER: " + sizeInBytes + " != " + size + ", delta = " + (sizeInBytes - size)); + } + if (ims != ihms) { + System.out.println("BUG2: hashmap and importanceMap sizes differ: " + ihms + " != " + ims + ", delta=" + (ihms - ims)); + printMaps("validateSize"); + } + //System.out.println("[" + place + "] VALIDATED"); + } + + private void printMaps(String place) { + int ihms = countImportantClusters(); + System.out.println("## printMaps(" + place + ") - " + importanceMap.size() + " - (" + ihms + "/" + clusters.hashMap.size() + ")"); + System.out.println("importanceMap (" + importanceMap.size() + "):"); + importanceMap.forEach((importance, ie) -> { + System.out.format("\t%d: %s%n", importance, ie.toString()); + }); + System.out.println("clusters.hashMap (" + ihms + "/" + clusters.hashMap.size() + "):"); + clusters.hashMap.forEachEntry((cid, c) -> { + boolean important = importantCluster(c); + boolean wo = c != null && c.isWriteOnly(); + boolean empty = c != null && c.isEmpty(); + boolean loaded = c != null && c.isLoaded(); + System.out.format("\t%s: %d - %s (writeOnly=%b, empty=%b, loaded=%b)%n", important ? " I" : "NI", cid, c, wo, empty, loaded); + return true; + }); + } + + private int countImportantClusters() { + int[] result = { 0 }; + clusters.hashMap.forEachEntry((cid, c) -> { + if (importantCluster(c)) + result[0]++; + return true; + }); + return result[0]; + } + + private static boolean importantCluster(ClusterImpl c) { + return c != null && !c.isWriteOnly() && c.isLoaded();// && !c.isEmpty(); + } + }