+ LOGGER.error("performFromCache proc.execute failed", t);
+ }
+
+ }
+
+ return getResult();
+
+ }
+
+ public static <T> T computeForEach(ReadGraphImpl graph, AsyncRead<T> request, AsyncReadEntry<T> entry,
+ AsyncProcedure<T> procedure_, boolean needsToBlock) throws DatabaseException {
+
+ AsyncProcedure<T> procedure = entry != null ? entry : procedure_;
+
+ ReadGraphImpl queryGraph = graph.withParent(entry);
+
+ BlockingAsyncProcedure<T> proc = new BlockingAsyncProcedure<>(queryGraph, null, request);
+
+ class AsyncTask extends SessionTask {
+
+ T result;
+ DatabaseException exception;
+
+ public AsyncTask(ReadGraphImpl graph) {
+ super(graph);
+ }
+
+ @Override
+ public void run(int thread) {
+ if(needsToBlock) proc.waitBarrier();
+ if(proc.isDone()) {
+ try {
+ result = (T)proc.get();
+ if(procedure != null) procedure.execute(graph, result);
+ } catch (DatabaseException e) {
+ if(procedure != null) procedure.exception(graph, e);
+ exception = e;
+ } catch (Throwable t) {
+ DatabaseException dbe = new DatabaseException(t);
+ if(procedure != null) procedure.exception(graph, dbe);
+ exception = dbe;
+ } finally {
+ if (entry != null)
+ entry.performFromCache(queryGraph, procedure_);
+ }
+ } else {
+ graph.processor.schedule(this);
+ }