/*******************************************************************************
- * Copyright (c) 2007, 2010 Association for Decentralized Information Management
+ * Copyright (c) 2007, 2018 Association for Decentralized Information Management
* in Industry THTH ry.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
*******************************************************************************/
package org.simantics.db.impl.query;
-import java.util.ArrayList;
import java.util.LinkedList;
+import org.simantics.db.exception.DatabaseException;
+import org.simantics.db.impl.DebugPolicy;
import org.simantics.db.impl.graph.ReadGraphImpl;
-import org.simantics.db.procedure.Procedure;
+import org.simantics.db.procedure.AsyncProcedure;
+import org.simantics.db.procedure.Listener;
import org.simantics.db.request.ExternalRead;
import org.simantics.db.request.RequestFlags;
-final public class ExternalReadEntry<T> extends CacheEntryBase {
+final public class ExternalReadEntry<T> extends CacheEntryBase<AsyncProcedure<T>> implements Listener<T> {
final LinkedList<T> items = new LinkedList<T>();
-
-// public ArrayList<Procedure<T>> procs;
protected ExternalRead<T> request;
+ protected ReadGraphImpl graph;
+ protected boolean registered = false;
@Override
int makeHash() {
public void discard() {
request.unregistered();
request = null;
+ graph = null;
super.discard();
}
+
+ @Override
+ public void setPending() {
+ //if(result != NO_RESULT) {
+ //new Exception("result = " + result).printStackTrace();
+ //}
+ statusOrException = PENDING;
+ result = REQUIRES_COMPUTATION;
+ }
- public ExternalReadEntry(ExternalRead<T> request) {
+ public ExternalReadEntry(ExternalRead<T> request, ReadGraphImpl graph) {
assert request != null;
this.request = request;
+ this.graph = graph;
}
- final public void queue(T item) {
- synchronized(items) {
- items.addLast(item);
- // TODO: implement flags/logic in ExternalRead to state that all but the latest request result can be evaporated
- // In some cases where data is produced really fast this might be necessary but currently this queueing will do.
- }
+ @Override
+ public void except(Throwable t) {
+
+ if(DebugPolicy.QUERY_STATE) System.out.println("[QUERY STATE]: excepted " + this);
+ if(statusOrException != DISCARDED) {
+ statusOrException = EXCEPTED;
+ result = t;
+ } else {
+ result = t;
+ }
+
+ assert(isExcepted());
+
}
- final public void addOrSet(QueryProcessor processor, Object item) {
-
- try {
-
- assert(isPending());
-
- ArrayList<Procedure<T>> p = null;
-
- synchronized(this) {
-
- setResult(item);
- setReady();
-// p = procs;
-// procs = null;
-
- }
-
-// if(p != null)
-// for(Procedure proc : p) {
-// proc.execute((T)item);
-// }
+ @Override
+ public void setResult(Object result) {
- } catch (Throwable t) {
- t.printStackTrace();
- }
+ super.setResult(result);
+ assert(!(result instanceof Throwable));
+ assert(!isExcepted());
}
+ @Override
+ public void setReady() {
+ super.setReady();
+ }
+
@Override
final public Query getQuery() {
return new Query() {
@Override
- public void recompute(ReadGraphImpl graph, Object provider, CacheEntry entry) {
-
- final QueryProcessor qp = (QueryProcessor)provider;
- synchronized(items) {
-
- if(entry.isExcepted()) {
-
- // Exception persists
-
- } else {
-
- // Update
- if(!items.isEmpty()) {
- setResult(items.removeFirst());
- }
- // Reschedule
- if(!items.isEmpty()) {
- qp.updatePrimitive(request);
- }
-
+ public void recompute(ReadGraphImpl graph) {
+
+ synchronized(items) {
+
+ // Update
+ if(!items.isEmpty()) {
setReady();
-
- }
-
- }
+ setResult(items.removeFirst());
+ }
+ // Reschedule
+ if(!items.isEmpty()) {
+ graph.processor.updatePrimitive(request);
+ }
+
+ }
}
@Override
public void removeEntry(QueryProcessor processor) {
- processor.externalReadMap.remove(request);
+ processor.cache.remove(ExternalReadEntry.this);
}
@Override
}
- public void performFromCache(Object procedure) {
-
- Procedure<T> proc = (Procedure<T>)procedure;
+ @Override
+ public String toString() {
+ if(request == null) return "DISCARDED ExternalRead " + System.identityHashCode(this);
+ else return request.toString() + " " + + System.identityHashCode(this);
+ }
+
+ @Override
+ public Object performFromCache(ReadGraphImpl graph, AsyncProcedure<T> procedure) {
+
+ AsyncProcedure<T> proc = (AsyncProcedure<T>)procedure;
if(isExcepted()) {
- proc.exception((Throwable)getResult());
+ proc.exception(graph, (Throwable)getResult());
} else {
- proc.execute((T)getResult());
+ proc.execute(graph, (T)getResult());
}
-
- }
-
- @Override
- public String toString() {
- if(request == null) return "DISCARDED ExternalRead " + System.identityHashCode(this);
- else return request.toString() + " " + + System.identityHashCode(this);
- }
-
- @Override
- public void performFromCache(ReadGraphImpl graph, Object provider, Object procedure) {
- performFromCache(procedure);
- }
-
- @Override
- public void setReady() {
- super.setReady();
+
+ return getResult();
+
}
@Override
// Do nothing - the state is already set and cannot be recomputed on demand
}
+ public Object compute(ReadGraphImpl graph, AsyncProcedure<T> procedure) throws DatabaseException {
+
+ try {
+
+ ReadGraphImpl queryGraph = graph.withParent(this);
+
+ if(!registered) {
+ request.register(graph, this);
+ registered = true;
+ }
+
+ queryGraph.asyncBarrier.waitBarrier(request, graph);
+
+ } catch (Throwable t) {
+
+ except(t);
+
+ }
+
+ performFromCache(graph, procedure);
+
+ return getResult();
+
+ }
+
+ @Override
+ public void execute(T result) {
+
+ if(this.result == REQUIRES_COMPUTATION) {
+
+ setResult(result);
+ setReady();
+
+ } else {
+
+ synchronized(items) {
+ items.addLast(result);
+ graph.processor.updatePrimitive(request);
+ // TODO: implement flags/logic in ExternalRead to state that all but the latest request result can be evaporated
+ // In some cases where data is produced really fast this might be necessary but currently this queueing will do.
+ }
+
+ }
+
+ }
+
+ @Override
+ public void exception(Throwable t) {
+ except(t);
+ }
+
+ @Override
+ public boolean isDisposed() {
+ return registered && (isDiscarded() || !graph.processor.isBound(this));
+ }
+
}