/******************************************************************************* * Copyright (c) 2018 Association for Decentralized Information Management * in Industry THTH ry. * All rights reserved. This program and the accompanying materials * are made available under the terms of the Eclipse Public License v1.0 * which accompanies this distribution, and is available at * http://www.eclipse.org/legal/epl-v10.html * * Contributors: * Semantum Oy - initial API and implementation *******************************************************************************/ package org.simantics.db.impl; import org.simantics.db.AsyncReadGraph; import org.simantics.db.exception.DatabaseException; import org.simantics.db.impl.graph.BarrierTracing; import org.simantics.db.impl.graph.ReadGraphImpl; import org.simantics.db.impl.query.AsyncReadEntry; import org.simantics.db.impl.query.PendingTaskSupport; import org.simantics.db.procedure.AsyncProcedure; public class BlockingAsyncProcedure implements AsyncProcedure { private static final Object NO_RESULT = new Object(); public final Object key; public final ReadGraphImpl queryGraph; public final ReadGraphImpl callerGraph; public final AsyncProcedure procedure; public PendingTaskSupport pendingTaskSupport; public final boolean needsToBlock; public Object result = NO_RESULT; public Throwable exception = null; private ReadGraphImpl queryGraph() { return queryGraph; } public BlockingAsyncProcedure(ReadGraphImpl callerGraph, AsyncReadEntry entry, AsyncProcedure procedure, Object key, boolean needsToBlock) { // A new graph for evaluating the query with correct parent and asyncBarrier queryGraph = callerGraph.withParent(entry, () -> { dispatchProcedure(queryGraph(), callerGraph, entry, procedure, needsToBlock); }, needsToBlock); queryGraph.asyncBarrier.inc(); this.procedure = procedure; this.key = key; this.queryGraph.asyncBarrier.inc(); this.callerGraph = callerGraph; this.needsToBlock = needsToBlock; if (BarrierTracing.BOOKKEEPING) { BarrierTracing.registerBAP(this); } } @Override public void execute(AsyncReadGraph graph_, Result result) { this.result = result; queryGraph.asyncBarrier.dec(); } @Override public void exception(AsyncReadGraph graph_, Throwable t) { this.exception = t; queryGraph.asyncBarrier.dec(); } public void waitBarrier() { queryGraph.asyncBarrier.waitBarrier(key, queryGraph); } public void dec() { queryGraph.asyncBarrier.dec(); } @SuppressWarnings("unchecked") public Result get() throws DatabaseException { if(needsToBlock) queryGraph.asyncBarrier.waitBarrier(key, queryGraph); if(exception != null) { if(exception instanceof DatabaseException) throw (DatabaseException)exception; throw new DatabaseException(exception); } else { return (Result)result; } } @SuppressWarnings("unchecked") public Result getResult() { return (Result)result; } public Throwable getException() { return exception; } @Override public String toString() { return "." + procedure; } private void dispatchProcedure(ReadGraphImpl queryGraph, ReadGraphImpl parentGraph, AsyncReadEntry entry, AsyncProcedure procedure_, boolean needsToBlock) { AsyncProcedure procedure = entry != null ? entry : procedure_; ReadGraphImpl executeGraph = parentGraph.withParent(parentGraph.parent, null, needsToBlock); executeGraph.asyncBarrier.inc(); try { if(procedure != null) { procedure.execute(executeGraph, get()); } } catch (DatabaseException e) { if(procedure != null) procedure.exception(executeGraph, e); exception = e; } catch (Throwable t) { DatabaseException dbe = new DatabaseException(t); if(procedure != null) procedure.exception(executeGraph, dbe); exception = dbe; } finally { if (entry != null) { assert(entry.isReady()); // This does not throw entry.performFromCache(executeGraph, procedure_); } executeGraph.asyncBarrier.dec(); if(needsToBlock) executeGraph.asyncBarrier.waitBarrier(procedure, executeGraph); } if (BarrierTracing.BOOKKEEPING) { BarrierTracing.unregisterBAP(this); } } }