import java.util.ArrayList;
import java.util.Collection;
+import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;
import org.simantics.db.AsyncReadGraph;
import org.simantics.db.impl.DebugPolicy;
import org.simantics.db.impl.graph.ReadGraphImpl;
import org.simantics.db.impl.procedure.InternalProcedure;
+import org.simantics.db.impl.query.QueryProcessor.SessionTask;
import org.simantics.db.procedure.AsyncMultiProcedure;
import org.simantics.db.procedure.AsyncProcedure;
import org.simantics.db.procedure.Listener;
import org.simantics.db.procedure.ListenerBase;
+import org.simantics.db.procedure.Procedure;
import org.simantics.db.request.AsyncMultiRead;
import org.simantics.db.request.AsyncRead;
import org.simantics.db.request.ExternalRead;
// Just for safety
if(entry.isDiscarded()) return;
- if(entry.isExcepted()) entry.setPending();
if(used.compareAndSet(false, true)) {
+ //entry.setPending();
entry.addOrSet(parentGraph.processor, result);
procedure.execute(parentGraph, result);
} else {
}
}
- public static void waitPending(CacheEntry entry) throws DatabaseException {
+ public static void waitPending(QueryProcessor processor, CacheEntry entry) throws DatabaseException {
int counter = 0;
while(entry.isPending()) {
try {
- Thread.sleep(1);
- counter++;
- if(counter > 1000) {
- CacheEntryBase base = ((CacheEntryBase)entry);
-// if(base.created != null) {
-// System.err.println("created:");
-// base.created.printStackTrace();
-// }
-// if(base.performed != null) {
-// System.err.println("performed:");
-// base.performed.printStackTrace();
-// }
-// if(base.ready != null) {
-// System.err.println("ready:");
-// base.ready.printStackTrace();
-// }
- new Exception("Timeout waiting for request to complete: " + entry.getOriginalRequest().toString()).printStackTrace();
- throw new DatabaseException("Timeout waiting for request to complete.");
- //System.err.println("asd");
- //base.getQuery().recompute(null, null, entry);
+ SessionTask task = processor.getOwnTask(processor.thread.get());
+ if(task != null) {
+ task.run(processor.thread.get());
+ } else {
+ Thread.sleep(1);
+ counter++;
+ if(counter > 5000) {
+ CacheEntryBase base = ((CacheEntryBase)entry);
+// if(base.created != null) {
+// System.err.println("created:");
+// base.created.printStackTrace();
+// }
+// if(base.performed != null) {
+// System.err.println("performed:");
+// base.performed.printStackTrace();
+// }
+// if(base.ready != null) {
+// System.err.println("ready:");
+// base.ready.printStackTrace();
+// }
+ new Exception("Timeout waiting for request to complete: " + entry.getOriginalRequest().toString()).printStackTrace();
+ throw new DatabaseException("Timeout waiting for request to complete.");
+ //System.err.println("asd");
+ //base.getQuery().recompute(null, null, entry);
+ }
}
} catch (InterruptedException e) {
}
private AsyncProcedure<T> procedure;
private T result = null;
private Throwable throwable = null;
+ private Semaphore s = new Semaphore(0);
AsyncProcedureWrapper(AsyncProcedure<T> procedure) {
this.procedure = procedure;
public void execute(AsyncReadGraph graph, T result) {
if(procedure != null) procedure.execute(graph, result);
this.result = result;
+ s.release();
}
@Override
public void exception(AsyncReadGraph graph, Throwable throwable) {
if(procedure != null) procedure.exception(graph, throwable);
this.throwable = throwable;
+ s.release();
}
public T get() throws DatabaseException {
+ try {
+ s.acquire();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
if(throwable != null) {
if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
else throw new DatabaseException(throwable);
}
+ static class ExternalProcedureWrapper<T> implements AsyncProcedure<T> {
+
+ private Procedure<T> procedure;
+ private T result = null;
+ private Throwable throwable = null;
+
+ ExternalProcedureWrapper(Procedure<T> procedure) {
+ this.procedure = procedure;
+ }
+
+ @Override
+ public void execute(AsyncReadGraph graph, T result) {
+ if(procedure != null) procedure.execute(result);
+ this.result = result;
+ }
+
+ @Override
+ public void exception(AsyncReadGraph graph, Throwable throwable) {
+ if(procedure != null) procedure.exception(throwable);
+ this.throwable = throwable;
+ }
+
+ public T get() throws DatabaseException {
+ if(throwable != null) {
+ if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
+ else throw new DatabaseException(throwable);
+ } else {
+ return result;
+ }
+ }
+
+ }
+
static class InternalProcedureWrapper<T> implements InternalProcedure<T> {
}
+ public static <T> T resultExternalReadEntry(ReadGraphImpl graph, ExternalRead r, CacheEntry parent, ListenerBase listener, Procedure<T> procedure) throws DatabaseException {
+ ExternalProcedureWrapper<T> wrap = new ExternalProcedureWrapper<>(procedure);
+ QueryCache.runnerExternalReadEntry(graph, r, parent, listener, wrap);
+ return wrap.get();
+ }
+
public static <T> T resultReadEntry(ReadGraphImpl graph, Read r, CacheEntry parent, ListenerBase listener, AsyncProcedure<T> procedure) throws DatabaseException {
AsyncProcedureWrapper<T> wrap = new AsyncProcedureWrapper<>(procedure);
- QueryCache.runnerReadEntry(graph, r, parent, listener, wrap);
+ QueryCache.runnerReadEntry(graph, r, parent, listener, wrap, true);
+ return wrap.get();
+ }
+
+ public static <T> T resultAsyncReadEntry(ReadGraphImpl graph, AsyncRead r, CacheEntry parent, ListenerBase listener, AsyncProcedure<T> procedure) throws DatabaseException {
+ AsyncProcedureWrapper<T> wrap = new AsyncProcedureWrapper<>(procedure);
+ QueryCache.runnerAsyncReadEntry(graph, r, parent, listener, wrap, true);
return wrap.get();
}