]> gerrit.simantics Code Review - simantics/platform.git/blob - bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryProcessor.java
Migrated source code from Simantics SVN
[simantics/platform.git] / bundles / org.simantics.db.impl / src / org / simantics / db / impl / query / QueryProcessor.java
1 /*******************************************************************************\r
2  * Copyright (c) 2007, 2010 Association for Decentralized Information Management\r
3  * in Industry THTH ry.\r
4  * All rights reserved. This program and the accompanying materials\r
5  * are made available under the terms of the Eclipse Public License v1.0\r
6  * which accompanies this distribution, and is available at\r
7  * http://www.eclipse.org/legal/epl-v10.html\r
8  *\r
9  * Contributors:\r
10  *     VTT Technical Research Centre of Finland - initial API and implementation\r
11  *******************************************************************************/\r
12 package org.simantics.db.impl.query;\r
13 \r
14 import java.io.BufferedOutputStream;\r
15 import java.io.File;\r
16 import java.io.FileOutputStream;\r
17 import java.io.IOException;\r
18 import java.io.PrintStream;\r
19 import java.util.ArrayList;\r
20 import java.util.Arrays;\r
21 import java.util.Collection;\r
22 import java.util.Collections;\r
23 import java.util.HashMap;\r
24 import java.util.HashSet;\r
25 import java.util.IdentityHashMap;\r
26 import java.util.Iterator;\r
27 import java.util.LinkedList;\r
28 import java.util.List;\r
29 import java.util.Map;\r
30 import java.util.Set;\r
31 import java.util.concurrent.Semaphore;\r
32 import java.util.concurrent.atomic.AtomicBoolean;\r
33 import java.util.concurrent.atomic.AtomicInteger;\r
34 import java.util.concurrent.locks.Condition;\r
35 import java.util.concurrent.locks.ReentrantLock;\r
36 \r
37 import org.simantics.databoard.Bindings;\r
38 import org.simantics.db.AsyncReadGraph;\r
39 import org.simantics.db.DevelopmentKeys;\r
40 import org.simantics.db.DirectStatements;\r
41 import org.simantics.db.ReadGraph;\r
42 import org.simantics.db.RelationInfo;\r
43 import org.simantics.db.Resource;\r
44 import org.simantics.db.Session;\r
45 import org.simantics.db.Statement;\r
46 import org.simantics.db.VirtualGraph;\r
47 import org.simantics.db.common.procedure.adapter.AsyncMultiProcedureAdapter;\r
48 import org.simantics.db.common.utils.Logger;\r
49 import org.simantics.db.debug.ListenerReport;\r
50 import org.simantics.db.exception.DatabaseException;\r
51 import org.simantics.db.exception.ManyObjectsForFunctionalRelationException;\r
52 import org.simantics.db.exception.NoInverseException;\r
53 import org.simantics.db.exception.ResourceNotFoundException;\r
54 import org.simantics.db.impl.DebugPolicy;\r
55 import org.simantics.db.impl.ResourceImpl;\r
56 import org.simantics.db.impl.graph.MultiIntProcedure;\r
57 import org.simantics.db.impl.graph.ReadGraphImpl;\r
58 import org.simantics.db.impl.graph.ReadGraphSupport;\r
59 import org.simantics.db.impl.graph.WriteGraphImpl;\r
60 import org.simantics.db.impl.procedure.IntProcedureAdapter;\r
61 import org.simantics.db.impl.procedure.InternalProcedure;\r
62 import org.simantics.db.impl.procedure.TripleIntProcedureAdapter;\r
63 import org.simantics.db.impl.support.ResourceSupport;\r
64 import org.simantics.db.procedure.AsyncMultiListener;\r
65 import org.simantics.db.procedure.AsyncMultiProcedure;\r
66 import org.simantics.db.procedure.AsyncProcedure;\r
67 import org.simantics.db.procedure.AsyncSetListener;\r
68 import org.simantics.db.procedure.Listener;\r
69 import org.simantics.db.procedure.ListenerBase;\r
70 import org.simantics.db.procedure.MultiProcedure;\r
71 import org.simantics.db.procedure.Procedure;\r
72 import org.simantics.db.procedure.StatementProcedure;\r
73 import org.simantics.db.request.AsyncMultiRead;\r
74 import org.simantics.db.request.AsyncRead;\r
75 import org.simantics.db.request.ExternalRead;\r
76 import org.simantics.db.request.MultiRead;\r
77 import org.simantics.db.request.Read;\r
78 import org.simantics.db.request.RequestFlags;\r
79 import org.simantics.db.request.WriteTraits;\r
80 import org.simantics.layer0.Layer0;\r
81 import org.simantics.utils.DataContainer;\r
82 import org.simantics.utils.Development;\r
83 import org.simantics.utils.datastructures.Pair;\r
84 import org.simantics.utils.datastructures.collections.CollectionUtils;\r
85 import org.simantics.utils.datastructures.disposable.AbstractDisposable;\r
86 \r
87 import gnu.trove.map.hash.THashMap;\r
88 import gnu.trove.procedure.TIntProcedure;\r
89 import gnu.trove.procedure.TLongProcedure;\r
90 import gnu.trove.procedure.TObjectProcedure;\r
91 import gnu.trove.set.hash.THashSet;\r
92 import gnu.trove.set.hash.TIntHashSet;\r
93 \r
94 @SuppressWarnings({"rawtypes", "unchecked"})\r
95 final public class QueryProcessor extends AbstractDisposable implements ReadGraphSupport {\r
96 \r
97         final public UnaryQueryHashMap<IntProcedure>                      directPredicatesMap;\r
98         final public UnaryQueryHashMap<IntProcedure>                      principalTypesMap;\r
99         final public THashMap<String, URIToResource>                      uriToResourceMap;\r
100         final public THashMap<String, NamespaceIndex>                     namespaceIndexMap22;\r
101         final public UnaryQueryHashMap<IntProcedure>                      projectsMap;\r
102         final public UnaryQueryHashMap<InternalProcedure<RelationInfo>>   relationInfoMap;\r
103         final public UnaryQueryHashMap<InternalProcedure<IntSet>>         superTypesMap;\r
104         final public UnaryQueryHashMap<InternalProcedure<IntSet>>         typeHierarchyMap;\r
105         final public UnaryQueryHashMap<InternalProcedure<IntSet>>         superRelationsMap;\r
106         final public UnaryQueryHashMap<InternalProcedure<IntSet>>         typesMap;\r
107         final public UnaryQueryHashMap<InternalProcedure<byte[]>>         valueMap;\r
108         final public DoubleKeyQueryHashMap<IntProcedure>                     directObjectsMap;\r
109         final public DoubleKeyQueryHashMap<IntProcedure>                     objectsMap;\r
110         final public UnaryQueryHashMap<IntProcedure>                      orderedSetMap;\r
111         final public UnaryQueryHashMap<IntProcedure>                      predicatesMap;\r
112         final public DoubleKeyQueryHashMap<TripleIntProcedure>               statementsMap;\r
113         final public UnaryQueryHashMap<IntProcedure>                      assertedPredicatesMap;\r
114         final public BinaryQueryHashMap<TripleIntProcedure>               assertedStatementsMap;\r
115         final public StableHashMap<ExternalRead, ExternalReadEntry>            externalReadMap; \r
116         final public StableHashMap<AsyncRead, AsyncReadEntry>                  asyncReadMap; \r
117         final public StableHashMap<Read, ReadEntry>                            readMap;\r
118         final public StableHashMap<AsyncMultiRead, AsyncMultiReadEntry>        asyncMultiReadMap; \r
119         final public StableHashMap<MultiRead, MultiReadEntry>                  multiReadMap; \r
120 \r
121         final private THashMap<CacheEntry, ArrayList<ListenerEntry>>       listeners;\r
122 \r
123         public static int                                       indent                = 0;\r
124 \r
125         public int                                              size                  = 0;\r
126         \r
127         \r
128         // Garbage collection\r
129         \r
130         public int                                              boundQueries          = 0;\r
131 \r
132         // Statistics\r
133         private int                                             hits                  = 0;\r
134 \r
135         private int                                             misses                = 0;\r
136 \r
137         private int                                             updates               = 0;\r
138 \r
139         final private int                                       functionalRelation;\r
140 \r
141         final private int                                       superrelationOf;\r
142 \r
143         final private int                                       instanceOf;\r
144 \r
145         final private int                                       inverseOf;\r
146 \r
147         final private int                                       asserts;\r
148 \r
149         final private int                                       hasPredicate;\r
150 \r
151         final private int                                       hasPredicateInverse;\r
152 \r
153         final private int                                       hasObject;\r
154 \r
155         final private int                                       inherits;\r
156 \r
157         final private int                                       subrelationOf;\r
158 \r
159         final private int                                       rootLibrary;\r
160 \r
161         /**\r
162          * A cache for the root library resource. Initialized in\r
163          * {@link #getRootLibraryResource()}.\r
164          */\r
165         private volatile ResourceImpl                           rootLibraryResource;\r
166 \r
167         final private int                                       library;\r
168 \r
169         final private int                                       consistsOf;\r
170 \r
171         final private int                                       hasName;\r
172 \r
173         AtomicInteger                                       sleepers = new AtomicInteger(0);\r
174 \r
175         private boolean                                         updating              = false;\r
176 \r
177         static public boolean                                         collecting            = false;\r
178 \r
179         private boolean                                         firingListeners       = false;\r
180 \r
181         final public QuerySupport                               querySupport;\r
182         final public Session                                    session;\r
183         final public ResourceSupport                            resourceSupport;\r
184 \r
185         private THashSet<ListenerEntry>                         scheduledListeners    = new THashSet<ListenerEntry>();\r
186 \r
187         QueryThread[]                                   executors;\r
188 \r
189         public ArrayList<SessionTask>[]                           queues;\r
190 \r
191         enum ThreadState {\r
192 \r
193                 INIT, RUN, SLEEP, DISPOSED\r
194 \r
195         }\r
196 \r
197         public ThreadState[]                                                                    threadStates;\r
198         public ReentrantLock[]                                                                  threadLocks;\r
199         public Condition[]                                                                          threadConditions;\r
200 \r
201         public ArrayList<SessionTask>[]                           ownTasks;\r
202 \r
203         public ArrayList<SessionTask>[]                           ownSyncTasks;\r
204 \r
205         ArrayList<SessionTask>[]                           delayQueues;\r
206         \r
207         public boolean synch = true;\r
208 \r
209         final Object querySupportLock;\r
210         \r
211         public Long modificationCounter = 0L;\r
212 \r
213         public void close() {\r
214         }\r
215 \r
216         final public void scheduleOwn(int caller, SessionTask request) {\r
217                 ownTasks[caller].add(request);\r
218         }\r
219 \r
220         final public void scheduleAlways(int caller, SessionTask request) {\r
221 \r
222                 int performer = request.thread;\r
223                 if(caller == performer) {\r
224                         ownTasks[caller].add(request);\r
225                 } else {\r
226                         schedule(caller, request);\r
227                 }\r
228 \r
229         }\r
230 \r
231         final public void schedule(int caller, SessionTask request) {\r
232 \r
233                 int performer = request.thread;\r
234 \r
235                 if(DebugPolicy.SCHEDULE)\r
236                         System.out.println("schedule " + request + " " + caller + " -> " + performer);\r
237 \r
238                 assert(performer >= 0);\r
239 \r
240                 assert(request != null);\r
241 \r
242                 if(caller == performer) {\r
243                         request.run(caller);\r
244                 } else {\r
245                         ReentrantLock queueLock = threadLocks[performer];\r
246                         queueLock.lock();\r
247                         queues[performer].add(request);\r
248                         // This thread could have been sleeping\r
249                         if(queues[performer].size() == 1) {\r
250                                 if(ThreadState.SLEEP == threadStates[performer]) sleepers.decrementAndGet();\r
251                                 threadConditions[performer].signalAll();\r
252                         }\r
253                         queueLock.unlock();\r
254                 }\r
255 \r
256         }\r
257 \r
258 \r
259         final int THREADS;\r
260 \r
261         final public int  THREAD_MASK;\r
262         final public static ThreadGroup QueryThreadGroup = new ThreadGroup("Query Thread Group"); \r
263 \r
264         public static abstract class SessionTask {\r
265 \r
266                 final public int thread;\r
267                 final public int syncCaller;\r
268                 final public Object object;\r
269 \r
270                 public SessionTask(WriteTraits object, int thread) {\r
271                         this.thread = thread;\r
272                         this.syncCaller = -1;\r
273                         this.object = object;\r
274                 }\r
275 \r
276                 public SessionTask(Object object, int thread, int syncCaller) {\r
277                         this.thread = thread;\r
278                         this.syncCaller = syncCaller;\r
279                         this.object = object;\r
280                 }\r
281 \r
282                 public abstract void run(int thread);\r
283 \r
284                 @Override\r
285                 public String toString() {\r
286                         return "SessionTask[" + object + "]";\r
287                 }\r
288 \r
289         }\r
290 \r
291         public static abstract class SessionRead extends SessionTask {\r
292 \r
293                 final public Semaphore notify;\r
294                 final public DataContainer<Throwable> throwable; \r
295 \r
296                 public SessionRead(Object object, DataContainer<Throwable> throwable, Semaphore notify, int thread) {\r
297                         super(object, thread, thread);\r
298                         this.throwable = throwable;\r
299                         this.notify = notify;\r
300                 }\r
301 \r
302                 public SessionRead(Object object, DataContainer<Throwable> throwable, Semaphore notify, int thread, int syncThread) {\r
303                         super(object, thread, syncThread);\r
304                         this.throwable = throwable;\r
305                         this.notify = notify;\r
306                 }\r
307 \r
308         }\r
309 \r
310         long waitingTime = 0;\r
311 \r
312         static int koss = 0;\r
313         static int koss2 = 0;\r
314 \r
315         public boolean resume(ReadGraphImpl graph) {\r
316                 return executors[0].runSynchronized();\r
317         }\r
318 \r
319         public QueryProcessor(final int threads, QuerySupport core, Set<Thread> threadSet)\r
320                         throws DatabaseException {\r
321 \r
322                 THREADS = threads;\r
323                 THREAD_MASK = threads - 1;\r
324 \r
325                 querySupport = core;\r
326                 session = querySupport.getSession();\r
327                 resourceSupport = querySupport.getSupport();\r
328                 querySupportLock = core.getLock();\r
329 \r
330                 executors = new QueryThread[THREADS];\r
331                 queues = new ArrayList[THREADS];\r
332                 threadLocks = new ReentrantLock[THREADS];\r
333                 threadConditions = new Condition[THREADS];\r
334                 threadStates = new ThreadState[THREADS];\r
335                 ownTasks = new ArrayList[THREADS];\r
336                 ownSyncTasks = new ArrayList[THREADS];\r
337                 delayQueues = new ArrayList[THREADS * THREADS];\r
338 \r
339                 //        freeSchedule = new AtomicInteger(0);\r
340 \r
341                 for (int i = 0; i < THREADS * THREADS; i++) {\r
342                         delayQueues[i] = new ArrayList<SessionTask>();\r
343                 }\r
344 \r
345                 for (int i = 0; i < THREADS; i++) {\r
346 \r
347                         //            tasks[i] = new ArrayList<Runnable>();\r
348                         ownTasks[i] = new ArrayList<SessionTask>();\r
349                         ownSyncTasks[i] = new ArrayList<SessionTask>();\r
350                         queues[i] = new ArrayList<SessionTask>();\r
351                         threadLocks[i] = new ReentrantLock();\r
352                         threadConditions[i] = threadLocks[i].newCondition();\r
353                         //            limits[i] = false;\r
354                         threadStates[i] = ThreadState.INIT;\r
355 \r
356                 }\r
357 \r
358                 for (int i = 0; i < THREADS; i++) {\r
359 \r
360                         final int index = i;\r
361 \r
362                         executors[i] = new QueryThread(session, this, index, "Query Thread " + index);\r
363 \r
364                         threadSet.add(executors[i]);\r
365 \r
366                 }\r
367 \r
368                 directPredicatesMap = new UnaryQueryHashMap();\r
369                 valueMap = new UnaryQueryHashMap();\r
370                 principalTypesMap = new UnaryQueryHashMap();\r
371                 uriToResourceMap = new THashMap<String, URIToResource>();\r
372                 namespaceIndexMap22 = new THashMap<String, NamespaceIndex>();\r
373                 projectsMap = new UnaryQueryHashMap();\r
374                 relationInfoMap = new UnaryQueryHashMap();\r
375                 typeHierarchyMap = new UnaryQueryHashMap();\r
376                 superTypesMap = new UnaryQueryHashMap();\r
377                 superRelationsMap = new UnaryQueryHashMap();\r
378                 typesMap = new UnaryQueryHashMap();\r
379                 objectsMap = new DoubleKeyQueryHashMap();\r
380                 orderedSetMap = new UnaryQueryHashMap();\r
381                 predicatesMap = new UnaryQueryHashMap();\r
382                 statementsMap = new DoubleKeyQueryHashMap();\r
383                 directObjectsMap = new DoubleKeyQueryHashMap();\r
384                 assertedPredicatesMap = new UnaryQueryHashMap();\r
385                 assertedStatementsMap = new BinaryQueryHashMap();\r
386                 asyncReadMap = new StableHashMap<AsyncRead, AsyncReadEntry>(); \r
387                 readMap = new StableHashMap<Read, ReadEntry>();\r
388                 asyncMultiReadMap = new StableHashMap<AsyncMultiRead, AsyncMultiReadEntry>(); \r
389                 multiReadMap = new StableHashMap<MultiRead, MultiReadEntry>(); \r
390                 externalReadMap = new StableHashMap<ExternalRead, ExternalReadEntry>(); \r
391                 listeners = new THashMap<CacheEntry, ArrayList<ListenerEntry>>(10, 0.75f);\r
392 \r
393                 // Now start threads\r
394                 for (int i = 0; i < THREADS; i++) {\r
395                         executors[i].start();\r
396                 }\r
397 \r
398                 // Make sure that query threads are up and running\r
399                 while(sleepers.get() != THREADS) {\r
400                         try {\r
401                                 Thread.sleep(5);\r
402                         } catch (InterruptedException e) {\r
403                                 e.printStackTrace();\r
404                         }\r
405                 }\r
406 \r
407                 rootLibrary = core.getBuiltin("http:/");\r
408                 boolean builtinsInstalled = rootLibrary != 0;\r
409 \r
410                 if (builtinsInstalled) {\r
411                         functionalRelation = core.getBuiltin(Layer0.URIs.FunctionalRelation);\r
412                         assert (functionalRelation != 0);\r
413                 } else\r
414                         functionalRelation = 0;\r
415 \r
416                 if (builtinsInstalled) {\r
417                         instanceOf = core.getBuiltin(Layer0.URIs.InstanceOf);\r
418                         assert (instanceOf != 0);\r
419                 } else\r
420                         instanceOf = 0;\r
421 \r
422                 if (builtinsInstalled) {\r
423                         inverseOf = core.getBuiltin(Layer0.URIs.InverseOf);\r
424                         assert (inverseOf != 0);\r
425                 } else\r
426                         inverseOf = 0;\r
427 \r
428 \r
429                 if (builtinsInstalled) {\r
430                         inherits = core.getBuiltin(Layer0.URIs.Inherits);\r
431                         assert (inherits != 0);\r
432                 } else\r
433                         inherits = 0;\r
434 \r
435                 if (builtinsInstalled) {\r
436                         asserts = core.getBuiltin(Layer0.URIs.Asserts);\r
437                         assert (asserts != 0);\r
438                 } else\r
439                         asserts = 0;\r
440 \r
441                 if (builtinsInstalled) {\r
442                         hasPredicate = core.getBuiltin(Layer0.URIs.HasPredicate);\r
443                         assert (hasPredicate != 0);\r
444                 } else\r
445                         hasPredicate = 0;\r
446 \r
447                 if (builtinsInstalled) {\r
448                         hasPredicateInverse = core.getBuiltin(Layer0.URIs.HasPredicateInverse);\r
449                         assert (hasPredicateInverse != 0);\r
450                 } else\r
451                         hasPredicateInverse = 0;\r
452 \r
453                 if (builtinsInstalled) {\r
454                         hasObject = core.getBuiltin(Layer0.URIs.HasObject);\r
455                         assert (hasObject != 0);\r
456                 } else\r
457                         hasObject = 0;\r
458 \r
459                 if (builtinsInstalled) {\r
460                         subrelationOf = core.getBuiltin(Layer0.URIs.SubrelationOf);\r
461                         assert (subrelationOf != 0);\r
462                 } else\r
463                         subrelationOf = 0;\r
464 \r
465                 if (builtinsInstalled) {\r
466                         superrelationOf = core.getBuiltin(Layer0.URIs.SuperrelationOf);\r
467                         assert (superrelationOf != 0);\r
468                 } else\r
469                         superrelationOf = 0;\r
470 \r
471                 if (builtinsInstalled) {\r
472                         library = core.getBuiltin(Layer0.URIs.Library);\r
473                         assert (library != 0);\r
474                 } else\r
475                         library = 0;\r
476 \r
477                 if (builtinsInstalled) {\r
478                         consistsOf = core.getBuiltin(Layer0.URIs.ConsistsOf);\r
479                         assert (consistsOf != 0);\r
480                 } else\r
481                         consistsOf = 0;\r
482 \r
483                 if (builtinsInstalled) {\r
484                         hasName = core.getBuiltin(Layer0.URIs.HasName);\r
485                         assert (hasName != 0);\r
486                 } else\r
487                         hasName = 0;\r
488 \r
489         }\r
490 \r
491         final public void releaseWrite(ReadGraphImpl graph) {\r
492                 performDirtyUpdates(graph);\r
493                 modificationCounter++;\r
494         }\r
495 \r
496         final public int getId(final Resource r) {\r
497                 return querySupport.getId(r);\r
498         }\r
499 \r
500         public QuerySupport getCore() {\r
501                 return querySupport;\r
502         }\r
503 \r
504         public int getFunctionalRelation() {\r
505                 return functionalRelation;\r
506         }\r
507 \r
508         public int getInherits() {\r
509                 return inherits;\r
510         }\r
511 \r
512         public int getInstanceOf() {\r
513                 return instanceOf;\r
514         }\r
515 \r
516         public int getInverseOf() {\r
517                 return inverseOf;\r
518         }\r
519 \r
520         public int getSubrelationOf() {\r
521                 return subrelationOf;\r
522         }\r
523 \r
524         public int getSuperrelationOf() {\r
525                 return superrelationOf;\r
526         }\r
527 \r
528         public int getAsserts() {\r
529                 return asserts;\r
530         }\r
531 \r
532         public int getHasPredicate() {\r
533                 return hasPredicate;\r
534         }\r
535 \r
536         public int getHasPredicateInverse() {\r
537                 return hasPredicateInverse;\r
538         }\r
539 \r
540         public int getHasObject() {\r
541                 return hasObject;\r
542         }\r
543 \r
544         public int getRootLibrary() {\r
545                 return rootLibrary;\r
546         }\r
547 \r
548         public Resource getRootLibraryResource() {\r
549                 if (rootLibraryResource == null) {\r
550                         // Synchronization is not needed here, it doesn't matter if multiple\r
551                         // threads simultaneously set rootLibraryResource once.\r
552                         int root = getRootLibrary();\r
553                         if (root == 0)\r
554                                 throw new UnsupportedOperationException("database is not initialized, cannot get root library resource");\r
555                         this.rootLibraryResource = new ResourceImpl(querySupport.getSupport(), root);\r
556                 }\r
557                 return rootLibraryResource;\r
558         }\r
559 \r
560         public int getLibrary() {\r
561                 return library;\r
562         }\r
563 \r
564         public int getConsistsOf() {\r
565                 return consistsOf;\r
566         }\r
567 \r
568         public int getHasName() {\r
569                 return hasName;\r
570         }\r
571 \r
572         public void forResource(ReadGraphImpl graph, final String id, CacheEntry parent, final InternalProcedure<Integer> procedure) {\r
573 \r
574                 URIToResource.queryEach(graph, id, parent, null, new InternalProcedure<Integer>() {\r
575 \r
576                         @Override\r
577                         public void execute(ReadGraphImpl graph, Integer result) {\r
578 \r
579                                 if (result != null && result != 0) {\r
580                                         procedure.execute(graph, result);\r
581                                         return;\r
582                                 }\r
583 \r
584                                 // Fall back to using the fixed builtins.\r
585                                 result = querySupport.getBuiltin(id);\r
586                                 if (result != 0) {\r
587                                         procedure.execute(graph, result);\r
588                                         return;\r
589                                 } \r
590 \r
591                                 try {\r
592                                         result = querySupport.getRandomAccessReference(id);\r
593                                 } catch (ResourceNotFoundException e) {\r
594                                         procedure.exception(graph, e);\r
595                                         return;\r
596                                 }\r
597 \r
598                                 if (result != 0) {\r
599                                         procedure.execute(graph, result);\r
600                                 } else {\r
601                                         procedure.exception(graph, new ResourceNotFoundException(id));\r
602                                 }\r
603 \r
604                         }\r
605 \r
606                         @Override\r
607                         public void exception(ReadGraphImpl graph, Throwable t) {\r
608                                 procedure.exception(graph, t);\r
609                         }\r
610 \r
611                 });\r
612 \r
613         }\r
614 \r
615         public void forBuiltin(ReadGraphImpl graph, final String id, CacheEntry parent, final InternalProcedure<Integer> procedure) {\r
616 \r
617                 Integer result = querySupport.getBuiltin(id);\r
618                 if (result != 0) {\r
619                         procedure.execute(graph, result);\r
620                 } else {\r
621                         procedure.exception(graph, new ResourceNotFoundException(id));\r
622                 }\r
623 \r
624         }\r
625 \r
626         public final <T> void runAsyncRead(final ReadGraphImpl graph, final AsyncRead<T> query, final CacheEntry parent, final ListenerBase listener, final AsyncProcedure<T> procedure) {\r
627 \r
628                 int hash = requestHash(query);\r
629 \r
630                 AsyncReadEntry<T> entry = asyncReadMap.get(query, hash); \r
631 \r
632                 if(parent == null && listener == null) {\r
633                         if(entry != null && (entry.isReady() || entry.isExcepted())) {\r
634                                 System.out.println("ready " + query);\r
635                                 entry.performFromCache(graph, this, procedure);\r
636 //                              graph.state.barrier.dec(query);\r
637                                 return;\r
638                         } else {\r
639                                 query.perform(graph, procedure);\r
640 //                              graph.state.barrier.dec(query);\r
641                                 return;\r
642                         }\r
643                 }\r
644 \r
645                 if(entry == null) {\r
646 \r
647                         entry = new AsyncReadEntry<T>(query);\r
648                         entry.setPending();\r
649                         entry.clearResult(querySupport);\r
650                         asyncReadMap.put(query, entry, hash);\r
651 \r
652                         performForEach(graph, query, entry, parent, listener, procedure, false);\r
653 \r
654                 } else {\r
655 \r
656                         if(entry.isPending()) {\r
657                                 synchronized(entry) {\r
658                                         if(entry.isPending()) {\r
659                                             throw new IllegalStateException();\r
660                                                 //                      final AsyncBarrierImpl parentBarrier = graph.state.barrier;\r
661                                                 //                      if(entry.procs == null) entry.procs = new ArrayList<AsyncProcedure<T>>();\r
662                                                 //                        entry.procs.add(new AsyncProcedure<T>() {\r
663                                                 //\r
664                                                 //                                                      @Override\r
665                                                 //                                                      public void execute(AsyncReadGraph graph, T result) {\r
666                                                 //                                                              procedure.execute(graph, result);\r
667                                                 //                                                              parentBarrier.dec(query);\r
668                                                 //                                                      }\r
669                                                 //\r
670                                                 //                                                      @Override\r
671                                                 //                                                      public void exception(AsyncReadGraph graph, Throwable throwable) {\r
672                                                 //                                                              procedure.exception(graph, throwable);\r
673                                                 //                                                              parentBarrier.dec(query);\r
674                                                 //                                                      }\r
675                                                 //                              \r
676                                                 //                        });\r
677 //                                              if(graph.parent != null || listener != null) {\r
678 //                                                      registerDependencies(graph, entry, parent, listener, procedure, false);\r
679 //                                              }\r
680 //\r
681 //                                              query.perform(graph, procedure);\r
682 //\r
683 //                                              return;\r
684 \r
685                                         }\r
686                                 }\r
687                         }\r
688 \r
689                         if(entry.isReady()) { \r
690                                 entry.performFromCache(graph, this, procedure);\r
691                                 registerDependencies(graph, entry, parent, listener, procedure, false);\r
692                         } else {\r
693                                 performForEach(graph, query, entry, parent, listener, procedure, false);\r
694                         }\r
695 \r
696                 }\r
697 \r
698         }\r
699 \r
700 \r
701         final static <T> void runMultiRead(final ReadGraphImpl graph, MultiReadEntry cached, final MultiRead<T> query, final CacheEntry parent, final QueryProcessor provider, final ListenerBase listener, final AsyncMultiProcedure<T> procedure) {\r
702 \r
703                 MultiReadEntry entry = cached != null ? cached : provider.multiReadMap.get(query); \r
704                 if(entry == null) {\r
705 \r
706                         entry = new MultiReadEntry(query);\r
707                         entry.setPending();\r
708                         entry.clearResult(provider.querySupport);\r
709 \r
710                         provider.multiReadMap.put(query, entry);\r
711 \r
712                         provider.performForEach(graph, query, entry, parent, listener, procedure, false);\r
713 \r
714                 } else {\r
715 \r
716                         if(entry.isPending()) {\r
717 \r
718                                 synchronized(entry) {\r
719 \r
720                                         if(entry.isPending()) {\r
721                                             throw new IllegalStateException();\r
722 \r
723 //                                              if(entry.procs == null) entry.procs = new ArrayList<Pair<AsyncMultiProcedure<T>, AsyncBarrier>>();\r
724 //                                              entry.procs.add(new Pair(procedure, parentBarrier));\r
725 //                                              if(graph.parent != null || listener != null) {\r
726 //                                                      provider.registerDependencies(graph, entry, parent, listener, procedure, false);\r
727 //                                              }\r
728 \r
729                                                 // If this was synchronized we must wait here until completion\r
730                                                 //                        if(graph.state.synchronizedExecution) {\r
731                                                 //                            while(entry.isPending()) {\r
732                                                 //                              graph.resumeTasks(graph.callerThread, null, null);\r
733                                                 //                            }\r
734                                                 //                        }\r
735 //\r
736 //                                              return;\r
737 \r
738                                         }\r
739                                 }\r
740 \r
741                                 entry.performFromCache(graph, provider, procedure);\r
742 //                              graph.state.barrier.dec(query);\r
743                                 return;\r
744 \r
745                         } else {\r
746 \r
747                                 provider.performForEach(graph, query, entry, parent, listener, procedure, false);\r
748 \r
749                         }\r
750 \r
751                 }\r
752 \r
753         }\r
754 \r
755         public final <T> void runAsyncMultiRead(final ReadGraphImpl graph, final AsyncMultiRead<T> query, final CacheEntry parent, final ListenerBase listener, final AsyncMultiProcedure<T> procedure) {\r
756 \r
757                 int hash = requestHash(query);\r
758 \r
759                 AsyncMultiReadEntry entry = asyncMultiReadMap.get(query, hash);\r
760 \r
761                 if(parent == null && listener == null) {\r
762                         if(entry != null && (entry.isReady() || entry.isExcepted())) {\r
763                                 System.out.println("ready " + query);\r
764                                 entry.performFromCache(graph, this, procedure);\r
765                                 return;\r
766                         } else {\r
767                                 query.perform(graph, procedure);\r
768                                 return;\r
769                         }\r
770                 }\r
771 \r
772                 if(entry == null) {\r
773 \r
774                         entry = new AsyncMultiReadEntry<T>(query); \r
775                         entry.setPending();\r
776                         entry.clearResult(querySupport);\r
777 \r
778                         asyncMultiReadMap.put(query, entry, hash);\r
779 \r
780                         performForEach(graph, query, entry, parent, listener, procedure, false);\r
781 \r
782                 } else {\r
783 \r
784                         if(entry.isPending()) {\r
785                             \r
786                                 synchronized(entry) {\r
787                                         if(entry.isPending()) {\r
788                                             throw new IllegalStateException();\r
789 //                                              if(entry.procs == null) entry.procs = new ArrayList<AsyncMultiProcedure<T>>();\r
790 //                                              entry.procs.add(procedure);\r
791 //                                              if(graph.parent != null || listener != null) {\r
792 //                                                      registerDependencies(graph, entry, parent, listener, procedure, false);\r
793 //                                              }\r
794 //                                              return;\r
795                                         }\r
796                                 }\r
797                         }\r
798 \r
799                         performForEach(graph, query, entry, parent, listener, procedure, false);\r
800 \r
801                 }\r
802 \r
803         }\r
804 \r
805         final static <T> void runPrimitiveRead(ReadGraphImpl graph, ExternalReadEntry cached, final ExternalRead<T> query, final CacheEntry parent, final QueryProcessor provider, final ListenerBase listener, final Procedure<T> procedure) {\r
806 \r
807                 final ExternalReadEntry<T> entry = cached != null ? cached : provider.externalReadMap.get(query); \r
808                 if(entry == null) {\r
809                         provider.performForEach(graph, query, new ExternalReadEntry<T>(query), parent, listener, procedure, false);\r
810                 } else {\r
811                         if(entry.isPending()) {\r
812                                 synchronized(entry) {\r
813                                         if(entry.isPending()) {\r
814                                             throw new IllegalStateException();\r
815 //                                              if(entry.procs == null) entry.procs = new ArrayList<Procedure<T>>();\r
816 //                                              entry.procs.add(procedure);\r
817 //                                              return;\r
818                                         }\r
819                                 }\r
820                         }\r
821                         provider.performForEach(graph, query, entry, parent, listener, procedure, false);\r
822                 }\r
823 \r
824         }\r
825 \r
826         public int requestHash(Object object) {\r
827                 try {\r
828                         return object.hashCode();\r
829                 } catch (Throwable t) {\r
830                         Logger.defaultLogError(t);\r
831                         return 0;\r
832                 }\r
833         }\r
834         \r
835     @Override\r
836         public <T> T queryRead(final ReadGraphImpl graph, final Read<T> query, final CacheEntry parent, final AsyncProcedure<T> procedure, final ListenerBase listener) throws Throwable {\r
837 \r
838                 assert(query != null);\r
839 \r
840                 ReadEntry entry = readMap.get(query);\r
841 \r
842                 if(entry != null) {\r
843                         if(parent == null && (listener == null || listener.isDisposed()) && entry.isReady()) {\r
844                                 return (T)entry.get(graph, this, procedure);\r
845                         } else if (entry.isPending()) {\r
846                             throw new IllegalStateException();\r
847                         }\r
848                 }\r
849 \r
850                 if(entry == null) {\r
851 \r
852                         entry = new ReadEntry(query);\r
853                         entry.setPending();\r
854                         entry.clearResult(querySupport);\r
855 \r
856                         readMap.put(query, entry);\r
857 \r
858                         return (T)performForEach(graph, query, entry, parent, listener, procedure, false);\r
859 \r
860                 } else {\r
861 \r
862                         if(entry.isPending()) {\r
863                 throw new IllegalStateException();\r
864                         } else {\r
865                                 return (T)performForEach(graph, query, entry, parent, listener, procedure, false);\r
866                         }\r
867 \r
868                 }\r
869 \r
870         }\r
871 \r
872         public <T> void queryMultiRead(final ReadGraphImpl graph, final MultiRead<T> query, final CacheEntry parent, final ListenerBase listener, final AsyncMultiProcedure<T> procedure) {\r
873 \r
874                 assert(query != null);\r
875                 assert(procedure != null);\r
876 \r
877                 final MultiReadEntry entry = multiReadMap.get(query);\r
878 \r
879                 if(parent == null && !(listener != null)) {\r
880                         if(entry != null && entry.isReady()) { \r
881                                 entry.performFromCache(graph, this, procedure);\r
882                                 return;\r
883                         }\r
884                 }\r
885 \r
886                 runMultiRead(graph, entry, query, parent, this, listener, procedure);\r
887 \r
888         }\r
889 \r
890         public <T> void queryPrimitiveRead(final ReadGraphImpl graph, final ExternalRead<T> query, final CacheEntry parent, final ListenerBase listener, final Procedure<T> procedure) {\r
891 \r
892                 assert(query != null);\r
893                 assert(procedure != null);\r
894 \r
895                 final ExternalReadEntry entry = externalReadMap.get(query);\r
896 \r
897                 if(parent == null && !(listener != null)) {\r
898                         if(entry != null && entry.isReady()) { \r
899                                 entry.performFromCache(procedure);\r
900                                 return;\r
901                         }\r
902                 }\r
903 \r
904                 runPrimitiveRead(graph, entry, query, parent, this, listener, procedure);\r
905 \r
906         }\r
907 \r
        /**
         * Performs an {@link AsyncRead} for the given cache entry, or replays the
         * entry's cached result when the entry is already ready.
         * <p>
         * Dependencies are registered first through {@code registerDependencies}; the
         * returned listener entry (possibly {@code null}) is primed with the result
         * once one is available.
         * <p>
         * If the entry is not ready it is marked pending and the query is performed
         * with {@code parentGraph.withParent(entry)}. The result or exception is stored
         * in the entry and then forwarded to {@code procedure}. Throwables raised by
         * {@code procedure} in this branch are printed and not propagated. If the entry
         * is ready, {@code entry.performFromCache} is used and {@code procedure} is
         * invoked without such protection.
         *
         * @param parentGraph        graph of the requester; also the graph handed to
         *                           {@code procedure} and to the entry in the not-ready branch
         * @param query              the request
         * @param entry              cache entry of the request; asserted not to be discarded
         * @param parent             passed to {@code registerDependencies}
         * @param base               passed to {@code registerDependencies}
         * @param procedure          receiver of the result or exception
         * @param inferredDependency passed to {@code registerDependencies}
         */
        public <T> void performForEach(ReadGraphImpl parentGraph, final AsyncRead<T> query, final AsyncReadEntry<T> entry, final CacheEntry parent, final ListenerBase base, final AsyncProcedure<T> procedure,
                        boolean inferredDependency) {

                if (DebugPolicy.PERFORM)
                        System.out.println("PE[ " + (query.hashCode() & THREAD_MASK) + "] " + query);

                assert (!dirty);
                assert (!collecting);

                assert(!entry.isDiscarded());

                // Registered before the query runs, so the listener entry is available to the callbacks below.
                final ListenerEntry listenerEntry = registerDependencies(parentGraph, entry, parent, base, procedure, inferredDependency);

                // FRESH, REFUTED, EXCEPTED go here 
                if (!entry.isReady()) {

                        entry.setPending();

                        size++;

                        try {

                                final ReadGraphImpl finalParentGraph = parentGraph;

                                query.perform(parentGraph.withParent(entry), new AsyncProcedure<T>() {

                                        @Override
                                        public void execute(AsyncReadGraph returnGraph, T result) {
                                                // NOTE(review): 'impl' is never read; the cast only has an effect if
                                                // returnGraph is not a ReadGraphImpl - confirm before removing.
                                                ReadGraphImpl impl = (ReadGraphImpl)returnGraph;
                                                //AsyncReadGraph resumeGraph = finalParentGraph.newAsync();
                                                // Store the result in the entry before anyone is notified.
                                                entry.addOrSet(finalParentGraph, result);
                                                if(listenerEntry != null) {
                                                        primeListenerEntry(listenerEntry, result);
                                                }
                                                try {
                                                        // The caller's procedure receives the requester's graph, not returnGraph.
                                                        procedure.execute(finalParentGraph, result);
                                                } catch (Throwable t) {
                                                        // Failures of the caller's procedure are printed, not propagated.
                                                        t.printStackTrace();
                                                }
//                                              parentBarrier.dec(query);
                                        }

                                        @Override
                                        public void exception(AsyncReadGraph returnGraph, Throwable t) {
                                                // NOTE(review): unused local, see execute() above.
                                                ReadGraphImpl impl = (ReadGraphImpl)returnGraph;
//                                              AsyncReadGraph resumeGraph = finalParentGraph.newAsync();
                                                // Record the failure in the entry, then forward it.
                                                entry.except(finalParentGraph, t);
                                                try {
                                                        procedure.exception(finalParentGraph, t);
                                                } catch (Throwable t2) {
                                                        t2.printStackTrace();
                                                }
//                                              parentBarrier.dec(query);
                                        }

                                        @Override
                                        public String toString() {
                                                // Present the wrapper under the wrapped procedure's description.
                                                return procedure.toString();
                                        }

                                });

                        } catch (Throwable t) {

                                // query.perform itself threw: record the failure and report it to the caller.
                                entry.except(t);
                                try {
                                        procedure.exception(parentGraph, t);
                                } catch (Throwable t2) {
                                        t2.printStackTrace();
                                }
//                              parentBarrier.dec(query);

                        }

                        misses++;

                } else {

                        // Ready entry: replay from the cache. The wrapper forwards to the caller's
                        // procedure and primes the listener entry after a successful execute.
                        entry.performFromCache(parentGraph, this, new AsyncProcedure<T>() {

                                @Override
                                public void exception(AsyncReadGraph graph, Throwable throwable) {
                                        procedure.exception(graph, throwable);
                                }

                                @Override
                                public void execute(AsyncReadGraph graph, T result) {
                                        procedure.execute(graph, result);
                                        if(listenerEntry != null) {
                                                primeListenerEntry(listenerEntry, result);
                                        }
                                }

                        });

//                      parentBarrier.dec(query);

                        hits++;

                }

                assert (!entry.isDiscarded());

        }
1012 \r
        /**
         * Performs a synchronous {@link Read} for the given cache entry, or serves it
         * from the entry when the entry is already ready.
         * <p>
         * In both branches dependencies are registered through
         * {@code registerDependencies}, the returned listener entry (if any) is primed
         * with the result, and the return value is obtained from
         * {@code entry.get(graph, this, procedure)}.
         * <p>
         * If the entry is not ready it is marked pending and the query is run on
         * {@code graph.newSync(entry)}. Any throwable raised while running the query,
         * storing the result, priming the listener or reading the entry is recorded
         * with {@code entry.except(t)}, after which {@code entry.get} is called again
         * to produce the outcome.
         *
         * @param graph              graph of the requester
         * @param query              the request
         * @param entry              cache entry of the request; checked with {@code assertNotDiscarded()}
         * @param parent             passed to {@code registerDependencies}
         * @param listener           passed to {@code registerDependencies}
         * @param procedure          passed to {@code registerDependencies} and {@code entry.get}
         * @param inferredDependency passed to {@code registerDependencies}
         * @return the value returned by {@code entry.get}
         * @throws Throwable whatever {@code entry.get} or the other calls outside the try block throw
         */
        public <T> T performForEach(final ReadGraphImpl graph, final Read<T> query, final ReadEntry<T> entry, final CacheEntry parent, final ListenerBase listener, final AsyncProcedure<T> procedure,
                        boolean inferredDependency) throws Throwable {

                if (DebugPolicy.PERFORM)
                        System.out.println("PE[ " + (query.hashCode() & THREAD_MASK) + "] " + query);

                assert (!dirty);
                assert (!collecting);

                entry.assertNotDiscarded();

                if(entry.isReady()) {

                        // EXCEPTED goes here

//                      if(procedure != null) entry.performFromCache(graph, this, procedure);
//                      parentBarrier.dec(query);
                        hits++;

                        ListenerEntry listenerEntry = registerDependencies(graph, entry, parent, listener, procedure, inferredDependency);
                        
                        // Not inside a try block: anything entry.get throws leaves this method directly.
                        T result = (T)entry.get(graph, this, procedure); 

                        if(listenerEntry != null) primeListenerEntry(listenerEntry, result);

                        return result;

                } else {

                        // FRESH, REFUTED, PENDING go here

                        entry.setPending();

                        size++;
                        misses++;

                ListenerEntry listenerEntry = registerDependencies(graph, entry, parent, listener, procedure, inferredDependency);

                        // The query runs on a graph created from the entry rather than on the caller's graph.
                        final ReadGraphImpl performGraph = graph.newSync(entry);

                        try {

                                if(Development.DEVELOPMENT)
                                        Development.recordHistogram("run " + query);

                                T result = query.perform(performGraph);
                                entry.addOrSet(performGraph, result);
                                
                                if(listenerEntry != null) primeListenerEntry(listenerEntry, result);

                                // The return value is read back from the entry, not taken from 'result'.
                                return (T)entry.get(graph, this, procedure);

                        }  catch (Throwable t) {

                                // Record the failure in the entry and let entry.get produce the outcome.
                                entry.except(t);
                                return (T)entry.get(graph, this, procedure);

                        }

                } 

        }
1075 \r
        /**
         * Performs a {@link MultiRead} for the given cache entry, or replays the
         * entry's cached results when the entry is already ready.
         * <p>
         * If the entry is not ready it is marked pending, cleared and (re)inserted into
         * {@code multiReadMap}, and the query is run on {@code graph.newSync(entry)}.
         * Each result, the completion and any exception are first recorded in the entry
         * and then forwarded to {@code procedure}. Throwables raised by
         * {@code procedure} in this branch are printed and not propagated. A throwable
         * escaping {@code query.perform} is recorded and reported as a
         * {@code DatabaseException}, wrapped in a new one if it is of another type.
         * <p>
         * Dependencies are registered at the end, after the query was performed or
         * replayed.
         *
         * @param graph              graph of the requester
         * @param query              the request
         * @param entry              cache entry of the request; asserted to be neither pending nor discarded
         * @param parent             passed to {@code registerDependencies}
         * @param listener           passed to {@code registerDependencies}
         * @param procedure          receiver of the results
         * @param inferredDependency passed to {@code registerDependencies}
         */
        public <T> void performForEach(final ReadGraphImpl graph, final MultiRead<T> query, final MultiReadEntry<T> entry, CacheEntry parent, final ListenerBase listener, final AsyncMultiProcedure<T> procedure,
                        boolean inferredDependency) {

                if (DebugPolicy.PERFORM)
                        System.out.println("PE[ " + (query.hashCode() & THREAD_MASK) + "] " + query);

                assert (!dirty);
                assert (!collecting);

                // NOTE(review): runMultiRead calls setPending() on a newly created entry before
                // invoking this method; unless clearResult() resets that state, this assertion
                // would fail when assertions are enabled - confirm.
                assert(!entry.isPending());
                assert(!entry.isDiscarded());

                // FRESH, REFUTED, EXCEPTED go here 
                if (!entry.isReady()) {

                        entry.setPending();
                        entry.clearResult(querySupport);

                        multiReadMap.put(query, entry);
                        size++;

                        final ReadGraphImpl newGraph = graph.newSync(entry);
//                      newGraph.state.barrier.inc();

                        try {

                                query.perform(newGraph, new AsyncMultiProcedure<T>() {

                                        @Override
                                        public void execute(AsyncReadGraph graph, T result) {
                                                // Record the result in the entry, then forward it.
                                                entry.addOrSet(result);
                                                try {
                                                        procedure.execute(graph, result);
                                                } catch (Throwable t) {
                                                        // Failures of the caller's procedure are printed, not propagated.
                                                        t.printStackTrace();
                                                }
                                        }

                                        @Override
                                        public void finished(AsyncReadGraph graph) {
                                                // Finish the entry before the caller is told about completion.
                                                entry.finish(graph);
                                                try {
                                                        procedure.finished(graph);
                                                } catch (Throwable t) {
                                                        t.printStackTrace();
                                                }
//                                              newGraph.state.barrier.dec();
//                                              parentBarrier.dec();
                                        }

                                        @Override
                                        public void exception(AsyncReadGraph graph, Throwable t) {
                                                // Record the failure in the entry, then forward it.
                                                entry.except(t);
                                                try {
                                                        procedure.exception(graph, t);
                                                } catch (Throwable t2) {
                                                        t2.printStackTrace();
                                                }
//                                              newGraph.state.barrier.dec();
//                                              parentBarrier.dec();
                                        }

                                });

                        } catch (DatabaseException e) {

                                // query.perform threw a DatabaseException: record and report it as is.
                                entry.except(e);
                                try {
                                        procedure.exception(graph, e);
                                } catch (Throwable t2) {
                                        t2.printStackTrace();
                                }
//                              newGraph.state.barrier.dec();
//                              parentBarrier.dec();

                        } catch (Throwable t) {

                                // Any other throwable is wrapped so that the entry and the caller see a DatabaseException.
                                DatabaseException e = new DatabaseException(t);

                                entry.except(e);
                                try {
                                        procedure.exception(graph, e);
                                } catch (Throwable t2) {
                                        t2.printStackTrace();
                                }
//                              newGraph.state.barrier.dec();
//                              parentBarrier.dec();

                        }

                        misses++;

                } else {

                        // Ready entry: replay the cached results to the caller.
                        entry.performFromCache(graph, this, procedure);
                        hits++;


                }

                assert (!entry.isDiscarded());

                // Unlike the AsyncRead overload, dependencies are registered after performing.
                registerDependencies(graph, entry, parent, listener, procedure, inferredDependency);

        }
1181 \r
1182 \r
        /**
         * Performs an {@link AsyncMultiRead} for the given cache entry, or replays the
         * entry's cached results when the entry is already ready.
         * <p>
         * If the entry is not ready, the query is run on
         * {@code callerGraph.withAsyncParent(entry)}. Each result, the completion and
         * any exception are first recorded in the entry and then forwarded to
         * {@code procedure} together with {@code callerGraph}. Throwables raised by
         * {@code procedure} in this branch are printed and not propagated. This
         * overload does not call {@code entry.setPending()} itself.
         * <p>
         * Dependencies are registered at the end. Any throwable escaping the body,
         * including one from a failed assertion on the entry, is reported through
         * {@code Logger.defaultLogError} instead of being propagated.
         *
         * @param callerGraph        graph of the requester; also handed to {@code procedure}
         * @param query              the request
         * @param entry              cache entry of the request; asserted not to be discarded
         * @param parent             passed to {@code registerDependencies}
         * @param listener           passed to {@code registerDependencies}
         * @param procedure          receiver of the results
         * @param inferredDependency passed to {@code registerDependencies}
         */
        public <T> void performForEach(final ReadGraphImpl callerGraph, AsyncMultiRead<T> query, final AsyncMultiReadEntry<T> entry, final CacheEntry parent, final ListenerBase listener, final AsyncMultiProcedure<T> procedure,
                        boolean inferredDependency) {

                if (DebugPolicy.PERFORM)
                        System.out.println("PE[ " + (query.hashCode() & THREAD_MASK) + "] " + query);

                assert (!dirty);
                assert (!collecting);

                try {

                        assert(!entry.isDiscarded());

                        // FRESH, REFUTED, EXCEPTED go here 
                        if (!entry.isReady()) {

                                size++;

                                try {

                                        ReadGraphImpl performGraph = callerGraph.withAsyncParent(entry);

                                        query.perform(performGraph, new AsyncMultiProcedure<T>() {

                                                @Override
                                                public void execute(AsyncReadGraph graph, T result) {
                                                        // NOTE(review): 'impl' is never read; the cast only has an effect if
                                                        // graph is not a ReadGraphImpl - confirm before removing.
                                                        ReadGraphImpl impl = (ReadGraphImpl)graph;
//                                                      ReadGraphImpl executeGraph = callerGraph.newAsync();
                                                        // Record the result in the entry, then forward it on the caller's graph.
                                                        entry.addOrSet(result);
                                                        try {
                                                                procedure.execute(callerGraph, result);
                                                        } catch (Throwable t) {
                                                                // Failures of the caller's procedure are printed, not propagated.
                                                                t.printStackTrace();
                                                        }
                                                }

                                                @Override
                                                public void finished(AsyncReadGraph graph) {
                                                        // NOTE(review): unused local, see execute() above.
                                                        ReadGraphImpl impl = (ReadGraphImpl)graph;
//                                                      ReadGraphImpl executeGraph = callerGraph.newAsync();
                                                        // Finish the entry before the caller is told about completion.
                                                        entry.finish(callerGraph);
                                                        try {
                                                                procedure.finished(callerGraph);
                                                        } catch (Throwable t) {
                                                                t.printStackTrace();
                                                        }
                                                }

                                                @Override
                                                public void exception(AsyncReadGraph graph, Throwable t) {
                                                        // NOTE(review): unused local, see execute() above.
                                                        ReadGraphImpl impl = (ReadGraphImpl)graph;
//                                                      ReadGraphImpl executeGraph = callerGraph.newAsync();
                                                        // Record the failure in the entry, then forward it.
                                                        entry.except(callerGraph, t);
                                                        try {
                                                                procedure.exception(callerGraph, t);
                                                        } catch (Throwable t2) {
                                                                t2.printStackTrace();
                                                        }
                                                }

                                        });

                                } catch (Throwable t) {

                                        // withAsyncParent or query.perform threw: record the failure and report it.
                                        entry.except(t);
                                        try {
                                                procedure.exception(callerGraph, t);
                                        } catch (Throwable t2) {
                                                t2.printStackTrace();
                                        }

                                }


                                misses++;

                        } else {

                                // Ready entry: replay the cached results to the caller.
                                entry.performFromCache(callerGraph, this, procedure);

                                hits++;

                        }

                        assert (!entry.isDiscarded());

                        // Dependencies are registered after the query was performed or replayed.
                        registerDependencies(callerGraph, entry, parent, listener, procedure, inferredDependency);

                } catch (Throwable t) {

                        // Nothing escapes this method; failures are only logged.
                        Logger.defaultLogError(t);

                } finally {

                }

        }
1280 \r
        /**
         * Performs an {@link ExternalRead}: when the cache entry is not ready the query is
         * registered with a new listener that feeds the entry, otherwise the cached state is
         * replayed to {@code procedure}.
         *
         * @param graph graph passed to {@code registerDependencies} and {@code query.register}
         * @param query the external read to perform
         * @param entry cache entry of {@code query}; asserted to be neither pending nor discarded
         * @param parent entry that depends on {@code entry}, may be {@code null}
         * @param base listener to register for {@code entry}, may be {@code null}
         * @param procedure receives the first result or exception, or the cached state
         * @param inferredDependency forwarded to {@code registerDependencies}; when {@code true}
         *        no parent link is added there
         */
        public <T> void performForEach(ReadGraphImpl graph, final ExternalRead<T> query, final ExternalReadEntry<T> entry, final CacheEntry parent, final ListenerBase base, final Procedure<T> procedure,
                        boolean inferredDependency) {

                if (DebugPolicy.PERFORM)
                        System.out.println("PE[ " + (query.hashCode() & THREAD_MASK) + "] " + query);

                assert (!dirty);
                assert (!collecting);

                assert(!entry.isPending());
                assert(!entry.isDiscarded());

                // Unlike the StringQuery overload, dependencies are registered before the entry is evaluated.
                registerDependencies(graph, entry, parent, base, procedure, inferredDependency);

                // FRESH, REFUTED, EXCEPTED go here 
                if (!entry.isReady()) {

                        entry.setPending();
                        entry.clearResult(querySupport);

                        externalReadMap.put(query, entry);
                        size++;

                        try {

                                query.register(graph, new Listener<T>() {

                                        // Flips to true on the first callback (result or exception); only that
                                        // first callback is forwarded to the procedure directly.
                                        AtomicBoolean used = new AtomicBoolean(false);

                                        @Override
                                        public void execute(T result) {

                                                // Just for safety
                                                if(entry.isDiscarded()) return;
                                                // A new value after an exception puts the entry back to pending.
                                                if(entry.isExcepted()) entry.setPending();

                                                if(used.compareAndSet(false, true)) {
                                                        // First value: store it in the entry and hand it to the caller.
                                                        entry.addOrSet(QueryProcessor.this, result);
                                                        procedure.execute(result);
                                                } else {
                                                        // Later values are queued on the entry and announced via updatePrimitive.
                                                        entry.queue(result);
                                                        updatePrimitive(query);
                                                }

                                        }

                                        @Override
                                        public void exception(Throwable t) {

                                                // NOTE(review): unlike execute(), there is no isDiscarded() check here - confirm intended.
                                                entry.except(t);

                                                if(used.compareAndSet(false, true)) {
                                                        procedure.exception(t);
                                                } else {
                                                        // Nothing is queued for later exceptions (the queue call was disabled):
//                                                      entry.queue(result);
                                                        updatePrimitive(query);
                                                }

                                        }

                                        @Override
                                        public String toString() {
                                                return procedure.toString();
                                        }

                                        @Override
                                        public boolean isDisposed() {
                                                // Disposed once the entry is discarded or has neither parents nor listeners.
                                                return entry.isDiscarded() || !isBound(entry);
                                        }

                                });

                        } catch (Throwable t) {

                                // Registration itself failed: record the failure and report it to the caller.
                                entry.except(t);
                                procedure.exception(t);

                        }

                        misses++;

                } else {

                        entry.performFromCache(procedure);

                        hits++;

                }

                assert (!entry.isDiscarded());

        }
1373 \r
1374         private boolean isBound(ExternalReadEntry<?> entry) {\r
1375                 if(entry.hasParents()) return true;\r
1376                 else if(hasListener(entry)) return true;\r
1377                 else return false;\r
1378         }\r
1379 \r
1380         synchronized public ListenerEntry registerDependencies(ReadGraphImpl graph, CacheEntry child, CacheEntry parent, ListenerBase listener, Object procedure, boolean inferred) {\r
1381 \r
1382                 if (parent != null && !inferred) {\r
1383                         try {\r
1384                                 if(!child.isImmutable(graph))\r
1385                                         child.addParent(parent);\r
1386                         } catch (DatabaseException e) {\r
1387                                 Logger.defaultLogError(e);\r
1388                         }\r
1389                         if(DebugPolicy.DEPENDENCIES) System.out.println(child + " -> " + parent);\r
1390                 }\r
1391 \r
1392                 if (listener != null) {\r
1393                         return registerListener(child, listener, procedure);\r
1394                 } else {\r
1395                         return null;\r
1396                 }\r
1397 \r
1398         }\r
1399 \r
1400         public <Procedure> void performForEach(ReadGraphImpl graph, BinaryQuery<Procedure> query, CacheEntry parent, ListenerBase listener, Procedure procedure) {\r
1401 \r
1402                 if (DebugPolicy.PERFORM)\r
1403                         System.out.println("PE[ " + (query.hashCode() & THREAD_MASK) + "] " + query);\r
1404 \r
1405                 assert (!dirty);\r
1406                 assert (!collecting);\r
1407 \r
1408                 try {\r
1409 \r
1410                         registerDependencies(graph, query, parent, listener, procedure, false);\r
1411 \r
1412                         // FRESH, REFUTED, EXCEPTED go here \r
1413                         if (!query.isReady()) {\r
1414 \r
1415                                 boolean fresh = query.isFresh();\r
1416 \r
1417                                 if(fresh) {\r
1418                                         size++;\r
1419                                 }\r
1420 \r
1421                                 query.computeForEach(graph, this, procedure, true);\r
1422 \r
1423                                 misses++;\r
1424 \r
1425                         } else {\r
1426 \r
1427                                 query.performFromCache(graph, this, procedure);\r
1428 \r
1429                                 hits++;\r
1430 \r
1431                         }\r
1432 \r
1433                 } catch (Throwable t) {\r
1434 \r
1435                         Logger.defaultLogError(t);\r
1436 \r
1437                 }\r
1438         }\r
1439 \r
1440         public <Procedure> Object performForEach(ReadGraphImpl graph, UnaryQuery<Procedure> query, CacheEntry parent, ListenerBase listener, Procedure procedure) {\r
1441 \r
1442                 if (DebugPolicy.PERFORM)\r
1443                         System.out.println("PE[ " + (query.hashCode() &  THREAD_MASK) + "] " + query);\r
1444 \r
1445                 assert (!dirty);\r
1446                 assert (!collecting);\r
1447 \r
1448                 try {\r
1449 \r
1450                         assert(query.assertNotDiscarded());\r
1451 \r
1452                         registerDependencies(graph, query, parent, listener, procedure, false);\r
1453 \r
1454                         // FRESH, REFUTED, EXCEPTED go here \r
1455                         if (!query.isReady()) {\r
1456 \r
1457                                 size++;\r
1458                                 misses++;\r
1459 \r
1460                                 return query.computeForEach(graph, this, procedure, true);\r
1461 \r
1462 \r
1463                         } else {\r
1464 \r
1465                                 hits++;\r
1466 \r
1467                                 return query.performFromCache(graph, this, procedure);\r
1468 \r
1469                         }\r
1470 \r
1471                 } catch (Throwable t) {\r
1472 \r
1473                         Logger.defaultLogError(t);\r
1474                         return null;\r
1475 \r
1476                 } \r
1477 \r
1478         }\r
1479         \r
1480         static class Dummy implements InternalProcedure<Object>, IntProcedure {\r
1481 \r
1482                 @Override\r
1483                 public void execute(ReadGraphImpl graph, int i) {\r
1484                 }\r
1485 \r
1486                 @Override\r
1487                 public void finished(ReadGraphImpl graph) {\r
1488                 }\r
1489 \r
1490                 @Override\r
1491                 public void execute(ReadGraphImpl graph, Object result) {\r
1492                 }\r
1493 \r
1494                 @Override\r
1495                 public void exception(ReadGraphImpl graph, Throwable throwable) {\r
1496                 }\r
1497                 \r
1498         }\r
1499         \r
1500         private static final Dummy dummy = new Dummy();\r
1501 \r
1502     public <Procedure> Object performForEach2(ReadGraphImpl graph, UnaryQuery<Procedure> query, CacheEntry parent, ListenerBase listener, Procedure procedure) throws Throwable {\r
1503 \r
1504         if (DebugPolicy.PERFORM)\r
1505             System.out.println("PE[ " + (query.hashCode() &  THREAD_MASK) + "] " + query);\r
1506 \r
1507         assert (!dirty);\r
1508         assert (!collecting);\r
1509 \r
1510         assert(query.assertNotDiscarded());\r
1511 \r
1512         registerDependencies(graph, query, parent, listener, procedure, false);\r
1513 \r
1514         // FRESH, REFUTED, EXCEPTED go here \r
1515         if (!query.isReady()) {\r
1516 \r
1517             size++;\r
1518             misses++;\r
1519 \r
1520             query.computeForEach(graph, this, (Procedure)dummy, true);\r
1521             return query.get(graph, this, null);\r
1522 \r
1523         } else {\r
1524 \r
1525             hits++;\r
1526 \r
1527             return query.get(graph, this, procedure);\r
1528 \r
1529         }\r
1530 \r
1531     }\r
1532         \r
1533         public <Procedure> void performForEach(ReadGraphImpl graph, StringQuery<Procedure> query, CacheEntry parent, final ListenerBase listener, Procedure procedure) {\r
1534 \r
1535                 if (DebugPolicy.PERFORM)\r
1536                         System.out.println("PE[ " + (query.hashCode() & THREAD_MASK) + "] " + query);\r
1537 \r
1538                 assert (!dirty);\r
1539                 assert (!collecting);\r
1540 \r
1541                 try {\r
1542 \r
1543                         if(query.isDiscarded()) {\r
1544                                 System.err.println("aff");\r
1545                         }\r
1546                         assert(!query.isDiscarded());\r
1547 \r
1548                         // FRESH, REFUTED, EXCEPTED go here \r
1549                         if (!query.isReady()) {\r
1550 \r
1551                                 query.computeForEach(graph.withAsyncParent(query), this, procedure);\r
1552 \r
1553                                 size++;\r
1554                                 misses++;\r
1555 \r
1556                         } else {\r
1557 \r
1558                                 query.performFromCache(graph, this, procedure);\r
1559 \r
1560                                 hits++;\r
1561 \r
1562                         }\r
1563 \r
1564                         assert (!query.isDiscarded());\r
1565 \r
1566                         registerDependencies(graph, query, parent, listener, procedure, false);\r
1567 \r
1568                 } catch (Throwable t) {\r
1569 \r
1570                         t.printStackTrace();\r
1571                         Logger.defaultLogError(t);\r
1572 \r
1573                 }\r
1574 \r
1575         }\r
1576 \r
1577         interface QueryCollectorSupport {\r
1578                 public CacheCollectionResult allCaches();\r
1579                 public Collection<CacheEntry> getRootList();\r
1580                 public int getCurrentSize();\r
1581                 public int calculateCurrentSize();\r
1582                 public CacheEntryBase iterate(int level);\r
1583                 public void remove();\r
1584                 public void setLevel(CacheEntryBase entry, int level);\r
1585                 public boolean start(boolean flush);\r
1586         }\r
1587 \r
1588         interface QueryCollector {\r
1589 \r
1590                 public void collect(int youngTarget, int allowedTimeInMs);\r
1591 \r
1592         }\r
1593 \r
1594         class QueryCollectorSupportImpl implements QueryCollectorSupport {\r
1595 \r
1596                 private static final boolean DEBUG = false;\r
1597                 private static final double ITERATION_RATIO = 0.2;\r
1598                 \r
1599                 private CacheCollectionResult iteration = new CacheCollectionResult();\r
1600                 private boolean fresh = true;\r
1601                 private boolean needDataInStart = true;\r
1602                 \r
1603                 QueryCollectorSupportImpl() {\r
1604                         iteration.restart();\r
1605                 }\r
1606 \r
1607                 public CacheCollectionResult allCaches() {\r
1608                         CacheCollectionResult result = new CacheCollectionResult();\r
1609                         QueryProcessor.this.allCaches(result);\r
1610                         result.restart();\r
1611                         return result;\r
1612                 }\r
1613                 \r
1614                 public boolean start(boolean flush) {\r
1615                         // We need new data from query maps\r
1616                         fresh = true;\r
1617                         if(needDataInStart || flush) {\r
1618                                 // Last run ended after processing all queries => refresh data\r
1619                                 restart(flush ? 0.0 : ITERATION_RATIO);\r
1620                         } else {\r
1621                                 // continue with previous big data\r
1622                         }\r
1623                         // Notify caller about iteration situation\r
1624                         return iteration.isAtStart();\r
1625                 }\r
1626 \r
1627                 private void restart(double targetRatio) {\r
1628                         \r
1629                         needDataInStart = true;\r
1630 \r
1631                         long start = System.nanoTime();\r
1632                         if(fresh) {\r
1633                                 \r
1634                                 // We need new data from query maps\r
1635                                 \r
1636                                 int iterationSize = iteration.size()+1;\r
1637                                 int diff = calculateCurrentSize()-iterationSize;\r
1638                                 \r
1639                                 double ratio = (double)diff / (double)iterationSize;\r
1640                                 boolean dirty = Math.abs(ratio) >= targetRatio;\r
1641                                 \r
1642                                 if(dirty) {\r
1643                                         iteration = allCaches();\r
1644                                         if(DEBUG) {\r
1645                                                 System.err.print("iterate: allCaches in " + 1e-9*(System.nanoTime()-start) + "s. (" + iteration.size() + ") ");\r
1646                                                 for(int i=0;i<CacheCollectionResult.LEVELS;i++)\r
1647                                                         System.err.print(" " + iteration.levels[i].size());\r
1648                                                 System.err.println("");\r
1649                                         }\r
1650                                 } else {\r
1651                                         iteration.restart();\r
1652                                 }\r
1653                                 \r
1654                                 fresh = false;\r
1655                                 needDataInStart = false;\r
1656                         } else {\r
1657                                 // We are returning here within the same GC round - reuse the cache table\r
1658                                 iteration.restart();\r
1659                         }\r
1660                         \r
1661                         return;\r
1662                         \r
1663                 }\r
1664                 \r
1665                 @Override\r
1666                 public CacheEntryBase iterate(int level) {\r
1667                         \r
1668                         CacheEntryBase entry = iteration.next(level);\r
1669                         if(entry == null) {\r
1670                                 restart(ITERATION_RATIO);\r
1671                                 return null;\r
1672                         }\r
1673                         \r
1674                         while(entry != null && entry.isDiscarded()) {\r
1675                                 entry = iteration.next(level);\r
1676                         }\r
1677                         \r
1678                         return entry;\r
1679                         \r
1680                 }\r
1681                 \r
1682                 @Override\r
1683                 public void remove() {\r
1684                         iteration.remove();\r
1685                 }\r
1686                 \r
1687                 @Override\r
1688                 public void setLevel(CacheEntryBase entry, int level) {\r
1689                         iteration.setLevel(entry, level);\r
1690                 }\r
1691 \r
1692                 public Collection<CacheEntry> getRootList() {\r
1693 \r
1694                         ArrayList<CacheEntry> result = new ArrayList<CacheEntry>();\r
1695 \r
1696                         for (Object e : valueMap.values()) {\r
1697                                 result.add((CacheEntry) e);\r
1698                         }\r
1699                         for (Object e : directPredicatesMap.values()) {\r
1700                                 result.add((CacheEntry) e);\r
1701                         }\r
1702                         for (Object e : objectsMap.values()) {\r
1703                                 result.add((CacheEntry) e);\r
1704                         }\r
1705                         for (Object e : directObjectsMap.values()) {\r
1706                                 result.add((CacheEntry) e);\r
1707                         }\r
1708                         for (Object e : principalTypesMap.values()) {\r
1709                                 result.add((CacheEntry) e);\r
1710                         }\r
1711                         for (Object e : superRelationsMap.values()) {\r
1712                                 result.add((CacheEntry) e);\r
1713                         }\r
1714                         for (Object e : superTypesMap.values()) {\r
1715                                 result.add((CacheEntry) e);\r
1716                         }\r
1717                         for (Object e : typesMap.values()) {\r
1718                                 result.add((CacheEntry) e);\r
1719                         }\r
1720                         for (Object e : objectsMap.values()) {\r
1721                                 result.add((CacheEntry) e);\r
1722                         }\r
1723                         for (Object e : assertedStatementsMap.values()) {\r
1724                                 result.add((CacheEntry) e);\r
1725                         }\r
1726                         for (Object e : readMap.values()) {\r
1727                                 if(e instanceof CacheEntry) {\r
1728                                         result.add((CacheEntry) e);\r
1729                                 } else {\r
1730                                         System.err.println("e=" + e);\r
1731                                 }\r
1732                         }\r
1733                         for (Object e : asyncReadMap.values()) {\r
1734                                 if(e instanceof CacheEntry) {\r
1735                                         result.add((CacheEntry) e);\r
1736                                 } else {\r
1737                                         System.err.println("e=" + e);\r
1738                                 }\r
1739                         }\r
1740                         for (Object e : externalReadMap.values()) {\r
1741                                 result.add((CacheEntry) e);\r
1742                         }\r
1743                         for (Object e : orderedSetMap.values()) {\r
1744                                 result.add((CacheEntry) e);\r
1745                         }\r
1746 \r
1747                         return result;\r
1748 \r
1749                 }\r
1750 \r
1751                 @Override\r
1752                 public int calculateCurrentSize() {\r
1753                         \r
1754                         int realSize = 0;\r
1755                         \r
1756                         realSize += directPredicatesMap.size();\r
1757                         realSize += principalTypesMap.size();\r
1758                         realSize += uriToResourceMap.size();\r
1759                         realSize += namespaceIndexMap22.size();\r
1760                         realSize += projectsMap.size();\r
1761                         \r
1762                         realSize += relationInfoMap.size();\r
1763                         realSize += superTypesMap.size();\r
1764                         realSize += typeHierarchyMap.size();\r
1765                         realSize += superRelationsMap.size();\r
1766                         realSize += typesMap.size();\r
1767                         \r
1768                         realSize += valueMap.size();\r
1769                         realSize += directObjectsMap.size();\r
1770                         realSize += objectsMap.size();\r
1771                         realSize += orderedSetMap.size();\r
1772                         realSize += predicatesMap.size();\r
1773                         \r
1774                         realSize += statementsMap.size();\r
1775                         realSize += assertedPredicatesMap.size();\r
1776                         realSize += assertedStatementsMap.size();\r
1777                         realSize += externalReadMap.size();\r
1778                         realSize += asyncReadMap.size();\r
1779                         \r
1780                         realSize += readMap.size();\r
1781                         realSize += asyncMultiReadMap.size();\r
1782                         realSize += multiReadMap.size();\r
1783                         \r
1784                         size = realSize;\r
1785                         \r
1786                         return realSize;\r
1787                         \r
1788                 }\r
1789 \r
1790                 @Override\r
1791                 public int getCurrentSize() {\r
1792                         return size;\r
1793                 }\r
1794 \r
1795         }\r
1796         //    final private static int MINIMUM_SIZE = (int)(Runtime.getRuntime().maxMemory() / 600);\r
1797 \r
1798         private QueryCollectorSupport collectorSupport = new QueryCollectorSupportImpl();\r
1799         private QueryCollector collector = new QueryCollectorImpl(this, collectorSupport);\r
1800 \r
    /**
     * Returns the current value of the {@code size} counter, i.e. the tracked number of
     * cached queries; the value is not recalculated here.
     */
    public int querySize() {
        return size;
    }
1804 \r
        /**
         * Runs one collection pass by delegating to the configured {@link QueryCollector}.
         *
         * @param youngTarget forwarded unchanged to {@code collector.collect}
         * @param allowedTimeInMs forwarded unchanged to {@code collector.collect}
         */
        public void gc(int youngTarget, int allowedTimeInMs) {

                collector.collect(youngTarget, allowedTimeInMs);

        }
1810 \r
1811         public ListenerEntry registerListener(final CacheEntry entry, final ListenerBase base, final Object procedure) {\r
1812 \r
1813                 assert (entry != null);\r
1814 \r
1815                 if (base.isDisposed())\r
1816                         return null;\r
1817 \r
1818                 return addListener(entry, base, procedure);\r
1819 \r
1820         }\r
1821 \r
        /**
         * Stores {@code result} as the last known value of the given listener entry.
         */
        private void primeListenerEntry(final ListenerEntry entry, final Object result) {
                entry.setLastKnown(result);
        }
1825 \r
1826         private ListenerEntry addListener(CacheEntry entry, ListenerBase base, Object procedure) {\r
1827 \r
1828                 assert (entry != null);\r
1829                 assert (procedure != null);\r
1830 \r
1831                 ArrayList<ListenerEntry> list = listeners.get(entry);\r
1832                 if (list == null) {\r
1833                         list = new ArrayList<ListenerEntry>(1);\r
1834                         listeners.put(entry, list);\r
1835                 }\r
1836 \r
1837                 ListenerEntry result = new ListenerEntry(entry, base, procedure);\r
1838                 int currentIndex = list.indexOf(result);\r
1839                 // There was already a listener\r
1840                 if(currentIndex > -1) {\r
1841                         ListenerEntry current = list.get(currentIndex);\r
1842                         if(!current.base.isDisposed()) return null;\r
1843                         list.set(currentIndex, result);\r
1844                 } else {\r
1845                         list.add(result);\r
1846                 }\r
1847 \r
1848                 if(DebugPolicy.LISTENER) {\r
1849                         new Exception().printStackTrace();\r
1850                         System.out.println("addListener -> " + list.size() + " " + entry + " " + base + " " + procedure);\r
1851                 }\r
1852 \r
1853                 return result;\r
1854 \r
1855         }\r
1856 \r
1857         private void scheduleListener(ListenerEntry entry) {\r
1858                 assert (entry != null);\r
1859                 if(DebugPolicy.LISTENER) System.out.println("Scheduled " + entry.procedure);\r
1860                 scheduledListeners.add(entry);\r
1861         }\r
1862 \r
1863         private void removeListener(ListenerEntry entry) {\r
1864                 assert (entry != null);\r
1865                 ArrayList<ListenerEntry> list = listeners.get(entry.entry);\r
1866                 if(list == null) return;\r
1867                 boolean success = list.remove(entry);\r
1868                 assert (success);\r
1869                 if (list.isEmpty())\r
1870                         listeners.remove(entry.entry);\r
1871         }\r
1872 \r
1873         private boolean hasListener(CacheEntry entry) {\r
1874                 if(listeners.get(entry) != null) return true;\r
1875                 return false;\r
1876         }\r
1877 \r
1878         boolean hasListenerAfterDisposing(CacheEntry entry) {\r
1879                 if(listeners.get(entry) != null) {\r
1880                         ArrayList<ListenerEntry> entries = listeners.get(entry);\r
1881                         ArrayList<ListenerEntry> list = null;\r
1882                         for (ListenerEntry e : entries) {\r
1883                                 if (e.base.isDisposed()) {\r
1884                                         if(list == null) list = new ArrayList<ListenerEntry>();\r
1885                                         list.add(e);\r
1886                                 }\r
1887                         }\r
1888                         if(list != null) {\r
1889                                 for (ListenerEntry e : list) {\r
1890                                         entries.remove(e);\r
1891                                 }\r
1892                         }\r
1893                         if (entries.isEmpty()) {\r
1894                                 listeners.remove(entry);\r
1895                                 return false;\r
1896                         }\r
1897                         return true;\r
1898                 }\r
1899                 return false;\r
1900         }\r
1901 \r
1902         List<ListenerEntry> getListenerEntries(CacheEntry entry) {\r
1903                 hasListenerAfterDisposing(entry);\r
1904                 if(listeners.get(entry) != null)\r
1905                         return listeners.get(entry);\r
1906                 else \r
1907                         return Collections.emptyList();\r
1908         }\r
1909 \r
1910         void processListenerReport(CacheEntry entry, Map<CacheEntry, Set<ListenerBase>> workarea) {\r
1911 \r
1912                 if(!workarea.containsKey(entry)) {\r
1913 \r
1914                         HashSet<ListenerBase> ls = new HashSet<ListenerBase>();\r
1915                         for(ListenerEntry e : getListenerEntries(entry))\r
1916                                 ls.add(e.base);\r
1917 \r
1918                         workarea.put(entry, ls);\r
1919 \r
1920                         for(CacheEntry parent : entry.getParents(this)) {\r
1921                                 processListenerReport(parent, workarea);\r
1922                                 ls.addAll(workarea.get(parent));\r
1923                         }\r
1924 \r
1925                 }\r
1926 \r
1927         }\r
1928 \r
1929         public synchronized ListenerReport getListenerReport() throws IOException {\r
1930 \r
1931                 class ListenerReportImpl implements ListenerReport {\r
1932 \r
1933                         Map<CacheEntry, Set<ListenerBase>> workarea = new HashMap<CacheEntry, Set<ListenerBase>>();\r
1934 \r
1935                         @Override\r
1936                         public void print(PrintStream b) {\r
1937                                 Map<ListenerBase, Integer> hist = new HashMap<ListenerBase, Integer>();\r
1938                                 for(Map.Entry<CacheEntry, Set<ListenerBase>> e : workarea.entrySet()) {\r
1939                                         for(ListenerBase l : e.getValue()) {\r
1940                                                 Integer i = hist.get(l);\r
1941                                                 hist.put(l, i != null ? i-1 : -1);\r
1942                                         }\r
1943                                 }\r
1944 \r
1945                                 for(Pair<ListenerBase, Integer> p : CollectionUtils.valueSortedEntries(hist)) {\r
1946                                         b.print("" + -p.second + " " + p.first + "\n");\r
1947                                 }\r
1948 \r
1949                                 b.flush();\r
1950                         }\r
1951 \r
1952                 }\r
1953 \r
1954                 ListenerReportImpl result = new ListenerReportImpl();\r
1955 \r
1956                 Collection<CacheEntryBase> all = allCaches(new CacheCollectionResult()).toCollection();\r
1957                 for(CacheEntryBase entry : all) {\r
1958                         hasListenerAfterDisposing(entry);\r
1959                 }\r
1960                 for(CacheEntryBase entry : all) {\r
1961                         processListenerReport(entry, result.workarea);\r
1962                 }\r
1963 \r
1964                 return result;\r
1965 \r
1966         }\r
1967 \r
1968         public synchronized String reportListeners(File file) throws IOException {\r
1969 \r
1970                 if (!isAlive())\r
1971                         return "Disposed!";\r
1972 \r
1973                 PrintStream b = new PrintStream(new BufferedOutputStream(new FileOutputStream(file)));\r
1974                 ListenerReport report = getListenerReport();\r
1975                 report.print(b);\r
1976 \r
1977                 return "Done reporting listeners.";\r
1978 \r
1979         }\r
1980 \r
1981         void processParentReport(CacheEntry entry, Map<CacheEntry, Set<CacheEntry>> workarea) {\r
1982 \r
1983                 if(entry.isDiscarded()) return;\r
1984                 if(workarea.containsKey(entry)) return;\r
1985                 \r
1986                 Iterable<CacheEntry> parents = entry.getParents(this);\r
1987                 HashSet<CacheEntry> ps = new HashSet<CacheEntry>();\r
1988                 for(CacheEntry e : parents) {\r
1989                         if(e.isDiscarded()) continue;\r
1990                         ps.add(e);\r
1991                         processParentReport(e, workarea);\r
1992                 }\r
1993                 workarea.put(entry, ps);\r
1994 \r
1995         }\r
1996 \r
1997         public synchronized String reportQueryActivity(File file) throws IOException {\r
1998                 \r
1999                 System.err.println("reportQueries " + file.getAbsolutePath());\r
2000 \r
2001                 if (!isAlive())\r
2002                         return "Disposed!";\r
2003 \r
2004                 PrintStream b = new PrintStream(new BufferedOutputStream(new FileOutputStream(file)));\r
2005 \r
2006                 List<Pair<String,Integer>> entries = CollectionUtils.valueSortedEntries(Development.histogram);\r
2007                 Collections.reverse(entries);\r
2008                 \r
2009                 for(Pair<String,Integer> entry : entries) {\r
2010                         b.println(entry.first + ": " + entry.second);\r
2011                 }\r
2012 \r
2013                 b.close();\r
2014                 \r
2015                 Development.histogram.clear();\r
2016 \r
2017                 return "OK";\r
2018 \r
2019         }\r
2020         \r
2021         public synchronized String reportQueries(File file) throws IOException {\r
2022 \r
2023                 System.err.println("reportQueries " + file.getAbsolutePath());\r
2024 \r
2025                 if (!isAlive())\r
2026                         return "Disposed!";\r
2027 \r
2028                 PrintStream b = new PrintStream(new BufferedOutputStream(new FileOutputStream(file)));\r
2029 \r
2030                 long start = System.nanoTime();\r
2031 \r
2032 //              ArrayList<CacheEntry> all = ;\r
2033                 \r
2034                 Map<CacheEntry, Set<CacheEntry>> workarea = new HashMap<CacheEntry, Set<CacheEntry>>();\r
2035                 Collection<CacheEntryBase> caches = allCaches(new CacheCollectionResult()).toCollection();\r
2036                 for(CacheEntryBase entry : caches) {\r
2037                         processParentReport(entry, workarea);\r
2038                 }\r
2039                 \r
2040                 //        for(CacheEntry e : all) System.err.println("entry: " + e);\r
2041 \r
2042                 long duration = System.nanoTime() - start;\r
2043                 System.err.println("Query root set in " + 1e-9*duration + "s.");\r
2044 \r
2045                 start = System.nanoTime();\r
2046 \r
2047                 HashMap<CacheEntry, Integer> flagMap = new HashMap<CacheEntry, Integer>(); \r
2048 \r
2049                 int listeners = 0;\r
2050 \r
2051                 for(CacheEntry entry : workarea.keySet()) {\r
2052                         boolean listener = hasListenerAfterDisposing(entry);\r
2053                         boolean hasParents = entry.getParents(this).iterator().hasNext();\r
2054                         if(listener) {\r
2055                                 // Bound\r
2056                                 flagMap.put(entry, 0);\r
2057                         } else if (!hasParents) {\r
2058                                 // Unbound\r
2059                                 flagMap.put(entry, 1);\r
2060                         } else {\r
2061                                 // Unknown\r
2062                                 flagMap.put(entry, 2);\r
2063                         }\r
2064                         //              // Write leaf bit\r
2065                         //              entry.flags |= 4;\r
2066                 }\r
2067 \r
2068                 boolean done = true;\r
2069                 int loops = 0;\r
2070 \r
2071                 do {\r
2072 \r
2073                         done = true;\r
2074 \r
2075                         long start2 = System.nanoTime();\r
2076 \r
2077                         int boundCounter = 0;\r
2078                         int unboundCounter = 0;\r
2079                         int unknownCounter = 0;\r
2080 \r
2081                         for(CacheEntry entry : workarea.keySet()) {\r
2082 \r
2083                                 //System.err.println("process " + entry);\r
2084 \r
2085                                 int flags = flagMap.get(entry);\r
2086                                 int bindStatus = flags & 3;\r
2087 \r
2088                                 if(bindStatus == 0) boundCounter++;\r
2089                                 else if(bindStatus == 1) unboundCounter++;\r
2090                                 else if(bindStatus == 2) unknownCounter++;\r
2091 \r
2092                                 if(bindStatus < 2) continue;\r
2093 \r
2094                                 int newStatus = 1;\r
2095                                 for(CacheEntry parent : entry.getParents(this)) {\r
2096 \r
2097                                         if(parent.isDiscarded()) flagMap.put(parent, 1);\r
2098 \r
2099                                         int flags2 = flagMap.get(parent);\r
2100                                         int bindStatus2 = flags2 & 3;\r
2101                                         // Parent is bound => child is bound\r
2102                                         if(bindStatus2 == 0) {\r
2103                                                 newStatus = 0;\r
2104                                                 break;\r
2105                                         }\r
2106                                         // Parent is unknown => child is unknown\r
2107                                         else if (bindStatus2 == 2) {\r
2108                                                 newStatus = 2;\r
2109                                                 done = false;\r
2110                                                 break;\r
2111                                         }\r
2112                                 }\r
2113 \r
2114                                 flagMap.put(entry, newStatus);\r
2115 \r
2116                         }\r
2117 \r
2118                         duration = System.nanoTime() - start2;\r
2119                         System.err.println("Query analysis pass (" + boundCounter + "/" + unboundCounter + "/" + unknownCounter + ") in "+ 1e-9*duration + "s.");\r
2120                         b.println("Query analysis pass (" + boundCounter + "/" + unboundCounter + "/" + unknownCounter + ") in "+ 1e-9*duration + "s.");\r
2121 \r
2122                 } while(!done && loops++ < 20);\r
2123 \r
2124                 if(loops >= 20) {\r
2125 \r
2126                         for(CacheEntry entry : workarea.keySet()) {\r
2127 \r
2128                                 int bindStatus = flagMap.get(entry);\r
2129                                 if(bindStatus == 2) System.err.println("Undefined bind status for " + entry);\r
2130 \r
2131                         }\r
2132 \r
2133                 }\r
2134 \r
2135                 duration = System.nanoTime() - start;\r
2136                 System.err.println("Query analysis in " + 1e-9*duration + "s.");\r
2137 \r
2138                 Map<Class<?>, Integer> counts = new HashMap<Class<?>, Integer>();\r
2139 \r
2140                 for(CacheEntry entry : workarea.keySet()) {\r
2141                         Class<?> clazz = entry.getClass();\r
2142                         if(entry instanceof ReadEntry) clazz = ((ReadEntry)entry).request.getClass(); \r
2143                         else if(entry instanceof MultiReadEntry) clazz = ((MultiReadEntry)entry).request.getClass(); \r
2144                         else if(entry instanceof AsyncReadEntry) clazz = ((AsyncReadEntry)entry).request.getClass(); \r
2145                         else if(entry instanceof AsyncMultiReadEntry) clazz = ((AsyncMultiReadEntry)entry).request.getClass(); \r
2146                         else if(entry instanceof ExternalReadEntry) clazz = ((ExternalReadEntry)entry).request.getClass(); \r
2147                         Integer c = counts.get(clazz);\r
2148                         if(c == null) counts.put(clazz, -1);\r
2149                         else counts.put(clazz, c-1);\r
2150                 }\r
2151 \r
2152                 b.print("// Simantics DB client query report file\n");\r
2153                 b.print("// This file contains the following information\n");\r
2154                 b.print("// -The amount of cached query instances per query class\n");\r
2155                 b.print("// -The sizes of retained child sets\n");\r
2156                 b.print("// -List of parents for each query (search for 'P <query name>')\n");\r
2157                 b.print("//  -Followed by status, where\n");\r
2158                 b.print("//   -0=bound\n");\r
2159                 b.print("//   -1=free\n");\r
2160                 b.print("//   -2=unknown\n");\r
2161                 b.print("//   -L=has listener\n");\r
2162                 b.print("// -List of children for each query (search for 'C <query name>')\n");\r
2163 \r
2164                 b.print("----------------------------------------\n");\r
2165 \r
2166                 b.print("// Queries by class\n");\r
2167                 for(Pair<Class<?>, Integer> p : CollectionUtils.valueSortedEntries(counts)) {\r
2168                         b.print(-p.second + " " + p.first.getName() + "\n");\r
2169                 }\r
2170 \r
2171                 Map<CacheEntry, Integer> hist = new HashMap<CacheEntry, Integer>();\r
2172                 for(CacheEntry e : workarea.keySet())\r
2173                         hist.put(e, -1);\r
2174                 \r
2175                 boolean changed = true;\r
2176                 int iter = 0;\r
2177                 while(changed && iter++<50) {\r
2178                         \r
2179                         changed = false;\r
2180                         \r
2181                         Map<CacheEntry, Integer> newHist = new HashMap<CacheEntry, Integer>();\r
2182                         for(CacheEntry e : workarea.keySet())\r
2183                                 newHist.put(e, -1);\r
2184 \r
2185                         for(Map.Entry<CacheEntry, Set<CacheEntry>> e : workarea.entrySet()) {\r
2186                                 Integer c = hist.get(e.getKey());\r
2187                                 for(CacheEntry p : e.getValue()) {\r
2188                                         Integer i = newHist.get(p);\r
2189                                         newHist.put(p, i+c);\r
2190                                 }\r
2191                         }\r
2192                         for(CacheEntry e : workarea.keySet()) {\r
2193                                 Integer value = newHist.get(e);\r
2194                                 Integer old = hist.get(e);\r
2195                                 if(!value.equals(old)) {\r
2196                                         hist.put(e, value);\r
2197 //                                      System.err.println("hist " + e + ": " + old + " => " + value);\r
2198                                         changed = true;\r
2199                                 }\r
2200                         }\r
2201                         \r
2202                         System.err.println("Retained set iteration " + iter);\r
2203 \r
2204                 }\r
2205 \r
2206                 b.print("// Queries by retained set\n");\r
2207                 for(Pair<CacheEntry, Integer> p : CollectionUtils.valueSortedEntries(hist)) {\r
2208                         b.print("" + -p.second + " " + p.first + "\n");\r
2209                 }\r
2210 \r
2211                 HashMap<CacheEntry, Collection<CacheEntry>> inverse = new HashMap<CacheEntry, Collection<CacheEntry>>();\r
2212 \r
2213                 b.print("// Entry parent listing\n");\r
2214                 for(CacheEntry entry : workarea.keySet()) {\r
2215                         int status = flagMap.get(entry);\r
2216                         boolean hasListener = hasListenerAfterDisposing(entry);\r
2217                         b.print("Q " + entry.toString());\r
2218                         if(hasListener) {\r
2219                                 b.print(" (L" + status + ")");\r
2220                                 listeners++;\r
2221                         } else {\r
2222                                 b.print(" (" + status + ")");\r
2223                         }\r
2224                         b.print("\n");\r
2225                         for(CacheEntry parent : workarea.get(entry)) {\r
2226                                 Collection<CacheEntry> inv = inverse.get(parent);\r
2227                                 if(inv == null) {\r
2228                                         inv = new ArrayList<CacheEntry>();\r
2229                                         inverse.put(parent, inv);\r
2230                                 }\r
2231                                 inv.add(entry);\r
2232                                 b.print("  " + parent.toString());\r
2233                                 b.print("\n");\r
2234                         }\r
2235                 }\r
2236 \r
2237                 b.print("// Entry child listing\n");\r
2238                 for(Map.Entry<CacheEntry, Collection<CacheEntry>> entry : inverse.entrySet()) {\r
2239                         b.print("C " + entry.getKey().toString());\r
2240                         b.print("\n");\r
2241                         for(CacheEntry child : entry.getValue()) {\r
2242                                 Integer h = hist.get(child);\r
2243                                 if(h != null) {\r
2244                                         b.print("  " + h);\r
2245                                 } else {\r
2246                                         b.print("  <no children>");\r
2247                                 }\r
2248                                 b.print("  " + child.toString());\r
2249                                 b.print("\n");\r
2250                         }\r
2251                 }\r
2252 \r
2253                 b.print("#queries: " + workarea.keySet().size() + "\n");\r
2254                 b.print("#listeners: " + listeners + "\n");\r
2255 \r
2256                 b.close();\r
2257 \r
2258                 return "Dumped " + workarea.keySet().size() + " queries.";\r
2259 \r
2260         }\r
2261 \r
2262         class UpdateEntry {\r
2263 \r
2264                 public CacheEntry caller;\r
2265 \r
2266                 public CacheEntry entry;\r
2267 \r
2268                 public int         indent;\r
2269 \r
2270                 public UpdateEntry(CacheEntry caller, CacheEntry entry, int indent) {\r
2271                         this.caller = caller;\r
2272                         this.entry = entry;\r
2273                         this.indent = indent;\r
2274                 }\r
2275 \r
2276         };\r
2277 \r
2278         boolean removeQuery(CacheEntry entry) {\r
2279 \r
2280                 // This entry has been removed before. No need to do anything here.\r
2281                 if(entry.isDiscarded()) return false;\r
2282 \r
2283                 assert (!entry.isDiscarded());\r
2284 \r
2285                 Query query = entry.getQuery();\r
2286 \r
2287                 query.removeEntry(this);\r
2288 \r
2289                 updates++;\r
2290                 size--;\r
2291 \r
2292                 if((entry.getGCStatus() & CacheEntry.HAS_BEEN_BOUND) != 0)\r
2293                         boundQueries--;\r
2294                 \r
2295                 entry.discard();\r
2296 \r
2297                 return true;\r
2298 \r
2299         }\r
2300 \r
2301         /**\r
2302          * \r
2303          * @return true if this entry is being listened\r
2304          */\r
2305         private boolean updateQuery(UpdateEntry e, LinkedList<UpdateEntry> todo, IdentityHashMap<CacheEntry, CacheEntry> immediates) throws DatabaseException {\r
2306 \r
2307                 assert (e != null);\r
2308 \r
2309                 CacheEntry entry = e.entry;\r
2310 \r
2311 //              System.err.println("updateQuery " + entry);\r
2312                 \r
2313                 /*\r
2314                  * If the dependency graph forms a DAG, some entries are inserted in the\r
2315                  * todo list many times. They only need to be processed once though.\r
2316                  */\r
2317                 if (entry.isDiscarded()) {\r
2318                         if (Development.DEVELOPMENT) {\r
2319                                 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {\r
2320                                         System.out.print("D");\r
2321                                         for (int i = 0; i < e.indent; i++)\r
2322                                                 System.out.print(" ");\r
2323                                         System.out.println(entry.getQuery());\r
2324                                 }\r
2325                         }\r
2326 //                      System.err.println(" => DISCARDED");\r
2327                         return false;\r
2328                 }\r
2329 \r
2330                 if (entry.isRefuted()) {\r
2331                         if (Development.DEVELOPMENT) {\r
2332                                 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {\r
2333                                         System.out.print("R");\r
2334                                         for (int i = 0; i < e.indent; i++)\r
2335                                                 System.out.print(" ");\r
2336                                         System.out.println(entry.getQuery());\r
2337                                 }\r
2338                         }\r
2339                         return false;\r
2340                 }\r
2341 \r
2342                 if (entry.isExcepted()) {\r
2343                         if (Development.DEVELOPMENT) {\r
2344                                 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {\r
2345                                         System.out.print("E");\r
2346                                 }\r
2347                         }\r
2348                 }\r
2349 \r
2350                 if (entry.isPending()) {\r
2351                         if (Development.DEVELOPMENT) {\r
2352                                 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {\r
2353                                         System.out.print("P");\r
2354                                 }\r
2355                         }\r
2356                 }\r
2357 \r
2358                 updates++;\r
2359 \r
2360                 if (Development.DEVELOPMENT) {\r
2361                         if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {\r
2362                                 System.out.print("U ");\r
2363                                 for (int i = 0; i < e.indent; i++)\r
2364                                         System.out.print(" ");\r
2365                                 System.out.print(entry.getQuery());\r
2366                         }\r
2367                 }\r
2368 \r
2369                 Query query = entry.getQuery();\r
2370                 int type = query.type();\r
2371 \r
2372                 boolean hasListener = hasListener(entry); \r
2373 \r
2374                 if (Development.DEVELOPMENT) {\r
2375                         if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {\r
2376                                 if(hasListener(entry)) {\r
2377                                         System.out.println(" (L)");\r
2378                                 } else {\r
2379                                         System.out.println("");\r
2380                                 }\r
2381                         }\r
2382                 }\r
2383 \r
2384                 if(entry.isPending() || entry.isExcepted()) {\r
2385 \r
2386                         // If updated\r
2387                         if ((type & RequestFlags.UPDATE_MASK) == RequestFlags.IMMEDIATE_UPDATE) {\r
2388 \r
2389                                 immediates.put(entry, entry);\r
2390 \r
2391                         } else {\r
2392 \r
2393                                 if(hasListener) {\r
2394                                         entry.refute();\r
2395                                 } else {\r
2396                                         removeQuery(entry);\r
2397                                 }\r
2398 \r
2399                         }\r
2400 \r
2401                 } else {\r
2402 \r
2403                         // If updated\r
2404                         if ((type & RequestFlags.UPDATE_MASK) == RequestFlags.IMMEDIATE_UPDATE) {\r
2405 \r
2406                                 immediates.put(entry, entry);\r
2407 \r
2408                         } else {\r
2409 \r
2410                                 if(hasListener) {\r
2411                                         entry.refute();\r
2412                                 } else {\r
2413                                         removeQuery(entry);\r
2414                                 }\r
2415 \r
2416                         }\r
2417 \r
2418                 }\r
2419 \r
2420 //              System.err.println(" => FOO " + type);\r
2421 \r
2422                 if (hasListener) {\r
2423                         ArrayList<ListenerEntry> entries = listeners.get(entry);\r
2424                         if(entries != null) {\r
2425                                 for (ListenerEntry le : entries) {\r
2426                                         scheduleListener(le);\r
2427                                 }\r
2428                         }\r
2429                 }\r
2430 \r
2431                 // If invalid, update parents\r
2432                 if (type == RequestFlags.INVALIDATE) {\r
2433                         updateParents(e.indent, entry, todo);\r
2434                 }\r
2435 \r
2436                 return hasListener;\r
2437 \r
2438         }\r
2439 \r
2440         private void updateParents(int indent, CacheEntry entry, LinkedList<UpdateEntry> todo) {\r
2441 \r
2442                 Iterable<CacheEntry> oldParents = entry.getParents(this);\r
2443                 for (CacheEntry parent : oldParents) {\r
2444 //                      System.err.println("updateParents " + entry + " => " + parent);\r
2445                         if(!parent.isDiscarded())\r
2446                                 todo.push(new UpdateEntry(entry, parent, indent + 2));\r
2447                 }\r
2448 \r
2449         }\r
2450 \r
2451         private boolean pruneListener(ListenerEntry entry) {\r
2452                 if (entry.base.isDisposed()) {\r
2453                         removeListener(entry);\r
2454                         return true;\r
2455                 } else {\r
2456                         return false;\r
2457                 }\r
2458         }\r
2459 \r
2460         /**\r
2461          * @param av1 an array (guaranteed)\r
2462          * @param av2 any object\r
2463          * @return <code>true</code> if the two arrays are equal\r
2464          */\r
2465         private final boolean arrayEquals(Object av1, Object av2) {\r
2466                 if (av2 == null)\r
2467                         return false;\r
2468                 Class<?> c1 = av1.getClass().getComponentType();\r
2469                 Class<?> c2 = av2.getClass().getComponentType();\r
2470                 if (c2 == null || !c1.equals(c2))\r
2471                         return false;\r
2472                 boolean p1 = c1.isPrimitive();\r
2473                 boolean p2 = c2.isPrimitive();\r
2474                 if (p1 != p2)\r
2475                         return false;\r
2476                 if (!p1)\r
2477                         return Arrays.equals((Object[]) av1, (Object[]) av2);\r
2478                 if (boolean.class.equals(c1))\r
2479                         return Arrays.equals((boolean[]) av1, (boolean[]) av2);\r
2480                 else if (byte.class.equals(c1))\r
2481                         return Arrays.equals((byte[]) av1, (byte[]) av2);\r
2482                 else if (int.class.equals(c1))\r
2483                         return Arrays.equals((int[]) av1, (int[]) av2);\r
2484                 else if (long.class.equals(c1))\r
2485                         return Arrays.equals((long[]) av1, (long[]) av2);\r
2486                 else if (float.class.equals(c1))\r
2487                         return Arrays.equals((float[]) av1, (float[]) av2);\r
2488                 else if (double.class.equals(c1))\r
2489                         return Arrays.equals((double[]) av1, (double[]) av2);\r
2490                 throw new RuntimeException("??? Contact application querySupport.");\r
2491         }\r
2492 \r
2493 \r
2494 \r
2495         final private Object compareTo(ReadGraphImpl graph, final CacheEntry entry, final Object oldValue) {\r
2496 \r
2497                 try {\r
2498 \r
2499                         Query query = entry.getQuery();\r
2500 \r
2501                         if(DebugPolicy.RECOMPUTE) System.out.println("R " + query);\r
2502 \r
2503                         entry.prepareRecompute(querySupport);\r
2504                         \r
2505                         ReadGraphImpl parentGraph = graph.withParent(entry);\r
2506 \r
2507                         query.recompute(parentGraph, this, entry);\r
2508 \r
2509                         if(entry.isExcepted()) return ListenerEntry.NO_VALUE;\r
2510 \r
2511                         Object newValue = entry.getResult();\r
2512 \r
2513                         if (ListenerEntry.NO_VALUE == oldValue) {\r
2514                                 if(DebugPolicy.CHANGES) {\r
2515                                         System.out.println("C " + query);\r
2516                                         System.out.println("- " + oldValue);\r
2517                                         System.out.println("- " + newValue);\r
2518                                 }\r
2519                                 return newValue;\r
2520                         }\r
2521 \r
2522                         boolean changed = false;\r
2523 \r
2524                         if (newValue != null) {\r
2525                                 if (newValue.getClass().isArray()) {\r
2526                                         changed = !arrayEquals(newValue, oldValue);\r
2527                                 } else {\r
2528                                         changed = !newValue.equals(oldValue);\r
2529                                 }\r
2530                         } else\r
2531                                 changed = (oldValue != null);\r
2532 \r
2533                         if(DebugPolicy.CHANGES && changed) {\r
2534                                 System.out.println("C " + query);\r
2535                                 System.out.println("- " + oldValue);\r
2536                                 System.out.println("- " + newValue);\r
2537                         }\r
2538 \r
2539                         return changed ? newValue : ListenerEntry.NOT_CHANGED;\r
2540 \r
2541                 } catch (Throwable t) {\r
2542 \r
2543                         Logger.defaultLogError(t);\r
2544                         entry.except(t);\r
2545                         return ListenerEntry.NO_VALUE;\r
2546 \r
2547                 }\r
2548 \r
2549         }\r
2550 \r
2551         public boolean hasScheduledUpdates() {\r
2552                 return !scheduledListeners.isEmpty();\r
2553         }\r
2554 \r
2555         public void performScheduledUpdates(WriteGraphImpl graph) {\r
2556 \r
2557                 assert (!updating);\r
2558                 assert (!collecting);\r
2559                 assert (!firingListeners);\r
2560 \r
2561                 firingListeners = true;\r
2562 \r
2563                 try {\r
2564 \r
2565                         // Performing may cause further events to be scheduled.\r
2566                         while (!scheduledListeners.isEmpty()) {\r
2567 \r
2568 //                              graph.restart();\r
2569 //                              graph.state.barrier.inc();\r
2570 \r
2571                                 // Clone current events to make new entries possible during\r
2572                                 // firing.\r
2573                                 THashSet<ListenerEntry> entries = scheduledListeners;\r
2574                                 scheduledListeners = new THashSet<ListenerEntry>();\r
2575 \r
2576                                 ArrayList<ListenerEntry> schedule = new ArrayList<ListenerEntry>();\r
2577 \r
2578                                 for (ListenerEntry listenerEntry : entries) {\r
2579 \r
2580                                         if (pruneListener(listenerEntry)) {\r
2581                                                 if(DebugPolicy.LISTENER) System.out.println("Pruned " + listenerEntry.procedure);\r
2582                                                 continue;\r
2583                                         }\r
2584 \r
2585                                         final CacheEntry entry = listenerEntry.entry;\r
2586                                         assert (entry != null);\r
2587 \r
2588                                         Object newValue = compareTo(graph, entry, listenerEntry.getLastKnown());\r
2589 \r
2590                                         if (newValue != ListenerEntry.NOT_CHANGED) {\r
2591                                                 if(DebugPolicy.LISTENER)\r
2592                                                         System.out.println("Add to schedule " + listenerEntry.procedure + " with " + newValue);\r
2593                                                 schedule.add(listenerEntry);\r
2594                                                 listenerEntry.setLastKnown(entry.getResult());\r
2595                                         }\r
2596 \r
2597                                 }\r
2598 \r
2599                                 for(ListenerEntry listenerEntry : schedule) {\r
2600                                         final CacheEntry entry = listenerEntry.entry;\r
2601                                         if(DebugPolicy.LISTENER)\r
2602                                                 System.out.println("Firing " + listenerEntry.procedure);\r
2603                                         try {\r
2604                                                 if(DebugPolicy.LISTENER)\r
2605                                                         System.out.println("Firing " + listenerEntry.procedure + " for " + listenerEntry.entry);\r
2606                                                 entry.performFromCache(graph, this, listenerEntry.procedure);\r
2607                                         } catch (Throwable t) {\r
2608                                                 t.printStackTrace();\r
2609                                         }\r
2610                                 }\r
2611 \r
2612 //                              graph.state.barrier.dec();\r
2613 //                              graph.waitAsync(null);\r
2614 //                              graph.state.barrier.assertReady();\r
2615 \r
2616                         }\r
2617 \r
2618                 } finally {\r
2619                         firingListeners = false;\r
2620                 }\r
2621 \r
2622         }\r
2623 \r
2624         /**\r
2625          * \r
2626          * @return true if this entry still has listeners\r
2627          */\r
2628         public boolean update(final ReadGraphImpl graph, final CacheEntry entry) {\r
2629 \r
2630                 assert (!collecting);\r
2631                 assert (!updating);\r
2632                 updating = true;\r
2633 \r
2634                 boolean hadListeners = false;\r
2635                 boolean listenersUnknown = false;\r
2636 \r
2637                 try {\r
2638 \r
2639                         assert(entry != null);\r
2640                         LinkedList<UpdateEntry> todo = new LinkedList<UpdateEntry>();\r
2641                         IdentityHashMap<CacheEntry, CacheEntry> immediates = new IdentityHashMap<CacheEntry, CacheEntry>();\r
2642                         todo.add(new UpdateEntry(null, entry, 0));\r
2643 \r
2644                         while(true) {\r
2645 \r
2646                                 // Walk the tree and collect immediate updates\r
2647                                 while (!todo.isEmpty()) {\r
2648                                         UpdateEntry e = todo.pop();\r
2649                                         hadListeners |= updateQuery(e, todo, immediates);\r
2650                                 }\r
2651 \r
2652                                 if(immediates.isEmpty()) break;\r
2653 \r
2654                                 // Evaluate all immediate updates and collect parents to update\r
2655                                 for(CacheEntry immediate : immediates.values()) {\r
2656 \r
2657                                         if(immediate.isDiscarded()) {\r
2658                                                 continue;\r
2659                                         }\r
2660 \r
2661                                         if(immediate.isExcepted()) {\r
2662 \r
2663                                                 Object newValue = compareTo(graph, immediate, ListenerEntry.NO_VALUE);\r
2664                                                 if (newValue != ListenerEntry.NOT_CHANGED)\r
2665                                                         updateParents(0, immediate, todo);\r
2666 \r
2667                                         } else {\r
2668 \r
2669                                                 Object oldValue = immediate.getResult();\r
2670                                                 Object newValue = compareTo(graph, immediate, oldValue);\r
2671 \r
2672                                                 if (newValue != ListenerEntry.NOT_CHANGED) {\r
2673                                                         updateParents(0, immediate, todo);\r
2674                                                 } else {\r
2675                                                         // If not changed, keep the old value\r
2676                                                         immediate.setResult(oldValue);\r
2677                                                         listenersUnknown = true;\r
2678                                                 }\r
2679 \r
2680                                         }\r
2681 \r
2682                                 }\r
2683                                 immediates.clear();\r
2684 \r
2685                         }\r
2686 \r
2687                 } catch (Throwable t) {\r
2688                         Logger.defaultLogError(t);\r
2689                 }\r
2690 \r
2691                 assert (updating);\r
2692                 updating = false;\r
2693 \r
2694                 return hadListeners | listenersUnknown;\r
2695 \r
2696         }\r
2697 \r
2698         volatile public boolean dirty = false;\r
2699 \r
2700         private ObjectUpdateSet scheduledObjectUpdates = new ObjectUpdateSet();\r
2701         private ValueUpdateSet scheduledValueUpdates = new ValueUpdateSet();\r
2702         private ValueUpdateSet scheduledInvalidates = new ValueUpdateSet();\r
2703         // Maybe use a mutex from util.concurrent?\r
2704         private Object primitiveUpdateLock = new Object();\r
2705         private THashSet scheduledPrimitiveUpdates = new THashSet();\r
2706 \r
2707         public void performDirtyUpdates(final ReadGraphImpl graph) {\r
2708 \r
2709                 dirty = false;\r
2710                 lastInvalidate = 0;\r
2711 \r
2712                 if (Development.DEVELOPMENT) {\r
2713                         if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {\r
2714                                 System.err.println("== Query update ==");\r
2715                         }\r
2716                 }\r
2717 \r
2718                 // Special case - one statement\r
2719                 if(scheduledObjectUpdates.size() == 1 && scheduledValueUpdates.size() == 0 && scheduledPrimitiveUpdates.size() == 0 && scheduledInvalidates.size() == 0) {\r
2720 \r
2721                         long arg0 = scheduledObjectUpdates.getFirst();\r
2722 \r
2723                         final int subject = (int)(arg0 >>> 32);\r
2724                         final int predicate = (int)(arg0 & 0xffffffff);\r
2725 \r
2726                         for(Objects o : Objects.entries(QueryProcessor.this, subject)) update(graph, o);\r
2727                         for(DirectObjects o : DirectObjects.entries(QueryProcessor.this, subject)) update(graph, o);\r
2728                         for(Statements o : Statements.entries(QueryProcessor.this, subject)) update(graph, o);\r
2729 \r
2730                         if(predicate == instanceOf || predicate == inherits || predicate == subrelationOf) {\r
2731                                 PrincipalTypes principalTypes = PrincipalTypes.entry(QueryProcessor.this, subject);\r
2732                                 if(principalTypes != null) update(graph, principalTypes);\r
2733                                 Types types = Types.entry(QueryProcessor.this, subject);\r
2734                                 if(types != null) update(graph, types);\r
2735                         }\r
2736 \r
2737                         if(predicate == subrelationOf) {\r
2738                                 SuperRelations superRelations = SuperRelations.entry(QueryProcessor.this, subject);\r
2739                                 if(superRelations != null) update(graph, superRelations);\r
2740                         }\r
2741 \r
2742                         DirectPredicates dp = DirectPredicates.entry(QueryProcessor.this, subject);\r
2743                         if(dp != null) update(graph, dp);\r
2744                         OrderedSet os = OrderedSet.entry(QueryProcessor.this, predicate);\r
2745                         if(os != null) update(graph, os);\r
2746 \r
2747                         scheduledObjectUpdates.clear();\r
2748                         return;\r
2749 \r
2750                 }\r
2751 \r
2752                 // Special case - one value\r
2753                 if(scheduledObjectUpdates.size() == 0 && scheduledValueUpdates.size() == 1 && scheduledPrimitiveUpdates.size() == 0 && scheduledInvalidates.size() == 0) {\r
2754 \r
2755                         int arg0 = scheduledValueUpdates.getFirst();\r
2756 \r
2757                         ValueQuery valueQuery = ValueQuery.entry(QueryProcessor.this, arg0);\r
2758                         if(valueQuery != null) update(graph, valueQuery);\r
2759 \r
2760                         scheduledValueUpdates.clear();\r
2761                         return;\r
2762 \r
2763                 }\r
2764 \r
2765                 final TIntHashSet predicates = new TIntHashSet();\r
2766                 final TIntHashSet orderedSets = new TIntHashSet();\r
2767 \r
2768                 THashSet primitiveUpdates;\r
2769                 synchronized (primitiveUpdateLock) {\r
2770                         primitiveUpdates = scheduledPrimitiveUpdates;\r
2771                         scheduledPrimitiveUpdates = new THashSet();\r
2772                 }\r
2773 \r
2774                 primitiveUpdates.forEach(new TObjectProcedure() {\r
2775 \r
2776                         @Override\r
2777                         public boolean execute(Object arg0) {\r
2778 \r
2779                                 ExternalReadEntry query = (ExternalReadEntry)externalReadMap.get(arg0);\r
2780                                 if (query != null) {\r
2781                                         boolean listening = update(graph, query);\r
2782                                         if (!listening && !query.hasParents()) {\r
2783                                                 externalReadMap.remove(arg0);\r
2784                                                 query.discard();\r
2785                                         }\r
2786                                 }\r
2787                                 return true;\r
2788                         }\r
2789 \r
2790                 });\r
2791 \r
2792                 scheduledValueUpdates.forEach(new TIntProcedure() {\r
2793 \r
2794                         @Override\r
2795                         public boolean execute(int arg0) {\r
2796                                 ValueQuery valueQuery = ValueQuery.entry(QueryProcessor.this, arg0);\r
2797                                 if(valueQuery != null) update(graph, valueQuery);\r
2798                                 return true;\r
2799                         }\r
2800 \r
2801                 });\r
2802 \r
2803                 scheduledInvalidates.forEach(new TIntProcedure() {\r
2804 \r
2805                         @Override\r
2806                         public boolean execute(int resource) {\r
2807                                 \r
2808                                 ValueQuery valueQuery = ValueQuery.entry(QueryProcessor.this, resource);\r
2809                                 if(valueQuery != null) update(graph, valueQuery);\r
2810                                 \r
2811                                 PrincipalTypes principalTypes = PrincipalTypes.entry(QueryProcessor.this, resource);\r
2812                                 if(principalTypes != null) update(graph, principalTypes);\r
2813                                 Types types = Types.entry(QueryProcessor.this, resource);\r
2814                                 if(types != null) update(graph, types);\r
2815 \r
2816                                 SuperRelations superRelations = SuperRelations.entry(QueryProcessor.this, resource);\r
2817                                 if(superRelations != null) update(graph, superRelations);\r
2818 \r
2819                                 predicates.add(resource);\r
2820                                 \r
2821                                 return true;\r
2822                         }\r
2823 \r
2824                 });\r
2825 \r
2826                 scheduledObjectUpdates.forEach(new TLongProcedure() {\r
2827 \r
2828                         @Override\r
2829                         public boolean execute(long arg0) {\r
2830 \r
2831                                 final int subject = (int)(arg0 >>> 32);\r
2832                                 final int predicate = (int)(arg0 & 0xffffffff);\r
2833 \r
2834                                 if(predicate == instanceOf || predicate == inherits || predicate == subrelationOf) {\r
2835                                         PrincipalTypes principalTypes = PrincipalTypes.entry(QueryProcessor.this, subject);\r
2836                                         if(principalTypes != null) update(graph, principalTypes);\r
2837                                         Types types = Types.entry(QueryProcessor.this, subject);\r
2838                                         if(types != null) update(graph, types);\r
2839                                 }\r
2840 \r
2841                                 if(predicate == subrelationOf) {\r
2842                                         SuperRelations superRelations = SuperRelations.entry(QueryProcessor.this, subject);\r
2843                                         if(superRelations != null) update(graph, superRelations);\r
2844                                 }\r
2845 \r
2846                                 predicates.add(subject);\r
2847                                 orderedSets.add(predicate);\r
2848 \r
2849                                 return true;\r
2850 \r
2851                         }\r
2852 \r
2853                 });\r
2854 \r
2855                 predicates.forEach(new TIntProcedure() {\r
2856 \r
2857                         @Override\r
2858                         public boolean execute(final int subject) {\r
2859 \r
2860                                 for(Objects o : Objects.entries(QueryProcessor.this, subject)) update(graph, o);\r
2861                                 for(DirectObjects o : DirectObjects.entries(QueryProcessor.this, subject)) update(graph, o);\r
2862                                 for(Statements o : Statements.entries(QueryProcessor.this, subject)) update(graph, o);\r
2863 \r
2864                                 DirectPredicates entry = DirectPredicates.entry(QueryProcessor.this, subject);\r
2865                                 if(entry != null) update(graph, entry);\r
2866 \r
2867                                 return true;\r
2868 \r
2869                         }\r
2870 \r
2871                 });\r
2872 \r
2873                 orderedSets.forEach(new TIntProcedure() {\r
2874 \r
2875                         @Override\r
2876                         public boolean execute(int orderedSet) {\r
2877 \r
2878                                 OrderedSet entry = OrderedSet.entry(QueryProcessor.this, orderedSet);\r
2879                                 if(entry != null) update(graph, entry);\r
2880 \r
2881                                 return true;\r
2882 \r
2883                         }\r
2884 \r
2885                 });\r
2886 \r
2887                 //              for (Integer subject : predicates) {\r
2888                 //                      DirectPredicates entry = DirectPredicates.entry(QueryProcessor.this, subject);\r
2889                 //                      if(entry != null) update(graph, entry);\r
2890                 //              }\r
2891 \r
2892 \r
2893                 if (Development.DEVELOPMENT) {\r
2894                         if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {\r
2895                                 System.err.println("== Query update ends ==");\r
2896                         }\r
2897                 }\r
2898 \r
2899                 scheduledValueUpdates.clear();\r
2900                 scheduledObjectUpdates.clear();\r
2901                 scheduledInvalidates.clear();\r
2902 \r
2903         }\r
2904 \r
2905         public void updateValue(final int resource) {\r
2906                 scheduledValueUpdates.add(resource);\r
2907                 dirty = true;\r
2908         }\r
2909 \r
2910         public void updateStatements(final int resource, final int predicate) {\r
2911                 scheduledObjectUpdates.add((((long)resource) << 32) + predicate);\r
2912                 dirty = true;\r
2913         }\r
2914         \r
2915         private int lastInvalidate = 0;\r
2916         \r
2917         public void invalidateResource(final int resource) {\r
2918                 if(lastInvalidate == resource) return;\r
2919                 scheduledValueUpdates.add(resource);\r
2920                 lastInvalidate = resource;\r
2921                 dirty = true;\r
2922         }\r
2923 \r
2924         public void updatePrimitive(final ExternalRead primitive) {\r
2925 \r
2926                 // External reads may be updated from arbitrary threads.\r
2927                 // Synchronize to prevent race-conditions.\r
2928                 synchronized (primitiveUpdateLock) {\r
2929                         scheduledPrimitiveUpdates.add(primitive);\r
2930                 }\r
2931                 querySupport.dirtyPrimitives();\r
2932 \r
2933         }\r
2934 \r
2935         @Override\r
2936         public synchronized String toString() {\r
2937                 return "QueryProvider [size = " + size + ", hits = " + hits + " misses = " + misses + ", updates = " + updates + "]";\r
2938         }\r
2939 \r
2940         @Override\r
2941         protected void doDispose() {\r
2942 \r
2943                 for(int index = 0; index < THREADS; index++) { \r
2944                         executors[index].dispose();\r
2945                 }\r
2946 \r
2947                 // First just wait\r
2948                 for(int i=0;i<100;i++) {\r
2949 \r
2950                         boolean alive = false;\r
2951                         for(int index = 0; index < THREADS; index++) { \r
2952                                 alive |= executors[index].isAlive();\r
2953                         }\r
2954                         if(!alive) return;\r
2955                         try {\r
2956                                 Thread.sleep(5);\r
2957                         } catch (InterruptedException e) {\r
2958                                 Logger.defaultLogError(e);\r
2959                         }\r
2960 \r
2961                 }\r
2962 \r
2963                 // Then start interrupting\r
2964                 for(int i=0;i<100;i++) {\r
2965 \r
2966                         boolean alive = false;\r
2967                         for(int index = 0; index < THREADS; index++) { \r
2968                                 alive |= executors[index].isAlive();\r
2969                         }\r
2970                         if(!alive) return;\r
2971                         for(int index = 0; index < THREADS; index++) {\r
2972                                 executors[index].interrupt();\r
2973                         }\r
2974                 }\r
2975 \r
2976                 //              // Then just destroy\r
2977                 //              for(int index = 0; index < THREADS; index++) {\r
2978                 //                      executors[index].destroy();\r
2979                 //              }\r
2980 \r
2981                 for(int index = 0; index < THREADS; index++) {\r
2982                         try {\r
2983                                 executors[index].join(5000);\r
2984                         } catch (InterruptedException e) {\r
2985                                 Logger.defaultLogError("QueryThread " + index + " will not die.", e);\r
2986                         }\r
2987                         executors[index] = null;\r
2988                 }\r
2989 \r
2990         }\r
2991 \r
2992         public int getHits() {\r
2993                 return hits;\r
2994         }\r
2995 \r
2996         public int getMisses() {\r
2997                 return misses;\r
2998         }\r
2999 \r
3000         public int getSize() {\r
3001                 return size;\r
3002         }\r
3003 \r
3004         public Set<Long> getReferencedClusters() {\r
3005                 HashSet<Long> result = new HashSet<Long>();\r
3006                 for (CacheEntry entry : objectsMap.values()) {\r
3007                         Objects query = (Objects) entry.getQuery();\r
3008                         result.add(querySupport.getClusterId(query.r1()));\r
3009                 }\r
3010                 for (CacheEntry entry : directPredicatesMap.values()) {\r
3011                         DirectPredicates query = (DirectPredicates) entry.getQuery();\r
3012                         result.add(querySupport.getClusterId(query.id));\r
3013                 }\r
3014                 for (CacheEntry entry : valueMap.values()) {\r
3015                         ValueQuery query = (ValueQuery) entry.getQuery();\r
3016                         result.add(querySupport.getClusterId(query.id));\r
3017                 }\r
3018                 return result;\r
3019         }\r
3020 \r
3021         public void assertDone() {\r
3022         }\r
3023 \r
3024         CacheCollectionResult allCaches(CacheCollectionResult result) {\r
3025 \r
3026                 int level = Integer.MAX_VALUE;\r
3027                 directPredicatesMap.values(level, result);\r
3028                 principalTypesMap.values(level, result);\r
3029                 for(CacheEntryBase e : uriToResourceMap.values())\r
3030                         if(e.getLevel() <= level)\r
3031                                 result.add(e);\r
3032                 for(CacheEntryBase e : namespaceIndexMap22.values())\r
3033                         if(e.getLevel() <= level)\r
3034                                 result.add(e);\r
3035                 projectsMap.values(level, result);\r
3036                 \r
3037                 relationInfoMap.values(level, result);\r
3038                 superTypesMap.values(level, result);\r
3039                 typeHierarchyMap.values(level, result);\r
3040                 superRelationsMap.values(level, result);\r
3041                 typesMap.values(level, result);\r
3042 \r
3043                 valueMap.values(level, result);\r
3044                 directObjectsMap.values(level, result);\r
3045                 objectsMap.values(level, result);\r
3046                 orderedSetMap.values(level, result);\r
3047                 predicatesMap.values(level, result);\r
3048 \r
3049                 statementsMap.values(level, result);\r
3050                 assertedPredicatesMap.values(level, result);\r
3051                 assertedStatementsMap.values(level, result);\r
3052                 externalReadMap.values(level, result);\r
3053                 asyncReadMap.values(level, result);\r
3054                 \r
3055                 readMap.values(level, result);\r
3056                 asyncMultiReadMap.values(level, result);\r
3057                 multiReadMap.values(level, result);\r
3058 \r
3059                 return result;\r
3060 \r
3061         }\r
3062 \r
3063         public void printDiagnostics() {\r
3064         }\r
3065 \r
3066         public void requestCluster(ReadGraphImpl graph, long clusterId, Runnable runnable) {\r
3067                 querySupport.requestCluster(graph, clusterId, runnable);\r
3068         }\r
3069 \r
3070         public int clean() {\r
3071                 collector.collect(0, Integer.MAX_VALUE);\r
3072                 return size;\r
3073         }\r
3074 \r
3075         public void clean(final Collection<ExternalRead<?>> requests) {\r
3076                 QueryCollectorSupport collectorSupport = new QueryCollectorSupport() {\r
3077                         Iterator<ExternalRead<?>> iterator = requests.iterator();\r
3078                         @Override\r
3079                         public CacheCollectionResult allCaches() {\r
3080                                 throw new UnsupportedOperationException();\r
3081                         }\r
3082                         @Override\r
3083                         public CacheEntryBase iterate(int level) {\r
3084                                 if(iterator.hasNext()) {\r
3085                                         ExternalRead<?> request = iterator.next();\r
3086                                         ExternalReadEntry entry = externalReadMap.get(request);\r
3087                                         if (entry != null) return entry;\r
3088                                         else return iterate(level);\r
3089                                 } else {\r
3090                                         iterator = requests.iterator();\r
3091                                         return null;\r
3092                                 }\r
3093                         }\r
3094                         @Override\r
3095                         public void remove() {\r
3096                                 throw new UnsupportedOperationException();\r
3097                         }\r
3098                         @Override\r
3099                         public void setLevel(CacheEntryBase entry, int level) {\r
3100                                 throw new UnsupportedOperationException();\r
3101                         }\r
3102                         @Override\r
3103                         public Collection<CacheEntry> getRootList() {\r
3104                                 ArrayList<CacheEntry> result = new ArrayList<CacheEntry>(requests.size());\r
3105                                 for (ExternalRead<?> request : requests) {\r
3106                                         ExternalReadEntry entry = externalReadMap.get(request);\r
3107                                         if (entry != null)\r
3108                                                 result.add(entry);\r
3109                                 }\r
3110                                 return result;\r
3111                         }\r
3112                         @Override\r
3113                         public int getCurrentSize() {\r
3114                                 return size;\r
3115                         }\r
3116                         @Override\r
3117                         public int calculateCurrentSize() {\r
3118                                 // This tells the collector to attempt collecting everything.\r
3119                                 return Integer.MAX_VALUE;\r
3120                         }\r
3121                         @Override\r
3122                         public boolean start(boolean flush) {\r
3123                                 return true;\r
3124                         }\r
3125                 };\r
3126                 new QueryCollectorImpl2(this, collectorSupport).collect(0, Integer.MAX_VALUE);\r
3127         }\r
3128 \r
3129         public void scanPending() {\r
3130 \r
3131                 ArrayList<CacheEntry> entries = new ArrayList<CacheEntry>();\r
3132 \r
3133                 entries.addAll(directPredicatesMap.values());\r
3134                 entries.addAll(principalTypesMap.values());\r
3135                 entries.addAll(uriToResourceMap.values());\r
3136                 entries.addAll(namespaceIndexMap22.values());\r
3137                 entries.addAll(projectsMap.values());\r
3138                 entries.addAll(relationInfoMap.values());\r
3139                 entries.addAll(superTypesMap.values());\r
3140                 entries.addAll(superRelationsMap.values());\r
3141                 entries.addAll(typesMap.values());\r
3142                 entries.addAll(valueMap.values());\r
3143                 entries.addAll(directObjectsMap.values());\r
3144                 entries.addAll(objectsMap.values());\r
3145                 entries.addAll(orderedSetMap.values());\r
3146                 entries.addAll(predicatesMap.values());\r
3147                 entries.addAll(orderedSetMap.values());\r
3148                 entries.addAll(statementsMap.values());\r
3149                 //                      entries.addAll(assertedObjectsMap.values());\r
3150                 entries.addAll(assertedPredicatesMap.values());\r
3151                 entries.addAll(assertedStatementsMap.values());\r
3152                 entries.addAll(externalReadMap.values());\r
3153                 entries.addAll(asyncReadMap.values());\r
3154                 entries.addAll(externalReadMap.values());\r
3155                 entries.addAll(readMap.values());\r
3156                 entries.addAll(asyncMultiReadMap.values());\r
3157                 entries.addAll(multiReadMap.values());\r
3158                 entries.addAll(readMap.values());\r
3159                 System.out.println(entries.size() + " entries.");\r
3160                 for(Object e : entries) {\r
3161                         if(e instanceof CacheEntry) {\r
3162                                 CacheEntry en = (CacheEntry)e;\r
3163                                 if(en.isPending()) System.out.println("pending " + e);\r
3164                                 if(en.isExcepted()) System.out.println("excepted " + e);\r
3165                                 if(en.isDiscarded()) System.out.println("discarded " + e);\r
3166                                 if(en.isRefuted()) System.out.println("refuted " + e);\r
3167                                 if(en.isFresh()) System.out.println("fresh " + e);\r
3168                         } else {\r
3169                                 //System.out.println("Unknown object " + e);\r
3170                         }\r
3171                 }\r
3172 \r
3173         }\r
3174 \r
3175         public ReadGraphImpl graphForVirtualRequest() {\r
3176                 return ReadGraphImpl.createAsync(this);\r
3177         }\r
3178 \r
3179         \r
3180         private HashMap<Resource, Class<?>> builtinValues;\r
3181         \r
3182         public Class<?> getBuiltinValue(Resource r) {\r
3183                 if(builtinValues == null) initBuiltinValues();\r
3184                 return builtinValues.get(r);\r
3185         }\r
3186 \r
3187         Exception callerException = null;\r
3188 \r
3189         public interface AsyncBarrier {\r
3190                 public void inc(); \r
3191                 public void dec();\r
3192                 //        public void inc(String debug); \r
3193                 //        public void dec(String debug);\r
3194         }\r
3195 \r
3196 //      final public QueryProcessor processor;\r
3197 //      final public QuerySupport support;\r
3198 \r
3199         //    boolean disposed = false;\r
3200 \r
3201         private void initBuiltinValues() {\r
3202 \r
3203                 Layer0 b = getSession().peekService(Layer0.class);\r
3204                 if(b == null) return;\r
3205 \r
3206                 builtinValues = new HashMap<Resource, Class<?>>();\r
3207 \r
3208                 builtinValues.put(b.String, String.class);\r
3209                 builtinValues.put(b.Double, Double.class);\r
3210                 builtinValues.put(b.Float, Float.class);\r
3211                 builtinValues.put(b.Long, Long.class);\r
3212                 builtinValues.put(b.Integer, Integer.class);\r
3213                 builtinValues.put(b.Byte, Byte.class);\r
3214                 builtinValues.put(b.Boolean, Boolean.class);\r
3215 \r
3216                 builtinValues.put(b.StringArray, String[].class);\r
3217                 builtinValues.put(b.DoubleArray, double[].class);\r
3218                 builtinValues.put(b.FloatArray, float[].class);\r
3219                 builtinValues.put(b.LongArray, long[].class);\r
3220                 builtinValues.put(b.IntegerArray, int[].class);\r
3221                 builtinValues.put(b.ByteArray, byte[].class);\r
3222                 builtinValues.put(b.BooleanArray, boolean[].class);\r
3223 \r
3224         }\r
3225 \r
3226 //      public ReadGraphSupportImpl(final QueryProcessor provider2) {\r
3227 //\r
3228 //              if (null == provider2) {\r
3229 //                      this.processor = null;\r
3230 //                      support = null;\r
3231 //                      return;\r
3232 //              }\r
3233 //              this.processor = provider2;\r
3234 //              support = provider2.getCore();\r
3235 //              initBuiltinValues();\r
3236 //\r
3237 //      }\r
3238 \r
3239 //      final static public ReadGraphSupportImpl basedOn(ReadGraphSupportImpl impl) {\r
3240 //              return new ReadGraphSupportImpl(impl.processor);\r
3241 //      }\r
3242 \r
3243         @Override\r
3244         final public Session getSession() {\r
3245                 return session;\r
3246         }\r
3247         \r
3248         final public ResourceSupport getResourceSupport() {\r
3249                 return resourceSupport;\r
3250         }\r
3251 \r
3252         @Override\r
3253         final public void forEachPredicate(final ReadGraphImpl impl, final Resource subject, final AsyncMultiProcedure<Resource> procedure) {\r
3254 \r
3255                 assert(subject != null);\r
3256                 assert(procedure != null);\r
3257 \r
3258                 final ListenerBase listener = getListenerBase(procedure);\r
3259 \r
3260                 IntProcedure ip = new IntProcedure() {\r
3261 \r
3262                         AtomicBoolean first = new AtomicBoolean(true);\r
3263 \r
3264                         @Override\r
3265                         public void execute(ReadGraphImpl graph, int i) {\r
3266                                 try {\r
3267                                         if(first.get()) {\r
3268                                                 procedure.execute(graph, querySupport.getResource(i));\r
3269                                         } else {\r
3270                                                 procedure.execute(impl.newRestart(graph), querySupport.getResource(i));\r
3271                                         }\r
3272                                 } catch (Throwable t2) {\r
3273                                         Logger.defaultLogError(t2);\r
3274                                 }\r
3275                         }\r
3276 \r
3277                         @Override\r
3278                         public void finished(ReadGraphImpl graph) {\r
3279                                 try {\r
3280                                         if(first.compareAndSet(true, false)) {\r
3281                                                 procedure.finished(graph);\r
3282 //                                              impl.state.barrier.dec(this);\r
3283                                         } else {\r
3284                                                 procedure.finished(impl.newRestart(graph));\r
3285                                         }\r
3286 \r
3287                                 } catch (Throwable t2) {\r
3288                                         Logger.defaultLogError(t2);\r
3289                                 }\r
3290                         }\r
3291 \r
3292                         @Override\r
3293                         public void exception(ReadGraphImpl graph, Throwable t) {\r
3294                                 try {\r
3295                                         if(first.compareAndSet(true, false)) {\r
3296                                                 procedure.exception(graph, t);\r
3297 //                                              impl.state.barrier.dec(this);\r
3298                                         } else {\r
3299                                                 procedure.exception(impl.newRestart(graph), t);\r
3300                                         }\r
3301                                 } catch (Throwable t2) {\r
3302                                         Logger.defaultLogError(t2);\r
3303                                 }\r
3304                         }\r
3305 \r
3306                 };\r
3307 \r
3308                 int sId = querySupport.getId(subject);\r
3309 \r
3310 //              if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "#Predicates#" + sId);\r
3311 //              else impl.state.barrier.inc(null, null);\r
3312 \r
3313                 Predicates.queryEach(impl, sId, this, impl.parent, listener, ip);\r
3314 \r
3315         }\r
3316 \r
3317         @Override\r
3318         final public void forEachPredicate(final ReadGraphImpl impl, final Resource subject, final MultiProcedure<Resource> procedure) {\r
3319 \r
3320                 assert(subject != null);\r
3321                 assert(procedure != null);\r
3322 \r
3323                 final ListenerBase listener = getListenerBase(procedure);\r
3324 \r
3325 //              impl.state.barrier.inc();\r
3326 \r
3327                 Predicates.queryEach(impl, querySupport.getId(subject), this, impl.parent, listener, new IntProcedure() {\r
3328 \r
3329                         @Override\r
3330                         public void execute(ReadGraphImpl graph, int i) {\r
3331                                 try {\r
3332                                         procedure.execute(querySupport.getResource(i));\r
3333                                 } catch (Throwable t2) {\r
3334                                         Logger.defaultLogError(t2);\r
3335                                 }\r
3336                         }\r
3337 \r
3338                         @Override\r
3339                         public void finished(ReadGraphImpl graph) {\r
3340                                 try {\r
3341                                         procedure.finished();\r
3342                                 } catch (Throwable t2) {\r
3343                                         Logger.defaultLogError(t2);\r
3344                                 }\r
3345 //                              impl.state.barrier.dec();\r
3346                         }\r
3347 \r
3348                         @Override\r
3349                         public void exception(ReadGraphImpl graph, Throwable t) {\r
3350                                 try {\r
3351                                         procedure.exception(t);\r
3352                                 } catch (Throwable t2) {\r
3353                                         Logger.defaultLogError(t2);\r
3354                                 }\r
3355 //                              impl.state.barrier.dec();\r
3356                         }\r
3357 \r
3358                 });\r
3359 \r
3360         }\r
3361         \r
3362         @Override\r
3363         final public IntSet getPredicates(final ReadGraphImpl impl, final Resource subject) throws Throwable {\r
3364 \r
3365                 assert(subject != null);\r
3366                 \r
3367                 return Predicates.queryEach2(impl, querySupport.getId(subject), this, impl.parent);\r
3368 \r
3369         }\r
3370         \r
3371 \r
3372         @Override\r
3373         final public void forEachStatement(final ReadGraphImpl impl, final Resource subject,\r
3374                         final Resource predicate, final MultiProcedure<Statement> procedure) {\r
3375 \r
3376                 assert(subject != null);\r
3377                 assert(predicate != null);\r
3378                 assert(procedure != null);\r
3379 \r
3380                 final ListenerBase listener = getListenerBase(procedure);\r
3381 \r
3382 //              impl.state.barrier.inc();\r
3383 \r
3384                 Statements.queryEach(impl, querySupport.getId(subject), querySupport.getId(predicate), this, impl.parent, listener, new TripleIntProcedureAdapter() {\r
3385 \r
3386                         @Override\r
3387                         public void execute(ReadGraphImpl graph, int s, int p, int o) {\r
3388                                 try {\r
3389                                         procedure.execute(querySupport.getStatement(s, p, o));\r
3390                                 } catch (Throwable t2) {\r
3391                                         Logger.defaultLogError(t2);\r
3392                                 }\r
3393                         }\r
3394 \r
3395                         @Override\r
3396                         public void finished(ReadGraphImpl graph) {\r
3397                                 try {\r
3398                                         procedure.finished();\r
3399                                 } catch (Throwable t2) {\r
3400                                         Logger.defaultLogError(t2);\r
3401                                 }\r
3402 //                              impl.state.barrier.dec();\r
3403                         }\r
3404 \r
3405                         @Override\r
3406                         public void exception(ReadGraphImpl graph, Throwable t) {\r
3407                                 try {\r
3408                                         procedure.exception(t);\r
3409                                 } catch (Throwable t2) {\r
3410                                         Logger.defaultLogError(t2);\r
3411                                 }\r
3412 //                              impl.state.barrier.dec();\r
3413                         }\r
3414 \r
3415                 });\r
3416 \r
3417         }\r
3418 \r
3419         @Override\r
3420         final public void forEachStatement(final ReadGraphImpl impl, final Resource subject,\r
3421                         final Resource predicate, final AsyncMultiProcedure<Statement> procedure) {\r
3422 \r
3423                 assert(subject != null);\r
3424                 assert(predicate != null);\r
3425                 assert(procedure != null);\r
3426 \r
3427                 final ListenerBase listener = getListenerBase(procedure);\r
3428 \r
3429                 TripleIntProcedureAdapter proc = new TripleIntProcedureAdapter() {\r
3430 \r
3431                         boolean first = true;\r
3432 \r
3433                         @Override\r
3434                         public void execute(ReadGraphImpl graph, int s, int p, int o) {\r
3435                                 try {\r
3436                                         if(first) {\r
3437                                                 procedure.execute(graph, querySupport.getStatement(s, p, o));\r
3438                                         } else {\r
3439                                                 procedure.execute(impl.newRestart(graph), querySupport.getStatement(s, p, o));\r
3440                                         }\r
3441                                 } catch (Throwable t2) {\r
3442                                         Logger.defaultLogError(t2);\r
3443                                 }\r
3444                         }\r
3445 \r
3446                         @Override\r
3447                         public void finished(ReadGraphImpl graph) {\r
3448 \r
3449                                 try {\r
3450                                         if(first) {\r
3451                                                 first = false;\r
3452                                                 procedure.finished(graph);\r
3453 //                                              impl.state.barrier.dec(this);\r
3454                                         } else {\r
3455                                                 procedure.finished(impl.newRestart(graph));\r
3456                                         }\r
3457                                 } catch (Throwable t2) {\r
3458                                         Logger.defaultLogError(t2);\r
3459                                 }\r
3460 \r
3461                         }\r
3462 \r
3463                         @Override\r
3464                         public void exception(ReadGraphImpl graph, Throwable t) {\r
3465 \r
3466                                 try {\r
3467                                         if(first) {\r
3468                                                 first = false;\r
3469                                                 procedure.exception(graph, t);\r
3470 //                                              impl.state.barrier.dec(this);\r
3471                                         } else {\r
3472                                                 procedure.exception(impl.newRestart(graph), t);\r
3473                                         }\r
3474                                 } catch (Throwable t2) {\r
3475                                         Logger.defaultLogError(t2);\r
3476                                 }\r
3477 \r
3478                         }\r
3479 \r
3480                 };\r
3481 \r
3482                 int sId = querySupport.getId(subject);\r
3483                 int pId = querySupport.getId(predicate);\r
3484 \r
3485 //              if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(proc, "#Statements" + sId + "#" + pId);\r
3486 //              else impl.state.barrier.inc(null, null);\r
3487 \r
3488                 Statements.queryEach(impl, sId, pId, this, impl.parent, listener, proc);\r
3489 \r
3490         }\r
3491 \r
3492         @Override\r
3493         final public void forEachStatement(final ReadGraphImpl impl, final Resource subject,\r
3494                         final Resource predicate, final StatementProcedure procedure) {\r
3495 \r
3496                 assert(subject != null);\r
3497                 assert(predicate != null);\r
3498                 assert(procedure != null);\r
3499 \r
3500                 final ListenerBase listener = getListenerBase(procedure);\r
3501 \r
3502                 TripleIntProcedureAdapter proc = new TripleIntProcedureAdapter() {\r
3503 \r
3504                         boolean first = true;\r
3505 \r
3506                         @Override\r
3507                         public void execute(ReadGraphImpl graph, int s, int p, int o) {\r
3508                                 try {\r
3509                                         if(first) {\r
3510                                                 procedure.execute(graph, s, p, o);\r
3511                                         } else {\r
3512                                                 procedure.execute(impl.newRestart(graph), s, p, o);\r
3513                                         }\r
3514                                 } catch (Throwable t2) {\r
3515                                         Logger.defaultLogError(t2);\r
3516                                 }\r
3517                         }\r
3518 \r
3519                         @Override\r
3520                         public void finished(ReadGraphImpl graph) {\r
3521 \r
3522                                 try {\r
3523                                         if(first) {\r
3524                                                 first = false;\r
3525                                                 procedure.finished(graph);\r
3526 //                                              impl.state.barrier.dec(this);\r
3527                                         } else {\r
3528                                                 procedure.finished(impl.newRestart(graph));\r
3529                                         }\r
3530                                 } catch (Throwable t2) {\r
3531                                         Logger.defaultLogError(t2);\r
3532                                 }\r
3533 \r
3534                         }\r
3535 \r
3536                         @Override\r
3537                         public void exception(ReadGraphImpl graph, Throwable t) {\r
3538 \r
3539                                 try {\r
3540                                         if(first) {\r
3541                                                 first = false;\r
3542                                                 procedure.exception(graph, t);\r
3543 //                                              impl.state.barrier.dec(this);\r
3544                                         } else {\r
3545                                                 procedure.exception(impl.newRestart(graph), t);\r
3546                                         }\r
3547                                 } catch (Throwable t2) {\r
3548                                         Logger.defaultLogError(t2);\r
3549                                 }\r
3550 \r
3551                         }\r
3552 \r
3553                 };\r
3554 \r
3555                 int sId = querySupport.getId(subject);\r
3556                 int pId = querySupport.getId(predicate);\r
3557 \r
3558 //              if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(proc, "#Statements" + sId + "#" + pId);\r
3559 //              else impl.state.barrier.inc(null, null);\r
3560 \r
3561                 Statements.queryEach(impl, sId, pId, this, impl.parent, listener, proc);\r
3562 \r
3563         }\r
3564         \r
3565         @Override\r
3566         final public void forStatementSet(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final AsyncSetListener<Statement> procedure) {\r
3567 \r
3568                 assert(subject != null);\r
3569                 assert(predicate != null);\r
3570                 assert(procedure != null);\r
3571 \r
3572                 forEachStatement(impl, subject, predicate, new AsyncMultiListener<Statement>() {\r
3573 \r
3574                         private Set<Statement> current = null;\r
3575                         private Set<Statement> run = new HashSet<Statement>();\r
3576 \r
3577                         @Override\r
3578                         public void execute(AsyncReadGraph graph, Statement result) {\r
3579 \r
3580                                 boolean found = false;\r
3581 \r
3582                                 if(current != null) {\r
3583 \r
3584                                         found = current.remove(result);\r
3585 \r
3586                                 }\r
3587 \r
3588                                 if(!found) procedure.add(graph, result);\r
3589 \r
3590                                 run.add(result);\r
3591 \r
3592                         }\r
3593 \r
3594                         @Override\r
3595                         public void finished(AsyncReadGraph graph) {\r
3596 \r
3597                                 if(current != null) { \r
3598                                         for(Statement r : current) procedure.remove(graph, r);\r
3599                                 }\r
3600 \r
3601                                 current = run;\r
3602 \r
3603                                 run = new HashSet<Statement>();\r
3604 \r
3605                         }\r
3606 \r
3607                         @Override\r
3608                         public void exception(AsyncReadGraph graph, Throwable t) {\r
3609                                 procedure.exception(graph, t);\r
3610                         }\r
3611 \r
3612                         @Override\r
3613                         public boolean isDisposed() {\r
3614                                 return procedure.isDisposed();\r
3615                         }\r
3616 \r
3617                 });\r
3618 \r
3619         }\r
3620 \r
3621         @Override\r
3622         final public void forEachAssertedStatement(final ReadGraphImpl impl, final Resource subject,\r
3623                         final Resource predicate, final AsyncMultiProcedure<Statement> procedure) {\r
3624 \r
3625                 assert(subject != null);\r
3626                 assert(predicate != null);\r
3627                 assert(procedure != null);\r
3628 \r
3629                 final ListenerBase listener = getListenerBase(procedure);\r
3630 \r
3631 //              impl.state.barrier.inc();\r
3632 \r
3633                 AssertedStatements.queryEach(impl, querySupport.getId(subject), querySupport.getId(predicate), this, impl.parent, listener, new TripleIntProcedureAdapter() {\r
3634 \r
3635                         @Override\r
3636                         public void execute(ReadGraphImpl graph, int s, int p, int o) {\r
3637                                 try {\r
3638                                         procedure.execute(graph, querySupport.getStatement(s, p, o));\r
3639                                 } catch (Throwable t2) {\r
3640                                         Logger.defaultLogError(t2);\r
3641                                 }\r
3642                         }\r
3643 \r
3644                         @Override\r
3645                         public void finished(ReadGraphImpl graph) {\r
3646                                 try {\r
3647                                         procedure.finished(graph);\r
3648                                 } catch (Throwable t2) {\r
3649                                         Logger.defaultLogError(t2);\r
3650                                 }\r
3651 //                              impl.state.barrier.dec();\r
3652                         }\r
3653 \r
3654                         @Override\r
3655                         public void exception(ReadGraphImpl graph, Throwable t) {\r
3656                                 try {\r
3657                                         procedure.exception(graph, t);\r
3658                                 } catch (Throwable t2) {\r
3659                                         Logger.defaultLogError(t2);\r
3660                                 }\r
3661 //                              impl.state.barrier.dec();\r
3662                         }\r
3663 \r
3664                 });\r
3665 \r
3666         }\r
3667 \r
3668         private static ListenerBase getListenerBase(Object procedure) {\r
3669                 if(procedure instanceof ListenerBase) return (ListenerBase)procedure;\r
3670                 else return null;\r
3671         }\r
3672 \r
3673         @Override\r
3674         final public void forEachObject(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final MultiProcedure<Resource> procedure) {\r
3675 \r
3676                 assert(subject != null);\r
3677                 assert(predicate != null);\r
3678                 assert(procedure != null);\r
3679 \r
3680                 final ListenerBase listener = getListenerBase(procedure);\r
3681 \r
3682 //              impl.state.barrier.inc();\r
3683 \r
3684                 Objects.runner(impl, querySupport.getId(subject), querySupport.getId(predicate), impl.parent, listener, new IntProcedure() {\r
3685 \r
3686                         @Override\r
3687                         public void execute(ReadGraphImpl graph, int i) {\r
3688                                 try {\r
3689                                         procedure.execute(querySupport.getResource(i));\r
3690                                 } catch (Throwable t2) {\r
3691                                         Logger.defaultLogError(t2);\r
3692                                 }\r
3693                         }\r
3694 \r
3695                         @Override\r
3696                         public void finished(ReadGraphImpl graph) {\r
3697                                 try {\r
3698                                         procedure.finished();\r
3699                                 } catch (Throwable t2) {\r
3700                                         Logger.defaultLogError(t2);\r
3701                                 }\r
3702 //                              impl.state.barrier.dec();\r
3703                         }\r
3704 \r
3705                         @Override\r
3706                         public void exception(ReadGraphImpl graph, Throwable t) {\r
3707                                 System.out.println("forEachObject exception " + t);\r
3708                                 try {\r
3709                                         procedure.exception(t);\r
3710                                 } catch (Throwable t2) {\r
3711                                         Logger.defaultLogError(t2);\r
3712                                 }\r
3713 //                              impl.state.barrier.dec();\r
3714                         }\r
3715 \r
3716                 });\r
3717 \r
3718         }\r
3719 \r
3720 \r
3721 //      @Override\r
3722 //      final public void forEachDirectObject(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final AsyncMultiProcedure<Resource> procedure) {\r
3723 //\r
3724 //              assert(subject != null);\r
3725 //              assert(predicate != null);\r
3726 //              assert(procedure != null);\r
3727 //\r
3728 //              final ListenerBase listener = getListenerBase(procedure);\r
3729 //\r
3730 //              int sId = querySupport.getId(subject);\r
3731 //              int pId = querySupport.getId(predicate);\r
3732 //\r
3733 //              MultiIntProcedure proc = new MultiIntProcedure(procedure, impl, support);\r
3734 //\r
3735 //              if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(proc, "#DirectObjects" + sId + "#" + pId);\r
3736 //              else impl.state.barrier.inc(null, null);\r
3737 //\r
3738 //              //        final Exception caller = new Exception();\r
3739 //\r
3740 //              //        final Pair<Exception, Exception> exceptions = Pair.make(callerException, new Exception());\r
3741 //\r
3742 //              DirectObjects.queryEach(impl, sId, pId, processor, impl.parent, listener, proc);\r
3743 //\r
3744 //      }\r
3745 \r
3746         @Override\r
3747         final public void forEachDirectPredicate(final ReadGraphImpl impl, final Resource subject, final AsyncMultiProcedure<Resource> procedure) {\r
3748 \r
3749                 assert(subject != null);\r
3750                 assert(procedure != null);\r
3751 \r
3752                 final ListenerBase listener = getListenerBase(procedure);\r
3753 \r
3754                 MultiIntProcedure proc = new MultiIntProcedure(procedure, impl, querySupport);\r
3755 \r
3756                 int sId = querySupport.getId(subject);\r
3757 \r
3758 //              if(AsyncBarrierImpl.BOOKKEEPING)  impl.state.barrier.inc(proc, "#DirectPredicates" + sId);\r
3759 //              else impl.state.barrier.inc(null, null);\r
3760 \r
3761                 DirectPredicates.queryEach(impl, sId, this, impl.parent, listener, proc);\r
3762 \r
3763         }\r
3764 \r
3765         @Override\r
3766         final public void forEachDirectStatement(final ReadGraphImpl impl, final Resource subject, final Procedure<DirectStatements> procedure) {\r
3767 \r
3768                 assert(subject != null);\r
3769                 assert(procedure != null);\r
3770 \r
3771                 final ListenerBase listener = getListenerBase(procedure);\r
3772 \r
3773                 org.simantics.db.impl.query.DirectStatements.queryEach(impl, querySupport.getId(subject), this, impl.parent, listener, procedure);\r
3774 \r
3775         }\r
3776 \r
3777         @Override\r
3778         final public void forEachDirectStatement(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure<DirectStatements> procedure, boolean ignoreVirtual) {\r
3779 \r
3780                 assert(subject != null);\r
3781                 assert(procedure != null);\r
3782 \r
3783                 final ListenerBase listener = getListenerBase(procedure);\r
3784 \r
3785                 org.simantics.db.impl.query.DirectStatements.queryEach(impl, querySupport.getId(subject), this, impl.parent, listener, procedure, ignoreVirtual);\r
3786 \r
3787         }\r
3788 \r
3789         private static final Resource INVALID_RESOURCE = new ResourceImpl(null, Integer.MIN_VALUE);\r
3790 \r
3791         @Override\r
3792         final public void forPossibleObject(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final AsyncProcedure<Resource> procedure) {\r
3793 \r
3794                 forEachObject(impl, subject, predicate, new AsyncMultiProcedure<Resource>() {\r
3795 \r
3796                         private Resource single = null;\r
3797 \r
3798                         @Override\r
3799                         public synchronized void execute(AsyncReadGraph graph, Resource result) {\r
3800                                 if(single == null) {\r
3801                                         single = result;\r
3802                                 } else {\r
3803                                         single = INVALID_RESOURCE;\r
3804                                 }\r
3805                         }\r
3806 \r
3807                         @Override\r
3808                         public synchronized void finished(AsyncReadGraph graph) {\r
3809                                 if(single == null || single == INVALID_RESOURCE) procedure.execute(graph, null);\r
3810                                 else procedure.execute(graph, single);\r
3811                         }\r
3812 \r
3813                         @Override\r
3814                         public synchronized void exception(AsyncReadGraph graph, Throwable throwable) {\r
3815                                 procedure.exception(graph, throwable);\r
3816                         }\r
3817 \r
3818                 });\r
3819 \r
3820         }\r
3821 \r
3822         final public void forEachObject(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final ListenerBase listener, final IntProcedure procedure) {\r
3823                 \r
3824                 final int sId = querySupport.getId(subject);\r
3825                 final int pId = querySupport.getId(predicate);\r
3826 \r
3827                 Objects.runner(impl, sId, pId, impl.parent, listener, procedure);\r
3828                 \r
3829         }\r
3830         \r
3831         final public int getSingleObject(final ReadGraphImpl impl, final Resource subject, final Resource predicate) throws DatabaseException {\r
3832 \r
3833         final int sId = querySupport.getId(subject);\r
3834         final int pId = querySupport.getId(predicate);\r
3835 \r
3836         return Objects.runner2(impl, sId, pId, impl.parent);\r
3837             \r
3838         }\r
3839 \r
3840         final public void forEachObject(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final AsyncMultiProcedure<Resource> procedure) {\r
3841 \r
3842                 assert(subject != null);\r
3843                 assert(predicate != null);\r
3844 \r
3845                 final ListenerBase listener = getListenerBase(procedure);\r
3846 \r
3847                 if(impl.parent != null || listener != null) {\r
3848 \r
3849                         IntProcedure ip = new IntProcedure() {\r
3850 \r
3851                                 AtomicBoolean first = new AtomicBoolean(true);\r
3852 \r
3853                                 @Override\r
3854                                 public void execute(ReadGraphImpl graph, int i) {\r
3855                                         try {\r
3856                                                 if(first.get()) {\r
3857                                                         procedure.execute(impl, querySupport.getResource(i));\r
3858                                                 } else {\r
3859                                                         procedure.execute(impl.newRestart(graph), querySupport.getResource(i));\r
3860                                                 }\r
3861                                         } catch (Throwable t2) {\r
3862                                                 Logger.defaultLogError(t2);\r
3863                                         }\r
3864 \r
3865                                 }\r
3866 \r
3867                                 @Override\r
3868                                 public void finished(ReadGraphImpl graph) {\r
3869                                         try {\r
3870                                                 if(first.compareAndSet(true, false)) {\r
3871                                                         procedure.finished(impl);\r
3872 //                                                      impl.state.barrier.dec(this);\r
3873                                                 } else {\r
3874                                                         procedure.finished(impl.newRestart(graph));\r
3875                                                 }\r
3876                                         } catch (Throwable t2) {\r
3877                                                 Logger.defaultLogError(t2);\r
3878                                         }\r
3879                                 }\r
3880 \r
3881                                 @Override\r
3882                                 public void exception(ReadGraphImpl graph, Throwable t) {\r
3883                                         try {\r
3884                                                 procedure.exception(graph, t);\r
3885                                         } catch (Throwable t2) {\r
3886                                                 Logger.defaultLogError(t2);\r
3887                                         }\r
3888 //                                      impl.state.barrier.dec(this);\r
3889                                 }\r
3890 \r
3891                                 @Override\r
3892                                 public String toString() {\r
3893                                         return "forEachObject with " + procedure;\r
3894                                 }\r
3895 \r
3896                         };\r
3897 \r
3898 //                      if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "#Objects" + subject + "#" + predicate);\r
3899 //                      else impl.state.barrier.inc(null, null);\r
3900 \r
3901                         forEachObject(impl, subject, predicate, listener, ip);\r
3902 \r
3903                 } else {\r
3904 \r
3905                         IntProcedure ip = new IntProcedure() {\r
3906 \r
3907                                 @Override\r
3908                                 public void execute(ReadGraphImpl graph, int i) {\r
3909                                         procedure.execute(graph, querySupport.getResource(i));\r
3910                                 }\r
3911 \r
3912                                 @Override\r
3913                                 public void finished(ReadGraphImpl graph) {\r
3914                                         procedure.finished(graph);\r
3915                                 }\r
3916 \r
3917                                 @Override\r
3918                                 public void exception(ReadGraphImpl graph, Throwable t) {\r
3919                                         procedure.exception(graph, t);\r
3920                                 }\r
3921 \r
3922                                 @Override\r
3923                                 public String toString() {\r
3924                                         return "forEachObject with " + procedure;\r
3925                                 }\r
3926 \r
3927                         };\r
3928 \r
3929                         forEachObject(impl, subject, predicate, listener, ip);\r
3930 \r
3931                 }\r
3932 \r
3933         }\r
3934 \r
3935         @Override\r
3936         final public void forObjectSet(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final AsyncSetListener<Resource> procedure) {\r
3937 \r
3938                 assert(subject != null);\r
3939                 assert(predicate != null);\r
3940                 assert(procedure != null);\r
3941 \r
3942                 forEachObject(impl, subject, predicate, new AsyncMultiListener<Resource>() {\r
3943 \r
3944                         private Set<Resource> current = null;\r
3945                         private Set<Resource> run = new HashSet<Resource>();\r
3946 \r
3947                         @Override\r
3948                         public void execute(AsyncReadGraph graph, Resource result) {\r
3949 \r
3950                                 boolean found = false;\r
3951 \r
3952                                 if(current != null) {\r
3953 \r
3954                                         found = current.remove(result);\r
3955 \r
3956                                 }\r
3957 \r
3958                                 if(!found) procedure.add(graph, result);\r
3959 \r
3960                                 run.add(result);\r
3961 \r
3962                         }\r
3963 \r
3964                         @Override\r
3965                         public void finished(AsyncReadGraph graph) {\r
3966 \r
3967                                 if(current != null) { \r
3968                                         for(Resource r : current) procedure.remove(graph, r);\r
3969                                 }\r
3970 \r
3971                                 current = run;\r
3972 \r
3973                                 run = new HashSet<Resource>();\r
3974 \r
3975                         }\r
3976 \r
3977                         @Override\r
3978                         public boolean isDisposed() {\r
3979                                 return procedure.isDisposed();\r
3980                         }\r
3981 \r
3982                         @Override\r
3983                         public void exception(AsyncReadGraph graph, Throwable t) {\r
3984                                 procedure.exception(graph, t);\r
3985                         }\r
3986 \r
3987                         @Override\r
3988                         public String toString() {\r
3989                                 return "forObjectSet " + procedure;\r
3990                         }\r
3991 \r
3992                 });\r
3993 \r
3994         }\r
3995 \r
3996         @Override\r
3997         final public void forPredicateSet(final ReadGraphImpl impl, final Resource subject, final AsyncSetListener<Resource> procedure) {\r
3998 \r
3999                 assert(subject != null);\r
4000                 assert(procedure != null);\r
4001 \r
4002                 forEachPredicate(impl, subject, new AsyncMultiListener<Resource>() {\r
4003 \r
4004                         private Set<Resource> current = null;\r
4005                         private Set<Resource> run = new HashSet<Resource>();\r
4006 \r
4007                         @Override\r
4008                         public void execute(AsyncReadGraph graph, Resource result) {\r
4009 \r
4010                                 boolean found = false;\r
4011 \r
4012                                 if(current != null) {\r
4013 \r
4014                                         found = current.remove(result);\r
4015 \r
4016                                 }\r
4017 \r
4018                                 if(!found) procedure.add(graph, result);\r
4019 \r
4020                                 run.add(result);\r
4021 \r
4022                         }\r
4023 \r
4024                         @Override\r
4025                         public void finished(AsyncReadGraph graph) {\r
4026 \r
4027                                 if(current != null) { \r
4028                                         for(Resource r : current) procedure.remove(graph, r);\r
4029                                 }\r
4030 \r
4031                                 current = run;\r
4032 \r
4033                                 run = new HashSet<Resource>();\r
4034 \r
4035                         }\r
4036 \r
4037                         @Override\r
4038                         public boolean isDisposed() {\r
4039                                 return procedure.isDisposed();\r
4040                         }\r
4041 \r
4042                         @Override\r
4043                         public void exception(AsyncReadGraph graph, Throwable t) {\r
4044                                 procedure.exception(graph, t);\r
4045                         }\r
4046 \r
4047                         @Override\r
4048                         public String toString() {\r
4049                                 return "forPredicateSet " + procedure;\r
4050                         }\r
4051 \r
4052                 });\r
4053 \r
4054         }\r
4055 \r
4056         @Override\r
4057         final public void forPrincipalTypeSet(final ReadGraphImpl impl, final Resource subject, final AsyncSetListener<Resource> procedure) {\r
4058 \r
4059                 assert(subject != null);\r
4060                 assert(procedure != null);\r
4061 \r
4062                 forEachPrincipalType(impl, subject, new AsyncMultiListener<Resource>() {\r
4063 \r
4064                         private Set<Resource> current = null;\r
4065                         private Set<Resource> run = new HashSet<Resource>();\r
4066 \r
4067                         @Override\r
4068                         public void execute(AsyncReadGraph graph, Resource result) {\r
4069 \r
4070                                 boolean found = false;\r
4071 \r
4072                                 if(current != null) {\r
4073 \r
4074                                         found = current.remove(result);\r
4075 \r
4076                                 }\r
4077 \r
4078                                 if(!found) procedure.add(graph, result);\r
4079 \r
4080                                 run.add(result);\r
4081 \r
4082                         }\r
4083 \r
4084                         @Override\r
4085                         public void finished(AsyncReadGraph graph) {\r
4086 \r
4087                                 if(current != null) { \r
4088                                         for(Resource r : current) procedure.remove(graph, r);\r
4089                                 }\r
4090 \r
4091                                 current = run;\r
4092 \r
4093                                 run = new HashSet<Resource>();\r
4094 \r
4095                         }\r
4096 \r
4097                         @Override\r
4098                         public boolean isDisposed() {\r
4099                                 return procedure.isDisposed();\r
4100                         }\r
4101 \r
4102                         @Override\r
4103                         public void exception(AsyncReadGraph graph, Throwable t) {\r
4104                                 procedure.exception(graph, t);\r
4105                         }\r
4106 \r
4107                         @Override\r
4108                         public String toString() {\r
4109                                 return "forPrincipalTypeSet " + procedure;\r
4110                         }\r
4111 \r
4112                 });\r
4113 \r
4114         }\r
4115 \r
4116         @Override\r
4117         final public void forAssertedObjectSet(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final AsyncSetListener<Resource> procedure) {\r
4118 \r
4119                 assert(subject != null);\r
4120                 assert(predicate != null);\r
4121                 assert(procedure != null);\r
4122 \r
4123                 forEachAssertedObject(impl, subject, predicate, new AsyncMultiListener<Resource>() {\r
4124 \r
4125                         private Set<Resource> current = null;\r
4126                         private Set<Resource> run = new HashSet<Resource>();\r
4127 \r
4128                         @Override\r
4129                         public void execute(AsyncReadGraph graph, Resource result) {\r
4130 \r
4131                                 boolean found = false;\r
4132 \r
4133                                 if(current != null) {\r
4134 \r
4135                                         found = current.remove(result);\r
4136 \r
4137                                 }\r
4138 \r
4139                                 if(!found) procedure.add(graph, result);\r
4140 \r
4141                                 run.add(result);\r
4142 \r
4143                         }\r
4144 \r
4145                         @Override\r
4146                         public void finished(AsyncReadGraph graph) {\r
4147 \r
4148                                 if(current != null) { \r
4149                                         for(Resource r : current) procedure.remove(graph, r);\r
4150                                 }\r
4151 \r
4152                                 current = run;\r
4153 \r
4154                                 run = new HashSet<Resource>();\r
4155 \r
4156                         }\r
4157 \r
4158                         @Override\r
4159                         public boolean isDisposed() {\r
4160                                 return procedure.isDisposed();\r
4161                         }\r
4162 \r
4163                         @Override\r
4164                         public void exception(AsyncReadGraph graph, Throwable t) {\r
4165                                 procedure.exception(graph, t);\r
4166                         }\r
4167 \r
4168                         @Override\r
4169                         public String toString() {\r
4170                                 return "forObjectSet " + procedure;\r
4171                         }\r
4172 \r
4173                 });\r
4174 \r
4175         }\r
4176 \r
4177         @Override\r
4178         final public void forAssertedStatementSet(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final AsyncSetListener<Statement> procedure) {\r
4179 \r
4180                 assert(subject != null);\r
4181                 assert(predicate != null);\r
4182                 assert(procedure != null);\r
4183 \r
4184                 forEachAssertedStatement(impl, subject, predicate, new AsyncMultiListener<Statement>() {\r
4185 \r
4186                         private Set<Statement> current = null;\r
4187                         private Set<Statement> run = new HashSet<Statement>();\r
4188 \r
4189                         @Override\r
4190                         public void execute(AsyncReadGraph graph, Statement result) {\r
4191 \r
4192                                 boolean found = false;\r
4193 \r
4194                                 if(current != null) {\r
4195 \r
4196                                         found = current.remove(result);\r
4197 \r
4198                                 }\r
4199 \r
4200                                 if(!found) procedure.add(graph, result);\r
4201 \r
4202                                 run.add(result);\r
4203 \r
4204                         }\r
4205 \r
4206                         @Override\r
4207                         public void finished(AsyncReadGraph graph) {\r
4208 \r
4209                                 if(current != null) { \r
4210                                         for(Statement s : current) procedure.remove(graph, s);\r
4211                                 }\r
4212 \r
4213                                 current = run;\r
4214 \r
4215                                 run = new HashSet<Statement>();\r
4216 \r
4217                         }\r
4218 \r
4219                         @Override\r
4220                         public boolean isDisposed() {\r
4221                                 return procedure.isDisposed();\r
4222                         }\r
4223 \r
4224                         @Override\r
4225                         public void exception(AsyncReadGraph graph, Throwable t) {\r
4226                                 procedure.exception(graph, t);\r
4227                         }\r
4228 \r
4229                         @Override\r
4230                         public String toString() {\r
4231                                 return "forStatementSet " + procedure;\r
4232                         }\r
4233 \r
4234                 });\r
4235 \r
4236         }\r
4237 \r
4238         @Override\r
4239         final public void forEachAssertedObject(final ReadGraphImpl impl, final Resource subject,\r
4240                         final Resource predicate, final AsyncMultiProcedure<Resource> procedure) {\r
4241 \r
4242                 assert(subject != null);\r
4243                 assert(predicate != null);\r
4244                 assert(procedure != null);\r
4245 \r
4246                 final ListenerBase listener = getListenerBase(procedure);\r
4247 \r
4248 //              impl.state.barrier.inc();\r
4249 \r
4250                 AssertedStatements.queryEach(impl, querySupport.getId(subject), querySupport.getId(predicate), this, impl.parent, listener, new TripleIntProcedure() {\r
4251 \r
4252                         @Override\r
4253                         public void execute(ReadGraphImpl graph, int s, int p, int o) {\r
4254                                 try {\r
4255                                         procedure.execute(graph, querySupport.getResource(o));\r
4256                                 } catch (Throwable t2) {\r
4257                                         Logger.defaultLogError(t2);\r
4258                                 }\r
4259                         }\r
4260 \r
4261                         @Override\r
4262                         public void finished(ReadGraphImpl graph) {\r
4263                                 try {               \r
4264                                         procedure.finished(graph);\r
4265                                 } catch (Throwable t2) {\r
4266                                         Logger.defaultLogError(t2);\r
4267                                 }\r
4268 //                              impl.state.barrier.dec();\r
4269                         }\r
4270 \r
4271                         @Override\r
4272                         public void exception(ReadGraphImpl graph, Throwable t) {\r
4273                                 try {\r
4274                                         procedure.exception(graph, t);\r
4275                                 } catch (Throwable t2) {\r
4276                                         Logger.defaultLogError(t2);\r
4277                                 }\r
4278 //                              impl.state.barrier.dec();\r
4279                         }\r
4280 \r
4281                 });\r
4282 \r
4283         }\r
4284 \r
4285         @Override\r
4286         final public void forEachPrincipalType(final ReadGraphImpl impl, final Resource subject, final AsyncMultiProcedure<Resource> procedure) {\r
4287 \r
4288                 assert(subject != null);\r
4289                 assert(procedure != null);\r
4290 \r
4291                 final ListenerBase listener = getListenerBase(procedure);\r
4292 \r
4293                 IntProcedure ip = new IntProcedure() {\r
4294 \r
4295                         @Override\r
4296                         public void execute(ReadGraphImpl graph, int i) {\r
4297                                 try {\r
4298                                         procedure.execute(graph, querySupport.getResource(i));\r
4299                                 } catch (Throwable t2) {\r
4300                                         Logger.defaultLogError(t2);\r
4301                                 }\r
4302                         }\r
4303 \r
4304                         @Override\r
4305                         public void finished(ReadGraphImpl graph) {\r
4306                                 try {\r
4307                                         procedure.finished(graph);\r
4308                                 } catch (Throwable t2) {\r
4309                                         Logger.defaultLogError(t2);\r
4310                                 }\r
4311 //                              impl.state.barrier.dec(this);\r
4312                         }\r
4313 \r
4314                         @Override\r
4315                         public void exception(ReadGraphImpl graph, Throwable t) {\r
4316                                 try {\r
4317                                         procedure.exception(graph, t);\r
4318                                 } catch (Throwable t2) {\r
4319                                         Logger.defaultLogError(t2);\r
4320                                 }\r
4321 //                              impl.state.barrier.dec(this);\r
4322                         }\r
4323 \r
4324                 };\r
4325 \r
4326                 int sId = querySupport.getId(subject);\r
4327 \r
4328 //              if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "#PrincipalTypes#" + sId);\r
4329 //              else impl.state.barrier.inc(null, null);\r
4330 \r
4331                 PrincipalTypes.queryEach(impl, sId, this, impl.parent, listener, ip);\r
4332 \r
4333         }\r
4334 \r
4335         @Override\r
4336         final public void forEachPrincipalType(final ReadGraphImpl impl, final Resource subject, final MultiProcedure<Resource> procedure) {\r
4337 \r
4338                 assert(subject != null);\r
4339                 assert(procedure != null);\r
4340 \r
4341                 final ListenerBase listener = getListenerBase(procedure);\r
4342 \r
4343 //              impl.state.barrier.inc();\r
4344 \r
4345                 PrincipalTypes.queryEach(impl, querySupport.getId(subject), this, impl.parent, listener, new IntProcedure() {\r
4346 \r
4347                         @Override\r
4348                         public void execute(ReadGraphImpl graph, int i) {\r
4349                                 try {\r
4350                                         procedure.execute(querySupport.getResource(i));\r
4351                                 } catch (Throwable t2) {\r
4352                                         Logger.defaultLogError(t2);\r
4353                                 }\r
4354                         }\r
4355 \r
4356                         @Override\r
4357                         public void finished(ReadGraphImpl graph) {\r
4358                                 try {\r
4359                                         procedure.finished();\r
4360                                 } catch (Throwable t2) {\r
4361                                         Logger.defaultLogError(t2);\r
4362                                 }\r
4363 //                              impl.state.barrier.dec();\r
4364                         }\r
4365 \r
4366                         @Override\r
4367                         public void exception(ReadGraphImpl graph, Throwable t) {\r
4368                                 try {\r
4369                                         procedure.exception(t);\r
4370                                 } catch (Throwable t2) {\r
4371                                         Logger.defaultLogError(t2);\r
4372                                 }\r
4373 //                              impl.state.barrier.dec();\r
4374                         }\r
4375 \r
4376                 });\r
4377 \r
4378         }\r
4379 \r
4380     final public void forTypes(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure<Set<Resource>> procedure) {\r
4381 \r
4382         assert(subject != null);\r
4383         assert(procedure != null);\r
4384 \r
4385         final ListenerBase listener = getListenerBase(procedure);\r
4386 \r
4387         InternalProcedure<IntSet> ip = new InternalProcedure<IntSet>() {\r
4388 \r
4389             AtomicBoolean first = new AtomicBoolean(true);\r
4390 \r
4391             @Override\r
4392             public void execute(final ReadGraphImpl graph, IntSet set) {\r
4393                 try {\r
4394                     if(first.compareAndSet(true, false)) {\r
4395                         procedure.execute(graph, set);\r
4396 //                      impl.state.barrier.dec(this);\r
4397                     } else {\r
4398                         procedure.execute(impl.newRestart(graph), set);\r
4399                     }\r
4400                 } catch (Throwable t2) {\r
4401                     Logger.defaultLogError(t2);\r
4402                 }\r
4403             }\r
4404 \r
4405             @Override\r
4406             public void exception(ReadGraphImpl graph, Throwable t) {\r
4407                 try {\r
4408                     if(first.compareAndSet(true, false)) {\r
4409                         procedure.exception(graph, t);\r
4410 //                      impl.state.barrier.dec(this);\r
4411                     } else {\r
4412                         procedure.exception(impl.newRestart(graph), t);\r
4413                     }\r
4414                 } catch (Throwable t2) {\r
4415                     Logger.defaultLogError(t2);\r
4416                 }\r
4417             }\r
4418 \r
4419         };\r
4420 \r
4421         int sId = querySupport.getId(subject);\r
4422 \r
4423 //      if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "#Types" + sId);\r
4424 //      else impl.state.barrier.inc(null, null);\r
4425 \r
4426         Types.queryEach(impl, sId, this, impl.parent, listener, ip);\r
4427 \r
4428     }\r
4429     \r
4430         @Override\r
4431         final public IntSet getTypes(final ReadGraphImpl impl, final Resource subject) throws Throwable {\r
4432 \r
4433                 assert(subject != null);\r
4434                 \r
4435                 return Types.queryEach2(impl, querySupport.getId(subject), this, impl.parent);\r
4436 \r
4437         }\r
4438 \r
4439         @Override\r
4440         final public void forRelationInfo(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure<RelationInfo> procedure) {\r
4441 \r
4442                 assert(subject != null);\r
4443                 assert(procedure != null);\r
4444 \r
4445                 final ListenerBase listener = getListenerBase(procedure);\r
4446 \r
4447 //              impl.state.barrier.inc();\r
4448 \r
4449                 RelationInfoQuery.queryEach(impl, querySupport.getId(subject), this, impl.parent, listener, new InternalProcedure<RelationInfo>() {\r
4450 \r
4451                         AtomicBoolean first = new AtomicBoolean(true);\r
4452 \r
4453                         @Override\r
4454                         public void execute(final ReadGraphImpl graph, RelationInfo set) {\r
4455                                 try {\r
4456                                         if(first.compareAndSet(true, false)) {\r
4457                                                 procedure.execute(graph, set);\r
4458 //                                              impl.state.barrier.dec();\r
4459                                         } else {\r
4460                                                 procedure.execute(impl.newRestart(graph), set);\r
4461                                         }\r
4462                                 } catch (Throwable t2) {\r
4463                                         Logger.defaultLogError(t2);\r
4464                                 }\r
4465                         }\r
4466 \r
4467                         @Override\r
4468                         public void exception(ReadGraphImpl graph, Throwable t) {\r
4469                                 try {\r
4470                                         if(first.compareAndSet(true, false)) {\r
4471                                                 procedure.exception(graph, t);\r
4472 //                                              impl.state.barrier.dec("ReadGraphSupportImpl.1353");\r
4473                                         } else {\r
4474                                                 procedure.exception(impl.newRestart(graph), t);\r
4475                                         }\r
4476                                 } catch (Throwable t2) {\r
4477                                         Logger.defaultLogError(t2);\r
4478                                 }\r
4479                         }\r
4480 \r
4481                 });\r
4482 \r
4483         }\r
4484 \r
4485         @Override\r
4486         final public void forSupertypes(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure<Set<Resource>> procedure) {\r
4487 \r
4488                 assert(subject != null);\r
4489                 assert(procedure != null);\r
4490 \r
4491                 final ListenerBase listener = getListenerBase(procedure);\r
4492 \r
4493 //              impl.state.barrier.inc();\r
4494 \r
4495                 SuperTypes.queryEach(impl, querySupport.getId(subject), this, impl.parent, listener, new InternalProcedure<IntSet>() {\r
4496 \r
4497                         AtomicBoolean first = new AtomicBoolean(true);\r
4498 \r
4499                         @Override\r
4500                         public void execute(final ReadGraphImpl graph, IntSet set) {\r
4501 //                              final HashSet<Resource> result = new HashSet<Resource>();\r
4502 //                              set.forEach(new TIntProcedure() {\r
4503 //\r
4504 //                                      @Override\r
4505 //                                      public boolean execute(int type) {\r
4506 //                                              result.add(querySupport.getResource(type));\r
4507 //                                              return true;\r
4508 //                                      }\r
4509 //\r
4510 //                              });\r
4511                                 try {\r
4512                                         if(first.compareAndSet(true, false)) {\r
4513                                                 procedure.execute(graph, set);\r
4514 //                                              impl.state.barrier.dec();\r
4515                                         } else {\r
4516                                                 procedure.execute(impl.newRestart(graph), set);\r
4517                                         }\r
4518                                 } catch (Throwable t2) {\r
4519                                         Logger.defaultLogError(t2);\r
4520                                 }\r
4521                         }\r
4522 \r
4523                         @Override\r
4524                         public void exception(ReadGraphImpl graph, Throwable t) {\r
4525                                 try {\r
4526                                         if(first.compareAndSet(true, false)) {\r
4527                                                 procedure.exception(graph, t);\r
4528 //                                              impl.state.barrier.dec();\r
4529                                         } else {\r
4530                                                 procedure.exception(impl.newRestart(graph), t);\r
4531                                         }\r
4532                                 } catch (Throwable t2) {\r
4533                                         Logger.defaultLogError(t2);\r
4534                                 }\r
4535                         }\r
4536 \r
4537                 });\r
4538 \r
4539         }\r
4540 \r
4541         @Override\r
4542         final public void forDirectSuperrelations(final ReadGraphImpl impl, final Resource subject, final AsyncMultiProcedure<Resource> procedure) {\r
4543 \r
4544                 assert(subject != null);\r
4545                 assert(procedure != null);\r
4546 \r
4547                 final ListenerBase listener = getListenerBase(procedure);\r
4548 \r
4549                 IntProcedure ip = new IntProcedureAdapter() {\r
4550 \r
4551                         @Override\r
4552                         public void execute(final ReadGraphImpl graph, int superRelation) {\r
4553                                 try {\r
4554                                         procedure.execute(graph, querySupport.getResource(superRelation));\r
4555                                 } catch (Throwable t2) {\r
4556                                         Logger.defaultLogError(t2);\r
4557                                 }\r
4558                         }\r
4559 \r
4560                         @Override\r
4561                         public void finished(final ReadGraphImpl graph) {\r
4562                                 try {\r
4563                                         procedure.finished(graph);\r
4564                                 } catch (Throwable t2) {\r
4565                                         Logger.defaultLogError(t2);\r
4566                                 }\r
4567 //                              impl.state.barrier.dec(this);\r
4568                         }\r
4569 \r
4570 \r
4571                         @Override\r
4572                         public void exception(ReadGraphImpl graph, Throwable t) {\r
4573                                 try {\r
4574                                         procedure.exception(graph, t);\r
4575                                 } catch (Throwable t2) {\r
4576                                         Logger.defaultLogError(t2);\r
4577                                 }\r
4578 //                              impl.state.barrier.dec(this);\r
4579                         }\r
4580 \r
4581                 };\r
4582 \r
4583                 int sId = querySupport.getId(subject); \r
4584 \r
4585 //              if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "#DirectSuperRelations#" + sId);\r
4586 //              else impl.state.barrier.inc(null, null);\r
4587 \r
4588                 DirectSuperRelations.queryEach(impl, sId, this, impl.parent, listener, ip);\r
4589 \r
4590         }\r
4591 \r
4592         @Override\r
4593         final public void forPossibleSuperrelation(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure<Resource> procedure) {\r
4594 \r
4595                 assert(subject != null);\r
4596                 assert(procedure != null);\r
4597 \r
4598                 final ListenerBase listener = getListenerBase(procedure);\r
4599 \r
4600 //              impl.state.barrier.inc();\r
4601 \r
4602                 PossibleSuperRelation.queryEach(impl, querySupport.getId(subject), this, impl.parent, listener, procedure);\r
4603 \r
4604         }\r
4605 \r
4606         @Override\r
4607         final public void forSuperrelations(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure<Set<Resource>> procedure) {\r
4608 \r
4609                 assert(subject != null);\r
4610                 assert(procedure != null);\r
4611 \r
4612                 final ListenerBase listener = getListenerBase(procedure);\r
4613 \r
4614                 InternalProcedure<IntSet> ip = new InternalProcedure<IntSet>() {\r
4615 \r
4616                         @Override\r
4617                         public void execute(final ReadGraphImpl graph, IntSet set) {\r
4618 //                              final HashSet<Resource> result = new HashSet<Resource>();\r
4619 //                              set.forEach(new TIntProcedure() {\r
4620 //\r
4621 //                                      @Override\r
4622 //                                      public boolean execute(int type) {\r
4623 //                                              result.add(querySupport.getResource(type));\r
4624 //                                              return true;\r
4625 //                                      }\r
4626 //\r
4627 //                              });\r
4628                                 try {\r
4629                                         procedure.execute(graph, set);\r
4630                                 } catch (Throwable t2) {\r
4631                                         Logger.defaultLogError(t2);\r
4632                                 }\r
4633 //                              impl.state.barrier.dec(this);\r
4634                         }\r
4635 \r
4636                         @Override\r
4637                         public void exception(ReadGraphImpl graph, Throwable t) {\r
4638                                 try {\r
4639                                         procedure.exception(graph, t);\r
4640                                 } catch (Throwable t2) {\r
4641                                         Logger.defaultLogError(t2);\r
4642                                 }\r
4643 //                              impl.state.barrier.dec(this);\r
4644                         }\r
4645 \r
4646                 };\r
4647 \r
4648                 int sId = querySupport.getId(subject);\r
4649 \r
4650 //              if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "#SuperRelations#" + sId);\r
4651 //              else impl.state.barrier.inc(null, null);\r
4652 \r
4653                 SuperRelations.queryEach(impl, sId, this, impl.parent, listener, ip);\r
4654 \r
4655         }\r
4656 \r
4657         final public byte[] getValue(final ReadGraphImpl impl, final Resource subject) throws DatabaseException {\r
4658 \r
4659           int sId = querySupport.getId(subject);\r
4660       return ValueQuery.queryEach(impl, sId, impl.parent);\r
4661 \r
4662         }\r
4663 \r
4664         final public byte[] getValue(final ReadGraphImpl impl, final int subject) throws DatabaseException {\r
4665 \r
4666             return ValueQuery.queryEach(impl, subject, impl.parent);\r
4667 \r
4668         }\r
4669 \r
4670         @Override\r
4671         final public byte[] forValue(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure<byte[]> procedure) {\r
4672 \r
4673                 assert(subject != null);\r
4674 \r
4675                 int sId = querySupport.getId(subject);\r
4676 \r
4677                 if(procedure != null) {\r
4678                 \r
4679                         final ListenerBase listener = getListenerBase(procedure);\r
4680 \r
4681                         InternalProcedure<byte[]> ip = new InternalProcedure<byte[]>() {\r
4682 \r
4683                                 AtomicBoolean first = new AtomicBoolean(true);\r
4684 \r
4685                                 @Override\r
4686                                 public void execute(ReadGraphImpl graph, byte[] result) {\r
4687                                         try {\r
4688                                                 if(first.compareAndSet(true, false)) {\r
4689                                                         procedure.execute(graph, result);\r
4690 //                                                      impl.state.barrier.dec(this);\r
4691                                                 } else {\r
4692                                                         procedure.execute(impl.newRestart(graph), result);\r
4693                                                 }\r
4694                                         } catch (Throwable t2) {\r
4695                                                 Logger.defaultLogError(t2);\r
4696                                         }\r
4697                                 }\r
4698 \r
4699                                 @Override\r
4700                                 public void exception(ReadGraphImpl graph, Throwable t) {\r
4701                                         try {\r
4702                                                 if(first.compareAndSet(true, false)) {\r
4703                                                         procedure.exception(graph, t);\r
4704 //                                                      impl.state.barrier.dec(this);\r
4705                                                 } else {\r
4706                                                         procedure.exception(impl.newRestart(graph), t);\r
4707                                                 }\r
4708                                         } catch (Throwable t2) {\r
4709                                                 Logger.defaultLogError(t2);\r
4710                                         }\r
4711                                 }\r
4712 \r
4713                         };\r
4714 \r
4715 //                      if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "#Value" + sId);\r
4716 //                      else impl.state.barrier.inc(null, null);\r
4717 \r
4718                         return ValueQuery.queryEach(impl, sId, impl.parent, listener, ip);\r
4719 \r
4720                 } else {\r
4721 \r
4722                         return ValueQuery.queryEach(impl, sId, impl.parent, null, null);\r
4723                         \r
4724                 }\r
4725 \r
4726         }\r
4727 \r
4728         @Override\r
4729         final public void forPossibleValue(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure<byte[]> procedure) {\r
4730 \r
4731                 assert(subject != null);\r
4732                 assert(procedure != null);\r
4733 \r
4734                 final ListenerBase listener = getListenerBase(procedure);\r
4735 \r
4736                 if(impl.parent != null || listener != null) {\r
4737 \r
4738                         InternalProcedure<byte[]> ip = new InternalProcedure<byte[]>() {\r
4739 \r
4740                                 AtomicBoolean first = new AtomicBoolean(true);\r
4741 \r
4742                                 @Override\r
4743                                 public void execute(ReadGraphImpl graph, byte[] result) {\r
4744                                         try {\r
4745                                                 if(first.compareAndSet(true, false)) {\r
4746                                                         procedure.execute(graph, result);\r
4747 //                                                      impl.state.barrier.dec(this);\r
4748                                                 } else {\r
4749                                                         procedure.execute(impl.newRestart(graph), result);\r
4750                                                 }\r
4751                                         } catch (Throwable t2) {\r
4752                                                 Logger.defaultLogError(t2);\r
4753                                         }\r
4754                                 }\r
4755 \r
4756                                 @Override\r
4757                                 public void exception(ReadGraphImpl graph, Throwable t) {\r
4758                                         try {\r
4759                                                 if(first.compareAndSet(true, false)) {\r
4760                                                         procedure.exception(graph, t);\r
4761 //                                                      impl.state.barrier.dec(this);\r
4762                                                 } else {\r
4763                                                         procedure.exception(impl.newRestart(graph), t);\r
4764                                                 }\r
4765                                         } catch (Throwable t2) {\r
4766                                                 Logger.defaultLogError(t2);\r
4767                                         }\r
4768                                 }\r
4769 \r
4770                         };\r
4771 \r
4772                         int sId = querySupport.getId(subject);\r
4773 \r
4774 //                      if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "#Value" + sId);\r
4775 //                      else impl.state.barrier.inc(null, null);\r
4776 \r
4777                         ValueQuery.queryEach(impl, sId, impl.parent, listener, ip);\r
4778 \r
4779                 } else {\r
4780 \r
4781                         InternalProcedure<byte[]> ip = new InternalProcedure<byte[]>() {\r
4782 \r
4783                                 @Override\r
4784                                 public void execute(ReadGraphImpl graph, byte[] result) {\r
4785 \r
4786                                         procedure.execute(graph, result);\r
4787 \r
4788                                 }\r
4789 \r
4790                                 @Override\r
4791                                 public void exception(ReadGraphImpl graph, Throwable t) {\r
4792 \r
4793                                         procedure.exception(graph, t);\r
4794 \r
4795                                 }\r
4796 \r
4797                         };\r
4798 \r
4799                         int sId = querySupport.getId(subject);\r
4800 \r
4801                         ValueQuery.queryEach(impl, sId, impl.parent, listener, ip);\r
4802 \r
4803                 }\r
4804 \r
4805         }\r
4806 \r
4807         @Override\r
4808         final public void forInverse(final ReadGraphImpl impl, final Resource relation, final AsyncProcedure<Resource> procedure) {\r
4809 \r
4810                 assert(relation != null);\r
4811                 assert(procedure != null);\r
4812 \r
4813                 final ListenerBase listener = getListenerBase(procedure);\r
4814 \r
4815                 IntProcedure ip = new IntProcedure() {\r
4816 \r
4817                         private int result = 0;\r
4818                         \r
4819                     final AtomicBoolean found = new AtomicBoolean(false);\r
4820                     final AtomicBoolean done = new AtomicBoolean(false);\r
4821 \r
4822                         @Override\r
4823                         public void finished(ReadGraphImpl graph) {\r
4824                                 \r
4825                         // Shall fire exactly once!\r
4826                         if(done.compareAndSet(false, true)) {\r
4827                                 try {\r
4828                                         if(result == 0) {\r
4829                                                         procedure.exception(graph, new NoInverseException(""));\r
4830 //                                              impl.state.barrier.dec(this);\r
4831                                         } else {\r
4832                                                 procedure.execute(graph, querySupport.getResource(result));\r
4833 //                                              impl.state.barrier.dec(this);\r
4834                                         }\r
4835                                 } catch (Throwable t) {\r
4836                                         Logger.defaultLogError(t);\r
4837                                 }\r
4838                         }\r
4839                         \r
4840                         }\r
4841 \r
4842                         @Override\r
4843                         public void execute(ReadGraphImpl graph, int i) {\r
4844                                 \r
4845                                 if(found.compareAndSet(false, true)) {\r
4846                                         this.result = i;\r
4847                                 } else {\r
4848                                         // Shall fire exactly once!\r
4849                                         if(done.compareAndSet(false, true)) {\r
4850                                                 try {\r
4851                                                         procedure.exception(graph, new ManyObjectsForFunctionalRelationException("Multiple items e.g. " + this.result + " and " + result));\r
4852 //                                                      impl.state.barrier.dec(this);\r
4853                                                 } catch (Throwable t) {\r
4854                                                 Logger.defaultLogError(t);\r
4855                                                 }\r
4856                                         }\r
4857                                 }\r
4858                                 \r
4859                         }\r
4860 \r
4861                         @Override\r
4862                         public void exception(ReadGraphImpl graph, Throwable t) {\r
4863                                 // Shall fire exactly once!\r
4864                                 if(done.compareAndSet(false, true)) {\r
4865                                         try {\r
4866                                                 procedure.exception(graph, t);\r
4867 //                                              impl.state.barrier.dec(this);\r
4868                                         } catch (Throwable t2) {\r
4869                                         Logger.defaultLogError(t2);\r
4870                                         }\r
4871                                 }\r
4872                         }\r
4873 \r
4874                 };\r
4875 \r
4876                 int sId = querySupport.getId(relation);\r
4877 \r
4878 //              if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "#DirectObjects#" + sId);\r
4879 //              else impl.state.barrier.inc(null, null);\r
4880 \r
4881                 Objects.runner(impl, sId, getInverseOf(), impl.parent, listener, ip);\r
4882 \r
4883         }\r
4884 \r
4885         @Override\r
4886         final public void forResource(final ReadGraphImpl impl, final String id, final AsyncProcedure<Resource> procedure) {\r
4887 \r
4888                 assert(id != null);\r
4889                 assert(procedure != null);\r
4890 \r
4891                 InternalProcedure<Integer> ip = new InternalProcedure<Integer>() {\r
4892 \r
4893                         @Override\r
4894                         public void execute(ReadGraphImpl graph, Integer result) {\r
4895                                 try {\r
4896                                         procedure.execute(graph, querySupport.getResource(result));\r
4897                                 } catch (Throwable t2) {\r
4898                                         Logger.defaultLogError(t2);\r
4899                                 }\r
4900 //                              impl.state.barrier.dec(this);\r
4901                         }   \r
4902 \r
4903                         @Override\r
4904                         public void exception(ReadGraphImpl graph, Throwable t) {\r
4905 \r
4906                                 try {\r
4907                                         procedure.exception(graph, t);\r
4908                                 } catch (Throwable t2) {\r
4909                                         Logger.defaultLogError(t2);\r
4910                                 }\r
4911 //                              impl.state.barrier.dec(this);\r
4912                         }\r
4913 \r
4914                 };\r
4915 \r
4916 //              if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "Resource");\r
4917 //              else impl.state.barrier.inc(null, null);\r
4918 \r
4919                 forResource(impl, id, impl.parent, ip);\r
4920 \r
4921         }\r
4922 \r
4923         @Override\r
4924         final public void forBuiltin(final ReadGraphImpl impl, final String id, final AsyncProcedure<Resource> procedure) {\r
4925 \r
4926                 assert(id != null);\r
4927                 assert(procedure != null);\r
4928 \r
4929 //              impl.state.barrier.inc();\r
4930 \r
4931                 forBuiltin(impl, id, impl.parent, new InternalProcedure<Integer>() {\r
4932 \r
4933                         @Override\r
4934                         public void execute(ReadGraphImpl graph, Integer result) {\r
4935                                 try {\r
4936                                         procedure.execute(graph, querySupport.getResource(result)); \r
4937                                 } catch (Throwable t2) {\r
4938                                         Logger.defaultLogError(t2);\r
4939                                 }\r
4940 //                              impl.state.barrier.dec();\r
4941                         }   \r
4942 \r
4943                         @Override\r
4944                         public void exception(ReadGraphImpl graph, Throwable t) {\r
4945                                 try {\r
4946                                         procedure.exception(graph, t);\r
4947                                 } catch (Throwable t2) {\r
4948                                         Logger.defaultLogError(t2);\r
4949                                 }\r
4950 //                              impl.state.barrier.dec();\r
4951                         }\r
4952 \r
4953                 });\r
4954 \r
4955         }\r
4956 \r
4957         @Override\r
4958         final public void forHasStatement(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure<Boolean> procedure) {\r
4959 \r
4960                 assert(subject != null);\r
4961                 assert(procedure != null);\r
4962 \r
4963                 final ListenerBase listener = getListenerBase(procedure);\r
4964 \r
4965 //              impl.state.barrier.inc();\r
4966 \r
4967                 DirectPredicates.queryEach(impl, querySupport.getId(subject), this, impl.parent, listener, new IntProcedure() {\r
4968 \r
4969                         boolean found = false;\r
4970 \r
4971                         @Override\r
4972                         public void execute(ReadGraphImpl graph, int object) {\r
4973                                 found = true;\r
4974                         }\r
4975 \r
4976                         @Override\r
4977                         public void finished(ReadGraphImpl graph) {\r
4978                                 try {\r
4979                                         procedure.execute(graph, found);\r
4980                                 } catch (Throwable t2) {\r
4981                                         Logger.defaultLogError(t2);\r
4982                                 }\r
4983 //                              impl.state.barrier.dec();\r
4984                         }\r
4985 \r
4986                         @Override\r
4987                         public void exception(ReadGraphImpl graph, Throwable t) {\r
4988                                 try {\r
4989                                         procedure.exception(graph, t);\r
4990                                 } catch (Throwable t2) {\r
4991                                         Logger.defaultLogError(t2);\r
4992                                 }\r
4993 //                              impl.state.barrier.dec();\r
4994                         }\r
4995 \r
4996                 });\r
4997 \r
4998         }\r
4999 \r
5000         @Override\r
5001         final public void forHasStatement(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final AsyncProcedure<Boolean> procedure) {\r
5002 \r
5003                 assert(subject != null);\r
5004                 assert(predicate != null);\r
5005                 assert(procedure != null);\r
5006 \r
5007                 AsyncMultiProcedure<Resource> ip = new AsyncMultiProcedureAdapter<Resource>() {\r
5008 \r
5009                         boolean found = false;\r
5010 \r
5011                         @Override\r
5012                         synchronized public void execute(AsyncReadGraph graph, Resource resource) {\r
5013                                 found = true;\r
5014                         }\r
5015 \r
5016                         @Override\r
5017                         synchronized public void finished(AsyncReadGraph graph) {\r
5018                                 try {\r
5019                                         procedure.execute(graph, found);\r
5020                                 } catch (Throwable t2) {\r
5021                                         Logger.defaultLogError(t2);\r
5022                                 }\r
5023 //                              impl.state.barrier.dec(this);\r
5024                         }\r
5025 \r
5026                         @Override\r
5027                         public void exception(AsyncReadGraph graph, Throwable t) {\r
5028                                 try {\r
5029                                         procedure.exception(graph, t);\r
5030                                 } catch (Throwable t2) {\r
5031                                         Logger.defaultLogError(t2);\r
5032                                 }\r
5033 //                              impl.state.barrier.dec(this);\r
5034                         }\r
5035 \r
5036                 };\r
5037 \r
5038 //              if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "#ForEachObject#" + subject + "#" + predicate);\r
5039 //              else impl.state.barrier.inc(null, null);\r
5040 \r
5041                 forEachObject(impl, subject, predicate, ip);\r
5042 \r
5043         }\r
5044 \r
5045         @Override\r
5046         final public void forHasStatement(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final Resource object, final AsyncProcedure<Boolean> procedure) {\r
5047 \r
5048                 assert(subject != null);\r
5049                 assert(predicate != null);\r
5050                 assert(procedure != null);\r
5051 \r
5052 //              impl.state.barrier.inc();\r
5053 \r
5054                 forEachObject(impl, subject, predicate, new AsyncMultiProcedureAdapter<Resource>() {\r
5055 \r
5056                         boolean found = false;\r
5057 \r
5058                         @Override\r
5059                         synchronized public void execute(AsyncReadGraph graph, Resource resource) {\r
5060                                 if(resource.equals(object)) found = true;\r
5061                         }\r
5062 \r
5063                         @Override\r
5064                         synchronized public void finished(AsyncReadGraph graph) {\r
5065                                 try {\r
5066                                         procedure.execute(graph, found);\r
5067                                 } catch (Throwable t2) {\r
5068                                         Logger.defaultLogError(t2);\r
5069                                 }\r
5070 //                              impl.state.barrier.dec();\r
5071                         }\r
5072 \r
5073                         @Override\r
5074                         public void exception(AsyncReadGraph graph, Throwable t) {\r
5075                                 try {\r
5076                                         procedure.exception(graph, t);\r
5077                                 } catch (Throwable t2) {\r
5078                                         Logger.defaultLogError(t2);\r
5079                                 }\r
5080 //                              impl.state.barrier.dec();\r
5081                         }\r
5082 \r
5083                 });\r
5084 \r
5085         }\r
5086 \r
5087         @Override\r
5088         final public void forHasValue(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure<Boolean> procedure) {\r
5089 \r
5090                 assert(subject != null);\r
5091                 assert(procedure != null);\r
5092 \r
5093                 final ListenerBase listener = getListenerBase(procedure);\r
5094 \r
5095 //              impl.state.barrier.inc();\r
5096 \r
5097                 ValueQuery.queryEach(impl, querySupport.getId(subject), impl.parent, listener, new InternalProcedure<byte[]>() {\r
5098 \r
5099                         @Override\r
5100                         public void execute(ReadGraphImpl graph, byte[] object) {\r
5101                                 boolean result = object != null;\r
5102                                 try {\r
5103                                         procedure.execute(graph, result);\r
5104                                 } catch (Throwable t2) {\r
5105                                         Logger.defaultLogError(t2);\r
5106                                 }\r
5107 //                              impl.state.barrier.dec();\r
5108                         }\r
5109 \r
5110                         @Override\r
5111                         public void exception(ReadGraphImpl graph, Throwable t) {\r
5112                                 try {\r
5113                                         procedure.exception(graph, t);\r
5114                                 } catch (Throwable t2) {\r
5115                                         Logger.defaultLogError(t2);\r
5116                                 }\r
5117 //                              impl.state.barrier.dec();\r
5118                         }\r
5119 \r
5120                 });\r
5121 \r
5122         }\r
5123 \r
5124         @Override\r
5125         final public void forOrderedSet(final ReadGraphImpl impl, final Resource subject, final AsyncMultiProcedure<Resource> procedure) {\r
5126 \r
5127                 assert(subject != null);\r
5128                 assert(procedure != null);\r
5129 \r
5130                 final ListenerBase listener = getListenerBase(procedure);\r
5131 \r
5132 //              impl.state.barrier.inc();\r
5133 \r
5134                 OrderedSet.queryEach(impl, querySupport.getId(subject), this, impl.parent, listener, new IntProcedure() {\r
5135 \r
5136                         @Override\r
5137                         public void exception(ReadGraphImpl graph, Throwable t) {\r
5138                                 try {\r
5139                                         procedure.exception(graph, t);\r
5140                                 } catch (Throwable t2) {\r
5141                                         Logger.defaultLogError(t2);\r
5142                                 }\r
5143 //                              impl.state.barrier.dec();\r
5144                         }\r
5145 \r
5146                         @Override\r
5147                         public void execute(ReadGraphImpl graph, int i) {\r
5148                                 try {\r
5149                                         procedure.execute(graph, querySupport.getResource(i));\r
5150                                 } catch (Throwable t2) {\r
5151                                         Logger.defaultLogError(t2);\r
5152                                 }\r
5153                         }\r
5154 \r
5155                         @Override\r
5156                         public void finished(ReadGraphImpl graph) {\r
5157                                 try {\r
5158                                         procedure.finished(graph);\r
5159                                 } catch (Throwable t2) {\r
5160                                         Logger.defaultLogError(t2);\r
5161                                 }\r
5162 //                              impl.state.barrier.dec();\r
5163                         }\r
5164 \r
5165                 });\r
5166 \r
5167         }\r
5168 \r
5169         @Override\r
5170         final public <T> void query(final ReadGraphImpl impl, final AsyncRead<T> request, final CacheEntry parent, final AsyncProcedure<T> procedure, ListenerBase listener) {\r
5171 \r
5172                 assert(request != null);\r
5173                 assert(procedure != null);\r
5174 \r
5175 //              if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(request, "#" + request.toString() + ".1999");\r
5176 //              else impl.state.barrier.inc(null, null);\r
5177 \r
5178                 runAsyncRead(impl, request, parent, listener, procedure);\r
5179 \r
5180         }\r
5181 \r
5182         @Override\r
5183         final public <T> T tryQuery(final ReadGraphImpl graph, final Read<T> request) throws DatabaseException {\r
5184 \r
5185                 assert(graph != null);\r
5186                 assert(request != null);\r
5187 \r
5188                 final ReadEntry entry = readMap.get(request);\r
5189                 if(entry != null && entry.isReady()) {\r
5190                     return (T)entry.get(graph, this, null);\r
5191                 } else {\r
5192                         return request.perform(graph);\r
5193                 }\r
5194 \r
5195         }\r
5196 \r
5197     final public <T> T tryQuery(final ReadGraphImpl graph, final ExternalRead<T> request) throws DatabaseException {\r
5198 \r
5199         assert(graph != null);\r
5200         assert(request != null);\r
5201 \r
5202         final ExternalReadEntry<T> entry = externalReadMap.get(request);\r
5203         if(entry != null && entry.isReady()) {\r
5204             if(entry.isExcepted()) {\r
5205                 Throwable t = (Throwable)entry.getResult();\r
5206                 if(t instanceof DatabaseException) throw (DatabaseException)t;\r
5207                 else throw new DatabaseException(t);\r
5208             } else {\r
5209                 return (T)entry.getResult();\r
5210             }            \r
5211         } else {\r
5212 \r
5213             final DataContainer<T> result = new DataContainer<T>();\r
5214             final DataContainer<Throwable> exception = new DataContainer<Throwable>();\r
5215             \r
5216             request.register(graph, new Listener<T>() {\r
5217                 \r
5218                 @Override\r
5219                 public void exception(Throwable t) {\r
5220                     exception.set(t);\r
5221                 }\r
5222 \r
5223                 @Override\r
5224                 public void execute(T t) {\r
5225                     result.set(t);\r
5226                 }\r
5227 \r
5228                 @Override\r
5229                 public boolean isDisposed() {\r
5230                     return true;\r
5231                 }\r
5232             \r
5233             });\r
5234             \r
5235             Throwable t = exception.get();\r
5236             if(t != null) {\r
5237                 if(t instanceof DatabaseException) throw (DatabaseException)t;\r
5238                 else throw new DatabaseException(t);\r
5239             }\r
5240             \r
5241             return result.get();\r
5242 \r
5243         }\r
5244 \r
5245     }\r
5246         \r
5247         @Override\r
5248         final public <T> void tryQuery(final ReadGraphImpl graph, final AsyncRead<T> request, AsyncProcedure<T> procedure) {\r
5249 \r
5250                 assert(graph != null);\r
5251                 assert(request != null);\r
5252 \r
5253                 final AsyncReadEntry entry = asyncReadMap.get(request);\r
5254                 if(entry != null && entry.isReady()) {\r
5255                         if(entry.isExcepted()) {\r
5256                                 procedure.exception(graph, (Throwable)entry.getResult());\r
5257                         } else {\r
5258                                 procedure.execute(graph, (T)entry.getResult());\r
5259                         }\r
5260                 } else {\r
5261                         request.perform(graph, procedure);\r
5262                 }\r
5263 \r
5264         }\r
5265 \r
5266         @Override\r
5267         final public <T> void query(final ReadGraphImpl impl, final MultiRead<T> request, final CacheEntry parent, final AsyncMultiProcedure<T> procedure, ListenerBase listener) {\r
5268 \r
5269                 assert(request != null);\r
5270                 assert(procedure != null);\r
5271 \r
5272 //              impl.state.barrier.inc(null, null);\r
5273 \r
5274                 queryMultiRead(impl, request, parent, listener, procedure);\r
5275 \r
5276         }\r
5277 \r
5278         @Override\r
5279         final public <T> void query(final ReadGraphImpl impl, final AsyncMultiRead<T> request, final CacheEntry parent, final AsyncMultiProcedure<T> procedure, ListenerBase listener) {\r
5280 \r
5281                 assert(request != null);\r
5282                 assert(procedure != null);\r
5283 \r
5284 //              impl.state.barrier.inc();\r
5285 \r
5286                 runAsyncMultiRead(impl, request, parent, listener, new AsyncMultiProcedure<T>() {\r
5287 \r
5288                         public void execute(AsyncReadGraph graph, T result) {\r
5289 \r
5290                                 try {\r
5291                                         procedure.execute(graph, result);\r
5292                                 } catch (Throwable t2) {\r
5293                                         Logger.defaultLogError(t2);\r
5294                                 }\r
5295                         }\r
5296 \r
5297                         @Override\r
5298                         public void finished(AsyncReadGraph graph) {\r
5299 \r
5300                                 try {\r
5301                                         procedure.finished(graph);\r
5302                                 } catch (Throwable t2) {\r
5303                                         Logger.defaultLogError(t2);\r
5304                                 }\r
5305 \r
5306 //                              impl.state.barrier.dec();\r
5307 \r
5308                         }\r
5309 \r
5310                         @Override\r
5311                         public String toString() {\r
5312                                 return procedure.toString();\r
5313                         }\r
5314 \r
5315                         @Override\r
5316                         public void exception(AsyncReadGraph graph, Throwable t) {\r
5317 \r
5318                                 try {\r
5319                                         procedure.exception(graph, t);\r
5320                                 } catch (Throwable t2) {\r
5321                                         Logger.defaultLogError(t2);\r
5322                                 }\r
5323 \r
5324 //                              impl.state.barrier.dec();\r
5325 \r
5326                         }\r
5327 \r
5328                 });\r
5329 \r
5330         }\r
5331 \r
5332         @Override\r
5333         final public <T> void query(final ReadGraphImpl impl, final ExternalRead<T> request, final CacheEntry parent, final Procedure<T> procedure, ListenerBase listener) {\r
5334 \r
5335                 assert(request != null);\r
5336                 assert(procedure != null);\r
5337 \r
5338                 queryPrimitiveRead(impl, request, parent, listener, new Procedure<T>() {\r
5339 \r
5340                         @Override\r
5341                         public void execute(T result) {\r
5342                                 try {\r
5343                                         procedure.execute(result);\r
5344                                 } catch (Throwable t2) {\r
5345                                         Logger.defaultLogError(t2);\r
5346                                 }\r
5347                         }\r
5348 \r
5349                         @Override\r
5350                         public String toString() {\r
5351                                 return procedure.toString();\r
5352                         }\r
5353 \r
5354                         @Override\r
5355                         public void exception(Throwable t) {\r
5356                                 try {\r
5357                                         procedure.exception(t);\r
5358                                 } catch (Throwable t2) {\r
5359                                         Logger.defaultLogError(t2);\r
5360                                 }\r
5361                         }\r
5362 \r
5363                 });\r
5364 \r
5365         }\r
5366 \r
5367         @Override\r
5368         public VirtualGraph getProvider(Resource subject, Resource predicate, Resource object) {\r
5369                 \r
5370                 return querySupport.getProvider(querySupport.getId(subject), querySupport.getId(predicate), querySupport.getId(object));\r
5371                 \r
5372         }\r
5373         \r
5374         @Override\r
5375         public VirtualGraph getProvider(Resource subject, Resource predicate) {\r
5376                 \r
5377                 return querySupport.getProvider(querySupport.getId(subject), querySupport.getId(predicate));\r
5378                 \r
5379         }\r
5380 \r
5381         @Override\r
5382         public VirtualGraph getValueProvider(Resource subject) {\r
5383                 \r
5384                 return querySupport.getValueProvider(querySupport.getId(subject));\r
5385                 \r
5386         }\r
5387 \r
5388         public boolean resumeTasks(ReadGraphImpl graph) {\r
5389 \r
5390                 return querySupport.resume(graph);\r
5391 \r
5392         }\r
5393         \r
5394         public boolean isImmutable(int resourceId) {\r
5395                 return querySupport.isImmutable(resourceId);\r
5396         }\r
5397 \r
5398         public boolean isImmutable(Resource resource) {\r
5399                 ResourceImpl impl = (ResourceImpl)resource;\r
5400                 return isImmutable(impl.id);\r
5401         }\r
5402         \r
5403         private Layer0 L0;\r
5404         \r
5405         public Layer0 getL0(ReadGraph graph) {\r
5406                 if(L0 == null) {\r
5407                         L0 = Layer0.getInstance(graph);\r
5408                 }\r
5409                 return L0;\r
5410         }\r
5411         \r
5412 }\r