]> gerrit.simantics Code Review - simantics/platform.git/blob - bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/ClusterTable.java
Declare asyncCount volatile since it is accessed from multiple threads
[simantics/platform.git] / bundles / org.simantics.db.procore / src / fi / vtt / simantics / procore / internal / ClusterTable.java
1 /*******************************************************************************
2  * Copyright (c) 2007, 2010 Association for Decentralized Information Management
3  * in Industry THTH ry.
4  * All rights reserved. This program and the accompanying materials
5  * are made available under the terms of the Eclipse Public License v1.0
6  * which accompanies this distribution, and is available at
7  * http://www.eclipse.org/legal/epl-v10.html
8  *
9  * Contributors:
10  *     VTT Technical Research Centre of Finland - initial API and implementation
11  *******************************************************************************/
12 package fi.vtt.simantics.procore.internal;
13
14 import java.io.File;
15 import java.util.ArrayList;
16 import java.util.Collection;
17 import java.util.HashMap;
18 import java.util.Map;
19 import java.util.TreeMap;
20 import java.util.concurrent.Semaphore;
21
22 import org.simantics.databoard.Bindings;
23 import org.simantics.db.ClusterCreator;
24 import org.simantics.db.Database;
25 import org.simantics.db.Database.Session.ClusterChanges;
26 import org.simantics.db.DevelopmentKeys;
27 import org.simantics.db.SessionVariables;
28 import org.simantics.db.common.utils.Logger;
29 import org.simantics.db.exception.ClusterDoesNotExistException;
30 import org.simantics.db.exception.DatabaseException;
31 import org.simantics.db.exception.ResourceNotFoundException;
32 import org.simantics.db.exception.RuntimeDatabaseException;
33 import org.simantics.db.impl.ClusterBase;
34 import org.simantics.db.impl.ClusterI;
35 import org.simantics.db.impl.ClusterSupport;
36 import org.simantics.db.impl.ClusterTraitsBase;
37 import org.simantics.db.impl.IClusterTable;
38 import org.simantics.db.impl.graph.WriteGraphImpl;
39 import org.simantics.db.impl.query.QueryProcessor;
40 import org.simantics.db.procore.cluster.ClusterBig;
41 import org.simantics.db.procore.cluster.ClusterImpl;
42 import org.simantics.db.procore.cluster.ClusterSmall;
43 import org.simantics.db.procore.cluster.ClusterTraits;
44 import org.simantics.db.procore.protocol.Constants;
45 import org.simantics.db.service.ClusterCollectorPolicy;
46 import org.simantics.db.service.ClusterCollectorPolicy.CollectorCluster;
47 import org.simantics.db.service.ClusterUID;
48 import org.simantics.utils.Development;
49 import org.simantics.utils.datastructures.Callback;
50
51 import fi.vtt.simantics.procore.DebugPolicy;
52 import fi.vtt.simantics.procore.internal.ClusterControlImpl.ClusterStateImpl;
53 import fi.vtt.simantics.procore.internal.SessionImplSocket.TaskHelper;
54 import gnu.trove.map.hash.TLongIntHashMap;
55 import gnu.trove.map.hash.TLongObjectHashMap;
56 import gnu.trove.procedure.TIntProcedure;
57 import gnu.trove.procedure.TLongObjectProcedure;
58 import gnu.trove.procedure.TObjectProcedure;
59 import gnu.trove.set.hash.TIntHashSet;
60
61 public final class ClusterTable implements IClusterTable {
62
63     int maximumBytes = 128 * 1024 * 1024;
64     int limit = (int)(0.8*(double)maximumBytes);
65
66     long timeCounter = 0;
67
68     final private SessionImplSocket sessionImpl;
69     final private ArrayList<ClusterI> writeOnlyClusters = new ArrayList<ClusterI>();
70     final private TIntHashSet writeOnlyInvalidates = new TIntHashSet();
71     final private static int ARRAY_SIZE = ClusterTraits.getClusterArraySize();
72     final private ClusterImpl[] clusterArray = new ClusterImpl[ARRAY_SIZE];
73     final private Boolean[] immutables = new Boolean[ARRAY_SIZE];
74     final private boolean[] virtuals = new boolean[ARRAY_SIZE];
75     static class ImportanceEntry implements CollectorCluster {
76         final public long importance;
77         final public long clusterId;
78         public ImportanceEntry(ClusterImpl impl) {
79             this(impl.getImportance(), impl.getClusterId());
80         }
81
82         public ImportanceEntry(long importance, long clusterId) {
83             this.importance = importance;
84             this.clusterId = clusterId;
85         }
86         @Override
87         public long getImportance() {
88             return importance;
89         }
90         @Override
91         public long getClusterId() {
92             return clusterId;
93         }
94     }
95     private class Clusters {
96         // This makes sure that non-null values from hashMap.get can be trusted (no unsynchronized rehashes occur)
97         final public TLongObjectHashMap<ClusterImpl> hashMap = new TLongObjectHashMap<ClusterImpl>(2 * ARRAY_SIZE);
98         //private final HashMap<ClusterUID, ClusterImpl> clusterU2I = new HashMap<ClusterUID, ClusterImpl>(); // Maps cluster UID to cluster.
99         private Clusters() {
100             clear();
101         }
102         private void clear() {
103             hashMap.clear();
104           //  clusterU2I.clear();
105             hashMap.put(0, null); // reserved for null value
106           //  clusterU2I.put(ClusterUID.make(0, 0), null);
107         }
108         private int size() {
109             return hashMap.size();
110         }
111         private ClusterImpl getClusterByClusterId(long clusterId) {
112             return hashMap.get(clusterId);
113         }
114         private ClusterImpl getClusterByClusterUID(ClusterUID clusterUID) {
115                 return getClusterByClusterId(clusterUID.second);
116             //return clusterU2I.get(clusterUID);
117         }
118         private ClusterImpl getClusterByClusterUID(long id1, long id2) {
119                 return getClusterByClusterId(id2);
120         }
121         private ClusterImpl makeProxy(long clusterId) {
122             return makeProxy(ClusterUID.make(0, clusterId));
123         }
124         private ClusterImpl makeProxy(ClusterUID clusterUID) {
125             ClusterImpl proxy = hashMap.get(clusterUID.second);
126             if (null != proxy)
127                 return proxy;
128             int clusterKey = hashMap.size();
129             ClusterSmall sentinel = new ClusterSmall(clusterUID, clusterKey, ClusterTable.this, sessionImpl.clusterTranslator);
130             if (sentinel.clusterId != sentinel.clusterUID.second)
131                 throw new RuntimeDatabaseException("ClusterTable corrupted. Contact application support.");
132             create(sentinel);
133             return sentinel;
134         }
135         private void replace(ClusterImpl proxy) {
136             ClusterImpl old = clusterArray[proxy.clusterKey];
137             create(proxy);
138             if (null != old) {
139                 if (old.clusterKey != proxy.clusterKey)
140                     throw new RuntimeDatabaseException("ClusterTable corrupted. Contact application support.");
141                 if (old.clusterId != proxy.clusterId)
142                     throw new RuntimeDatabaseException("ClusterTable corrupted. Contact application support.");
143                 if (!old.clusterUID.equals(proxy.clusterUID))
144                     throw new RuntimeDatabaseException("ClusterTable corrupted. Contact application support.");
145             }
146         }
147         private ClusterSmall freeProxy(ClusterImpl proxy) {
148             ClusterImpl clusterImpl = hashMap.get(proxy.clusterId);
149             if (null == clusterImpl)
150                 throw new RuntimeDatabaseException("ClusterTable corrupted. Contact application support.");
151 //            ClusterUID clusterUID = ClusterUID.make(0, proxy.clusterId);
152 //            ClusterImpl clusterImpl2 = clusterU2I.get(clusterUID );
153 //            if (clusterImpl != clusterImpl2)
154 //                throw new RuntimeDatabaseException("ClusterTable corrupted. Contact application support.");
155             if (proxy.clusterId != clusterImpl.clusterId)
156                 throw new RuntimeDatabaseException("ClusterTable corrupted. Contact application support.");
157             if (proxy.clusterKey != clusterImpl.clusterKey)
158                 throw new RuntimeDatabaseException("ClusterTable corrupted. Contact application support.");
159             ClusterSmall sentinel = new ClusterSmall(makeClusterUID(proxy.clusterId) , proxy.clusterKey, ClusterTable.this, sessionImpl.clusterTranslator);
160             return (ClusterSmall)create(sentinel);
161         }
162         private ClusterImpl create(ClusterImpl clusterImpl) {
163             hashMap.put(clusterImpl.clusterId, clusterImpl);
164 //            clusterU2I.put(clusterImpl.clusterUID, clusterImpl);
165             clusterArray[clusterImpl.clusterKey] = clusterImpl;
166             return clusterImpl;
167         }
168         private void forEachValue(TObjectProcedure<ClusterImpl> procedure) {
169             hashMap.forEachValue(procedure);
170         }
171         private void forEachEntry(TLongObjectProcedure<ClusterImpl> procedure) {
172             hashMap.forEachEntry(procedure);
173         }
174         private ClusterUID makeClusterUID(long clusterId) {
175             return ClusterUID.make(0, clusterId);
176         }
177         private void removeProxy(ClusterImpl proxy) {
178             hashMap.remove(proxy.clusterId);
179             clusterArray[proxy.clusterKey] = null;
180         }
181     }
182     final public TreeMap<Long, CollectorCluster> importanceMap = new TreeMap<Long, CollectorCluster>();
183
184     private ClusterCollectorPolicy collectorPolicy;
185     private boolean dirtySizeInBytes = true;
186     private long sizeInBytes = 0;
187     private final Clusters clusters;
188     ClusterUID makeClusterUID(long clusterId) {
189         return clusters.makeClusterUID(clusterId);
190     }
191     public ClusterUID getClusterUIDByResourceKey(int resourceKey) throws DatabaseException {
192         int clusterKey = ClusterTraits.getClusterKeyFromResourceKey(resourceKey);
193         return clusterArray[clusterKey].clusterUID;
194     }
195     ClusterTable(SessionImplSocket sessionImpl, File folderPath) {
196         this.sessionImpl = sessionImpl;
197         clusters = new Clusters();
198     }
199
200     void dispose() {
201         clusters.clear();
202         importanceMap.clear();
203     }
204
205     public void setCollectorPolicy(ClusterCollectorPolicy policy) {
206         this.collectorPolicy = policy;
207     }
208
209     /**
210      * Sets Cluster Table allocation size by percentage of maximum usable
211      * memory. Allocation is limited between 5% and 50% of maximum memory.
212      *
213      * @param precentage
214      */
215     private void setAllocateSize(double percentage) {
216
217         percentage = Math.max(percentage, 0.05);
218         percentage = Math.min(percentage, 0.5);
219
220         maximumBytes = (int) Math.floor(percentage * Runtime.getRuntime().maxMemory());
221     }
222
223     private double estimateProperAllocation(ClusterCollectorSupport support) {
224         long max = Runtime.getRuntime().maxMemory();
225         long free = Runtime.getRuntime().freeMemory();
226         long total = Runtime.getRuntime().totalMemory();
227         long realFree = max - total + free; // amount of free memory
228         long current = support.getCurrentSize(); // currently allocated cache
229         long inUseTotal = max - realFree; // currently used memory
230         long inUseOther = inUseTotal - current; // memory that the application uses (without the cache)
231         double otherUsePercentage = (double) inUseOther / (double) max; // percentage of memory that the application uses
232         double aimFree = 0.2; // percentage of maximum heap that we try to keep free
233         double estimate = 1.0 - otherUsePercentage - aimFree;
234         estimate = Math.min(estimate, 0.5);
235         estimate = Math.max(estimate, 0.05);
236         // System.out.println("Estimated allocation percentage " + estimate +
237 // " memory stats: max: " + max + ", free: " + realFree + ", inUse: " +
238 // inUseTotal + ", cached: " + current + ", cacheSize: " + maximumBytes);
239         return estimate;
240     }
241
242     void checkCollect() {
243         collector.collect();
244     }
245
246     synchronized ClusterImpl getClusterByClusterId(long clusterId) {
247         return clusters.getClusterByClusterId(clusterId);
248     }
249
250     ClusterBase getClusterByClusterKey(int clusterKey) {
251         return clusterArray[clusterKey];
252     }
253
254     synchronized ClusterImpl makeProxy(ClusterUID clusterUID, long clusterId) {
255         if (clusterUID.second != clusterId)
256             throw new RuntimeDatabaseException("Illegal id for cluster=" + clusterUID + " id=" + clusterId);
257         return clusters.makeProxy(clusterUID);
258     }
259
260     synchronized ClusterImpl makeCluster(long clusterId, boolean writeOnly) {
261         checkCollect();
262         ClusterImpl proxy = clusters.getClusterByClusterId(clusterId);
263         if (null != proxy)
264             return proxy;
265         ClusterUID clusterUID = ClusterUID.make(0, clusterId);
266         int clusterKey = clusters.size(); // new key
267         if (writeOnly) {
268             proxy = new ClusterWriteOnly(clusterUID, clusterKey, sessionImpl);
269             writeOnlyClusters.add(proxy);
270             return clusters.create(proxy);
271         } else {
272             ClusterImpl cluster = ClusterImpl.make(clusterUID, clusterKey, sessionImpl.clusterTranslator);
273             clusters.create(cluster);
274             if (!cluster.isLoaded())
275                 Logger.defaultLogError(new Exception("Bug in ClusterTable.makeCluster(long, boolean), cluster not loaded"));
276             importanceMap.put(cluster.getImportance(), new ImportanceEntry(cluster));
277             if(collectorPolicy != null) collectorPolicy.added(cluster);
278             return cluster;
279         }
280     }
281
282     synchronized void replaceCluster(ClusterI cluster) {
283         checkCollect();
284         int clusterKey = cluster.getClusterKey();
285         ClusterI existing = clusterArray[clusterKey];
286         if (existing.hasVirtual())
287             cluster.markVirtual();
288
289         importanceMap.remove(existing.getImportance());
290         if (collectorPolicy != null)
291             collectorPolicy.removed((ClusterImpl)existing);
292
293         clusters.replace((ClusterImpl)cluster);
294         if (!cluster.isLoaded())
295             Logger.defaultLogError(new Exception("Bug in ClusterTable.replaceCluster(ClusterI), cluster not loaded"));
296
297         importanceMap.put(cluster.getImportance(), new ImportanceEntry((ClusterImpl)cluster));
298         if(collectorPolicy != null) collectorPolicy.added((ClusterImpl)cluster);
299
300         if (!dirtySizeInBytes && !existing.isLoaded()) {
301             sizeInBytes += cluster.getCachedSize();
302         }
303
304     }
305
306     synchronized void release(CollectorCluster cluster) {
307         importanceMap.remove(cluster.getImportance());
308         release(cluster.getClusterId());
309     }
310
311     synchronized void release(long clusterId) {
312         ClusterImpl clusterImpl = clusters.getClusterByClusterId(clusterId);
313         if (null == clusterImpl)
314             return;
315         if(!clusterImpl.isLoaded() || clusterImpl.isEmpty())
316             return;
317         clusters.freeProxy(clusterImpl);
318         importanceMap.remove(clusterImpl.getImportance());
319         if (collectorPolicy != null)
320             collectorPolicy.removed(clusterImpl);
321         if (sessionImpl.writeState != null)
322             sessionImpl.clusterStream.flush(clusterImpl.clusterUID);
323         if (!dirtySizeInBytes) {
324             sizeInBytes -= clusterImpl.getCachedSize();
325         }
326     }
327
328     synchronized void compact(long id) {
329         ClusterI impl = clusters.getClusterByClusterId(id);
330         if (impl != null)
331             impl.compact();
332     }
333
334     void updateSize() {
335 // cachedSize = getSizeInBytes();
336     }
337
338     double getLoadProbability() {
339         // This can currently cause stack overflow
340         return 1.0;
341 // if(cachedSize < SLOW_LIMIT) return 1.0;
342 // if(cachedSize > HIGH_LIMIT) return 1e-2;
343 //
344 // double pos = (double)(cachedSize - SLOW_LIMIT) / (double)(HIGH_LIMIT -
345 // SLOW_LIMIT);
346 // double val = 0.1 * ((pos-1) * (pos-1)) + 1e-2;
347 // return val;
348     }
349
350     class SizeProcedure implements TObjectProcedure<ClusterImpl> {
351         public long result = 0;
352
353         @Override
354         public boolean execute(ClusterImpl cluster) {
355             if (cluster != null) {
356                 try {
357                     if (cluster.isLoaded() && !cluster.isEmpty()) {
358                         result += cluster.getCachedSize();
359                     }
360                 } catch (Throwable t) {
361                     Logger.defaultLogError(t);
362                 }
363             }
364             return true;
365         }
366
367         public void clear() {
368             result = 0;
369         }
370
371     };
372
373     private SizeProcedure sizeProcedure = new SizeProcedure();
374     long getSizeInBytes() {
375         if (dirtySizeInBytes) {
376             sizeProcedure.clear();
377             clusters.forEachValue(sizeProcedure);
378             sizeInBytes = sizeProcedure.result;
379             // System.err.println("recomputed size of clusterTable => " + sizeInBytes);
380             setDirtySizeInBytes(false);
381         }
382         return sizeInBytes;
383     }
384
385     public void setDirtySizeInBytes(boolean value) {
386         dirtySizeInBytes = value;
387     }
388
389     ClusterStateImpl getState() {
390         final ClusterStateImpl result = new ClusterStateImpl();
391         clusters.forEachEntry(new TLongObjectProcedure<ClusterImpl>() {
392             @Override
393             public boolean execute(long arg0, ClusterImpl arg1) {
394                 if (arg1 == null)
395                     return true;
396                 if (arg1.isLoaded() && !arg1.isEmpty()) {
397                     result.ids.add(new ImportanceEntry(arg1));
398                 }
399                 return true;
400             }
401         });
402         return result;
403     }
404
405     void restoreState(ClusterStateImpl state) {
406         ClusterStateImpl current = getState();
407         for (CollectorCluster id : current.ids)
408             if (!state.ids.contains(id))
409                 collectorSupport.release(id);
410     }
411
412     class ClusterCollectorImpl implements ClusterCollector {
413
414         final private ClusterCollectorSupport support;
415         private ClusterCollectorPolicy policy;
416         ClusterCollectorImpl(ClusterCollectorSupport support) {
417             this.support = support;
418         }
419
420         ClusterCollectorPolicy setPolicy(ClusterCollectorPolicy newPolicy) {
421
422             ClusterCollectorPolicy oldPolicy = policy;
423             policy = newPolicy;
424
425             if(policy != null) {
426                 for (CollectorCluster id : support.getResidentClusters()) {
427                     policy.added(getClusterByClusterId(id.getClusterId()));
428                 }
429             }
430
431             support.setPolicy(policy);
432
433             return oldPolicy;
434
435         }
436
437         @Override
438         public void collect() {
439
440             if(policy != null) {
441
442                 release(policy.select());
443
444             } else {
445
446                 int size = support.getCurrentSize();
447                 boolean dynamicAllocation = useDynamicAllocation();
448                 if (dynamicAllocation)
449                     setAllocateSize(estimateProperAllocation(support));
450                 if (DebugPolicy.CLUSTER_COLLECTION) {
451                     System.out.println("Cluster collector activated, current size = " + size + " limit = " + maximumBytes);
452                 }
453                 if (dynamicAllocation) {
454                     int collectSize = maximumBytes / 2;
455                     collectSize = Math.min(collectSize, 32 * 1024 * 1024);
456                     // try to keep allocated clusters below the maximum
457                     if (maximumBytes - size > collectSize)
458                         return;
459                     collectSize += size - maximumBytes;
460                     collect(collectSize);
461                 } else {
462                     // try to keep allocated clusters below the maximum
463                     if (size < maximumBytes)
464                         return;
465                     // shave off 20%
466                     collect(size-limit);
467                 }
468
469             }
470
471         }
472
473         private boolean useDynamicAllocation() {
474             return "true".equalsIgnoreCase(System.getProperty("org.simantics.db.cluster.dynamicAlloc"));
475         }
476
477         @Override
478         public void collect(int target) {
479
480             if(policy != null) {
481
482                 release(policy.select(target));
483
484             } else {
485
486                 ArrayList<CollectorCluster> toRelease = new ArrayList<CollectorCluster>();
487
488                 for (CollectorCluster cluster : support.getResidentClusters()) {
489                     target -= support.getClusterSize(cluster);
490                     if (target > 0) {
491                         toRelease.add(cluster);
492                     } else {
493                         break;
494                     }
495                 }
496
497                 release(toRelease);
498
499                 if (DebugPolicy.CLUSTER_COLLECTION) {
500                     System.out.println("Cluster collector finished, current size = " + support.getCurrentSize());
501                 }
502
503             }
504
505         }
506
507         void release(Collection<CollectorCluster> toRelease) {
508             for (CollectorCluster id : toRelease) {
509                 support.release(id);
510             }
511         }
512
513     }
514
515     private ClusterCollectorSupport collectorSupport = new ClusterCollectorSupportImpl(this);
516     ClusterCollectorImpl collector = new ClusterCollectorImpl(collectorSupport);
517
518     void gc() {
519         collector.collect();
520     }
521
522     private long newResourceClusterId = Constants.NewClusterId;
523     public static final int CLUSTER_FILL_SIZE = ClusterTraitsBase.getMaxNumberOfResources();
524
525     /*
526      * Uusi id varataan vasta, kun lis�t��n resurssi => reservedIds sis�lt��
527      * vain jo k�yt�ss� olevia klustereita
528      */
529
530     ClusterImpl getNewResourceCluster(ClusterSupport cs, GraphSession graphSession, boolean writeOnly)
531     throws DatabaseException {
532         ClusterImpl result = null;
533         if (Constants.NewClusterId == newResourceClusterId) {
534             newResourceClusterId = graphSession.newClusterId();
535             result = getClusterByClusterIdOrMake(newResourceClusterId, writeOnly);
536         } else {
537             ClusterImpl cluster = getClusterByClusterIdOrThrow(newResourceClusterId);
538             if (cluster.getNumberOfResources(cs) >= CLUSTER_FILL_SIZE) {
539                 newResourceClusterId = graphSession.newClusterId();
540                 cluster = getClusterByClusterIdOrMake(newResourceClusterId, writeOnly);
541             }
542             result = cluster;
543         }
544         return ensureLoaded(result);
545     }
546
547     void flushCluster(GraphSession graphSession) {
548 // We seem to disagree about this.
549 // graphSession.newClusterId();
550         newResourceClusterId = Constants.NewClusterId;
551     }
552
553     void writeOnlyInvalidate(ClusterI impl) {
554         writeOnlyInvalidates.add(impl.getClusterKey());
555     }
556
557     void removeWriteOnlyClusters() {
558         for (ClusterI proxy : writeOnlyClusters) {
559             if (!(proxy instanceof ClusterImpl))
560                 throw new RuntimeDatabaseException("ClusterTable corrupted. Contact application support.");
561             clusters.freeProxy((ClusterImpl)proxy);
562         }
563         writeOnlyClusters.clear();
564         writeOnlyInvalidates.forEach(new TIntProcedure() {
565             @Override
566             public boolean execute(int clusterKey) {
567                 ClusterImpl proxy = clusterArray[clusterKey];
568                 ClusterUID clusterUID = proxy.getClusterUID();
569                 System.err.println("writeOnlyInvalidate " + clusterUID);
570                 clusters.freeProxy(proxy);
571                 return true;
572             }
573         });
574         writeOnlyInvalidates.clear();
575     }
576
577     public ClusterImpl getClusterByClusterUID(ClusterUID clusterUID) {
578         synchronized (this) {
579             return clusters.getClusterByClusterUID(clusterUID);
580         }
581     }
582     public int getClusterKeyByClusterUIDOrMakeProxy(ClusterUID clusterUID) {
583         return getClusterKeyByClusterUIDOrMakeProxy(0/*clusterUID.first*/, clusterUID.second);
584     }
585     public int getClusterKeyByClusterUIDOrMakeProxy(long id1, long id2) {
586         return getClusterByClusterUIDOrMakeProxy(id1, id2).clusterKey;
587     }
588     public ClusterImpl getClusterByClusterUIDOrMakeProxy(ClusterUID clusterUID) {
589         synchronized (this) {
590             ClusterImpl clusterImpl = clusters.getClusterByClusterUID(clusterUID);
591             if (null == clusterImpl)
592                 clusterImpl = clusters.makeProxy(clusterUID);
593             return clusterImpl;
594         }
595     }
596     public ClusterImpl getClusterByClusterUIDOrMakeProxy(long id1, long id2) {
597         synchronized (this) {
598             ClusterImpl clusterImpl = clusters.getClusterByClusterUID(id1, id2);
599             if (null == clusterImpl)
600                 clusterImpl = clusters.makeProxy(id2);
601             return clusterImpl;
602         }
603     }
604     ClusterImpl getLoadOrThrow(long clusterId) throws DatabaseException {
605         synchronized (this) {
606             ClusterImpl proxy = clusters.getClusterByClusterId(clusterId);
607             int clusterKey = 0;
608             if (proxy != null)
609                 if (proxy.isLoaded())
610                     return proxy;
611                 else
612                     clusterKey = proxy.getClusterKey();
613             try {
614                 if (clusterKey == 0) {
615                     proxy = clusters.makeProxy(clusterId);
616                     clusterKey = proxy.getClusterKey();
617                 }
618                 ClusterImpl ci = tryLoad(clusterId, clusterKey);
619                 if (null == ci)
620                     throw new ResourceNotFoundException(clusterId);
621                 return ci;
622             } catch (ClusterDoesNotExistException t) {
623                 clusters.removeProxy(proxy);
624                 throw t;
625             }
626         }
627     }
628
629     ClusterImpl getClusterByClusterIdOrMake(long clusterId, boolean writeOnly) {
630         synchronized (this) {
631             ClusterImpl proxy = clusters.getClusterByClusterId(clusterId);
632             if (proxy != null)
633                 return proxy;
634             else
635                 return makeCluster(clusterId, writeOnly);
636         }
637     }
638     ClusterImpl getClusterByClusterIdOrThrow(long clusterId) {
639         synchronized (this) {
640             ClusterImpl proxy = clusters.getClusterByClusterId(clusterId);
641             if (null == proxy)
642                 throw new IllegalArgumentException("Cluster id=" + clusterId + " is not created.");
643             return proxy;
644         }
645     }
646     long getClusterIdOrCreate(ClusterUID clusterUID) {
647         return clusterUID.second;
648     }
649     final long getClusterIdByResourceKey(final int resourceKey)
650     throws DatabaseException {
651         int clusterKey = ClusterTraitsBase.getClusterKeyFromResourceKey(resourceKey);
652         if (ClusterTraitsBase.isVirtualClusterKey(clusterKey))
653             throw new RuntimeException("Tried to get a persistent cluster for a virtual resource.");
654         ClusterI c = clusterArray[clusterKey];
655         if (c == null)
656             throw new RuntimeException("No cluster for key " + resourceKey);
657         return c.getClusterId();
658     }
659     final long getClusterIdByResourceKeyNoThrow(final int resourceKey) {
660         int clusterKey = ClusterTraitsBase.getClusterKeyFromResourceKeyNoThrow(resourceKey);
661         if (ClusterTraitsBase.isVirtualClusterKey(clusterKey)) {
662             Logger.defaultLogError("Tried to get a persistent cluster for a virtual resource. key=" + resourceKey);
663             return 0;
664         }
665         ClusterI c = clusterArray[clusterKey];
666         if (c == null) {
667             Logger.defaultLogError("No cluster for key " + resourceKey);
668             return 0;
669         }
670         return c.getClusterId();
671     }
672
673     int counter = 0;
674
675     void refresh(long csid, SessionImplSocket session, ClusterUID[] clusterUID) {
676         synchronized (this) {
677             session.flushCounter = 0;
678             session.clusterStream.reallyFlush();
679             ClientChangesImpl cs = new ClientChangesImpl(session);
680             if (session.clientChanges == null)
681                 session.clientChanges = cs;
682             for (int i=0; i<clusterUID.length; ++i) {
683                 try {
684                     if (DebugPolicy.REPORT_CLUSTER_EVENTS)
685                         System.err.println("cluster=" + clusterUID[i] + " has changed. length=" + clusterUID.length);
686                     ClusterImpl oldCluster = clusters.getClusterByClusterUID(clusterUID[i]);
687                     if (null == oldCluster)
688                         continue;
689                     if (!oldCluster.isLoaded())
690                         continue;
691                     int clusterKey = oldCluster.getClusterKey();
692                     if (ClusterTraitsBase.isVirtualClusterKey(clusterKey))
693                         continue;
694                     boolean big = oldCluster instanceof ClusterBig;
695                     boolean small = oldCluster instanceof ClusterSmall;
696                     if (!big && !small)
697                         continue;
698 //                    ClusterImpl newCluster = (ClusterImpl) sessionImpl.graphSession.getClusterImpl(clusterUID[i], clusterKey);
699                     Database.Session dbSession = sessionImpl.graphSession.dbSession;
700                     
701                     ClusterImpl newCluster = dbSession.clone(clusterUID[i], new ClusterCreator() {
702                         
703                         @Override
704                         public <T> T create(ClusterUID uid, byte[] bytes, int[] ints, long[] longs) {
705                             ClusterSupport support = sessionImpl.clusterTranslator;
706                             try {
707                                 return (T)ClusterImpl.make(longs, ints, bytes, support, support.getClusterKeyByClusterUIDOrMake(uid));
708                             } catch (DatabaseException e) {
709                                 e.printStackTrace();
710                                 return null;
711                             }
712                         }
713                         
714                     });
715                     if (null == newCluster)
716                         continue;
717                     if (DebugPolicy.REPORT_CLUSTER_EVENTS)
718                         System.err.println("cluster=" + newCluster + "  updated.");
719                     importanceMap.remove(oldCluster.getImportance());
720                     if (collectorPolicy != null)
721                         collectorPolicy.removed(oldCluster);
722                     clusters.replace(newCluster);
723                     if (!newCluster.isLoaded())
724                         Logger.defaultLogError(new Exception("Bug in ClusterTable.refresh, cluster not loaded"));
725                     importanceMap.put(newCluster.getImportance(), new ImportanceEntry(newCluster));
726                     if (collectorPolicy != null)
727                         collectorPolicy.added(newCluster);
728                     // Now we have fetched the new cluster but to emulate effects of the changes in it we fetch the cluster changes from server.
729                     refreshCluster(csid, session, oldCluster, clusterUID[i], newCluster.clusterId, clusterKey);
730                 } catch (Throwable t) {
731                     Logger.defaultLogError("Failed to load cluster in refresh.", t);
732                 }
733             }
734             // Fake update of cluster changes.
735             QueryProcessor queryProcessor = session.getQueryProvider2();
736             WriteGraphImpl writer = WriteGraphImpl.create(queryProcessor, session.writeSupport, null);
737             TaskHelper th = null;
738             if (null == session.writeState) {
739                 th = new TaskHelper("Refresh");
740                 session.writeState = new WriteState<Object>(writer, th.writeTraits, th.sema, th.proc);
741                 try {
742                     session.getQueryProvider2().performDirtyUpdates(writer);
743                     session.fireMetadataListeners(writer, cs);
744                     session.getQueryProvider2().performScheduledUpdates(writer);
745                     session.fireReactionsToSynchronize(cs);
746                     session.fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
747                     session.printDiagnostics();
748                 } finally {
749                     if (null != th)
750                         session.writeState = null;
751                 }
752             }
753         }
754     }
755     final void refreshCluster(long csid, SessionImplSocket session, ClusterImpl cluster, ClusterUID clusterUID, long clusterId, int clusterKey)
756     throws DatabaseException {
757         if (DebugPolicy.REPORT_CLUSTER_EVENTS)
758             System.err.println("cluster=" + clusterUID + " id=" + clusterId + " key=" + clusterKey + " resources will be updated.");
759         // get cluster change sets
760         QueryProcessor queryProcessor = session.getQueryProvider2();
761         ClusterChanges cc;
762         try {
763             cc = session.graphSession.getClusterChanges(clusterUID, csid);
764         } catch (Exception e) {
765             Logger.defaultLogError("Could not get cluster changes. cluster=" + clusterUID, e);
766             release(clusterId);
767             return;
768         }
769         for (int i=0; i<cc.getResourceIndex().length; ++i) {
770             int resource = ClusterTraits.createResourceKey(clusterKey, cc.getResourceIndex()[i]);
771             ClusterUID pClusterUID = new ClusterUID(cc.getPredicateFirst()[i], cc.getPredicateSecond()[i]);
772             ClusterImpl pCluster = clusters.getClusterByClusterUID(pClusterUID);
773             if (null == pCluster)
774                 continue;
775             int pClusterKey = pCluster.getClusterKey();
776             int predicate = ClusterTraits.createResourceKey(pClusterKey, cc.getPredicateIndex()[i]);
777             queryProcessor.updateStatements(resource, predicate);
778             if (DebugPolicy.REPORT_CLUSTER_EVENTS)
779                 System.err.println("resource " + cc.getResourceIndex()[i] + " relation " + cc.getPredicateIndex()[i] + " changed.");
780         }
781         for (int i=0; i<cc.getValueIndex().length; ++i) {
782             int resource = ClusterTraits.createResourceKey(clusterKey, cc.getValueIndex()[i]);
783             queryProcessor.updateValue(resource);
784             if (DebugPolicy.REPORT_CLUSTER_EVENTS)
785                 System.err.println("value " + cc.getValueIndex()[i] + " changed.");
786         }
787     }
788     final void refreshImportance(ClusterImpl c) {
789
790         if (c.isWriteOnly())
791             return;
792
793         importanceMap.remove(c.getImportance());
794         if(collectorPolicy != null) collectorPolicy.removed(c);
795
796         long newImportance = timeCounter();
797 // System.err.println("refreshImportance " + c.getClusterId() + " => " + newImportance);
798         c.setImportance(newImportance);
799         if (!c.isLoaded())
800             Logger.defaultLogError(new Exception("Bug in ClusterTable.refreshImportance(ClusterImpl), cluster not loaded"));
801
802         importanceMap.put(c.getImportance(), new ImportanceEntry(c));
803         if(collectorPolicy != null) collectorPolicy.added(c);
804
805     }
806
807     static long loadTime = 0;
808
809     @SuppressWarnings("unchecked")
810     public final <T extends ClusterI> T getClusterProxyByResourceKey(final int resourceKey) {
811         int clusterKey = ClusterTraitsBase.getClusterKeyFromResourceKeyNoThrow(resourceKey);
812         if (ClusterTraitsBase.isVirtualClusterKey(clusterKey))
813             throw new RuntimeException("Tried to get a persistent cluster for a virtual resource.");
814         ClusterI cluster = clusterArray[clusterKey];
815         if (cluster == null)
816             throw new RuntimeException("No proxy for existing cluster. Resource key = " + resourceKey);
817         return (T)cluster;
818     }
819
820     TLongIntHashMap clusterLoadHistogram = new TLongIntHashMap();
821     int clusterLoadCounter = 0;
822
823     private <T extends ClusterI> T ensureLoaded(T c) {
824         ClusterI cluster;
825         ClusterImpl cs = (ClusterImpl) c;
826         try {
827             if(DebugPolicy.REPORT_CLUSTER_LOADING) {
828                 long start = System.nanoTime();
829                 cluster = load2(cs.getClusterId(), cs.getClusterKey());
830                 long load = System.nanoTime()-start;
831                 loadTime +=  load;
832                 if(DebugPolicy.REPORT_CLUSTER_LOADING) {
833                     int was = clusterLoadHistogram.get(cluster.getClusterId());
834                     clusterLoadHistogram.put(cluster.getClusterId(), was+1);
835                     clusterLoadCounter++;
836                     String text = "Load2 " + cluster + " " + 1e-9*loadTime + "s. " + 1e-6*load + "ms. " + clusterLoadCounter + " " + was + " " + cluster.getUsedSpace() + " " + cluster.getImportance();
837                     if(DebugPolicy.REPORT_CLUSTER_LOADING_STACKS) {
838                         new Exception(text).printStackTrace();
839                     } else {
840                         System.err.println(text);
841                     }
842                 }
843             } else {
844                 cluster = load2(cs.getClusterId(), cs.getClusterKey());
845             }
846         } catch (DatabaseException e) {
847             Logger.defaultLogError(e);
848             if (DebugPolicy.REPORT_CLUSTER_EVENTS)
849                 e.printStackTrace();
850             String msg = "Failed to load cluster " + cs.getClusterUID();// + " resourceId=" + (((cs.getClusterId() << 16 + (resourceKey & 65535))));
851             // TODO: this jams the system => needs refactoring.
852             throw new RuntimeDatabaseException(msg, e);
853         }
854         return (T) cluster;
855     }
856     
857     @SuppressWarnings("unchecked")
858     public final <T extends ClusterI> T getClusterByResourceKey(final int resourceKey) {
859         int clusterKey = ClusterTraitsBase.getClusterKeyFromResourceKeyNoThrow(resourceKey);
860         if (ClusterTraitsBase.isVirtualClusterKey(clusterKey))
861             throw new RuntimeException("Tried to get a persistent cluster for a virtual resource.");
862         ClusterI c = clusterArray[clusterKey];
863         if (c == null)
864             return null;
865         if (c.isLoaded()) {
866             if ((counter++ & 4095) == 0)
867                 refreshImportance((ClusterImpl) c);
868             return (T) c;
869         }
870         if (!(c instanceof ClusterSmall)) {
871             Logger.defaultLogError("Proxy must be instance of ClusterSmall");
872             return null;
873         }
874         return ensureLoaded((T)c);
875     }
876
877     @SuppressWarnings("unchecked")
878     final <T extends ClusterI> T checkedGetClusterByResourceKey(final int resourceKey) {
879         int clusterKey = ClusterTraitsBase.getClusterKeyFromResourceKeyNoThrow(resourceKey);
880         if (ClusterTraitsBase.isVirtualClusterKey(clusterKey))
881             throw new RuntimeException("Tried to get a persistent cluster for a virtual resource.");
882         ClusterI c = clusterArray[clusterKey];
883         if (c == null)
884             throw new RuntimeException("No cluster for resource key " + resourceKey);
885         if (c.isLoaded()) {
886             if ((counter++ & 4095) == 0)
887                 refreshImportance((ClusterImpl) c);
888             return (T) c;
889         }
890         if (!(c instanceof ClusterSmall)) {
891             Logger.defaultLogError("Proxy must be instance of ClusterSmall");
892             return null;
893         }
894         ClusterI cluster;
895         ClusterSmall cs = (ClusterSmall) c;
896         try {
897 //          System.err.println("Load2 " + resourceKey);
898             cluster = load2(cs.getClusterId(), cs.getClusterKey());
899         } catch (DatabaseException e) {
900             if (DebugPolicy.REPORT_CLUSTER_EVENTS)
901                 e.printStackTrace();
902             int resourceIndex = resourceKey & ClusterTraitsBase.getResourceIndexFromResourceKeyNoThrow(resourceKey);
903             long resourceId = ClusterTraitsBase.createResourceIdNoThrow(cs.getClusterId(), resourceIndex);
904             String msg = "Failed to load cluster " + cs.getClusterUID() + " for resource key " + resourceKey + " resourceIndex=" + resourceIndex + " resourceId=" + resourceId;
905             Logger.defaultLogError(msg, e);
906             c.setDeleted(true, null);
907             return (T)c;
908         }
909         return (T) cluster;
910     }
911
912     void printDebugInfo(ClusterSupport support)
913             throws DatabaseException {
914         final int SIZE = clusters.size();
915         long sum = 0;
916         for (int i = 1; i < SIZE; ++i) {
917             ClusterI c = clusterArray[i];
918             c.getNumberOfResources(support);
919             long size = c.getUsedSpace();
920             System.out.println("cluster=" + c.getClusterId() + " size=" + size);
921             c.printDebugInfo("koss: ", support);
922             sum += size;
923         }
924         System.out.println("Total number of clusters " + SIZE);
925         System.out.println("Total cluster size " + sum);
926     }
927
928     Map<Long,ClusterImpl> prefetch = new HashMap<Long,ClusterImpl>();
929
930     public synchronized ClusterImpl load2(long clusterId, int clusterKey) throws DatabaseException {
931         ClusterImpl curr = (ClusterImpl) clusterArray[clusterKey];
932         if (curr.isLoaded())
933             return curr;
934
935 //        getPrefetched();
936 //
937 //        curr = (ClusterImpl) clusterArray[clusterKey];
938 //        if (curr.isLoaded())
939 //            return curr;
940
941         final Semaphore s = new Semaphore(0);
942         final DatabaseException[] ex = new DatabaseException[1];
943
944         load2(clusterId, clusterKey, new Callback<DatabaseException>() {
945             @Override
946             public void run(DatabaseException e) {
947                 ex[0] = e;
948                 s.release();
949             }
950         });
951         try {
952             s.acquire();
953         } catch (InterruptedException e) {
954             Logger.defaultLogError(e);
955         }
956         if (null != ex[0])
957             throw ex[0];
958         if (Development.DEVELOPMENT) {
959             if (Development.<Boolean>getProperty(DevelopmentKeys.CLUSTERTABLE_VALIDATE_ON_LOAD, Bindings.BOOLEAN)) {
960                 try {
961                     ClusterImpl loaded = (ClusterImpl) clusterArray[clusterKey];
962                     if (loaded instanceof ClusterSmall)
963                         ((ClusterSmall) loaded).check();
964                     else if (loaded instanceof ClusterBig)
965                         ((ClusterBig) loaded).check();
966                 } catch (Throwable t) {
967                     t.printStackTrace();
968                 }
969             }
970         }
971         
972         validate(clusterKey);
973         
974         return (ClusterImpl) clusterArray[clusterKey];
975     }
976     
977     void validate(int clusterKey) {
978         
979 //      if(sessionImpl.graphSession.graphClient instanceof GraphClientImpl2) {
980 //          
981 //        ClusterImpl server = (ClusterImpl)clusterArray[clusterKey]; 
982 //        String sd = server.dump(sessionImpl.clusterTranslator);
983 //        GraphClientImpl2 gci = ((GraphClientImpl2)sessionImpl.graphSession.graphClient);
984 //        ClusterImpl memory = gci.clusters.get(server.clusterUID);
985 //        if(memory == null)
986 //            System.err.println("ad");
987 //        String md = memory.dump(gci.support);
988 //        
989 //        if(!sd.equals(md)) {
990 //            
991 //            int diffPos = 0;
992 //            int minLength = Math.min(sd.length(), md.length());
993 //            for (int i = 0; i < minLength; i++) {
994 //                if (sd.charAt(i) != md.charAt(i)) {
995 //                    diffPos = i;
996 //                    break;
997 //                }
998 //            }
999 //            
1000 //            int start = Math.max(diffPos-200, 0);
1001 //            int end = Math.min(minLength-1, diffPos+200);
1002 //
1003 //            System.err.println("== CLUSTER DIFFERENCE " + clusterArray[clusterKey].getClusterUID() +  " == ");
1004 //            System.err.println("== SESSION == ");
1005 //            System.err.println(sd.substring(start, end));
1006 //            System.err.println("== MEM == ");
1007 //            System.err.println(md.substring(start, end));
1008 //            System.err.println("== CLUSTER DIFFERENCE ENDS == ");
1009 //            
1010 //            throw new IllegalStateException();
1011 //        }
1012 //        
1013 //      }
1014     }
1015
1016     public synchronized ClusterImpl getPrefetched() {
1017         synchronized(prefetch) {
1018             return null;
1019         }
1020     }
1021
1022     public synchronized void load2(long clusterId, int clusterKey, final Callback<DatabaseException> runnable) {
1023
1024         assert (Constants.ReservedClusterId != clusterId);
1025
1026         ClusterImpl cluster = null;
1027         DatabaseException e = null;
1028
1029         try {
1030
1031             ClusterUID clusterUID = clusters.makeClusterUID(clusterId);
1032 //            cluster = (ClusterImpl) sessionImpl.getGraphSession().getClusterImpl(clusterUID, clusterKey);
1033             Database.Session session = sessionImpl.graphSession.dbSession;
1034 //            cluster = (ClusterImpl)gci.getClusterByClusterKey(clusterKey).clone(sessionImpl.clusterTranslator);
1035 //            cluster = (ClusterImpl)((ClusterImpl)gci.getClusterByClusterUIDOrMakeProxy(clusterUID)).clone(sessionImpl.clusterTranslator);
1036             
1037 //            cluster = gci.clone(clusterUID, sessionImpl.clusterTranslator);
1038             
1039             cluster = session.clone(clusterUID, new ClusterCreator() {
1040                 
1041                 @Override
1042                 public <T> T create(ClusterUID uid, byte[] bytes, int[] ints, long[] longs) {
1043                     ClusterSupport support = sessionImpl.clusterTranslator;
1044                     try {
1045                         return (T)ClusterImpl.make(longs, ints, bytes, support, support.getClusterKeyByClusterUIDOrMake(uid));
1046                     } catch (DatabaseException e) {
1047                         e.printStackTrace();
1048                         return null;
1049                     }
1050                 }
1051                 
1052             });
1053             
1054         } catch (Throwable t) {
1055             // It's totally legal to call for non-existing cluster.
1056             // Sometimes this indicates an error, though.
1057             Logger.getDefault().logInfo("Load cluster failed.", t);
1058             if (t instanceof DatabaseException)
1059                 e = (DatabaseException) t;
1060             else
1061                 e = new DatabaseException("Load cluster failed.", t);
1062         }
1063         if (null == cluster) {
1064             runnable.run(e);
1065             return;
1066         }
1067
1068 //      new Exception("Load cluster " + cluster.getClusterId() + " " + cluster.getCachedSize() + " " + cluster.getImmutable()).printStackTrace();
1069
1070         // Can not be called with null argument.
1071         replaceCluster(cluster);
1072         sessionImpl.onClusterLoaded(clusterId);
1073         runnable.run(null);
1074
1075     }
1076
1077     public synchronized ClusterImpl tryLoad(long clusterId, int clusterKey) throws DatabaseException {
1078
1079         assert (Constants.ReservedClusterId != clusterId);
1080
1081         ClusterUID clusterUID = clusters.makeClusterUID(clusterId);
1082         
1083         Database.Session session = sessionImpl.graphSession.dbSession;
1084         
1085         ClusterImpl cluster = session.clone(clusterUID, new ClusterCreator() {
1086             
1087             @Override
1088             public <T> T create(ClusterUID uid, byte[] bytes, int[] ints, long[] longs) {
1089                 ClusterSupport support = sessionImpl.clusterTranslator;
1090                 try {
1091                     return (T)ClusterImpl.make(longs, ints, bytes, support, support.getClusterKeyByClusterUIDOrMake(uid));
1092                 } catch (DatabaseException e) {
1093                     e.printStackTrace();
1094                     return null;
1095                 }
1096             }
1097             
1098         });
1099         if (null == clusterArray[clusterKey])
1100             clusterArray[clusterKey] = cluster;
1101         else
1102             replaceCluster(cluster);
1103         sessionImpl.onClusterLoaded(clusterId);
1104         
1105         validate(clusterKey);
1106         
1107         return cluster;
1108     }
1109
1110     public Collection<ClusterI> getClusters() {
1111         ArrayList<ClusterI> result = new ArrayList<ClusterI>();
1112         for (int i = 0; i < clusterArray.length; i++) {
1113             ClusterI cluster = clusterArray[i];
1114             if (cluster != null)
1115                 result.add(cluster);
1116         }
1117         return result;
1118     }
1119
1120     public int size() {
1121         return clusters.size();
1122     }
1123
1124     public long timeCounter() {
1125         return timeCounter++;
1126     }
1127
1128     public boolean hasVirtual(int index) {
1129         return virtuals[index];
1130     }
1131
1132     public void markVirtual(int index) {
1133         virtuals[index] = true;
1134     }
1135
1136     public ClusterImpl[] getClusterArray() {
1137         return clusterArray;
1138     }
1139
1140     public boolean isImmutable(int id) {
1141
1142         int clusterKey = ClusterTraitsBase.getClusterKeyFromResourceKeyNoThrow(id);
1143         Boolean exist = immutables[clusterKey];
1144         if(exist != null) return exist;
1145
1146         ClusterI cluster = getClusterByResourceKey(id);
1147         if(cluster == null) {
1148                 return false;
1149         } else {
1150                 boolean result = cluster.getImmutable();
1151                 markImmutable(cluster, result);
1152                 return result;
1153         }
1154         
1155     }
1156
1157     public void markImmutable(ClusterI cluster, boolean value) {
1158         immutables[cluster.getClusterKey()] = value;
1159     }
1160     @Override
1161     public int getClusterKeyByUID(long first, long second) throws DatabaseException {
1162         return getClusterKeyByClusterUIDOrMakeProxy(ClusterUID.make(first, second));
1163     }
1164
1165 }