1 /*******************************************************************************
2 * Copyright (c) 2007, 2010 Association for Decentralized Information Management
4 * All rights reserved. This program and the accompanying materials
5 * are made available under the terms of the Eclipse Public License v1.0
6 * which accompanies this distribution, and is available at
7 * http://www.eclipse.org/legal/epl-v10.html
10 * VTT Technical Research Centre of Finland - initial API and implementation
11 *******************************************************************************/
12 package fi.vtt.simantics.procore.internal;
15 import java.util.ArrayList;
16 import java.util.Collection;
17 import java.util.HashMap;
19 import java.util.TreeMap;
20 import java.util.concurrent.Semaphore;
21 import java.util.concurrent.atomic.AtomicLong;
22 import java.util.function.Consumer;
24 import org.simantics.databoard.Bindings;
25 import org.simantics.db.ClusterCreator;
26 import org.simantics.db.Database;
27 import org.simantics.db.Database.Session.ClusterChanges;
28 import org.simantics.db.DevelopmentKeys;
29 import org.simantics.db.SessionVariables;
30 import org.simantics.db.exception.ClusterDoesNotExistException;
31 import org.simantics.db.exception.DatabaseException;
32 import org.simantics.db.exception.ResourceNotFoundException;
33 import org.simantics.db.exception.RuntimeDatabaseException;
34 import org.simantics.db.impl.ClusterBase;
35 import org.simantics.db.impl.ClusterI;
36 import org.simantics.db.impl.ClusterSupport;
37 import org.simantics.db.impl.ClusterTraitsBase;
38 import org.simantics.db.impl.IClusterTable;
39 import org.simantics.db.impl.graph.WriteGraphImpl;
40 import org.simantics.db.impl.query.QueryProcessor;
41 import org.simantics.db.procore.cluster.ClusterBig;
42 import org.simantics.db.procore.cluster.ClusterImpl;
43 import org.simantics.db.procore.cluster.ClusterSmall;
44 import org.simantics.db.procore.cluster.ClusterTraits;
45 import org.simantics.db.procore.protocol.Constants;
46 import org.simantics.db.service.ClusterCollectorPolicy;
47 import org.simantics.db.service.ClusterCollectorPolicy.CollectorCluster;
48 import org.simantics.db.service.ClusterUID;
49 import org.simantics.utils.Development;
50 import org.slf4j.LoggerFactory;
52 import fi.vtt.simantics.procore.DebugPolicy;
53 import fi.vtt.simantics.procore.internal.ClusterControlImpl.ClusterStateImpl;
54 import fi.vtt.simantics.procore.internal.SessionImplSocket.TaskHelper;
55 import gnu.trove.map.hash.TLongIntHashMap;
56 import gnu.trove.map.hash.TLongObjectHashMap;
57 import gnu.trove.procedure.TIntProcedure;
58 import gnu.trove.procedure.TLongObjectProcedure;
59 import gnu.trove.procedure.TObjectProcedure;
60 import gnu.trove.set.hash.TIntHashSet;
// In-memory table of clusters for one procore session. Clusters are indexed two
// ways: densely by "cluster key" (index into clusterArray) and by "cluster id"
// (hash map inside the inner Clusters class).
// NOTE(review): this listing is a partial dump — interior lines are missing
// (see the non-contiguous embedded line numbers) — so code is left untouched.
62 public final class ClusterTable implements IClusterTable {
64 private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(ClusterTable.class);
// Compile-time switch for the validateSize() consistency checks (disabled).
66 private static final boolean VALIDATE_SIZE = false;
// Cache budget in bytes; limit is the 80% soft threshold derived from it.
68 int maximumBytes = 128 * 1024 * 1024;
69 int limit = (int)(0.8*(double)maximumBytes);
// Monotonic counter used as a logical timestamp for cluster "importance".
71 private final AtomicLong timeCounter = new AtomicLong(1);
73 final private SessionImplSocket sessionImpl;
// Write-only clusters created during a write transaction; cleared by
// removeWriteOnlyClusters() below.
74 final private ArrayList<ClusterI> writeOnlyClusters = new ArrayList<ClusterI>();
75 final private TIntHashSet writeOnlyInvalidates = new TIntHashSet();
76 final private static int ARRAY_SIZE = ClusterTraits.getClusterArraySize();
// Dense cluster-key -> cluster mapping; slot 0 is reserved (see Clusters.clear).
77 final private ClusterImpl[] clusterArray = new ClusterImpl[ARRAY_SIZE];
// Per-key caches; semantics of immutables/virtuals not visible in this excerpt.
78 final private Boolean[] immutables = new Boolean[ARRAY_SIZE];
79 final private boolean[] virtuals = new boolean[ARRAY_SIZE];
// Immutable (importance, clusterId) pair stored in importanceMap; identifies a
// resident cluster to the collector policy via the CollectorCluster interface.
80 static class ImportanceEntry implements CollectorCluster {
81 final public long importance;
82 final public long clusterId;
// Convenience constructor: snapshots the cluster's current importance and id.
83 public ImportanceEntry(ClusterImpl impl) {
84 this(impl.getImportance(), impl.getClusterId());
87 public ImportanceEntry(long importance, long clusterId) {
88 this.importance = importance;
89 this.clusterId = clusterId;
// Plain accessors for the CollectorCluster contract (bodies elided in this dump).
92 public long getImportance() {
96 public long getClusterId() {
100 public String toString() {
101 return "CID " + clusterId;
// Owns the clusterId -> ClusterImpl hash map and keeps it consistent with the
// outer clusterArray (clusterKey -> ClusterImpl). All mutations must keep the
// two views in agreement; several methods throw RuntimeDatabaseException when
// they detect a divergence ("ClusterTable corrupted").
104 private class Clusters {
105 // This makes sure that non-null values from hashMap.get can be trusted (no unsynchronized rehashes occur)
106 final public TLongObjectHashMap<ClusterImpl> hashMap = new TLongObjectHashMap<ClusterImpl>(2 * ARRAY_SIZE);
107 //private final HashMap<ClusterUID, ClusterImpl> clusterU2I = new HashMap<ClusterUID, ClusterImpl>(); // Maps cluster UID to cluster.
111 private void clear() {
113 // clusterU2I.clear();
114 hashMap.put(0, null); // reserved for null value
115 // clusterU2I.put(ClusterUID.make(0, 0), null);
// size() — returns hashMap.size(); because id 0 is reserved, this also serves
// as the next free cluster key (see makeProxy / makeCluster).
118 return hashMap.size();
120 private ClusterImpl getClusterByClusterId(long clusterId) {
121 return hashMap.get(clusterId);
// Lookup by UID delegates to lookup by id: only uid.second is significant here.
123 private ClusterImpl getClusterByClusterUID(ClusterUID clusterUID) {
124 return getClusterByClusterId(clusterUID.second);
125 //return clusterU2I.get(clusterUID);
127 private ClusterImpl getClusterByClusterUID(long id1, long id2) {
128 return getClusterByClusterId(id2);
// makeProxy: return the existing entry or install a ClusterSmall sentinel
// (an unloaded placeholder) under a freshly assigned cluster key.
130 private ClusterImpl makeProxy(long clusterId) {
131 return makeProxy(ClusterUID.make(0, clusterId));
133 private ClusterImpl makeProxy(ClusterUID clusterUID) {
134 ClusterImpl proxy = hashMap.get(clusterUID.second);
137 int clusterKey = hashMap.size();
138 ClusterSmall sentinel = new ClusterSmall(clusterUID, clusterKey, ClusterTable.this, sessionImpl.clusterTranslator);
139 if (sentinel.clusterId != sentinel.clusterUID.second)
140 throw new RuntimeDatabaseException("ClusterTable corrupted.");
// replace: swap in a new ClusterImpl for an existing key after verifying that
// key, id and UID all match the incumbent entry.
144 private void replace(ClusterImpl proxy) {
145 ClusterImpl old = clusterArray[proxy.clusterKey];
148 if (old.clusterKey != proxy.clusterKey)
149 throw new RuntimeDatabaseException("ClusterTable corrupted.");
150 if (old.clusterId != proxy.clusterId)
151 throw new RuntimeDatabaseException("ClusterTable corrupted.");
152 if (!old.clusterUID.equals(proxy.clusterUID))
153 throw new RuntimeDatabaseException("ClusterTable corrupted.");
// freeProxy: demote a loaded cluster back to an unloaded ClusterSmall sentinel
// occupying the same (id, key) slot, releasing the loaded data.
156 private ClusterSmall freeProxy(ClusterImpl proxy) {
157 ClusterImpl clusterImpl = hashMap.get(proxy.clusterId);
158 if (null == clusterImpl)
159 throw new RuntimeDatabaseException("ClusterTable corrupted.");
160 // ClusterUID clusterUID = ClusterUID.make(0, proxy.clusterId);
161 // ClusterImpl clusterImpl2 = clusterU2I.get(clusterUID );
162 // if (clusterImpl != clusterImpl2)
163 // throw new RuntimeDatabaseException("ClusterTable corrupted.");
164 if (proxy.clusterId != clusterImpl.clusterId)
165 throw new RuntimeDatabaseException("ClusterTable corrupted.");
166 if (proxy.clusterKey != clusterImpl.clusterKey)
167 throw new RuntimeDatabaseException("ClusterTable corrupted.");
168 ClusterSmall sentinel = new ClusterSmall(makeClusterUID(proxy.clusterId) , proxy.clusterKey, ClusterTable.this, sessionImpl.clusterTranslator);
169 return (ClusterSmall)create(sentinel);
// create: insert into both views (hash map by id, array by key).
171 private ClusterImpl create(ClusterImpl clusterImpl) {
172 hashMap.put(clusterImpl.clusterId, clusterImpl);
173 // clusterU2I.put(clusterImpl.clusterUID, clusterImpl);
174 clusterArray[clusterImpl.clusterKey] = clusterImpl;
177 private void forEachValue(TObjectProcedure<ClusterImpl> procedure) {
178 hashMap.forEachValue(procedure);
180 private void forEachEntry(TLongObjectProcedure<ClusterImpl> procedure) {
181 hashMap.forEachEntry(procedure);
183 private ClusterUID makeClusterUID(long clusterId) {
184 return ClusterUID.make(0, clusterId);
// removeProxy: remove from both views entirely (used when a load fails).
186 private void removeProxy(ClusterImpl proxy) {
187 hashMap.remove(proxy.clusterId);
188 clusterArray[proxy.clusterKey] = null;
// importance -> CollectorCluster, sorted ascending so the collector can evict
// the least-important (oldest-touched) clusters first.
191 final public TreeMap<Long, CollectorCluster> importanceMap = new TreeMap<Long, CollectorCluster>();
193 private ClusterCollectorPolicy collectorPolicy;
// When true, sizeInBytes is stale and getSizeInBytes() recomputes it.
194 private boolean dirtySizeInBytes = true;
195 private long sizeInBytes = 0;
196 private final Clusters clusters;
197 ClusterUID makeClusterUID(long clusterId) {
198 return clusters.makeClusterUID(clusterId);
// Maps a resource key to the UID of its containing cluster via the cluster key.
200 public ClusterUID getClusterUIDByResourceKey(int resourceKey) throws DatabaseException {
201 int clusterKey = ClusterTraits.getClusterKeyFromResourceKey(resourceKey);
202 return clusterArray[clusterKey].clusterUID;
// Constructor; folderPath's use is not visible in this excerpt — TODO confirm.
204 ClusterTable(SessionImplSocket sessionImpl, File folderPath) {
205 this.sessionImpl = sessionImpl;
206 clusters = new Clusters();
211 importanceMap.clear();
214 public void setCollectorPolicy(ClusterCollectorPolicy policy) {
215 this.collectorPolicy = policy;
219 * Sets Cluster Table allocation size by percentage of maximum usable
220 * memory. Allocation is limited between 5% and 50% of maximum memory.
224 private void setAllocateSize(double percentage) {
226 percentage = Math.max(percentage, 0.05);
227 percentage = Math.min(percentage, 0.5);
229 maximumBytes = (int) Math.floor(percentage * Runtime.getRuntime().maxMemory());
// Heuristic: from current JVM heap statistics, estimate what fraction of the
// maximum heap the cluster cache can safely occupy (clamped to [0.05, 0.5],
// aiming to keep ~20% of the heap free for the rest of the application).
232 private double estimateProperAllocation(ClusterCollectorSupport support) {
233 long max = Runtime.getRuntime().maxMemory();
234 long free = Runtime.getRuntime().freeMemory();
235 long total = Runtime.getRuntime().totalMemory();
236 long realFree = max - total + free; // amount of free memory
237 long current = support.getCurrentSize(); // currently allocated cache
238 long inUseTotal = max - realFree; // currently used memory
239 long inUseOther = inUseTotal - current; // memory that the application uses (without the cache)
240 double otherUsePercentage = (double) inUseOther / (double) max; // percentage of memory that the application uses
241 double aimFree = 0.2; // percentage of maximum heap that we try to keep free
242 double estimate = 1.0 - otherUsePercentage - aimFree;
243 estimate = Math.min(estimate, 0.5);
244 estimate = Math.max(estimate, 0.05);
245 // System.out.println("Estimated allocation percentage " + estimate +
246 // " memory stats: max: " + max + ", free: " + realFree + ", inUse: " +
247 // inUseTotal + ", cached: " + current + ", cacheSize: " + maximumBytes);
251 void checkCollect() {
255 synchronized ClusterImpl getClusterByClusterId(long clusterId) {
256 return clusters.getClusterByClusterId(clusterId);
// NOTE(review): unsynchronized read of clusterArray — relies on callers/
// publication elsewhere for visibility; confirm before relying on it.
259 ClusterBase getClusterByClusterKey(int clusterKey) {
260 return clusterArray[clusterKey];
263 synchronized ClusterImpl makeProxy(ClusterUID clusterUID, long clusterId) {
264 if (clusterUID.second != clusterId)
265 throw new RuntimeDatabaseException("Illegal id for cluster=" + clusterUID + " id=" + clusterId);
266 return clusters.makeProxy(clusterUID);
// Creates a brand-new cluster under the given id. Write-only clusters get a
// ClusterWriteOnly wrapper and are tracked in writeOnlyClusters; regular
// clusters are registered in importanceMap and reported to the collector policy.
269 synchronized ClusterImpl makeCluster(long clusterId, boolean writeOnly) {
271 ClusterImpl proxy = clusters.getClusterByClusterId(clusterId);
274 ClusterUID clusterUID = ClusterUID.make(0, clusterId);
275 int clusterKey = clusters.size(); // new key
277 proxy = new ClusterWriteOnly(clusterUID, clusterKey, sessionImpl);
278 writeOnlyClusters.add(proxy);
279 return clusters.create(proxy);
281 //printMaps("makeCluster");
282 ClusterImpl cluster = ClusterImpl.make(clusterUID, clusterKey, sessionImpl.clusterTranslator);
283 clusters.create(cluster);
284 if (!cluster.isLoaded())
285 LOGGER.error("", new Exception("Bug in ClusterTable.makeCluster(long, boolean), cluster not loaded"));
286 importanceMap.put(cluster.getImportance(), new ImportanceEntry(cluster));
288 validateSize("makeCluster");
289 if(collectorPolicy != null) collectorPolicy.added(cluster);
// Replaces the resident cluster occupying the same cluster key with a new
// instance: carries over virtual flag and pending change-set state (cc),
// re-registers importance/collector bookkeeping, and fixes the cached size.
294 synchronized void replaceCluster(ClusterI cluster_) {
295 ClusterImpl cluster = (ClusterImpl) cluster_;
297 int clusterKey = cluster.getClusterKey();
298 ClusterImpl existing = (ClusterImpl) clusterArray[clusterKey];
299 if (existing.hasVirtual())
300 cluster.markVirtual();
302 if (existing.cc != null) {
303 if (existing.isLoaded()) {
304 // This shall be promoted to actual exception in the future -
305 // for now, minimal changes
306 new Exception("Trying to replace cluster with pending changes " + existing.getClusterUID())
309 // Adopt changes to loaded cluster
310 cluster.cc = existing.cc;
311 cluster.cc.adopt(cluster);
312 cluster.foreignLookup = existing.foreignLookup;
313 cluster.change = existing.change;
317 importanceMap.remove(existing.getImportance());
318 if (collectorPolicy != null)
319 collectorPolicy.removed((ClusterImpl)existing);
321 //System.out.println("ClusterTable.replace(" + existing + " (I=" + existing.getImportance() + ") => " + cluster + " (I=" + cluster.getImportance() + ")");
323 clusters.replace((ClusterImpl)cluster);
324 if (!cluster.isLoaded())
325 LOGGER.error("", new Exception("Bug in ClusterTable.replaceCluster(ClusterI), cluster not loaded"));
327 importanceMap.put(cluster.getImportance(), new ImportanceEntry((ClusterImpl)cluster));
328 if(collectorPolicy != null) collectorPolicy.added((ClusterImpl)cluster);
// Keep sizeInBytes incrementally correct: subtract the old cluster's size;
// the new cluster's getCachedSize() call adds its own via adjustCachedSize.
330 if (!dirtySizeInBytes) {
331 if (existing != cluster) {
332 adjustCachedSize(-existing.getCachedSize(), existing);
334 // This will update sizeInBytes through adjustCachedSize
335 cluster.getCachedSize();
338 validateSize("replaceCluster");
// Evicts a cluster chosen by the collector: drops its importance entry, then
// delegates to release(long) to demote the cluster to an unloaded sentinel.
341 synchronized void release(CollectorCluster cluster) {
342 importanceMap.remove(cluster.getImportance());
343 release(cluster.getClusterId());
346 synchronized void release(long clusterId) {
347 //System.out.println("ClusterTable.release(" + clusterId + "): " + sizeInBytes);
349 ClusterImpl clusterImpl = clusters.getClusterByClusterId(clusterId);
350 if (null == clusterImpl)
// (early-exit guards; the skipped branches are elided in this dump)
352 if(!clusterImpl.isLoaded() || clusterImpl.isEmpty())
354 //printMaps("release");
355 clusters.freeProxy(clusterImpl);
356 importanceMap.remove(clusterImpl.getImportance());
357 if (collectorPolicy != null)
358 collectorPolicy.removed(clusterImpl);
// Flush any buffered stream data for this cluster before the memory goes away.
359 if (sessionImpl.writeState != null)
360 sessionImpl.clusterStream.flush(clusterImpl.clusterUID);
361 if (!dirtySizeInBytes) {
362 adjustCachedSize(-clusterImpl.getCachedSize(), clusterImpl);
365 validateSize("release");
368 synchronized void compact(long id) {
369 ClusterI impl = clusters.getClusterByClusterId(id);
375 // cachedSize = getSizeInBytes();
378 double getLoadProbability() {
379 // This can currently cause stack overflow
381 // if(cachedSize < SLOW_LIMIT) return 1.0;
382 // if(cachedSize > HIGH_LIMIT) return 1e-2;
384 // double pos = (double)(cachedSize - SLOW_LIMIT) / (double)(HIGH_LIMIT -
386 // double val = 0.1 * ((pos-1) * (pos-1)) + 1e-2;
// Trove procedure that sums getCachedSize() over all loaded, non-empty
// clusters; used by getSizeInBytes() to recompute the cache total.
390 class SizeProcedure implements TObjectProcedure<ClusterImpl> {
391 public long result = 0;
394 public boolean execute(ClusterImpl cluster) {
395 if (cluster != null) {
397 if (cluster.isLoaded() && !cluster.isEmpty()) {
398 result += cluster.getCachedSize();
400 } catch (Throwable t) {
// Size accounting is best-effort: log and keep iterating over the rest.
401 LOGGER.error("Could not calculate size", t);
407 public void clear() {
413 private SizeProcedure sizeProcedure = new SizeProcedure();
// Lazily recomputes the total cached byte size when marked dirty, otherwise
// returns the incrementally maintained sizeInBytes.
414 long getSizeInBytes() {
415 if (dirtySizeInBytes) {
416 sizeProcedure.clear();
417 clusters.forEachValue(sizeProcedure);
418 sizeInBytes = sizeProcedure.result;
419 // System.err.println("recomputed size of clusterTable => " + sizeInBytes);
420 setDirtySizeInBytes(false);
425 public void setDirtySizeInBytes(boolean value) {
426 dirtySizeInBytes = value;
// Snapshot of which clusters are currently loaded and non-empty, recorded as
// ImportanceEntry ids inside a ClusterStateImpl.
429 ClusterStateImpl getState() {
430 final ClusterStateImpl result = new ClusterStateImpl();
431 clusters.forEachEntry(new TLongObjectProcedure<ClusterImpl>() {
433 public boolean execute(long arg0, ClusterImpl arg1) {
436 if (arg1.isLoaded() && !arg1.isEmpty()) {
437 result.ids.add(new ImportanceEntry(arg1));
// Releases every cluster that is resident now but was not in the given
// (earlier) state snapshot — i.e. rolls residency back to that state.
445 void restoreState(ClusterStateImpl state) {
446 ClusterStateImpl current = getState();
447 for (CollectorCluster id : current.ids)
448 if (!state.ids.contains(id))
449 collectorSupport.release(id);
// Collector driver: decides when and how many bytes of clusters to evict,
// either via a pluggable policy or via the built-in resident-cluster walk.
452 class ClusterCollectorImpl implements ClusterCollector {
454 final private ClusterCollectorSupport support;
455 private ClusterCollectorPolicy policy;
456 ClusterCollectorImpl(ClusterCollectorSupport support) {
457 this.support = support;
// Swaps the eviction policy; seeds the new policy with all currently
// resident clusters and returns the previous policy.
460 ClusterCollectorPolicy setPolicy(ClusterCollectorPolicy newPolicy) {
462 ClusterCollectorPolicy oldPolicy = policy;
466 for (CollectorCluster id : support.getResidentClusters()) {
467 policy.added(getClusterByClusterId(id.getClusterId()));
471 support.setPolicy(policy);
// collect(): full collection pass. With a policy, release its selection;
// otherwise compute a byte target (optionally resizing the cache budget
// dynamically) and delegate to collect(int).
478 public void collect() {
482 release(policy.select());
486 int size = support.getCurrentSize();
487 boolean dynamicAllocation = useDynamicAllocation();
488 if (dynamicAllocation)
489 setAllocateSize(estimateProperAllocation(support));
490 if (DebugPolicy.CLUSTER_COLLECTION) {
491 System.out.println("Cluster collector activated, current size = " + size + " limit = " + maximumBytes);
493 if (dynamicAllocation) {
494 int collectSize = maximumBytes / 2;
495 collectSize = Math.min(collectSize, 32 * 1024 * 1024);
496 // try to keep allocated clusters below the maximum
497 if (maximumBytes - size > collectSize)
499 collectSize += size - maximumBytes;
500 collect(collectSize);
502 // try to keep allocated clusters below the maximum
503 if (size < maximumBytes)
// Dynamic sizing is opt-in via a system property (read on every call).
513 private boolean useDynamicAllocation() {
514 return "true".equalsIgnoreCase(System.getProperty("org.simantics.db.cluster.dynamicAlloc"));
// collect(target): release clusters until roughly `target` bytes are freed.
// Without a policy, walks resident clusters in support's order.
518 public void collect(int target) {
522 release(policy.select(target));
526 ArrayList<CollectorCluster> toRelease = new ArrayList<CollectorCluster>();
528 for (CollectorCluster cluster : support.getResidentClusters()) {
529 toRelease.add(cluster);
530 long clusterSize = support.getClusterSize(cluster);
531 //System.err.println("release cluster with " + (clusterSize/1024) + " kiB - " + cluster);
532 target -= clusterSize;
540 if (DebugPolicy.CLUSTER_COLLECTION) {
541 System.out.println("Cluster collector finished, current size = " + support.getCurrentSize());
548 void release(Collection<CollectorCluster> toRelease) {
549 for (CollectorCluster id : toRelease) {
556 private ClusterCollectorSupport collectorSupport = new ClusterCollectorSupportImpl(this);
557 ClusterCollectorImpl collector = new ClusterCollectorImpl(collectorSupport);
// Id of the cluster that new resources are currently being allocated into;
// NewClusterId is the sentinel meaning "no allocation cluster yet".
563 private long newResourceClusterId = Constants.NewClusterId;
564 public static final int CLUSTER_FILL_SIZE = ClusterTraitsBase.getMaxNumberOfResources();
567 * A new id is reserved only when a resource is actually added, so reservedIds
568 * contains only clusters that are already in use. (Translated from Finnish.)
// Returns the cluster new resources should go into, allocating a fresh cluster
// id from the server when none exists yet or the current one is full
// (>= CLUSTER_FILL_SIZE resources). Result is passed through ensureLoaded.
571 ClusterImpl getNewResourceCluster(ClusterSupport cs, GraphSession graphSession, boolean writeOnly)
572 throws DatabaseException {
573 ClusterImpl result = null;
574 if (Constants.NewClusterId == newResourceClusterId) {
575 newResourceClusterId = graphSession.newClusterId();
576 result = getClusterByClusterIdOrMake(newResourceClusterId, writeOnly);
578 ClusterImpl cluster = getClusterByClusterIdOrThrow(newResourceClusterId);
579 if (cluster.getNumberOfResources(cs) >= CLUSTER_FILL_SIZE) {
580 newResourceClusterId = graphSession.newClusterId();
581 cluster = getClusterByClusterIdOrMake(newResourceClusterId, writeOnly);
585 return ensureLoaded(result);
// Forgets the current allocation cluster so the next allocation starts fresh.
588 void flushCluster(GraphSession graphSession) {
589 // We seem to disagree about this.
590 // graphSession.newClusterId();
591 newResourceClusterId = Constants.NewClusterId;
// Marks a write-only cluster's key for invalidation at end of write.
594 void writeOnlyInvalidate(ClusterI impl) {
595 writeOnlyInvalidates.add(impl.getClusterKey());
// Tears down write-only state after a write transaction: demotes all
// write-only clusters to unloaded sentinels and re-proxies every invalidated
// cluster key, then clears both tracking collections.
598 void removeWriteOnlyClusters() {
599 for (ClusterI proxy : writeOnlyClusters) {
600 if (!(proxy instanceof ClusterImpl))
601 throw new RuntimeDatabaseException("ClusterTable corrupted.");
602 clusters.freeProxy((ClusterImpl)proxy);
604 writeOnlyClusters.clear();
605 writeOnlyInvalidates.forEach(new TIntProcedure() {
607 public boolean execute(int clusterKey) {
608 ClusterImpl proxy = clusterArray[clusterKey];
609 ClusterUID clusterUID = proxy.getClusterUID();
610 clusters.freeProxy(proxy);
614 writeOnlyInvalidates.clear();
// Plain UID lookup; returns null when the cluster is unknown to this table.
617 public ClusterImpl getClusterByClusterUID(ClusterUID clusterUID) {
618 synchronized (this) {
619 return clusters.getClusterByClusterUID(clusterUID);
// The "...OrMakeProxy" family: look up, and if absent install an unloaded
// sentinel proxy so the caller always gets a valid cluster key. Only the
// second half of the UID is significant (see getClusterIdOrCreate below).
622 public int getClusterKeyByClusterUIDOrMakeProxy(ClusterUID clusterUID) {
623 return getClusterKeyByClusterUIDOrMakeProxy(0/*clusterUID.first*/, clusterUID.second);
625 public int getClusterKeyByClusterUIDOrMakeProxy(long id1, long id2) {
626 return getClusterByClusterUIDOrMakeProxy(id1, id2).clusterKey;
628 public ClusterImpl getClusterByClusterUIDOrMakeProxy(ClusterUID clusterUID) {
629 synchronized (this) {
630 ClusterImpl clusterImpl = clusters.getClusterByClusterUID(clusterUID);
631 if (null == clusterImpl)
632 clusterImpl = clusters.makeProxy(clusterUID);
636 public ClusterImpl getClusterByClusterUIDOrMakeProxy(long id1, long id2) {
637 synchronized (this) {
638 ClusterImpl clusterImpl = clusters.getClusterByClusterUID(id1, id2);
639 if (null == clusterImpl)
640 clusterImpl = clusters.makeProxy(id2);
// Loads the cluster's data if not already loaded; on ClusterDoesNotExist the
// proxy is removed from the table and the failure propagates to the caller.
644 ClusterImpl getLoadOrThrow(long clusterId) throws DatabaseException {
645 synchronized (this) {
646 ClusterImpl proxy = clusters.getClusterByClusterId(clusterId);
649 if (proxy.isLoaded())
652 clusterKey = proxy.getClusterKey();
654 if (clusterKey == 0) {
655 proxy = clusters.makeProxy(clusterId);
656 clusterKey = proxy.getClusterKey();
658 ClusterImpl ci = tryLoad(clusterId, clusterKey);
660 throw new ResourceNotFoundException(clusterId);
662 } catch (ClusterDoesNotExistException t) {
663 clusters.removeProxy(proxy);
// Returns the existing cluster for the id, or creates a new one via
// makeCluster when absent.
669 ClusterImpl getClusterByClusterIdOrMake(long clusterId, boolean writeOnly) {
670 synchronized (this) {
671 ClusterImpl proxy = clusters.getClusterByClusterId(clusterId);
675 return makeCluster(clusterId, writeOnly);
678 ClusterImpl getClusterByClusterIdOrThrow(long clusterId) {
679 synchronized (this) {
680 ClusterImpl proxy = clusters.getClusterByClusterId(clusterId);
682 throw new IllegalArgumentException("Cluster id=" + clusterId + " is not created.");
// Cluster id is simply the second half of the UID in this implementation.
686 long getClusterIdOrCreate(ClusterUID clusterUID) {
687 return clusterUID.second;
// Resource key -> persistent cluster id; virtual clusters are rejected.
689 final long getClusterIdByResourceKey(final int resourceKey)
690 throws DatabaseException {
691 int clusterKey = ClusterTraitsBase.getClusterKeyFromResourceKey(resourceKey);
692 if (ClusterTraitsBase.isVirtualClusterKey(clusterKey))
693 throw new RuntimeException("Tried to get a persistent cluster for a virtual resource.");
694 ClusterI c = clusterArray[clusterKey];
696 throw new RuntimeException("No cluster for key " + resourceKey);
697 return c.getClusterId();
// Same as above but logs instead of throwing (fallback return is elided in
// this dump — presumably a sentinel id; TODO confirm).
699 final long getClusterIdByResourceKeyNoThrow(final int resourceKey) {
700 int clusterKey = ClusterTraitsBase.getClusterKeyFromResourceKeyNoThrow(resourceKey);
701 if (ClusterTraitsBase.isVirtualClusterKey(clusterKey)) {
702 LOGGER.error("Tried to get a persistent cluster for a virtual resource. key=" + resourceKey);
705 ClusterI c = clusterArray[clusterKey];
707 LOGGER.error("No cluster for key " + resourceKey);
710 return c.getClusterId();
// Refreshes the given clusters to the server state after change set `csid`:
// for each changed, loaded, persistent cluster, clones a fresh copy from the
// database session, swaps it into the table, and then replays the server-side
// cluster changes into the query processor (refreshCluster). Finally fires a
// fake write transaction so listeners observe the updates.
715 void refresh(long csid, SessionImplSocket session, ClusterUID[] clusterUID) {
716 synchronized (this) {
717 session.flushCounter = 0;
718 session.clusterStream.reallyFlush();
719 ClientChangesImpl cs = new ClientChangesImpl(session);
720 if (session.clientChanges == null)
721 session.clientChanges = cs;
722 //printMaps("refresh");
723 for (int i=0; i<clusterUID.length; ++i) {
725 if (DebugPolicy.REPORT_CLUSTER_EVENTS)
726 System.err.println("cluster=" + clusterUID[i] + " has changed. length=" + clusterUID.length);
727 ClusterImpl oldCluster = clusters.getClusterByClusterUID(clusterUID[i]);
// Skip clusters we don't have, haven't loaded, or that are virtual —
// there is nothing resident to refresh for those.
728 if (null == oldCluster)
730 if (!oldCluster.isLoaded())
732 int clusterKey = oldCluster.getClusterKey();
733 if (ClusterTraitsBase.isVirtualClusterKey(clusterKey))
735 boolean big = oldCluster instanceof ClusterBig;
736 boolean small = oldCluster instanceof ClusterSmall;
739 // ClusterImpl newCluster = (ClusterImpl) sessionImpl.graphSession.getClusterImpl(clusterUID[i], clusterKey);
740 Database.Session dbSession = sessionImpl.graphSession.dbSession;
742 ClusterImpl newCluster = dbSession.clone(clusterUID[i], new ClusterCreator() {
745 public <T> T create(ClusterUID uid, byte[] bytes, int[] ints, long[] longs) {
746 ClusterSupport support = sessionImpl.clusterTranslator;
748 return (T)ClusterImpl.make(longs, ints, bytes, support, support.getClusterKeyByClusterUIDOrMake(uid));
749 } catch (DatabaseException e) {
756 if (null == newCluster)
758 if (DebugPolicy.REPORT_CLUSTER_EVENTS)
759 System.err.println("cluster=" + newCluster + " updated.");
// Re-register the replacement in importance/collector bookkeeping.
760 importanceMap.remove(oldCluster.getImportance());
761 if (collectorPolicy != null)
762 collectorPolicy.removed(oldCluster);
763 clusters.replace(newCluster);
764 if (!newCluster.isLoaded())
765 LOGGER.error("", new Exception("Bug in ClusterTable.refresh, cluster not loaded"));
766 importanceMap.put(newCluster.getImportance(), new ImportanceEntry(newCluster));
767 if (collectorPolicy != null)
768 collectorPolicy.added(newCluster);
769 // Now we have fetched the new cluster but to emulate effects of the changes in it we fetch the cluster changes from server.
770 refreshCluster(csid, session, oldCluster, clusterUID[i], newCluster.clusterId, clusterKey);
// Per-cluster refresh is best-effort: log and continue with the rest.
771 } catch (Throwable t) {
772 LOGGER.error("Failed to load cluster in refresh.", t);
776 validateSize("refresh");
777 // Fake update of cluster changes.
778 QueryProcessor queryProcessor = session.getQueryProvider2();
779 WriteGraphImpl writer = WriteGraphImpl.create(queryProcessor, session.writeSupport, null);
780 TaskHelper th = null;
781 if (null == session.writeState) {
782 th = new TaskHelper("Refresh");
783 session.writeState = new WriteState<Object>(writer, th.writeTraits, th.sema, th.proc);
785 session.getQueryProvider2().propagateChangesInQueryCache(writer);
786 session.fireMetadataListeners(writer, cs);
787 session.getQueryProvider2().listening.fireListeners(writer);
788 session.fireReactionsToSynchronize(cs);
789 session.fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
790 session.printDiagnostics();
793 session.writeState = null;
// Fetches the server-side change set for one cluster (since csid) and replays
// it into the query processor: statement changes per (resource, predicate)
// pair and value changes per resource, translating cluster-local indices to
// resource keys via the current cluster keys.
799 final void refreshCluster(long csid, SessionImplSocket session, ClusterImpl cluster, ClusterUID clusterUID, long clusterId, int clusterKey)
800 throws DatabaseException {
801 if (DebugPolicy.REPORT_CLUSTER_EVENTS)
802 System.err.println("cluster=" + clusterUID + " id=" + clusterId + " key=" + clusterKey + " resources will be updated.");
803 // get cluster change sets
804 QueryProcessor queryProcessor = session.getQueryProvider2();
807 cc = session.graphSession.getClusterChanges(clusterUID, csid);
808 } catch (Exception e) {
809 LOGGER.error("Could not get cluster changes. cluster=" + clusterUID, e);
// Statement (triple) changes: resolve the predicate's cluster to build the
// full predicate resource key, then notify the query processor.
813 for (int i=0; i<cc.getResourceIndex().length; ++i) {
814 int resource = ClusterTraits.createResourceKey(clusterKey, cc.getResourceIndex()[i]);
815 ClusterUID pClusterUID = new ClusterUID(cc.getPredicateFirst()[i], cc.getPredicateSecond()[i]);
816 ClusterImpl pCluster = clusters.getClusterByClusterUID(pClusterUID);
// Predicate cluster not resident — skip (elided branch in this dump).
817 if (null == pCluster)
819 int pClusterKey = pCluster.getClusterKey();
820 int predicate = ClusterTraits.createResourceKey(pClusterKey, cc.getPredicateIndex()[i]);
821 queryProcessor.updateStatements(resource, predicate);
822 if (DebugPolicy.REPORT_CLUSTER_EVENTS)
823 System.err.println("resource " + cc.getResourceIndex()[i] + " relation " + cc.getPredicateIndex()[i] + " changed.");
// Value (literal) changes.
825 for (int i=0; i<cc.getValueIndex().length; ++i) {
826 int resource = ClusterTraits.createResourceKey(clusterKey, cc.getValueIndex()[i]);
827 queryProcessor.updateValue(resource);
828 if (DebugPolicy.REPORT_CLUSTER_EVENTS)
829 System.err.println("value " + cc.getValueIndex()[i] + " changed.");
// Bumps a cluster's importance to "now" (a fresh timeCounter value) so the
// collector treats it as recently used: remove old importanceMap entry,
// assign the new timestamp, re-insert, and inform the collector policy.
832 final synchronized void refreshImportance(ClusterImpl c) {
837 //printMaps("refreshImportance");
839 importanceMap.remove(c.getImportance());
840 if(collectorPolicy != null) collectorPolicy.removed(c);
842 long newImportance = timeCounter();
843 // System.err.println("refreshImportance " + c.getClusterId() + " => " + newImportance);
844 c.setImportance(newImportance);
846 LOGGER.error("", new Exception("Bug in ClusterTable.refreshImportance(ClusterImpl), cluster not loaded"));
848 importanceMap.put(c.getImportance(), new ImportanceEntry(c));
849 if(collectorPolicy != null) collectorPolicy.added(c);
851 validateSize("refreshImportance");
// Cumulative nanoseconds spent in load2 (debug instrumentation only).
855 static long loadTime = 0;
// Returns the (possibly unloaded proxy) cluster holding the resource,
// rejecting virtual resources. Does NOT load the cluster.
857 @SuppressWarnings("unchecked")
858 public final <T extends ClusterI> T getClusterProxyByResourceKey(final int resourceKey) {
859 int clusterKey = ClusterTraitsBase.getClusterKeyFromResourceKeyNoThrow(resourceKey);
860 if (ClusterTraitsBase.isVirtualClusterKey(clusterKey))
861 throw new RuntimeException("Tried to get a persistent cluster for a virtual resource.");
862 ClusterI cluster = clusterArray[clusterKey];
864 throw new RuntimeException("No proxy for existing cluster. Resource key = " + resourceKey);
// Debug instrumentation: per-cluster-id load counts and total loads.
868 TLongIntHashMap clusterLoadHistogram = new TLongIntHashMap();
869 int clusterLoadCounter = 0;
// Ensures the cluster's data is loaded (via load2), optionally timing and
// logging each load when REPORT_CLUSTER_LOADING is on. A load failure is
// rethrown as RuntimeDatabaseException (acknowledged TODO: jams the system).
871 private <T extends ClusterI> T ensureLoaded(T c) {
873 ClusterImpl cs = (ClusterImpl) c;
875 if(DebugPolicy.REPORT_CLUSTER_LOADING) {
876 long start = System.nanoTime();
877 cluster = load2(cs.getClusterId(), cs.getClusterKey());
878 long load = System.nanoTime()-start;
880 if(DebugPolicy.REPORT_CLUSTER_LOADING) {
881 int was = clusterLoadHistogram.get(cluster.getClusterId());
882 clusterLoadHistogram.put(cluster.getClusterId(), was+1);
883 clusterLoadCounter++;
884 String text = "Load2 " + cluster + " " + 1e-9*loadTime + "s. " + 1e-6*load + "ms. " + clusterLoadCounter + " " + was + " " + cluster.getUsedSpace() + " " + cluster.getImportance();
885 if(DebugPolicy.REPORT_CLUSTER_LOADING_STACKS) {
886 new Exception(text).printStackTrace();
888 System.err.println(text);
892 cluster = load2(cs.getClusterId(), cs.getClusterKey());
894 } catch (DatabaseException e) {
895 LOGGER.error("Could not load cluster", e);
896 if (DebugPolicy.REPORT_CLUSTER_EVENTS)
898 String msg = "Failed to load cluster " + cs.getClusterUID();// + " resourceId=" + (((cs.getClusterId() << 16 + (resourceKey & 65535))));
899 // TODO: this jams the system => needs refactoring.
900 throw new RuntimeDatabaseException(msg, e);
// Resolves the loaded cluster containing the given resource, loading it on
// demand. Every 4096th call also refreshes the cluster's importance so hot
// clusters stay resident. Virtual resources are rejected.
905 @SuppressWarnings("unchecked")
906 public synchronized final <T extends ClusterI> T getClusterByResourceKey(final int resourceKey) {
907 int clusterKey = ClusterTraitsBase.getClusterKeyFromResourceKeyNoThrow(resourceKey);
908 if (ClusterTraitsBase.isVirtualClusterKey(clusterKey))
909 throw new RuntimeException("Tried to get a persistent cluster for a virtual resource.");
910 ClusterI c = clusterArray[clusterKey];
// Importance refresh is sampled (counter masked with 4095) to stay cheap.
914 if ((counter++ & 4095) == 0)
915 refreshImportance((ClusterImpl) c);
// An unloaded entry must be a ClusterSmall sentinel; anything else is a bug.
918 if (!(c instanceof ClusterSmall)) {
919 LOGGER.error("Proxy must be instance of ClusterSmall");
922 return ensureLoaded((T)c);
// Variant that loads inline (load2) and, on failure, logs and marks the
// cluster deleted instead of propagating the exception.
925 @SuppressWarnings("unchecked")
926 final <T extends ClusterI> T checkedGetClusterByResourceKey(final int resourceKey) {
927 int clusterKey = ClusterTraitsBase.getClusterKeyFromResourceKeyNoThrow(resourceKey);
928 if (ClusterTraitsBase.isVirtualClusterKey(clusterKey))
929 throw new RuntimeException("Tried to get a persistent cluster for a virtual resource.");
930 ClusterI c = clusterArray[clusterKey];
932 throw new RuntimeException("No cluster for resource key " + resourceKey);
934 if ((counter++ & 4095) == 0)
935 refreshImportance((ClusterImpl) c);
938 if (!(c instanceof ClusterSmall)) {
939 LOGGER.error("Proxy must be instance of ClusterSmall");
943 ClusterSmall cs = (ClusterSmall) c;
945 // System.err.println("Load2 " + resourceKey);
946 cluster = load2(cs.getClusterId(), cs.getClusterKey());
947 } catch (DatabaseException e) {
948 if (DebugPolicy.REPORT_CLUSTER_EVENTS)
// NOTE(review): `resourceKey & getResourceIndexFromResourceKeyNoThrow(...)`
// looks suspicious (mask by the index itself) — verify against original file.
950 int resourceIndex = resourceKey & ClusterTraitsBase.getResourceIndexFromResourceKeyNoThrow(resourceKey);
951 long resourceId = ClusterTraitsBase.createResourceIdNoThrow(cs.getClusterId(), resourceIndex);
952 String msg = "Failed to load cluster " + cs.getClusterUID() + " for resource key " + resourceKey + " resourceIndex=" + resourceIndex + " resourceId=" + resourceId;
953 LOGGER.error(msg, e);
954 c.setDeleted(true, null);
// Dumps per-cluster id/size diagnostics to stdout for every cluster key
// starting from 1 (slot 0 is the reserved null entry), plus totals.
960 void printDebugInfo(ClusterSupport support)
961 throws DatabaseException {
962 final int SIZE = clusters.size();
964 for (int i = 1; i < SIZE; ++i) {
965 ClusterI c = clusterArray[i];
966 c.getNumberOfResources(support);
967 long size = c.getUsedSpace();
968 System.out.println("cluster=" + c.getClusterId() + " size=" + size);
969 c.printDebugInfo("koss: ", support);
972 System.out.println("Total number of clusters " + SIZE);
973 System.out.println("Total cluster size " + sum);
// Clusters fetched ahead of demand, keyed by cluster id; accessed under
// synchronized(prefetch) (see getPrefetched()).
976 Map<Long,ClusterImpl> prefetch = new HashMap<Long,ClusterImpl>();
// Synchronously loads the cluster identified by clusterId/clusterKey by delegating
// to the asynchronous load2(long, int, Consumer) overload and blocking on a
// semaphore until the completion callback fires. Returns the entry now present in
// clusterArray. NOTE(review): this listing is gappy — the semaphore release inside
// the callback and the acquire call are on lines not visible here.
978 public synchronized ClusterImpl load2(long clusterId, int clusterKey) throws DatabaseException {
979 ClusterImpl curr = (ClusterImpl) clusterArray[clusterKey];
985 // curr = (ClusterImpl) clusterArray[clusterKey];
986 // if (curr.isLoaded())
// Completion latch plus a one-slot carrier for an exception thrown by the loader.
989 final Semaphore s = new Semaphore(0);
990 final DatabaseException[] ex = new DatabaseException[1];
992 load2(clusterId, clusterKey, e -> {
998 } catch (InterruptedException e) {
// NOTE(review): the interrupt status is not restored here; consider
// Thread.currentThread().interrupt() — confirm against the full source.
999 LOGGER.error("unable to acquire", e);
// Development-time consistency check of the freshly loaded cluster.
1003 if (Development.DEVELOPMENT) {
1004 if (Development.<Boolean>getProperty(DevelopmentKeys.CLUSTERTABLE_VALIDATE_ON_LOAD, Bindings.BOOLEAN)) {
1006 ClusterImpl loaded = (ClusterImpl) clusterArray[clusterKey];
1007 if (loaded instanceof ClusterSmall)
1008 ((ClusterSmall) loaded).check();
1009 else if (loaded instanceof ClusterBig)
1010 ((ClusterBig) loaded).check();
// Validation failures are reported but never propagated to the caller.
1011 } catch (Throwable t) {
1012 t.printStackTrace();
1017 validate(clusterKey);
1019 return (ClusterImpl) clusterArray[clusterKey];
// Development-time hook for diffing the session's copy of a cluster against the
// in-memory database client's copy. The entire comparison is currently disabled
// (commented out), so at present this method is effectively a no-op.
1022 void validate(int clusterKey) {
1024 // if(sessionImpl.graphSession.graphClient instanceof GraphClientImpl2) {
1026 // ClusterImpl server = (ClusterImpl)clusterArray[clusterKey];
1027 // String sd = server.dump(sessionImpl.clusterTranslator);
1028 // GraphClientImpl2 gci = ((GraphClientImpl2)sessionImpl.graphSession.graphClient);
1029 // ClusterImpl memory = gci.clusters.get(server.clusterUID);
1030 // if(memory == null)
1031 // System.err.println("ad");
1032 // String md = memory.dump(gci.support);
1034 // if(!sd.equals(md)) {
1037 // int minLength = Math.min(sd.length(), md.length());
1038 // for (int i = 0; i < minLength; i++) {
1039 // if (sd.charAt(i) != md.charAt(i)) {
1045 // int start = Math.max(diffPos-200, 0);
1046 // int end = Math.min(minLength-1, diffPos+200);
1048 // System.err.println("== CLUSTER DIFFERENCE " + clusterArray[clusterKey].getClusterUID() + " == ");
1049 // System.err.println("== SESSION == ");
1050 // System.err.println(sd.substring(start, end));
1051 // System.err.println("== MEM == ");
1052 // System.err.println(md.substring(start, end));
1053 // System.err.println("== CLUSTER DIFFERENCE ENDS == ");
1055 // throw new IllegalStateException();
// Hands out a prefetched cluster, if any, under the prefetch-map lock.
// NOTE(review): the method body is cut off in this listing — the code inside the
// synchronized block is not visible here.
1061 public synchronized ClusterImpl getPrefetched() {
1062 synchronized(prefetch) {
// Clones the identified cluster from the database session into this table and
// reports completion through the given consumer: accept(null) on success,
// accept(DatabaseException) on failure. NOTE(review): listing is gappy — the
// failure-path invocation of the consumer is on lines not visible here.
1067 public synchronized void load2(long clusterId, int clusterKey, final Consumer<DatabaseException> runnable) {
// The reserved cluster id must never be loaded through this path.
1069 assert (Constants.ReservedClusterId != clusterId);
1071 ClusterImpl cluster = null;
1072 DatabaseException e = null;
1076 ClusterUID clusterUID = clusters.makeClusterUID(clusterId);
1077 // cluster = (ClusterImpl) sessionImpl.getGraphSession().getClusterImpl(clusterUID, clusterKey);
1078 Database.Session session = sessionImpl.graphSession.dbSession;
1079 // cluster = (ClusterImpl)gci.getClusterByClusterKey(clusterKey).clone(sessionImpl.clusterTranslator);
1080 // cluster = (ClusterImpl)((ClusterImpl)gci.getClusterByClusterUIDOrMakeProxy(clusterUID)).clone(sessionImpl.clusterTranslator);
1082 // cluster = gci.clone(clusterUID, sessionImpl.clusterTranslator);
// The ClusterCreator materializes the raw long/int/byte arrays received from the
// database into a ClusterImpl bound to this session's cluster translator.
1084 cluster = session.clone(clusterUID, new ClusterCreator() {
1087 public <T> T create(ClusterUID uid, byte[] bytes, int[] ints, long[] longs) {
1088 ClusterSupport support = sessionImpl.clusterTranslator;
1090 return (T)ClusterImpl.make(longs, ints, bytes, support, support.getClusterKeyByClusterUIDOrMake(uid));
// NOTE(review): a creator failure is only printed; the value returned afterwards
// is on a line not visible here — confirm against the full source.
1091 } catch (DatabaseException e) {
1092 e.printStackTrace();
1099 } catch (Throwable t) {
1100 // It's totally legal to call for non-existing cluster.
1101 // Sometimes this indicates an error, though.
1102 LOGGER.error("Load cluster failed", t);
// Preserve a DatabaseException as-is; wrap anything else with the original cause.
1103 if (t instanceof DatabaseException)
1104 e = (DatabaseException) t;
1106 e = new DatabaseException("Load cluster failed.", t);
1108 if (null == cluster) {
1113 // new Exception("Load cluster " + cluster.getClusterId() + " " + cluster.getCachedSize() + " " + cluster.getImmutable()).printStackTrace();
1115 // Can not be called with null argument.
1116 replaceCluster(cluster);
1117 sessionImpl.onClusterLoaded(clusterId);
// Success: signal the caller with a null exception.
1118 runnable.accept(null);
// Like load2(long, int) but clones the cluster inline and installs it directly,
// returning it to the caller (return statement lies beyond this gappy listing).
1122 public synchronized ClusterImpl tryLoad(long clusterId, int clusterKey) throws DatabaseException {
// The reserved cluster id must never be loaded through this path.
1124 assert (Constants.ReservedClusterId != clusterId);
1126 ClusterUID clusterUID = clusters.makeClusterUID(clusterId);
1128 Database.Session session = sessionImpl.graphSession.dbSession;
// Same materialization pattern as load2: raw arrays -> ClusterImpl via translator.
1130 ClusterImpl cluster = session.clone(clusterUID, new ClusterCreator() {
1133 public <T> T create(ClusterUID uid, byte[] bytes, int[] ints, long[] longs) {
1134 ClusterSupport support = sessionImpl.clusterTranslator;
1136 return (T)ClusterImpl.make(longs, ints, bytes, support, support.getClusterKeyByClusterUIDOrMake(uid));
1137 } catch (DatabaseException e) {
1138 e.printStackTrace();
// Install: fill an empty slot directly, otherwise replace the existing proxy entry.
1144 if (null == clusterArray[clusterKey])
1145 clusterArray[clusterKey] = cluster;
1147 replaceCluster(cluster);
1148 sessionImpl.onClusterLoaded(clusterId);
// Development-time consistency hook (currently a no-op; see validate()).
1150 validate(clusterKey);
1155 public Collection<ClusterI> getClusters() {
1156 ArrayList<ClusterI> result = new ArrayList<ClusterI>();
1157 for (int i = 0; i < clusterArray.length; i++) {
1158 ClusterI cluster = clusterArray[i];
1159 if (cluster != null)
1160 result.add(cluster);
1166 return clusters.size();
1169 public long timeCounter() {
1170 return timeCounter.getAndIncrement();
1173 public boolean hasVirtual(int index) {
1174 return virtuals[index];
1177 public void markVirtual(int index) {
1178 virtuals[index] = true;
1181 public ClusterImpl[] getClusterArray() {
1182 return clusterArray;
// Answers whether the cluster containing the given resource is immutable,
// memoizing the answer per cluster key in the immutables array.
// NOTE(review): listing is gappy — the body of the null-cluster branch and the
// final return are on lines not visible here.
1185 public boolean isImmutable(int id) {
1187 int clusterKey = ClusterTraitsBase.getClusterKeyFromResourceKeyNoThrow(id);
1188 Boolean exist = immutables[clusterKey];
// Fast path: previously cached answer.
1189 if(exist != null) return exist;
1191 ClusterI cluster = getClusterByResourceKey(id);
1192 if(cluster == null) {
// Slow path: ask the cluster itself, then cache the result for this key.
1195 boolean result = cluster.getImmutable();
1196 markImmutable(cluster, result);
1202 public void markImmutable(ClusterI cluster, boolean value) {
1203 immutables[cluster.getClusterKey()] = value;
1206 public int getClusterKeyByUID(long first, long second) throws DatabaseException {
1207 return getClusterKeyByClusterUIDOrMakeProxy(ClusterUID.make(first, second));
// Adjusts the table's cached total byte size by delta l on behalf of the given
// cluster. NOTE(review): the actual accumulation happens on lines not visible in
// this gappy listing; only the disabled trace output is shown below.
1210 public void adjustCachedSize(long l, ClusterI cluster) {
1212 //System.out.println("ClusterTable: adjusting cluster table cached size by " + l + ": " + sizeInBytes + " -> "
1213 // + (sizeInBytes + l) + ", for cluster " + cluster.getClusterId() + " (" + cluster + ")");
// Debug invariant check: recomputes the total cached size from clusterArray and
// compares it against the running counter sizeInBytes; also cross-checks the
// importanceMap size against the number of "important" clusters in the hash map.
// NOTE(review): gappy listing — the declaration of "size" and the second
// comparison's condition are on lines not visible here.
1218 private void validateSize(String place) {
1222 int ims = importanceMap.size();
1223 int ihms = countImportantClusters();
1225 // System.out.format("[ClusterTable.%s] Validating: byteSize=%d (%d MB), hashMap=%d/%d, importanceMap=%d%n",
1226 // place, sizeInBytes, sizeInBytes / (1024*1024), ihms, clusters.hashMap.size(), ims);
1228 int i = clusterArray.length;
1230 for (int j = 0; j < i; ++j) {
1231 ClusterI c = clusterArray[j];
1234 size += c.getCachedSize();
// Mismatch is only reported when the counter is not already known to be stale.
1236 if (sizeInBytes != size) {
1237 if (!dirtySizeInBytes)
1238 System.out.println("BUG: CACHED CLUSTER SIZE DIFFERS FROM CALCULATED: " + sizeInBytes + " != " + size + ", delta = " + (sizeInBytes - size));
1239 //else System.out.println("\"BUG?\": SIZES DIFFER: " + sizeInBytes + " != " + size + ", delta = " + (sizeInBytes - size));
1242 System.out.println("BUG2: hashmap and importanceMap sizes differ: " + ihms + " != " + ims + ", delta=" + (ihms - ims));
1243 printMaps("validateSize");
1245 //System.out.println("[" + place + "] VALIDATED");
// Dumps the contents of importanceMap and clusters.hashMap to stdout, flagging each
// cluster as important (" I") or not ("NI") together with its state bits.
1248 private void printMaps(String place) {
1249 int ihms = countImportantClusters();
1250 System.out.println("## printMaps(" + place + ") - " + importanceMap.size() + " - (" + ihms + "/" + clusters.hashMap.size() + ")");
1251 System.out.println("importanceMap (" + importanceMap.size() + "):");
1252 importanceMap.forEach((importance, ie) -> {
1253 System.out.format("\t%d: %s%n", importance, ie.toString());
1255 System.out.println("clusters.hashMap (" + ihms + "/" + clusters.hashMap.size() + "):");
1256 clusters.hashMap.forEachEntry((cid, c) -> {
// Null-safe state flags: a null cluster prints as not-important with all-false bits.
1257 boolean important = importantCluster(c);
1258 boolean wo = c != null && c.isWriteOnly();
1259 boolean empty = c != null && c.isEmpty();
1260 boolean loaded = c != null && c.isLoaded();
1261 System.out.format("\t%s: %d - %s (writeOnly=%b, empty=%b, loaded=%b)%n", important ? " I" : "NI", cid, c, wo, empty, loaded);
// Counts clusters in clusters.hashMap that qualify as important (see
// importantCluster). Uses a single-element int array as a mutable counter because
// the visiting lambda cannot assign to a plain captured local.
// NOTE(review): the increment and return statements fall in a gap of this listing.
1266 private int countImportantClusters() {
1267 int[] result = { 0 };
1268 clusters.hashMap.forEachEntry((cid, c) -> {
1269 if (importantCluster(c))
1276 private static boolean importantCluster(ClusterImpl c) {
1277 return c != null && !c.isWriteOnly() && c.isLoaded();// && !c.isEmpty();