-/*******************************************************************************\r
- * Copyright (c) 2007, 2010 Association for Decentralized Information Management\r
- * in Industry THTH ry.\r
- * All rights reserved. This program and the accompanying materials\r
- * are made available under the terms of the Eclipse Public License v1.0\r
- * which accompanies this distribution, and is available at\r
- * http://www.eclipse.org/legal/epl-v10.html\r
- *\r
- * Contributors:\r
- * VTT Technical Research Centre of Finland - initial API and implementation\r
- *******************************************************************************/\r
-package org.simantics.db.impl.query;\r
-\r
-import java.util.ArrayList;\r
-import java.util.LinkedList;\r
-\r
-import org.simantics.db.impl.graph.ReadGraphImpl;\r
-import org.simantics.db.procedure.Procedure;\r
-import org.simantics.db.request.ExternalRead;\r
-import org.simantics.db.request.RequestFlags;\r
-\r
-final public class ExternalReadEntry<T> extends CacheEntryBase {\r
-\r
- final LinkedList<T> items = new LinkedList<T>();\r
- \r
-// public ArrayList<Procedure<T>> procs;\r
-\r
- protected ExternalRead<T> request;\r
-\r
- @Override\r
- int makeHash() {\r
- return request.hashCode();\r
- }\r
- \r
- @Override\r
- public Object getOriginalRequest() {\r
- return request;\r
- }\r
- \r
- @Override\r
- public void clearResult(QuerySupport support) {\r
- }\r
- \r
- @Override\r
- public void discard() {\r
- request.unregistered();\r
- request = null;\r
- super.discard();\r
- }\r
- \r
- public ExternalReadEntry(ExternalRead<T> request) {\r
- assert request != null;\r
- this.request = request;\r
- }\r
- \r
- final public void queue(T item) {\r
- synchronized(items) {\r
- items.addLast(item);\r
- // TODO: implement flags/logic in ExternalRead to state that all but the latest request result can be evaporated\r
- // In some cases where data is produced really fast this might be necessary but currently this queueing will do.\r
- }\r
- }\r
- \r
- final public void addOrSet(QueryProcessor processor, Object item) {\r
-\r
- try {\r
- \r
- assert(isPending());\r
-\r
- ArrayList<Procedure<T>> p = null;\r
-\r
- synchronized(this) {\r
-\r
- setResult(item);\r
- setReady();\r
-// p = procs;\r
-// procs = null;\r
-\r
- }\r
-\r
-// if(p != null)\r
-// for(Procedure proc : p) {\r
-// proc.execute((T)item);\r
-// }\r
-\r
- } catch (Throwable t) {\r
- t.printStackTrace();\r
- }\r
- \r
- }\r
- \r
- @Override\r
- final public Query getQuery() {\r
- \r
- return new Query() {\r
-\r
- @Override\r
- public void recompute(ReadGraphImpl graph, Object provider, CacheEntry entry) {\r
-\r
- final QueryProcessor qp = (QueryProcessor)provider;\r
- synchronized(items) {\r
-\r
- if(entry.isExcepted()) {\r
- \r
- // Exception persists\r
- \r
- } else {\r
- \r
- // Update\r
- if(!items.isEmpty()) {\r
- setResult(items.removeFirst());\r
- }\r
- // Reschedule\r
- if(!items.isEmpty()) {\r
- qp.updatePrimitive(request);\r
- }\r
- \r
- setReady();\r
- \r
- }\r
- \r
- }\r
- \r
- }\r
-\r
- @Override\r
- public void removeEntry(QueryProcessor processor) {\r
- processor.externalReadMap.remove(request);\r
- }\r
-\r
- @Override\r
- public int type() {\r
- return RequestFlags.IMMEDIATE_UPDATE;\r
- }\r
- \r
- @Override\r
- public String toString() {\r
- if(request == null) return "DISCARDED ExternalRead";\r
- else return request.toString();\r
- }\r
- \r
- };\r
- \r
- }\r
-\r
- public void performFromCache(Object procedure) {\r
- \r
- Procedure<T> proc = (Procedure<T>)procedure;\r
-\r
- if(isExcepted()) {\r
- \r
- proc.exception((Throwable)getResult());\r
- \r
- } else {\r
- \r
- proc.execute((T)getResult());\r
-\r
- }\r
- \r
- }\r
-\r
- @Override\r
- public String toString() {\r
- if(request == null) return "DISCARDED ExternalRead " + System.identityHashCode(this);\r
- else return request.toString() + " " + + System.identityHashCode(this);\r
- }\r
-\r
- @Override\r
- public void performFromCache(ReadGraphImpl graph, Object provider, Object procedure) {\r
- performFromCache(procedure);\r
- }\r
- \r
- @Override\r
- public void setReady() {\r
- super.setReady();\r
- }\r
- \r
- @Override\r
- void prepareRecompute(QuerySupport querySupport) {\r
- // Do nothing - the state is already set and cannot be recomputed on demand\r
- }\r
-\r
-}\r
+/*******************************************************************************
+ * Copyright (c) 2007, 2018 Association for Decentralized Information Management
+ * in Industry THTH ry.
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * which accompanies this distribution, and is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Contributors:
+ * VTT Technical Research Centre of Finland - initial API and implementation
+ *******************************************************************************/
+package org.simantics.db.impl.query;
+
+import java.util.LinkedList;
+
+import org.simantics.db.exception.DatabaseException;
+import org.simantics.db.impl.DebugPolicy;
+import org.simantics.db.impl.graph.ReadGraphImpl;
+import org.simantics.db.procedure.AsyncProcedure;
+import org.simantics.db.procedure.Listener;
+import org.simantics.db.request.ExternalRead;
+import org.simantics.db.request.RequestFlags;
+
+final public class ExternalReadEntry<T> extends CacheEntryBase<AsyncProcedure<T>> implements Listener<T> {
+
+ final LinkedList<T> items = new LinkedList<T>();
+
+ protected ExternalRead<T> request;
+ protected ReadGraphImpl graph;
+ protected boolean registered = false;
+
+ @Override
+ int makeHash() {
+ return request.hashCode();
+ }
+
+ @Override
+ public Object getOriginalRequest() {
+ return request;
+ }
+
+ @Override
+ public void clearResult(QuerySupport support) {
+ }
+
+ @Override
+ public void discard() {
+ request.unregistered();
+ request = null;
+ graph = null;
+ super.discard();
+ }
+
+ @Override
+ public void setPending() {
+ //if(result != NO_RESULT) {
+ //new Exception("result = " + result).printStackTrace();
+ //}
+ statusOrException = PENDING;
+ result = REQUIRES_COMPUTATION;
+ }
+
+ public ExternalReadEntry(ExternalRead<T> request, ReadGraphImpl graph) {
+ assert request != null;
+ this.request = request;
+ this.graph = graph;
+ }
+
+ @Override
+ public void except(Throwable t) {
+
+ if(DebugPolicy.QUERY_STATE) System.out.println("[QUERY STATE]: excepted " + this);
+ if(statusOrException != DISCARDED) {
+ statusOrException = EXCEPTED;
+ result = t;
+ } else {
+ result = t;
+ }
+
+ assert(isExcepted());
+
+ }
+
+ @Override
+ public void setResult(Object result) {
+
+ super.setResult(result);
+ assert(!(result instanceof Throwable));
+ assert(!isExcepted());
+
+ }
+
+ @Override
+ public void setReady() {
+ super.setReady();
+ }
+
+ @Override
+ final public Query getQuery() {
+
+ return new Query() {
+
+ @Override
+ public void recompute(ReadGraphImpl graph) {
+
+ synchronized(items) {
+
+ // Update
+ if(!items.isEmpty()) {
+ setReady();
+ setResult(items.removeFirst());
+ }
+ // Reschedule
+ if(!items.isEmpty()) {
+ graph.processor.updatePrimitive(request);
+ }
+
+ }
+
+ }
+
+ @Override
+ public void removeEntry(QueryProcessor processor) {
+ processor.cache.remove(ExternalReadEntry.this);
+ }
+
+ @Override
+ public int type() {
+ return RequestFlags.IMMEDIATE_UPDATE;
+ }
+
+ @Override
+ public String toString() {
+ if(request == null) return "DISCARDED ExternalRead";
+ else return request.toString();
+ }
+
+ };
+
+ }
+
+ @Override
+ public String toString() {
+ if(request == null) return "DISCARDED ExternalRead " + System.identityHashCode(this);
+ else return request.toString() + " " + + System.identityHashCode(this);
+ }
+
+ @Override
+ public Object performFromCache(ReadGraphImpl graph, AsyncProcedure<T> procedure) {
+
+ AsyncProcedure<T> proc = (AsyncProcedure<T>)procedure;
+
+ if(isExcepted()) {
+
+ proc.exception(graph, (Throwable)getResult());
+
+ } else {
+
+ proc.execute(graph, (T)getResult());
+
+ }
+
+ return getResult();
+
+ }
+
+ @Override
+ void prepareRecompute(QuerySupport querySupport) {
+ // Do nothing - the state is already set and cannot be recomputed on demand
+ }
+
+ public Object compute(ReadGraphImpl graph, AsyncProcedure<T> procedure) throws DatabaseException {
+
+ try {
+
+ ReadGraphImpl queryGraph = graph.withParent(this);
+
+ if(!registered) {
+ request.register(graph, this);
+ registered = true;
+ }
+
+ queryGraph.asyncBarrier.waitBarrier(request, graph);
+
+ } catch (Throwable t) {
+
+ except(t);
+
+ }
+
+ performFromCache(graph, procedure);
+
+ return getResult();
+
+ }
+
+ @Override
+ public void execute(T result) {
+
+ if(this.result == REQUIRES_COMPUTATION) {
+
+ setResult(result);
+ setReady();
+
+ } else {
+
+ synchronized(items) {
+ items.addLast(result);
+ graph.processor.updatePrimitive(request);
+ // TODO: implement flags/logic in ExternalRead to state that all but the latest request result can be evaporated
+ // In some cases where data is produced really fast this might be necessary but currently this queueing will do.
+ }
+
+ }
+
+ }
+
+ @Override
+ public void exception(Throwable t) {
+ except(t);
+ }
+
+ @Override
+ public boolean isDisposed() {
+ return registered && (isDiscarded() || !graph.processor.isBound(this));
+ }
+
+}