+ ReadEntry entry = (ReadEntry)cache.getOrCreateReadEntry(graph, r, isSync);
+ if(entry == null) {
+ graph.processor.schedule(new SessionTask(false) {
+ @Override
+ public void run(int thread) {
+ try {
+ assert(!isSync);
+ runnerReadEntry(graph, r, parent, listener, procedure, isSync);
+ } catch (DatabaseException e) {
+ Logger.defaultLogError(e);
+ }
+ }
+ });
+ return;
+ }
+ AsyncProcedure procedure_ = procedure != null ? procedure : emptyProcedureReadEntry;
+ ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false);
+ if(entry.isReady()) entry.performFromCache(graph, procedure_);
+ else {
+ assert(entry.isPending());
+ ReadEntry.computeForEach(graph, r, entry, procedure_);
+ if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult());
+ }
+ }
+
+ AsyncReadEntry getOrCreateAsyncReadEntry(ReadGraphImpl graph, AsyncRead<?> r, boolean isSync) throws DatabaseException {
+ AsyncReadEntry existing = null;
+ synchronized(asyncReadEntryMap) {
+ existing = (AsyncReadEntry)asyncReadEntryMap.get(r);
+ if(existing == null) {
+ existing = new AsyncReadEntry(r);
+ existing.clearResult(querySupport);
+ existing.setPending();
+ asyncReadEntryMap.put(id(r), existing);
+ size++;
+ return existing;
+ }
+ if(existing.requiresComputation()) {
+ existing.setPending();
+ return existing;
+ }
+ }
+ if(existing.isPending()) {
+ if(isSync) waitPending(graph, existing);
+ else return null;
+ }
+ return existing;
+ }
+
+ void remove(AsyncReadEntry entry) {
+ synchronized(asyncReadEntryMap) {
+ asyncReadEntryMap.remove(entry.request);
+ }
+ }
+
+ public static void runnerAsyncReadEntry(ReadGraphImpl graph, AsyncRead<?> r, CacheEntry parent, ListenerBase listener, final AsyncProcedure procedure, boolean isSync) throws DatabaseException {