/******************************************************************************* * Copyright (c) 2007, 2010 Association for Decentralized Information Management * in Industry THTH ry. * All rights reserved. This program and the accompanying materials * are made available under the terms of the Eclipse Public License v1.0 * which accompanies this distribution, and is available at * http://www.eclipse.org/legal/epl-v10.html * * Contributors: * VTT Technical Research Centre of Finland - initial API and implementation *******************************************************************************/ package fi.vtt.simantics.procore.internal; import java.io.File; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; import java.util.Map; import java.util.TreeMap; import java.util.concurrent.Semaphore; import org.simantics.databoard.Bindings; import org.simantics.db.ClusterCreator; import org.simantics.db.Database; import org.simantics.db.DevelopmentKeys; import org.simantics.db.SessionVariables; import org.simantics.db.Database.Session.ClusterChanges; import org.simantics.db.common.utils.Logger; import org.simantics.db.exception.ClusterDoesNotExistException; import org.simantics.db.exception.DatabaseException; import org.simantics.db.exception.ResourceNotFoundException; import org.simantics.db.exception.RuntimeDatabaseException; import org.simantics.db.impl.ClusterBase; import org.simantics.db.impl.ClusterI; import org.simantics.db.impl.ClusterSupport; import org.simantics.db.impl.ClusterTraitsBase; import org.simantics.db.impl.IClusterTable; import org.simantics.db.impl.graph.WriteGraphImpl; import org.simantics.db.impl.query.QueryProcessor; import org.simantics.db.procore.cluster.ClusterBig; import org.simantics.db.procore.cluster.ClusterImpl; import org.simantics.db.procore.cluster.ClusterSmall; import org.simantics.db.procore.cluster.ClusterTraits; import org.simantics.db.procore.protocol.Constants; import org.simantics.db.service.ClusterCollectorPolicy; import org.simantics.db.service.ClusterCollectorPolicy.CollectorCluster; import org.simantics.db.service.ClusterUID; import org.simantics.utils.Development; import org.simantics.utils.datastructures.Callback; import fi.vtt.simantics.procore.DebugPolicy; import fi.vtt.simantics.procore.internal.ClusterControlImpl.ClusterStateImpl; import fi.vtt.simantics.procore.internal.SessionImplSocket.TaskHelper; import gnu.trove.map.hash.TLongIntHashMap; import gnu.trove.map.hash.TLongObjectHashMap; import gnu.trove.procedure.TIntProcedure; import gnu.trove.procedure.TLongObjectProcedure; import gnu.trove.procedure.TObjectProcedure; import gnu.trove.set.hash.TIntHashSet; public final class ClusterTable implements IClusterTable { int maximumBytes = 128 * 1024 * 1024; int limit = (int)(0.8*(double)maximumBytes); long timeCounter = 0; final private SessionImplSocket sessionImpl; final private ArrayList writeOnlyClusters = new ArrayList(); final private TIntHashSet writeOnlyInvalidates = new TIntHashSet(); final private static int ARRAY_SIZE = ClusterTraits.getClusterArraySize(); final private ClusterImpl[] clusterArray = new ClusterImpl[ARRAY_SIZE]; final private Boolean[] immutables = new Boolean[ARRAY_SIZE]; final private boolean[] virtuals = new boolean[ARRAY_SIZE]; static class ImportanceEntry implements CollectorCluster { final public long importance; final public long clusterId; public ImportanceEntry(ClusterImpl impl) { this(impl.getImportance(), impl.getClusterId()); } public ImportanceEntry(long importance, long clusterId) { this.importance = importance; this.clusterId = clusterId; } @Override public long getImportance() { return importance; } @Override public long getClusterId() { return clusterId; } } private class Clusters { // This makes sure that non-null values from hashMap.get can be trusted (no unsynchronized rehashes occur) final public TLongObjectHashMap hashMap = new TLongObjectHashMap(2 * ARRAY_SIZE); //private final HashMap clusterU2I = new HashMap(); // Maps cluster UID to cluster. private Clusters() { clear(); } private void clear() { hashMap.clear(); // clusterU2I.clear(); hashMap.put(0, null); // reserved for null value // clusterU2I.put(ClusterUID.make(0, 0), null); } private int size() { return hashMap.size(); } private ClusterImpl getClusterByClusterId(long clusterId) { return hashMap.get(clusterId); } private ClusterImpl getClusterByClusterUID(ClusterUID clusterUID) { return getClusterByClusterId(clusterUID.second); //return clusterU2I.get(clusterUID); } private ClusterImpl getClusterByClusterUID(long id1, long id2) { return getClusterByClusterId(id2); } private ClusterImpl makeProxy(long clusterId) { return makeProxy(ClusterUID.make(0, clusterId)); } private ClusterImpl makeProxy(ClusterUID clusterUID) { ClusterImpl proxy = hashMap.get(clusterUID.second); if (null != proxy) return proxy; int clusterKey = hashMap.size(); ClusterSmall sentinel = new ClusterSmall(clusterUID, clusterKey, ClusterTable.this, sessionImpl.clusterTranslator); if (sentinel.clusterId != sentinel.clusterUID.second) throw new RuntimeDatabaseException("ClusterTable corrupted. Contact application support."); create(sentinel); return sentinel; } private void replace(ClusterImpl proxy) { ClusterImpl old = clusterArray[proxy.clusterKey]; create(proxy); if (null != old) { if (old.clusterKey != proxy.clusterKey) throw new RuntimeDatabaseException("ClusterTable corrupted. Contact application support."); if (old.clusterId != proxy.clusterId) throw new RuntimeDatabaseException("ClusterTable corrupted. Contact application support."); if (!old.clusterUID.equals(proxy.clusterUID)) throw new RuntimeDatabaseException("ClusterTable corrupted. Contact application support."); } } private ClusterSmall freeProxy(ClusterImpl proxy) { ClusterImpl clusterImpl = hashMap.get(proxy.clusterId); if (null == clusterImpl) throw new RuntimeDatabaseException("ClusterTable corrupted. Contact application support."); // ClusterUID clusterUID = ClusterUID.make(0, proxy.clusterId); // ClusterImpl clusterImpl2 = clusterU2I.get(clusterUID ); // if (clusterImpl != clusterImpl2) // throw new RuntimeDatabaseException("ClusterTable corrupted. Contact application support."); if (proxy.clusterId != clusterImpl.clusterId) throw new RuntimeDatabaseException("ClusterTable corrupted. Contact application support."); if (proxy.clusterKey != clusterImpl.clusterKey) throw new RuntimeDatabaseException("ClusterTable corrupted. Contact application support."); ClusterSmall sentinel = new ClusterSmall(makeClusterUID(proxy.clusterId) , proxy.clusterKey, ClusterTable.this, sessionImpl.clusterTranslator); return (ClusterSmall)create(sentinel); } private ClusterImpl create(ClusterImpl clusterImpl) { hashMap.put(clusterImpl.clusterId, clusterImpl); // clusterU2I.put(clusterImpl.clusterUID, clusterImpl); clusterArray[clusterImpl.clusterKey] = clusterImpl; return clusterImpl; } private void forEachValue(TObjectProcedure procedure) { hashMap.forEachValue(procedure); } private void forEachEntry(TLongObjectProcedure procedure) { hashMap.forEachEntry(procedure); } private ClusterUID makeClusterUID(long clusterId) { return ClusterUID.make(0, clusterId); } private void removeProxy(ClusterImpl proxy) { hashMap.remove(proxy.clusterId); clusterArray[proxy.clusterKey] = null; } } final public TreeMap importanceMap = new TreeMap(); private ClusterCollectorPolicy collectorPolicy; private boolean dirtySizeInBytes = true; private long sizeInBytes = 0; private final Clusters clusters; ClusterUID makeClusterUID(long clusterId) { return clusters.makeClusterUID(clusterId); } public ClusterUID getClusterUIDByResourceKey(int resourceKey) throws DatabaseException { int clusterKey = ClusterTraits.getClusterKeyFromResourceKey(resourceKey); return clusterArray[clusterKey].clusterUID; } ClusterTable(SessionImplSocket sessionImpl, File folderPath) { this.sessionImpl = sessionImpl; clusters = new Clusters(); } void dispose() { clusters.clear(); importanceMap.clear(); } public void setCollectorPolicy(ClusterCollectorPolicy policy) { this.collectorPolicy = policy; } /** * Sets Cluster Table allocation size by percentage of maximum usable * memory. Allocation is limited between 5% and 50% of maximum memory. * * @param precentage */ private void setAllocateSize(double percentage) { percentage = Math.max(percentage, 0.05); percentage = Math.min(percentage, 0.5); maximumBytes = (int) Math.floor(percentage * Runtime.getRuntime().maxMemory()); } private double estimateProperAllocation(ClusterCollectorSupport support) { long max = Runtime.getRuntime().maxMemory(); long free = Runtime.getRuntime().freeMemory(); long total = Runtime.getRuntime().totalMemory(); long realFree = max - total + free; // amount of free memory long current = support.getCurrentSize(); // currently allocated cache long inUseTotal = max - realFree; // currently used memory long inUseOther = inUseTotal - current; // memory that the application uses (without the cache) double otherUsePercentage = (double) inUseOther / (double) max; // percentage of memory that the application uses double aimFree = 0.2; // percentage of maximum heap that we try to keep free double estimate = 1.0 - otherUsePercentage - aimFree; estimate = Math.min(estimate, 0.5); estimate = Math.max(estimate, 0.05); // System.out.println("Estimated allocation percentage " + estimate + // " memory stats: max: " + max + ", free: " + realFree + ", inUse: " + // inUseTotal + ", cached: " + current + ", cacheSize: " + maximumBytes); return estimate; } void checkCollect() { collector.collect(); } synchronized ClusterImpl getClusterByClusterId(long clusterId) { return clusters.getClusterByClusterId(clusterId); } ClusterBase getClusterByClusterKey(int clusterKey) { return clusterArray[clusterKey]; } synchronized ClusterImpl makeProxy(ClusterUID clusterUID, long clusterId) { if (clusterUID.second != clusterId) throw new RuntimeDatabaseException("Illegal id for cluster=" + clusterUID + " id=" + clusterId); return clusters.makeProxy(clusterUID); } synchronized ClusterImpl makeCluster(long clusterId, boolean writeOnly) { checkCollect(); ClusterImpl proxy = clusters.getClusterByClusterId(clusterId); if (null != proxy) return proxy; ClusterUID clusterUID = ClusterUID.make(0, clusterId); int clusterKey = clusters.size(); // new key if (writeOnly) { proxy = new ClusterWriteOnly(clusterUID, clusterKey, sessionImpl); writeOnlyClusters.add(proxy); return clusters.create(proxy); } else { ClusterImpl cluster = ClusterImpl.make(clusterUID, clusterKey, sessionImpl.clusterTranslator); clusters.create(cluster); if (!cluster.isLoaded()) Logger.defaultLogError(new Exception("Bug in ClusterTable.makeCluster(long, boolean), cluster not loaded")); importanceMap.put(cluster.getImportance(), new ImportanceEntry(cluster)); if(collectorPolicy != null) collectorPolicy.added(cluster); return cluster; } } synchronized void replaceCluster(ClusterI cluster) { checkCollect(); int clusterKey = cluster.getClusterKey(); ClusterI existing = clusterArray[clusterKey]; if (existing.hasVirtual()) cluster.markVirtual(); importanceMap.remove(existing.getImportance()); if (collectorPolicy != null) collectorPolicy.removed((ClusterImpl)existing); clusters.replace((ClusterImpl)cluster); if (!cluster.isLoaded()) Logger.defaultLogError(new Exception("Bug in ClusterTable.replaceCluster(ClusterI), cluster not loaded")); importanceMap.put(cluster.getImportance(), new ImportanceEntry((ClusterImpl)cluster)); if(collectorPolicy != null) collectorPolicy.added((ClusterImpl)cluster); if (!dirtySizeInBytes && !existing.isLoaded()) { sizeInBytes += cluster.getCachedSize(); } } synchronized void release(CollectorCluster cluster) { importanceMap.remove(cluster.getImportance()); release(cluster.getClusterId()); } synchronized void release(long clusterId) { ClusterImpl clusterImpl = clusters.getClusterByClusterId(clusterId); if (null == clusterImpl) return; if(!clusterImpl.isLoaded() || clusterImpl.isEmpty()) return; clusters.freeProxy(clusterImpl); importanceMap.remove(clusterImpl.getImportance()); if (collectorPolicy != null) collectorPolicy.removed(clusterImpl); if (sessionImpl.writeState != null) sessionImpl.clusterStream.flush(clusterImpl.clusterUID); if (!dirtySizeInBytes) { sizeInBytes -= clusterImpl.getCachedSize(); } } synchronized void compact(long id) { ClusterI impl = clusters.getClusterByClusterId(id); if (impl != null) impl.compact(); } void updateSize() { // cachedSize = getSizeInBytes(); } double getLoadProbability() { // This can currently cause stack overflow return 1.0; // if(cachedSize < SLOW_LIMIT) return 1.0; // if(cachedSize > HIGH_LIMIT) return 1e-2; // // double pos = (double)(cachedSize - SLOW_LIMIT) / (double)(HIGH_LIMIT - // SLOW_LIMIT); // double val = 0.1 * ((pos-1) * (pos-1)) + 1e-2; // return val; } class SizeProcedure implements TObjectProcedure { public long result = 0; @Override public boolean execute(ClusterImpl cluster) { if (cluster != null) { try { if (cluster.isLoaded() && !cluster.isEmpty()) { result += cluster.getCachedSize(); } } catch (Throwable t) { Logger.defaultLogError(t); } } return true; } public void clear() { result = 0; } }; private SizeProcedure sizeProcedure = new SizeProcedure(); long getSizeInBytes() { if (dirtySizeInBytes) { sizeProcedure.clear(); clusters.forEachValue(sizeProcedure); sizeInBytes = sizeProcedure.result; // System.err.println("recomputed size of clusterTable => " + sizeInBytes); setDirtySizeInBytes(false); } return sizeInBytes; } public void setDirtySizeInBytes(boolean value) { dirtySizeInBytes = value; } ClusterStateImpl getState() { final ClusterStateImpl result = new ClusterStateImpl(); clusters.forEachEntry(new TLongObjectProcedure() { @Override public boolean execute(long arg0, ClusterImpl arg1) { if (arg1 == null) return true; if (arg1.isLoaded() && !arg1.isEmpty()) { result.ids.add(new ImportanceEntry(arg1)); } return true; } }); return result; } void restoreState(ClusterStateImpl state) { ClusterStateImpl current = getState(); for (CollectorCluster id : current.ids) if (!state.ids.contains(id)) collectorSupport.release(id); } class ClusterCollectorImpl implements ClusterCollector { final private ClusterCollectorSupport support; private ClusterCollectorPolicy policy; ClusterCollectorImpl(ClusterCollectorSupport support) { this.support = support; } ClusterCollectorPolicy setPolicy(ClusterCollectorPolicy newPolicy) { ClusterCollectorPolicy oldPolicy = policy; policy = newPolicy; if(policy != null) { for (CollectorCluster id : support.getResidentClusters()) { policy.added(getClusterByClusterId(id.getClusterId())); } } support.setPolicy(policy); return oldPolicy; } @Override public void collect() { if(policy != null) { release(policy.select()); } else { int size = support.getCurrentSize(); boolean dynamicAllocation = useDynamicAllocation(); if (dynamicAllocation) setAllocateSize(estimateProperAllocation(support)); if (DebugPolicy.CLUSTER_COLLECTION) { System.out.println("Cluster collector activated, current size = " + size + " limit = " + maximumBytes); } if (dynamicAllocation) { int collectSize = maximumBytes / 2; collectSize = Math.min(collectSize, 32 * 1024 * 1024); // try to keep allocated clusters below the maximum if (maximumBytes - size > collectSize) return; collectSize += size - maximumBytes; collect(collectSize); } else { // try to keep allocated clusters below the maximum if (size < maximumBytes) return; // shave off 20% collect(size-limit); } } } private boolean useDynamicAllocation() { return "true".equalsIgnoreCase(System.getProperty("org.simantics.db.cluster.dynamicAlloc")); } @Override public void collect(int target) { if(policy != null) { release(policy.select(target)); } else { ArrayList toRelease = new ArrayList(); for (CollectorCluster cluster : support.getResidentClusters()) { target -= support.getClusterSize(cluster); if (target > 0) { toRelease.add(cluster); } else { break; } } release(toRelease); if (DebugPolicy.CLUSTER_COLLECTION) { System.out.println("Cluster collector finished, current size = " + support.getCurrentSize()); } } } void release(Collection toRelease) { for (CollectorCluster id : toRelease) { support.release(id); } } } private ClusterCollectorSupport collectorSupport = new ClusterCollectorSupportImpl(this); ClusterCollectorImpl collector = new ClusterCollectorImpl(collectorSupport); void gc() { collector.collect(); } private long newResourceClusterId = Constants.NewClusterId; public static final int CLUSTER_FILL_SIZE = ClusterTraitsBase.getMaxNumberOfResources(); /* * Uusi id varataan vasta, kun lis�t��n resurssi => reservedIds sis�lt�� * vain jo k�yt�ss� olevia klustereita */ ClusterImpl getNewResourceCluster(ClusterSupport cs, GraphSession graphSession, boolean writeOnly) throws DatabaseException { if (Constants.NewClusterId == newResourceClusterId) { newResourceClusterId = graphSession.newClusterId(); return getClusterByClusterIdOrMake(newResourceClusterId, writeOnly); } else { ClusterImpl cluster = getClusterByClusterIdOrThrow(newResourceClusterId); if (cluster.getNumberOfResources(cs) >= CLUSTER_FILL_SIZE) { newResourceClusterId = graphSession.newClusterId(); cluster = getClusterByClusterIdOrMake(newResourceClusterId, writeOnly); } return cluster; } } void flushCluster(GraphSession graphSession) { // We seem to disagree about this. // graphSession.newClusterId(); newResourceClusterId = Constants.NewClusterId; } void writeOnlyInvalidate(ClusterI impl) { writeOnlyInvalidates.add(impl.getClusterKey()); } void removeWriteOnlyClusters() { for (ClusterI proxy : writeOnlyClusters) { if (!(proxy instanceof ClusterImpl)) throw new RuntimeDatabaseException("ClusterTable corrupted. Contact application support."); clusters.freeProxy((ClusterImpl)proxy); } writeOnlyClusters.clear(); writeOnlyInvalidates.forEach(new TIntProcedure() { @Override public boolean execute(int clusterKey) { ClusterImpl proxy = clusterArray[clusterKey]; ClusterUID clusterUID = proxy.getClusterUID(); System.err.println("writeOnlyInvalidate " + clusterUID); clusters.freeProxy(proxy); return true; } }); writeOnlyInvalidates.clear(); } public ClusterImpl getClusterByClusterUID(ClusterUID clusterUID) { synchronized (this) { return clusters.getClusterByClusterUID(clusterUID); } } public int getClusterKeyByClusterUIDOrMakeProxy(ClusterUID clusterUID) { return getClusterKeyByClusterUIDOrMakeProxy(0/*clusterUID.first*/, clusterUID.second); } public int getClusterKeyByClusterUIDOrMakeProxy(long id1, long id2) { return getClusterByClusterUIDOrMakeProxy(id1, id2).clusterKey; } public ClusterImpl getClusterByClusterUIDOrMakeProxy(ClusterUID clusterUID) { synchronized (this) { ClusterImpl clusterImpl = clusters.getClusterByClusterUID(clusterUID); if (null == clusterImpl) clusterImpl = clusters.makeProxy(clusterUID); return clusterImpl; } } public ClusterImpl getClusterByClusterUIDOrMakeProxy(long id1, long id2) { synchronized (this) { ClusterImpl clusterImpl = clusters.getClusterByClusterUID(id1, id2); if (null == clusterImpl) clusterImpl = clusters.makeProxy(id2); return clusterImpl; } } ClusterImpl getLoadOrThrow(long clusterId) throws DatabaseException { synchronized (this) { ClusterImpl proxy = clusters.getClusterByClusterId(clusterId); int clusterKey = 0; if (proxy != null) if (proxy.isLoaded()) return proxy; else clusterKey = proxy.getClusterKey(); try { if (clusterKey == 0) { proxy = clusters.makeProxy(clusterId); clusterKey = proxy.getClusterKey(); } ClusterImpl ci = tryLoad(clusterId, clusterKey); if (null == ci) throw new ResourceNotFoundException(clusterId); return ci; } catch (ClusterDoesNotExistException t) { clusters.removeProxy(proxy); throw t; } } } ClusterImpl getClusterByClusterIdOrMake(long clusterId, boolean writeOnly) { synchronized (this) { ClusterImpl proxy = clusters.getClusterByClusterId(clusterId); if (proxy != null) return proxy; else return makeCluster(clusterId, writeOnly); } } ClusterImpl getClusterByClusterIdOrThrow(long clusterId) { synchronized (this) { ClusterImpl proxy = clusters.getClusterByClusterId(clusterId); if (null == proxy) throw new IllegalArgumentException("Cluster id=" + clusterId + " is not created."); return proxy; } } long getClusterIdOrCreate(ClusterUID clusterUID) { return clusterUID.second; } final long getClusterIdByResourceKey(final int resourceKey) throws DatabaseException { int clusterKey = ClusterTraitsBase.getClusterKeyFromResourceKey(resourceKey); if (ClusterTraitsBase.isVirtualClusterKey(clusterKey)) throw new RuntimeException("Tried to get a persistent cluster for a virtual resource."); ClusterI c = clusterArray[clusterKey]; if (c == null) throw new RuntimeException("No cluster for key " + resourceKey); return c.getClusterId(); } final long getClusterIdByResourceKeyNoThrow(final int resourceKey) { int clusterKey = ClusterTraitsBase.getClusterKeyFromResourceKeyNoThrow(resourceKey); if (ClusterTraitsBase.isVirtualClusterKey(clusterKey)) { Logger.defaultLogError("Tried to get a persistent cluster for a virtual resource. key=" + resourceKey); return 0; } ClusterI c = clusterArray[clusterKey]; if (c == null) { Logger.defaultLogError("No cluster for key " + resourceKey); return 0; } return c.getClusterId(); } int counter = 0; void refresh(long csid, SessionImplSocket session, ClusterUID[] clusterUID) { synchronized (this) { session.flushCounter = 0; session.clusterStream.reallyFlush(); ClientChangesImpl cs = new ClientChangesImpl(session); if (session.clientChanges == null) session.clientChanges = cs; for (int i=0; i(writer, th.writeTraits, th.sema, th.proc); try { session.getQueryProvider2().performDirtyUpdates(writer); session.fireMetadataListeners(writer, cs); session.getQueryProvider2().performScheduledUpdates(writer); session.fireReactionsToSynchronize(cs); session.fireSessionVariableChange(SessionVariables.QUEUED_WRITES); session.printDiagnostics(); } finally { if (null != th) session.writeState = null; } } } } final void refreshCluster(long csid, SessionImplSocket session, ClusterImpl cluster, ClusterUID clusterUID, long clusterId, int clusterKey) throws DatabaseException { if (DebugPolicy.REPORT_CLUSTER_EVENTS) System.err.println("cluster=" + clusterUID + " id=" + clusterId + " key=" + clusterKey + " resources will be updated."); // get cluster change sets QueryProcessor queryProcessor = session.getQueryProvider2(); ClusterChanges cc; try { cc = session.graphSession.getClusterChanges(clusterUID, csid); } catch (Exception e) { Logger.defaultLogError("Could not get cluster changes. cluster=" + clusterUID, e); release(clusterId); return; } for (int i=0; i " + newImportance); c.setImportance(newImportance); if (!c.isLoaded()) Logger.defaultLogError(new Exception("Bug in ClusterTable.refreshImportance(ClusterImpl), cluster not loaded")); importanceMap.put(c.getImportance(), new ImportanceEntry(c)); if(collectorPolicy != null) collectorPolicy.added(c); } static long loadTime = 0; @SuppressWarnings("unchecked") public final T getClusterProxyByResourceKey(final int resourceKey) { int clusterKey = ClusterTraitsBase.getClusterKeyFromResourceKeyNoThrow(resourceKey); if (ClusterTraitsBase.isVirtualClusterKey(clusterKey)) throw new RuntimeException("Tried to get a persistent cluster for a virtual resource."); ClusterI cluster = clusterArray[clusterKey]; if (cluster == null) throw new RuntimeException("No proxy for existing cluster. Resource key = " + resourceKey); return (T)cluster; } TLongIntHashMap clusterLoadHistogram = new TLongIntHashMap(); int clusterLoadCounter = 0; @SuppressWarnings("unchecked") public final T getClusterByResourceKey(final int resourceKey) { int clusterKey = ClusterTraitsBase.getClusterKeyFromResourceKeyNoThrow(resourceKey); if (ClusterTraitsBase.isVirtualClusterKey(clusterKey)) throw new RuntimeException("Tried to get a persistent cluster for a virtual resource."); ClusterI c = clusterArray[clusterKey]; if (c == null) return null; if (c.isLoaded()) { if ((counter++ & 4095) == 0) refreshImportance((ClusterImpl) c); return (T) c; } if (!(c instanceof ClusterSmall)) { Logger.defaultLogError("Proxy must be instance of ClusterSmall"); return null; } ClusterI cluster; ClusterSmall cs = (ClusterSmall) c; try { if(DebugPolicy.REPORT_CLUSTER_LOADING) { long start = System.nanoTime(); cluster = load2(cs.getClusterId(), cs.getClusterKey()); long load = System.nanoTime()-start; loadTime += load; if(DebugPolicy.REPORT_CLUSTER_LOADING) { int was = clusterLoadHistogram.get(cluster.getClusterId()); clusterLoadHistogram.put(cluster.getClusterId(), was+1); clusterLoadCounter++; String text = "Load2 " + cluster + " " + 1e-9*loadTime + "s. " + 1e-6*load + "ms. " + clusterLoadCounter + " " + was + " " + cluster.getUsedSpace() + " " + cluster.getImportance(); if(DebugPolicy.REPORT_CLUSTER_LOADING_STACKS) { new Exception(text).printStackTrace(); } else { System.err.println(text); } } } else { cluster = load2(cs.getClusterId(), cs.getClusterKey()); } } catch (DatabaseException e) { Logger.defaultLogError(e); if (DebugPolicy.REPORT_CLUSTER_EVENTS) e.printStackTrace(); String msg = "Failed to load cluster " + cs.getClusterUID() + " for resource key " + resourceKey + " resourceId=" + (((cs.getClusterId() << 16 + (resourceKey & 65535)))); // TODO: this jams the system => needs refactoring. throw new RuntimeDatabaseException(msg, e); } return (T) cluster; } @SuppressWarnings("unchecked") final T checkedGetClusterByResourceKey(final int resourceKey) { int clusterKey = ClusterTraitsBase.getClusterKeyFromResourceKeyNoThrow(resourceKey); if (ClusterTraitsBase.isVirtualClusterKey(clusterKey)) throw new RuntimeException("Tried to get a persistent cluster for a virtual resource."); ClusterI c = clusterArray[clusterKey]; if (c == null) throw new RuntimeException("No cluster for resource key " + resourceKey); if (c.isLoaded()) { if ((counter++ & 4095) == 0) refreshImportance((ClusterImpl) c); return (T) c; } if (!(c instanceof ClusterSmall)) { Logger.defaultLogError("Proxy must be instance of ClusterSmall"); return null; } ClusterI cluster; ClusterSmall cs = (ClusterSmall) c; try { // System.err.println("Load2 " + resourceKey); cluster = load2(cs.getClusterId(), cs.getClusterKey()); } catch (DatabaseException e) { if (DebugPolicy.REPORT_CLUSTER_EVENTS) e.printStackTrace(); int resourceIndex = resourceKey & ClusterTraitsBase.getResourceIndexFromResourceKeyNoThrow(resourceKey); long resourceId = ClusterTraitsBase.createResourceIdNoThrow(cs.getClusterId(), resourceIndex); String msg = "Failed to load cluster " + cs.getClusterUID() + " for resource key " + resourceKey + " resourceIndex=" + resourceIndex + " resourceId=" + resourceId; Logger.defaultLogError(msg, e); c.setDeleted(true, null); return (T)c; } return (T) cluster; } void printDebugInfo(ClusterSupport support) throws DatabaseException { final int SIZE = clusters.size(); long sum = 0; for (int i = 1; i < SIZE; ++i) { ClusterI c = clusterArray[i]; c.getNumberOfResources(support); long size = c.getUsedSpace(); System.out.println("cluster=" + c.getClusterId() + " size=" + size); c.printDebugInfo("koss: ", support); sum += size; } System.out.println("Total number of clusters " + SIZE); System.out.println("Total cluster size " + sum); } Map prefetch = new HashMap(); public synchronized ClusterImpl load2(long clusterId, int clusterKey) throws DatabaseException { ClusterImpl curr = (ClusterImpl) clusterArray[clusterKey]; if (curr.isLoaded()) return curr; // getPrefetched(); // // curr = (ClusterImpl) clusterArray[clusterKey]; // if (curr.isLoaded()) // return curr; final Semaphore s = new Semaphore(0); final DatabaseException[] ex = new DatabaseException[1]; load2(clusterId, clusterKey, new Callback() { @Override public void run(DatabaseException e) { ex[0] = e; s.release(); } }); try { s.acquire(); } catch (InterruptedException e) { Logger.defaultLogError(e); } if (null != ex[0]) throw ex[0]; if (Development.DEVELOPMENT) { if (Development.getProperty(DevelopmentKeys.CLUSTERTABLE_VALIDATE_ON_LOAD, Bindings.BOOLEAN)) { try { ClusterImpl loaded = (ClusterImpl) clusterArray[clusterKey]; if (loaded instanceof ClusterSmall) ((ClusterSmall) loaded).check(); else if (loaded instanceof ClusterBig) ((ClusterBig) loaded).check(); } catch (Throwable t) { t.printStackTrace(); } } } validate(clusterKey); return (ClusterImpl) clusterArray[clusterKey]; } void validate(int clusterKey) { // if(sessionImpl.graphSession.graphClient instanceof GraphClientImpl2) { // // ClusterImpl server = (ClusterImpl)clusterArray[clusterKey]; // String sd = server.dump(sessionImpl.clusterTranslator); // GraphClientImpl2 gci = ((GraphClientImpl2)sessionImpl.graphSession.graphClient); // ClusterImpl memory = gci.clusters.get(server.clusterUID); // if(memory == null) // System.err.println("ad"); // String md = memory.dump(gci.support); // // if(!sd.equals(md)) { // // int diffPos = 0; // int minLength = Math.min(sd.length(), md.length()); // for (int i = 0; i < minLength; i++) { // if (sd.charAt(i) != md.charAt(i)) { // diffPos = i; // break; // } // } // // int start = Math.max(diffPos-200, 0); // int end = Math.min(minLength-1, diffPos+200); // // System.err.println("== CLUSTER DIFFERENCE " + clusterArray[clusterKey].getClusterUID() + " == "); // System.err.println("== SESSION == "); // System.err.println(sd.substring(start, end)); // System.err.println("== MEM == "); // System.err.println(md.substring(start, end)); // System.err.println("== CLUSTER DIFFERENCE ENDS == "); // // throw new IllegalStateException(); // } // // } } public synchronized ClusterImpl getPrefetched() { synchronized(prefetch) { return null; } } public synchronized void load2(long clusterId, int clusterKey, final Callback runnable) { assert (Constants.ReservedClusterId != clusterId); ClusterImpl cluster = null; DatabaseException e = null; try { ClusterUID clusterUID = clusters.makeClusterUID(clusterId); // cluster = (ClusterImpl) sessionImpl.getGraphSession().getClusterImpl(clusterUID, clusterKey); Database.Session session = sessionImpl.graphSession.dbSession; // cluster = (ClusterImpl)gci.getClusterByClusterKey(clusterKey).clone(sessionImpl.clusterTranslator); // cluster = (ClusterImpl)((ClusterImpl)gci.getClusterByClusterUIDOrMakeProxy(clusterUID)).clone(sessionImpl.clusterTranslator); // cluster = gci.clone(clusterUID, sessionImpl.clusterTranslator); cluster = session.clone(clusterUID, new ClusterCreator() { @Override public T create(ClusterUID uid, byte[] bytes, int[] ints, long[] longs) { ClusterSupport support = sessionImpl.clusterTranslator; try { return (T)ClusterImpl.make(longs, ints, bytes, support, support.getClusterKeyByClusterUIDOrMake(uid)); } catch (DatabaseException e) { e.printStackTrace(); return null; } } }); } catch (Throwable t) { // It's totally legal to call for non-existing cluster. // Sometimes this indicates an error, though. Logger.getDefault().logInfo("Load cluster failed.", t); if (t instanceof DatabaseException) e = (DatabaseException) t; else e = new DatabaseException("Load cluster failed.", t); } if (null == cluster) { runnable.run(e); return; } // new Exception("Load cluster " + cluster.getClusterId() + " " + cluster.getCachedSize() + " " + cluster.getImmutable()).printStackTrace(); // Can not be called with null argument. replaceCluster(cluster); sessionImpl.onClusterLoaded(clusterId); runnable.run(null); } public synchronized ClusterImpl tryLoad(long clusterId, int clusterKey) throws DatabaseException { assert (Constants.ReservedClusterId != clusterId); ClusterUID clusterUID = clusters.makeClusterUID(clusterId); Database.Session session = sessionImpl.graphSession.dbSession; ClusterImpl cluster = session.clone(clusterUID, new ClusterCreator() { @Override public T create(ClusterUID uid, byte[] bytes, int[] ints, long[] longs) { ClusterSupport support = sessionImpl.clusterTranslator; try { return (T)ClusterImpl.make(longs, ints, bytes, support, support.getClusterKeyByClusterUIDOrMake(uid)); } catch (DatabaseException e) { e.printStackTrace(); return null; } } }); if (null == clusterArray[clusterKey]) clusterArray[clusterKey] = cluster; else replaceCluster(cluster); sessionImpl.onClusterLoaded(clusterId); validate(clusterKey); return cluster; } public Collection getClusters() { ArrayList result = new ArrayList(); for (int i = 0; i < clusterArray.length; i++) { ClusterI cluster = clusterArray[i]; if (cluster != null) result.add(cluster); } return result; } public int size() { return clusters.size(); } public long timeCounter() { return timeCounter++; } public boolean hasVirtual(int index) { return virtuals[index]; } public void markVirtual(int index) { virtuals[index] = true; } public ClusterImpl[] getClusterArray() { return clusterArray; } public boolean isImmutable(int id) { int clusterKey = ClusterTraitsBase.getClusterKeyFromResourceKeyNoThrow(id); Boolean exist = immutables[clusterKey]; if(exist != null) return exist; ClusterI cluster = getClusterByResourceKey(id); boolean result = cluster == null ? false : cluster.getImmutable(); markImmutable(cluster, result); return result; } public void markImmutable(ClusterI cluster, boolean value) { immutables[cluster.getClusterKey()] = value; } @Override public int getClusterKeyByUID(long first, long second) throws DatabaseException { return getClusterKeyByClusterUIDOrMakeProxy(ClusterUID.make(first, second)); } }