1 /*******************************************************************************
\r
2 * Copyright (c) 2007, 2010 Association for Decentralized Information Management
\r
3 * in Industry THTH ry.
\r
4 * All rights reserved. This program and the accompanying materials
\r
5 * are made available under the terms of the Eclipse Public License v1.0
\r
6 * which accompanies this distribution, and is available at
\r
7 * http://www.eclipse.org/legal/epl-v10.html
\r
10 * VTT Technical Research Centre of Finland - initial API and implementation
\r
11 *******************************************************************************/
\r
12 package fi.vtt.simantics.procore.internal;
\r
14 import java.io.File;
\r
15 import java.util.ArrayList;
\r
16 import java.util.Collection;
\r
17 import java.util.HashMap;
\r
18 import java.util.Map;
\r
19 import java.util.TreeMap;
\r
20 import java.util.concurrent.Semaphore;
\r
22 import org.simantics.databoard.Bindings;
\r
23 import org.simantics.db.ClusterCreator;
\r
24 import org.simantics.db.Database;
\r
25 import org.simantics.db.DevelopmentKeys;
\r
26 import org.simantics.db.SessionVariables;
\r
27 import org.simantics.db.Database.Session.ClusterChanges;
\r
28 import org.simantics.db.common.utils.Logger;
\r
29 import org.simantics.db.exception.ClusterDoesNotExistException;
\r
30 import org.simantics.db.exception.DatabaseException;
\r
31 import org.simantics.db.exception.ResourceNotFoundException;
\r
32 import org.simantics.db.exception.RuntimeDatabaseException;
\r
33 import org.simantics.db.impl.ClusterBase;
\r
34 import org.simantics.db.impl.ClusterI;
\r
35 import org.simantics.db.impl.ClusterSupport;
\r
36 import org.simantics.db.impl.ClusterTraitsBase;
\r
37 import org.simantics.db.impl.IClusterTable;
\r
38 import org.simantics.db.impl.graph.WriteGraphImpl;
\r
39 import org.simantics.db.impl.query.QueryProcessor;
\r
40 import org.simantics.db.procore.cluster.ClusterBig;
\r
41 import org.simantics.db.procore.cluster.ClusterImpl;
\r
42 import org.simantics.db.procore.cluster.ClusterSmall;
\r
43 import org.simantics.db.procore.cluster.ClusterTraits;
\r
44 import org.simantics.db.procore.protocol.Constants;
\r
45 import org.simantics.db.service.ClusterCollectorPolicy;
\r
46 import org.simantics.db.service.ClusterCollectorPolicy.CollectorCluster;
\r
47 import org.simantics.db.service.ClusterUID;
\r
48 import org.simantics.utils.Development;
\r
49 import org.simantics.utils.datastructures.Callback;
\r
51 import fi.vtt.simantics.procore.DebugPolicy;
\r
52 import fi.vtt.simantics.procore.internal.ClusterControlImpl.ClusterStateImpl;
\r
53 import fi.vtt.simantics.procore.internal.SessionImplSocket.TaskHelper;
\r
54 import gnu.trove.map.hash.TLongIntHashMap;
\r
55 import gnu.trove.map.hash.TLongObjectHashMap;
\r
56 import gnu.trove.procedure.TIntProcedure;
\r
57 import gnu.trove.procedure.TLongObjectProcedure;
\r
58 import gnu.trove.procedure.TObjectProcedure;
\r
59 import gnu.trove.set.hash.TIntHashSet;
\r
61 public final class ClusterTable implements IClusterTable {
\r
63 int maximumBytes = 128 * 1024 * 1024;
\r
64 int limit = (int)(0.8*(double)maximumBytes);
\r
66 long timeCounter = 0;
\r
68 final private SessionImplSocket sessionImpl;
\r
69 final private ArrayList<ClusterI> writeOnlyClusters = new ArrayList<ClusterI>();
\r
70 final private TIntHashSet writeOnlyInvalidates = new TIntHashSet();
\r
71 final private static int ARRAY_SIZE = ClusterTraits.getClusterArraySize();
\r
72 final private ClusterImpl[] clusterArray = new ClusterImpl[ARRAY_SIZE];
\r
73 final private Boolean[] immutables = new Boolean[ARRAY_SIZE];
\r
74 final private boolean[] virtuals = new boolean[ARRAY_SIZE];
\r
75 static class ImportanceEntry implements CollectorCluster {
\r
76 final public long importance;
\r
77 final public long clusterId;
\r
78 public ImportanceEntry(ClusterImpl impl) {
\r
79 this(impl.getImportance(), impl.getClusterId());
\r
82 public ImportanceEntry(long importance, long clusterId) {
\r
83 this.importance = importance;
\r
84 this.clusterId = clusterId;
\r
87 public long getImportance() {
\r
91 public long getClusterId() {
\r
95 private class Clusters {
\r
96 // This makes sure that non-null values from hashMap.get can be trusted (no unsynchronized rehashes occur)
\r
97 final public TLongObjectHashMap<ClusterImpl> hashMap = new TLongObjectHashMap<ClusterImpl>(2 * ARRAY_SIZE);
\r
98 //private final HashMap<ClusterUID, ClusterImpl> clusterU2I = new HashMap<ClusterUID, ClusterImpl>(); // Maps cluster UID to cluster.
\r
99 private Clusters() {
\r
102 private void clear() {
\r
104 // clusterU2I.clear();
\r
105 hashMap.put(0, null); // reserved for null value
\r
106 // clusterU2I.put(ClusterUID.make(0, 0), null);
\r
108 private int size() {
\r
109 return hashMap.size();
\r
111 private ClusterImpl getClusterByClusterId(long clusterId) {
\r
112 return hashMap.get(clusterId);
\r
114 private ClusterImpl getClusterByClusterUID(ClusterUID clusterUID) {
\r
115 return getClusterByClusterId(clusterUID.second);
\r
116 //return clusterU2I.get(clusterUID);
\r
118 private ClusterImpl getClusterByClusterUID(long id1, long id2) {
\r
119 return getClusterByClusterId(id2);
\r
121 private ClusterImpl makeProxy(long clusterId) {
\r
122 return makeProxy(ClusterUID.make(0, clusterId));
\r
124 private ClusterImpl makeProxy(ClusterUID clusterUID) {
\r
125 ClusterImpl proxy = hashMap.get(clusterUID.second);
\r
128 int clusterKey = hashMap.size();
\r
129 ClusterSmall sentinel = new ClusterSmall(clusterUID, clusterKey, ClusterTable.this, sessionImpl.clusterTranslator);
\r
130 if (sentinel.clusterId != sentinel.clusterUID.second)
\r
131 throw new RuntimeDatabaseException("ClusterTable corrupted. Contact application support.");
\r
135 private void replace(ClusterImpl proxy) {
\r
136 ClusterImpl old = clusterArray[proxy.clusterKey];
\r
139 if (old.clusterKey != proxy.clusterKey)
\r
140 throw new RuntimeDatabaseException("ClusterTable corrupted. Contact application support.");
\r
141 if (old.clusterId != proxy.clusterId)
\r
142 throw new RuntimeDatabaseException("ClusterTable corrupted. Contact application support.");
\r
143 if (!old.clusterUID.equals(proxy.clusterUID))
\r
144 throw new RuntimeDatabaseException("ClusterTable corrupted. Contact application support.");
\r
147 private ClusterSmall freeProxy(ClusterImpl proxy) {
\r
148 ClusterImpl clusterImpl = hashMap.get(proxy.clusterId);
\r
149 if (null == clusterImpl)
\r
150 throw new RuntimeDatabaseException("ClusterTable corrupted. Contact application support.");
\r
151 // ClusterUID clusterUID = ClusterUID.make(0, proxy.clusterId);
\r
152 // ClusterImpl clusterImpl2 = clusterU2I.get(clusterUID );
\r
153 // if (clusterImpl != clusterImpl2)
\r
154 // throw new RuntimeDatabaseException("ClusterTable corrupted. Contact application support.");
\r
155 if (proxy.clusterId != clusterImpl.clusterId)
\r
156 throw new RuntimeDatabaseException("ClusterTable corrupted. Contact application support.");
\r
157 if (proxy.clusterKey != clusterImpl.clusterKey)
\r
158 throw new RuntimeDatabaseException("ClusterTable corrupted. Contact application support.");
\r
159 ClusterSmall sentinel = new ClusterSmall(makeClusterUID(proxy.clusterId) , proxy.clusterKey, ClusterTable.this, sessionImpl.clusterTranslator);
\r
160 return (ClusterSmall)create(sentinel);
\r
162 private ClusterImpl create(ClusterImpl clusterImpl) {
\r
163 hashMap.put(clusterImpl.clusterId, clusterImpl);
\r
164 // clusterU2I.put(clusterImpl.clusterUID, clusterImpl);
\r
165 clusterArray[clusterImpl.clusterKey] = clusterImpl;
\r
166 return clusterImpl;
\r
168 private void forEachValue(TObjectProcedure<ClusterImpl> procedure) {
\r
169 hashMap.forEachValue(procedure);
\r
171 private void forEachEntry(TLongObjectProcedure<ClusterImpl> procedure) {
\r
172 hashMap.forEachEntry(procedure);
\r
174 private ClusterUID makeClusterUID(long clusterId) {
\r
175 return ClusterUID.make(0, clusterId);
\r
177 private void removeProxy(ClusterImpl proxy) {
\r
178 hashMap.remove(proxy.clusterId);
\r
179 clusterArray[proxy.clusterKey] = null;
\r
182 final public TreeMap<Long, CollectorCluster> importanceMap = new TreeMap<Long, CollectorCluster>();
\r
184 private ClusterCollectorPolicy collectorPolicy;
\r
185 private boolean dirtySizeInBytes = true;
\r
186 private long sizeInBytes = 0;
\r
187 private final Clusters clusters;
\r
188 ClusterUID makeClusterUID(long clusterId) {
\r
189 return clusters.makeClusterUID(clusterId);
\r
191 public ClusterUID getClusterUIDByResourceKey(int resourceKey) throws DatabaseException {
\r
192 int clusterKey = ClusterTraits.getClusterKeyFromResourceKey(resourceKey);
\r
193 return clusterArray[clusterKey].clusterUID;
\r
195 ClusterTable(SessionImplSocket sessionImpl, File folderPath) {
\r
196 this.sessionImpl = sessionImpl;
\r
197 clusters = new Clusters();
\r
202 importanceMap.clear();
\r
205 public void setCollectorPolicy(ClusterCollectorPolicy policy) {
\r
206 this.collectorPolicy = policy;
\r
210 * Sets Cluster Table allocation size by percentage of maximum usable
\r
211 * memory. Allocation is limited between 5% and 50% of maximum memory.
\r
213 * @param percentage
\r
215 private void setAllocateSize(double percentage) {
\r
217 percentage = Math.max(percentage, 0.05);
\r
218 percentage = Math.min(percentage, 0.5);
\r
220 maximumBytes = (int) Math.floor(percentage * Runtime.getRuntime().maxMemory());
\r
223 private double estimateProperAllocation(ClusterCollectorSupport support) {
\r
224 long max = Runtime.getRuntime().maxMemory();
\r
225 long free = Runtime.getRuntime().freeMemory();
\r
226 long total = Runtime.getRuntime().totalMemory();
\r
227 long realFree = max - total + free; // amount of free memory
\r
228 long current = support.getCurrentSize(); // currently allocated cache
\r
229 long inUseTotal = max - realFree; // currently used memory
\r
230 long inUseOther = inUseTotal - current; // memory that the application uses (without the cache)
\r
231 double otherUsePercentage = (double) inUseOther / (double) max; // percentage of memory that the application uses
\r
232 double aimFree = 0.2; // percentage of maximum heap that we try to keep free
\r
233 double estimate = 1.0 - otherUsePercentage - aimFree;
\r
234 estimate = Math.min(estimate, 0.5);
\r
235 estimate = Math.max(estimate, 0.05);
\r
236 // System.out.println("Estimated allocation percentage " + estimate +
\r
237 // " memory stats: max: " + max + ", free: " + realFree + ", inUse: " +
\r
238 // inUseTotal + ", cached: " + current + ", cacheSize: " + maximumBytes);
\r
242 void checkCollect() {
\r
243 collector.collect();
\r
246 synchronized ClusterImpl getClusterByClusterId(long clusterId) {
\r
247 return clusters.getClusterByClusterId(clusterId);
\r
250 ClusterBase getClusterByClusterKey(int clusterKey) {
\r
251 return clusterArray[clusterKey];
\r
254 synchronized ClusterImpl makeProxy(ClusterUID clusterUID, long clusterId) {
\r
255 if (clusterUID.second != clusterId)
\r
256 throw new RuntimeDatabaseException("Illegal id for cluster=" + clusterUID + " id=" + clusterId);
\r
257 return clusters.makeProxy(clusterUID);
\r
260 synchronized ClusterImpl makeCluster(long clusterId, boolean writeOnly) {
\r
262 ClusterImpl proxy = clusters.getClusterByClusterId(clusterId);
\r
265 ClusterUID clusterUID = ClusterUID.make(0, clusterId);
\r
266 int clusterKey = clusters.size(); // new key
\r
268 proxy = new ClusterWriteOnly(clusterUID, clusterKey, sessionImpl);
\r
269 writeOnlyClusters.add(proxy);
\r
270 return clusters.create(proxy);
\r
272 ClusterImpl cluster = ClusterImpl.make(clusterUID, clusterKey, sessionImpl.clusterTranslator);
\r
273 clusters.create(cluster);
\r
274 if (!cluster.isLoaded())
\r
275 Logger.defaultLogError(new Exception("Bug in ClusterTable.makeCluster(long, boolean), cluster not loaded"));
\r
276 importanceMap.put(cluster.getImportance(), new ImportanceEntry(cluster));
\r
277 if(collectorPolicy != null) collectorPolicy.added(cluster);
\r
282 synchronized void replaceCluster(ClusterI cluster) {
\r
284 int clusterKey = cluster.getClusterKey();
\r
285 ClusterI existing = clusterArray[clusterKey];
\r
286 if (existing.hasVirtual())
\r
287 cluster.markVirtual();
\r
289 importanceMap.remove(existing.getImportance());
\r
290 if (collectorPolicy != null)
\r
291 collectorPolicy.removed((ClusterImpl)existing);
\r
293 clusters.replace((ClusterImpl)cluster);
\r
294 if (!cluster.isLoaded())
\r
295 Logger.defaultLogError(new Exception("Bug in ClusterTable.replaceCluster(ClusterI), cluster not loaded"));
\r
297 importanceMap.put(cluster.getImportance(), new ImportanceEntry((ClusterImpl)cluster));
\r
298 if(collectorPolicy != null) collectorPolicy.added((ClusterImpl)cluster);
\r
300 if (!dirtySizeInBytes && !existing.isLoaded()) {
\r
301 sizeInBytes += cluster.getCachedSize();
\r
306 synchronized void release(CollectorCluster cluster) {
\r
307 importanceMap.remove(cluster.getImportance());
\r
308 release(cluster.getClusterId());
\r
311 synchronized void release(long clusterId) {
\r
312 ClusterImpl clusterImpl = clusters.getClusterByClusterId(clusterId);
\r
313 if (null == clusterImpl)
\r
315 if(!clusterImpl.isLoaded() || clusterImpl.isEmpty())
\r
317 clusters.freeProxy(clusterImpl);
\r
318 importanceMap.remove(clusterImpl.getImportance());
\r
319 if (collectorPolicy != null)
\r
320 collectorPolicy.removed(clusterImpl);
\r
321 if (sessionImpl.writeState != null)
\r
322 sessionImpl.clusterStream.flush(clusterImpl.clusterUID);
\r
323 if (!dirtySizeInBytes) {
\r
324 sizeInBytes -= clusterImpl.getCachedSize();
\r
328 synchronized void compact(long id) {
\r
329 ClusterI impl = clusters.getClusterByClusterId(id);
\r
334 void updateSize() {
\r
335 // cachedSize = getSizeInBytes();
\r
338 double getLoadProbability() {
\r
339 // This can currently cause stack overflow
\r
341 // if(cachedSize < SLOW_LIMIT) return 1.0;
\r
342 // if(cachedSize > HIGH_LIMIT) return 1e-2;
\r
344 // double pos = (double)(cachedSize - SLOW_LIMIT) / (double)(HIGH_LIMIT -
\r
346 // double val = 0.1 * ((pos-1) * (pos-1)) + 1e-2;
\r
350 class SizeProcedure implements TObjectProcedure<ClusterImpl> {
\r
351 public long result = 0;
\r
354 public boolean execute(ClusterImpl cluster) {
\r
355 if (cluster != null) {
\r
357 if (cluster.isLoaded() && !cluster.isEmpty()) {
\r
358 result += cluster.getCachedSize();
\r
360 } catch (Throwable t) {
\r
361 Logger.defaultLogError(t);
\r
367 public void clear() {
\r
373 private SizeProcedure sizeProcedure = new SizeProcedure();
\r
374 long getSizeInBytes() {
\r
375 if (dirtySizeInBytes) {
\r
376 sizeProcedure.clear();
\r
377 clusters.forEachValue(sizeProcedure);
\r
378 sizeInBytes = sizeProcedure.result;
\r
379 // System.err.println("recomputed size of clusterTable => " + sizeInBytes);
\r
380 setDirtySizeInBytes(false);
\r
382 return sizeInBytes;
\r
385 public void setDirtySizeInBytes(boolean value) {
\r
386 dirtySizeInBytes = value;
\r
389 ClusterStateImpl getState() {
\r
390 final ClusterStateImpl result = new ClusterStateImpl();
\r
391 clusters.forEachEntry(new TLongObjectProcedure<ClusterImpl>() {
\r
393 public boolean execute(long arg0, ClusterImpl arg1) {
\r
396 if (arg1.isLoaded() && !arg1.isEmpty()) {
\r
397 result.ids.add(new ImportanceEntry(arg1));
\r
405 void restoreState(ClusterStateImpl state) {
\r
406 ClusterStateImpl current = getState();
\r
407 for (CollectorCluster id : current.ids)
\r
408 if (!state.ids.contains(id))
\r
409 collectorSupport.release(id);
\r
412 class ClusterCollectorImpl implements ClusterCollector {
\r
414 final private ClusterCollectorSupport support;
\r
415 private ClusterCollectorPolicy policy;
\r
416 ClusterCollectorImpl(ClusterCollectorSupport support) {
\r
417 this.support = support;
\r
420 ClusterCollectorPolicy setPolicy(ClusterCollectorPolicy newPolicy) {
\r
422 ClusterCollectorPolicy oldPolicy = policy;
\r
423 policy = newPolicy;
\r
425 if(policy != null) {
\r
426 for (CollectorCluster id : support.getResidentClusters()) {
\r
427 policy.added(getClusterByClusterId(id.getClusterId()));
\r
431 support.setPolicy(policy);
\r
438 public void collect() {
\r
440 if(policy != null) {
\r
442 release(policy.select());
\r
446 int size = support.getCurrentSize();
\r
447 boolean dynamicAllocation = useDynamicAllocation();
\r
448 if (dynamicAllocation)
\r
449 setAllocateSize(estimateProperAllocation(support));
\r
450 if (DebugPolicy.CLUSTER_COLLECTION) {
\r
451 System.out.println("Cluster collector activated, current size = " + size + " limit = " + maximumBytes);
\r
453 if (dynamicAllocation) {
\r
454 int collectSize = maximumBytes / 2;
\r
455 collectSize = Math.min(collectSize, 32 * 1024 * 1024);
\r
456 // try to keep allocated clusters below the maximum
\r
457 if (maximumBytes - size > collectSize)
\r
459 collectSize += size - maximumBytes;
\r
460 collect(collectSize);
\r
462 // try to keep allocated clusters below the maximum
\r
463 if (size < maximumBytes)
\r
466 collect(size-limit);
\r
473 private boolean useDynamicAllocation() {
\r
474 return "true".equalsIgnoreCase(System.getProperty("org.simantics.db.cluster.dynamicAlloc"));
\r
478 public void collect(int target) {
\r
480 if(policy != null) {
\r
482 release(policy.select(target));
\r
486 ArrayList<CollectorCluster> toRelease = new ArrayList<CollectorCluster>();
\r
488 for (CollectorCluster cluster : support.getResidentClusters()) {
\r
489 target -= support.getClusterSize(cluster);
\r
491 toRelease.add(cluster);
\r
497 release(toRelease);
\r
499 if (DebugPolicy.CLUSTER_COLLECTION) {
\r
500 System.out.println("Cluster collector finished, current size = " + support.getCurrentSize());
\r
507 void release(Collection<CollectorCluster> toRelease) {
\r
508 for (CollectorCluster id : toRelease) {
\r
509 support.release(id);
\r
515 private ClusterCollectorSupport collectorSupport = new ClusterCollectorSupportImpl(this);
\r
516 ClusterCollectorImpl collector = new ClusterCollectorImpl(collectorSupport);
\r
519 collector.collect();
\r
522 private long newResourceClusterId = Constants.NewClusterId;
\r
523 public static final int CLUSTER_FILL_SIZE = ClusterTraitsBase.getMaxNumberOfResources();
\r
526 * Uusi id varataan vasta, kun lisätään resurssi => reservedIds sisältää
\r
527 * vain jo käytössä olevia klustereita.
527 * (A new id is reserved only when a resource is added, so reservedIds
527 * contains only clusters that are already in use.)
\r
530 ClusterImpl getNewResourceCluster(ClusterSupport cs, GraphSession graphSession, boolean writeOnly)
\r
531 throws DatabaseException {
\r
532 if (Constants.NewClusterId == newResourceClusterId) {
\r
533 newResourceClusterId = graphSession.newClusterId();
\r
534 return getClusterByClusterIdOrMake(newResourceClusterId, writeOnly);
\r
536 ClusterImpl cluster = getClusterByClusterIdOrThrow(newResourceClusterId);
\r
537 if (cluster.getNumberOfResources(cs) >= CLUSTER_FILL_SIZE) {
\r
538 newResourceClusterId = graphSession.newClusterId();
\r
539 cluster = getClusterByClusterIdOrMake(newResourceClusterId, writeOnly);
\r
545 void flushCluster(GraphSession graphSession) {
\r
546 // We seem to disagree about this.
\r
547 // graphSession.newClusterId();
\r
548 newResourceClusterId = Constants.NewClusterId;
\r
551 void writeOnlyInvalidate(ClusterI impl) {
\r
552 writeOnlyInvalidates.add(impl.getClusterKey());
\r
555 void removeWriteOnlyClusters() {
\r
556 for (ClusterI proxy : writeOnlyClusters) {
\r
557 if (!(proxy instanceof ClusterImpl))
\r
558 throw new RuntimeDatabaseException("ClusterTable corrupted. Contact application support.");
\r
559 clusters.freeProxy((ClusterImpl)proxy);
\r
561 writeOnlyClusters.clear();
\r
562 writeOnlyInvalidates.forEach(new TIntProcedure() {
\r
564 public boolean execute(int clusterKey) {
\r
565 ClusterImpl proxy = clusterArray[clusterKey];
\r
566 ClusterUID clusterUID = proxy.getClusterUID();
\r
567 System.err.println("writeOnlyInvalidate " + clusterUID);
\r
568 clusters.freeProxy(proxy);
\r
572 writeOnlyInvalidates.clear();
\r
575 public ClusterImpl getClusterByClusterUID(ClusterUID clusterUID) {
\r
576 synchronized (this) {
\r
577 return clusters.getClusterByClusterUID(clusterUID);
\r
580 public int getClusterKeyByClusterUIDOrMakeProxy(ClusterUID clusterUID) {
\r
581 return getClusterKeyByClusterUIDOrMakeProxy(0/*clusterUID.first*/, clusterUID.second);
\r
583 public int getClusterKeyByClusterUIDOrMakeProxy(long id1, long id2) {
\r
584 return getClusterByClusterUIDOrMakeProxy(id1, id2).clusterKey;
\r
586 public ClusterImpl getClusterByClusterUIDOrMakeProxy(ClusterUID clusterUID) {
\r
587 synchronized (this) {
\r
588 ClusterImpl clusterImpl = clusters.getClusterByClusterUID(clusterUID);
\r
589 if (null == clusterImpl)
\r
590 clusterImpl = clusters.makeProxy(clusterUID);
\r
591 return clusterImpl;
\r
594 public ClusterImpl getClusterByClusterUIDOrMakeProxy(long id1, long id2) {
\r
595 synchronized (this) {
\r
596 ClusterImpl clusterImpl = clusters.getClusterByClusterUID(id1, id2);
\r
597 if (null == clusterImpl)
\r
598 clusterImpl = clusters.makeProxy(id2);
\r
599 return clusterImpl;
\r
602 ClusterImpl getLoadOrThrow(long clusterId) throws DatabaseException {
\r
603 synchronized (this) {
\r
604 ClusterImpl proxy = clusters.getClusterByClusterId(clusterId);
\r
605 int clusterKey = 0;
\r
607 if (proxy.isLoaded())
\r
610 clusterKey = proxy.getClusterKey();
\r
612 if (clusterKey == 0) {
\r
613 proxy = clusters.makeProxy(clusterId);
\r
614 clusterKey = proxy.getClusterKey();
\r
616 ClusterImpl ci = tryLoad(clusterId, clusterKey);
\r
618 throw new ResourceNotFoundException(clusterId);
\r
620 } catch (ClusterDoesNotExistException t) {
\r
621 clusters.removeProxy(proxy);
\r
627 ClusterImpl getClusterByClusterIdOrMake(long clusterId, boolean writeOnly) {
\r
628 synchronized (this) {
\r
629 ClusterImpl proxy = clusters.getClusterByClusterId(clusterId);
\r
633 return makeCluster(clusterId, writeOnly);
\r
636 ClusterImpl getClusterByClusterIdOrThrow(long clusterId) {
\r
637 synchronized (this) {
\r
638 ClusterImpl proxy = clusters.getClusterByClusterId(clusterId);
\r
640 throw new IllegalArgumentException("Cluster id=" + clusterId + " is not created.");
\r
644 long getClusterIdOrCreate(ClusterUID clusterUID) {
\r
645 return clusterUID.second;
\r
647 final long getClusterIdByResourceKey(final int resourceKey)
\r
648 throws DatabaseException {
\r
649 int clusterKey = ClusterTraitsBase.getClusterKeyFromResourceKey(resourceKey);
\r
650 if (ClusterTraitsBase.isVirtualClusterKey(clusterKey))
\r
651 throw new RuntimeException("Tried to get a persistent cluster for a virtual resource.");
\r
652 ClusterI c = clusterArray[clusterKey];
\r
654 throw new RuntimeException("No cluster for key " + resourceKey);
\r
655 return c.getClusterId();
\r
657 final long getClusterIdByResourceKeyNoThrow(final int resourceKey) {
\r
658 int clusterKey = ClusterTraitsBase.getClusterKeyFromResourceKeyNoThrow(resourceKey);
\r
659 if (ClusterTraitsBase.isVirtualClusterKey(clusterKey)) {
\r
660 Logger.defaultLogError("Tried to get a persistent cluster for a virtual resource. key=" + resourceKey);
\r
663 ClusterI c = clusterArray[clusterKey];
\r
665 Logger.defaultLogError("No cluster for key " + resourceKey);
\r
668 return c.getClusterId();
\r
673 void refresh(long csid, SessionImplSocket session, ClusterUID[] clusterUID) {
\r
674 synchronized (this) {
\r
675 session.flushCounter = 0;
\r
676 session.clusterStream.reallyFlush();
\r
677 ClientChangesImpl cs = new ClientChangesImpl(session);
\r
678 if (session.clientChanges == null)
\r
679 session.clientChanges = cs;
\r
680 for (int i=0; i<clusterUID.length; ++i) {
\r
682 if (DebugPolicy.REPORT_CLUSTER_EVENTS)
\r
683 System.err.println("cluster=" + clusterUID[i] + " has changed. length=" + clusterUID.length);
\r
684 ClusterImpl oldCluster = clusters.getClusterByClusterUID(clusterUID[i]);
\r
685 if (null == oldCluster)
\r
687 if (!oldCluster.isLoaded())
\r
689 int clusterKey = oldCluster.getClusterKey();
\r
690 if (ClusterTraitsBase.isVirtualClusterKey(clusterKey))
\r
692 boolean big = oldCluster instanceof ClusterBig;
\r
693 boolean small = oldCluster instanceof ClusterSmall;
\r
694 if (!big && !small)
\r
696 // ClusterImpl newCluster = (ClusterImpl) sessionImpl.graphSession.getClusterImpl(clusterUID[i], clusterKey);
\r
697 Database.Session dbSession = sessionImpl.graphSession.dbSession;
\r
699 ClusterImpl newCluster = dbSession.clone(clusterUID[i], new ClusterCreator() {
\r
702 public <T> T create(ClusterUID uid, byte[] bytes, int[] ints, long[] longs) {
\r
703 ClusterSupport support = sessionImpl.clusterTranslator;
\r
705 return (T)ClusterImpl.make(longs, ints, bytes, support, support.getClusterKeyByClusterUIDOrMake(uid));
\r
706 } catch (DatabaseException e) {
\r
707 e.printStackTrace();
\r
713 if (null == newCluster)
\r
715 if (DebugPolicy.REPORT_CLUSTER_EVENTS)
\r
716 System.err.println("cluster=" + newCluster + " updated.");
\r
717 importanceMap.remove(oldCluster.getImportance());
\r
718 if (collectorPolicy != null)
\r
719 collectorPolicy.removed(oldCluster);
\r
720 clusters.replace(newCluster);
\r
721 if (!newCluster.isLoaded())
\r
722 Logger.defaultLogError(new Exception("Bug in ClusterTable.refresh, cluster not loaded"));
\r
723 importanceMap.put(newCluster.getImportance(), new ImportanceEntry(newCluster));
\r
724 if (collectorPolicy != null)
\r
725 collectorPolicy.added(newCluster);
\r
726 // Now we have fetched the new cluster but to emulate effects of the changes in it we fetch the cluster changes from server.
\r
727 refreshCluster(csid, session, oldCluster, clusterUID[i], newCluster.clusterId, clusterKey);
\r
728 } catch (Throwable t) {
\r
729 Logger.defaultLogError("Failed to load cluster in refresh.", t);
\r
732 // Fake update of cluster changes.
\r
733 QueryProcessor queryProcessor = session.getQueryProvider2();
\r
734 WriteGraphImpl writer = WriteGraphImpl.create(queryProcessor, session.writeSupport, null);
\r
735 TaskHelper th = null;
\r
736 if (null == session.writeState) {
\r
737 th = new TaskHelper("Refresh");
\r
738 session.writeState = new WriteState<Object>(writer, th.writeTraits, th.sema, th.proc);
\r
740 session.getQueryProvider2().performDirtyUpdates(writer);
\r
741 session.fireMetadataListeners(writer, cs);
\r
742 session.getQueryProvider2().performScheduledUpdates(writer);
\r
743 session.fireReactionsToSynchronize(cs);
\r
744 session.fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
\r
745 session.printDiagnostics();
\r
748 session.writeState = null;
\r
753 final void refreshCluster(long csid, SessionImplSocket session, ClusterImpl cluster, ClusterUID clusterUID, long clusterId, int clusterKey)
\r
754 throws DatabaseException {
\r
755 if (DebugPolicy.REPORT_CLUSTER_EVENTS)
\r
756 System.err.println("cluster=" + clusterUID + " id=" + clusterId + " key=" + clusterKey + " resources will be updated.");
\r
757 // get cluster change sets
\r
758 QueryProcessor queryProcessor = session.getQueryProvider2();
\r
761 cc = session.graphSession.getClusterChanges(clusterUID, csid);
\r
762 } catch (Exception e) {
\r
763 Logger.defaultLogError("Could not get cluster changes. cluster=" + clusterUID, e);
\r
764 release(clusterId);
\r
767 for (int i=0; i<cc.getResourceIndex().length; ++i) {
\r
768 int resource = ClusterTraits.createResourceKey(clusterKey, cc.getResourceIndex()[i]);
\r
769 ClusterUID pClusterUID = new ClusterUID(cc.getPredicateFirst()[i], cc.getPredicateSecond()[i]);
\r
770 ClusterImpl pCluster = clusters.getClusterByClusterUID(pClusterUID);
\r
771 if (null == pCluster)
\r
773 int pClusterKey = pCluster.getClusterKey();
\r
774 int predicate = ClusterTraits.createResourceKey(pClusterKey, cc.getPredicateIndex()[i]);
\r
775 queryProcessor.updateStatements(resource, predicate);
\r
776 if (DebugPolicy.REPORT_CLUSTER_EVENTS)
\r
777 System.err.println("resource " + cc.getResourceIndex()[i] + " relation " + cc.getPredicateIndex()[i] + " changed.");
\r
779 for (int i=0; i<cc.getValueIndex().length; ++i) {
\r
780 int resource = ClusterTraits.createResourceKey(clusterKey, cc.getValueIndex()[i]);
\r
781 queryProcessor.updateValue(resource);
\r
782 if (DebugPolicy.REPORT_CLUSTER_EVENTS)
\r
783 System.err.println("value " + cc.getValueIndex()[i] + " changed.");
\r
786 final void refreshImportance(ClusterImpl c) {
\r
788 if (c.isWriteOnly())
\r
791 importanceMap.remove(c.getImportance());
\r
792 if(collectorPolicy != null) collectorPolicy.removed(c);
\r
794 long newImportance = timeCounter();
\r
795 // System.err.println("refreshImportance " + c.getClusterId() + " => " + newImportance);
\r
796 c.setImportance(newImportance);
\r
798 Logger.defaultLogError(new Exception("Bug in ClusterTable.refreshImportance(ClusterImpl), cluster not loaded"));
\r
800 importanceMap.put(c.getImportance(), new ImportanceEntry(c));
\r
801 if(collectorPolicy != null) collectorPolicy.added(c);
\r
805 static long loadTime = 0;
\r
807 @SuppressWarnings("unchecked")
\r
808 public final <T extends ClusterI> T getClusterProxyByResourceKey(final int resourceKey) {
\r
809 int clusterKey = ClusterTraitsBase.getClusterKeyFromResourceKeyNoThrow(resourceKey);
\r
810 if (ClusterTraitsBase.isVirtualClusterKey(clusterKey))
\r
811 throw new RuntimeException("Tried to get a persistent cluster for a virtual resource.");
\r
812 ClusterI cluster = clusterArray[clusterKey];
\r
813 if (cluster == null)
\r
814 throw new RuntimeException("No proxy for existing cluster. Resource key = " + resourceKey);
\r
818 TLongIntHashMap clusterLoadHistogram = new TLongIntHashMap();
\r
819 int clusterLoadCounter = 0;
\r
821 @SuppressWarnings("unchecked")
\r
822 public final <T extends ClusterI> T getClusterByResourceKey(final int resourceKey) {
\r
823 int clusterKey = ClusterTraitsBase.getClusterKeyFromResourceKeyNoThrow(resourceKey);
\r
824 if (ClusterTraitsBase.isVirtualClusterKey(clusterKey))
\r
825 throw new RuntimeException("Tried to get a persistent cluster for a virtual resource.");
\r
826 ClusterI c = clusterArray[clusterKey];
\r
829 if (c.isLoaded()) {
\r
830 if ((counter++ & 4095) == 0)
\r
831 refreshImportance((ClusterImpl) c);
\r
834 if (!(c instanceof ClusterSmall)) {
\r
835 Logger.defaultLogError("Proxy must be instance of ClusterSmall");
\r
839 ClusterSmall cs = (ClusterSmall) c;
\r
841 if(DebugPolicy.REPORT_CLUSTER_LOADING) {
\r
842 long start = System.nanoTime();
\r
843 cluster = load2(cs.getClusterId(), cs.getClusterKey());
\r
844 long load = System.nanoTime()-start;
\r
846 if(DebugPolicy.REPORT_CLUSTER_LOADING) {
\r
847 int was = clusterLoadHistogram.get(cluster.getClusterId());
\r
848 clusterLoadHistogram.put(cluster.getClusterId(), was+1);
\r
849 clusterLoadCounter++;
\r
850 String text = "Load2 " + cluster + " " + 1e-9*loadTime + "s. " + 1e-6*load + "ms. " + clusterLoadCounter + " " + was + " " + cluster.getUsedSpace() + " " + cluster.getImportance();
\r
851 if(DebugPolicy.REPORT_CLUSTER_LOADING_STACKS) {
\r
852 new Exception(text).printStackTrace();
\r
854 System.err.println(text);
\r
858 cluster = load2(cs.getClusterId(), cs.getClusterKey());
\r
860 } catch (DatabaseException e) {
\r
861 Logger.defaultLogError(e);
\r
862 if (DebugPolicy.REPORT_CLUSTER_EVENTS)
\r
863 e.printStackTrace();
\r
864 String msg = "Failed to load cluster " + cs.getClusterUID() + " for resource key " + resourceKey
\r
865 + " resourceId=" + (((cs.getClusterId() << 16 + (resourceKey & 65535))));
\r
866 // TODO: this jams the system => needs refactoring.
\r
867 throw new RuntimeDatabaseException(msg, e);
\r
869 return (T) cluster;
\r
872 @SuppressWarnings("unchecked")
\r
873 final <T extends ClusterI> T checkedGetClusterByResourceKey(final int resourceKey) {
\r
874 int clusterKey = ClusterTraitsBase.getClusterKeyFromResourceKeyNoThrow(resourceKey);
\r
875 if (ClusterTraitsBase.isVirtualClusterKey(clusterKey))
\r
876 throw new RuntimeException("Tried to get a persistent cluster for a virtual resource.");
\r
877 ClusterI c = clusterArray[clusterKey];
\r
879 throw new RuntimeException("No cluster for resource key " + resourceKey);
\r
880 if (c.isLoaded()) {
\r
881 if ((counter++ & 4095) == 0)
\r
882 refreshImportance((ClusterImpl) c);
\r
885 if (!(c instanceof ClusterSmall)) {
\r
886 Logger.defaultLogError("Proxy must be instance of ClusterSmall");
\r
890 ClusterSmall cs = (ClusterSmall) c;
\r
892 // System.err.println("Load2 " + resourceKey);
\r
893 cluster = load2(cs.getClusterId(), cs.getClusterKey());
\r
894 } catch (DatabaseException e) {
\r
895 if (DebugPolicy.REPORT_CLUSTER_EVENTS)
\r
896 e.printStackTrace();
\r
897 int resourceIndex = resourceKey & ClusterTraitsBase.getResourceIndexFromResourceKeyNoThrow(resourceKey);
\r
898 long resourceId = ClusterTraitsBase.createResourceIdNoThrow(cs.getClusterId(), resourceIndex);
\r
899 String msg = "Failed to load cluster " + cs.getClusterUID() + " for resource key " + resourceKey + " resourceIndex=" + resourceIndex + " resourceId=" + resourceId;
\r
900 Logger.defaultLogError(msg, e);
\r
901 c.setDeleted(true, null);
\r
904 return (T) cluster;
\r
907 void printDebugInfo(ClusterSupport support)
\r
908 throws DatabaseException {
\r
909 final int SIZE = clusters.size();
\r
911 for (int i = 1; i < SIZE; ++i) {
\r
912 ClusterI c = clusterArray[i];
\r
913 c.getNumberOfResources(support);
\r
914 long size = c.getUsedSpace();
\r
915 System.out.println("cluster=" + c.getClusterId() + " size=" + size);
\r
916 c.printDebugInfo("koss: ", support);
\r
919 System.out.println("Total number of clusters " + SIZE);
\r
920 System.out.println("Total cluster size " + sum);
\r
923 Map<Long,ClusterImpl> prefetch = new HashMap<Long,ClusterImpl>();
\r
925 public synchronized ClusterImpl load2(long clusterId, int clusterKey) throws DatabaseException {
\r
926 ClusterImpl curr = (ClusterImpl) clusterArray[clusterKey];
\r
927 if (curr.isLoaded())
\r
930 // getPrefetched();
\r
932 // curr = (ClusterImpl) clusterArray[clusterKey];
\r
933 // if (curr.isLoaded())
\r
936 final Semaphore s = new Semaphore(0);
\r
937 final DatabaseException[] ex = new DatabaseException[1];
\r
939 load2(clusterId, clusterKey, new Callback<DatabaseException>() {
\r
941 public void run(DatabaseException e) {
\r
948 } catch (InterruptedException e) {
\r
949 Logger.defaultLogError(e);
\r
953 if (Development.DEVELOPMENT) {
\r
954 if (Development.<Boolean>getProperty(DevelopmentKeys.CLUSTERTABLE_VALIDATE_ON_LOAD, Bindings.BOOLEAN)) {
\r
956 ClusterImpl loaded = (ClusterImpl) clusterArray[clusterKey];
\r
957 if (loaded instanceof ClusterSmall)
\r
958 ((ClusterSmall) loaded).check();
\r
959 else if (loaded instanceof ClusterBig)
\r
960 ((ClusterBig) loaded).check();
\r
961 } catch (Throwable t) {
\r
962 t.printStackTrace();
\r
967 validate(clusterKey);
\r
969 return (ClusterImpl) clusterArray[clusterKey];
\r
972 void validate(int clusterKey) {
\r
974 // if(sessionImpl.graphSession.graphClient instanceof GraphClientImpl2) {
\r
976 // ClusterImpl server = (ClusterImpl)clusterArray[clusterKey];
\r
977 // String sd = server.dump(sessionImpl.clusterTranslator);
\r
978 // GraphClientImpl2 gci = ((GraphClientImpl2)sessionImpl.graphSession.graphClient);
\r
979 // ClusterImpl memory = gci.clusters.get(server.clusterUID);
\r
980 // if(memory == null)
\r
981 // System.err.println("ad");
\r
982 // String md = memory.dump(gci.support);
\r
984 // if(!sd.equals(md)) {
\r
986 // int diffPos = 0;
\r
987 // int minLength = Math.min(sd.length(), md.length());
\r
988 // for (int i = 0; i < minLength; i++) {
\r
989 // if (sd.charAt(i) != md.charAt(i)) {
\r
995 // int start = Math.max(diffPos-200, 0);
\r
996 // int end = Math.min(minLength-1, diffPos+200);
\r
998 // System.err.println("== CLUSTER DIFFERENCE " + clusterArray[clusterKey].getClusterUID() + " == ");
\r
999 // System.err.println("== SESSION == ");
\r
1000 // System.err.println(sd.substring(start, end));
\r
1001 // System.err.println("== MEM == ");
\r
1002 // System.err.println(md.substring(start, end));
\r
1003 // System.err.println("== CLUSTER DIFFERENCE ENDS == ");
\r
1005 // throw new IllegalStateException();
\r
1011 public synchronized ClusterImpl getPrefetched() {
\r
1012 synchronized(prefetch) {
\r
1017 public synchronized void load2(long clusterId, int clusterKey, final Callback<DatabaseException> runnable) {
\r
1019 assert (Constants.ReservedClusterId != clusterId);
\r
1021 ClusterImpl cluster = null;
\r
1022 DatabaseException e = null;
\r
1026 ClusterUID clusterUID = clusters.makeClusterUID(clusterId);
\r
1027 // cluster = (ClusterImpl) sessionImpl.getGraphSession().getClusterImpl(clusterUID, clusterKey);
\r
1028 Database.Session session = sessionImpl.graphSession.dbSession;
\r
1029 // cluster = (ClusterImpl)gci.getClusterByClusterKey(clusterKey).clone(sessionImpl.clusterTranslator);
\r
1030 // cluster = (ClusterImpl)((ClusterImpl)gci.getClusterByClusterUIDOrMakeProxy(clusterUID)).clone(sessionImpl.clusterTranslator);
\r
1032 // cluster = gci.clone(clusterUID, sessionImpl.clusterTranslator);
\r
1034 cluster = session.clone(clusterUID, new ClusterCreator() {
\r
1037 public <T> T create(ClusterUID uid, byte[] bytes, int[] ints, long[] longs) {
\r
1038 ClusterSupport support = sessionImpl.clusterTranslator;
\r
1040 return (T)ClusterImpl.make(longs, ints, bytes, support, support.getClusterKeyByClusterUIDOrMake(uid));
\r
1041 } catch (DatabaseException e) {
\r
1042 e.printStackTrace();
\r
1049 } catch (Throwable t) {
\r
1050 // It's totally legal to call for non-existing cluster.
\r
1051 // Sometimes this indicates an error, though.
\r
1052 Logger.getDefault().logInfo("Load cluster failed.", t);
\r
1053 if (t instanceof DatabaseException)
\r
1054 e = (DatabaseException) t;
\r
1056 e = new DatabaseException("Load cluster failed.", t);
\r
1058 if (null == cluster) {
\r
1063 // new Exception("Load cluster " + cluster.getClusterId() + " " + cluster.getCachedSize() + " " + cluster.getImmutable()).printStackTrace();
\r
1065 // Can not be called with null argument.
\r
1066 replaceCluster(cluster);
\r
1067 sessionImpl.onClusterLoaded(clusterId);
\r
1068 runnable.run(null);
\r
1072 public synchronized ClusterImpl tryLoad(long clusterId, int clusterKey) throws DatabaseException {
\r
1074 assert (Constants.ReservedClusterId != clusterId);
\r
1076 ClusterUID clusterUID = clusters.makeClusterUID(clusterId);
\r
1078 Database.Session session = sessionImpl.graphSession.dbSession;
\r
1080 ClusterImpl cluster = session.clone(clusterUID, new ClusterCreator() {
\r
1083 public <T> T create(ClusterUID uid, byte[] bytes, int[] ints, long[] longs) {
\r
1084 ClusterSupport support = sessionImpl.clusterTranslator;
\r
1086 return (T)ClusterImpl.make(longs, ints, bytes, support, support.getClusterKeyByClusterUIDOrMake(uid));
\r
1087 } catch (DatabaseException e) {
\r
1088 e.printStackTrace();
\r
1094 if (null == clusterArray[clusterKey])
\r
1095 clusterArray[clusterKey] = cluster;
\r
1097 replaceCluster(cluster);
\r
1098 sessionImpl.onClusterLoaded(clusterId);
\r
1100 validate(clusterKey);
\r
1105 public Collection<ClusterI> getClusters() {
\r
1106 ArrayList<ClusterI> result = new ArrayList<ClusterI>();
\r
1107 for (int i = 0; i < clusterArray.length; i++) {
\r
1108 ClusterI cluster = clusterArray[i];
\r
1109 if (cluster != null)
\r
1110 result.add(cluster);
\r
1115 public int size() {
\r
1116 return clusters.size();
\r
1119 public long timeCounter() {
\r
1120 return timeCounter++;
\r
1123 public boolean hasVirtual(int index) {
\r
1124 return virtuals[index];
\r
1127 public void markVirtual(int index) {
\r
1128 virtuals[index] = true;
\r
1131 public ClusterImpl[] getClusterArray() {
\r
1132 return clusterArray;
\r
1135 public boolean isImmutable(int id) {
\r
1137 int clusterKey = ClusterTraitsBase.getClusterKeyFromResourceKeyNoThrow(id);
\r
1138 Boolean exist = immutables[clusterKey];
\r
1139 if(exist != null) return exist;
\r
1141 ClusterI cluster = getClusterByResourceKey(id);
\r
1142 boolean result = cluster == null ? false : cluster.getImmutable();
\r
1144 markImmutable(cluster, result);
\r
1149 public void markImmutable(ClusterI cluster, boolean value) {
\r
1150 immutables[cluster.getClusterKey()] = value;
\r
1153 public int getClusterKeyByUID(long first, long second) throws DatabaseException {
\r
1154 return getClusterKeyByClusterUIDOrMakeProxy(ClusterUID.make(first, second));
\r