}
ReadEntry entry = (ReadEntry)cache.getOrCreateReadEntry(graph, r, needsToBlock);
if(entry == null) {
- graph.processor.scheduleNow(new SessionTask(graph) {
+ graph.asyncBarrier.inc();
+ graph.processor.scheduleNow(new SessionTask() {
@Override
public void run0(int thread) {
try {
runnerReadEntry(graph, r, parent, listener, procedure, needsToBlock);
+ graph.asyncBarrier.dec();
} catch (DatabaseException e) {
Logger.defaultLogError(e);
}
}
}
- AsyncReadEntry getOrCreateAsyncReadEntry(ReadGraphImpl graph, AsyncRead<?> r, boolean needsToBlock) throws DatabaseException {
+ AsyncReadEntry getOrCreateAsyncReadEntry(ReadGraphImpl graph, AsyncRead<?> r, CacheEntry parent, ListenerBase listener, final AsyncProcedure procedure, boolean needsToBlock) throws DatabaseException {
AsyncReadEntry existing = null;
synchronized(asyncReadEntryMap) {
existing = (AsyncReadEntry)asyncReadEntryMap.get(r);
}
}
if(existing.isPending()) {
- if(needsToBlock)
- waitPending(graph, existing);
- else {
- return null;
- }
+ if(needsToBlock)
+ waitPending(graph, existing);
+ else {
+ existing.executeWhenResultIsAvailable(graph.processor, new SessionTask(graph) {
+ @Override
+ public void run0(int thread) {
+ try {
+ runnerAsyncReadEntry(graph, r, parent, listener, procedure, needsToBlock);
+ } catch (DatabaseException e) {
+ Logger.defaultLogError(e);
+ }
+ }
+ });
+ return null;
+ }
}
return existing;
}
}
return AsyncReadEntry.computeForEach(graph, r, null, procedure, needsToBlock);
}
- AsyncReadEntry entry = (AsyncReadEntry)cache.getOrCreateAsyncReadEntry(graph, r, needsToBlock);
+ AsyncReadEntry entry = (AsyncReadEntry)cache.getOrCreateAsyncReadEntry(graph, r, parent, listener, procedure, needsToBlock);
if(entry == null) {
- graph.processor.scheduleNow(new SessionTask(graph) {
- @Override
- public void run0(int thread) {
- try {
- runnerAsyncReadEntry(graph, r, parent, listener, procedure, needsToBlock);
- } catch (DatabaseException e) {
- Logger.defaultLogError(e);
- }
- }
- });
- return null;
+ // Entry was pending and this request has been queued
+ return null;
}
AsyncProcedure procedure_ = procedure != null ? procedure : emptyProcedureAsyncReadEntry;
if(entry.isReady()) {