--- /dev/null
+/*******************************************************************************\r
+ * Copyright (c) 2007, 2010 Association for Decentralized Information Management\r
+ * in Industry THTH ry.\r
+ * All rights reserved. This program and the accompanying materials\r
+ * are made available under the terms of the Eclipse Public License v1.0\r
+ * which accompanies this distribution, and is available at\r
+ * http://www.eclipse.org/legal/epl-v10.html\r
+ *\r
+ * Contributors:\r
+ * VTT Technical Research Centre of Finland - initial API and implementation\r
+ *******************************************************************************/\r
+package fi.vtt.simantics.procore.internal;\r
+\r
+import java.io.File;\r
+import java.util.ArrayList;\r
+import java.util.Collection;\r
+import java.util.HashMap;\r
+import java.util.Map;\r
+import java.util.TreeMap;\r
+import java.util.concurrent.Semaphore;\r
+\r
+import org.simantics.databoard.Bindings;\r
+import org.simantics.db.ClusterCreator;\r
+import org.simantics.db.Database;\r
+import org.simantics.db.DevelopmentKeys;\r
+import org.simantics.db.SessionVariables;\r
+import org.simantics.db.Database.Session.ClusterChanges;\r
+import org.simantics.db.common.utils.Logger;\r
+import org.simantics.db.exception.ClusterDoesNotExistException;\r
+import org.simantics.db.exception.DatabaseException;\r
+import org.simantics.db.exception.ResourceNotFoundException;\r
+import org.simantics.db.exception.RuntimeDatabaseException;\r
+import org.simantics.db.impl.ClusterBase;\r
+import org.simantics.db.impl.ClusterI;\r
+import org.simantics.db.impl.ClusterSupport;\r
+import org.simantics.db.impl.ClusterTraitsBase;\r
+import org.simantics.db.impl.IClusterTable;\r
+import org.simantics.db.impl.graph.WriteGraphImpl;\r
+import org.simantics.db.impl.query.QueryProcessor;\r
+import org.simantics.db.procore.cluster.ClusterBig;\r
+import org.simantics.db.procore.cluster.ClusterImpl;\r
+import org.simantics.db.procore.cluster.ClusterSmall;\r
+import org.simantics.db.procore.cluster.ClusterTraits;\r
+import org.simantics.db.procore.protocol.Constants;\r
+import org.simantics.db.service.ClusterCollectorPolicy;\r
+import org.simantics.db.service.ClusterCollectorPolicy.CollectorCluster;\r
+import org.simantics.db.service.ClusterUID;\r
+import org.simantics.utils.Development;\r
+import org.simantics.utils.datastructures.Callback;\r
+\r
+import fi.vtt.simantics.procore.DebugPolicy;\r
+import fi.vtt.simantics.procore.internal.ClusterControlImpl.ClusterStateImpl;\r
+import fi.vtt.simantics.procore.internal.SessionImplSocket.TaskHelper;\r
+import gnu.trove.map.hash.TLongIntHashMap;\r
+import gnu.trove.map.hash.TLongObjectHashMap;\r
+import gnu.trove.procedure.TIntProcedure;\r
+import gnu.trove.procedure.TLongObjectProcedure;\r
+import gnu.trove.procedure.TObjectProcedure;\r
+import gnu.trove.set.hash.TIntHashSet;\r
+\r
+public final class ClusterTable implements IClusterTable {\r
+\r
+ // Cluster-cache budget in bytes (default 128 MiB); retuned by setAllocateSize().
+ int maximumBytes = 128 * 1024 * 1024;
+ // Collection target: shave cache usage back to 80% of maximumBytes.
+ int limit = (int)(0.8*(double)maximumBytes);
+
+ // Monotonic counter; presumably the source for cluster "importance" timestamps
+ // (see refreshImportance / timeCounter()) — TODO confirm where it is advanced.
+ long timeCounter = 0;
+
+ final private SessionImplSocket sessionImpl;
+ // Write-only clusters created during the current write; torn down in removeWriteOnlyClusters().
+ final private ArrayList<ClusterI> writeOnlyClusters = new ArrayList<ClusterI>();
+ // Cluster keys invalidated while write-only; reset to proxies in removeWriteOnlyClusters().
+ final private TIntHashSet writeOnlyInvalidates = new TIntHashSet();
+ final private static int ARRAY_SIZE = ClusterTraits.getClusterArraySize();
+ // Dense cluster-key -> cluster mapping; slot 0 is reserved (see Clusters.clear()).
+ final private ClusterImpl[] clusterArray = new ClusterImpl[ARRAY_SIZE];
+ final private Boolean[] immutables = new Boolean[ARRAY_SIZE];
+ final private boolean[] virtuals = new boolean[ARRAY_SIZE];
+ /**
+  * Immutable (importance, clusterId) pair handed to the cluster collector so
+  * it can rank resident clusters without referencing the clusters themselves.
+  */
+ static class ImportanceEntry implements CollectorCluster {
+ final public long importance;
+ final public long clusterId;
+
+ public ImportanceEntry(long importanceValue, long idOfCluster) {
+ importance = importanceValue;
+ clusterId = idOfCluster;
+ }
+
+ /** Snapshots the importance and id of the given cluster. */
+ public ImportanceEntry(ClusterImpl impl) {
+ this(impl.getImportance(), impl.getClusterId());
+ }
+
+ @Override
+ public long getClusterId() {
+ return clusterId;
+ }
+
+ @Override
+ public long getImportance() {
+ return importance;
+ }
+ }
+ /**
+  * Id-keyed registry of all cached clusters. Keeps the dense clusterArray
+  * (cluster key -> cluster) in sync with the id -> cluster hash map.
+  * Callers mutate it while holding the ClusterTable monitor.
+  */
+ private class Clusters {
+ // This makes sure that non-null values from hashMap.get can be trusted (no unsynchronized rehashes occur)
+ final public TLongObjectHashMap<ClusterImpl> hashMap = new TLongObjectHashMap<ClusterImpl>(2 * ARRAY_SIZE);
+ //private final HashMap<ClusterUID, ClusterImpl> clusterU2I = new HashMap<ClusterUID, ClusterImpl>(); // Maps cluster UID to cluster.
+ private Clusters() {
+ clear();
+ }
+ private void clear() {
+ hashMap.clear();
+ // clusterU2I.clear();
+ hashMap.put(0, null); // reserved for null value
+ // clusterU2I.put(ClusterUID.make(0, 0), null);
+ }
+ // Size includes the reserved null entry, so this also serves as "next free cluster key".
+ private int size() {
+ return hashMap.size();
+ }
+ private ClusterImpl getClusterByClusterId(long clusterId) {
+ return hashMap.get(clusterId);
+ }
+ // Only the second (id) part of the UID is significant; the first part is always 0 here.
+ private ClusterImpl getClusterByClusterUID(ClusterUID clusterUID) {
+ return getClusterByClusterId(clusterUID.second);
+ //return clusterU2I.get(clusterUID);
+ }
+ private ClusterImpl getClusterByClusterUID(long id1, long id2) {
+ return getClusterByClusterId(id2);
+ }
+ private ClusterImpl makeProxy(long clusterId) {
+ return makeProxy(ClusterUID.make(0, clusterId));
+ }
+ // Returns the existing cluster for the UID or registers a new unloaded
+ // ClusterSmall sentinel; the new cluster key is the current map size.
+ private ClusterImpl makeProxy(ClusterUID clusterUID) {
+ ClusterImpl proxy = hashMap.get(clusterUID.second);
+ if (null != proxy)
+ return proxy;
+ int clusterKey = hashMap.size();
+ ClusterSmall sentinel = new ClusterSmall(clusterUID, clusterKey, ClusterTable.this, sessionImpl.clusterTranslator);
+ if (sentinel.clusterId != sentinel.clusterUID.second)
+ throw new RuntimeDatabaseException("ClusterTable corrupted. Contact application support.");
+ create(sentinel);
+ return sentinel;
+ }
+ // Replaces the cluster occupying proxy's slot; key, id and UID must all match the old entry.
+ private void replace(ClusterImpl proxy) {
+ ClusterImpl old = clusterArray[proxy.clusterKey];
+ create(proxy);
+ if (null != old) {
+ if (old.clusterKey != proxy.clusterKey)
+ throw new RuntimeDatabaseException("ClusterTable corrupted. Contact application support.");
+ if (old.clusterId != proxy.clusterId)
+ throw new RuntimeDatabaseException("ClusterTable corrupted. Contact application support.");
+ if (!old.clusterUID.equals(proxy.clusterUID))
+ throw new RuntimeDatabaseException("ClusterTable corrupted. Contact application support.");
+ }
+ }
+ // Swaps the cached cluster back to an unloaded ClusterSmall sentinel,
+ // releasing its data while keeping the id/key registration intact.
+ private ClusterSmall freeProxy(ClusterImpl proxy) {
+ ClusterImpl clusterImpl = hashMap.get(proxy.clusterId);
+ if (null == clusterImpl)
+ throw new RuntimeDatabaseException("ClusterTable corrupted. Contact application support.");
+// ClusterUID clusterUID = ClusterUID.make(0, proxy.clusterId);
+// ClusterImpl clusterImpl2 = clusterU2I.get(clusterUID );
+// if (clusterImpl != clusterImpl2)
+// throw new RuntimeDatabaseException("ClusterTable corrupted. Contact application support.");
+ if (proxy.clusterId != clusterImpl.clusterId)
+ throw new RuntimeDatabaseException("ClusterTable corrupted. Contact application support.");
+ if (proxy.clusterKey != clusterImpl.clusterKey)
+ throw new RuntimeDatabaseException("ClusterTable corrupted. Contact application support.");
+ ClusterSmall sentinel = new ClusterSmall(makeClusterUID(proxy.clusterId) , proxy.clusterKey, ClusterTable.this, sessionImpl.clusterTranslator);
+ return (ClusterSmall)create(sentinel);
+ }
+ // Registers the cluster in both the id map and the key array (overwriting any previous entry).
+ private ClusterImpl create(ClusterImpl clusterImpl) {
+ hashMap.put(clusterImpl.clusterId, clusterImpl);
+// clusterU2I.put(clusterImpl.clusterUID, clusterImpl);
+ clusterArray[clusterImpl.clusterKey] = clusterImpl;
+ return clusterImpl;
+ }
+ private void forEachValue(TObjectProcedure<ClusterImpl> procedure) {
+ hashMap.forEachValue(procedure);
+ }
+ private void forEachEntry(TLongObjectProcedure<ClusterImpl> procedure) {
+ hashMap.forEachEntry(procedure);
+ }
+ private ClusterUID makeClusterUID(long clusterId) {
+ return ClusterUID.make(0, clusterId);
+ }
+ // Fully unregisters the cluster; used when a proxied cluster turns out not to exist.
+ private void removeProxy(ClusterImpl proxy) {
+ hashMap.remove(proxy.clusterId);
+ clusterArray[proxy.clusterKey] = null;
+ }
+ }
+ // Resident clusters ordered by importance (collection victims are taken from one end).
+ // NOTE(review): keyed by the raw importance value — two clusters with equal
+ // importance would collide; confirm importance values are unique per cluster.
+ final public TreeMap<Long, CollectorCluster> importanceMap = new TreeMap<Long, CollectorCluster>();
+
+ private ClusterCollectorPolicy collectorPolicy;
+ // When true, sizeInBytes is stale and must be recomputed by getSizeInBytes().
+ private boolean dirtySizeInBytes = true;
+ private long sizeInBytes = 0;
+ private final Clusters clusters;
+ /** Builds the ClusterUID for a plain cluster id (the first UID part is always 0). */
+ ClusterUID makeClusterUID(long clusterId) {
+ ClusterUID uid = clusters.makeClusterUID(clusterId);
+ return uid;
+ }
+ /**
+  * Returns the UID of the cluster the resource key belongs to.
+  * NOTE(review): throws NPE if no cluster is registered for the key — confirm
+  * callers only pass keys of known clusters.
+  */
+ public ClusterUID getClusterUIDByResourceKey(int resourceKey) throws DatabaseException {
+ int clusterKey = ClusterTraits.getClusterKeyFromResourceKey(resourceKey);
+ return clusterArray[clusterKey].clusterUID;
+ }
+ // folderPath is currently unused; kept for interface compatibility.
+ ClusterTable(SessionImplSocket sessionImpl, File folderPath) {
+ this.sessionImpl = sessionImpl;
+ clusters = new Clusters();
+ }
+\r
+ /** Drops all cached clusters and collector bookkeeping. */
+ void dispose() {
+ importanceMap.clear();
+ clusters.clear();
+ }
+\r
+ // Installs the policy consulted on every makeCluster/replace/release; may be null.
+ public void setCollectorPolicy(ClusterCollectorPolicy policy) {
+ this.collectorPolicy = policy;
+ }
+\r
+ /**
+  * Sets Cluster Table allocation size by percentage of maximum usable
+  * memory. Allocation is limited between 5% and 50% of maximum memory.
+  *
+  * @param percentage requested fraction of Runtime.maxMemory(); clamped to [0.05, 0.5]
+  */
+ private void setAllocateSize(double percentage) {
+
+ percentage = Math.max(percentage, 0.05);
+ percentage = Math.min(percentage, 0.5);
+
+ // Narrowing cast saturates at Integer.MAX_VALUE for very large heaps.
+ maximumBytes = (int) Math.floor(percentage * Runtime.getRuntime().maxMemory());
+ }
+\r
+ /**
+  * Estimates what fraction of the maximum heap the cluster cache could
+  * safely occupy, aiming to leave ~20% of the heap free for other
+  * allocations. The result is clamped to [0.05, 0.5].
+  */
+ private double estimateProperAllocation(ClusterCollectorSupport support) {
+ Runtime rt = Runtime.getRuntime();
+ long heapMax = rt.maxMemory();
+ long heapFree = rt.freeMemory();
+ long heapTotal = rt.totalMemory();
+ // Memory not yet claimed from the OS plus the unused part of the current heap.
+ long unused = heapMax - heapTotal + heapFree;
+ long cacheBytes = support.getCurrentSize();
+ // Everything the application itself uses, excluding the cluster cache.
+ long appBytes = (heapMax - unused) - cacheBytes;
+ double appFraction = (double) appBytes / (double) heapMax;
+ double keepFreeFraction = 0.2;
+ double estimate = 1.0 - appFraction - keepFreeFraction;
+ return Math.max(0.05, Math.min(0.5, estimate));
+ }
+\r
+ // Gives the collector a chance to free clusters; cheap no-op when under budget.
+ void checkCollect() {
+ collector.collect();
+ }
+\r
+ // Returns the cached cluster (or proxy) with the given id, or null if unknown.
+ synchronized ClusterImpl getClusterByClusterId(long clusterId) {
+ return clusters.getClusterByClusterId(clusterId);
+ }
+\r
+ // Direct key lookup; may return null if the slot is empty.
+ ClusterBase getClusterByClusterKey(int clusterKey) {
+ return clusterArray[clusterKey];
+ }
+\r
+ /**
+  * Returns the cluster registered for clusterUID, creating an unloaded
+  * proxy for it if none exists yet.
+  *
+  * @throws RuntimeDatabaseException if clusterId disagrees with the UID's second part
+  */
+ synchronized ClusterImpl makeProxy(ClusterUID clusterUID, long clusterId) {
+ if (clusterId != clusterUID.second)
+ throw new RuntimeDatabaseException("Illegal id for cluster=" + clusterUID + " id=" + clusterId);
+ return clusters.makeProxy(clusterUID);
+ }
+\r
+ /**
+  * Creates and registers a new cluster for clusterId (or returns the cached
+  * one). Write-only clusters are tracked separately and not entered in the
+  * importance map; normal clusters become collector-visible immediately.
+  */
+ synchronized ClusterImpl makeCluster(long clusterId, boolean writeOnly) {
+ checkCollect();
+ ClusterImpl proxy = clusters.getClusterByClusterId(clusterId);
+ if (null != proxy)
+ return proxy;
+ ClusterUID clusterUID = ClusterUID.make(0, clusterId);
+ int clusterKey = clusters.size(); // new key
+ if (writeOnly) {
+ proxy = new ClusterWriteOnly(clusterUID, clusterKey, sessionImpl);
+ writeOnlyClusters.add(proxy);
+ return clusters.create(proxy);
+ } else {
+ ClusterImpl cluster = ClusterImpl.make(clusterUID, clusterKey, sessionImpl.clusterTranslator);
+ clusters.create(cluster);
+ if (!cluster.isLoaded())
+ Logger.defaultLogError(new Exception("Bug in ClusterTable.makeCluster(long, boolean), cluster not loaded"));
+ importanceMap.put(cluster.getImportance(), new ImportanceEntry(cluster));
+ if(collectorPolicy != null) collectorPolicy.added(cluster);
+ return cluster;
+ }
+ }
+\r
+ /**
+  * Replaces the cluster occupying the same cluster key, carrying over the
+  * virtual flag and updating collector bookkeeping and the cached byte size.
+  * NOTE(review): assumes a cluster already occupies the slot; a null
+  * `existing` would NPE at hasVirtual() — confirm callers guarantee this.
+  */
+ synchronized void replaceCluster(ClusterI cluster) {
+ checkCollect();
+ int clusterKey = cluster.getClusterKey();
+ ClusterI existing = clusterArray[clusterKey];
+ if (existing.hasVirtual())
+ cluster.markVirtual();
+
+ importanceMap.remove(existing.getImportance());
+ if (collectorPolicy != null)
+ collectorPolicy.removed((ClusterImpl)existing);
+
+ clusters.replace((ClusterImpl)cluster);
+ if (!cluster.isLoaded())
+ Logger.defaultLogError(new Exception("Bug in ClusterTable.replaceCluster(ClusterI), cluster not loaded"));
+
+ importanceMap.put(cluster.getImportance(), new ImportanceEntry((ClusterImpl)cluster));
+ if(collectorPolicy != null) collectorPolicy.added((ClusterImpl)cluster);
+
+ // The old cluster was an unloaded proxy, so the new one's size is purely additive.
+ if (!dirtySizeInBytes && !existing.isLoaded()) {
+ sizeInBytes += cluster.getCachedSize();
+ }
+
+ }
+\r
+ // Collector entry point: drop the importance entry first, then free by id.
+ synchronized void release(CollectorCluster cluster) {
+ importanceMap.remove(cluster.getImportance());
+ release(cluster.getClusterId());
+ }
+\r
+ /**
+  * Frees the data of the cluster with the given id, replacing it with an
+  * unloaded proxy. No-op for unknown, unloaded or empty clusters. Pending
+  * writes to the cluster are flushed before its memory is released.
+  */
+ synchronized void release(long clusterId) {
+ ClusterImpl clusterImpl = clusters.getClusterByClusterId(clusterId);
+ if (null == clusterImpl)
+ return;
+ if(!clusterImpl.isLoaded() || clusterImpl.isEmpty())
+ return;
+ clusters.freeProxy(clusterImpl);
+ importanceMap.remove(clusterImpl.getImportance());
+ if (collectorPolicy != null)
+ collectorPolicy.removed(clusterImpl);
+ if (sessionImpl.writeState != null)
+ sessionImpl.clusterStream.flush(clusterImpl.clusterUID);
+ if (!dirtySizeInBytes) {
+ sizeInBytes -= clusterImpl.getCachedSize();
+ }
+ }
+\r
+ /** Compacts the given cluster's internal storage, if it is currently cached. */
+ synchronized void compact(long id) {
+ ClusterI cluster = clusters.getClusterByClusterId(id);
+ if (cluster == null)
+ return;
+ cluster.compact();
+ }
+\r
+ // Intentionally a no-op; size is recomputed lazily in getSizeInBytes().
+ void updateSize() {
+// cachedSize = getSizeInBytes();
+ }
+\r
+ // Always 1.0 for now: the adaptive formula below is disabled because it
+ // could cause stack overflow (see comment); kept for reference.
+ double getLoadProbability() {
+ // This can currently cause stack overflow
+ return 1.0;
+// if(cachedSize < SLOW_LIMIT) return 1.0;
+// if(cachedSize > HIGH_LIMIT) return 1e-2;
+//
+// double pos = (double)(cachedSize - SLOW_LIMIT) / (double)(HIGH_LIMIT -
+// SLOW_LIMIT);
+// double val = 0.1 * ((pos-1) * (pos-1)) + 1e-2;
+// return val;
+ }
+\r
+ /**
+  * Trove fold that sums the cached byte size of all loaded, non-empty
+  * clusters. Per-cluster failures are logged and skipped so one broken
+  * cluster cannot abort the whole size computation.
+  */
+ class SizeProcedure implements TObjectProcedure<ClusterImpl> {
+ public long result = 0;
+
+ @Override
+ public boolean execute(ClusterImpl cluster) {
+ if (cluster != null) {
+ try {
+ if (cluster.isLoaded() && !cluster.isEmpty()) {
+ result += cluster.getCachedSize();
+ }
+ } catch (Throwable t) {
+ Logger.defaultLogError(t);
+ }
+ }
+ return true;
+ }
+
+ // Resets the accumulator before a new traversal.
+ public void clear() {
+ result = 0;
+ }
+
+ };
+\r
+ private SizeProcedure sizeProcedure = new SizeProcedure();
+ // Returns the total cached cluster size, recomputing it only when marked dirty.
+ long getSizeInBytes() {
+ if (dirtySizeInBytes) {
+ sizeProcedure.clear();
+ clusters.forEachValue(sizeProcedure);
+ sizeInBytes = sizeProcedure.result;
+ // System.err.println("recomputed size of clusterTable => " + sizeInBytes);
+ setDirtySizeInBytes(false);
+ }
+ return sizeInBytes;
+ }
+\r
+ // Marks the cached total size stale (true) or fresh (false).
+ public void setDirtySizeInBytes(boolean value) {
+ dirtySizeInBytes = value;
+ }
+\r
+ // Snapshots (importance, id) of every loaded, non-empty cluster; used by restoreState.
+ ClusterStateImpl getState() {
+ final ClusterStateImpl result = new ClusterStateImpl();
+ clusters.forEachEntry(new TLongObjectProcedure<ClusterImpl>() {
+ @Override
+ public boolean execute(long arg0, ClusterImpl arg1) {
+ if (arg1 == null)
+ return true;
+ if (arg1.isLoaded() && !arg1.isEmpty()) {
+ result.ids.add(new ImportanceEntry(arg1));
+ }
+ return true;
+ }
+ });
+ return result;
+ }
+\r
+ /**
+  * Releases every currently resident cluster that is not present in the
+  * given snapshot, restoring the table to that earlier state.
+  */
+ void restoreState(ClusterStateImpl state) {
+ for (CollectorCluster resident : getState().ids) {
+ if (!state.ids.contains(resident))
+ collectorSupport.release(resident);
+ }
+ }
+\r
+ /**
+  * Default cluster collector: frees least-important resident clusters until
+  * the cache fits under maximumBytes, or delegates victim selection to an
+  * installed ClusterCollectorPolicy.
+  */
+ class ClusterCollectorImpl implements ClusterCollector {
+
+ final private ClusterCollectorSupport support;
+ private ClusterCollectorPolicy policy;
+ ClusterCollectorImpl(ClusterCollectorSupport support) {
+ this.support = support;
+ }
+
+ /**
+  * Installs a new selection policy (null reverts to the built-in size-based
+  * strategy) and returns the previous one. Resident clusters are announced
+  * to the new policy before it takes effect.
+  */
+ ClusterCollectorPolicy setPolicy(ClusterCollectorPolicy newPolicy) {
+
+ ClusterCollectorPolicy oldPolicy = policy;
+ policy = newPolicy;
+
+ if(policy != null) {
+ for (CollectorCluster id : support.getResidentClusters()) {
+ policy.added(getClusterByClusterId(id.getClusterId()));
+ }
+ }
+
+ support.setPolicy(policy);
+
+ return oldPolicy;
+
+ }
+
+ @Override
+ public void collect() {
+
+ if(policy != null) {
+
+ release(policy.select());
+
+ } else {
+
+ int size = support.getCurrentSize();
+ boolean dynamicAllocation = useDynamicAllocation();
+ if (dynamicAllocation)
+ setAllocateSize(estimateProperAllocation(support));
+ if (DebugPolicy.CLUSTER_COLLECTION) {
+ System.out.println("Cluster collector activated, current size = " + size + " limit = " + maximumBytes);
+ }
+ if (dynamicAllocation) {
+ // Collect in chunks of at most min(budget/2, 32 MiB), and only once
+ // the remaining headroom drops below that chunk size.
+ int collectSize = maximumBytes / 2;
+ collectSize = Math.min(collectSize, 32 * 1024 * 1024);
+ // try to keep allocated clusters below the maximum
+ if (maximumBytes - size > collectSize)
+ return;
+ collectSize += size - maximumBytes;
+ collect(collectSize);
+ } else {
+ // try to keep allocated clusters below the maximum
+ if (size < maximumBytes)
+ return;
+ // shave off 20%
+ collect(size-limit);
+ }
+
+ }
+
+ }
+
+ private boolean useDynamicAllocation() {
+ return "true".equalsIgnoreCase(System.getProperty("org.simantics.db.cluster.dynamicAlloc"));
+ }
+
+ @Override
+ public void collect(int target) {
+
+ if(policy != null) {
+
+ release(policy.select(target));
+
+ } else {
+
+ ArrayList<CollectorCluster> toRelease = new ArrayList<CollectorCluster>();
+
+ // Accumulate residents until ~target bytes are covered.
+ // NOTE(review): the cluster whose size drives target to <= 0 is NOT
+ // included, so slightly less than target may be freed — confirm intended.
+ for (CollectorCluster cluster : support.getResidentClusters()) {
+ target -= support.getClusterSize(cluster);
+ if (target > 0) {
+ toRelease.add(cluster);
+ } else {
+ break;
+ }
+ }
+
+ release(toRelease);
+
+ if (DebugPolicy.CLUSTER_COLLECTION) {
+ System.out.println("Cluster collector finished, current size = " + support.getCurrentSize());
+ }
+
+ }
+
+ }
+
+ void release(Collection<CollectorCluster> toRelease) {
+ for (CollectorCluster id : toRelease) {
+ support.release(id);
+ }
+ }
+
+ }
+\r
+ // Collector plumbing: support adapter over this table plus the active collector.
+ private ClusterCollectorSupport collectorSupport = new ClusterCollectorSupportImpl(this);
+ ClusterCollectorImpl collector = new ClusterCollectorImpl(collectorSupport);
+\r
+ // Explicit collection request; same as checkCollect().
+ void gc() {
+ collector.collect();
+ }
+\r
+ // Cluster currently being filled with new resources; NewClusterId means "none allocated yet".
+ private long newResourceClusterId = Constants.NewClusterId;
+ // Maximum number of resources a cluster may hold before a new one is started.
+ public static final int CLUSTER_FILL_SIZE = ClusterTraitsBase.getMaxNumberOfResources();
+\r
+ /*
+ * A new cluster id is allocated only when a resource is actually added,
+ * so reservedIds contains only clusters that are already in use.
+ * (Translated from Finnish; the original comment text was mojibake-damaged.)
+ */
+\r
+ /**
+  * Returns the cluster into which new resources should be created,
+  * allocating a fresh cluster id from the server when none is active yet
+  * or when the current one has reached CLUSTER_FILL_SIZE.
+  */
+ ClusterImpl getNewResourceCluster(ClusterSupport cs, GraphSession graphSession, boolean writeOnly)
+ throws DatabaseException {
+ if (Constants.NewClusterId == newResourceClusterId) {
+ newResourceClusterId = graphSession.newClusterId();
+ return getClusterByClusterIdOrMake(newResourceClusterId, writeOnly);
+ } else {
+ ClusterImpl cluster = getClusterByClusterIdOrThrow(newResourceClusterId);
+ if (cluster.getNumberOfResources(cs) >= CLUSTER_FILL_SIZE) {
+ newResourceClusterId = graphSession.newClusterId();
+ cluster = getClusterByClusterIdOrMake(newResourceClusterId, writeOnly);
+ }
+ return cluster;
+ }
+ }
+\r
+ // Forgets the active new-resource cluster so the next creation allocates a fresh id.
+ void flushCluster(GraphSession graphSession) {
+// We seem to disagree about this.
+// graphSession.newClusterId();
+ newResourceClusterId = Constants.NewClusterId;
+ }
+\r
+ // Records a cluster whose write-only data must be dropped at the end of the write.
+ void writeOnlyInvalidate(ClusterI impl) {
+ writeOnlyInvalidates.add(impl.getClusterKey());
+ }
+\r
+ /**
+  * Tears down all write-only clusters and invalidated clusters created during
+  * the finished write, replacing each with an unloaded proxy.
+  * NOTE(review): clusterArray[clusterKey] is assumed non-null for every
+  * invalidated key — confirm invalidated clusters are never removed first.
+  */
+ void removeWriteOnlyClusters() {
+ for (ClusterI proxy : writeOnlyClusters) {
+ if (!(proxy instanceof ClusterImpl))
+ throw new RuntimeDatabaseException("ClusterTable corrupted. Contact application support.");
+ clusters.freeProxy((ClusterImpl)proxy);
+ }
+ writeOnlyClusters.clear();
+ writeOnlyInvalidates.forEach(new TIntProcedure() {
+ @Override
+ public boolean execute(int clusterKey) {
+ ClusterImpl proxy = clusterArray[clusterKey];
+ ClusterUID clusterUID = proxy.getClusterUID();
+ System.err.println("writeOnlyInvalidate " + clusterUID);
+ clusters.freeProxy(proxy);
+ return true;
+ }
+ });
+ writeOnlyInvalidates.clear();
+ }
+\r
+ /** Returns the cached cluster for the given UID, or null if unknown. */
+ public synchronized ClusterImpl getClusterByClusterUID(ClusterUID clusterUID) {
+ return clusters.getClusterByClusterUID(clusterUID);
+ }
+ // Resolves a cluster key for the UID, creating an unloaded proxy if needed.
+ public int getClusterKeyByClusterUIDOrMakeProxy(ClusterUID clusterUID) {
+ return getClusterKeyByClusterUIDOrMakeProxy(0/*clusterUID.first*/, clusterUID.second);
+ }
+ // Same as above with the UID given as two longs; id1 (the "first" part) is ignored.
+ public int getClusterKeyByClusterUIDOrMakeProxy(long id1, long id2) {
+ return getClusterByClusterUIDOrMakeProxy(id1, id2).clusterKey;
+ }
+ // Returns the cached cluster for the UID, registering an unloaded proxy if absent.
+ public ClusterImpl getClusterByClusterUIDOrMakeProxy(ClusterUID clusterUID) {
+ synchronized (this) {
+ ClusterImpl clusterImpl = clusters.getClusterByClusterUID(clusterUID);
+ if (null == clusterImpl)
+ clusterImpl = clusters.makeProxy(clusterUID);
+ return clusterImpl;
+ }
+ }
+ // Two-long variant; only id2 (the cluster id) is significant.
+ public ClusterImpl getClusterByClusterUIDOrMakeProxy(long id1, long id2) {
+ synchronized (this) {
+ ClusterImpl clusterImpl = clusters.getClusterByClusterUID(id1, id2);
+ if (null == clusterImpl)
+ clusterImpl = clusters.makeProxy(id2);
+ return clusterImpl;
+ }
+ }
+ /**
+  * Returns the loaded cluster with the given id, loading it on demand
+  * through tryLoad. A proxy is created first when none exists; if the
+  * cluster turns out not to exist, that proxy is unregistered again.
+  *
+  * @param clusterId persistent cluster id
+  * @return the loaded cluster, never null
+  * @throws ResourceNotFoundException if tryLoad finds no such cluster
+  * @throws DatabaseException on other load failures
+  */
+ ClusterImpl getLoadOrThrow(long clusterId) throws DatabaseException {
+ synchronized (this) {
+ ClusterImpl proxy = clusters.getClusterByClusterId(clusterId);
+ int clusterKey = 0; // 0 = "no proxy yet"; key 0 is the reserved null slot, never a real key
+ // Braces added: the original unbraced nested if/else was a dangling-else hazard.
+ if (proxy != null) {
+ if (proxy.isLoaded())
+ return proxy;
+ else
+ clusterKey = proxy.getClusterKey();
+ }
+ try {
+ if (clusterKey == 0) {
+ proxy = clusters.makeProxy(clusterId);
+ clusterKey = proxy.getClusterKey();
+ }
+ ClusterImpl ci = tryLoad(clusterId, clusterKey);
+ if (null == ci)
+ throw new ResourceNotFoundException(clusterId);
+ return ci;
+ } catch (ClusterDoesNotExistException t) {
+ clusters.removeProxy(proxy);
+ throw t;
+ }
+ }
+ }
+\r
+ /** Returns the cached cluster with the given id, creating one if absent. */
+ ClusterImpl getClusterByClusterIdOrMake(long clusterId, boolean writeOnly) {
+ synchronized (this) {
+ ClusterImpl existing = clusters.getClusterByClusterId(clusterId);
+ return existing != null ? existing : makeCluster(clusterId, writeOnly);
+ }
+ }
+ // Returns the cached cluster with the given id; fails fast if it was never created.
+ ClusterImpl getClusterByClusterIdOrThrow(long clusterId) {
+ synchronized (this) {
+ ClusterImpl proxy = clusters.getClusterByClusterId(clusterId);
+ if (null == proxy)
+ throw new IllegalArgumentException("Cluster id=" + clusterId + " is not created.");
+ return proxy;
+ }
+ }
+ // No creation is actually needed: the cluster id is embedded in the UID's second part.
+ long getClusterIdOrCreate(ClusterUID clusterUID) {
+ return clusterUID.second;
+ }
+ /**
+  * Maps a resource key to its persistent cluster id.
+  * @throws RuntimeException for virtual-cluster keys or unknown clusters
+  */
+ final long getClusterIdByResourceKey(final int resourceKey)
+ throws DatabaseException {
+ int clusterKey = ClusterTraitsBase.getClusterKeyFromResourceKey(resourceKey);
+ if (ClusterTraitsBase.isVirtualClusterKey(clusterKey))
+ throw new RuntimeException("Tried to get a persistent cluster for a virtual resource.");
+ ClusterI c = clusterArray[clusterKey];
+ if (c == null)
+ throw new RuntimeException("No cluster for key " + resourceKey);
+ return c.getClusterId();
+ }
+ // Non-throwing variant of getClusterIdByResourceKey: logs and returns 0 on failure.
+ final long getClusterIdByResourceKeyNoThrow(final int resourceKey) {
+ int clusterKey = ClusterTraitsBase.getClusterKeyFromResourceKeyNoThrow(resourceKey);
+ if (ClusterTraitsBase.isVirtualClusterKey(clusterKey)) {
+ Logger.defaultLogError("Tried to get a persistent cluster for a virtual resource. key=" + resourceKey);
+ return 0;
+ }
+ ClusterI c = clusterArray[clusterKey];
+ if (c == null) {
+ Logger.defaultLogError("No cluster for key " + resourceKey);
+ return 0;
+ }
+ return c.getClusterId();
+ }
+\r
+ // Lookup counter: cluster importance is refreshed on every 4096th access (see getClusterByResourceKey).
+ int counter = 0;
+\r
+ /**
+  * Refreshes the given clusters after another client committed change set
+  * csid: each changed, currently-loaded persistent cluster is re-fetched
+  * from the database, swapped into the table, and its cluster changes are
+  * replayed into the query processor. Finally a fake write cycle runs the
+  * dirty/scheduled query updates and fires listeners.
+  */
+ void refresh(long csid, SessionImplSocket session, ClusterUID[] clusterUID) {
+ synchronized (this) {
+ session.flushCounter = 0;
+ session.clusterStream.reallyFlush();
+ ClientChangesImpl cs = new ClientChangesImpl(session);
+ if (session.clientChanges == null)
+ session.clientChanges = cs;
+ for (int i=0; i<clusterUID.length; ++i) {
+ try {
+ if (DebugPolicy.REPORT_CLUSTER_EVENTS)
+ System.err.println("cluster=" + clusterUID[i] + " has changed. length=" + clusterUID.length);
+ ClusterImpl oldCluster = clusters.getClusterByClusterUID(clusterUID[i]);
+ // Unknown or unloaded clusters need no refresh; they will be loaded on demand.
+ if (null == oldCluster)
+ continue;
+ if (!oldCluster.isLoaded())
+ continue;
+ int clusterKey = oldCluster.getClusterKey();
+ if (ClusterTraitsBase.isVirtualClusterKey(clusterKey))
+ continue;
+ boolean big = oldCluster instanceof ClusterBig;
+ boolean small = oldCluster instanceof ClusterSmall;
+ if (!big && !small)
+ continue;
+// ClusterImpl newCluster = (ClusterImpl) sessionImpl.graphSession.getClusterImpl(clusterUID[i], clusterKey);
+ Database.Session dbSession = sessionImpl.graphSession.dbSession;
+
+ // Clone the changed cluster from the database into a fresh ClusterImpl.
+ ClusterImpl newCluster = dbSession.clone(clusterUID[i], new ClusterCreator() {
+
+ @Override
+ public <T> T create(ClusterUID uid, byte[] bytes, int[] ints, long[] longs) {
+ ClusterSupport support = sessionImpl.clusterTranslator;
+ try {
+ return (T)ClusterImpl.make(longs, ints, bytes, support, support.getClusterKeyByClusterUIDOrMake(uid));
+ } catch (DatabaseException e) {
+ e.printStackTrace();
+ return null;
+ }
+ }
+
+ });
+ if (null == newCluster)
+ continue;
+ if (DebugPolicy.REPORT_CLUSTER_EVENTS)
+ System.err.println("cluster=" + newCluster + " updated.");
+ importanceMap.remove(oldCluster.getImportance());
+ if (collectorPolicy != null)
+ collectorPolicy.removed(oldCluster);
+ clusters.replace(newCluster);
+ if (!newCluster.isLoaded())
+ Logger.defaultLogError(new Exception("Bug in ClusterTable.refresh, cluster not loaded"));
+ importanceMap.put(newCluster.getImportance(), new ImportanceEntry(newCluster));
+ if (collectorPolicy != null)
+ collectorPolicy.added(newCluster);
+ // Now we have fetched the new cluster but to emulate effects of the changes in it we fetch the cluster changes from server.
+ refreshCluster(csid, session, oldCluster, clusterUID[i], newCluster.clusterId, clusterKey);
+ } catch (Throwable t) {
+ Logger.defaultLogError("Failed to load cluster in refresh.", t);
+ }
+ }
+ // Fake update of cluster changes.
+ QueryProcessor queryProcessor = session.getQueryProvider2();
+ WriteGraphImpl writer = WriteGraphImpl.create(queryProcessor, session.writeSupport, null);
+ TaskHelper th = null;
+ // Only run the update cycle when no real write is in progress.
+ if (null == session.writeState) {
+ th = new TaskHelper("Refresh");
+ session.writeState = new WriteState<Object>(writer, th.writeTraits, th.sema, th.proc);
+ try {
+ session.getQueryProvider2().performDirtyUpdates(writer);
+ session.fireMetadataListeners(writer, cs);
+ session.getQueryProvider2().performScheduledUpdates(writer);
+ session.fireReactionsToSynchronize(cs);
+ session.fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
+ session.printDiagnostics();
+ } finally {
+ if (null != th)
+ session.writeState = null;
+ }
+ }
+ }
+ }
+ /**
+  * Replays the server-side change set of a refreshed cluster into the query
+  * processor: invalidates the statements and values of every resource the
+  * change set touched. If the change set cannot be fetched, the cluster is
+  * released so it will be reloaded on next access.
+  */
+ final void refreshCluster(long csid, SessionImplSocket session, ClusterImpl cluster, ClusterUID clusterUID, long clusterId, int clusterKey)
+ throws DatabaseException {
+ if (DebugPolicy.REPORT_CLUSTER_EVENTS)
+ System.err.println("cluster=" + clusterUID + " id=" + clusterId + " key=" + clusterKey + " resources will be updated.");
+ // get cluster change sets
+ QueryProcessor queryProcessor = session.getQueryProvider2();
+ ClusterChanges cc;
+ try {
+ cc = session.graphSession.getClusterChanges(clusterUID, csid);
+ } catch (Exception e) {
+ Logger.defaultLogError("Could not get cluster changes. cluster=" + clusterUID, e);
+ release(clusterId);
+ return;
+ }
+ // Statement changes: invalidate (subject, predicate) pairs; the predicate may
+ // live in a different cluster, which is skipped if it is not cached.
+ for (int i=0; i<cc.getResourceIndex().length; ++i) {
+ int resource = ClusterTraits.createResourceKey(clusterKey, cc.getResourceIndex()[i]);
+ ClusterUID pClusterUID = new ClusterUID(cc.getPredicateFirst()[i], cc.getPredicateSecond()[i]);
+ ClusterImpl pCluster = clusters.getClusterByClusterUID(pClusterUID);
+ if (null == pCluster)
+ continue;
+ int pClusterKey = pCluster.getClusterKey();
+ int predicate = ClusterTraits.createResourceKey(pClusterKey, cc.getPredicateIndex()[i]);
+ queryProcessor.updateStatements(resource, predicate);
+ if (DebugPolicy.REPORT_CLUSTER_EVENTS)
+ System.err.println("resource " + cc.getResourceIndex()[i] + " relation " + cc.getPredicateIndex()[i] + " changed.");
+ }
+ // Value changes: invalidate the values of the listed resources.
+ for (int i=0; i<cc.getValueIndex().length; ++i) {
+ int resource = ClusterTraits.createResourceKey(clusterKey, cc.getValueIndex()[i]);
+ queryProcessor.updateValue(resource);
+ if (DebugPolicy.REPORT_CLUSTER_EVENTS)
+ System.err.println("value " + cc.getValueIndex()[i] + " changed.");
+ }
+ }
+ /**
+  * Bumps the cluster's importance to the current time counter so it moves to
+  * the "recently used" end of importanceMap. Write-only clusters are exempt
+  * (they are not collector-managed).
+  */
+ final void refreshImportance(ClusterImpl c) {
+
+ if (c.isWriteOnly())
+ return;
+
+ // Re-key under the new importance: remove old entry, then reinsert.
+ importanceMap.remove(c.getImportance());
+ if(collectorPolicy != null) collectorPolicy.removed(c);
+
+ long newImportance = timeCounter();
+// System.err.println("refreshImportance " + c.getClusterId() + " => " + newImportance);
+ c.setImportance(newImportance);
+ if (!c.isLoaded())
+ Logger.defaultLogError(new Exception("Bug in ClusterTable.refreshImportance(ClusterImpl), cluster not loaded"));
+
+ importanceMap.put(c.getImportance(), new ImportanceEntry(c));
+ if(collectorPolicy != null) collectorPolicy.added(c);
+
+ }
+\r
+ // Cumulative nanoseconds spent in load2; reported when DebugPolicy.REPORT_CLUSTER_LOADING is on.
+ static long loadTime = 0;
+\r
+ /**
+  * Returns the cluster (possibly an unloaded proxy) owning the resource key.
+  * Unlike getClusterByResourceKey, this never triggers a load.
+  * @throws RuntimeException for virtual-cluster keys or unknown clusters
+  */
+ @SuppressWarnings("unchecked")
+ public final <T extends ClusterI> T getClusterProxyByResourceKey(final int resourceKey) {
+ int clusterKey = ClusterTraitsBase.getClusterKeyFromResourceKeyNoThrow(resourceKey);
+ if (ClusterTraitsBase.isVirtualClusterKey(clusterKey))
+ throw new RuntimeException("Tried to get a persistent cluster for a virtual resource.");
+ ClusterI cluster = clusterArray[clusterKey];
+ if (cluster == null)
+ throw new RuntimeException("No proxy for existing cluster. Resource key = " + resourceKey);
+ return (T)cluster;
+ }
+\r
+ // Per-cluster load counts and total loads; maintained only when cluster-load reporting is enabled.
+ TLongIntHashMap clusterLoadHistogram = new TLongIntHashMap();
+ int clusterLoadCounter = 0;
+\r
+ @SuppressWarnings("unchecked")\r
+ public final <T extends ClusterI> T getClusterByResourceKey(final int resourceKey) {\r
+ int clusterKey = ClusterTraitsBase.getClusterKeyFromResourceKeyNoThrow(resourceKey);\r
+ if (ClusterTraitsBase.isVirtualClusterKey(clusterKey))\r
+ throw new RuntimeException("Tried to get a persistent cluster for a virtual resource.");\r
+ ClusterI c = clusterArray[clusterKey];\r
+ if (c == null)\r
+ return null;\r
+ if (c.isLoaded()) {\r
+ if ((counter++ & 4095) == 0)\r
+ refreshImportance((ClusterImpl) c);\r
+ return (T) c;\r
+ }\r
+ if (!(c instanceof ClusterSmall)) {\r
+ Logger.defaultLogError("Proxy must be instance of ClusterSmall");\r
+ return null;\r
+ }\r
+ ClusterI cluster;\r
+ ClusterSmall cs = (ClusterSmall) c;\r
+ try {\r
+ if(DebugPolicy.REPORT_CLUSTER_LOADING) {\r
+ long start = System.nanoTime();\r
+ cluster = load2(cs.getClusterId(), cs.getClusterKey());\r
+ long load = System.nanoTime()-start;\r
+ loadTime += load;\r
+ if(DebugPolicy.REPORT_CLUSTER_LOADING) {\r
+ int was = clusterLoadHistogram.get(cluster.getClusterId());\r
+ clusterLoadHistogram.put(cluster.getClusterId(), was+1);\r
+ clusterLoadCounter++;\r
+ String text = "Load2 " + cluster + " " + 1e-9*loadTime + "s. " + 1e-6*load + "ms. " + clusterLoadCounter + " " + was + " " + cluster.getUsedSpace() + " " + cluster.getImportance();\r
+ if(DebugPolicy.REPORT_CLUSTER_LOADING_STACKS) {\r
+ new Exception(text).printStackTrace();\r
+ } else {\r
+ System.err.println(text);\r
+ }\r
+ }\r
+ } else {\r
+ cluster = load2(cs.getClusterId(), cs.getClusterKey());\r
+ }\r
+ } catch (DatabaseException e) {\r
+ Logger.defaultLogError(e);\r
+ if (DebugPolicy.REPORT_CLUSTER_EVENTS)\r
+ e.printStackTrace();\r
+ String msg = "Failed to load cluster " + cs.getClusterUID() + " for resource key " + resourceKey\r
+ + " resourceId=" + (((cs.getClusterId() << 16 + (resourceKey & 65535))));\r
+ // TODO: this jams the system => needs refactoring.\r
+ throw new RuntimeDatabaseException(msg, e);\r
+ }\r
+ return (T) cluster;\r
+ }\r
+\r
+ @SuppressWarnings("unchecked")\r
+ final <T extends ClusterI> T checkedGetClusterByResourceKey(final int resourceKey) {\r
+ int clusterKey = ClusterTraitsBase.getClusterKeyFromResourceKeyNoThrow(resourceKey);\r
+ if (ClusterTraitsBase.isVirtualClusterKey(clusterKey))\r
+ throw new RuntimeException("Tried to get a persistent cluster for a virtual resource.");\r
+ ClusterI c = clusterArray[clusterKey];\r
+ if (c == null)\r
+ throw new RuntimeException("No cluster for resource key " + resourceKey);\r
+ if (c.isLoaded()) {\r
+ if ((counter++ & 4095) == 0)\r
+ refreshImportance((ClusterImpl) c);\r
+ return (T) c;\r
+ }\r
+ if (!(c instanceof ClusterSmall)) {\r
+ Logger.defaultLogError("Proxy must be instance of ClusterSmall");\r
+ return null;\r
+ }\r
+ ClusterI cluster;\r
+ ClusterSmall cs = (ClusterSmall) c;\r
+ try {\r
+// System.err.println("Load2 " + resourceKey);\r
+ cluster = load2(cs.getClusterId(), cs.getClusterKey());\r
+ } catch (DatabaseException e) {\r
+ if (DebugPolicy.REPORT_CLUSTER_EVENTS)\r
+ e.printStackTrace();\r
+ int resourceIndex = resourceKey & ClusterTraitsBase.getResourceIndexFromResourceKeyNoThrow(resourceKey);\r
+ long resourceId = ClusterTraitsBase.createResourceIdNoThrow(cs.getClusterId(), resourceIndex);\r
+ String msg = "Failed to load cluster " + cs.getClusterUID() + " for resource key " + resourceKey + " resourceIndex=" + resourceIndex + " resourceId=" + resourceId;\r
+ Logger.defaultLogError(msg, e);\r
+ c.setDeleted(true, null);\r
+ return (T)c;\r
+ }\r
+ return (T) cluster;\r
+ }\r
+\r
+ void printDebugInfo(ClusterSupport support)\r
+ throws DatabaseException {\r
+ final int SIZE = clusters.size();\r
+ long sum = 0;\r
+ for (int i = 1; i < SIZE; ++i) {\r
+ ClusterI c = clusterArray[i];\r
+ c.getNumberOfResources(support);\r
+ long size = c.getUsedSpace();\r
+ System.out.println("cluster=" + c.getClusterId() + " size=" + size);\r
+ c.printDebugInfo("koss: ", support);\r
+ sum += size;\r
+ }\r
+ System.out.println("Total number of clusters " + SIZE);\r
+ System.out.println("Total cluster size " + sum);\r
+ }\r
+\r
+ Map<Long,ClusterImpl> prefetch = new HashMap<Long,ClusterImpl>();\r
+\r
+ public synchronized ClusterImpl load2(long clusterId, int clusterKey) throws DatabaseException {\r
+ ClusterImpl curr = (ClusterImpl) clusterArray[clusterKey];\r
+ if (curr.isLoaded())\r
+ return curr;\r
+\r
+// getPrefetched();\r
+//\r
+// curr = (ClusterImpl) clusterArray[clusterKey];\r
+// if (curr.isLoaded())\r
+// return curr;\r
+\r
+ final Semaphore s = new Semaphore(0);\r
+ final DatabaseException[] ex = new DatabaseException[1];\r
+\r
+ load2(clusterId, clusterKey, new Callback<DatabaseException>() {\r
+ @Override\r
+ public void run(DatabaseException e) {\r
+ ex[0] = e;\r
+ s.release();\r
+ }\r
+ });\r
+ try {\r
+ s.acquire();\r
+ } catch (InterruptedException e) {\r
+ Logger.defaultLogError(e);\r
+ }\r
+ if (null != ex[0])\r
+ throw ex[0];\r
+ if (Development.DEVELOPMENT) {\r
+ if (Development.<Boolean>getProperty(DevelopmentKeys.CLUSTERTABLE_VALIDATE_ON_LOAD, Bindings.BOOLEAN)) {\r
+ try {\r
+ ClusterImpl loaded = (ClusterImpl) clusterArray[clusterKey];\r
+ if (loaded instanceof ClusterSmall)\r
+ ((ClusterSmall) loaded).check();\r
+ else if (loaded instanceof ClusterBig)\r
+ ((ClusterBig) loaded).check();\r
+ } catch (Throwable t) {\r
+ t.printStackTrace();\r
+ }\r
+ }\r
+ }\r
+ \r
+ validate(clusterKey);\r
+ \r
+ return (ClusterImpl) clusterArray[clusterKey];\r
+ }\r
+ \r
    /**
     * Post-load validation hook. The cross-check of the session's cluster contents
     * against the in-memory GraphClientImpl2 state is currently disabled (kept
     * below for reference), so this method is intentionally a no-op.
     */
    void validate(int clusterKey) {

//        if(sessionImpl.graphSession.graphClient instanceof GraphClientImpl2) {
//
//            ClusterImpl server = (ClusterImpl)clusterArray[clusterKey];
//            String sd = server.dump(sessionImpl.clusterTranslator);
//            GraphClientImpl2 gci = ((GraphClientImpl2)sessionImpl.graphSession.graphClient);
//            ClusterImpl memory = gci.clusters.get(server.clusterUID);
//            if(memory == null)
//                System.err.println("ad");
//            String md = memory.dump(gci.support);
//
//            if(!sd.equals(md)) {
//
//                int diffPos = 0;
//                int minLength = Math.min(sd.length(), md.length());
//                for (int i = 0; i < minLength; i++) {
//                    if (sd.charAt(i) != md.charAt(i)) {
//                        diffPos = i;
//                        break;
//                    }
//                }
//
//                int start = Math.max(diffPos-200, 0);
//                int end = Math.min(minLength-1, diffPos+200);
//
//                System.err.println("== CLUSTER DIFFERENCE " + clusterArray[clusterKey].getClusterUID() + " == ");
//                System.err.println("== SESSION == ");
//                System.err.println(sd.substring(start, end));
//                System.err.println("== MEM == ");
//                System.err.println(md.substring(start, end));
//                System.err.println("== CLUSTER DIFFERENCE ENDS == ");
//
//                throw new IllegalStateException();
//            }
//
//        }
    }
+\r
    /**
     * Placeholder for a cluster prefetching mechanism; always returns null.
     * The synchronization on {@code prefetch} is kept so the locking discipline
     * matches the (currently disabled) prefetch bookkeeping.
     */
    public synchronized ClusterImpl getPrefetched() {
        synchronized(prefetch) {
            return null;
        }
    }
+\r
+ public synchronized void load2(long clusterId, int clusterKey, final Callback<DatabaseException> runnable) {\r
+\r
+ assert (Constants.ReservedClusterId != clusterId);\r
+\r
+ ClusterImpl cluster = null;\r
+ DatabaseException e = null;\r
+\r
+ try {\r
+\r
+ ClusterUID clusterUID = clusters.makeClusterUID(clusterId);\r
+// cluster = (ClusterImpl) sessionImpl.getGraphSession().getClusterImpl(clusterUID, clusterKey);\r
+ Database.Session session = sessionImpl.graphSession.dbSession;\r
+// cluster = (ClusterImpl)gci.getClusterByClusterKey(clusterKey).clone(sessionImpl.clusterTranslator);\r
+// cluster = (ClusterImpl)((ClusterImpl)gci.getClusterByClusterUIDOrMakeProxy(clusterUID)).clone(sessionImpl.clusterTranslator);\r
+ \r
+// cluster = gci.clone(clusterUID, sessionImpl.clusterTranslator);\r
+ \r
+ cluster = session.clone(clusterUID, new ClusterCreator() {\r
+ \r
+ @Override\r
+ public <T> T create(ClusterUID uid, byte[] bytes, int[] ints, long[] longs) {\r
+ ClusterSupport support = sessionImpl.clusterTranslator;\r
+ try {\r
+ return (T)ClusterImpl.make(longs, ints, bytes, support, support.getClusterKeyByClusterUIDOrMake(uid));\r
+ } catch (DatabaseException e) {\r
+ e.printStackTrace();\r
+ return null;\r
+ }\r
+ }\r
+ \r
+ });\r
+ \r
+ } catch (Throwable t) {\r
+ // It's totally legal to call for non-existing cluster.\r
+ // Sometimes this indicates an error, though.\r
+ Logger.getDefault().logInfo("Load cluster failed.", t);\r
+ if (t instanceof DatabaseException)\r
+ e = (DatabaseException) t;\r
+ else\r
+ e = new DatabaseException("Load cluster failed.", t);\r
+ }\r
+ if (null == cluster) {\r
+ runnable.run(e);\r
+ return;\r
+ }\r
+\r
+// new Exception("Load cluster " + cluster.getClusterId() + " " + cluster.getCachedSize() + " " + cluster.getImmutable()).printStackTrace();\r
+\r
+ // Can not be called with null argument.\r
+ replaceCluster(cluster);\r
+ sessionImpl.onClusterLoaded(clusterId);\r
+ runnable.run(null);\r
+\r
+ }\r
+\r
+ public synchronized ClusterImpl tryLoad(long clusterId, int clusterKey) throws DatabaseException {\r
+\r
+ assert (Constants.ReservedClusterId != clusterId);\r
+\r
+ ClusterUID clusterUID = clusters.makeClusterUID(clusterId);\r
+ \r
+ Database.Session session = sessionImpl.graphSession.dbSession;\r
+ \r
+ ClusterImpl cluster = session.clone(clusterUID, new ClusterCreator() {\r
+ \r
+ @Override\r
+ public <T> T create(ClusterUID uid, byte[] bytes, int[] ints, long[] longs) {\r
+ ClusterSupport support = sessionImpl.clusterTranslator;\r
+ try {\r
+ return (T)ClusterImpl.make(longs, ints, bytes, support, support.getClusterKeyByClusterUIDOrMake(uid));\r
+ } catch (DatabaseException e) {\r
+ e.printStackTrace();\r
+ return null;\r
+ }\r
+ }\r
+ \r
+ });\r
+ if (null == clusterArray[clusterKey])\r
+ clusterArray[clusterKey] = cluster;\r
+ else\r
+ replaceCluster(cluster);\r
+ sessionImpl.onClusterLoaded(clusterId);\r
+ \r
+ validate(clusterKey);\r
+ \r
+ return cluster;\r
+ }\r
+\r
+ public Collection<ClusterI> getClusters() {\r
+ ArrayList<ClusterI> result = new ArrayList<ClusterI>();\r
+ for (int i = 0; i < clusterArray.length; i++) {\r
+ ClusterI cluster = clusterArray[i];\r
+ if (cluster != null)\r
+ result.add(cluster);\r
+ }\r
+ return result;\r
+ }\r
+\r
    /** Returns the number of clusters tracked by this table. */
    public int size() {
        return clusters.size();
    }

    /** Returns the current tick and advances the monotonically increasing logical
     *  clock (presumably used for cluster importance ordering — confirm with
     *  refreshImportance callers). */
    public long timeCounter() {
        return timeCounter++;
    }

    /** Returns true if the cluster slot at {@code index} has been marked virtual. */
    public boolean hasVirtual(int index) {
        return virtuals[index];
    }

    /** Marks the cluster slot at {@code index} as containing virtual resources. */
    public void markVirtual(int index) {
        virtuals[index] = true;
    }

    /** Returns the internal cluster array itself (not a copy); callers must not
     *  modify it. */
    public ClusterImpl[] getClusterArray() {
        return clusterArray;
    }
+\r
+ public boolean isImmutable(int id) {\r
+\r
+ int clusterKey = ClusterTraitsBase.getClusterKeyFromResourceKeyNoThrow(id);\r
+ Boolean exist = immutables[clusterKey];\r
+ if(exist != null) return exist;\r
+\r
+ ClusterI cluster = getClusterByResourceKey(id);\r
+ boolean result = cluster == null ? false : cluster.getImmutable();\r
+\r
+ markImmutable(cluster, result);\r
+ return result;\r
+\r
+ }\r
+\r
    /**
     * Caches the immutability flag for the given cluster's key.
     * NOTE(review): assumes {@code cluster} is non-null; callers must check first.
     */
    public void markImmutable(ClusterI cluster, boolean value) {
        immutables[cluster.getClusterKey()] = value;
    }
    /**
     * Resolves a cluster key from a cluster UID given as two longs, creating a
     * proxy for the cluster if it is not yet present in this table.
     */
    @Override
    public int getClusterKeyByUID(long first, long second) throws DatabaseException {
        return getClusterKeyByClusterUIDOrMakeProxy(ClusterUID.make(first, second));
    }
+\r
+}\r