1 /*******************************************************************************
2 * Copyright (c) 2007, 2010 Association for Decentralized Information Management
4 * All rights reserved. This program and the accompanying materials
5 * are made available under the terms of the Eclipse Public License v1.0
6 * which accompanies this distribution, and is available at
7 * http://www.eclipse.org/legal/epl-v10.html
10 * VTT Technical Research Centre of Finland - initial API and implementation
11 *******************************************************************************/
12 package fi.vtt.simantics.procore.internal;
15 import java.util.ArrayList;
16 import java.util.Collection;
17 import java.util.HashMap;
19 import java.util.TreeMap;
20 import java.util.concurrent.Semaphore;
21 import java.util.function.Consumer;
23 import org.simantics.databoard.Bindings;
24 import org.simantics.db.ClusterCreator;
25 import org.simantics.db.Database;
26 import org.simantics.db.Database.Session.ClusterChanges;
27 import org.simantics.db.DevelopmentKeys;
28 import org.simantics.db.SessionVariables;
29 import org.simantics.db.common.utils.Logger;
30 import org.simantics.db.exception.ClusterDoesNotExistException;
31 import org.simantics.db.exception.DatabaseException;
32 import org.simantics.db.exception.ResourceNotFoundException;
33 import org.simantics.db.exception.RuntimeDatabaseException;
34 import org.simantics.db.impl.ClusterBase;
35 import org.simantics.db.impl.ClusterI;
36 import org.simantics.db.impl.ClusterSupport;
37 import org.simantics.db.impl.ClusterTraitsBase;
38 import org.simantics.db.impl.IClusterTable;
39 import org.simantics.db.impl.graph.WriteGraphImpl;
40 import org.simantics.db.impl.query.QueryProcessor;
41 import org.simantics.db.procore.cluster.ClusterBig;
42 import org.simantics.db.procore.cluster.ClusterImpl;
43 import org.simantics.db.procore.cluster.ClusterSmall;
44 import org.simantics.db.procore.cluster.ClusterTraits;
45 import org.simantics.db.procore.protocol.Constants;
46 import org.simantics.db.service.ClusterCollectorPolicy;
47 import org.simantics.db.service.ClusterCollectorPolicy.CollectorCluster;
48 import org.simantics.db.service.ClusterUID;
49 import org.simantics.utils.Development;
51 import fi.vtt.simantics.procore.DebugPolicy;
52 import fi.vtt.simantics.procore.internal.ClusterControlImpl.ClusterStateImpl;
53 import fi.vtt.simantics.procore.internal.SessionImplSocket.TaskHelper;
54 import gnu.trove.map.hash.TLongIntHashMap;
55 import gnu.trove.map.hash.TLongObjectHashMap;
56 import gnu.trove.procedure.TIntProcedure;
57 import gnu.trove.procedure.TLongObjectProcedure;
58 import gnu.trove.procedure.TObjectProcedure;
59 import gnu.trove.set.hash.TIntHashSet;
61 public final class ClusterTable implements IClusterTable {
// Debug flag: when enabled, cached-size bookkeeping would be cross-checked (off by default).
63 private static final boolean VALIDATE_SIZE = false;
// Cluster cache memory budget in bytes, with a soft collection threshold at 80% of it.
65 int maximumBytes = 128 * 1024 * 1024;
66 int limit = (int)(0.8*(double)maximumBytes);
70 final private SessionImplSocket sessionImpl;
// Clusters created in write-only mode; removeWriteOnlyClusters() frees them back to proxies.
71 final private ArrayList<ClusterI> writeOnlyClusters = new ArrayList<ClusterI>();
// Cluster keys invalidated during write-only operation; drained by removeWriteOnlyClusters().
72 final private TIntHashSet writeOnlyInvalidates = new TIntHashSet();
// Fixed-size table indexed by cluster key; ARRAY_SIZE bounds the number of clusters per session.
73 final private static int ARRAY_SIZE = ClusterTraits.getClusterArraySize();
74 final private ClusterImpl[] clusterArray = new ClusterImpl[ARRAY_SIZE];
// Per-cluster-key caches parallel to clusterArray (usage not visible in this view).
75 final private Boolean[] immutables = new Boolean[ARRAY_SIZE];
76 final private boolean[] virtuals = new boolean[ARRAY_SIZE];
// Immutable (importance, clusterId) pair; used as the collector-policy handle
// for a cached cluster and as the value type of importanceMap.
77 static class ImportanceEntry implements CollectorCluster {
78 final public long importance;
79 final public long clusterId;
// Convenience constructor capturing the cluster's current importance and id.
80 public ImportanceEntry(ClusterImpl impl) {
81 this(impl.getImportance(), impl.getClusterId());
84 public ImportanceEntry(long importance, long clusterId) {
85 this.importance = importance;
86 this.clusterId = clusterId;
// Accessors required by the CollectorCluster interface (bodies not visible in this view).
89 public long getImportance() {
93 public long getClusterId() {
// Dual index over clusters: cluster id -> ClusterImpl (hashMap) plus the outer
// clusterArray indexed by cluster key. Both are kept in sync by create/replace/removeProxy.
97 private class Clusters {
98 // This makes sure that non-null values from hashMap.get can be trusted (no unsynchronized rehashes occur)
99 final public TLongObjectHashMap<ClusterImpl> hashMap = new TLongObjectHashMap<ClusterImpl>(2 * ARRAY_SIZE);
100 //private final HashMap<ClusterUID, ClusterImpl> clusterU2I = new HashMap<ClusterUID, ClusterImpl>(); // Maps cluster UID to cluster.
// Resets the map; slot 0 is reserved so cluster key 0 never maps to a real cluster.
104 private void clear() {
106 // clusterU2I.clear();
107 hashMap.put(0, null); // reserved for null value
108 // clusterU2I.put(ClusterUID.make(0, 0), null);
111 return hashMap.size();
// Primary lookup: cluster id -> cluster (may be an unloaded proxy).
113 private ClusterImpl getClusterByClusterId(long clusterId) {
114 return hashMap.get(clusterId);
// UID lookups delegate to the id lookup; only the second UID component is used as the id.
116 private ClusterImpl getClusterByClusterUID(ClusterUID clusterUID) {
117 return getClusterByClusterId(clusterUID.second);
118 //return clusterU2I.get(clusterUID);
120 private ClusterImpl getClusterByClusterUID(long id1, long id2) {
121 return getClusterByClusterId(id2);
// Creates (or reuses) an unloaded ClusterSmall sentinel proxy for the given cluster.
123 private ClusterImpl makeProxy(long clusterId) {
124 return makeProxy(ClusterUID.make(0, clusterId));
126 private ClusterImpl makeProxy(ClusterUID clusterUID) {
127 ClusterImpl proxy = hashMap.get(clusterUID.second);
// New cluster keys are allocated sequentially from the map size.
130 int clusterKey = hashMap.size();
131 ClusterSmall sentinel = new ClusterSmall(clusterUID, clusterKey, ClusterTable.this, sessionImpl.clusterTranslator);
132 if (sentinel.clusterId != sentinel.clusterUID.second)
133 throw new RuntimeDatabaseException("ClusterTable corrupted. Contact application support.");
// Replaces the entry stored for proxy.clusterKey; key, id and UID must all match the old entry.
137 private void replace(ClusterImpl proxy) {
138 ClusterImpl old = clusterArray[proxy.clusterKey];
141 if (old.clusterKey != proxy.clusterKey)
142 throw new RuntimeDatabaseException("ClusterTable corrupted. Contact application support.");
143 if (old.clusterId != proxy.clusterId)
144 throw new RuntimeDatabaseException("ClusterTable corrupted. Contact application support.");
145 if (!old.clusterUID.equals(proxy.clusterUID))
146 throw new RuntimeDatabaseException("ClusterTable corrupted. Contact application support.");
// Downgrades a resident cluster to an unloaded ClusterSmall sentinel, keeping its key and id.
149 private ClusterSmall freeProxy(ClusterImpl proxy) {
150 ClusterImpl clusterImpl = hashMap.get(proxy.clusterId);
151 if (null == clusterImpl)
152 throw new RuntimeDatabaseException("ClusterTable corrupted. Contact application support.");
153 // ClusterUID clusterUID = ClusterUID.make(0, proxy.clusterId);
154 // ClusterImpl clusterImpl2 = clusterU2I.get(clusterUID );
155 // if (clusterImpl != clusterImpl2)
156 // throw new RuntimeDatabaseException("ClusterTable corrupted. Contact application support.");
157 if (proxy.clusterId != clusterImpl.clusterId)
158 throw new RuntimeDatabaseException("ClusterTable corrupted. Contact application support.");
159 if (proxy.clusterKey != clusterImpl.clusterKey)
160 throw new RuntimeDatabaseException("ClusterTable corrupted. Contact application support.");
161 ClusterSmall sentinel = new ClusterSmall(makeClusterUID(proxy.clusterId) , proxy.clusterKey, ClusterTable.this, sessionImpl.clusterTranslator);
162 return (ClusterSmall)create(sentinel);
// Registers a cluster in both indexes: id -> cluster map and key -> cluster array.
164 private ClusterImpl create(ClusterImpl clusterImpl) {
165 hashMap.put(clusterImpl.clusterId, clusterImpl);
166 // clusterU2I.put(clusterImpl.clusterUID, clusterImpl);
167 clusterArray[clusterImpl.clusterKey] = clusterImpl;
170 private void forEachValue(TObjectProcedure<ClusterImpl> procedure) {
171 hashMap.forEachValue(procedure);
173 private void forEachEntry(TLongObjectProcedure<ClusterImpl> procedure) {
174 hashMap.forEachEntry(procedure);
// All persistent clusters use UID (0, clusterId) in this table.
176 private ClusterUID makeClusterUID(long clusterId) {
177 return ClusterUID.make(0, clusterId);
// Removes a cluster from both indexes.
179 private void removeProxy(ClusterImpl proxy) {
180 hashMap.remove(proxy.clusterId);
181 clusterArray[proxy.clusterKey] = null;
// Importance -> cluster handle, sorted ascending; used to pick eviction victims.
184 final public TreeMap<Long, CollectorCluster> importanceMap = new TreeMap<Long, CollectorCluster>();
// Optional pluggable collection policy; when null, importanceMap ordering drives collection.
186 private ClusterCollectorPolicy collectorPolicy;
// Total cached size is recomputed lazily (see getSizeInBytes) when marked dirty.
187 private boolean dirtySizeInBytes = true;
188 private long sizeInBytes = 0;
189 private final Clusters clusters;
190 ClusterUID makeClusterUID(long clusterId) {
191 return clusters.makeClusterUID(clusterId);
// Maps a resource key to the UID of the cluster that contains it.
193 public ClusterUID getClusterUIDByResourceKey(int resourceKey) throws DatabaseException {
194 int clusterKey = ClusterTraits.getClusterKeyFromResourceKey(resourceKey);
195 return clusterArray[clusterKey].clusterUID;
// folderPath is accepted but its use is not visible in this view.
197 ClusterTable(SessionImplSocket sessionImpl, File folderPath) {
198 this.sessionImpl = sessionImpl;
199 clusters = new Clusters();
204 importanceMap.clear();
207 public void setCollectorPolicy(ClusterCollectorPolicy policy) {
208 this.collectorPolicy = policy;
212 * Sets Cluster Table allocation size by percentage of maximum usable
213 * memory. Allocation is limited between 5% and 50% of maximum memory.
217 private void setAllocateSize(double percentage) {
// Clamp the requested share to [0.05, 0.5] of the JVM maximum heap.
219 percentage = Math.max(percentage, 0.05);
220 percentage = Math.min(percentage, 0.5);
222 maximumBytes = (int) Math.floor(percentage * Runtime.getRuntime().maxMemory());
// Estimates what fraction of the heap the cluster cache may use, aiming to keep
// ~20% of the heap free for the rest of the application; result clamped to [0.05, 0.5].
225 private double estimateProperAllocation(ClusterCollectorSupport support) {
226 long max = Runtime.getRuntime().maxMemory();
227 long free = Runtime.getRuntime().freeMemory();
228 long total = Runtime.getRuntime().totalMemory();
229 long realFree = max - total + free; // amount of free memory
230 long current = support.getCurrentSize(); // currently allocated cache
231 long inUseTotal = max - realFree; // currently used memory
232 long inUseOther = inUseTotal - current; // memory that the application uses (without the cache)
233 double otherUsePercentage = (double) inUseOther / (double) max; // percentage of memory that the application uses
234 double aimFree = 0.2; // percentage of maximum heap that we try to keep free
235 double estimate = 1.0 - otherUsePercentage - aimFree;
236 estimate = Math.min(estimate, 0.5);
237 estimate = Math.max(estimate, 0.05);
238 // System.out.println("Estimated allocation percentage " + estimate +
239 // " memory stats: max: " + max + ", free: " + realFree + ", inUse: " +
240 // inUseTotal + ", cached: " + current + ", cacheSize: " + maximumBytes);
244 void checkCollect() {
// Lookup helpers; clusterArray is indexed directly by cluster key.
248 synchronized ClusterImpl getClusterByClusterId(long clusterId) {
249 return clusters.getClusterByClusterId(clusterId);
252 ClusterBase getClusterByClusterKey(int clusterKey) {
253 return clusterArray[clusterKey];
// Creates an unloaded proxy; the UID's second component must equal the cluster id.
256 synchronized ClusterImpl makeProxy(ClusterUID clusterUID, long clusterId) {
257 if (clusterUID.second != clusterId)
258 throw new RuntimeDatabaseException("Illegal id for cluster=" + clusterUID + " id=" + clusterId);
259 return clusters.makeProxy(clusterUID);
// Creates and registers a new cluster. Write-only clusters bypass the importance map;
// regular clusters are entered into importanceMap and reported to the collector policy.
262 synchronized ClusterImpl makeCluster(long clusterId, boolean writeOnly) {
264 ClusterImpl proxy = clusters.getClusterByClusterId(clusterId);
267 ClusterUID clusterUID = ClusterUID.make(0, clusterId);
268 int clusterKey = clusters.size(); // new key
270 proxy = new ClusterWriteOnly(clusterUID, clusterKey, sessionImpl);
271 writeOnlyClusters.add(proxy);
272 return clusters.create(proxy);
274 ClusterImpl cluster = ClusterImpl.make(clusterUID, clusterKey, sessionImpl.clusterTranslator);
275 clusters.create(cluster);
276 if (!cluster.isLoaded())
277 Logger.defaultLogError(new Exception("Bug in ClusterTable.makeCluster(long, boolean), cluster not loaded"));
278 importanceMap.put(cluster.getImportance(), new ImportanceEntry(cluster));
279 if(collectorPolicy != null) collectorPolicy.added(cluster);
// Swaps in a replacement cluster for an existing cluster key, carrying over the
// virtual flag and refreshing importance/collector bookkeeping and cached-size accounting.
284 synchronized void replaceCluster(ClusterI cluster) {
286 int clusterKey = cluster.getClusterKey();
287 ClusterI existing = clusterArray[clusterKey];
288 if (existing.hasVirtual())
289 cluster.markVirtual();
291 importanceMap.remove(existing.getImportance());
292 if (collectorPolicy != null)
293 collectorPolicy.removed((ClusterImpl)existing);
295 clusters.replace((ClusterImpl)cluster);
296 if (!cluster.isLoaded())
297 Logger.defaultLogError(new Exception("Bug in ClusterTable.replaceCluster(ClusterI), cluster not loaded"));
299 importanceMap.put(cluster.getImportance(), new ImportanceEntry((ClusterImpl)cluster));
300 if(collectorPolicy != null) collectorPolicy.added((ClusterImpl)cluster);
// Keep sizeInBytes consistent: subtract the old cluster's size, add the new one's.
302 if (!dirtySizeInBytes) {
303 if (existing != cluster) {
304 adjustCachedSize(-existing.getCachedSize(), existing);
306 // This will update sizeInBytes through adjustCachedSize
307 cluster.getCachedSize();
// Releases a cluster chosen by the collector: drops its importance entry first.
312 synchronized void release(CollectorCluster cluster) {
313 importanceMap.remove(cluster.getImportance());
314 release(cluster.getClusterId());
// Downgrades a loaded cluster to an unloaded proxy and updates collector/size state.
317 synchronized void release(long clusterId) {
318 //System.out.println("ClusterTable.release(" + clusterId + "): " + sizeInBytes);
320 ClusterImpl clusterImpl = clusters.getClusterByClusterId(clusterId);
321 if (null == clusterImpl)
// Nothing to release for unloaded or empty clusters.
323 if(!clusterImpl.isLoaded() || clusterImpl.isEmpty())
325 clusters.freeProxy(clusterImpl);
326 importanceMap.remove(clusterImpl.getImportance());
327 if (collectorPolicy != null)
328 collectorPolicy.removed(clusterImpl);
// Pending writes touching this cluster are flushed before it is dropped.
329 if (sessionImpl.writeState != null)
330 sessionImpl.clusterStream.flush(clusterImpl.clusterUID);
331 if (!dirtySizeInBytes) {
332 adjustCachedSize(-clusterImpl.getCachedSize(), clusterImpl);
336 synchronized void compact(long id) {
337 ClusterI impl = clusters.getClusterByClusterId(id);
343 // cachedSize = getSizeInBytes();
// Load probability heuristic; the size-based curve below is disabled.
346 double getLoadProbability() {
347 // This can currently cause stack overflow
349 // if(cachedSize < SLOW_LIMIT) return 1.0;
350 // if(cachedSize > HIGH_LIMIT) return 1e-2;
352 // double pos = (double)(cachedSize - SLOW_LIMIT) / (double)(HIGH_LIMIT -
354 // double val = 0.1 * ((pos-1) * (pos-1)) + 1e-2;
// Accumulates the total cached size of all loaded, non-empty clusters.
358 class SizeProcedure implements TObjectProcedure<ClusterImpl> {
359 public long result = 0;
362 public boolean execute(ClusterImpl cluster) {
363 if (cluster != null) {
365 if (cluster.isLoaded() && !cluster.isEmpty()) {
366 result += cluster.getCachedSize();
// Size accounting must not abort the traversal; log and continue.
368 } catch (Throwable t) {
369 Logger.defaultLogError(t);
375 public void clear() {
381 private SizeProcedure sizeProcedure = new SizeProcedure();
// Returns the total cached size, recomputing from all clusters only when marked dirty.
382 long getSizeInBytes() {
383 if (dirtySizeInBytes) {
384 sizeProcedure.clear();
385 clusters.forEachValue(sizeProcedure);
386 sizeInBytes = sizeProcedure.result;
387 // System.err.println("recomputed size of clusterTable => " + sizeInBytes);
388 setDirtySizeInBytes(false);
393 public void setDirtySizeInBytes(boolean value) {
394 dirtySizeInBytes = value;
// Snapshots the ids of all loaded, non-empty clusters for a later restoreState().
397 ClusterStateImpl getState() {
398 final ClusterStateImpl result = new ClusterStateImpl();
399 clusters.forEachEntry(new TLongObjectProcedure<ClusterImpl>() {
401 public boolean execute(long arg0, ClusterImpl arg1) {
404 if (arg1.isLoaded() && !arg1.isEmpty()) {
405 result.ids.add(new ImportanceEntry(arg1));
// Releases every cluster that is resident now but was not part of the given state.
413 void restoreState(ClusterStateImpl state) {
414 ClusterStateImpl current = getState();
415 for (CollectorCluster id : current.ids)
416 if (!state.ids.contains(id))
417 collectorSupport.release(id);
// Collector driving cache eviction; either delegates to a pluggable policy or
// falls back to releasing resident clusters until a byte target is met.
420 class ClusterCollectorImpl implements ClusterCollector {
422 final private ClusterCollectorSupport support;
423 private ClusterCollectorPolicy policy;
424 ClusterCollectorImpl(ClusterCollectorSupport support) {
425 this.support = support;
// Installs a new policy, seeding it with the currently resident clusters.
428 ClusterCollectorPolicy setPolicy(ClusterCollectorPolicy newPolicy) {
430 ClusterCollectorPolicy oldPolicy = policy;
434 for (CollectorCluster id : support.getResidentClusters()) {
435 policy.added(getClusterByClusterId(id.getClusterId()));
439 support.setPolicy(policy);
// Full collection pass. With dynamic allocation enabled, the budget
// (maximumBytes) is re-estimated from current heap usage first.
446 public void collect() {
// With a policy installed, it selects which clusters to release.
450 release(policy.select());
454 int size = support.getCurrentSize();
455 boolean dynamicAllocation = useDynamicAllocation();
456 if (dynamicAllocation)
457 setAllocateSize(estimateProperAllocation(support));
458 if (DebugPolicy.CLUSTER_COLLECTION) {
459 System.out.println("Cluster collector activated, current size = " + size + " limit = " + maximumBytes);
461 if (dynamicAllocation) {
// Release half the budget, capped at 32 MiB, plus any overshoot past the budget.
462 int collectSize = maximumBytes / 2;
463 collectSize = Math.min(collectSize, 32 * 1024 * 1024);
464 // try to keep allocated clusters below the maximum
465 if (maximumBytes - size > collectSize)
467 collectSize += size - maximumBytes;
468 collect(collectSize);
470 // try to keep allocated clusters below the maximum
471 if (size < maximumBytes)
// Dynamic allocation is opt-in via a system property.
481 private boolean useDynamicAllocation() {
482 return "true".equalsIgnoreCase(System.getProperty("org.simantics.db.cluster.dynamicAlloc"));
// Releases clusters until roughly `target` bytes have been freed.
486 public void collect(int target) {
490 release(policy.select(target));
// Fallback: walk resident clusters (importance order per collectorSupport) until target is met.
494 ArrayList<CollectorCluster> toRelease = new ArrayList<CollectorCluster>();
496 for (CollectorCluster cluster : support.getResidentClusters()) {
497 target -= support.getClusterSize(cluster);
499 toRelease.add(cluster);
507 if (DebugPolicy.CLUSTER_COLLECTION) {
508 System.out.println("Cluster collector finished, current size = " + support.getCurrentSize());
515 void release(Collection<CollectorCluster> toRelease) {
516 for (CollectorCluster id : toRelease) {
523 private ClusterCollectorSupport collectorSupport = new ClusterCollectorSupportImpl(this);
524 ClusterCollectorImpl collector = new ClusterCollectorImpl(collectorSupport);
// Id of the cluster new resources are currently allocated into; NewClusterId
// means no allocation cluster has been reserved yet.
530 private long newResourceClusterId = Constants.NewClusterId;
531 public static final int CLUSTER_FILL_SIZE = ClusterTraitsBase.getMaxNumberOfResources();
534 * A new id is reserved only when a resource is added, so reservedIds
535 * contains only clusters that are already in use.
// Returns the cluster new resources should go into, rolling over to a freshly
// reserved cluster id once the current one reaches CLUSTER_FILL_SIZE resources.
538 ClusterImpl getNewResourceCluster(ClusterSupport cs, GraphSession graphSession, boolean writeOnly)
539 throws DatabaseException {
540 ClusterImpl result = null;
541 if (Constants.NewClusterId == newResourceClusterId) {
542 newResourceClusterId = graphSession.newClusterId();
543 result = getClusterByClusterIdOrMake(newResourceClusterId, writeOnly);
545 ClusterImpl cluster = getClusterByClusterIdOrThrow(newResourceClusterId);
546 if (cluster.getNumberOfResources(cs) >= CLUSTER_FILL_SIZE) {
547 newResourceClusterId = graphSession.newClusterId();
548 cluster = getClusterByClusterIdOrMake(newResourceClusterId, writeOnly);
552 return ensureLoaded(result);
// Forgets the current allocation cluster so the next allocation reserves a new id.
555 void flushCluster(GraphSession graphSession) {
556 // We seem to disagree about this.
557 // graphSession.newClusterId();
558 newResourceClusterId = Constants.NewClusterId;
// Marks a write-only cluster's key for invalidation at the end of the write.
561 void writeOnlyInvalidate(ClusterI impl) {
562 writeOnlyInvalidates.add(impl.getClusterKey());
// Downgrades all write-only clusters back to proxies and processes the
// invalidation set; both collections are cleared afterwards.
565 void removeWriteOnlyClusters() {
566 for (ClusterI proxy : writeOnlyClusters) {
567 if (!(proxy instanceof ClusterImpl))
568 throw new RuntimeDatabaseException("ClusterTable corrupted. Contact application support.");
569 clusters.freeProxy((ClusterImpl)proxy);
571 writeOnlyClusters.clear();
572 writeOnlyInvalidates.forEach(new TIntProcedure() {
574 public boolean execute(int clusterKey) {
575 ClusterImpl proxy = clusterArray[clusterKey];
576 ClusterUID clusterUID = proxy.getClusterUID();
577 System.err.println("writeOnlyInvalidate " + clusterUID);
578 clusters.freeProxy(proxy);
582 writeOnlyInvalidates.clear();
585 public ClusterImpl getClusterByClusterUID(ClusterUID clusterUID) {
586 synchronized (this) {
587 return clusters.getClusterByClusterUID(clusterUID);
// Key/cluster lookups that lazily create an unloaded proxy when the cluster is
// not yet present; the first UID component is deliberately ignored (always 0 here).
590 public int getClusterKeyByClusterUIDOrMakeProxy(ClusterUID clusterUID) {
591 return getClusterKeyByClusterUIDOrMakeProxy(0/*clusterUID.first*/, clusterUID.second);
593 public int getClusterKeyByClusterUIDOrMakeProxy(long id1, long id2) {
594 return getClusterByClusterUIDOrMakeProxy(id1, id2).clusterKey;
596 public ClusterImpl getClusterByClusterUIDOrMakeProxy(ClusterUID clusterUID) {
597 synchronized (this) {
598 ClusterImpl clusterImpl = clusters.getClusterByClusterUID(clusterUID);
599 if (null == clusterImpl)
600 clusterImpl = clusters.makeProxy(clusterUID);
604 public ClusterImpl getClusterByClusterUIDOrMakeProxy(long id1, long id2) {
605 synchronized (this) {
606 ClusterImpl clusterImpl = clusters.getClusterByClusterUID(id1, id2);
607 if (null == clusterImpl)
608 clusterImpl = clusters.makeProxy(id2);
// Returns the loaded cluster for the id, loading on demand; throws
// ResourceNotFoundException when the load yields nothing and removes the
// proxy again when the cluster does not exist at all.
612 ClusterImpl getLoadOrThrow(long clusterId) throws DatabaseException {
613 synchronized (this) {
614 ClusterImpl proxy = clusters.getClusterByClusterId(clusterId);
617 if (proxy.isLoaded())
620 clusterKey = proxy.getClusterKey();
// Key 0 is the reserved null slot, so a real proxy must be created first.
622 if (clusterKey == 0) {
623 proxy = clusters.makeProxy(clusterId);
624 clusterKey = proxy.getClusterKey();
626 ClusterImpl ci = tryLoad(clusterId, clusterKey);
628 throw new ResourceNotFoundException(clusterId);
630 } catch (ClusterDoesNotExistException t) {
631 clusters.removeProxy(proxy);
637 ClusterImpl getClusterByClusterIdOrMake(long clusterId, boolean writeOnly) {
638 synchronized (this) {
639 ClusterImpl proxy = clusters.getClusterByClusterId(clusterId);
643 return makeCluster(clusterId, writeOnly);
646 ClusterImpl getClusterByClusterIdOrThrow(long clusterId) {
647 synchronized (this) {
648 ClusterImpl proxy = clusters.getClusterByClusterId(clusterId);
650 throw new IllegalArgumentException("Cluster id=" + clusterId + " is not created.");
// Cluster id is simply the second UID component; nothing is actually created here.
654 long getClusterIdOrCreate(ClusterUID clusterUID) {
655 return clusterUID.second;
// Resolves the persistent cluster id owning a resource key; throwing variant.
657 final long getClusterIdByResourceKey(final int resourceKey)
658 throws DatabaseException {
659 int clusterKey = ClusterTraitsBase.getClusterKeyFromResourceKey(resourceKey);
// Virtual resources have no persistent cluster.
660 if (ClusterTraitsBase.isVirtualClusterKey(clusterKey))
661 throw new RuntimeException("Tried to get a persistent cluster for a virtual resource.");
662 ClusterI c = clusterArray[clusterKey];
664 throw new RuntimeException("No cluster for key " + resourceKey);
665 return c.getClusterId();
// Non-throwing variant: logs errors instead of raising (fallback return not visible here).
667 final long getClusterIdByResourceKeyNoThrow(final int resourceKey) {
668 int clusterKey = ClusterTraitsBase.getClusterKeyFromResourceKeyNoThrow(resourceKey);
669 if (ClusterTraitsBase.isVirtualClusterKey(clusterKey)) {
670 Logger.defaultLogError("Tried to get a persistent cluster for a virtual resource. key=" + resourceKey);
673 ClusterI c = clusterArray[clusterKey];
675 Logger.defaultLogError("No cluster for key " + resourceKey);
678 return c.getClusterId();
// Refreshes clusters changed by change set `csid`: clones each changed cluster
// from the database session, swaps it into the table, and then replays the
// change effects through the query processor as a fake write.
683 void refresh(long csid, SessionImplSocket session, ClusterUID[] clusterUID) {
684 synchronized (this) {
685 session.flushCounter = 0;
686 session.clusterStream.reallyFlush();
687 ClientChangesImpl cs = new ClientChangesImpl(session);
688 if (session.clientChanges == null)
689 session.clientChanges = cs;
690 for (int i=0; i<clusterUID.length; ++i) {
692 if (DebugPolicy.REPORT_CLUSTER_EVENTS)
693 System.err.println("cluster=" + clusterUID[i] + " has changed. length=" + clusterUID.length);
// Only loaded, non-virtual clusters need refreshing.
694 ClusterImpl oldCluster = clusters.getClusterByClusterUID(clusterUID[i]);
695 if (null == oldCluster)
697 if (!oldCluster.isLoaded())
699 int clusterKey = oldCluster.getClusterKey();
700 if (ClusterTraitsBase.isVirtualClusterKey(clusterKey))
702 boolean big = oldCluster instanceof ClusterBig;
703 boolean small = oldCluster instanceof ClusterSmall;
706 // ClusterImpl newCluster = (ClusterImpl) sessionImpl.graphSession.getClusterImpl(clusterUID[i], clusterKey);
707 Database.Session dbSession = sessionImpl.graphSession.dbSession;
// Clone the changed cluster from the database into a fresh ClusterImpl.
709 ClusterImpl newCluster = dbSession.clone(clusterUID[i], new ClusterCreator() {
712 public <T> T create(ClusterUID uid, byte[] bytes, int[] ints, long[] longs) {
713 ClusterSupport support = sessionImpl.clusterTranslator;
715 return (T)ClusterImpl.make(longs, ints, bytes, support, support.getClusterKeyByClusterUIDOrMake(uid));
716 } catch (DatabaseException e) {
723 if (null == newCluster)
725 if (DebugPolicy.REPORT_CLUSTER_EVENTS)
726 System.err.println("cluster=" + newCluster + " updated.");
// Move importance/collector bookkeeping from the old instance to the new one.
727 importanceMap.remove(oldCluster.getImportance());
728 if (collectorPolicy != null)
729 collectorPolicy.removed(oldCluster);
730 clusters.replace(newCluster);
731 if (!newCluster.isLoaded())
732 Logger.defaultLogError(new Exception("Bug in ClusterTable.refresh, cluster not loaded"));
733 importanceMap.put(newCluster.getImportance(), new ImportanceEntry(newCluster));
734 if (collectorPolicy != null)
735 collectorPolicy.added(newCluster);
736 // Now we have fetched the new cluster but to emulate effects of the changes in it we fetch the cluster changes from server.
737 refreshCluster(csid, session, oldCluster, clusterUID[i], newCluster.clusterId, clusterKey);
738 } catch (Throwable t) {
739 Logger.defaultLogError("Failed to load cluster in refresh.", t);
742 // Fake update of cluster changes.
743 QueryProcessor queryProcessor = session.getQueryProvider2();
744 WriteGraphImpl writer = WriteGraphImpl.create(queryProcessor, session.writeSupport, null);
745 TaskHelper th = null;
// A temporary write state is installed only when no write is already in progress.
746 if (null == session.writeState) {
747 th = new TaskHelper("Refresh");
748 session.writeState = new WriteState<Object>(writer, th.writeTraits, th.sema, th.proc);
750 session.getQueryProvider2().performDirtyUpdates(writer);
751 session.fireMetadataListeners(writer, cs);
752 session.getQueryProvider2().performScheduledUpdates(writer);
753 session.fireReactionsToSynchronize(cs);
754 session.fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
755 session.printDiagnostics();
758 session.writeState = null;
// Replays change set `csid` for one refreshed cluster: fetches the cluster
// changes from the server and invalidates the affected statement/value queries.
764 final void refreshCluster(long csid, SessionImplSocket session, ClusterImpl cluster, ClusterUID clusterUID, long clusterId, int clusterKey)
765 throws DatabaseException {
766 if (DebugPolicy.REPORT_CLUSTER_EVENTS)
767 System.err.println("cluster=" + clusterUID + " id=" + clusterId + " key=" + clusterKey + " resources will be updated.");
768 // get cluster change sets
769 QueryProcessor queryProcessor = session.getQueryProvider2();
772 cc = session.graphSession.getClusterChanges(clusterUID, csid);
773 } catch (Exception e) {
774 Logger.defaultLogError("Could not get cluster changes. cluster=" + clusterUID, e);
// Invalidate statement queries for each changed (resource, predicate) pair.
778 for (int i=0; i<cc.getResourceIndex().length; ++i) {
779 int resource = ClusterTraits.createResourceKey(clusterKey, cc.getResourceIndex()[i]);
780 ClusterUID pClusterUID = new ClusterUID(cc.getPredicateFirst()[i], cc.getPredicateSecond()[i]);
781 ClusterImpl pCluster = clusters.getClusterByClusterUID(pClusterUID);
782 if (null == pCluster)
784 int pClusterKey = pCluster.getClusterKey();
785 int predicate = ClusterTraits.createResourceKey(pClusterKey, cc.getPredicateIndex()[i]);
786 queryProcessor.updateStatements(resource, predicate);
787 if (DebugPolicy.REPORT_CLUSTER_EVENTS)
788 System.err.println("resource " + cc.getResourceIndex()[i] + " relation " + cc.getPredicateIndex()[i] + " changed.");
// Invalidate value queries for each changed resource value.
790 for (int i=0; i<cc.getValueIndex().length; ++i) {
791 int resource = ClusterTraits.createResourceKey(clusterKey, cc.getValueIndex()[i]);
792 queryProcessor.updateValue(resource);
793 if (DebugPolicy.REPORT_CLUSTER_EVENTS)
794 System.err.println("value " + cc.getValueIndex()[i] + " changed.");
// Moves a cluster to the "most recently important" position: re-keys its
// importanceMap entry with a fresh timestamp and notifies the collector policy.
797 final void refreshImportance(ClusterImpl c) {
802 importanceMap.remove(c.getImportance());
803 if(collectorPolicy != null) collectorPolicy.removed(c);
805 long newImportance = timeCounter();
806 // System.err.println("refreshImportance " + c.getClusterId() + " => " + newImportance);
807 c.setImportance(newImportance);
809 Logger.defaultLogError(new Exception("Bug in ClusterTable.refreshImportance(ClusterImpl), cluster not loaded"));
811 importanceMap.put(c.getImportance(), new ImportanceEntry(c));
812 if(collectorPolicy != null) collectorPolicy.added(c);
// Cumulative time spent loading clusters (nanoseconds); used by load reporting below.
816 static long loadTime = 0;
818 @SuppressWarnings("unchecked")
// Returns the cluster (possibly an unloaded proxy) for a resource key without loading it.
819 public final <T extends ClusterI> T getClusterProxyByResourceKey(final int resourceKey) {
820 int clusterKey = ClusterTraitsBase.getClusterKeyFromResourceKeyNoThrow(resourceKey);
821 if (ClusterTraitsBase.isVirtualClusterKey(clusterKey))
822 throw new RuntimeException("Tried to get a persistent cluster for a virtual resource.");
823 ClusterI cluster = clusterArray[clusterKey];
825 throw new RuntimeException("No proxy for existing cluster. Resource key = " + resourceKey);
// Debug counters for cluster-load reporting (see ensureLoaded).
829 TLongIntHashMap clusterLoadHistogram = new TLongIntHashMap();
830 int clusterLoadCounter = 0;
// Ensures the given cluster is loaded, synchronously loading it via load2 when
// needed; optional debug instrumentation records per-cluster load counts and timing.
832 private <T extends ClusterI> T ensureLoaded(T c) {
834 ClusterImpl cs = (ClusterImpl) c;
// Instrumented path: time the load and update the histogram/counters.
836 if(DebugPolicy.REPORT_CLUSTER_LOADING) {
837 long start = System.nanoTime();
838 cluster = load2(cs.getClusterId(), cs.getClusterKey());
839 long load = System.nanoTime()-start;
841 if(DebugPolicy.REPORT_CLUSTER_LOADING) {
842 int was = clusterLoadHistogram.get(cluster.getClusterId());
843 clusterLoadHistogram.put(cluster.getClusterId(), was+1);
844 clusterLoadCounter++;
845 String text = "Load2 " + cluster + " " + 1e-9*loadTime + "s. " + 1e-6*load + "ms. " + clusterLoadCounter + " " + was + " " + cluster.getUsedSpace() + " " + cluster.getImportance();
846 if(DebugPolicy.REPORT_CLUSTER_LOADING_STACKS) {
847 new Exception(text).printStackTrace();
849 System.err.println(text);
// Non-instrumented path.
853 cluster = load2(cs.getClusterId(), cs.getClusterKey());
855 } catch (DatabaseException e) {
856 Logger.defaultLogError(e);
857 if (DebugPolicy.REPORT_CLUSTER_EVENTS)
859 String msg = "Failed to load cluster " + cs.getClusterUID();// + " resourceId=" + (((cs.getClusterId() << 16 + (resourceKey & 65535))));
860 // TODO: this jams the system => needs refactoring.
861 throw new RuntimeDatabaseException(msg, e);
866 @SuppressWarnings("unchecked")
// Returns the loaded cluster containing the resource key, loading a proxy on demand.
// Importance is refreshed on every 4096th access as a cheap recency signal.
867 public final <T extends ClusterI> T getClusterByResourceKey(final int resourceKey) {
868 int clusterKey = ClusterTraitsBase.getClusterKeyFromResourceKeyNoThrow(resourceKey);
869 if (ClusterTraitsBase.isVirtualClusterKey(clusterKey))
870 throw new RuntimeException("Tried to get a persistent cluster for a virtual resource.");
871 ClusterI c = clusterArray[clusterKey];
875 if ((counter++ & 4095) == 0)
876 refreshImportance((ClusterImpl) c);
// At this point only an unloaded ClusterSmall sentinel is expected.
879 if (!(c instanceof ClusterSmall)) {
880 Logger.defaultLogError("Proxy must be instance of ClusterSmall");
883 return ensureLoaded((T)c);
886 @SuppressWarnings("unchecked")
// Variant of getClusterByResourceKey that loads inline and, on load failure,
// logs the resource details and marks the cluster deleted instead of propagating.
887 final <T extends ClusterI> T checkedGetClusterByResourceKey(final int resourceKey) {
888 int clusterKey = ClusterTraitsBase.getClusterKeyFromResourceKeyNoThrow(resourceKey);
889 if (ClusterTraitsBase.isVirtualClusterKey(clusterKey))
890 throw new RuntimeException("Tried to get a persistent cluster for a virtual resource.");
891 ClusterI c = clusterArray[clusterKey];
893 throw new RuntimeException("No cluster for resource key " + resourceKey);
895 if ((counter++ & 4095) == 0)
896 refreshImportance((ClusterImpl) c);
899 if (!(c instanceof ClusterSmall)) {
900 Logger.defaultLogError("Proxy must be instance of ClusterSmall");
904 ClusterSmall cs = (ClusterSmall) c;
906 // System.err.println("Load2 " + resourceKey);
907 cluster = load2(cs.getClusterId(), cs.getClusterKey());
908 } catch (DatabaseException e) {
909 if (DebugPolicy.REPORT_CLUSTER_EVENTS)
// NOTE(review): masking the key with its own resource index via '&' looks
// suspicious for extracting an index — verify against ClusterTraitsBase; kept as-is.
911 int resourceIndex = resourceKey & ClusterTraitsBase.getResourceIndexFromResourceKeyNoThrow(resourceKey);
912 long resourceId = ClusterTraitsBase.createResourceIdNoThrow(cs.getClusterId(), resourceIndex);
913 String msg = "Failed to load cluster " + cs.getClusterUID() + " for resource key " + resourceKey + " resourceIndex=" + resourceIndex + " resourceId=" + resourceId;
914 Logger.defaultLogError(msg, e);
915 c.setDeleted(true, null);
// Dumps per-cluster debug info (id and used space) for all clusters to stdout.
921 void printDebugInfo(ClusterSupport support)
922 throws DatabaseException {
923 final int SIZE = clusters.size();
// Index 0 is the reserved null slot, so iteration starts at 1.
925 for (int i = 1; i < SIZE; ++i) {
926 ClusterI c = clusterArray[i];
927 c.getNumberOfResources(support);
928 long size = c.getUsedSpace();
929 System.out.println("cluster=" + c.getClusterId() + " size=" + size);
930 c.printDebugInfo("koss: ", support);
933 System.out.println("Total number of clusters " + SIZE);
934 System.out.println("Total cluster size " + sum);
// Clusters fetched ahead of time, keyed by cluster id; consumed via getPrefetched().
937 Map<Long,ClusterImpl> prefetch = new HashMap<Long,ClusterImpl>();
// Synchronous cluster load: delegates to the callback-based load2 overload and
// blocks on a semaphore until the load completes (or fails with a DatabaseException).
939 public synchronized ClusterImpl load2(long clusterId, int clusterKey) throws DatabaseException {
940 ClusterImpl curr = (ClusterImpl) clusterArray[clusterKey];
946 // curr = (ClusterImpl) clusterArray[clusterKey];
947 // if (curr.isLoaded())
950 final Semaphore s = new Semaphore(0);
951 final DatabaseException[] ex = new DatabaseException[1];
953 load2(clusterId, clusterKey, e -> {
959 } catch (InterruptedException e) {
960 Logger.defaultLogError(e);
// Optional development-time validation of the freshly loaded cluster.
964 if (Development.DEVELOPMENT) {
965 if (Development.<Boolean>getProperty(DevelopmentKeys.CLUSTERTABLE_VALIDATE_ON_LOAD, Bindings.BOOLEAN)) {
967 ClusterImpl loaded = (ClusterImpl) clusterArray[clusterKey];
968 if (loaded instanceof ClusterSmall)
969 ((ClusterSmall) loaded).check();
970 else if (loaded instanceof ClusterBig)
971 ((ClusterBig) loaded).check();
972 } catch (Throwable t) {
978 validate(clusterKey);
980 return (ClusterImpl) clusterArray[clusterKey];
    /**
     * Development-time consistency check for the cluster at {@code clusterKey}.
     *
     * The whole comparison — dumping the session-side cluster and the
     * in-memory GraphClientImpl2 copy and diffing the two dumps — is kept
     * below as commented-out code for future debugging; the visible body is
     * effectively a no-op (any live statements fall in gaps of this chunk).
     */
    void validate(int clusterKey) {
//      if(sessionImpl.graphSession.graphClient instanceof GraphClientImpl2) {
//          ClusterImpl server = (ClusterImpl)clusterArray[clusterKey];
//          String sd = server.dump(sessionImpl.clusterTranslator);
//          GraphClientImpl2 gci = ((GraphClientImpl2)sessionImpl.graphSession.graphClient);
//          ClusterImpl memory = gci.clusters.get(server.clusterUID);
//          if(memory == null)
//              System.err.println("ad");
//          String md = memory.dump(gci.support);
//          if(!sd.equals(md)) {
//              int minLength = Math.min(sd.length(), md.length());
//              for (int i = 0; i < minLength; i++) {
//                  if (sd.charAt(i) != md.charAt(i)) {
//              int start = Math.max(diffPos-200, 0);
//              int end = Math.min(minLength-1, diffPos+200);
//              System.err.println("== CLUSTER DIFFERENCE " + clusterArray[clusterKey].getClusterUID() + " == ");
//              System.err.println("== SESSION == ");
//              System.err.println(sd.substring(start, end));
//              System.err.println("== MEM == ");
//              System.err.println(md.substring(start, end));
//              System.err.println("== CLUSTER DIFFERENCE ENDS == ");
//              throw new IllegalStateException();
    /**
     * Returns a prefetched cluster from the {@code prefetch} map, if any.
     * Access is serialized both on this ClusterTable (method is synchronized)
     * and on the map itself; the selection logic continues on lines not
     * visible in this chunk.
     */
    public synchronized ClusterImpl getPrefetched() {
        synchronized(prefetch) {
    /**
     * Callback-style cluster load: clones the cluster's raw data
     * (bytes/ints/longs) out of the database session into a fresh
     * {@link ClusterImpl}, installs it into the table via
     * {@code replaceCluster}, notifies the session, and finally invokes
     * {@code runnable} with {@code null} on success (the failure path with a
     * non-null DatabaseException falls in a gap of this chunk).
     *
     * NOTE(review): despite the Consumer parameter, all visible work happens
     * on the calling thread before the callback fires.
     *
     * @param runnable invoked exactly once; argument is null on success,
     *        presumably the DatabaseException on failure — TODO confirm
     */
    public synchronized void load2(long clusterId, int clusterKey, final Consumer<DatabaseException> runnable) {
        // The reserved cluster id must never be loaded through this path.
        assert (Constants.ReservedClusterId != clusterId);
        ClusterImpl cluster = null;
        DatabaseException e = null;
            ClusterUID clusterUID = clusters.makeClusterUID(clusterId);
//            cluster = (ClusterImpl) sessionImpl.getGraphSession().getClusterImpl(clusterUID, clusterKey);
            Database.Session session = sessionImpl.graphSession.dbSession;
//            cluster = (ClusterImpl)gci.getClusterByClusterKey(clusterKey).clone(sessionImpl.clusterTranslator);
//            cluster = (ClusterImpl)((ClusterImpl)gci.getClusterByClusterUIDOrMakeProxy(clusterUID)).clone(sessionImpl.clusterTranslator);
//            cluster = gci.clone(clusterUID, sessionImpl.clusterTranslator);
            // ClusterCreator materializes the raw arrays into a ClusterImpl,
            // allocating/reusing the cluster key for the UID.
            cluster = session.clone(clusterUID, new ClusterCreator() {
                public <T> T create(ClusterUID uid, byte[] bytes, int[] ints, long[] longs) {
                    ClusterSupport support = sessionImpl.clusterTranslator;
                        return (T)ClusterImpl.make(longs, ints, bytes, support, support.getClusterKeyByClusterUIDOrMake(uid));
                    } catch (DatabaseException e) {
                        // NOTE(review): swallowed after printing; the creator
                        // then falls through (return on a line not visible here).
                        e.printStackTrace();
        } catch (Throwable t) {
            // It's totally legal to call for non-existing cluster.
            // Sometimes this indicates an error, though.
            Logger.getDefault().logInfo("Load cluster failed.", t);
            if (t instanceof DatabaseException)
                e = (DatabaseException) t;
                e = new DatabaseException("Load cluster failed.", t);
        if (null == cluster) {
//        new Exception("Load cluster " + cluster.getClusterId() + " " + cluster.getCachedSize() + " " + cluster.getImmutable()).printStackTrace();
        // Can not be called with null argument.
        replaceCluster(cluster);
        sessionImpl.onClusterLoaded(clusterId);
        // Success: signal the waiter/consumer with no error.
        runnable.accept(null);
    /**
     * Best-effort synchronous load of a cluster: clones the cluster data from
     * the database session and installs it into {@code clusterArray} — into
     * the empty slot directly, otherwise via {@code replaceCluster}
     * (the {@code else} linking those two paths falls in a gap of this chunk).
     *
     * NOTE(review): the return statement and the handling of a cluster that
     * does not exist in the database are on lines not visible here —
     * presumably returns the loaded cluster, or null when absent. TODO confirm.
     */
    public synchronized ClusterImpl tryLoad(long clusterId, int clusterKey) throws DatabaseException {
        // The reserved cluster id must never be loaded through this path.
        assert (Constants.ReservedClusterId != clusterId);
        ClusterUID clusterUID = clusters.makeClusterUID(clusterId);
        Database.Session session = sessionImpl.graphSession.dbSession;
        ClusterImpl cluster = session.clone(clusterUID, new ClusterCreator() {
            public <T> T create(ClusterUID uid, byte[] bytes, int[] ints, long[] longs) {
                ClusterSupport support = sessionImpl.clusterTranslator;
                    return (T)ClusterImpl.make(longs, ints, bytes, support, support.getClusterKeyByClusterUIDOrMake(uid));
                } catch (DatabaseException e) {
                    // NOTE(review): swallowed after printing, as in load2 above.
                    e.printStackTrace();
        // Empty slot: install directly; otherwise replaceCluster swaps the
        // old instance out (see note in the Javadoc about the missing else).
        if (null == clusterArray[clusterKey])
            clusterArray[clusterKey] = cluster;
            replaceCluster(cluster);
        sessionImpl.onClusterLoaded(clusterId);
        // Development-time consistency check (currently a no-op, see validate()).
        validate(clusterKey);
    /**
     * Returns a snapshot list of all non-null clusters currently in the table.
     * NOTE(review): the return statement falls in a gap of this chunk.
     */
    public Collection<ClusterI> getClusters() {
        ArrayList<ClusterI> result = new ArrayList<ClusterI>();
        for (int i = 0; i < clusterArray.length; i++) {
            ClusterI cluster = clusterArray[i];
            if (cluster != null)
                result.add(cluster);
        // NOTE(review): the line below belongs to a separate accessor
        // (presumably public int size()) whose header is not visible in this
        // chunk; it reports the number of clusters tracked by `clusters`.
        return clusters.size();
    /**
     * Monotonically increasing logical timestamp. Post-increment: returns the
     * current value, then advances the counter.
     */
    public long timeCounter() {
        return timeCounter++;
    /** True if the given index has been flagged virtual via markVirtual(int). */
    public boolean hasVirtual(int index) {
        return virtuals[index];
    /** Flags the given index as virtual; queried by hasVirtual(int). */
    public void markVirtual(int index) {
        virtuals[index] = true;
    /**
     * Returns the internal cluster array itself — no defensive copy, so
     * callers share (and can mutate) this table's backing storage.
     * NOTE(review): intentional for performance, presumably; handle with care.
     */
    public ClusterImpl[] getClusterArray() {
        return clusterArray;
    /**
     * Returns whether the cluster containing resource key {@code id} is
     * immutable, memoizing the per-cluster answer in {@code immutables}.
     *
     * NOTE(review): the branch taken when the cluster is not found
     * ({@code cluster == null}) and the final return fall in gaps of this
     * chunk; presumably the missing-cluster case returns without caching.
     */
    public boolean isImmutable(int id) {
        int clusterKey = ClusterTraitsBase.getClusterKeyFromResourceKeyNoThrow(id);
        // Fast path: cached answer for this cluster key.
        Boolean exist = immutables[clusterKey];
        if(exist != null) return exist;
        ClusterI cluster = getClusterByResourceKey(id);
        if(cluster == null) {
        boolean result = cluster.getImmutable();
        // Cache for subsequent lookups of any resource in this cluster.
        markImmutable(cluster, result);
    /** Records the immutability of the given cluster in the per-key cache (see isImmutable). */
    public void markImmutable(ClusterI cluster, boolean value) {
        immutables[cluster.getClusterKey()] = value;
    /**
     * Resolves a cluster key from the two halves of a cluster UID, creating a
     * proxy entry when the cluster is not yet present (per the delegate's name).
     *
     * @throws DatabaseException propagated from the UID resolution
     */
    public int getClusterKeyByUID(long first, long second) throws DatabaseException {
        return getClusterKeyByClusterUIDOrMakeProxy(ClusterUID.make(first, second));
    /**
     * Adjusts the table's cached total size accounting by {@code l} bytes on
     * behalf of {@code cluster}.
     * NOTE(review): the actual accounting statements fall in a gap of this
     * chunk; only the (commented-out) trace remains visible.
     */
    public void adjustCachedSize(long l, ClusterI cluster) {
        //System.out.println("ClusterTable: adjusting cluster table cached size by " + l + ": " + sizeInBytes + " -> "
        //        + (sizeInBytes + l) + ", for cluster " + cluster.getClusterId() + " (" + cluster + ")");
1179 private void validateSize() {
1183 // System.out.println("validating cached cluster sizes: " + sizeInBytes + ", hashMap.size="
1184 // + clusters.hashMap.size() + ", importanceMap.size=" + importanceMap.size());
1186 int i = clusterArray.length;
1188 for (int j = 0; j < i; ++j) {
1189 ClusterI c = clusterArray[j];
1192 size += c.getCachedSize();
1194 if (sizeInBytes != size) {
1195 if (!dirtySizeInBytes)
1196 System.out.println("BUG: CACHED CLUSTER SIZE DIFFERS FROM CALCULATED: " + sizeInBytes + " != " + size + ", delta = " + (sizeInBytes - size));
1197 //else System.out.println("\"BUG?\": SIZES DIFFER: " + sizeInBytes + " != " + size + ", delta = " + (sizeInBytes - size));
1200 int ims = importanceMap.size();
1202 clusters.hashMap.forEachEntry((cid, c) -> {
1203 if (c != null && !c.isWriteOnly() && !c.isEmpty() && c.isLoaded()) {
1204 //System.out.println(cid + ": " + c);
1209 if (Math.abs(ims-hms[0]) > 0) {
1210 System.out.println("BUG2: hashmap and importanceMap sizes differ: " + hms[0] + " != " + ims + ", delta=" + (hms[0] - ims));