import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import org.simantics.databoard.Bindings;
import org.simantics.db.Database.Session.ClusterChanges;
import org.simantics.db.DevelopmentKeys;
import org.simantics.db.SessionVariables;
-import org.simantics.db.common.utils.Logger;
import org.simantics.db.exception.ClusterDoesNotExistException;
import org.simantics.db.exception.DatabaseException;
import org.simantics.db.exception.ResourceNotFoundException;
import org.simantics.db.service.ClusterCollectorPolicy.CollectorCluster;
import org.simantics.db.service.ClusterUID;
import org.simantics.utils.Development;
+import org.slf4j.LoggerFactory;
import fi.vtt.simantics.procore.DebugPolicy;
import fi.vtt.simantics.procore.internal.ClusterControlImpl.ClusterStateImpl;
public final class ClusterTable implements IClusterTable {
+ private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(ClusterTable.class);
+
private static final boolean VALIDATE_SIZE = false;
int maximumBytes = 128 * 1024 * 1024;
int limit = (int)(0.8*(double)maximumBytes);
- long timeCounter = 0;
+ private final AtomicLong timeCounter = new AtomicLong(1);
final private SessionImplSocket sessionImpl;
final private ArrayList<ClusterI> writeOnlyClusters = new ArrayList<ClusterI>();
public long getClusterId() {
return clusterId;
}
+ // Debug aid: renders this entry as "CID <clusterId>".
+ @Override
+ public String toString() {
+ return "CID " + clusterId;
+ }
}
private class Clusters {
// This makes sure that non-null values from hashMap.get can be trusted (no unsynchronized rehashes occur)
int clusterKey = hashMap.size();
ClusterSmall sentinel = new ClusterSmall(clusterUID, clusterKey, ClusterTable.this, sessionImpl.clusterTranslator);
if (sentinel.clusterId != sentinel.clusterUID.second)
- throw new RuntimeDatabaseException("ClusterTable corrupted. Contact application support.");
+ throw new RuntimeDatabaseException("ClusterTable corrupted.");
create(sentinel);
return sentinel;
}
create(proxy);
if (null != old) {
if (old.clusterKey != proxy.clusterKey)
- throw new RuntimeDatabaseException("ClusterTable corrupted. Contact application support.");
+ throw new RuntimeDatabaseException("ClusterTable corrupted.");
if (old.clusterId != proxy.clusterId)
- throw new RuntimeDatabaseException("ClusterTable corrupted. Contact application support.");
+ throw new RuntimeDatabaseException("ClusterTable corrupted.");
if (!old.clusterUID.equals(proxy.clusterUID))
- throw new RuntimeDatabaseException("ClusterTable corrupted. Contact application support.");
+ throw new RuntimeDatabaseException("ClusterTable corrupted.");
}
}
private ClusterSmall freeProxy(ClusterImpl proxy) {
ClusterImpl clusterImpl = hashMap.get(proxy.clusterId);
if (null == clusterImpl)
- throw new RuntimeDatabaseException("ClusterTable corrupted. Contact application support.");
+ throw new RuntimeDatabaseException("ClusterTable corrupted.");
// ClusterUID clusterUID = ClusterUID.make(0, proxy.clusterId);
// ClusterImpl clusterImpl2 = clusterU2I.get(clusterUID );
// if (clusterImpl != clusterImpl2)
-// throw new RuntimeDatabaseException("ClusterTable corrupted. Contact application support.");
+// throw new RuntimeDatabaseException("ClusterTable corrupted.");
if (proxy.clusterId != clusterImpl.clusterId)
- throw new RuntimeDatabaseException("ClusterTable corrupted. Contact application support.");
+ throw new RuntimeDatabaseException("ClusterTable corrupted.");
if (proxy.clusterKey != clusterImpl.clusterKey)
- throw new RuntimeDatabaseException("ClusterTable corrupted. Contact application support.");
+ throw new RuntimeDatabaseException("ClusterTable corrupted.");
ClusterSmall sentinel = new ClusterSmall(makeClusterUID(proxy.clusterId) , proxy.clusterKey, ClusterTable.this, sessionImpl.clusterTranslator);
return (ClusterSmall)create(sentinel);
}
writeOnlyClusters.add(proxy);
return clusters.create(proxy);
} else {
+ //printMaps("makeCluster");
ClusterImpl cluster = ClusterImpl.make(clusterUID, clusterKey, sessionImpl.clusterTranslator);
clusters.create(cluster);
if (!cluster.isLoaded())
- Logger.defaultLogError(new Exception("Bug in ClusterTable.makeCluster(long, boolean), cluster not loaded"));
+ LOGGER.error("", new Exception("Bug in ClusterTable.makeCluster(long, boolean), cluster not loaded"));
importanceMap.put(cluster.getImportance(), new ImportanceEntry(cluster));
+ if (VALIDATE_SIZE)
+ validateSize("makeCluster");
if(collectorPolicy != null) collectorPolicy.added(cluster);
return cluster;
}
}
- synchronized void replaceCluster(ClusterI cluster) {
+ synchronized void replaceCluster(ClusterI cluster_) {
+ ClusterImpl cluster = (ClusterImpl) cluster_;
checkCollect();
int clusterKey = cluster.getClusterKey();
- ClusterI existing = clusterArray[clusterKey];
+ ClusterImpl existing = (ClusterImpl) clusterArray[clusterKey];
if (existing.hasVirtual())
cluster.markVirtual();
+ if (existing.cc != null) {
+ if (existing.isLoaded()) {
+ // This shall be promoted to actual exception in the future -
+ // for now, minimal changes
+ new Exception("Trying to replace cluster with pending changes " + existing.getClusterUID())
+ .printStackTrace();
+ } else {
+ // Adopt changes to loaded cluster
+ cluster.cc = existing.cc;
+ cluster.cc.adopt(cluster);
+ cluster.foreignLookup = existing.foreignLookup;
+ cluster.change = existing.change;
+ }
+ }
+
importanceMap.remove(existing.getImportance());
if (collectorPolicy != null)
collectorPolicy.removed((ClusterImpl)existing);
+ //System.out.println("ClusterTable.replace(" + existing + " (I=" + existing.getImportance() + ") => " + cluster + " (I=" + cluster.getImportance() + ")");
+
clusters.replace((ClusterImpl)cluster);
if (!cluster.isLoaded())
- Logger.defaultLogError(new Exception("Bug in ClusterTable.replaceCluster(ClusterI), cluster not loaded"));
+ LOGGER.error("", new Exception("Bug in ClusterTable.replaceCluster(ClusterI), cluster not loaded"));
importanceMap.put(cluster.getImportance(), new ImportanceEntry((ClusterImpl)cluster));
if(collectorPolicy != null) collectorPolicy.added((ClusterImpl)cluster);
// This will update sizeInBytes through adjustCachedSize
cluster.getCachedSize();
}
- validateSize();
+ if (VALIDATE_SIZE)
+ validateSize("replaceCluster");
}
synchronized void release(CollectorCluster cluster) {
return;
if(!clusterImpl.isLoaded() || clusterImpl.isEmpty())
return;
+ //printMaps("release");
clusters.freeProxy(clusterImpl);
importanceMap.remove(clusterImpl.getImportance());
if (collectorPolicy != null)
if (!dirtySizeInBytes) {
adjustCachedSize(-clusterImpl.getCachedSize(), clusterImpl);
}
+ if (VALIDATE_SIZE)
+ validateSize("release");
}
synchronized void compact(long id) {
result += cluster.getCachedSize();
}
} catch (Throwable t) {
- Logger.defaultLogError(t);
+ LOGGER.error("Could not calculate size", t);
}
}
return true;
ArrayList<CollectorCluster> toRelease = new ArrayList<CollectorCluster>();
for (CollectorCluster cluster : support.getResidentClusters()) {
- target -= support.getClusterSize(cluster);
- if (target > 0) {
- toRelease.add(cluster);
- } else {
+ toRelease.add(cluster);
+ long clusterSize = support.getClusterSize(cluster);
+ //System.err.println("release cluster with " + (clusterSize/1024) + " kiB - " + cluster);
+ target -= clusterSize;
+ if (target <= 0) {
break;
}
}
void removeWriteOnlyClusters() {
for (ClusterI proxy : writeOnlyClusters) {
if (!(proxy instanceof ClusterImpl))
- throw new RuntimeDatabaseException("ClusterTable corrupted. Contact application support.");
+ throw new RuntimeDatabaseException("ClusterTable corrupted.");
clusters.freeProxy((ClusterImpl)proxy);
}
writeOnlyClusters.clear();
public boolean execute(int clusterKey) {
ClusterImpl proxy = clusterArray[clusterKey];
ClusterUID clusterUID = proxy.getClusterUID();
- System.err.println("writeOnlyInvalidate " + clusterUID);
clusters.freeProxy(proxy);
return true;
}
final long getClusterIdByResourceKeyNoThrow(final int resourceKey) {
int clusterKey = ClusterTraitsBase.getClusterKeyFromResourceKeyNoThrow(resourceKey);
if (ClusterTraitsBase.isVirtualClusterKey(clusterKey)) {
- Logger.defaultLogError("Tried to get a persistent cluster for a virtual resource. key=" + resourceKey);
+ LOGGER.error("Tried to get a persistent cluster for a virtual resource. key=" + resourceKey);
return 0;
}
ClusterI c = clusterArray[clusterKey];
if (c == null) {
- Logger.defaultLogError("No cluster for key " + resourceKey);
+ LOGGER.error("No cluster for key " + resourceKey);
return 0;
}
return c.getClusterId();
ClientChangesImpl cs = new ClientChangesImpl(session);
if (session.clientChanges == null)
session.clientChanges = cs;
+ //printMaps("refresh");
for (int i=0; i<clusterUID.length; ++i) {
try {
if (DebugPolicy.REPORT_CLUSTER_EVENTS)
collectorPolicy.removed(oldCluster);
clusters.replace(newCluster);
if (!newCluster.isLoaded())
- Logger.defaultLogError(new Exception("Bug in ClusterTable.refresh, cluster not loaded"));
+ LOGGER.error("", new Exception("Bug in ClusterTable.refresh, cluster not loaded"));
importanceMap.put(newCluster.getImportance(), new ImportanceEntry(newCluster));
if (collectorPolicy != null)
collectorPolicy.added(newCluster);
// Now we have fetched the new cluster but to emulate effects of the changes in it we fetch the cluster changes from server.
refreshCluster(csid, session, oldCluster, clusterUID[i], newCluster.clusterId, clusterKey);
} catch (Throwable t) {
- Logger.defaultLogError("Failed to load cluster in refresh.", t);
+ LOGGER.error("Failed to load cluster in refresh.", t);
}
}
+ if (VALIDATE_SIZE)
+ validateSize("refresh");
// Fake update of cluster changes.
QueryProcessor queryProcessor = session.getQueryProvider2();
WriteGraphImpl writer = WriteGraphImpl.create(queryProcessor, session.writeSupport, null);
th = new TaskHelper("Refresh");
session.writeState = new WriteState<Object>(writer, th.writeTraits, th.sema, th.proc);
try {
- session.getQueryProvider2().performDirtyUpdates(writer);
+ session.getQueryProvider2().propagateChangesInQueryCache(writer);
session.fireMetadataListeners(writer, cs);
- session.getQueryProvider2().performScheduledUpdates(writer);
+ session.getQueryProvider2().listening.fireListeners(writer);
session.fireReactionsToSynchronize(cs);
session.fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
session.printDiagnostics();
} finally {
if (null != th)
session.writeState = null;
+ cs.dispose();
}
}
}
try {
cc = session.graphSession.getClusterChanges(clusterUID, csid);
} catch (Exception e) {
- Logger.defaultLogError("Could not get cluster changes. cluster=" + clusterUID, e);
+ LOGGER.error("Could not get cluster changes. cluster=" + clusterUID, e);
release(clusterId);
return;
}
System.err.println("value " + cc.getValueIndex()[i] + " changed.");
}
}
- final void refreshImportance(ClusterImpl c) {
+ final synchronized void refreshImportance(ClusterImpl c) {
if (c.isWriteOnly())
return;
+ //printMaps("refreshImportance");
+
importanceMap.remove(c.getImportance());
if(collectorPolicy != null) collectorPolicy.removed(c);
// System.err.println("refreshImportance " + c.getClusterId() + " => " + newImportance);
c.setImportance(newImportance);
if (!c.isLoaded())
- Logger.defaultLogError(new Exception("Bug in ClusterTable.refreshImportance(ClusterImpl), cluster not loaded"));
+ LOGGER.error("", new Exception("Bug in ClusterTable.refreshImportance(ClusterImpl), cluster not loaded"));
importanceMap.put(c.getImportance(), new ImportanceEntry(c));
if(collectorPolicy != null) collectorPolicy.added(c);
+ if (VALIDATE_SIZE)
+ validateSize("refreshImportance");
}
cluster = load2(cs.getClusterId(), cs.getClusterKey());
}
} catch (DatabaseException e) {
- Logger.defaultLogError(e);
+ LOGGER.error("Could not load cluster", e);
if (DebugPolicy.REPORT_CLUSTER_EVENTS)
e.printStackTrace();
String msg = "Failed to load cluster " + cs.getClusterUID();// + " resourceId=" + (((cs.getClusterId() << 16 + (resourceKey & 65535))));
}
@SuppressWarnings("unchecked")
- public final <T extends ClusterI> T getClusterByResourceKey(final int resourceKey) {
+ public synchronized final <T extends ClusterI> T getClusterByResourceKey(final int resourceKey) {
int clusterKey = ClusterTraitsBase.getClusterKeyFromResourceKeyNoThrow(resourceKey);
if (ClusterTraitsBase.isVirtualClusterKey(clusterKey))
throw new RuntimeException("Tried to get a persistent cluster for a virtual resource.");
return (T) c;
}
if (!(c instanceof ClusterSmall)) {
- Logger.defaultLogError("Proxy must be instance of ClusterSmall");
+ LOGGER.error("Proxy must be instance of ClusterSmall");
return null;
}
return ensureLoaded((T)c);
return (T) c;
}
if (!(c instanceof ClusterSmall)) {
- Logger.defaultLogError("Proxy must be instance of ClusterSmall");
+ LOGGER.error("Proxy must be instance of ClusterSmall");
return null;
}
ClusterI cluster;
int resourceIndex = resourceKey & ClusterTraitsBase.getResourceIndexFromResourceKeyNoThrow(resourceKey);
long resourceId = ClusterTraitsBase.createResourceIdNoThrow(cs.getClusterId(), resourceIndex);
String msg = "Failed to load cluster " + cs.getClusterUID() + " for resource key " + resourceKey + " resourceIndex=" + resourceIndex + " resourceId=" + resourceId;
- Logger.defaultLogError(msg, e);
+ LOGGER.error(msg, e);
c.setDeleted(true, null);
return (T)c;
}
try {
s.acquire();
} catch (InterruptedException e) {
- Logger.defaultLogError(e);
+ LOGGER.error("unable to acquire", e);
}
if (null != ex[0])
throw ex[0];
} catch (Throwable t) {
// It's totally legal to call for non-existing cluster.
// Sometimes this indicates an error, though.
- Logger.getDefault().logInfo("Load cluster failed.", t);
+ LOGGER.error("Load cluster failed", t);
if (t instanceof DatabaseException)
e = (DatabaseException) t;
else
}
public long timeCounter() {
- return timeCounter++;
+ // Monotonically increasing logical timestamp. The atomic increment makes
+ // this safe to call without holding the ClusterTable monitor.
+ return timeCounter.getAndIncrement();
}
public boolean hasVirtual(int index) {
}
}
- private void validateSize() {
+ private void validateSize(String place) {
if (!VALIDATE_SIZE)
return;
-// System.out.println("validating cached cluster sizes: " + sizeInBytes + ", hashMap.size="
-// + clusters.hashMap.size() + ", importanceMap.size=" + importanceMap.size());
+ int ims = importanceMap.size();
+ int ihms = countImportantClusters();
+
+// System.out.format("[ClusterTable.%s] Validating: byteSize=%d (%d MB), hashMap=%d/%d, importanceMap=%d%n",
+// place, sizeInBytes, sizeInBytes / (1024*1024), ihms, clusters.hashMap.size(), ims);
int i = clusterArray.length;
long size = 0;
System.out.println("BUG: CACHED CLUSTER SIZE DIFFERS FROM CALCULATED: " + sizeInBytes + " != " + size + ", delta = " + (sizeInBytes - size));
//else System.out.println("\"BUG?\": SIZES DIFFER: " + sizeInBytes + " != " + size + ", delta = " + (sizeInBytes - size));
}
+ if (ims != ihms) {
+ System.out.println("BUG2: hashmap and importanceMap sizes differ: " + ihms + " != " + ims + ", delta=" + (ihms - ims));
+ printMaps("validateSize");
+ }
+ //System.out.println("[" + place + "] VALIDATED");
+ }
- int ims = importanceMap.size();
- int[] hms = {0};
+ // Debug dump: prints every importanceMap entry and every cluster in
+ // clusters.hashMap, flagging which clusters count as "important"
+ // (per importantCluster) so mismatches between the two maps can be diagnosed.
+ private void printMaps(String place) {
+ int ihms = countImportantClusters();
+ System.out.println("## printMaps(" + place + ") - " + importanceMap.size() + " - (" + ihms + "/" + clusters.hashMap.size() + ")");
+ System.out.println("importanceMap (" + importanceMap.size() + "):");
+ importanceMap.forEach((importance, ie) -> {
+ System.out.format("\t%d: %s%n", importance, ie.toString());
+ });
+ System.out.println("clusters.hashMap (" + ihms + "/" + clusters.hashMap.size() + "):");
clusters.hashMap.forEachEntry((cid, c) -> {
- if (c != null && !c.isWriteOnly() && !c.isEmpty() && c.isLoaded()) {
- //System.out.println(cid + ": " + c);
- hms[0]++;
- }
+ boolean important = importantCluster(c);
+ boolean wo = c != null && c.isWriteOnly();
+ boolean empty = c != null && c.isEmpty();
+ boolean loaded = c != null && c.isLoaded();
+ System.out.format("\t%s: %d - %s (writeOnly=%b, empty=%b, loaded=%b)%n", important ? " I" : "NI", cid, c, wo, empty, loaded);
return true;
});
- if (Math.abs(ims-hms[0]) > 0) {
- System.out.println("BUG2: hashmap and importanceMap sizes differ: " + hms[0] + " != " + ims + ", delta=" + (hms[0] - ims));
- }
+ }
+
+ // Counts clusters that should currently have an importanceMap entry
+ // (loaded and not write-only); validateSize cross-checks this count
+ // against importanceMap.size().
+ private int countImportantClusters() {
+ // Single-element array lets the forEachEntry lambda mutate the count.
+ int[] result = { 0 };
+ clusters.hashMap.forEachEntry((cid, c) -> {
+ if (importantCluster(c))
+ result[0]++;
+ return true;
+ });
+ return result[0];
+ }
+
+ // A cluster is "important" (i.e. expected in importanceMap) when it is
+ // loaded and not write-only. NOTE(review): the emptiness check is disabled —
+ // presumably so loaded-but-empty clusters stay tracked; confirm before re-enabling.
+ private static boolean importantCluster(ClusterImpl c) {
+ return c != null && !c.isWriteOnly() && c.isLoaded();// && !c.isEmpty();
+ }
}