import org.simantics.db.AsyncReadGraph;
import org.simantics.db.common.utils.Logger;
import org.simantics.db.exception.DatabaseException;
+import org.simantics.db.impl.graph.AsyncBarrierImpl;
import org.simantics.db.impl.graph.ReadGraphImpl;
import org.simantics.db.procedure.AsyncProcedure;
private static final Object NO_RESULT = new Object();
private final Object key;
- private final ReadGraphImpl graph;
+ private final AsyncBarrierImpl barrier;
+ private final ReadGraphImpl procedureGraph;
private final AsyncProcedure<Result> procedure;
private Object result = NO_RESULT;
private Throwable exception = null;
- public BlockingAsyncProcedure(ReadGraphImpl graph, AsyncProcedure<Result> procedure, Object key) {
+ public BlockingAsyncProcedure(AsyncBarrierImpl barrier, ReadGraphImpl procedureGraph, AsyncProcedure<Result> procedure, Object key) {
this.procedure = procedure;
this.key = key;
- this.graph = graph;
- this.graph.asyncBarrier.inc();
+ this.barrier = barrier;
+ this.barrier.inc();
+ this.procedureGraph = procedureGraph;
}
@Override
- public void execute(AsyncReadGraph graph, Result result) {
+ public void execute(AsyncReadGraph graph_, Result result) {
this.result = result;
- this.graph.asyncBarrier.dec();
try {
- if(procedure != null) procedure.execute(graph, result);
+ if(procedure != null) procedure.execute(procedureGraph, result);
} catch (Throwable throwable) {
Logger.defaultLogError("AsyncProcedure.execute threw for " + procedure, throwable);
+ } finally {
+ barrier.dec();
}
}
@Override
- public void exception(AsyncReadGraph graph, Throwable t) {
+ public void exception(AsyncReadGraph graph_, Throwable t) {
this.exception = t;
try {
- if(procedure != null) procedure.exception(graph, t);
+ if(procedure != null) procedure.exception(procedureGraph, t);
} catch (Throwable throwable) {
Logger.defaultLogError("AsyncProcedure.exception threw for " + procedure, throwable);
} finally {
- this.graph.asyncBarrier.dec();
+ barrier.dec();
}
}
public void waitBarrier() {
- graph.asyncBarrier.waitBarrier(key, graph);
+ barrier.waitBarrier(key, procedureGraph);
}
@SuppressWarnings("unchecked")
public Result get() throws DatabaseException {
- graph.asyncBarrier.waitBarrier(key, graph);
+ barrier.waitBarrier(key, procedureGraph);
if(exception != null) {
if(exception instanceof DatabaseException) throw (DatabaseException)exception;
}
public boolean isDone() {
- return graph.asyncBarrier.get() == 0;
+ return barrier.get() == 0;
}
@SuppressWarnings("unchecked")
package org.simantics.db.impl.graph;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
+import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import org.simantics.db.common.utils.Logger;
import org.simantics.db.exception.RuntimeDatabaseException;
+import org.simantics.db.impl.query.CacheEntry;
import org.simantics.db.impl.query.QueryProcessor.AsyncBarrier;
final public class AsyncBarrierImpl extends AtomicInteger implements AsyncBarrier {
private static final long serialVersionUID = 4724463372850048672L;
static final HashMap<AsyncBarrierImpl, Collection<AsyncBarrierImpl>> reverseLookup = new HashMap<AsyncBarrierImpl, Collection<AsyncBarrierImpl>>();
- static final HashMap<AsyncBarrierImpl, Debugger> debuggerMap = new HashMap<AsyncBarrierImpl, Debugger>();
+ public static final HashMap<AsyncBarrierImpl, Debugger> debuggerMap = new HashMap<AsyncBarrierImpl, Debugger>();
+ static final HashMap<AsyncBarrierImpl, CacheEntry> entryMap = new HashMap<AsyncBarrierImpl, CacheEntry>();
static final HashMap<AsyncBarrierImpl, Boolean> restartMap = new HashMap<AsyncBarrierImpl, Boolean>();
static final int WAIT_TIME = 600;
- public static final boolean BOOKKEEPING = false;
+ public static final boolean BOOKKEEPING = true;
public static final boolean PRINT = false;
- static final boolean RESTART_GUARD = false;
+ static final boolean RESTART_GUARD = true;
- final private AsyncBarrierImpl caller;
+ final public AsyncBarrierImpl caller;
//private final Semaphore sema = new Semaphore(0);
- public AsyncBarrierImpl(AsyncBarrierImpl caller) {
+ public AsyncBarrierImpl(AsyncBarrierImpl caller, CacheEntry entry) {
super(0);
if (BOOKKEEPING) {
+ synchronized (entryMap) {
+ entryMap.put(this, entry);
+ }
synchronized (debuggerMap) {
debuggerMap.put(this, new Debugger());
}
}
public class Debugger {
- public HashMap<Object, ArrayList<String>> infos = new HashMap<Object, ArrayList<String>>();
+
+ public Map<AsyncBarrierImpl, String> infos = new HashMap<>();
- public synchronized void inc(Object id, String info) {
+ public synchronized void inc(AsyncBarrierImpl id, String info) {
if (id == null)
return;
- ArrayList<String> exist = infos.get(id);
- if (exist == null) {
- exist = new ArrayList<String>();
- infos.put(id, exist);
- } else {
- // System.err.println("Appending " + id + " += " + info);
- }
- exist.add(info);
- // String exist = infos.put(id, info);
- // if(exist != null) System.err.println("replacing " + exist +
- // " => " + info + " for " + id);
+ String exist = infos.get(id);
+ if (exist != null)
+ throw new IllegalStateException("Already existing info " + id + " " + info);
+ infos.put(id, exist);
}
- public synchronized void dec(Object id) {
+ public synchronized void dec(AsyncBarrierImpl id) {
if (id == null)
return;
- ArrayList<String> exist = infos.get(id);
+ String exist = infos.get(id);
if (exist == null) {
System.err.println("No data for " + id);
} else {
- exist.remove(0);
- if (exist.isEmpty())
- infos.remove(id);
+ infos.remove(id);
}
}
@Override
public synchronized String toString() {
StringBuilder b = new StringBuilder();
- for (ArrayList<String> ss : infos.values()) {
- for (String s : ss)
- b.append("info " + s + "\r\n");
+ for (String s : infos.values()) {
+ b.append("info " + s + "\r\n");
}
return b.toString();
}
-
- public synchronized void toErr(int indent) {
- char[] spaces = new char[indent];
- Arrays.fill(spaces, ' ');
- for (ArrayList<String> ss : infos.values()) {
- for (String s : ss) {
- if (!s.startsWith("#"))
- continue;
- StringBuilder b = new StringBuilder();
- b.append(spaces);
- b.append(s);
- System.err.println(b.toString());
- }
- }
+
+ public boolean isEmpty() {
+ return infos.isEmpty();
}
- }
-
- public void inc() {
-
- if (BOOKKEEPING)
- inc(new Object(), new Exception().getStackTrace()[1].toString());
- else
- inc(null, null);
}
- public void inc(String debug) {
+ public void inc() {
if (BOOKKEEPING)
- inc(new Object(), new Exception().getStackTrace()[1].toString());
+ inc(this, new Exception().getStackTrace()[2].toString());
else
inc(null, null);
+
+ if (RESTART_GUARD)
+ if(restartMap.containsKey(this))
+ throw new IllegalStateException("Unplanned restart");
}
- public void inc(Object id, String info) {
+ private void inc(Object id, String info) {
// if (PRINT) {
// if (get() < 5)
// }
if (BOOKKEEPING) {
- Debugger debugger = debuggerMap.get(this);
- if (debugger != null)
- debugger.inc(id, info);
- // StackTraceElement[] tr = new Exception().getStackTrace();
- // if(tr.length == 4)
- // debugger.inc(new String[] { debug, tr[2].toString(),
- // tr[3].toString() });
- // else if(tr.length == 5)
- // debugger.inc(new String[] { debug, tr[2].toString(),
- // tr[3].toString(), tr[4].toString() });
- // else if(tr.length == 6)
- // debugger.inc(new String[] { debug, tr[2].toString(),
- // tr[3].toString(), tr[4].toString(), tr[5].toString() });
- // else
- // debugger.inc(new String[] { debug, tr[2].toString(),
- // tr[3].toString(), tr[4].toString(), tr[5].toString(),
- // tr[6].toString() });
+// Debugger debugger = debuggerMap.get(this);
+// if (debugger != null)
+// debugger.inc(id, info);
}
- // new Exception().printStackTrace();
-
if(PRINT) {
System.err.println("inc barrier[" + get() + "] " + this);
}
if (BOOKKEEPING) {
- Debugger debugger = debuggerMap.get(this);
- if (debugger != null)
- debugger.dec(id);
- // StackTraceElement[] tr = new Exception().getStackTrace();
- // if(tr.length == 3)
- // debugger.dec(new String[] { debug, tr[2].toString() });
- // else if(tr.length == 4)
- // debugger.dec(new String[] { debug, tr[2].toString(),
- // tr[3].toString() });
- // else
- // debugger.dec(new String[] { debug, tr[2].toString(),
- // tr[3].toString(), tr[4].toString() });
+// Debugger debugger = debuggerMap.get(this);
+// if (debugger != null) {
+// debugger.dec(id);
+// if(debugger.isEmpty())
+// debuggerMap.remove(this);
+// }
}
- // System.err.println("barrier " + this);
- // StackTraceElement[] elems = new Exception().getStackTrace();
- // for(int i=0;i<3;i++) System.err.println(elems[i]);
- // new Exception().printStackTrace();
-
int count = decrementAndGet();
if (count < 1) {
if (count == 0) {
+ debuggerMap.remove(this);
if (caller != null)
caller.dec(this);
if (RESTART_GUARD)
restartMap.put(this, true);
- // sema.release();
- // if(DEBUGGER) {
- // debuggerMap.remove(this);
- // }
- // if(REVERSE_LOOKUP) {
- // reverseLookup.remove(this);
- // }
}
if (count < 0) {
Logger.defaultLogError(
"Database request processing error. The application code has performed illegal actions (probably called multiple times the execute or exception method of a single result request.",
new Exception());
- // String message = ;
- // System.out.println(message);
- // if (DEBUGGER) {
- // JOptionPane.showMessageDialog(null, message);
- // System.out.println(debugger);
- // }
- // sema.release();
System.exit(-1);
}
assert (count >= 0);
}
}
- private static void printReverse(AsyncBarrierImpl barrier, int indent) {
+ public static String report(AsyncBarrierImpl barrier) {
+ CacheEntry e = entryMap.get(barrier);
+ if(e != null) return e.toString();
+ else return "Barrier@" + System.identityHashCode(barrier);
+ }
+
+ public static void printReverse(AsyncBarrierImpl barrier, int indent) {
if (barrier.get() == 0)
return;
for (int i = 0; i < indent; i++)
System.err.print(" ");
- System.err.println("[" + barrier.get() + " requests]: " + barrier);
- if (BOOKKEEPING) {
- Debugger debugger = debuggerMap.get(barrier);
- debugger.toErr(indent + 2);
- }
+ System.err.println("[" + barrier.get() + " requests]: " + report(barrier));
+// if (BOOKKEEPING) {
+// Debugger debugger = debuggerMap.get(barrier);
+// debugger.toErr(indent + 2);
+// }
Collection<AsyncBarrierImpl> children = reverseLookup.get(barrier);
if (children != null) {
@Override
public String toString() {
- return "AsyncBarrierImpl@" + System.identityHashCode(this)
- + " - counter = " + get() + " - caller = " + caller;
+ return report(this);
+// return "AsyncBarrierImpl@" + System.identityHashCode(this)
+// + " - counter = " + get() + " - caller = " + caller;
}
}
}
- final private <T> void syncRequest(final AsyncRead<T> request, final AsyncReadProcedure<T> procedure) throws DatabaseException {
-
- assert (request != null);
-
- ListenerBase listener = getListenerBase(procedure);
- assert(listener == null);
-
- BlockingAsyncProcedure<T> ap = new BlockingAsyncProcedure<>(this, procedure, request);
-
- QueryCache.runnerAsyncReadEntry(this, request, parent, listener, ap, true);
-
- ap.get();
-
- }
-
@Override
public <T> T syncRequest(AsyncRead<T> request,
final SyncProcedure<T> procedure) throws DatabaseException {
this.parentGraph = parentGraph;
this.parent = parent;
this.processor = support;
- this.asyncBarrier = new AsyncBarrierImpl(parentGraph != null ? parentGraph.asyncBarrier : null);
+ this.asyncBarrier = new AsyncBarrierImpl(parentGraph != null ? parentGraph.asyncBarrier : null, parent);
}
ReadGraphImpl(ReadGraphImpl graph, CacheEntry parent) {
import org.simantics.db.AsyncReadGraph;
import org.simantics.db.exception.DatabaseException;
-import org.simantics.db.exception.RuntimeDatabaseException;
import org.simantics.db.impl.BlockingAsyncProcedure;
import org.simantics.db.impl.DebugPolicy;
import org.simantics.db.impl.graph.AsyncBarrierImpl;
try {
- BlockingAsyncProcedure<T> proc = new BlockingAsyncProcedure<>(graph, new AsyncProcedure<T>() {
+ BlockingAsyncProcedure<T> proc = new BlockingAsyncProcedure<>(graph.asyncBarrier, graph, new AsyncProcedure<T>() {
@Override
public void execute(AsyncReadGraph graph, T result) {
ReadGraphImpl queryGraph = graph.withParent(entry);
- BlockingAsyncProcedure<T> proc = new BlockingAsyncProcedure<>(queryGraph, null, request);
+ BlockingAsyncProcedure<T> proc = new BlockingAsyncProcedure<>(queryGraph.asyncBarrier, graph, null, request);
class AsyncTask extends SessionTask {
+ int counter = 0;
T result;
DatabaseException exception;
exception = dbe;
} finally {
if (entry != null)
- entry.performFromCache(queryGraph, procedure_);
+ entry.performFromCache(graph, procedure_);
}
} else {
+ if(counter++ > 10000) {
+ AsyncBarrierImpl.printReverse(queryGraph.asyncBarrier, 2);
+ AsyncBarrierImpl caller = queryGraph.asyncBarrier.caller;
+ while(caller != null) {
+ System.err.println("called by " + AsyncBarrierImpl.report(caller));
+ caller = caller.caller;
+ }
+ for(AsyncBarrierImpl ab : AsyncBarrierImpl.debuggerMap.keySet()) {
+ AsyncBarrierImpl.printReverse(ab, 2);
+ }
+ throw new IllegalStateException("Eternal loop in queries.");
+ }
graph.processor.schedule(this);
}
}
import org.simantics.db.impl.DebugPolicy;
import org.simantics.db.impl.graph.ReadGraphImpl;
import org.simantics.db.procedure.AsyncProcedure;
+import org.simantics.db.procedure.Listener;
import org.simantics.db.request.ExternalRead;
import org.simantics.db.request.RequestFlags;
-final public class ExternalReadEntry<T> extends CacheEntryBase<AsyncProcedure<T>> {
+final public class ExternalReadEntry<T> extends CacheEntryBase<AsyncProcedure<T>> implements Listener<T> {
final LinkedList<T> items = new LinkedList<T>();
protected ExternalRead<T> request;
+ protected ReadGraphImpl graph;
+ protected boolean registered = false;
@Override
int makeHash() {
public void discard() {
request.unregistered();
request = null;
+ graph = null;
super.discard();
}
result = REQUIRES_COMPUTATION;
}
- public ExternalReadEntry(ExternalRead<T> request) {
+ public ExternalReadEntry(ExternalRead<T> request, ReadGraphImpl graph) {
assert request != null;
this.request = request;
- }
-
- final public void queue(T item) {
- synchronized(items) {
- items.addLast(item);
- // TODO: implement flags/logic in ExternalRead to state that all but the latest request result can be evaporated
- // In some cases where data is produced really fast this might be necessary but currently this queueing will do.
- }
- }
-
- final public void addOrSet(QueryProcessor processor, Object item) {
-
- try {
-
- assert(isPending());
-
- //ArrayList<Procedure<T>> p = null;
-
- synchronized(this) {
-
- setResult(item);
- setReady();
-// p = procs;
-// procs = null;
-
- }
-
-// if(p != null)
-// for(Procedure proc : p) {
-// proc.execute((T)item);
-// }
-
- } catch (Throwable t) {
- t.printStackTrace();
- }
-
+ this.graph = graph;
}
@Override
public void except(Throwable t) {
+
if(DebugPolicy.QUERY_STATE) System.out.println("[QUERY STATE]: excepted " + this);
if(statusOrException != DISCARDED) {
statusOrException = EXCEPTED;
} else {
result = t;
}
+
assert(isExcepted());
+
}
@Override
public void setResult(Object result) {
+
super.setResult(result);
assert(!(result instanceof Throwable));
assert(!isExcepted());
+
+ }
+
+ @Override
+ public void setReady() {
+ super.setReady();
}
@Override
synchronized(items) {
-
// Update
if(!items.isEmpty()) {
setReady();
}
public Object compute(ReadGraphImpl graph, AsyncProcedure<T> procedure) throws DatabaseException {
- return graph.processor.cache.performQuery(graph, request, this, procedure);
+
+ try {
+
+ ReadGraphImpl queryGraph = graph.withParent(this);
+
+ if(!registered) {
+ request.register(graph, this);
+ registered = true;
+ }
+
+ queryGraph.asyncBarrier.waitBarrier(request, graph);
+
+ } catch (Throwable t) {
+
+ except(t);
+
+ }
+
+ performFromCache(graph, procedure);
+
+ return getResult();
+
}
+ @Override
+ public void execute(T result) {
+
+ if(this.result == REQUIRES_COMPUTATION) {
+
+ setResult(result);
+ setReady();
+
+ } else {
+
+ synchronized(items) {
+ items.addLast(result);
+ graph.processor.updatePrimitive(request);
+ // TODO: implement flags/logic in ExternalRead to state that all but the latest request result can be evaporated
+ // In some cases where data is produced really fast this might be necessary but currently this queueing will do.
+ }
+
+ }
+
+ }
+
+ @Override
+ public void exception(Throwable t) {
+ except(t);
+ }
+
+ @Override
+ public boolean isDisposed() {
+ return registered && (isDiscarded() || !graph.processor.isBound(this));
+ }
+
}
synchronized(externalReadEntryMap) {
existing = (ExternalReadEntry)externalReadEntryMap.get(r);
if(existing == null) {
- existing = new ExternalReadEntry(r);
+ existing = new ExternalReadEntry(r, graph);
existing.clearResult(querySupport);
existing.setPending();
externalReadEntryMap.put(id(r), existing);
//
// }
- public <T> Object performQuery(ReadGraphImpl parentGraph, final ExternalRead<T> query, final CacheEntryBase entry_, AsyncProcedure procedure_) throws DatabaseException {
-
- ExternalReadEntry entry = (ExternalReadEntry)entry_;
- AsyncProcedure<T> procedure = (AsyncProcedure<T>)procedure_;
-
- try {
-
- query.register(parentGraph, new Listener<T>() {
-
- AtomicBoolean used = new AtomicBoolean(false);
-
- @Override
- public void execute(T result) {
-
- // Just for safety
- if(entry.isDiscarded()) return;
-
- if(used.compareAndSet(false, true)) {
- //entry.setPending();
- entry.addOrSet(parentGraph.processor, result);
- procedure.execute(parentGraph, result);
- } else {
- entry.queue(result);
- parentGraph.processor.updatePrimitive(query);
- }
-
- }
-
- @Override
- public void exception(Throwable t) {
-
- entry.except(t);
-
- if(used.compareAndSet(false, true)) {
- procedure.exception(parentGraph, t);
- } else {
-// entry.queue(result);
- parentGraph.processor.updatePrimitive(query);
- }
-
- }
-
- @Override
- public String toString() {
- return procedure.toString();
- }
-
- @Override
- public boolean isDisposed() {
- return entry.isDiscarded() || !parentGraph.processor.isBound(entry);
- }
-
- });
-
- return entry.getResult();
-
- } catch (Throwable t) {
-
- entry.except(t);
- procedure.exception(parentGraph, t);
- return entry.getResult();
-
- }
-
- }
public <T> Object performQuery(ReadGraphImpl parentGraph, final AsyncMultiRead<T> query, final CacheEntryBase entry_, Object procedure_) throws DatabaseException {
// final ResultCallWrappedSingleQueryProcedure4<T> wrapper = new ResultCallWrappedSingleQueryProcedure4<T>(
// procedure, "request");
- BlockingAsyncProcedure<T> wrap = new BlockingAsyncProcedure<T>(newGraph, procedure, request);
+ BlockingAsyncProcedure<T> wrap = new BlockingAsyncProcedure<T>(newGraph.asyncBarrier, newGraph, procedure, request);
try {