1 /*******************************************************************************
2 * Copyright (c) 2007, 2010 Association for Decentralized Information Management
4 * All rights reserved. This program and the accompanying materials
5 * are made available under the terms of the Eclipse Public License v1.0
6 * which accompanies this distribution, and is available at
7 * http://www.eclipse.org/legal/epl-v10.html
10 * VTT Technical Research Centre of Finland - initial API and implementation
11 *******************************************************************************/
12 package fi.vtt.simantics.procore.internal;
15 import java.util.ArrayList;
16 import java.util.Collection;
17 import java.util.HashMap;
19 import java.util.TreeMap;
20 import java.util.concurrent.Semaphore;
21 import java.util.concurrent.atomic.AtomicLong;
22 import java.util.function.Consumer;
24 import org.simantics.databoard.Bindings;
25 import org.simantics.db.ClusterCreator;
26 import org.simantics.db.Database;
27 import org.simantics.db.Database.Session.ClusterChanges;
28 import org.simantics.db.DevelopmentKeys;
29 import org.simantics.db.SessionVariables;
30 import org.simantics.db.common.utils.Logger;
31 import org.simantics.db.exception.ClusterDoesNotExistException;
32 import org.simantics.db.exception.DatabaseException;
33 import org.simantics.db.exception.ResourceNotFoundException;
34 import org.simantics.db.exception.RuntimeDatabaseException;
35 import org.simantics.db.impl.ClusterBase;
36 import org.simantics.db.impl.ClusterI;
37 import org.simantics.db.impl.ClusterSupport;
38 import org.simantics.db.impl.ClusterTraitsBase;
39 import org.simantics.db.impl.IClusterTable;
40 import org.simantics.db.impl.graph.WriteGraphImpl;
41 import org.simantics.db.impl.query.QueryProcessor;
42 import org.simantics.db.procore.cluster.ClusterBig;
43 import org.simantics.db.procore.cluster.ClusterImpl;
44 import org.simantics.db.procore.cluster.ClusterSmall;
45 import org.simantics.db.procore.cluster.ClusterTraits;
46 import org.simantics.db.procore.protocol.Constants;
47 import org.simantics.db.service.ClusterCollectorPolicy;
48 import org.simantics.db.service.ClusterCollectorPolicy.CollectorCluster;
49 import org.simantics.db.service.ClusterUID;
50 import org.simantics.utils.Development;
52 import fi.vtt.simantics.procore.DebugPolicy;
53 import fi.vtt.simantics.procore.internal.ClusterControlImpl.ClusterStateImpl;
54 import fi.vtt.simantics.procore.internal.SessionImplSocket.TaskHelper;
55 import gnu.trove.map.hash.TLongIntHashMap;
56 import gnu.trove.map.hash.TLongObjectHashMap;
57 import gnu.trove.procedure.TIntProcedure;
58 import gnu.trove.procedure.TLongObjectProcedure;
59 import gnu.trove.procedure.TObjectProcedure;
60 import gnu.trove.set.hash.TIntHashSet;
// In-memory table of the session's clusters: resolves cluster ids / keys / UIDs to
// ClusterImpl instances, tracks cached size, and cooperates with the cluster
// collector to keep memory usage bounded.
// NOTE(review): this listing elides some original lines (numbering gaps), so
// several bodies below are visibly incomplete — verify against the full source.
62 public final class ClusterTable implements IClusterTable {
// Compile-time switch; when true, size accounting is validated (validateSize calls).
64 private static final boolean VALIDATE_SIZE = false;
// Cache budget in bytes; 'limit' is the 80% soft threshold derived from it.
66 int maximumBytes = 128 * 1024 * 1024;
67 int limit = (int)(0.8*(double)maximumBytes);
// Monotonic stamp source for cluster "importance" (recency ordering; see refreshImportance).
69 private final AtomicLong timeCounter = new AtomicLong(1);
71 final private SessionImplSocket sessionImpl;
// Clusters created in write-only mode; freed again in removeWriteOnlyClusters().
72 final private ArrayList<ClusterI> writeOnlyClusters = new ArrayList<ClusterI>();
// Cluster keys invalidated while in write-only mode.
73 final private TIntHashSet writeOnlyInvalidates = new TIntHashSet();
// Fixed-capacity table indexed by cluster key; parallel flag arrays below.
74 final private static int ARRAY_SIZE = ClusterTraits.getClusterArraySize();
75 final private ClusterImpl[] clusterArray = new ClusterImpl[ARRAY_SIZE];
// Per-key immutability (tri-state: null = unknown) and virtuality flags.
76 final private Boolean[] immutables = new Boolean[ARRAY_SIZE];
77 final private boolean[] virtuals = new boolean[ARRAY_SIZE];
// Immutable (importance, clusterId) pair stored in importanceMap; implements
// CollectorCluster so the collector policy can rank clusters by importance.
78 static class ImportanceEntry implements CollectorCluster {
79 final public long importance;
80 final public long clusterId;
// Convenience constructor: snapshots the cluster's current importance and id.
81 public ImportanceEntry(ClusterImpl impl) {
82 this(impl.getImportance(), impl.getClusterId());
85 public ImportanceEntry(long importance, long clusterId) {
86 this.importance = importance;
87 this.clusterId = clusterId;
// Getter bodies are elided in this listing; presumably they return the fields above — TODO confirm.
90 public long getImportance() {
94 public long getClusterId() {
98 public String toString() {
99 return "CID " + clusterId;
// Private index over all known clusters: a clusterId -> ClusterImpl map kept in
// sync with the outer clusterArray (indexed by cluster key). All cluster UIDs here
// use first == 0, so clusterUID.second doubles as the cluster id.
102 private class Clusters {
103 // This makes sure that non-null values from hashMap.get can be trusted (no unsynchronized rehashes occur)
104 final public TLongObjectHashMap<ClusterImpl> hashMap = new TLongObjectHashMap<ClusterImpl>(2 * ARRAY_SIZE);
105 //private final HashMap<ClusterUID, ClusterImpl> clusterU2I = new HashMap<ClusterUID, ClusterImpl>(); // Maps cluster UID to cluster.
109 private void clear() {
111 // clusterU2I.clear();
112 hashMap.put(0, null); // reserved for null value
113 // clusterU2I.put(ClusterUID.make(0, 0), null);
// Size of the map (includes the reserved 0 slot); also used to mint new cluster keys.
116 return hashMap.size();
118 private ClusterImpl getClusterByClusterId(long clusterId) {
119 return hashMap.get(clusterId);
// UID lookups delegate to id lookup because uid.first is always 0 here.
121 private ClusterImpl getClusterByClusterUID(ClusterUID clusterUID) {
122 return getClusterByClusterId(clusterUID.second);
123 //return clusterU2I.get(clusterUID);
125 private ClusterImpl getClusterByClusterUID(long id1, long id2) {
126 return getClusterByClusterId(id2);
128 private ClusterImpl makeProxy(long clusterId) {
129 return makeProxy(ClusterUID.make(0, clusterId));
// Creates an unloaded ClusterSmall sentinel for a cluster not yet resident;
// the new cluster key is taken from the current map size.
131 private ClusterImpl makeProxy(ClusterUID clusterUID) {
132 ClusterImpl proxy = hashMap.get(clusterUID.second);
135 int clusterKey = hashMap.size();
136 ClusterSmall sentinel = new ClusterSmall(clusterUID, clusterKey, ClusterTable.this, sessionImpl.clusterTranslator);
137 if (sentinel.clusterId != sentinel.clusterUID.second)
138 throw new RuntimeDatabaseException("ClusterTable corrupted. Contact application support.");
// Replaces the cluster at proxy.clusterKey after verifying key/id/UID all match
// the incumbent — any mismatch indicates table corruption.
142 private void replace(ClusterImpl proxy) {
143 ClusterImpl old = clusterArray[proxy.clusterKey];
146 if (old.clusterKey != proxy.clusterKey)
147 throw new RuntimeDatabaseException("ClusterTable corrupted. Contact application support.");
148 if (old.clusterId != proxy.clusterId)
149 throw new RuntimeDatabaseException("ClusterTable corrupted. Contact application support.");
150 if (!old.clusterUID.equals(proxy.clusterUID))
151 throw new RuntimeDatabaseException("ClusterTable corrupted. Contact application support.");
// Drops a loaded cluster back to an unloaded sentinel occupying the same key,
// after consistency checks against the mapped entry.
154 private ClusterSmall freeProxy(ClusterImpl proxy) {
155 ClusterImpl clusterImpl = hashMap.get(proxy.clusterId);
156 if (null == clusterImpl)
157 throw new RuntimeDatabaseException("ClusterTable corrupted. Contact application support.");
158 // ClusterUID clusterUID = ClusterUID.make(0, proxy.clusterId);
159 // ClusterImpl clusterImpl2 = clusterU2I.get(clusterUID );
160 // if (clusterImpl != clusterImpl2)
161 // throw new RuntimeDatabaseException("ClusterTable corrupted. Contact application support.");
162 if (proxy.clusterId != clusterImpl.clusterId)
163 throw new RuntimeDatabaseException("ClusterTable corrupted. Contact application support.");
164 if (proxy.clusterKey != clusterImpl.clusterKey)
165 throw new RuntimeDatabaseException("ClusterTable corrupted. Contact application support.");
166 ClusterSmall sentinel = new ClusterSmall(makeClusterUID(proxy.clusterId) , proxy.clusterKey, ClusterTable.this, sessionImpl.clusterTranslator);
167 return (ClusterSmall)create(sentinel);
// Registers a cluster in both the id map and the key-indexed array.
169 private ClusterImpl create(ClusterImpl clusterImpl) {
170 hashMap.put(clusterImpl.clusterId, clusterImpl);
171 // clusterU2I.put(clusterImpl.clusterUID, clusterImpl);
172 clusterArray[clusterImpl.clusterKey] = clusterImpl;
175 private void forEachValue(TObjectProcedure<ClusterImpl> procedure) {
176 hashMap.forEachValue(procedure);
178 private void forEachEntry(TLongObjectProcedure<ClusterImpl> procedure) {
179 hashMap.forEachEntry(procedure);
181 private ClusterUID makeClusterUID(long clusterId) {
182 return ClusterUID.make(0, clusterId);
// Removes the cluster from both the id map and the key-indexed array.
184 private void removeProxy(ClusterImpl proxy) {
185 hashMap.remove(proxy.clusterId);
186 clusterArray[proxy.clusterKey] = null;
// importance -> CollectorCluster, sorted so the collector can evict in importance order.
189 final public TreeMap<Long, CollectorCluster> importanceMap = new TreeMap<Long, CollectorCluster>();
191 private ClusterCollectorPolicy collectorPolicy;
// When true, sizeInBytes is stale and getSizeInBytes() recomputes it.
192 private boolean dirtySizeInBytes = true;
193 private long sizeInBytes = 0;
194 private final Clusters clusters;
// Builds a UID for a cluster id (first component always 0; see Clusters.makeClusterUID).
195 ClusterUID makeClusterUID(long clusterId) {
196 return clusters.makeClusterUID(clusterId);
// Resolves the UID of the cluster that owns the given resource key.
198 public ClusterUID getClusterUIDByResourceKey(int resourceKey) throws DatabaseException {
199 int clusterKey = ClusterTraits.getClusterKeyFromResourceKey(resourceKey);
200 return clusterArray[clusterKey].clusterUID;
// folderPath parameter appears unused in the visible lines — TODO confirm against full source.
202 ClusterTable(SessionImplSocket sessionImpl, File folderPath) {
203 this.sessionImpl = sessionImpl;
204 clusters = new Clusters();
209 importanceMap.clear();
// Installs the policy consulted when clusters are added/removed/collected.
212 public void setCollectorPolicy(ClusterCollectorPolicy policy) {
213 this.collectorPolicy = policy;
217 * Sets Cluster Table allocation size by percentage of maximum usable
218 * memory. Allocation is limited between 5% and 50% of maximum memory.
// Clamps the requested percentage to [0.05, 0.5] and recomputes maximumBytes
// from the JVM's max heap.
222 private void setAllocateSize(double percentage) {
224 percentage = Math.max(percentage, 0.05);
225 percentage = Math.min(percentage, 0.5);
// NOTE(review): int cast overflows for heaps > ~2 GiB * percentage — confirm intended.
227 maximumBytes = (int) Math.floor(percentage * Runtime.getRuntime().maxMemory());
// Estimates what fraction of the heap the cluster cache can safely occupy,
// given current non-cache memory use and a 20% free-heap target; clamped to [0.05, 0.5].
230 private double estimateProperAllocation(ClusterCollectorSupport support) {
231 long max = Runtime.getRuntime().maxMemory();
232 long free = Runtime.getRuntime().freeMemory();
233 long total = Runtime.getRuntime().totalMemory();
234 long realFree = max - total + free; // amount of free memory
235 long current = support.getCurrentSize(); // currently allocated cache
236 long inUseTotal = max - realFree; // currently used memory
237 long inUseOther = inUseTotal - current; // memory that the application uses (without the cache)
238 double otherUsePercentage = (double) inUseOther / (double) max; // percentage of memory that the application uses
239 double aimFree = 0.2; // percentage of maximum heap that we try to keep free
240 double estimate = 1.0 - otherUsePercentage - aimFree;
241 estimate = Math.min(estimate, 0.5);
242 estimate = Math.max(estimate, 0.05);
243 // System.out.println("Estimated allocation percentage " + estimate +
244 // " memory stats: max: " + max + ", free: " + realFree + ", inUse: " +
245 // inUseTotal + ", cached: " + current + ", cacheSize: " + maximumBytes);
// Triggers a collection pass when appropriate (body elided in this listing).
249 void checkCollect() {
253 synchronized ClusterImpl getClusterByClusterId(long clusterId) {
254 return clusters.getClusterByClusterId(clusterId);
// Direct lookup by cluster key; not synchronized, unlike the id lookup above.
257 ClusterBase getClusterByClusterKey(int clusterKey) {
258 return clusterArray[clusterKey];
// Creates an unloaded proxy for the UID after verifying uid.second == clusterId.
261 synchronized ClusterImpl makeProxy(ClusterUID clusterUID, long clusterId) {
262 if (clusterUID.second != clusterId)
263 throw new RuntimeDatabaseException("Illegal id for cluster=" + clusterUID + " id=" + clusterId);
264 return clusters.makeProxy(clusterUID);
// Creates and registers a brand-new cluster. In write-only mode a ClusterWriteOnly
// is created and remembered for later cleanup; otherwise a full ClusterImpl is made
// and entered into importanceMap / reported to the collector policy.
267 synchronized ClusterImpl makeCluster(long clusterId, boolean writeOnly) {
269 ClusterImpl proxy = clusters.getClusterByClusterId(clusterId);
272 ClusterUID clusterUID = ClusterUID.make(0, clusterId);
273 int clusterKey = clusters.size(); // new key
275 proxy = new ClusterWriteOnly(clusterUID, clusterKey, sessionImpl);
276 writeOnlyClusters.add(proxy);
277 return clusters.create(proxy);
279 //printMaps("makeCluster");
280 ClusterImpl cluster = ClusterImpl.make(clusterUID, clusterKey, sessionImpl.clusterTranslator);
281 clusters.create(cluster);
282 if (!cluster.isLoaded())
283 Logger.defaultLogError(new Exception("Bug in ClusterTable.makeCluster(long, boolean), cluster not loaded"));
284 importanceMap.put(cluster.getImportance(), new ImportanceEntry(cluster));
286 validateSize("makeCluster");
287 if(collectorPolicy != null) collectorPolicy.added(cluster);
// Swaps a new cluster implementation in at the key of an existing one, carrying
// over the virtual flag, re-registering importance/collector bookkeeping, and
// incrementally adjusting the cached-size accounting.
292 synchronized void replaceCluster(ClusterI cluster) {
293 //printMaps("replaceCluster");
295 int clusterKey = cluster.getClusterKey();
296 ClusterI existing = clusterArray[clusterKey];
297 if (existing.hasVirtual())
298 cluster.markVirtual();
// Importance 0 is treated as "not registered" and skipped.
300 long existingImportance = existing.getImportance();
301 if (existingImportance != 0)
302 importanceMap.remove(existingImportance);
303 if (collectorPolicy != null)
304 collectorPolicy.removed((ClusterImpl)existing);
306 //System.out.println("ClusterTable.replace(" + existing + " (I=" + existing.getImportance() + ") => " + cluster + " (I=" + cluster.getImportance() + ")");
308 clusters.replace((ClusterImpl)cluster);
309 if (!cluster.isLoaded())
310 Logger.defaultLogError(new Exception("Bug in ClusterTable.replaceCluster(ClusterI), cluster not loaded"));
312 importanceMap.put(cluster.getImportance(), new ImportanceEntry((ClusterImpl)cluster));
313 if(collectorPolicy != null) collectorPolicy.added((ClusterImpl)cluster);
// Only patch the running size total when it is not already marked dirty.
315 if (!dirtySizeInBytes) {
316 if (existing != cluster) {
317 adjustCachedSize(-existing.getCachedSize(), existing);
319 // This will update sizeInBytes through adjustCachedSize
320 cluster.getCachedSize();
323 validateSize("replaceCluster");
// Releases a collector-tracked cluster: drop its importance entry, then release by id.
326 synchronized void release(CollectorCluster cluster) {
327 importanceMap.remove(cluster.getImportance());
328 release(cluster.getClusterId());
// Evicts a loaded cluster from memory: demotes it to an unloaded proxy, removes
// collector bookkeeping, flushes pending writes for it, and fixes size accounting.
331 synchronized void release(long clusterId) {
332 //System.out.println("ClusterTable.release(" + clusterId + "): " + sizeInBytes);
334 ClusterImpl clusterImpl = clusters.getClusterByClusterId(clusterId);
335 if (null == clusterImpl)
// Nothing to release if absent, unloaded, or empty (early exits; bodies elided here).
337 if(!clusterImpl.isLoaded() || clusterImpl.isEmpty())
339 //printMaps("release");
340 clusters.freeProxy(clusterImpl);
341 importanceMap.remove(clusterImpl.getImportance());
342 if (collectorPolicy != null)
343 collectorPolicy.removed(clusterImpl);
// Flush the stream for this cluster if a write transaction is in progress.
344 if (sessionImpl.writeState != null)
345 sessionImpl.clusterStream.flush(clusterImpl.clusterUID);
346 if (!dirtySizeInBytes) {
347 adjustCachedSize(-clusterImpl.getCachedSize(), clusterImpl);
350 validateSize("release");
// Compacts the given cluster's storage (body elided in this listing).
353 synchronized void compact(long id) {
354 ClusterI impl = clusters.getClusterByClusterId(id);
360 // cachedSize = getSizeInBytes();
// Load probability heuristic; the graded formula below is disabled.
363 double getLoadProbability() {
364 // This can currently cause stack overflow
366 // if(cachedSize < SLOW_LIMIT) return 1.0;
367 // if(cachedSize > HIGH_LIMIT) return 1e-2;
369 // double pos = (double)(cachedSize - SLOW_LIMIT) / (double)(HIGH_LIMIT -
371 // double val = 0.1 * ((pos-1) * (pos-1)) + 1e-2;
// Trove procedure that sums getCachedSize() over all loaded, non-empty clusters;
// per-cluster failures are logged and skipped so one bad cluster can't abort the sweep.
375 class SizeProcedure implements TObjectProcedure<ClusterImpl> {
376 public long result = 0;
379 public boolean execute(ClusterImpl cluster) {
380 if (cluster != null) {
382 if (cluster.isLoaded() && !cluster.isEmpty()) {
383 result += cluster.getCachedSize();
385 } catch (Throwable t) {
386 Logger.defaultLogError(t);
// Resets the accumulator before a new sweep.
392 public void clear() {
398 private SizeProcedure sizeProcedure = new SizeProcedure();
// Returns the total cached size, recomputing it by a full sweep only when dirty.
399 long getSizeInBytes() {
400 if (dirtySizeInBytes) {
401 sizeProcedure.clear();
402 clusters.forEachValue(sizeProcedure);
403 sizeInBytes = sizeProcedure.result;
404 // System.err.println("recomputed size of clusterTable => " + sizeInBytes);
405 setDirtySizeInBytes(false);
// Marks the size total stale (true) or fresh (false).
410 public void setDirtySizeInBytes(boolean value) {
411 dirtySizeInBytes = value;
// Snapshots the ids of all loaded, non-empty clusters into a ClusterStateImpl.
414 ClusterStateImpl getState() {
415 final ClusterStateImpl result = new ClusterStateImpl();
416 clusters.forEachEntry(new TLongObjectProcedure<ClusterImpl>() {
418 public boolean execute(long arg0, ClusterImpl arg1) {
421 if (arg1.isLoaded() && !arg1.isEmpty()) {
422 result.ids.add(new ImportanceEntry(arg1));
// Releases every currently-loaded cluster that is not in the given snapshot,
// i.e. restores the table to the state captured earlier by getState().
430 void restoreState(ClusterStateImpl state) {
431 ClusterStateImpl current = getState();
432 for (CollectorCluster id : current.ids)
433 if (!state.ids.contains(id))
434 collectorSupport.release(id);
// Collector driving cache eviction: either delegates selection to an installed
// ClusterCollectorPolicy or falls back to releasing resident clusters until a
// byte target is met.
437 class ClusterCollectorImpl implements ClusterCollector {
439 final private ClusterCollectorSupport support;
440 private ClusterCollectorPolicy policy;
441 ClusterCollectorImpl(ClusterCollectorSupport support) {
442 this.support = support;
// Swaps in a new policy, seeding it with all currently resident clusters;
// returns the previous policy.
445 ClusterCollectorPolicy setPolicy(ClusterCollectorPolicy newPolicy) {
447 ClusterCollectorPolicy oldPolicy = policy;
451 for (CollectorCluster id : support.getResidentClusters()) {
452 policy.added(getClusterByClusterId(id.getClusterId()));
456 support.setPolicy(policy);
// Full collection pass: with a policy, release its selection; otherwise size the
// budget (optionally dynamically) and collect down to it.
463 public void collect() {
467 release(policy.select());
471 int size = support.getCurrentSize();
472 boolean dynamicAllocation = useDynamicAllocation();
473 if (dynamicAllocation)
474 setAllocateSize(estimateProperAllocation(support));
475 if (DebugPolicy.CLUSTER_COLLECTION) {
476 System.out.println("Cluster collector activated, current size = " + size + " limit = " + maximumBytes);
478 if (dynamicAllocation) {
// Collect at most half the budget, capped at 32 MiB, plus any overshoot.
479 int collectSize = maximumBytes / 2;
480 collectSize = Math.min(collectSize, 32 * 1024 * 1024);
481 // try to keep allocated clusters below the maximum
482 if (maximumBytes - size > collectSize)
484 collectSize += size - maximumBytes;
485 collect(collectSize);
487 // try to keep allocated clusters below the maximum
488 if (size < maximumBytes)
// Dynamic allocation is opt-in via a system property.
498 private boolean useDynamicAllocation() {
499 return "true".equalsIgnoreCase(System.getProperty("org.simantics.db.cluster.dynamicAlloc"));
// Targeted collection: release clusters until roughly 'target' bytes are freed.
503 public void collect(int target) {
507 release(policy.select(target));
511 ArrayList<CollectorCluster> toRelease = new ArrayList<CollectorCluster>();
513 for (CollectorCluster cluster : support.getResidentClusters()) {
514 target -= support.getClusterSize(cluster);
516 toRelease.add(cluster);
524 if (DebugPolicy.CLUSTER_COLLECTION) {
525 System.out.println("Cluster collector finished, current size = " + support.getCurrentSize());
// Releases each selected cluster (loop body elided in this listing).
532 void release(Collection<CollectorCluster> toRelease) {
533 for (CollectorCluster id : toRelease) {
540 private ClusterCollectorSupport collectorSupport = new ClusterCollectorSupportImpl(this);
541 ClusterCollectorImpl collector = new ClusterCollectorImpl(collectorSupport);
// Id of the cluster new resources are currently allocated into; NewClusterId
// means "none reserved yet". CLUSTER_FILL_SIZE caps resources per cluster.
547 private long newResourceClusterId = Constants.NewClusterId;
548 public static final int CLUSTER_FILL_SIZE = ClusterTraitsBase.getMaxNumberOfResources();
551 * A new cluster id is reserved only when a resource is actually added, so
552 * reservedIds contains only clusters that are already in use.
// Returns the cluster that should receive the next new resource, reserving a
// fresh cluster id from the server when none is reserved yet or the current
// cluster has reached CLUSTER_FILL_SIZE. Result is loaded before returning.
555 ClusterImpl getNewResourceCluster(ClusterSupport cs, GraphSession graphSession, boolean writeOnly)
556 throws DatabaseException {
557 ClusterImpl result = null;
558 if (Constants.NewClusterId == newResourceClusterId) {
559 newResourceClusterId = graphSession.newClusterId();
560 result = getClusterByClusterIdOrMake(newResourceClusterId, writeOnly);
562 ClusterImpl cluster = getClusterByClusterIdOrThrow(newResourceClusterId);
563 if (cluster.getNumberOfResources(cs) >= CLUSTER_FILL_SIZE) {
564 newResourceClusterId = graphSession.newClusterId();
565 cluster = getClusterByClusterIdOrMake(newResourceClusterId, writeOnly);
569 return ensureLoaded(result);
// Forgets the reserved new-resource cluster so the next allocation reserves a fresh id.
572 void flushCluster(GraphSession graphSession) {
573 // We seem to disagree about this.
574 // graphSession.newClusterId();
575 newResourceClusterId = Constants.NewClusterId;
// Records a cluster key whose write-only contents must be invalidated later.
578 void writeOnlyInvalidate(ClusterI impl) {
579 writeOnlyInvalidates.add(impl.getClusterKey());
// Tears down all write-only clusters (demoting each to an unloaded proxy) and
// processes the pending invalidation set; both collections are cleared afterwards.
582 void removeWriteOnlyClusters() {
583 for (ClusterI proxy : writeOnlyClusters) {
584 if (!(proxy instanceof ClusterImpl))
585 throw new RuntimeDatabaseException("ClusterTable corrupted. Contact application support.");
586 clusters.freeProxy((ClusterImpl)proxy);
588 writeOnlyClusters.clear();
589 writeOnlyInvalidates.forEach(new TIntProcedure() {
591 public boolean execute(int clusterKey) {
592 ClusterImpl proxy = clusterArray[clusterKey];
593 ClusterUID clusterUID = proxy.getClusterUID();
594 System.err.println("writeOnlyInvalidate " + clusterUID);
595 clusters.freeProxy(proxy);
599 writeOnlyInvalidates.clear();
// Plain UID lookup; returns null when the cluster is unknown.
602 public ClusterImpl getClusterByClusterUID(ClusterUID clusterUID) {
603 synchronized (this) {
604 return clusters.getClusterByClusterUID(clusterUID);
// Key lookups that create an unloaded proxy on miss. Only uid.second is
// significant — uid.first is ignored (always 0 in this table).
607 public int getClusterKeyByClusterUIDOrMakeProxy(ClusterUID clusterUID) {
608 return getClusterKeyByClusterUIDOrMakeProxy(0/*clusterUID.first*/, clusterUID.second);
610 public int getClusterKeyByClusterUIDOrMakeProxy(long id1, long id2) {
611 return getClusterByClusterUIDOrMakeProxy(id1, id2).clusterKey;
613 public ClusterImpl getClusterByClusterUIDOrMakeProxy(ClusterUID clusterUID) {
614 synchronized (this) {
615 ClusterImpl clusterImpl = clusters.getClusterByClusterUID(clusterUID);
616 if (null == clusterImpl)
617 clusterImpl = clusters.makeProxy(clusterUID);
621 public ClusterImpl getClusterByClusterUIDOrMakeProxy(long id1, long id2) {
622 synchronized (this) {
623 ClusterImpl clusterImpl = clusters.getClusterByClusterUID(id1, id2);
624 if (null == clusterImpl)
625 clusterImpl = clusters.makeProxy(id2);
// Returns the loaded cluster for the id, creating a proxy and loading on demand;
// throws ResourceNotFoundException if loading yields nothing, and removes the
// proxy again if the cluster turns out not to exist.
629 ClusterImpl getLoadOrThrow(long clusterId) throws DatabaseException {
630 synchronized (this) {
631 ClusterImpl proxy = clusters.getClusterByClusterId(clusterId);
634 if (proxy.isLoaded())
637 clusterKey = proxy.getClusterKey();
639 if (clusterKey == 0) {
640 proxy = clusters.makeProxy(clusterId);
641 clusterKey = proxy.getClusterKey();
643 ClusterImpl ci = tryLoad(clusterId, clusterKey);
645 throw new ResourceNotFoundException(clusterId);
647 } catch (ClusterDoesNotExistException t) {
648 clusters.removeProxy(proxy);
// Lookup that creates a brand-new cluster on miss (via makeCluster).
654 ClusterImpl getClusterByClusterIdOrMake(long clusterId, boolean writeOnly) {
655 synchronized (this) {
656 ClusterImpl proxy = clusters.getClusterByClusterId(clusterId);
660 return makeCluster(clusterId, writeOnly);
// Lookup that throws IllegalArgumentException when the cluster was never created.
663 ClusterImpl getClusterByClusterIdOrThrow(long clusterId) {
664 synchronized (this) {
665 ClusterImpl proxy = clusters.getClusterByClusterId(clusterId);
667 throw new IllegalArgumentException("Cluster id=" + clusterId + " is not created.");
// UID second component IS the cluster id in this table; nothing to create.
671 long getClusterIdOrCreate(ClusterUID clusterUID) {
672 return clusterUID.second;
// Maps a resource key to its persistent cluster id; rejects virtual-cluster keys
// and missing clusters by throwing.
674 final long getClusterIdByResourceKey(final int resourceKey)
675 throws DatabaseException {
676 int clusterKey = ClusterTraitsBase.getClusterKeyFromResourceKey(resourceKey);
677 if (ClusterTraitsBase.isVirtualClusterKey(clusterKey))
678 throw new RuntimeException("Tried to get a persistent cluster for a virtual resource.");
679 ClusterI c = clusterArray[clusterKey];
681 throw new RuntimeException("No cluster for key " + resourceKey);
682 return c.getClusterId();
// Non-throwing variant: logs the error instead of throwing (the error-path
// return values are elided in this listing).
684 final long getClusterIdByResourceKeyNoThrow(final int resourceKey) {
685 int clusterKey = ClusterTraitsBase.getClusterKeyFromResourceKeyNoThrow(resourceKey);
686 if (ClusterTraitsBase.isVirtualClusterKey(clusterKey)) {
687 Logger.defaultLogError("Tried to get a persistent cluster for a virtual resource. key=" + resourceKey);
690 ClusterI c = clusterArray[clusterKey];
692 Logger.defaultLogError("No cluster for key " + resourceKey);
695 return c.getClusterId();
// Refreshes the given clusters after external change set 'csid': re-clones each
// changed, loaded, persistent cluster from the database session, swaps it into
// the table, then replays the per-cluster changes into the query processor and
// fires the usual write-completion listeners via a temporary write state.
700 void refresh(long csid, SessionImplSocket session, ClusterUID[] clusterUID) {
701 synchronized (this) {
702 session.flushCounter = 0;
703 session.clusterStream.reallyFlush();
704 ClientChangesImpl cs = new ClientChangesImpl(session);
705 if (session.clientChanges == null)
706 session.clientChanges = cs;
707 //printMaps("refresh");
708 for (int i=0; i<clusterUID.length; ++i) {
710 if (DebugPolicy.REPORT_CLUSTER_EVENTS)
711 System.err.println("cluster=" + clusterUID[i] + " has changed. length=" + clusterUID.length);
// Skip clusters that are unknown, not loaded, or virtual (guard bodies elided here).
712 ClusterImpl oldCluster = clusters.getClusterByClusterUID(clusterUID[i]);
713 if (null == oldCluster)
715 if (!oldCluster.isLoaded())
717 int clusterKey = oldCluster.getClusterKey();
718 if (ClusterTraitsBase.isVirtualClusterKey(clusterKey))
720 boolean big = oldCluster instanceof ClusterBig;
721 boolean small = oldCluster instanceof ClusterSmall;
724 // ClusterImpl newCluster = (ClusterImpl) sessionImpl.graphSession.getClusterImpl(clusterUID[i], clusterKey);
725 Database.Session dbSession = sessionImpl.graphSession.dbSession;
// Clone the fresh cluster contents from the database into a new ClusterImpl.
727 ClusterImpl newCluster = dbSession.clone(clusterUID[i], new ClusterCreator() {
730 public <T> T create(ClusterUID uid, byte[] bytes, int[] ints, long[] longs) {
731 ClusterSupport support = sessionImpl.clusterTranslator;
733 return (T)ClusterImpl.make(longs, ints, bytes, support, support.getClusterKeyByClusterUIDOrMake(uid));
734 } catch (DatabaseException e) {
741 if (null == newCluster)
743 if (DebugPolicy.REPORT_CLUSTER_EVENTS)
744 System.err.println("cluster=" + newCluster + " updated.");
745 importanceMap.remove(oldCluster.getImportance());
746 if (collectorPolicy != null)
747 collectorPolicy.removed(oldCluster);
748 clusters.replace(newCluster);
749 if (!newCluster.isLoaded())
750 Logger.defaultLogError(new Exception("Bug in ClusterTable.refresh, cluster not loaded"));
751 importanceMap.put(newCluster.getImportance(), new ImportanceEntry(newCluster));
752 if (collectorPolicy != null)
753 collectorPolicy.added(newCluster);
754 // Now we have fetched the new cluster but to emulate effects of the changes in it we fetch the cluster changes from server.
755 refreshCluster(csid, session, oldCluster, clusterUID[i], newCluster.clusterId, clusterKey);
756 } catch (Throwable t) {
// Per-cluster failures are logged so the remaining clusters still refresh.
757 Logger.defaultLogError("Failed to load cluster in refresh.", t);
761 validateSize("refresh");
762 // Fake update of cluster changes.
763 QueryProcessor queryProcessor = session.getQueryProvider2();
764 WriteGraphImpl writer = WriteGraphImpl.create(queryProcessor, session.writeSupport, null);
765 TaskHelper th = null;
// Install a temporary write state only if no write is already in progress.
766 if (null == session.writeState) {
767 th = new TaskHelper("Refresh");
768 session.writeState = new WriteState<Object>(writer, th.writeTraits, th.sema, th.proc);
770 session.getQueryProvider2().performDirtyUpdates(writer);
771 session.fireMetadataListeners(writer, cs);
772 session.getQueryProvider2().performScheduledUpdates(writer);
773 session.fireReactionsToSynchronize(cs);
774 session.fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
775 session.printDiagnostics();
778 session.writeState = null;
// Fetches the server-side change set for one refreshed cluster and notifies the
// query processor of every changed statement and value so cached queries update.
783 final void refreshCluster(long csid, SessionImplSocket session, ClusterImpl cluster, ClusterUID clusterUID, long clusterId, int clusterKey)
784 throws DatabaseException {
785 if (DebugPolicy.REPORT_CLUSTER_EVENTS)
786 System.err.println("cluster=" + clusterUID + " id=" + clusterId + " key=" + clusterKey + " resources will be updated.");
787 // get cluster change sets
788 QueryProcessor queryProcessor = session.getQueryProvider2();
791 cc = session.graphSession.getClusterChanges(clusterUID, csid);
792 } catch (Exception e) {
793 Logger.defaultLogError("Could not get cluster changes. cluster=" + clusterUID, e);
// Statement changes: rebuild (subject, predicate) resource keys and invalidate.
797 for (int i=0; i<cc.getResourceIndex().length; ++i) {
798 int resource = ClusterTraits.createResourceKey(clusterKey, cc.getResourceIndex()[i]);
799 ClusterUID pClusterUID = new ClusterUID(cc.getPredicateFirst()[i], cc.getPredicateSecond()[i]);
800 ClusterImpl pCluster = clusters.getClusterByClusterUID(pClusterUID);
// Predicate's cluster must be known locally; otherwise this entry is skipped (body elided).
801 if (null == pCluster)
803 int pClusterKey = pCluster.getClusterKey();
804 int predicate = ClusterTraits.createResourceKey(pClusterKey, cc.getPredicateIndex()[i]);
805 queryProcessor.updateStatements(resource, predicate);
806 if (DebugPolicy.REPORT_CLUSTER_EVENTS)
807 System.err.println("resource " + cc.getResourceIndex()[i] + " relation " + cc.getPredicateIndex()[i] + " changed.");
// Value changes: invalidate the cached value of each changed resource.
809 for (int i=0; i<cc.getValueIndex().length; ++i) {
810 int resource = ClusterTraits.createResourceKey(clusterKey, cc.getValueIndex()[i]);
811 queryProcessor.updateValue(resource);
812 if (DebugPolicy.REPORT_CLUSTER_EVENTS)
813 System.err.println("value " + cc.getValueIndex()[i] + " changed.");
// Bumps a cluster's importance to a fresh timeCounter() stamp, re-keying its
// importanceMap entry and re-registering it with the collector policy, so
// recently-touched clusters are evicted last.
816 final void refreshImportance(ClusterImpl c) {
821 //printMaps("refreshImportance");
823 importanceMap.remove(c.getImportance());
824 if(collectorPolicy != null) collectorPolicy.removed(c);
826 long newImportance = timeCounter();
827 // System.err.println("refreshImportance " + c.getClusterId() + " => " + newImportance);
828 c.setImportance(newImportance);
830 Logger.defaultLogError(new Exception("Bug in ClusterTable.refreshImportance(ClusterImpl), cluster not loaded"));
832 importanceMap.put(c.getImportance(), new ImportanceEntry(c));
833 if(collectorPolicy != null) collectorPolicy.added(c);
835 validateSize("refreshImportance");
// Cumulative nanoseconds spent in load2 (debug statistics).
839 static long loadTime = 0;
841 @SuppressWarnings("unchecked")
// Returns the (possibly unloaded) cluster proxy that owns the resource key;
// rejects virtual resources and missing proxies by throwing.
842 public final <T extends ClusterI> T getClusterProxyByResourceKey(final int resourceKey) {
843 int clusterKey = ClusterTraitsBase.getClusterKeyFromResourceKeyNoThrow(resourceKey);
844 if (ClusterTraitsBase.isVirtualClusterKey(clusterKey))
845 throw new RuntimeException("Tried to get a persistent cluster for a virtual resource.");
846 ClusterI cluster = clusterArray[clusterKey];
848 throw new RuntimeException("No proxy for existing cluster. Resource key = " + resourceKey);
// Debug counters: per-cluster load counts and a global total (see ensureLoaded).
852 TLongIntHashMap clusterLoadHistogram = new TLongIntHashMap();
853 int clusterLoadCounter = 0;
// Ensures the given cluster is loaded, calling load2 with optional timing and
// histogram reporting under DebugPolicy.REPORT_CLUSTER_LOADING. A load failure
// is logged and rethrown as RuntimeDatabaseException.
855 private <T extends ClusterI> T ensureLoaded(T c) {
857 ClusterImpl cs = (ClusterImpl) c;
859 if(DebugPolicy.REPORT_CLUSTER_LOADING) {
// Timed load path: record duration and update load statistics.
860 long start = System.nanoTime();
861 cluster = load2(cs.getClusterId(), cs.getClusterKey());
862 long load = System.nanoTime()-start;
864 if(DebugPolicy.REPORT_CLUSTER_LOADING) {
865 int was = clusterLoadHistogram.get(cluster.getClusterId());
866 clusterLoadHistogram.put(cluster.getClusterId(), was+1);
867 clusterLoadCounter++;
868 String text = "Load2 " + cluster + " " + 1e-9*loadTime + "s. " + 1e-6*load + "ms. " + clusterLoadCounter + " " + was + " " + cluster.getUsedSpace() + " " + cluster.getImportance();
869 if(DebugPolicy.REPORT_CLUSTER_LOADING_STACKS) {
870 new Exception(text).printStackTrace();
872 System.err.println(text);
// Untimed load path (debug reporting off).
876 cluster = load2(cs.getClusterId(), cs.getClusterKey());
878 } catch (DatabaseException e) {
879 Logger.defaultLogError(e);
880 if (DebugPolicy.REPORT_CLUSTER_EVENTS)
882 String msg = "Failed to load cluster " + cs.getClusterUID();// + " resourceId=" + (((cs.getClusterId() << 16 + (resourceKey & 65535))));
883 // TODO: this jams the system => needs refactoring.
884 throw new RuntimeDatabaseException(msg, e);
889 @SuppressWarnings("unchecked")
// Resolves the loaded cluster owning the resource key, periodically bumping its
// importance (every 4096th call) and loading it on demand via ensureLoaded.
890 public final <T extends ClusterI> T getClusterByResourceKey(final int resourceKey) {
891 int clusterKey = ClusterTraitsBase.getClusterKeyFromResourceKeyNoThrow(resourceKey);
892 if (ClusterTraitsBase.isVirtualClusterKey(clusterKey))
893 throw new RuntimeException("Tried to get a persistent cluster for a virtual resource.");
894 ClusterI c = clusterArray[clusterKey];
// 'counter' is declared outside the visible lines — presumably a call counter; TODO confirm.
898 if ((counter++ & 4095) == 0)
899 refreshImportance((ClusterImpl) c);
// Unloaded entries must be ClusterSmall proxies; anything else is logged as an error.
902 if (!(c instanceof ClusterSmall)) {
903 Logger.defaultLogError("Proxy must be instance of ClusterSmall");
906 return ensureLoaded((T)c);
909 @SuppressWarnings("unchecked")
// Like getClusterByResourceKey, but throws when no cluster exists for the key and,
// on load failure, logs a detailed message and marks the cluster deleted instead
// of propagating the exception.
910 final <T extends ClusterI> T checkedGetClusterByResourceKey(final int resourceKey) {
911 int clusterKey = ClusterTraitsBase.getClusterKeyFromResourceKeyNoThrow(resourceKey);
912 if (ClusterTraitsBase.isVirtualClusterKey(clusterKey))
913 throw new RuntimeException("Tried to get a persistent cluster for a virtual resource.");
914 ClusterI c = clusterArray[clusterKey];
916 throw new RuntimeException("No cluster for resource key " + resourceKey);
// Periodic importance bump, same 4096-call cadence as getClusterByResourceKey.
918 if ((counter++ & 4095) == 0)
919 refreshImportance((ClusterImpl) c);
922 if (!(c instanceof ClusterSmall)) {
923 Logger.defaultLogError("Proxy must be instance of ClusterSmall");
927 ClusterSmall cs = (ClusterSmall) c;
929 // System.err.println("Load2 " + resourceKey);
930 cluster = load2(cs.getClusterId(), cs.getClusterKey());
931 } catch (DatabaseException e) {
932 if (DebugPolicy.REPORT_CLUSTER_EVENTS)
// Reconstruct a human-readable resource id for the error message.
934 int resourceIndex = resourceKey & ClusterTraitsBase.getResourceIndexFromResourceKeyNoThrow(resourceKey);
935 long resourceId = ClusterTraitsBase.createResourceIdNoThrow(cs.getClusterId(), resourceIndex);
936 String msg = "Failed to load cluster " + cs.getClusterUID() + " for resource key " + resourceKey + " resourceIndex=" + resourceIndex + " resourceId=" + resourceId;
937 Logger.defaultLogError(msg, e);
938 c.setDeleted(true, null);
// Dumps per-cluster size/debug info for every table slot (index 0 is reserved)
// and prints cluster-count and total-size summaries to stdout.
944 void printDebugInfo(ClusterSupport support)
945 throws DatabaseException {
946 final int SIZE = clusters.size();
948 for (int i = 1; i < SIZE; ++i) {
949 ClusterI c = clusterArray[i];
950 c.getNumberOfResources(support);
951 long size = c.getUsedSpace();
952 System.out.println("cluster=" + c.getClusterId() + " size=" + size);
953 c.printDebugInfo("koss: ", support);
// 'sum' is accumulated on lines elided from this listing — TODO confirm.
956 System.out.println("Total number of clusters " + SIZE);
957 System.out.println("Total cluster size " + sum);
// Clusters fetched ahead of demand, keyed by cluster id.
960 Map<Long,ClusterImpl> prefetch = new HashMap<Long,ClusterImpl>();
// Synchronously loads the cluster at (clusterId, clusterKey): delegates to the
// async load2 overload and blocks on a semaphore until the callback completes,
// rethrowing any DatabaseException it captured. Under the development flag,
// the freshly loaded cluster is also self-checked and validated.
962 public synchronized ClusterImpl load2(long clusterId, int clusterKey) throws DatabaseException {
963 ClusterImpl curr = (ClusterImpl) clusterArray[clusterKey];
969 // curr = (ClusterImpl) clusterArray[clusterKey];
970 // if (curr.isLoaded())
973 final Semaphore s = new Semaphore(0);
974 final DatabaseException[] ex = new DatabaseException[1];
// Async load; the (elided) callback presumably stores the error and releases 's' — TODO confirm.
976 load2(clusterId, clusterKey, e -> {
982 } catch (InterruptedException e) {
983 Logger.defaultLogError(e);
987 if (Development.DEVELOPMENT) {
988 if (Development.<Boolean>getProperty(DevelopmentKeys.CLUSTERTABLE_VALIDATE_ON_LOAD, Bindings.BOOLEAN)) {
990 ClusterImpl loaded = (ClusterImpl) clusterArray[clusterKey];
991 if (loaded instanceof ClusterSmall)
992 ((ClusterSmall) loaded).check();
993 else if (loaded instanceof ClusterBig)
994 ((ClusterBig) loaded).check();
995 } catch (Throwable t) {
1001 validate(clusterKey);
1003 return (ClusterImpl) clusterArray[clusterKey];
// Debug validation hook. The entire body is retained, commented-out logic that
// compared this session's cluster dump against the in-memory copy held by
// GraphClientImpl2 and printed a window around the first differing character.
// Currently a no-op; kept for future cluster-consistency debugging.
1006 void validate(int clusterKey) {
1008 // if(sessionImpl.graphSession.graphClient instanceof GraphClientImpl2) {
1010 // ClusterImpl server = (ClusterImpl)clusterArray[clusterKey];
1011 // String sd = server.dump(sessionImpl.clusterTranslator);
1012 // GraphClientImpl2 gci = ((GraphClientImpl2)sessionImpl.graphSession.graphClient);
1013 // ClusterImpl memory = gci.clusters.get(server.clusterUID);
1014 // if(memory == null)
1015 // System.err.println("ad");
1016 // String md = memory.dump(gci.support);
1018 // if(!sd.equals(md)) {
1021 // int minLength = Math.min(sd.length(), md.length());
1022 // for (int i = 0; i < minLength; i++) {
1023 // if (sd.charAt(i) != md.charAt(i)) {
1029 // int start = Math.max(diffPos-200, 0);
1030 // int end = Math.min(minLength-1, diffPos+200);
1032 // System.err.println("== CLUSTER DIFFERENCE " + clusterArray[clusterKey].getClusterUID() + " == ");
1033 // System.err.println("== SESSION == ");
1034 // System.err.println(sd.substring(start, end));
1035 // System.err.println("== MEM == ");
1036 // System.err.println(md.substring(start, end));
1037 // System.err.println("== CLUSTER DIFFERENCE ENDS == ");
1039 // throw new IllegalStateException();
// Hands out a cluster from the prefetch cache under the prefetch map's lock.
// NOTE(review): the selection/removal logic is elided from this extract.
1045 public synchronized ClusterImpl getPrefetched() {
1046 synchronized(prefetch) {
// Loads the cluster (clusterId, clusterKey) from the database session and
// installs it into the table, reporting completion through 'runnable':
// accept(null) on success, accept(DatabaseException) on failure.
1051 public synchronized void load2(long clusterId, int clusterKey, final Consumer<DatabaseException> runnable) {
// The reserved cluster id must never be loaded through this path.
1053 assert (Constants.ReservedClusterId != clusterId);
1055 ClusterImpl cluster = null;
1056 DatabaseException e = null;
1060 ClusterUID clusterUID = clusters.makeClusterUID(clusterId);
1061 // cluster = (ClusterImpl) sessionImpl.getGraphSession().getClusterImpl(clusterUID, clusterKey);
1062 Database.Session session = sessionImpl.graphSession.dbSession;
1063 // cluster = (ClusterImpl)gci.getClusterByClusterKey(clusterKey).clone(sessionImpl.clusterTranslator);
1064 // cluster = (ClusterImpl)((ClusterImpl)gci.getClusterByClusterUIDOrMakeProxy(clusterUID)).clone(sessionImpl.clusterTranslator);
1066 // cluster = gci.clone(clusterUID, sessionImpl.clusterTranslator);
// The database session materializes the raw cluster data; the creator callback
// turns the (longs, ints, bytes) triple into a ClusterImpl keyed into this table.
1068 cluster = session.clone(clusterUID, new ClusterCreator() {
1071 public <T> T create(ClusterUID uid, byte[] bytes, int[] ints, long[] longs) {
1072 ClusterSupport support = sessionImpl.clusterTranslator;
1074 return (T)ClusterImpl.make(longs, ints, bytes, support, support.getClusterKeyByClusterUIDOrMake(uid));
// NOTE(review): printStackTrace in a callback — prefer Logger for consistency
// with the rest of this file.
1075 } catch (DatabaseException e) {
1076 e.printStackTrace();
1083 } catch (Throwable t) {
1084 // It's totally legal to call for non-existing cluster.
1085 // Sometimes this indicates an error, though.
1086 Logger.getDefault().logInfo("Load cluster failed.", t);
// Preserve the original DatabaseException; wrap anything else with its cause.
1087 if (t instanceof DatabaseException)
1088 e = (DatabaseException) t;
1090 e = new DatabaseException("Load cluster failed.", t);
1092 if (null == cluster) {
1097 // new Exception("Load cluster " + cluster.getClusterId() + " " + cluster.getCachedSize() + " " + cluster.getImmutable()).printStackTrace();
1099 // Can not be called with null argument.
1100 replaceCluster(cluster);
1101 sessionImpl.onClusterLoaded(clusterId);
// Success: signal the caller with a null exception.
1102 runnable.accept(null);
// Attempts to load cluster (clusterId, clusterKey) from the database session.
// Unlike load2(long,int), failures surface through the checked exception path
// of session.clone rather than a callback.
1106 public synchronized ClusterImpl tryLoad(long clusterId, int clusterKey) throws DatabaseException {
1108 assert (Constants.ReservedClusterId != clusterId);
1110 ClusterUID clusterUID = clusters.makeClusterUID(clusterId);
1112 Database.Session session = sessionImpl.graphSession.dbSession;
// Same creator pattern as load2: build a ClusterImpl from the raw arrays.
1114 ClusterImpl cluster = session.clone(clusterUID, new ClusterCreator() {
1117 public <T> T create(ClusterUID uid, byte[] bytes, int[] ints, long[] longs) {
1118 ClusterSupport support = sessionImpl.clusterTranslator;
1120 return (T)ClusterImpl.make(longs, ints, bytes, support, support.getClusterKeyByClusterUIDOrMake(uid));
1121 } catch (DatabaseException e) {
1122 e.printStackTrace();
// Install into an empty slot directly; otherwise replace the existing entry.
1128 if (null == clusterArray[clusterKey])
1129 clusterArray[clusterKey] = cluster;
1131 replaceCluster(cluster);
1132 sessionImpl.onClusterLoaded(clusterId);
1134 validate(clusterKey);
// Returns all non-null clusters currently present in the table as a new list.
1139 public Collection<ClusterI> getClusters() {
1140 ArrayList<ClusterI> result = new ArrayList<ClusterI>();
1141 for (int i = 0; i < clusterArray.length; i++) {
1142 ClusterI cluster = clusterArray[i];
1143 if (cluster != null)
1144 result.add(cluster);
// NOTE(review): the line below is a fragment of a separate accessor (its
// signature is elided from this extract); presumably a size() method
// delegating to the clusters map — verify against the full file.
1150 return clusters.size();
// Monotonically increasing tick used for cluster importance ordering.
1153 public long timeCounter() {
1154 return timeCounter.getAndIncrement();
// True if a virtual cluster has been marked at the given cluster index.
1157 public boolean hasVirtual(int index) {
1158 return virtuals[index];
// Records that the given cluster index hosts a virtual cluster.
1161 public void markVirtual(int index) {
1162 virtuals[index] = true;
// Exposes the internal cluster array directly (no defensive copy) —
// callers must not mutate it.
1165 public ClusterImpl[] getClusterArray() {
1166 return clusterArray;
// Tests whether the cluster owning resource key 'id' is immutable, using a
// per-cluster-key memo array to avoid repeated cluster lookups.
1169 public boolean isImmutable(int id) {
1171 int clusterKey = ClusterTraitsBase.getClusterKeyFromResourceKeyNoThrow(id);
// Fast path: previously computed answer.
1172 Boolean exist = immutables[clusterKey];
1173 if(exist != null) return exist;
1175 ClusterI cluster = getClusterByResourceKey(id);
1176 if(cluster == null) {
// Compute once and memoize for subsequent calls.
1179 boolean result = cluster.getImmutable();
1180 markImmutable(cluster, result);
// Memoizes the immutability of the given cluster (see isImmutable).
1186 public void markImmutable(ClusterI cluster, boolean value) {
1187 immutables[cluster.getClusterKey()] = value;
// Resolves (or creates a proxy for) the cluster key of the UID (first, second).
1190 public int getClusterKeyByUID(long first, long second) throws DatabaseException {
1191 return getClusterKeyByClusterUIDOrMakeProxy(ClusterUID.make(first, second));
// Adjusts the table's cached total byte size by delta 'l' on behalf of
// 'cluster'. NOTE(review): the accounting statements are elided from this
// extract; only the debug trace remains visible.
1194 public void adjustCachedSize(long l, ClusterI cluster) {
1196 //System.out.println("ClusterTable: adjusting cluster table cached size by " + l + ": " + sizeInBytes + " -> "
1197 // + (sizeInBytes + l) + ", for cluster " + cluster.getClusterId() + " (" + cluster + ")");
// Internal consistency check: recomputes the total cached size from the
// cluster array and compares it to the bookkept sizeInBytes, and compares the
// importanceMap entry count to the number of "important" clusters.
// Discrepancies are reported to stdout; 'place' labels the call site.
1202 private void validateSize(String place) {
1206 int ims = importanceMap.size();
1207 int ihms = countImportantClusters();
1209 // System.out.format("[%s] Validating ClusterTable: byteSize=%d, hashMap=%d/%d, importanceMap=%d%n",
1210 // place, sizeInBytes, ihms, clusters.hashMap.size(), ims);
1212 int i = clusterArray.length;
1214 for (int j = 0; j < i; ++j) {
1215 ClusterI c = clusterArray[j];
1218 size += c.getCachedSize();
// Only report a hard mismatch when the bookkept size is not known-dirty.
1220 if (sizeInBytes != size) {
1221 if (!dirtySizeInBytes)
1222 System.out.println("BUG: CACHED CLUSTER SIZE DIFFERS FROM CALCULATED: " + sizeInBytes + " != " + size + ", delta = " + (sizeInBytes - size));
1223 //else System.out.println("\"BUG?\": SIZES DIFFER: " + sizeInBytes + " != " + size + ", delta = " + (sizeInBytes - size));
1226 System.out.println("BUG2: hashmap and importanceMap sizes differ: " + ihms + " != " + ims + ", delta=" + (ihms - ims));
1227 printMaps("validateSize");
1229 //System.out.println("[" + place + "] VALIDATED");
// Debug dump of both bookkeeping structures: the importanceMap entries and,
// for every cluster in clusters.hashMap, whether it counts as "important"
// along with its writeOnly/empty/loaded flags. 'place' labels the call site.
1232 private void printMaps(String place) {
1233 int ihms = countImportantClusters();
1234 System.out.println("## printMaps(" + place + ") - " + importanceMap.size() + " - (" + ihms + "/" + clusters.hashMap.size() + ")");
1235 System.out.println("importanceMap (" + importanceMap.size() + "):");
1236 importanceMap.forEach((importance, ie) -> {
1237 System.out.format("\t%d: %s%n", importance, ie.toString());
1239 System.out.println("clusters.hashMap (" + ihms + "/" + clusters.hashMap.size() + "):");
1240 clusters.hashMap.forEachEntry((cid, c) -> {
1241 boolean important = importantCluster(c);
// Flags are null-guarded because hashMap entries may hold null clusters.
1242 boolean wo = c != null && c.isWriteOnly();
1243 boolean empty = c != null && c.isEmpty();
1244 boolean loaded = c != null && c.isLoaded();
1245 System.out.format("\t%s: %d - %s (writeOnly=%b, empty=%b, loaded=%b)%n", important ? " I" : "NI", cid, c, wo, empty, loaded);
// Counts clusters in clusters.hashMap that satisfy importantCluster().
// Uses a single-element array as a mutable counter inside the lambda.
1250 private int countImportantClusters() {
1251 int[] result = { 0 };
1252 clusters.hashMap.forEachEntry((cid, c) -> {
1253 if (importantCluster(c))
// A cluster is "important" (tracked in importanceMap) when it exists, is not
// write-only, and is loaded. The emptiness test is deliberately disabled.
1260 private static boolean importantCluster(ClusterImpl c) {
1261 return c != null && !c.isWriteOnly() && c.isLoaded();// && !c.isEmpty();