/******************************************************************************* * Copyright (c) 2007, 2018 Association for Decentralized Information Management * in Industry THTH ry. * All rights reserved. This program and the accompanying materials * are made available under the terms of the Eclipse Public License v1.0 * which accompanies this distribution, and is available at * http://www.eclipse.org/legal/epl-v10.html * * Contributors: * VTT Technical Research Centre of Finland - initial API and implementation *******************************************************************************/ package org.simantics.db.impl.query; import org.simantics.db.AsyncReadGraph; import org.simantics.db.exception.DatabaseException; import org.simantics.db.impl.BlockingAsyncProcedure; import org.simantics.db.impl.DebugPolicy; import org.simantics.db.impl.graph.AsyncBarrierImpl; import org.simantics.db.impl.graph.BarrierTracing; import org.simantics.db.impl.graph.ReadGraphImpl; import org.simantics.db.impl.query.QueryProcessor.SessionTask; import org.simantics.db.procedure.AsyncProcedure; import org.simantics.db.request.AsyncRead; import org.slf4j.Logger; import org.slf4j.LoggerFactory; final public class AsyncReadEntry extends CacheEntryBase> implements AsyncProcedure { private static final Logger LOGGER = LoggerFactory.getLogger(AsyncReadEntry.class); protected AsyncRead request; AsyncReadEntry(AsyncRead request) { this.request = request; if (DebugPolicy.QUERY_STATE) System.out.println("[QUERY STATE]: created " + this); } @Override int makeHash() { return request.hashCode(); } @Override public Object getOriginalRequest() { return request; } @Override public void discard() { super.discard(); setResult(null); } public void except(AsyncReadGraph graph, Throwable t) { assert (isPending()); synchronized (this) { except(t); } } @Override final public Query getQuery() { return new Query() { @Override public void recompute(ReadGraphImpl graph) { try { BlockingAsyncProcedure proc = new BlockingAsyncProcedure<>(graph.asyncBarrier, graph, new AsyncProcedure() { @Override public void execute(AsyncReadGraph graph, T result) { setResult(result); setReady(); } @Override public void exception(AsyncReadGraph graph, Throwable t) { except(t); } }, request); request.perform(graph, proc); proc.get(); } catch (Throwable t) { except(t); } } @Override public void removeEntry(QueryProcessor qp) { qp.cache.remove(AsyncReadEntry.this); } @Override public int type() { return request.getFlags(); } @Override public String toString() { if (request == null) return "DISCARDED"; else if (isExcepted()) return request.toString() + " " + getResult(); else return request.toString() + " " + statusOrException; } }; } @Override public Object performFromCache(ReadGraphImpl graph, AsyncProcedure proc) { if (isExcepted()) { try { proc.exception(graph, (Throwable) getResult()); } catch (Throwable t) { LOGGER.error("performFromCache proc.exception failed", t); } } else { try { T result = (T) getResult(); proc.execute(graph, result); } catch (Throwable t) { LOGGER.error("performFromCache proc.execute failed", t); } } return getResult(); } public static T computeForEach(ReadGraphImpl graph, AsyncRead request, AsyncReadEntry entry, AsyncProcedure procedure_, boolean needsToBlock) throws DatabaseException { AsyncProcedure procedure = entry != null ? entry : procedure_; ReadGraphImpl queryGraph = graph.withParent(entry); queryGraph.asyncBarrier.inc(); BlockingAsyncProcedure proc = new BlockingAsyncProcedure<>(queryGraph.asyncBarrier, graph, null, request); class AsyncTask extends SessionTask { int counter = 0; T result; DatabaseException exception; public AsyncTask(ReadGraphImpl graph) { super(graph); } @Override public void run0(int thread) { if(needsToBlock) proc.waitBarrier(); if(proc.isDone()) { ReadGraphImpl executeGraph = graph.withParent(graph.parent); executeGraph.asyncBarrier.inc(); try { result = (T)proc.get(); if(procedure != null) { procedure.execute(executeGraph, result); } } catch (DatabaseException e) { if(procedure != null) procedure.exception(executeGraph, e); exception = e; } catch (Throwable t) { DatabaseException dbe = new DatabaseException(t); if(procedure != null) procedure.exception(executeGraph, dbe); exception = dbe; } finally { if (entry != null) { // This does not throw entry.performFromCache(executeGraph, procedure_); } executeGraph.asyncBarrier.dec(); executeGraph.asyncBarrier.waitBarrier(procedure, executeGraph); } } else { if(counter++ > 10000) { if(BarrierTracing.BOOKKEEPING) { AsyncBarrierImpl.printReverse(queryGraph.asyncBarrier, 2); AsyncBarrierImpl caller = queryGraph.asyncBarrier.caller; while(caller != null) { System.err.println("called by " + AsyncBarrierImpl.report(caller)); caller = caller.caller; } for(AsyncBarrierImpl ab : BarrierTracing.debuggerMap.keySet()) { AsyncBarrierImpl.printReverse(ab, 2); } } throw new IllegalStateException("Eternal loop in queries."); } graph.processor.schedule(new AsyncTask(graph)); } } } try { request.perform(queryGraph, proc); } finally { queryGraph.asyncBarrier.dec(); } AsyncTask task = new AsyncTask(graph); if(needsToBlock) task.run(0); else if (proc.isDone()) task.run(0); else { graph.processor.schedule(task); return null; } if(task.exception != null) throw task.exception; else return task.result; } @Override public String toString() { if (isDiscarded()) return "DISCARDED " + request.toString(); else if (isExcepted()) return request.toString() + " " + getResult(); else return request.toString() + " " + statusOrException; } @Override public void execute(AsyncReadGraph graph, T result) { setResult(result); setReady(); } @Override public void exception(AsyncReadGraph graph, Throwable throwable) { except(throwable); } }