-/*******************************************************************************\r
- * Copyright (c) 2007, 2010 Association for Decentralized Information Management\r
- * in Industry THTH ry.\r
- * All rights reserved. This program and the accompanying materials\r
- * are made available under the terms of the Eclipse Public License v1.0\r
- * which accompanies this distribution, and is available at\r
- * http://www.eclipse.org/legal/epl-v10.html\r
- *\r
- * Contributors:\r
- * VTT Technical Research Centre of Finland - initial API and implementation\r
- *******************************************************************************/\r
-package fi.vtt.simantics.procore.internal;\r
-\r
-import java.io.File;\r
-import java.util.ArrayList;\r
-import java.util.Collection;\r
-import java.util.HashMap;\r
-import java.util.Map;\r
-import java.util.TreeMap;\r
-import java.util.concurrent.Semaphore;\r
-\r
-import org.simantics.databoard.Bindings;\r
-import org.simantics.db.ClusterCreator;\r
-import org.simantics.db.Database;\r
-import org.simantics.db.DevelopmentKeys;\r
-import org.simantics.db.SessionVariables;\r
-import org.simantics.db.Database.Session.ClusterChanges;\r
-import org.simantics.db.common.utils.Logger;\r
-import org.simantics.db.exception.ClusterDoesNotExistException;\r
-import org.simantics.db.exception.DatabaseException;\r
-import org.simantics.db.exception.ResourceNotFoundException;\r
-import org.simantics.db.exception.RuntimeDatabaseException;\r
-import org.simantics.db.impl.ClusterBase;\r
-import org.simantics.db.impl.ClusterI;\r
-import org.simantics.db.impl.ClusterSupport;\r
-import org.simantics.db.impl.ClusterTraitsBase;\r
-import org.simantics.db.impl.IClusterTable;\r
-import org.simantics.db.impl.graph.WriteGraphImpl;\r
-import org.simantics.db.impl.query.QueryProcessor;\r
-import org.simantics.db.procore.cluster.ClusterBig;\r
-import org.simantics.db.procore.cluster.ClusterImpl;\r
-import org.simantics.db.procore.cluster.ClusterSmall;\r
-import org.simantics.db.procore.cluster.ClusterTraits;\r
-import org.simantics.db.procore.protocol.Constants;\r
-import org.simantics.db.service.ClusterCollectorPolicy;\r
-import org.simantics.db.service.ClusterCollectorPolicy.CollectorCluster;\r
-import org.simantics.db.service.ClusterUID;\r
-import org.simantics.utils.Development;\r
-import org.simantics.utils.datastructures.Callback;\r
-\r
-import fi.vtt.simantics.procore.DebugPolicy;\r
-import fi.vtt.simantics.procore.internal.ClusterControlImpl.ClusterStateImpl;\r
-import fi.vtt.simantics.procore.internal.SessionImplSocket.TaskHelper;\r
-import gnu.trove.map.hash.TLongIntHashMap;\r
-import gnu.trove.map.hash.TLongObjectHashMap;\r
-import gnu.trove.procedure.TIntProcedure;\r
-import gnu.trove.procedure.TLongObjectProcedure;\r
-import gnu.trove.procedure.TObjectProcedure;\r
-import gnu.trove.set.hash.TIntHashSet;\r
-\r
-public final class ClusterTable implements IClusterTable {\r
-\r
- int maximumBytes = 128 * 1024 * 1024;\r
- int limit = (int)(0.8*(double)maximumBytes);\r
-\r
- long timeCounter = 0;\r
-\r
- final private SessionImplSocket sessionImpl;\r
- final private ArrayList<ClusterI> writeOnlyClusters = new ArrayList<ClusterI>();\r
- final private TIntHashSet writeOnlyInvalidates = new TIntHashSet();\r
- final private static int ARRAY_SIZE = ClusterTraits.getClusterArraySize();\r
- final private ClusterImpl[] clusterArray = new ClusterImpl[ARRAY_SIZE];\r
- final private Boolean[] immutables = new Boolean[ARRAY_SIZE];\r
- final private boolean[] virtuals = new boolean[ARRAY_SIZE];\r
- static class ImportanceEntry implements CollectorCluster {\r
- final public long importance;\r
- final public long clusterId;\r
- public ImportanceEntry(ClusterImpl impl) {\r
- this(impl.getImportance(), impl.getClusterId());\r
- }\r
-\r
- public ImportanceEntry(long importance, long clusterId) {\r
- this.importance = importance;\r
- this.clusterId = clusterId;\r
- }\r
- @Override\r
- public long getImportance() {\r
- return importance;\r
- }\r
- @Override\r
- public long getClusterId() {\r
- return clusterId;\r
- }\r
- }\r
- private class Clusters {\r
- // This makes sure that non-null values from hashMap.get can be trusted (no unsynchronized rehashes occur)\r
- final public TLongObjectHashMap<ClusterImpl> hashMap = new TLongObjectHashMap<ClusterImpl>(2 * ARRAY_SIZE);\r
- //private final HashMap<ClusterUID, ClusterImpl> clusterU2I = new HashMap<ClusterUID, ClusterImpl>(); // Maps cluster UID to cluster.\r
- private Clusters() {\r
- clear();\r
- }\r
- private void clear() {\r
- hashMap.clear();\r
- // clusterU2I.clear();\r
- hashMap.put(0, null); // reserved for null value\r
- // clusterU2I.put(ClusterUID.make(0, 0), null);\r
- }\r
- private int size() {\r
- return hashMap.size();\r
- }\r
- private ClusterImpl getClusterByClusterId(long clusterId) {\r
- return hashMap.get(clusterId);\r
- }\r
- private ClusterImpl getClusterByClusterUID(ClusterUID clusterUID) {\r
- return getClusterByClusterId(clusterUID.second);\r
- //return clusterU2I.get(clusterUID);\r
- }\r
- private ClusterImpl getClusterByClusterUID(long id1, long id2) {\r
- return getClusterByClusterId(id2);\r
- }\r
- private ClusterImpl makeProxy(long clusterId) {\r
- return makeProxy(ClusterUID.make(0, clusterId));\r
- }\r
- private ClusterImpl makeProxy(ClusterUID clusterUID) {\r
- ClusterImpl proxy = hashMap.get(clusterUID.second);\r
- if (null != proxy)\r
- return proxy;\r
- int clusterKey = hashMap.size();\r
- ClusterSmall sentinel = new ClusterSmall(clusterUID, clusterKey, ClusterTable.this, sessionImpl.clusterTranslator);\r
- if (sentinel.clusterId != sentinel.clusterUID.second)\r
- throw new RuntimeDatabaseException("ClusterTable corrupted. Contact application support.");\r
- create(sentinel);\r
- return sentinel;\r
- }\r
- private void replace(ClusterImpl proxy) {\r
- ClusterImpl old = clusterArray[proxy.clusterKey];\r
- create(proxy);\r
- if (null != old) {\r
- if (old.clusterKey != proxy.clusterKey)\r
- throw new RuntimeDatabaseException("ClusterTable corrupted. Contact application support.");\r
- if (old.clusterId != proxy.clusterId)\r
- throw new RuntimeDatabaseException("ClusterTable corrupted. Contact application support.");\r
- if (!old.clusterUID.equals(proxy.clusterUID))\r
- throw new RuntimeDatabaseException("ClusterTable corrupted. Contact application support.");\r
- }\r
- }\r
- private ClusterSmall freeProxy(ClusterImpl proxy) {\r
- ClusterImpl clusterImpl = hashMap.get(proxy.clusterId);\r
- if (null == clusterImpl)\r
- throw new RuntimeDatabaseException("ClusterTable corrupted. Contact application support.");\r
-// ClusterUID clusterUID = ClusterUID.make(0, proxy.clusterId);\r
-// ClusterImpl clusterImpl2 = clusterU2I.get(clusterUID );\r
-// if (clusterImpl != clusterImpl2)\r
-// throw new RuntimeDatabaseException("ClusterTable corrupted. Contact application support.");\r
- if (proxy.clusterId != clusterImpl.clusterId)\r
- throw new RuntimeDatabaseException("ClusterTable corrupted. Contact application support.");\r
- if (proxy.clusterKey != clusterImpl.clusterKey)\r
- throw new RuntimeDatabaseException("ClusterTable corrupted. Contact application support.");\r
- ClusterSmall sentinel = new ClusterSmall(makeClusterUID(proxy.clusterId) , proxy.clusterKey, ClusterTable.this, sessionImpl.clusterTranslator);\r
- return (ClusterSmall)create(sentinel);\r
- }\r
- private ClusterImpl create(ClusterImpl clusterImpl) {\r
- hashMap.put(clusterImpl.clusterId, clusterImpl);\r
-// clusterU2I.put(clusterImpl.clusterUID, clusterImpl);\r
- clusterArray[clusterImpl.clusterKey] = clusterImpl;\r
- return clusterImpl;\r
- }\r
- private void forEachValue(TObjectProcedure<ClusterImpl> procedure) {\r
- hashMap.forEachValue(procedure);\r
- }\r
- private void forEachEntry(TLongObjectProcedure<ClusterImpl> procedure) {\r
- hashMap.forEachEntry(procedure);\r
- }\r
- private ClusterUID makeClusterUID(long clusterId) {\r
- return ClusterUID.make(0, clusterId);\r
- }\r
- private void removeProxy(ClusterImpl proxy) {\r
- hashMap.remove(proxy.clusterId);\r
- clusterArray[proxy.clusterKey] = null;\r
- }\r
- }\r
- final public TreeMap<Long, CollectorCluster> importanceMap = new TreeMap<Long, CollectorCluster>();\r
-\r
- private ClusterCollectorPolicy collectorPolicy;\r
- private boolean dirtySizeInBytes = true;\r
- private long sizeInBytes = 0;\r
- private final Clusters clusters;\r
- ClusterUID makeClusterUID(long clusterId) {\r
- return clusters.makeClusterUID(clusterId);\r
- }\r
- public ClusterUID getClusterUIDByResourceKey(int resourceKey) throws DatabaseException {\r
- int clusterKey = ClusterTraits.getClusterKeyFromResourceKey(resourceKey);\r
- return clusterArray[clusterKey].clusterUID;\r
- }\r
- ClusterTable(SessionImplSocket sessionImpl, File folderPath) {\r
- this.sessionImpl = sessionImpl;\r
- clusters = new Clusters();\r
- }\r
-\r
- void dispose() {\r
- clusters.clear();\r
- importanceMap.clear();\r
- }\r
-\r
- public void setCollectorPolicy(ClusterCollectorPolicy policy) {\r
- this.collectorPolicy = policy;\r
- }\r
-\r
- /**\r
- * Sets Cluster Table allocation size by percentage of maximum usable\r
- * memory. Allocation is limited between 5% and 50% of maximum memory.\r
- *\r
- * @param precentage\r
- */\r
- private void setAllocateSize(double percentage) {\r
-\r
- percentage = Math.max(percentage, 0.05);\r
- percentage = Math.min(percentage, 0.5);\r
-\r
- maximumBytes = (int) Math.floor(percentage * Runtime.getRuntime().maxMemory());\r
- }\r
-\r
- private double estimateProperAllocation(ClusterCollectorSupport support) {\r
- long max = Runtime.getRuntime().maxMemory();\r
- long free = Runtime.getRuntime().freeMemory();\r
- long total = Runtime.getRuntime().totalMemory();\r
- long realFree = max - total + free; // amount of free memory\r
- long current = support.getCurrentSize(); // currently allocated cache\r
- long inUseTotal = max - realFree; // currently used memory\r
- long inUseOther = inUseTotal - current; // memory that the application uses (without the cache)\r
- double otherUsePercentage = (double) inUseOther / (double) max; // percentage of memory that the application uses\r
- double aimFree = 0.2; // percentage of maximum heap that we try to keep free\r
- double estimate = 1.0 - otherUsePercentage - aimFree;\r
- estimate = Math.min(estimate, 0.5);\r
- estimate = Math.max(estimate, 0.05);\r
- // System.out.println("Estimated allocation percentage " + estimate +\r
-// " memory stats: max: " + max + ", free: " + realFree + ", inUse: " +\r
-// inUseTotal + ", cached: " + current + ", cacheSize: " + maximumBytes);\r
- return estimate;\r
- }\r
-\r
- void checkCollect() {\r
- collector.collect();\r
- }\r
-\r
- synchronized ClusterImpl getClusterByClusterId(long clusterId) {\r
- return clusters.getClusterByClusterId(clusterId);\r
- }\r
-\r
- ClusterBase getClusterByClusterKey(int clusterKey) {\r
- return clusterArray[clusterKey];\r
- }\r
-\r
- synchronized ClusterImpl makeProxy(ClusterUID clusterUID, long clusterId) {\r
- if (clusterUID.second != clusterId)\r
- throw new RuntimeDatabaseException("Illegal id for cluster=" + clusterUID + " id=" + clusterId);\r
- return clusters.makeProxy(clusterUID);\r
- }\r
-\r
- synchronized ClusterImpl makeCluster(long clusterId, boolean writeOnly) {\r
- checkCollect();\r
- ClusterImpl proxy = clusters.getClusterByClusterId(clusterId);\r
- if (null != proxy)\r
- return proxy;\r
- ClusterUID clusterUID = ClusterUID.make(0, clusterId);\r
- int clusterKey = clusters.size(); // new key\r
- if (writeOnly) {\r
- proxy = new ClusterWriteOnly(clusterUID, clusterKey, sessionImpl);\r
- writeOnlyClusters.add(proxy);\r
- return clusters.create(proxy);\r
- } else {\r
- ClusterImpl cluster = ClusterImpl.make(clusterUID, clusterKey, sessionImpl.clusterTranslator);\r
- clusters.create(cluster);\r
- if (!cluster.isLoaded())\r
- Logger.defaultLogError(new Exception("Bug in ClusterTable.makeCluster(long, boolean), cluster not loaded"));\r
- importanceMap.put(cluster.getImportance(), new ImportanceEntry(cluster));\r
- if(collectorPolicy != null) collectorPolicy.added(cluster);\r
- return cluster;\r
- }\r
- }\r
-\r
- synchronized void replaceCluster(ClusterI cluster) {\r
- checkCollect();\r
- int clusterKey = cluster.getClusterKey();\r
- ClusterI existing = clusterArray[clusterKey];\r
- if (existing.hasVirtual())\r
- cluster.markVirtual();\r
-\r
- importanceMap.remove(existing.getImportance());\r
- if (collectorPolicy != null)\r
- collectorPolicy.removed((ClusterImpl)existing);\r
-\r
- clusters.replace((ClusterImpl)cluster);\r
- if (!cluster.isLoaded())\r
- Logger.defaultLogError(new Exception("Bug in ClusterTable.replaceCluster(ClusterI), cluster not loaded"));\r
-\r
- importanceMap.put(cluster.getImportance(), new ImportanceEntry((ClusterImpl)cluster));\r
- if(collectorPolicy != null) collectorPolicy.added((ClusterImpl)cluster);\r
-\r
- if (!dirtySizeInBytes && !existing.isLoaded()) {\r
- sizeInBytes += cluster.getCachedSize();\r
- }\r
-\r
- }\r
-\r
- synchronized void release(CollectorCluster cluster) {\r
- importanceMap.remove(cluster.getImportance());\r
- release(cluster.getClusterId());\r
- }\r
-\r
- synchronized void release(long clusterId) {\r
- ClusterImpl clusterImpl = clusters.getClusterByClusterId(clusterId);\r
- if (null == clusterImpl)\r
- return;\r
- if(!clusterImpl.isLoaded() || clusterImpl.isEmpty())\r
- return;\r
- clusters.freeProxy(clusterImpl);\r
- importanceMap.remove(clusterImpl.getImportance());\r
- if (collectorPolicy != null)\r
- collectorPolicy.removed(clusterImpl);\r
- if (sessionImpl.writeState != null)\r
- sessionImpl.clusterStream.flush(clusterImpl.clusterUID);\r
- if (!dirtySizeInBytes) {\r
- sizeInBytes -= clusterImpl.getCachedSize();\r
- }\r
- }\r
-\r
- synchronized void compact(long id) {\r
- ClusterI impl = clusters.getClusterByClusterId(id);\r
- if (impl != null)\r
- impl.compact();\r
- }\r
-\r
- void updateSize() {\r
-// cachedSize = getSizeInBytes();\r
- }\r
-\r
- double getLoadProbability() {\r
- // This can currently cause stack overflow\r
- return 1.0;\r
-// if(cachedSize < SLOW_LIMIT) return 1.0;\r
-// if(cachedSize > HIGH_LIMIT) return 1e-2;\r
-//\r
-// double pos = (double)(cachedSize - SLOW_LIMIT) / (double)(HIGH_LIMIT -\r
-// SLOW_LIMIT);\r
-// double val = 0.1 * ((pos-1) * (pos-1)) + 1e-2;\r
-// return val;\r
- }\r
-\r
- class SizeProcedure implements TObjectProcedure<ClusterImpl> {\r
- public long result = 0;\r
-\r
- @Override\r
- public boolean execute(ClusterImpl cluster) {\r
- if (cluster != null) {\r
- try {\r
- if (cluster.isLoaded() && !cluster.isEmpty()) {\r
- result += cluster.getCachedSize();\r
- }\r
- } catch (Throwable t) {\r
- Logger.defaultLogError(t);\r
- }\r
- }\r
- return true;\r
- }\r
-\r
- public void clear() {\r
- result = 0;\r
- }\r
-\r
- };\r
-\r
- private SizeProcedure sizeProcedure = new SizeProcedure();\r
- long getSizeInBytes() {\r
- if (dirtySizeInBytes) {\r
- sizeProcedure.clear();\r
- clusters.forEachValue(sizeProcedure);\r
- sizeInBytes = sizeProcedure.result;\r
- // System.err.println("recomputed size of clusterTable => " + sizeInBytes);\r
- setDirtySizeInBytes(false);\r
- }\r
- return sizeInBytes;\r
- }\r
-\r
- public void setDirtySizeInBytes(boolean value) {\r
- dirtySizeInBytes = value;\r
- }\r
-\r
- ClusterStateImpl getState() {\r
- final ClusterStateImpl result = new ClusterStateImpl();\r
- clusters.forEachEntry(new TLongObjectProcedure<ClusterImpl>() {\r
- @Override\r
- public boolean execute(long arg0, ClusterImpl arg1) {\r
- if (arg1 == null)\r
- return true;\r
- if (arg1.isLoaded() && !arg1.isEmpty()) {\r
- result.ids.add(new ImportanceEntry(arg1));\r
- }\r
- return true;\r
- }\r
- });\r
- return result;\r
- }\r
-\r
- void restoreState(ClusterStateImpl state) {\r
- ClusterStateImpl current = getState();\r
- for (CollectorCluster id : current.ids)\r
- if (!state.ids.contains(id))\r
- collectorSupport.release(id);\r
- }\r
-\r
- class ClusterCollectorImpl implements ClusterCollector {\r
-\r
- final private ClusterCollectorSupport support;\r
- private ClusterCollectorPolicy policy;\r
- ClusterCollectorImpl(ClusterCollectorSupport support) {\r
- this.support = support;\r
- }\r
-\r
- ClusterCollectorPolicy setPolicy(ClusterCollectorPolicy newPolicy) {\r
-\r
- ClusterCollectorPolicy oldPolicy = policy;\r
- policy = newPolicy;\r
-\r
- if(policy != null) {\r
- for (CollectorCluster id : support.getResidentClusters()) {\r
- policy.added(getClusterByClusterId(id.getClusterId()));\r
- }\r
- }\r
-\r
- support.setPolicy(policy);\r
-\r
- return oldPolicy;\r
-\r
- }\r
-\r
- @Override\r
- public void collect() {\r
-\r
- if(policy != null) {\r
-\r
- release(policy.select());\r
-\r
- } else {\r
-\r
- int size = support.getCurrentSize();\r
- boolean dynamicAllocation = useDynamicAllocation();\r
- if (dynamicAllocation)\r
- setAllocateSize(estimateProperAllocation(support));\r
- if (DebugPolicy.CLUSTER_COLLECTION) {\r
- System.out.println("Cluster collector activated, current size = " + size + " limit = " + maximumBytes);\r
- }\r
- if (dynamicAllocation) {\r
- int collectSize = maximumBytes / 2;\r
- collectSize = Math.min(collectSize, 32 * 1024 * 1024);\r
- // try to keep allocated clusters below the maximum\r
- if (maximumBytes - size > collectSize)\r
- return;\r
- collectSize += size - maximumBytes;\r
- collect(collectSize);\r
- } else {\r
- // try to keep allocated clusters below the maximum\r
- if (size < maximumBytes)\r
- return;\r
- // shave off 20%\r
- collect(size-limit);\r
- }\r
-\r
- }\r
-\r
- }\r
-\r
- private boolean useDynamicAllocation() {\r
- return "true".equalsIgnoreCase(System.getProperty("org.simantics.db.cluster.dynamicAlloc"));\r
- }\r
-\r
- @Override\r
- public void collect(int target) {\r
-\r
- if(policy != null) {\r
-\r
- release(policy.select(target));\r
-\r
- } else {\r
-\r
- ArrayList<CollectorCluster> toRelease = new ArrayList<CollectorCluster>();\r
-\r
- for (CollectorCluster cluster : support.getResidentClusters()) {\r
- target -= support.getClusterSize(cluster);\r
- if (target > 0) {\r
- toRelease.add(cluster);\r
- } else {\r
- break;\r
- }\r
- }\r
-\r
- release(toRelease);\r
-\r
- if (DebugPolicy.CLUSTER_COLLECTION) {\r
- System.out.println("Cluster collector finished, current size = " + support.getCurrentSize());\r
- }\r
-\r
- }\r
-\r
- }\r
-\r
- void release(Collection<CollectorCluster> toRelease) {\r
- for (CollectorCluster id : toRelease) {\r
- support.release(id);\r
- }\r
- }\r
-\r
- }\r
-\r
- private ClusterCollectorSupport collectorSupport = new ClusterCollectorSupportImpl(this);\r
- ClusterCollectorImpl collector = new ClusterCollectorImpl(collectorSupport);\r
-\r
- void gc() {\r
- collector.collect();\r
- }\r
-\r
- private long newResourceClusterId = Constants.NewClusterId;\r
- public static final int CLUSTER_FILL_SIZE = ClusterTraitsBase.getMaxNumberOfResources();\r
-\r
- /*\r
- * Uusi id varataan vasta, kun lis�t��n resurssi => reservedIds sis�lt��\r
- * vain jo k�yt�ss� olevia klustereita\r
- */\r
-\r
- ClusterImpl getNewResourceCluster(ClusterSupport cs, GraphSession graphSession, boolean writeOnly)\r
- throws DatabaseException {\r
- if (Constants.NewClusterId == newResourceClusterId) {\r
- newResourceClusterId = graphSession.newClusterId();\r
- return getClusterByClusterIdOrMake(newResourceClusterId, writeOnly);\r
- } else {\r
- ClusterImpl cluster = getClusterByClusterIdOrThrow(newResourceClusterId);\r
- if (cluster.getNumberOfResources(cs) >= CLUSTER_FILL_SIZE) {\r
- newResourceClusterId = graphSession.newClusterId();\r
- cluster = getClusterByClusterIdOrMake(newResourceClusterId, writeOnly);\r
- }\r
- return cluster;\r
- }\r
- }\r
-\r
- void flushCluster(GraphSession graphSession) {\r
-// We seem to disagree about this.\r
-// graphSession.newClusterId();\r
- newResourceClusterId = Constants.NewClusterId;\r
- }\r
-\r
- void writeOnlyInvalidate(ClusterI impl) {\r
- writeOnlyInvalidates.add(impl.getClusterKey());\r
- }\r
-\r
- void removeWriteOnlyClusters() {\r
- for (ClusterI proxy : writeOnlyClusters) {\r
- if (!(proxy instanceof ClusterImpl))\r
- throw new RuntimeDatabaseException("ClusterTable corrupted. Contact application support.");\r
- clusters.freeProxy((ClusterImpl)proxy);\r
- }\r
- writeOnlyClusters.clear();\r
- writeOnlyInvalidates.forEach(new TIntProcedure() {\r
- @Override\r
- public boolean execute(int clusterKey) {\r
- ClusterImpl proxy = clusterArray[clusterKey];\r
- ClusterUID clusterUID = proxy.getClusterUID();\r
- System.err.println("writeOnlyInvalidate " + clusterUID);\r
- clusters.freeProxy(proxy);\r
- return true;\r
- }\r
- });\r
- writeOnlyInvalidates.clear();\r
- }\r
-\r
- public ClusterImpl getClusterByClusterUID(ClusterUID clusterUID) {\r
- synchronized (this) {\r
- return clusters.getClusterByClusterUID(clusterUID);\r
- }\r
- }\r
- public int getClusterKeyByClusterUIDOrMakeProxy(ClusterUID clusterUID) {\r
- return getClusterKeyByClusterUIDOrMakeProxy(0/*clusterUID.first*/, clusterUID.second);\r
- }\r
- public int getClusterKeyByClusterUIDOrMakeProxy(long id1, long id2) {\r
- return getClusterByClusterUIDOrMakeProxy(id1, id2).clusterKey;\r
- }\r
- public ClusterImpl getClusterByClusterUIDOrMakeProxy(ClusterUID clusterUID) {\r
- synchronized (this) {\r
- ClusterImpl clusterImpl = clusters.getClusterByClusterUID(clusterUID);\r
- if (null == clusterImpl)\r
- clusterImpl = clusters.makeProxy(clusterUID);\r
- return clusterImpl;\r
- }\r
- }\r
- public ClusterImpl getClusterByClusterUIDOrMakeProxy(long id1, long id2) {\r
- synchronized (this) {\r
- ClusterImpl clusterImpl = clusters.getClusterByClusterUID(id1, id2);\r
- if (null == clusterImpl)\r
- clusterImpl = clusters.makeProxy(id2);\r
- return clusterImpl;\r
- }\r
- }\r
- ClusterImpl getLoadOrThrow(long clusterId) throws DatabaseException {\r
- synchronized (this) {\r
- ClusterImpl proxy = clusters.getClusterByClusterId(clusterId);\r
- int clusterKey = 0;\r
- if (proxy != null)\r
- if (proxy.isLoaded())\r
- return proxy;\r
- else\r
- clusterKey = proxy.getClusterKey();\r
- try {\r
- if (clusterKey == 0) {\r
- proxy = clusters.makeProxy(clusterId);\r
- clusterKey = proxy.getClusterKey();\r
- }\r
- ClusterImpl ci = tryLoad(clusterId, clusterKey);\r
- if (null == ci)\r
- throw new ResourceNotFoundException(clusterId);\r
- return ci;\r
- } catch (ClusterDoesNotExistException t) {\r
- clusters.removeProxy(proxy);\r
- throw t;\r
- }\r
- }\r
- }\r
-\r
- ClusterImpl getClusterByClusterIdOrMake(long clusterId, boolean writeOnly) {\r
- synchronized (this) {\r
- ClusterImpl proxy = clusters.getClusterByClusterId(clusterId);\r
- if (proxy != null)\r
- return proxy;\r
- else\r
- return makeCluster(clusterId, writeOnly);\r
- }\r
- }\r
- ClusterImpl getClusterByClusterIdOrThrow(long clusterId) {\r
- synchronized (this) {\r
- ClusterImpl proxy = clusters.getClusterByClusterId(clusterId);\r
- if (null == proxy)\r
- throw new IllegalArgumentException("Cluster id=" + clusterId + " is not created.");\r
- return proxy;\r
- }\r
- }\r
- long getClusterIdOrCreate(ClusterUID clusterUID) {\r
- return clusterUID.second;\r
- }\r
- final long getClusterIdByResourceKey(final int resourceKey)\r
- throws DatabaseException {\r
- int clusterKey = ClusterTraitsBase.getClusterKeyFromResourceKey(resourceKey);\r
- if (ClusterTraitsBase.isVirtualClusterKey(clusterKey))\r
- throw new RuntimeException("Tried to get a persistent cluster for a virtual resource.");\r
- ClusterI c = clusterArray[clusterKey];\r
- if (c == null)\r
- throw new RuntimeException("No cluster for key " + resourceKey);\r
- return c.getClusterId();\r
- }\r
- final long getClusterIdByResourceKeyNoThrow(final int resourceKey) {\r
- int clusterKey = ClusterTraitsBase.getClusterKeyFromResourceKeyNoThrow(resourceKey);\r
- if (ClusterTraitsBase.isVirtualClusterKey(clusterKey)) {\r
- Logger.defaultLogError("Tried to get a persistent cluster for a virtual resource. key=" + resourceKey);\r
- return 0;\r
- }\r
- ClusterI c = clusterArray[clusterKey];\r
- if (c == null) {\r
- Logger.defaultLogError("No cluster for key " + resourceKey);\r
- return 0;\r
- }\r
- return c.getClusterId();\r
- }\r
-\r
- int counter = 0;\r
-\r
- void refresh(long csid, SessionImplSocket session, ClusterUID[] clusterUID) {\r
- synchronized (this) {\r
- session.flushCounter = 0;\r
- session.clusterStream.reallyFlush();\r
- ClientChangesImpl cs = new ClientChangesImpl(session);\r
- if (session.clientChanges == null)\r
- session.clientChanges = cs;\r
- for (int i=0; i<clusterUID.length; ++i) {\r
- try {\r
- if (DebugPolicy.REPORT_CLUSTER_EVENTS)\r
- System.err.println("cluster=" + clusterUID[i] + " has changed. length=" + clusterUID.length);\r
- ClusterImpl oldCluster = clusters.getClusterByClusterUID(clusterUID[i]);\r
- if (null == oldCluster)\r
- continue;\r
- if (!oldCluster.isLoaded())\r
- continue;\r
- int clusterKey = oldCluster.getClusterKey();\r
- if (ClusterTraitsBase.isVirtualClusterKey(clusterKey))\r
- continue;\r
- boolean big = oldCluster instanceof ClusterBig;\r
- boolean small = oldCluster instanceof ClusterSmall;\r
- if (!big && !small)\r
- continue;\r
-// ClusterImpl newCluster = (ClusterImpl) sessionImpl.graphSession.getClusterImpl(clusterUID[i], clusterKey);\r
- Database.Session dbSession = sessionImpl.graphSession.dbSession;\r
- \r
- ClusterImpl newCluster = dbSession.clone(clusterUID[i], new ClusterCreator() {\r
- \r
- @Override\r
- public <T> T create(ClusterUID uid, byte[] bytes, int[] ints, long[] longs) {\r
- ClusterSupport support = sessionImpl.clusterTranslator;\r
- try {\r
- return (T)ClusterImpl.make(longs, ints, bytes, support, support.getClusterKeyByClusterUIDOrMake(uid));\r
- } catch (DatabaseException e) {\r
- e.printStackTrace();\r
- return null;\r
- }\r
- }\r
- \r
- });\r
- if (null == newCluster)\r
- continue;\r
- if (DebugPolicy.REPORT_CLUSTER_EVENTS)\r
- System.err.println("cluster=" + newCluster + " updated.");\r
- importanceMap.remove(oldCluster.getImportance());\r
- if (collectorPolicy != null)\r
- collectorPolicy.removed(oldCluster);\r
- clusters.replace(newCluster);\r
- if (!newCluster.isLoaded())\r
- Logger.defaultLogError(new Exception("Bug in ClusterTable.refresh, cluster not loaded"));\r
- importanceMap.put(newCluster.getImportance(), new ImportanceEntry(newCluster));\r
- if (collectorPolicy != null)\r
- collectorPolicy.added(newCluster);\r
- // Now we have fetched the new cluster but to emulate effects of the changes in it we fetch the cluster changes from server.\r
- refreshCluster(csid, session, oldCluster, clusterUID[i], newCluster.clusterId, clusterKey);\r
- } catch (Throwable t) {\r
- Logger.defaultLogError("Failed to load cluster in refresh.", t);\r
- }\r
- }\r
- // Fake update of cluster changes.\r
- QueryProcessor queryProcessor = session.getQueryProvider2();\r
- WriteGraphImpl writer = WriteGraphImpl.create(queryProcessor, session.writeSupport, null);\r
- TaskHelper th = null;\r
- if (null == session.writeState) {\r
- th = new TaskHelper("Refresh");\r
- session.writeState = new WriteState<Object>(writer, th.writeTraits, th.sema, th.proc);\r
- try {\r
- session.getQueryProvider2().performDirtyUpdates(writer);\r
- session.fireMetadataListeners(writer, cs);\r
- session.getQueryProvider2().performScheduledUpdates(writer);\r
- session.fireReactionsToSynchronize(cs);\r
- session.fireSessionVariableChange(SessionVariables.QUEUED_WRITES);\r
- session.printDiagnostics();\r
- } finally {\r
- if (null != th)\r
- session.writeState = null;\r
- }\r
- }\r
- }\r
- }\r
- final void refreshCluster(long csid, SessionImplSocket session, ClusterImpl cluster, ClusterUID clusterUID, long clusterId, int clusterKey)\r
- throws DatabaseException {\r
- if (DebugPolicy.REPORT_CLUSTER_EVENTS)\r
- System.err.println("cluster=" + clusterUID + " id=" + clusterId + " key=" + clusterKey + " resources will be updated.");\r
- // get cluster change sets\r
- QueryProcessor queryProcessor = session.getQueryProvider2();\r
- ClusterChanges cc;\r
- try {\r
- cc = session.graphSession.getClusterChanges(clusterUID, csid);\r
- } catch (Exception e) {\r
- Logger.defaultLogError("Could not get cluster changes. cluster=" + clusterUID, e);\r
- release(clusterId);\r
- return;\r
- }\r
- for (int i=0; i<cc.getResourceIndex().length; ++i) {\r
- int resource = ClusterTraits.createResourceKey(clusterKey, cc.getResourceIndex()[i]);\r
- ClusterUID pClusterUID = new ClusterUID(cc.getPredicateFirst()[i], cc.getPredicateSecond()[i]);\r
- ClusterImpl pCluster = clusters.getClusterByClusterUID(pClusterUID);\r
- if (null == pCluster)\r
- continue;\r
- int pClusterKey = pCluster.getClusterKey();\r
- int predicate = ClusterTraits.createResourceKey(pClusterKey, cc.getPredicateIndex()[i]);\r
- queryProcessor.updateStatements(resource, predicate);\r
- if (DebugPolicy.REPORT_CLUSTER_EVENTS)\r
- System.err.println("resource " + cc.getResourceIndex()[i] + " relation " + cc.getPredicateIndex()[i] + " changed.");\r
- }\r
- for (int i=0; i<cc.getValueIndex().length; ++i) {\r
- int resource = ClusterTraits.createResourceKey(clusterKey, cc.getValueIndex()[i]);\r
- queryProcessor.updateValue(resource);\r
- if (DebugPolicy.REPORT_CLUSTER_EVENTS)\r
- System.err.println("value " + cc.getValueIndex()[i] + " changed.");\r
- }\r
- }\r
- final void refreshImportance(ClusterImpl c) {\r
-\r
- if (c.isWriteOnly())\r
- return;\r
-\r
- importanceMap.remove(c.getImportance());\r
- if(collectorPolicy != null) collectorPolicy.removed(c);\r
-\r
- long newImportance = timeCounter();\r
-// System.err.println("refreshImportance " + c.getClusterId() + " => " + newImportance);\r
- c.setImportance(newImportance);\r
- if (!c.isLoaded())\r
- Logger.defaultLogError(new Exception("Bug in ClusterTable.refreshImportance(ClusterImpl), cluster not loaded"));\r
-\r
- importanceMap.put(c.getImportance(), new ImportanceEntry(c));\r
- if(collectorPolicy != null) collectorPolicy.added(c);\r
-\r
- }\r
-\r
- static long loadTime = 0;\r
-\r
- @SuppressWarnings("unchecked")\r
- public final <T extends ClusterI> T getClusterProxyByResourceKey(final int resourceKey) {\r
- int clusterKey = ClusterTraitsBase.getClusterKeyFromResourceKeyNoThrow(resourceKey);\r
- if (ClusterTraitsBase.isVirtualClusterKey(clusterKey))\r
- throw new RuntimeException("Tried to get a persistent cluster for a virtual resource.");\r
- ClusterI cluster = clusterArray[clusterKey];\r
- if (cluster == null)\r
- throw new RuntimeException("No proxy for existing cluster. Resource key = " + resourceKey);\r
- return (T)cluster;\r
- }\r
-\r
- TLongIntHashMap clusterLoadHistogram = new TLongIntHashMap();\r
- int clusterLoadCounter = 0;\r
-\r
- @SuppressWarnings("unchecked")\r
- public final <T extends ClusterI> T getClusterByResourceKey(final int resourceKey) {\r
- int clusterKey = ClusterTraitsBase.getClusterKeyFromResourceKeyNoThrow(resourceKey);\r
- if (ClusterTraitsBase.isVirtualClusterKey(clusterKey))\r
- throw new RuntimeException("Tried to get a persistent cluster for a virtual resource.");\r
- ClusterI c = clusterArray[clusterKey];\r
- if (c == null)\r
- return null;\r
- if (c.isLoaded()) {\r
- if ((counter++ & 4095) == 0)\r
- refreshImportance((ClusterImpl) c);\r
- return (T) c;\r
- }\r
- if (!(c instanceof ClusterSmall)) {\r
- Logger.defaultLogError("Proxy must be instance of ClusterSmall");\r
- return null;\r
- }\r
- ClusterI cluster;\r
- ClusterSmall cs = (ClusterSmall) c;\r
- try {\r
- if(DebugPolicy.REPORT_CLUSTER_LOADING) {\r
- long start = System.nanoTime();\r
- cluster = load2(cs.getClusterId(), cs.getClusterKey());\r
- long load = System.nanoTime()-start;\r
- loadTime += load;\r
- if(DebugPolicy.REPORT_CLUSTER_LOADING) {\r
- int was = clusterLoadHistogram.get(cluster.getClusterId());\r
- clusterLoadHistogram.put(cluster.getClusterId(), was+1);\r
- clusterLoadCounter++;\r
- String text = "Load2 " + cluster + " " + 1e-9*loadTime + "s. " + 1e-6*load + "ms. " + clusterLoadCounter + " " + was + " " + cluster.getUsedSpace() + " " + cluster.getImportance();\r
- if(DebugPolicy.REPORT_CLUSTER_LOADING_STACKS) {\r
- new Exception(text).printStackTrace();\r
- } else {\r
- System.err.println(text);\r
- }\r
- }\r
- } else {\r
- cluster = load2(cs.getClusterId(), cs.getClusterKey());\r
- }\r
- } catch (DatabaseException e) {\r
- Logger.defaultLogError(e);\r
- if (DebugPolicy.REPORT_CLUSTER_EVENTS)\r
- e.printStackTrace();\r
- String msg = "Failed to load cluster " + cs.getClusterUID() + " for resource key " + resourceKey\r
- + " resourceId=" + (((cs.getClusterId() << 16 + (resourceKey & 65535))));\r
- // TODO: this jams the system => needs refactoring.\r
- throw new RuntimeDatabaseException(msg, e);\r
- }\r
- return (T) cluster;\r
- }\r
-\r
- @SuppressWarnings("unchecked")\r
- final <T extends ClusterI> T checkedGetClusterByResourceKey(final int resourceKey) {\r
- int clusterKey = ClusterTraitsBase.getClusterKeyFromResourceKeyNoThrow(resourceKey);\r
- if (ClusterTraitsBase.isVirtualClusterKey(clusterKey))\r
- throw new RuntimeException("Tried to get a persistent cluster for a virtual resource.");\r
- ClusterI c = clusterArray[clusterKey];\r
- if (c == null)\r
- throw new RuntimeException("No cluster for resource key " + resourceKey);\r
- if (c.isLoaded()) {\r
- if ((counter++ & 4095) == 0)\r
- refreshImportance((ClusterImpl) c);\r
- return (T) c;\r
- }\r
- if (!(c instanceof ClusterSmall)) {\r
- Logger.defaultLogError("Proxy must be instance of ClusterSmall");\r
- return null;\r
- }\r
- ClusterI cluster;\r
- ClusterSmall cs = (ClusterSmall) c;\r
- try {\r
-// System.err.println("Load2 " + resourceKey);\r
- cluster = load2(cs.getClusterId(), cs.getClusterKey());\r
- } catch (DatabaseException e) {\r
- if (DebugPolicy.REPORT_CLUSTER_EVENTS)\r
- e.printStackTrace();\r
- int resourceIndex = resourceKey & ClusterTraitsBase.getResourceIndexFromResourceKeyNoThrow(resourceKey);\r
- long resourceId = ClusterTraitsBase.createResourceIdNoThrow(cs.getClusterId(), resourceIndex);\r
- String msg = "Failed to load cluster " + cs.getClusterUID() + " for resource key " + resourceKey + " resourceIndex=" + resourceIndex + " resourceId=" + resourceId;\r
- Logger.defaultLogError(msg, e);\r
- c.setDeleted(true, null);\r
- return (T)c;\r
- }\r
- return (T) cluster;\r
- }\r
-\r
- void printDebugInfo(ClusterSupport support)\r
- throws DatabaseException {\r
- final int SIZE = clusters.size();\r
- long sum = 0;\r
- for (int i = 1; i < SIZE; ++i) {\r
- ClusterI c = clusterArray[i];\r
- c.getNumberOfResources(support);\r
- long size = c.getUsedSpace();\r
- System.out.println("cluster=" + c.getClusterId() + " size=" + size);\r
- c.printDebugInfo("koss: ", support);\r
- sum += size;\r
- }\r
- System.out.println("Total number of clusters " + SIZE);\r
- System.out.println("Total cluster size " + sum);\r
- }\r
-\r
- Map<Long,ClusterImpl> prefetch = new HashMap<Long,ClusterImpl>();\r
-\r
- public synchronized ClusterImpl load2(long clusterId, int clusterKey) throws DatabaseException {\r
- ClusterImpl curr = (ClusterImpl) clusterArray[clusterKey];\r
- if (curr.isLoaded())\r
- return curr;\r
-\r
-// getPrefetched();\r
-//\r
-// curr = (ClusterImpl) clusterArray[clusterKey];\r
-// if (curr.isLoaded())\r
-// return curr;\r
-\r
- final Semaphore s = new Semaphore(0);\r
- final DatabaseException[] ex = new DatabaseException[1];\r
-\r
- load2(clusterId, clusterKey, new Callback<DatabaseException>() {\r
- @Override\r
- public void run(DatabaseException e) {\r
- ex[0] = e;\r
- s.release();\r
- }\r
- });\r
- try {\r
- s.acquire();\r
- } catch (InterruptedException e) {\r
- Logger.defaultLogError(e);\r
- }\r
- if (null != ex[0])\r
- throw ex[0];\r
- if (Development.DEVELOPMENT) {\r
- if (Development.<Boolean>getProperty(DevelopmentKeys.CLUSTERTABLE_VALIDATE_ON_LOAD, Bindings.BOOLEAN)) {\r
- try {\r
- ClusterImpl loaded = (ClusterImpl) clusterArray[clusterKey];\r
- if (loaded instanceof ClusterSmall)\r
- ((ClusterSmall) loaded).check();\r
- else if (loaded instanceof ClusterBig)\r
- ((ClusterBig) loaded).check();\r
- } catch (Throwable t) {\r
- t.printStackTrace();\r
- }\r
- }\r
- }\r
- \r
- validate(clusterKey);\r
- \r
- return (ClusterImpl) clusterArray[clusterKey];\r
- }\r
- \r
- void validate(int clusterKey) {\r
- \r
-// if(sessionImpl.graphSession.graphClient instanceof GraphClientImpl2) {\r
-// \r
-// ClusterImpl server = (ClusterImpl)clusterArray[clusterKey]; \r
-// String sd = server.dump(sessionImpl.clusterTranslator);\r
-// GraphClientImpl2 gci = ((GraphClientImpl2)sessionImpl.graphSession.graphClient);\r
-// ClusterImpl memory = gci.clusters.get(server.clusterUID);\r
-// if(memory == null)\r
-// System.err.println("ad");\r
-// String md = memory.dump(gci.support);\r
-// \r
-// if(!sd.equals(md)) {\r
-// \r
-// int diffPos = 0;\r
-// int minLength = Math.min(sd.length(), md.length());\r
-// for (int i = 0; i < minLength; i++) {\r
-// if (sd.charAt(i) != md.charAt(i)) {\r
-// diffPos = i;\r
-// break;\r
-// }\r
-// }\r
-// \r
-// int start = Math.max(diffPos-200, 0);\r
-// int end = Math.min(minLength-1, diffPos+200);\r
-//\r
-// System.err.println("== CLUSTER DIFFERENCE " + clusterArray[clusterKey].getClusterUID() + " == ");\r
-// System.err.println("== SESSION == ");\r
-// System.err.println(sd.substring(start, end));\r
-// System.err.println("== MEM == ");\r
-// System.err.println(md.substring(start, end));\r
-// System.err.println("== CLUSTER DIFFERENCE ENDS == ");\r
-// \r
-// throw new IllegalStateException();\r
-// }\r
-// \r
-// }\r
- }\r
-\r
- public synchronized ClusterImpl getPrefetched() {\r
- synchronized(prefetch) {\r
- return null;\r
- }\r
- }\r
-\r
- public synchronized void load2(long clusterId, int clusterKey, final Callback<DatabaseException> runnable) {\r
-\r
- assert (Constants.ReservedClusterId != clusterId);\r
-\r
- ClusterImpl cluster = null;\r
- DatabaseException e = null;\r
-\r
- try {\r
-\r
- ClusterUID clusterUID = clusters.makeClusterUID(clusterId);\r
-// cluster = (ClusterImpl) sessionImpl.getGraphSession().getClusterImpl(clusterUID, clusterKey);\r
- Database.Session session = sessionImpl.graphSession.dbSession;\r
-// cluster = (ClusterImpl)gci.getClusterByClusterKey(clusterKey).clone(sessionImpl.clusterTranslator);\r
-// cluster = (ClusterImpl)((ClusterImpl)gci.getClusterByClusterUIDOrMakeProxy(clusterUID)).clone(sessionImpl.clusterTranslator);\r
- \r
-// cluster = gci.clone(clusterUID, sessionImpl.clusterTranslator);\r
- \r
- cluster = session.clone(clusterUID, new ClusterCreator() {\r
- \r
- @Override\r
- public <T> T create(ClusterUID uid, byte[] bytes, int[] ints, long[] longs) {\r
- ClusterSupport support = sessionImpl.clusterTranslator;\r
- try {\r
- return (T)ClusterImpl.make(longs, ints, bytes, support, support.getClusterKeyByClusterUIDOrMake(uid));\r
- } catch (DatabaseException e) {\r
- e.printStackTrace();\r
- return null;\r
- }\r
- }\r
- \r
- });\r
- \r
- } catch (Throwable t) {\r
- // It's totally legal to call for non-existing cluster.\r
- // Sometimes this indicates an error, though.\r
- Logger.getDefault().logInfo("Load cluster failed.", t);\r
- if (t instanceof DatabaseException)\r
- e = (DatabaseException) t;\r
- else\r
- e = new DatabaseException("Load cluster failed.", t);\r
- }\r
- if (null == cluster) {\r
- runnable.run(e);\r
- return;\r
- }\r
-\r
-// new Exception("Load cluster " + cluster.getClusterId() + " " + cluster.getCachedSize() + " " + cluster.getImmutable()).printStackTrace();\r
-\r
- // Can not be called with null argument.\r
- replaceCluster(cluster);\r
- sessionImpl.onClusterLoaded(clusterId);\r
- runnable.run(null);\r
-\r
- }\r
-\r
- public synchronized ClusterImpl tryLoad(long clusterId, int clusterKey) throws DatabaseException {\r
-\r
- assert (Constants.ReservedClusterId != clusterId);\r
-\r
- ClusterUID clusterUID = clusters.makeClusterUID(clusterId);\r
- \r
- Database.Session session = sessionImpl.graphSession.dbSession;\r
- \r
- ClusterImpl cluster = session.clone(clusterUID, new ClusterCreator() {\r
- \r
- @Override\r
- public <T> T create(ClusterUID uid, byte[] bytes, int[] ints, long[] longs) {\r
- ClusterSupport support = sessionImpl.clusterTranslator;\r
- try {\r
- return (T)ClusterImpl.make(longs, ints, bytes, support, support.getClusterKeyByClusterUIDOrMake(uid));\r
- } catch (DatabaseException e) {\r
- e.printStackTrace();\r
- return null;\r
- }\r
- }\r
- \r
- });\r
- if (null == clusterArray[clusterKey])\r
- clusterArray[clusterKey] = cluster;\r
- else\r
- replaceCluster(cluster);\r
- sessionImpl.onClusterLoaded(clusterId);\r
- \r
- validate(clusterKey);\r
- \r
- return cluster;\r
- }\r
-\r
- public Collection<ClusterI> getClusters() {\r
- ArrayList<ClusterI> result = new ArrayList<ClusterI>();\r
- for (int i = 0; i < clusterArray.length; i++) {\r
- ClusterI cluster = clusterArray[i];\r
- if (cluster != null)\r
- result.add(cluster);\r
- }\r
- return result;\r
- }\r
-\r
- public int size() {\r
- return clusters.size();\r
- }\r
-\r
- public long timeCounter() {\r
- return timeCounter++;\r
- }\r
-\r
- public boolean hasVirtual(int index) {\r
- return virtuals[index];\r
- }\r
-\r
- public void markVirtual(int index) {\r
- virtuals[index] = true;\r
- }\r
-\r
- public ClusterImpl[] getClusterArray() {\r
- return clusterArray;\r
- }\r
-\r
- public boolean isImmutable(int id) {\r
-\r
- int clusterKey = ClusterTraitsBase.getClusterKeyFromResourceKeyNoThrow(id);\r
- Boolean exist = immutables[clusterKey];\r
- if(exist != null) return exist;\r
-\r
- ClusterI cluster = getClusterByResourceKey(id);\r
- boolean result = cluster == null ? false : cluster.getImmutable();\r
-\r
- markImmutable(cluster, result);\r
- return result;\r
-\r
- }\r
-\r
- public void markImmutable(ClusterI cluster, boolean value) {\r
- immutables[cluster.getClusterKey()] = value;\r
- }\r
- @Override\r
- public int getClusterKeyByUID(long first, long second) throws DatabaseException {\r
- return getClusterKeyByClusterUIDOrMakeProxy(ClusterUID.make(first, second));\r
- }\r
-\r
-}\r
+/*******************************************************************************
+ * Copyright (c) 2007, 2010 Association for Decentralized Information Management
+ * in Industry THTH ry.
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * which accompanies this distribution, and is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Contributors:
+ * VTT Technical Research Centre of Finland - initial API and implementation
+ *******************************************************************************/
+package fi.vtt.simantics.procore.internal;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Consumer;
+
+import org.simantics.databoard.Bindings;
+import org.simantics.db.ClusterCreator;
+import org.simantics.db.Database;
+import org.simantics.db.Database.Session.ClusterChanges;
+import org.simantics.db.DevelopmentKeys;
+import org.simantics.db.SessionVariables;
+import org.simantics.db.common.utils.Logger;
+import org.simantics.db.exception.ClusterDoesNotExistException;
+import org.simantics.db.exception.DatabaseException;
+import org.simantics.db.exception.ResourceNotFoundException;
+import org.simantics.db.exception.RuntimeDatabaseException;
+import org.simantics.db.impl.ClusterBase;
+import org.simantics.db.impl.ClusterI;
+import org.simantics.db.impl.ClusterSupport;
+import org.simantics.db.impl.ClusterTraitsBase;
+import org.simantics.db.impl.IClusterTable;
+import org.simantics.db.impl.graph.WriteGraphImpl;
+import org.simantics.db.impl.query.QueryProcessor;
+import org.simantics.db.procore.cluster.ClusterBig;
+import org.simantics.db.procore.cluster.ClusterImpl;
+import org.simantics.db.procore.cluster.ClusterSmall;
+import org.simantics.db.procore.cluster.ClusterTraits;
+import org.simantics.db.procore.protocol.Constants;
+import org.simantics.db.service.ClusterCollectorPolicy;
+import org.simantics.db.service.ClusterCollectorPolicy.CollectorCluster;
+import org.simantics.db.service.ClusterUID;
+import org.simantics.utils.Development;
+
+import fi.vtt.simantics.procore.DebugPolicy;
+import fi.vtt.simantics.procore.internal.ClusterControlImpl.ClusterStateImpl;
+import fi.vtt.simantics.procore.internal.SessionImplSocket.TaskHelper;
+import gnu.trove.map.hash.TLongIntHashMap;
+import gnu.trove.map.hash.TLongObjectHashMap;
+import gnu.trove.procedure.TIntProcedure;
+import gnu.trove.procedure.TLongObjectProcedure;
+import gnu.trove.procedure.TObjectProcedure;
+import gnu.trove.set.hash.TIntHashSet;
+
+public final class ClusterTable implements IClusterTable {
+
+ private static final boolean VALIDATE_SIZE = false;
+
+ int maximumBytes = 128 * 1024 * 1024;
+ int limit = (int)(0.8*(double)maximumBytes);
+
+ private final AtomicLong timeCounter = new AtomicLong(1);
+
+ final private SessionImplSocket sessionImpl;
+ final private ArrayList<ClusterI> writeOnlyClusters = new ArrayList<ClusterI>();
+ final private TIntHashSet writeOnlyInvalidates = new TIntHashSet();
+ final private static int ARRAY_SIZE = ClusterTraits.getClusterArraySize();
+ final private ClusterImpl[] clusterArray = new ClusterImpl[ARRAY_SIZE];
+ final private Boolean[] immutables = new Boolean[ARRAY_SIZE];
+ final private boolean[] virtuals = new boolean[ARRAY_SIZE];
+ static class ImportanceEntry implements CollectorCluster {
+ final public long importance;
+ final public long clusterId;
+ public ImportanceEntry(ClusterImpl impl) {
+ this(impl.getImportance(), impl.getClusterId());
+ }
+
+ public ImportanceEntry(long importance, long clusterId) {
+ this.importance = importance;
+ this.clusterId = clusterId;
+ }
+ @Override
+ public long getImportance() {
+ return importance;
+ }
+ @Override
+ public long getClusterId() {
+ return clusterId;
+ }
+ @Override
+ public String toString() {
+ return "CID " + clusterId;
+ }
+ }
+ private class Clusters {
+ // This makes sure that non-null values from hashMap.get can be trusted (no unsynchronized rehashes occur)
+ final public TLongObjectHashMap<ClusterImpl> hashMap = new TLongObjectHashMap<ClusterImpl>(2 * ARRAY_SIZE);
+ //private final HashMap<ClusterUID, ClusterImpl> clusterU2I = new HashMap<ClusterUID, ClusterImpl>(); // Maps cluster UID to cluster.
+ private Clusters() {
+ clear();
+ }
+ private void clear() {
+ hashMap.clear();
+ // clusterU2I.clear();
+ hashMap.put(0, null); // reserved for null value
+ // clusterU2I.put(ClusterUID.make(0, 0), null);
+ }
+ private int size() {
+ return hashMap.size();
+ }
+ private ClusterImpl getClusterByClusterId(long clusterId) {
+ return hashMap.get(clusterId);
+ }
+ private ClusterImpl getClusterByClusterUID(ClusterUID clusterUID) {
+ return getClusterByClusterId(clusterUID.second);
+ //return clusterU2I.get(clusterUID);
+ }
+ private ClusterImpl getClusterByClusterUID(long id1, long id2) {
+ return getClusterByClusterId(id2);
+ }
+ private ClusterImpl makeProxy(long clusterId) {
+ return makeProxy(ClusterUID.make(0, clusterId));
+ }
+ private ClusterImpl makeProxy(ClusterUID clusterUID) {
+ ClusterImpl proxy = hashMap.get(clusterUID.second);
+ if (null != proxy)
+ return proxy;
+ int clusterKey = hashMap.size();
+ ClusterSmall sentinel = new ClusterSmall(clusterUID, clusterKey, ClusterTable.this, sessionImpl.clusterTranslator);
+ if (sentinel.clusterId != sentinel.clusterUID.second)
+ throw new RuntimeDatabaseException("ClusterTable corrupted. Contact application support.");
+ create(sentinel);
+ return sentinel;
+ }
+ private void replace(ClusterImpl proxy) {
+ ClusterImpl old = clusterArray[proxy.clusterKey];
+ create(proxy);
+ if (null != old) {
+ if (old.clusterKey != proxy.clusterKey)
+ throw new RuntimeDatabaseException("ClusterTable corrupted. Contact application support.");
+ if (old.clusterId != proxy.clusterId)
+ throw new RuntimeDatabaseException("ClusterTable corrupted. Contact application support.");
+ if (!old.clusterUID.equals(proxy.clusterUID))
+ throw new RuntimeDatabaseException("ClusterTable corrupted. Contact application support.");
+ }
+ }
+ private ClusterSmall freeProxy(ClusterImpl proxy) {
+ ClusterImpl clusterImpl = hashMap.get(proxy.clusterId);
+ if (null == clusterImpl)
+ throw new RuntimeDatabaseException("ClusterTable corrupted. Contact application support.");
+// ClusterUID clusterUID = ClusterUID.make(0, proxy.clusterId);
+// ClusterImpl clusterImpl2 = clusterU2I.get(clusterUID );
+// if (clusterImpl != clusterImpl2)
+// throw new RuntimeDatabaseException("ClusterTable corrupted. Contact application support.");
+ if (proxy.clusterId != clusterImpl.clusterId)
+ throw new RuntimeDatabaseException("ClusterTable corrupted. Contact application support.");
+ if (proxy.clusterKey != clusterImpl.clusterKey)
+ throw new RuntimeDatabaseException("ClusterTable corrupted. Contact application support.");
+ ClusterSmall sentinel = new ClusterSmall(makeClusterUID(proxy.clusterId) , proxy.clusterKey, ClusterTable.this, sessionImpl.clusterTranslator);
+ return (ClusterSmall)create(sentinel);
+ }
+ private ClusterImpl create(ClusterImpl clusterImpl) {
+ hashMap.put(clusterImpl.clusterId, clusterImpl);
+// clusterU2I.put(clusterImpl.clusterUID, clusterImpl);
+ clusterArray[clusterImpl.clusterKey] = clusterImpl;
+ return clusterImpl;
+ }
+ private void forEachValue(TObjectProcedure<ClusterImpl> procedure) {
+ hashMap.forEachValue(procedure);
+ }
+ private void forEachEntry(TLongObjectProcedure<ClusterImpl> procedure) {
+ hashMap.forEachEntry(procedure);
+ }
+ private ClusterUID makeClusterUID(long clusterId) {
+ return ClusterUID.make(0, clusterId);
+ }
+ private void removeProxy(ClusterImpl proxy) {
+ hashMap.remove(proxy.clusterId);
+ clusterArray[proxy.clusterKey] = null;
+ }
+ }
+ final public TreeMap<Long, CollectorCluster> importanceMap = new TreeMap<Long, CollectorCluster>();
+
+ private ClusterCollectorPolicy collectorPolicy;
+ private boolean dirtySizeInBytes = true;
+ private long sizeInBytes = 0;
+ private final Clusters clusters;
+ ClusterUID makeClusterUID(long clusterId) {
+ return clusters.makeClusterUID(clusterId);
+ }
+ public ClusterUID getClusterUIDByResourceKey(int resourceKey) throws DatabaseException {
+ int clusterKey = ClusterTraits.getClusterKeyFromResourceKey(resourceKey);
+ return clusterArray[clusterKey].clusterUID;
+ }
+ ClusterTable(SessionImplSocket sessionImpl, File folderPath) {
+ this.sessionImpl = sessionImpl;
+ clusters = new Clusters();
+ }
+
+ void dispose() {
+ clusters.clear();
+ importanceMap.clear();
+ }
+
+ public void setCollectorPolicy(ClusterCollectorPolicy policy) {
+ this.collectorPolicy = policy;
+ }
+
+ /**
+ * Sets Cluster Table allocation size by percentage of maximum usable
+ * memory. Allocation is limited between 5% and 50% of maximum memory.
+ *
+ * @param precentage
+ */
+ private void setAllocateSize(double percentage) {
+
+ percentage = Math.max(percentage, 0.05);
+ percentage = Math.min(percentage, 0.5);
+
+ maximumBytes = (int) Math.floor(percentage * Runtime.getRuntime().maxMemory());
+ }
+
+ private double estimateProperAllocation(ClusterCollectorSupport support) {
+ long max = Runtime.getRuntime().maxMemory();
+ long free = Runtime.getRuntime().freeMemory();
+ long total = Runtime.getRuntime().totalMemory();
+ long realFree = max - total + free; // amount of free memory
+ long current = support.getCurrentSize(); // currently allocated cache
+ long inUseTotal = max - realFree; // currently used memory
+ long inUseOther = inUseTotal - current; // memory that the application uses (without the cache)
+ double otherUsePercentage = (double) inUseOther / (double) max; // percentage of memory that the application uses
+ double aimFree = 0.2; // percentage of maximum heap that we try to keep free
+ double estimate = 1.0 - otherUsePercentage - aimFree;
+ estimate = Math.min(estimate, 0.5);
+ estimate = Math.max(estimate, 0.05);
+ // System.out.println("Estimated allocation percentage " + estimate +
+// " memory stats: max: " + max + ", free: " + realFree + ", inUse: " +
+// inUseTotal + ", cached: " + current + ", cacheSize: " + maximumBytes);
+ return estimate;
+ }
+
+ void checkCollect() {
+ collector.collect();
+ }
+
+ synchronized ClusterImpl getClusterByClusterId(long clusterId) {
+ return clusters.getClusterByClusterId(clusterId);
+ }
+
+ ClusterBase getClusterByClusterKey(int clusterKey) {
+ return clusterArray[clusterKey];
+ }
+
+ synchronized ClusterImpl makeProxy(ClusterUID clusterUID, long clusterId) {
+ if (clusterUID.second != clusterId)
+ throw new RuntimeDatabaseException("Illegal id for cluster=" + clusterUID + " id=" + clusterId);
+ return clusters.makeProxy(clusterUID);
+ }
+
+ synchronized ClusterImpl makeCluster(long clusterId, boolean writeOnly) {
+ checkCollect();
+ ClusterImpl proxy = clusters.getClusterByClusterId(clusterId);
+ if (null != proxy)
+ return proxy;
+ ClusterUID clusterUID = ClusterUID.make(0, clusterId);
+ int clusterKey = clusters.size(); // new key
+ if (writeOnly) {
+ proxy = new ClusterWriteOnly(clusterUID, clusterKey, sessionImpl);
+ writeOnlyClusters.add(proxy);
+ return clusters.create(proxy);
+ } else {
+ //printMaps("makeCluster");
+ ClusterImpl cluster = ClusterImpl.make(clusterUID, clusterKey, sessionImpl.clusterTranslator);
+ clusters.create(cluster);
+ if (!cluster.isLoaded())
+ Logger.defaultLogError(new Exception("Bug in ClusterTable.makeCluster(long, boolean), cluster not loaded"));
+ importanceMap.put(cluster.getImportance(), new ImportanceEntry(cluster));
+ if (VALIDATE_SIZE)
+ validateSize("makeCluster");
+ if(collectorPolicy != null) collectorPolicy.added(cluster);
+ return cluster;
+ }
+ }
+
+ synchronized void replaceCluster(ClusterI cluster) {
+ //printMaps("replaceCluster");
+ checkCollect();
+ int clusterKey = cluster.getClusterKey();
+ ClusterI existing = clusterArray[clusterKey];
+ if (existing.hasVirtual())
+ cluster.markVirtual();
+
+ importanceMap.remove(existing.getImportance());
+ if (collectorPolicy != null)
+ collectorPolicy.removed((ClusterImpl)existing);
+
+ //System.out.println("ClusterTable.replace(" + existing + " (I=" + existing.getImportance() + ") => " + cluster + " (I=" + cluster.getImportance() + ")");
+
+ clusters.replace((ClusterImpl)cluster);
+ if (!cluster.isLoaded())
+ Logger.defaultLogError(new Exception("Bug in ClusterTable.replaceCluster(ClusterI), cluster not loaded"));
+
+ importanceMap.put(cluster.getImportance(), new ImportanceEntry((ClusterImpl)cluster));
+ if(collectorPolicy != null) collectorPolicy.added((ClusterImpl)cluster);
+
+ if (!dirtySizeInBytes) {
+ if (existing != cluster) {
+ adjustCachedSize(-existing.getCachedSize(), existing);
+ }
+ // This will update sizeInBytes through adjustCachedSize
+ cluster.getCachedSize();
+ }
+ if (VALIDATE_SIZE)
+ validateSize("replaceCluster");
+ }
+
+ synchronized void release(CollectorCluster cluster) {
+ importanceMap.remove(cluster.getImportance());
+ release(cluster.getClusterId());
+ }
+
+ synchronized void release(long clusterId) {
+ //System.out.println("ClusterTable.release(" + clusterId + "): " + sizeInBytes);
+ //validateSize();
+ ClusterImpl clusterImpl = clusters.getClusterByClusterId(clusterId);
+ if (null == clusterImpl)
+ return;
+ if(!clusterImpl.isLoaded() || clusterImpl.isEmpty())
+ return;
+ //printMaps("release");
+ clusters.freeProxy(clusterImpl);
+ importanceMap.remove(clusterImpl.getImportance());
+ if (collectorPolicy != null)
+ collectorPolicy.removed(clusterImpl);
+ if (sessionImpl.writeState != null)
+ sessionImpl.clusterStream.flush(clusterImpl.clusterUID);
+ if (!dirtySizeInBytes) {
+ adjustCachedSize(-clusterImpl.getCachedSize(), clusterImpl);
+ }
+ if (VALIDATE_SIZE)
+ validateSize("release");
+ }
+
+ synchronized void compact(long id) {
+ ClusterI impl = clusters.getClusterByClusterId(id);
+ if (impl != null)
+ impl.compact();
+ }
+
+ void updateSize() {
+// cachedSize = getSizeInBytes();
+ }
+
+ double getLoadProbability() {
+ // This can currently cause stack overflow
+ return 1.0;
+// if(cachedSize < SLOW_LIMIT) return 1.0;
+// if(cachedSize > HIGH_LIMIT) return 1e-2;
+//
+// double pos = (double)(cachedSize - SLOW_LIMIT) / (double)(HIGH_LIMIT -
+// SLOW_LIMIT);
+// double val = 0.1 * ((pos-1) * (pos-1)) + 1e-2;
+// return val;
+ }
+
+ class SizeProcedure implements TObjectProcedure<ClusterImpl> {
+ public long result = 0;
+
+ @Override
+ public boolean execute(ClusterImpl cluster) {
+ if (cluster != null) {
+ try {
+ if (cluster.isLoaded() && !cluster.isEmpty()) {
+ result += cluster.getCachedSize();
+ }
+ } catch (Throwable t) {
+ Logger.defaultLogError(t);
+ }
+ }
+ return true;
+ }
+
+ public void clear() {
+ result = 0;
+ }
+
+ };
+
+ private SizeProcedure sizeProcedure = new SizeProcedure();
+ long getSizeInBytes() {
+ if (dirtySizeInBytes) {
+ sizeProcedure.clear();
+ clusters.forEachValue(sizeProcedure);
+ sizeInBytes = sizeProcedure.result;
+ // System.err.println("recomputed size of clusterTable => " + sizeInBytes);
+ setDirtySizeInBytes(false);
+ }
+ return sizeInBytes;
+ }
+
+ public void setDirtySizeInBytes(boolean value) {
+ dirtySizeInBytes = value;
+ }
+
+ ClusterStateImpl getState() {
+ final ClusterStateImpl result = new ClusterStateImpl();
+ clusters.forEachEntry(new TLongObjectProcedure<ClusterImpl>() {
+ @Override
+ public boolean execute(long arg0, ClusterImpl arg1) {
+ if (arg1 == null)
+ return true;
+ if (arg1.isLoaded() && !arg1.isEmpty()) {
+ result.ids.add(new ImportanceEntry(arg1));
+ }
+ return true;
+ }
+ });
+ return result;
+ }
+
+ void restoreState(ClusterStateImpl state) {
+ ClusterStateImpl current = getState();
+ for (CollectorCluster id : current.ids)
+ if (!state.ids.contains(id))
+ collectorSupport.release(id);
+ }
+
+ class ClusterCollectorImpl implements ClusterCollector {
+
+ final private ClusterCollectorSupport support;
+ private ClusterCollectorPolicy policy;
+ ClusterCollectorImpl(ClusterCollectorSupport support) {
+ this.support = support;
+ }
+
+ ClusterCollectorPolicy setPolicy(ClusterCollectorPolicy newPolicy) {
+
+ ClusterCollectorPolicy oldPolicy = policy;
+ policy = newPolicy;
+
+ if(policy != null) {
+ for (CollectorCluster id : support.getResidentClusters()) {
+ policy.added(getClusterByClusterId(id.getClusterId()));
+ }
+ }
+
+ support.setPolicy(policy);
+
+ return oldPolicy;
+
+ }
+
+ @Override
+ public void collect() {
+
+ if(policy != null) {
+
+ release(policy.select());
+
+ } else {
+
+ int size = support.getCurrentSize();
+ boolean dynamicAllocation = useDynamicAllocation();
+ if (dynamicAllocation)
+ setAllocateSize(estimateProperAllocation(support));
+ if (DebugPolicy.CLUSTER_COLLECTION) {
+ System.out.println("Cluster collector activated, current size = " + size + " limit = " + maximumBytes);
+ }
+ if (dynamicAllocation) {
+ int collectSize = maximumBytes / 2;
+ collectSize = Math.min(collectSize, 32 * 1024 * 1024);
+ // try to keep allocated clusters below the maximum
+ if (maximumBytes - size > collectSize)
+ return;
+ collectSize += size - maximumBytes;
+ collect(collectSize);
+ } else {
+ // try to keep allocated clusters below the maximum
+ if (size < maximumBytes)
+ return;
+ // shave off 20%
+ collect(size-limit);
+ }
+
+ }
+
+ }
+
+ private boolean useDynamicAllocation() {
+ return "true".equalsIgnoreCase(System.getProperty("org.simantics.db.cluster.dynamicAlloc"));
+ }
+
+ @Override
+ public void collect(int target) {
+
+ if(policy != null) {
+
+ release(policy.select(target));
+
+ } else {
+
+ ArrayList<CollectorCluster> toRelease = new ArrayList<CollectorCluster>();
+
+ for (CollectorCluster cluster : support.getResidentClusters()) {
+ target -= support.getClusterSize(cluster);
+ if (target > 0) {
+ toRelease.add(cluster);
+ } else {
+ break;
+ }
+ }
+
+ release(toRelease);
+
+ if (DebugPolicy.CLUSTER_COLLECTION) {
+ System.out.println("Cluster collector finished, current size = " + support.getCurrentSize());
+ }
+
+ }
+
+ }
+
+ void release(Collection<CollectorCluster> toRelease) {
+ for (CollectorCluster id : toRelease) {
+ support.release(id);
+ }
+ }
+
+ }
+
+ private ClusterCollectorSupport collectorSupport = new ClusterCollectorSupportImpl(this);
+ ClusterCollectorImpl collector = new ClusterCollectorImpl(collectorSupport);
+
+ void gc() {
+ collector.collect();
+ }
+
+ private long newResourceClusterId = Constants.NewClusterId;
+ public static final int CLUSTER_FILL_SIZE = ClusterTraitsBase.getMaxNumberOfResources();
+
+ /*
+ * Uusi id varataan vasta, kun lis�t��n resurssi => reservedIds sis�lt��
+ * vain jo k�yt�ss� olevia klustereita
+ */
+
+ ClusterImpl getNewResourceCluster(ClusterSupport cs, GraphSession graphSession, boolean writeOnly)
+ throws DatabaseException {
+ ClusterImpl result = null;
+ if (Constants.NewClusterId == newResourceClusterId) {
+ newResourceClusterId = graphSession.newClusterId();
+ result = getClusterByClusterIdOrMake(newResourceClusterId, writeOnly);
+ } else {
+ ClusterImpl cluster = getClusterByClusterIdOrThrow(newResourceClusterId);
+ if (cluster.getNumberOfResources(cs) >= CLUSTER_FILL_SIZE) {
+ newResourceClusterId = graphSession.newClusterId();
+ cluster = getClusterByClusterIdOrMake(newResourceClusterId, writeOnly);
+ }
+ result = cluster;
+ }
+ return ensureLoaded(result);
+ }
+
+ void flushCluster(GraphSession graphSession) {
+// We seem to disagree about this.
+// graphSession.newClusterId();
+ newResourceClusterId = Constants.NewClusterId;
+ }
+
+ void writeOnlyInvalidate(ClusterI impl) {
+ writeOnlyInvalidates.add(impl.getClusterKey());
+ }
+
+ void removeWriteOnlyClusters() {
+ for (ClusterI proxy : writeOnlyClusters) {
+ if (!(proxy instanceof ClusterImpl))
+ throw new RuntimeDatabaseException("ClusterTable corrupted. Contact application support.");
+ clusters.freeProxy((ClusterImpl)proxy);
+ }
+ writeOnlyClusters.clear();
+ writeOnlyInvalidates.forEach(new TIntProcedure() {
+ @Override
+ public boolean execute(int clusterKey) {
+ ClusterImpl proxy = clusterArray[clusterKey];
+ ClusterUID clusterUID = proxy.getClusterUID();
+ System.err.println("writeOnlyInvalidate " + clusterUID);
+ clusters.freeProxy(proxy);
+ return true;
+ }
+ });
+ writeOnlyInvalidates.clear();
+ }
+
+ public ClusterImpl getClusterByClusterUID(ClusterUID clusterUID) {
+ synchronized (this) {
+ return clusters.getClusterByClusterUID(clusterUID);
+ }
+ }
+ public int getClusterKeyByClusterUIDOrMakeProxy(ClusterUID clusterUID) {
+ return getClusterKeyByClusterUIDOrMakeProxy(0/*clusterUID.first*/, clusterUID.second);
+ }
+ public int getClusterKeyByClusterUIDOrMakeProxy(long id1, long id2) {
+ return getClusterByClusterUIDOrMakeProxy(id1, id2).clusterKey;
+ }
+ public ClusterImpl getClusterByClusterUIDOrMakeProxy(ClusterUID clusterUID) {
+ synchronized (this) {
+ ClusterImpl clusterImpl = clusters.getClusterByClusterUID(clusterUID);
+ if (null == clusterImpl)
+ clusterImpl = clusters.makeProxy(clusterUID);
+ return clusterImpl;
+ }
+ }
+ public ClusterImpl getClusterByClusterUIDOrMakeProxy(long id1, long id2) {
+ synchronized (this) {
+ ClusterImpl clusterImpl = clusters.getClusterByClusterUID(id1, id2);
+ if (null == clusterImpl)
+ clusterImpl = clusters.makeProxy(id2);
+ return clusterImpl;
+ }
+ }
+ ClusterImpl getLoadOrThrow(long clusterId) throws DatabaseException {
+ synchronized (this) {
+ ClusterImpl proxy = clusters.getClusterByClusterId(clusterId);
+ int clusterKey = 0;
+ if (proxy != null)
+ if (proxy.isLoaded())
+ return proxy;
+ else
+ clusterKey = proxy.getClusterKey();
+ try {
+ if (clusterKey == 0) {
+ proxy = clusters.makeProxy(clusterId);
+ clusterKey = proxy.getClusterKey();
+ }
+ ClusterImpl ci = tryLoad(clusterId, clusterKey);
+ if (null == ci)
+ throw new ResourceNotFoundException(clusterId);
+ return ci;
+ } catch (ClusterDoesNotExistException t) {
+ clusters.removeProxy(proxy);
+ throw t;
+ }
+ }
+ }
+
+ ClusterImpl getClusterByClusterIdOrMake(long clusterId, boolean writeOnly) {
+ synchronized (this) {
+ ClusterImpl proxy = clusters.getClusterByClusterId(clusterId);
+ if (proxy != null)
+ return proxy;
+ else
+ return makeCluster(clusterId, writeOnly);
+ }
+ }
+ ClusterImpl getClusterByClusterIdOrThrow(long clusterId) {
+ synchronized (this) {
+ ClusterImpl proxy = clusters.getClusterByClusterId(clusterId);
+ if (null == proxy)
+ throw new IllegalArgumentException("Cluster id=" + clusterId + " is not created.");
+ return proxy;
+ }
+ }
+ long getClusterIdOrCreate(ClusterUID clusterUID) {
+ return clusterUID.second;
+ }
+ final long getClusterIdByResourceKey(final int resourceKey)
+ throws DatabaseException {
+ int clusterKey = ClusterTraitsBase.getClusterKeyFromResourceKey(resourceKey);
+ if (ClusterTraitsBase.isVirtualClusterKey(clusterKey))
+ throw new RuntimeException("Tried to get a persistent cluster for a virtual resource.");
+ ClusterI c = clusterArray[clusterKey];
+ if (c == null)
+ throw new RuntimeException("No cluster for key " + resourceKey);
+ return c.getClusterId();
+ }
+ final long getClusterIdByResourceKeyNoThrow(final int resourceKey) {
+ int clusterKey = ClusterTraitsBase.getClusterKeyFromResourceKeyNoThrow(resourceKey);
+ if (ClusterTraitsBase.isVirtualClusterKey(clusterKey)) {
+ Logger.defaultLogError("Tried to get a persistent cluster for a virtual resource. key=" + resourceKey);
+ return 0;
+ }
+ ClusterI c = clusterArray[clusterKey];
+ if (c == null) {
+ Logger.defaultLogError("No cluster for key " + resourceKey);
+ return 0;
+ }
+ return c.getClusterId();
+ }
+
+ int counter = 0;
+
+ void refresh(long csid, SessionImplSocket session, ClusterUID[] clusterUID) {
+ synchronized (this) {
+ session.flushCounter = 0;
+ session.clusterStream.reallyFlush();
+ ClientChangesImpl cs = new ClientChangesImpl(session);
+ if (session.clientChanges == null)
+ session.clientChanges = cs;
+ //printMaps("refresh");
+ for (int i=0; i<clusterUID.length; ++i) {
+ try {
+ if (DebugPolicy.REPORT_CLUSTER_EVENTS)
+ System.err.println("cluster=" + clusterUID[i] + " has changed. length=" + clusterUID.length);
+ ClusterImpl oldCluster = clusters.getClusterByClusterUID(clusterUID[i]);
+ if (null == oldCluster)
+ continue;
+ if (!oldCluster.isLoaded())
+ continue;
+ int clusterKey = oldCluster.getClusterKey();
+ if (ClusterTraitsBase.isVirtualClusterKey(clusterKey))
+ continue;
+ boolean big = oldCluster instanceof ClusterBig;
+ boolean small = oldCluster instanceof ClusterSmall;
+ if (!big && !small)
+ continue;
+// ClusterImpl newCluster = (ClusterImpl) sessionImpl.graphSession.getClusterImpl(clusterUID[i], clusterKey);
+ Database.Session dbSession = sessionImpl.graphSession.dbSession;
+
+ ClusterImpl newCluster = dbSession.clone(clusterUID[i], new ClusterCreator() {
+
+ @Override
+ public <T> T create(ClusterUID uid, byte[] bytes, int[] ints, long[] longs) {
+ ClusterSupport support = sessionImpl.clusterTranslator;
+ try {
+ return (T)ClusterImpl.make(longs, ints, bytes, support, support.getClusterKeyByClusterUIDOrMake(uid));
+ } catch (DatabaseException e) {
+ e.printStackTrace();
+ return null;
+ }
+ }
+
+ });
+ if (null == newCluster)
+ continue;
+ if (DebugPolicy.REPORT_CLUSTER_EVENTS)
+ System.err.println("cluster=" + newCluster + " updated.");
+ importanceMap.remove(oldCluster.getImportance());
+ if (collectorPolicy != null)
+ collectorPolicy.removed(oldCluster);
+ clusters.replace(newCluster);
+ if (!newCluster.isLoaded())
+ Logger.defaultLogError(new Exception("Bug in ClusterTable.refresh, cluster not loaded"));
+ importanceMap.put(newCluster.getImportance(), new ImportanceEntry(newCluster));
+ if (collectorPolicy != null)
+ collectorPolicy.added(newCluster);
+ // Now we have fetched the new cluster but to emulate effects of the changes in it we fetch the cluster changes from server.
+ refreshCluster(csid, session, oldCluster, clusterUID[i], newCluster.clusterId, clusterKey);
+ } catch (Throwable t) {
+ Logger.defaultLogError("Failed to load cluster in refresh.", t);
+ }
+ }
+ if (VALIDATE_SIZE)
+ validateSize("refresh");
+ // Fake update of cluster changes.
+ QueryProcessor queryProcessor = session.getQueryProvider2();
+ WriteGraphImpl writer = WriteGraphImpl.create(queryProcessor, session.writeSupport, null);
+ TaskHelper th = null;
+ if (null == session.writeState) {
+ th = new TaskHelper("Refresh");
+ session.writeState = new WriteState<Object>(writer, th.writeTraits, th.sema, th.proc);
+ try {
+ session.getQueryProvider2().performDirtyUpdates(writer);
+ session.fireMetadataListeners(writer, cs);
+ session.getQueryProvider2().performScheduledUpdates(writer);
+ session.fireReactionsToSynchronize(cs);
+ session.fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
+ session.printDiagnostics();
+ } finally {
+ if (null != th)
+ session.writeState = null;
+ cs.dispose();
+ }
+ }
+ }
+ }
+ final void refreshCluster(long csid, SessionImplSocket session, ClusterImpl cluster, ClusterUID clusterUID, long clusterId, int clusterKey)
+ throws DatabaseException {
+ if (DebugPolicy.REPORT_CLUSTER_EVENTS)
+ System.err.println("cluster=" + clusterUID + " id=" + clusterId + " key=" + clusterKey + " resources will be updated.");
+ // get cluster change sets
+ QueryProcessor queryProcessor = session.getQueryProvider2();
+ ClusterChanges cc;
+ try {
+ cc = session.graphSession.getClusterChanges(clusterUID, csid);
+ } catch (Exception e) {
+ Logger.defaultLogError("Could not get cluster changes. cluster=" + clusterUID, e);
+ release(clusterId);
+ return;
+ }
+ for (int i=0; i<cc.getResourceIndex().length; ++i) {
+ int resource = ClusterTraits.createResourceKey(clusterKey, cc.getResourceIndex()[i]);
+ ClusterUID pClusterUID = new ClusterUID(cc.getPredicateFirst()[i], cc.getPredicateSecond()[i]);
+ ClusterImpl pCluster = clusters.getClusterByClusterUID(pClusterUID);
+ if (null == pCluster)
+ continue;
+ int pClusterKey = pCluster.getClusterKey();
+ int predicate = ClusterTraits.createResourceKey(pClusterKey, cc.getPredicateIndex()[i]);
+ queryProcessor.updateStatements(resource, predicate);
+ if (DebugPolicy.REPORT_CLUSTER_EVENTS)
+ System.err.println("resource " + cc.getResourceIndex()[i] + " relation " + cc.getPredicateIndex()[i] + " changed.");
+ }
+ for (int i=0; i<cc.getValueIndex().length; ++i) {
+ int resource = ClusterTraits.createResourceKey(clusterKey, cc.getValueIndex()[i]);
+ queryProcessor.updateValue(resource);
+ if (DebugPolicy.REPORT_CLUSTER_EVENTS)
+ System.err.println("value " + cc.getValueIndex()[i] + " changed.");
+ }
+ }
+ final void refreshImportance(ClusterImpl c) {
+
+ if (c.isWriteOnly())
+ return;
+
+ //printMaps("refreshImportance");
+
+ importanceMap.remove(c.getImportance());
+ if(collectorPolicy != null) collectorPolicy.removed(c);
+
+ long newImportance = timeCounter();
+// System.err.println("refreshImportance " + c.getClusterId() + " => " + newImportance);
+ c.setImportance(newImportance);
+ if (!c.isLoaded())
+ Logger.defaultLogError(new Exception("Bug in ClusterTable.refreshImportance(ClusterImpl), cluster not loaded"));
+
+ importanceMap.put(c.getImportance(), new ImportanceEntry(c));
+ if(collectorPolicy != null) collectorPolicy.added(c);
+ if (VALIDATE_SIZE)
+ validateSize("refreshImportance");
+
+ }
+
+ static long loadTime = 0;
+
+ @SuppressWarnings("unchecked")
+ public final <T extends ClusterI> T getClusterProxyByResourceKey(final int resourceKey) {
+ int clusterKey = ClusterTraitsBase.getClusterKeyFromResourceKeyNoThrow(resourceKey);
+ if (ClusterTraitsBase.isVirtualClusterKey(clusterKey))
+ throw new RuntimeException("Tried to get a persistent cluster for a virtual resource.");
+ ClusterI cluster = clusterArray[clusterKey];
+ if (cluster == null)
+ throw new RuntimeException("No proxy for existing cluster. Resource key = " + resourceKey);
+ return (T)cluster;
+ }
+
+ TLongIntHashMap clusterLoadHistogram = new TLongIntHashMap();
+ int clusterLoadCounter = 0;
+
+ private <T extends ClusterI> T ensureLoaded(T c) {
+ ClusterI cluster;
+ ClusterImpl cs = (ClusterImpl) c;
+ try {
+ if(DebugPolicy.REPORT_CLUSTER_LOADING) {
+ long start = System.nanoTime();
+ cluster = load2(cs.getClusterId(), cs.getClusterKey());
+ long load = System.nanoTime()-start;
+ loadTime += load;
+ if(DebugPolicy.REPORT_CLUSTER_LOADING) {
+ int was = clusterLoadHistogram.get(cluster.getClusterId());
+ clusterLoadHistogram.put(cluster.getClusterId(), was+1);
+ clusterLoadCounter++;
+ String text = "Load2 " + cluster + " " + 1e-9*loadTime + "s. " + 1e-6*load + "ms. " + clusterLoadCounter + " " + was + " " + cluster.getUsedSpace() + " " + cluster.getImportance();
+ if(DebugPolicy.REPORT_CLUSTER_LOADING_STACKS) {
+ new Exception(text).printStackTrace();
+ } else {
+ System.err.println(text);
+ }
+ }
+ } else {
+ cluster = load2(cs.getClusterId(), cs.getClusterKey());
+ }
+ } catch (DatabaseException e) {
+ Logger.defaultLogError(e);
+ if (DebugPolicy.REPORT_CLUSTER_EVENTS)
+ e.printStackTrace();
+ String msg = "Failed to load cluster " + cs.getClusterUID();// + " resourceId=" + (((cs.getClusterId() << 16 + (resourceKey & 65535))));
+ // TODO: this jams the system => needs refactoring.
+ throw new RuntimeDatabaseException(msg, e);
+ }
+ return (T) cluster;
+ }
+
+ @SuppressWarnings("unchecked")
+ public final <T extends ClusterI> T getClusterByResourceKey(final int resourceKey) {
+ int clusterKey = ClusterTraitsBase.getClusterKeyFromResourceKeyNoThrow(resourceKey);
+ if (ClusterTraitsBase.isVirtualClusterKey(clusterKey))
+ throw new RuntimeException("Tried to get a persistent cluster for a virtual resource.");
+ ClusterI c = clusterArray[clusterKey];
+ if (c == null)
+ return null;
+ if (c.isLoaded()) {
+ if ((counter++ & 4095) == 0)
+ refreshImportance((ClusterImpl) c);
+ return (T) c;
+ }
+ if (!(c instanceof ClusterSmall)) {
+ Logger.defaultLogError("Proxy must be instance of ClusterSmall");
+ return null;
+ }
+ return ensureLoaded((T)c);
+ }
+
+ @SuppressWarnings("unchecked")
+ final <T extends ClusterI> T checkedGetClusterByResourceKey(final int resourceKey) {
+ int clusterKey = ClusterTraitsBase.getClusterKeyFromResourceKeyNoThrow(resourceKey);
+ if (ClusterTraitsBase.isVirtualClusterKey(clusterKey))
+ throw new RuntimeException("Tried to get a persistent cluster for a virtual resource.");
+ ClusterI c = clusterArray[clusterKey];
+ if (c == null)
+ throw new RuntimeException("No cluster for resource key " + resourceKey);
+ if (c.isLoaded()) {
+ if ((counter++ & 4095) == 0)
+ refreshImportance((ClusterImpl) c);
+ return (T) c;
+ }
+ if (!(c instanceof ClusterSmall)) {
+ Logger.defaultLogError("Proxy must be instance of ClusterSmall");
+ return null;
+ }
+ ClusterI cluster;
+ ClusterSmall cs = (ClusterSmall) c;
+ try {
+// System.err.println("Load2 " + resourceKey);
+ cluster = load2(cs.getClusterId(), cs.getClusterKey());
+ } catch (DatabaseException e) {
+ if (DebugPolicy.REPORT_CLUSTER_EVENTS)
+ e.printStackTrace();
+ int resourceIndex = resourceKey & ClusterTraitsBase.getResourceIndexFromResourceKeyNoThrow(resourceKey);
+ long resourceId = ClusterTraitsBase.createResourceIdNoThrow(cs.getClusterId(), resourceIndex);
+ String msg = "Failed to load cluster " + cs.getClusterUID() + " for resource key " + resourceKey + " resourceIndex=" + resourceIndex + " resourceId=" + resourceId;
+ Logger.defaultLogError(msg, e);
+ c.setDeleted(true, null);
+ return (T)c;
+ }
+ return (T) cluster;
+ }
+
+ void printDebugInfo(ClusterSupport support)
+ throws DatabaseException {
+ final int SIZE = clusters.size();
+ long sum = 0;
+ for (int i = 1; i < SIZE; ++i) {
+ ClusterI c = clusterArray[i];
+ c.getNumberOfResources(support);
+ long size = c.getUsedSpace();
+ System.out.println("cluster=" + c.getClusterId() + " size=" + size);
+ c.printDebugInfo("koss: ", support);
+ sum += size;
+ }
+ System.out.println("Total number of clusters " + SIZE);
+ System.out.println("Total cluster size " + sum);
+ }
+
+ Map<Long,ClusterImpl> prefetch = new HashMap<Long,ClusterImpl>();
+
+ public synchronized ClusterImpl load2(long clusterId, int clusterKey) throws DatabaseException {
+ ClusterImpl curr = (ClusterImpl) clusterArray[clusterKey];
+ if (curr.isLoaded())
+ return curr;
+
+// getPrefetched();
+//
+// curr = (ClusterImpl) clusterArray[clusterKey];
+// if (curr.isLoaded())
+// return curr;
+
+ final Semaphore s = new Semaphore(0);
+ final DatabaseException[] ex = new DatabaseException[1];
+
+ load2(clusterId, clusterKey, e -> {
+ ex[0] = e;
+ s.release();
+ });
+ try {
+ s.acquire();
+ } catch (InterruptedException e) {
+ Logger.defaultLogError(e);
+ }
+ if (null != ex[0])
+ throw ex[0];
+ if (Development.DEVELOPMENT) {
+ if (Development.<Boolean>getProperty(DevelopmentKeys.CLUSTERTABLE_VALIDATE_ON_LOAD, Bindings.BOOLEAN)) {
+ try {
+ ClusterImpl loaded = (ClusterImpl) clusterArray[clusterKey];
+ if (loaded instanceof ClusterSmall)
+ ((ClusterSmall) loaded).check();
+ else if (loaded instanceof ClusterBig)
+ ((ClusterBig) loaded).check();
+ } catch (Throwable t) {
+ t.printStackTrace();
+ }
+ }
+ }
+
+ validate(clusterKey);
+
+ return (ClusterImpl) clusterArray[clusterKey];
+ }
+
+ void validate(int clusterKey) {
+
+// if(sessionImpl.graphSession.graphClient instanceof GraphClientImpl2) {
+//
+// ClusterImpl server = (ClusterImpl)clusterArray[clusterKey];
+// String sd = server.dump(sessionImpl.clusterTranslator);
+// GraphClientImpl2 gci = ((GraphClientImpl2)sessionImpl.graphSession.graphClient);
+// ClusterImpl memory = gci.clusters.get(server.clusterUID);
+// if(memory == null)
+// System.err.println("ad");
+// String md = memory.dump(gci.support);
+//
+// if(!sd.equals(md)) {
+//
+// int diffPos = 0;
+// int minLength = Math.min(sd.length(), md.length());
+// for (int i = 0; i < minLength; i++) {
+// if (sd.charAt(i) != md.charAt(i)) {
+// diffPos = i;
+// break;
+// }
+// }
+//
+// int start = Math.max(diffPos-200, 0);
+// int end = Math.min(minLength-1, diffPos+200);
+//
+// System.err.println("== CLUSTER DIFFERENCE " + clusterArray[clusterKey].getClusterUID() + " == ");
+// System.err.println("== SESSION == ");
+// System.err.println(sd.substring(start, end));
+// System.err.println("== MEM == ");
+// System.err.println(md.substring(start, end));
+// System.err.println("== CLUSTER DIFFERENCE ENDS == ");
+//
+// throw new IllegalStateException();
+// }
+//
+// }
+ }
+
+ public synchronized ClusterImpl getPrefetched() {
+ synchronized(prefetch) {
+ return null;
+ }
+ }
+
+ public synchronized void load2(long clusterId, int clusterKey, final Consumer<DatabaseException> runnable) {
+
+ assert (Constants.ReservedClusterId != clusterId);
+
+ ClusterImpl cluster = null;
+ DatabaseException e = null;
+
+ try {
+
+ ClusterUID clusterUID = clusters.makeClusterUID(clusterId);
+// cluster = (ClusterImpl) sessionImpl.getGraphSession().getClusterImpl(clusterUID, clusterKey);
+ Database.Session session = sessionImpl.graphSession.dbSession;
+// cluster = (ClusterImpl)gci.getClusterByClusterKey(clusterKey).clone(sessionImpl.clusterTranslator);
+// cluster = (ClusterImpl)((ClusterImpl)gci.getClusterByClusterUIDOrMakeProxy(clusterUID)).clone(sessionImpl.clusterTranslator);
+
+// cluster = gci.clone(clusterUID, sessionImpl.clusterTranslator);
+
+ cluster = session.clone(clusterUID, new ClusterCreator() {
+
+ @Override
+ public <T> T create(ClusterUID uid, byte[] bytes, int[] ints, long[] longs) {
+ ClusterSupport support = sessionImpl.clusterTranslator;
+ try {
+ return (T)ClusterImpl.make(longs, ints, bytes, support, support.getClusterKeyByClusterUIDOrMake(uid));
+ } catch (DatabaseException e) {
+ e.printStackTrace();
+ return null;
+ }
+ }
+
+ });
+
+ } catch (Throwable t) {
+ // It's totally legal to call for non-existing cluster.
+ // Sometimes this indicates an error, though.
+ Logger.getDefault().logInfo("Load cluster failed.", t);
+ if (t instanceof DatabaseException)
+ e = (DatabaseException) t;
+ else
+ e = new DatabaseException("Load cluster failed.", t);
+ }
+ if (null == cluster) {
+ runnable.accept(e);
+ return;
+ }
+
+// new Exception("Load cluster " + cluster.getClusterId() + " " + cluster.getCachedSize() + " " + cluster.getImmutable()).printStackTrace();
+
+ // Can not be called with null argument.
+ replaceCluster(cluster);
+ sessionImpl.onClusterLoaded(clusterId);
+ runnable.accept(null);
+
+ }
+
+ public synchronized ClusterImpl tryLoad(long clusterId, int clusterKey) throws DatabaseException {
+
+ assert (Constants.ReservedClusterId != clusterId);
+
+ ClusterUID clusterUID = clusters.makeClusterUID(clusterId);
+
+ Database.Session session = sessionImpl.graphSession.dbSession;
+
+ ClusterImpl cluster = session.clone(clusterUID, new ClusterCreator() {
+
+ @Override
+ public <T> T create(ClusterUID uid, byte[] bytes, int[] ints, long[] longs) {
+ ClusterSupport support = sessionImpl.clusterTranslator;
+ try {
+ return (T)ClusterImpl.make(longs, ints, bytes, support, support.getClusterKeyByClusterUIDOrMake(uid));
+ } catch (DatabaseException e) {
+ e.printStackTrace();
+ return null;
+ }
+ }
+
+ });
+ if (null == clusterArray[clusterKey])
+ clusterArray[clusterKey] = cluster;
+ else
+ replaceCluster(cluster);
+ sessionImpl.onClusterLoaded(clusterId);
+
+ validate(clusterKey);
+
+ return cluster;
+ }
+
+ public Collection<ClusterI> getClusters() {
+ ArrayList<ClusterI> result = new ArrayList<ClusterI>();
+ for (int i = 0; i < clusterArray.length; i++) {
+ ClusterI cluster = clusterArray[i];
+ if (cluster != null)
+ result.add(cluster);
+ }
+ return result;
+ }
+
+ public int size() {
+ return clusters.size();
+ }
+
+ public long timeCounter() {
+ return timeCounter.getAndIncrement();
+ }
+
+ public boolean hasVirtual(int index) {
+ return virtuals[index];
+ }
+
+ public void markVirtual(int index) {
+ virtuals[index] = true;
+ }
+
+ public ClusterImpl[] getClusterArray() {
+ return clusterArray;
+ }
+
+ public boolean isImmutable(int id) {
+
+ int clusterKey = ClusterTraitsBase.getClusterKeyFromResourceKeyNoThrow(id);
+ Boolean exist = immutables[clusterKey];
+ if(exist != null) return exist;
+
+ ClusterI cluster = getClusterByResourceKey(id);
+ if(cluster == null) {
+ return false;
+ } else {
+ boolean result = cluster.getImmutable();
+ markImmutable(cluster, result);
+ return result;
+ }
+
+ }
+
+ public void markImmutable(ClusterI cluster, boolean value) {
+ immutables[cluster.getClusterKey()] = value;
+ }
+ @Override
+ public int getClusterKeyByUID(long first, long second) throws DatabaseException {
+ return getClusterKeyByClusterUIDOrMakeProxy(ClusterUID.make(first, second));
+ }
+
+ public void adjustCachedSize(long l, ClusterI cluster) {
+ if (l != 0) {
+ //System.out.println("ClusterTable: adjusting cluster table cached size by " + l + ": " + sizeInBytes + " -> "
+ // + (sizeInBytes + l) + ", for cluster " + cluster.getClusterId() + " (" + cluster + ")");
+ sizeInBytes += l;
+ }
+ }
+
+ private void validateSize(String place) {
+ if (!VALIDATE_SIZE)
+ return;
+
+ int ims = importanceMap.size();
+ int ihms = countImportantClusters();
+
+// System.out.format("[ClusterTable.%s] Validating: byteSize=%d (%d MB), hashMap=%d/%d, importanceMap=%d%n",
+// place, sizeInBytes, sizeInBytes / (1024*1024), ihms, clusters.hashMap.size(), ims);
+
+ int i = clusterArray.length;
+ long size = 0;
+ for (int j = 0; j < i; ++j) {
+ ClusterI c = clusterArray[j];
+ if (c == null)
+ continue;
+ size += c.getCachedSize();
+ }
+ if (sizeInBytes != size) {
+ if (!dirtySizeInBytes)
+ System.out.println("BUG: CACHED CLUSTER SIZE DIFFERS FROM CALCULATED: " + sizeInBytes + " != " + size + ", delta = " + (sizeInBytes - size));
+ //else System.out.println("\"BUG?\": SIZES DIFFER: " + sizeInBytes + " != " + size + ", delta = " + (sizeInBytes - size));
+ }
+ if (ims != ihms) {
+ System.out.println("BUG2: hashmap and importanceMap sizes differ: " + ihms + " != " + ims + ", delta=" + (ihms - ims));
+ printMaps("validateSize");
+ }
+ //System.out.println("[" + place + "] VALIDATED");
+ }
+
+ private void printMaps(String place) {
+ int ihms = countImportantClusters();
+ System.out.println("## printMaps(" + place + ") - " + importanceMap.size() + " - (" + ihms + "/" + clusters.hashMap.size() + ")");
+ System.out.println("importanceMap (" + importanceMap.size() + "):");
+ importanceMap.forEach((importance, ie) -> {
+ System.out.format("\t%d: %s%n", importance, ie.toString());
+ });
+ System.out.println("clusters.hashMap (" + ihms + "/" + clusters.hashMap.size() + "):");
+ clusters.hashMap.forEachEntry((cid, c) -> {
+ boolean important = importantCluster(c);
+ boolean wo = c != null && c.isWriteOnly();
+ boolean empty = c != null && c.isEmpty();
+ boolean loaded = c != null && c.isLoaded();
+ System.out.format("\t%s: %d - %s (writeOnly=%b, empty=%b, loaded=%b)%n", important ? " I" : "NI", cid, c, wo, empty, loaded);
+ return true;
+ });
+ }
+
+ private int countImportantClusters() {
+ int[] result = { 0 };
+ clusters.hashMap.forEachEntry((cid, c) -> {
+ if (importantCluster(c))
+ result[0]++;
+ return true;
+ });
+ return result[0];
+ }
+
+ private static boolean importantCluster(ClusterImpl c) {
+ return c != null && !c.isWriteOnly() && c.isLoaded();// && !c.isEmpty();
+ }
+
+}