]> gerrit.simantics Code Review - simantics/platform.git/blob - bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/ExternalReadEntry.java
Fixes based on feedback
[simantics/platform.git] / bundles / org.simantics.db.impl / src / org / simantics / db / impl / query / ExternalReadEntry.java
1 /*******************************************************************************
2  * Copyright (c) 2007, 2018 Association for Decentralized Information Management
3  * in Industry THTH ry.
4  * All rights reserved. This program and the accompanying materials
5  * are made available under the terms of the Eclipse Public License v1.0
6  * which accompanies this distribution, and is available at
7  * http://www.eclipse.org/legal/epl-v10.html
8  *
9  * Contributors:
10  *     VTT Technical Research Centre of Finland - initial API and implementation
11  *******************************************************************************/
12 package org.simantics.db.impl.query;
13
14 import java.util.LinkedList;
15
16 import org.simantics.db.exception.DatabaseException;
17 import org.simantics.db.impl.DebugPolicy;
18 import org.simantics.db.impl.graph.ReadGraphImpl;
19 import org.simantics.db.procedure.AsyncProcedure;
20 import org.simantics.db.procedure.Listener;
21 import org.simantics.db.request.ExternalRead;
22 import org.simantics.db.request.RequestFlags;
23
24 final public class ExternalReadEntry<T> extends CacheEntryBase<AsyncProcedure<T>> implements Listener<T> {
25
26     final LinkedList<T> items = new LinkedList<T>();
27
28     protected ExternalRead<T> request;
29     protected ReadGraphImpl graph;
30     protected boolean registered = false;
31
32     @Override
33     int makeHash() {
34         return request.hashCode();
35     }
36     
37     @Override
38     public Object getOriginalRequest() {
39         return request;
40     }
41     
42     @Override
43     public void clearResult(QuerySupport support) {
44     }
45     
46     @Override
47     public void discard() {
48         request.unregistered();
49         request = null;
50         graph = null;
51         super.discard();
52     }
53
54     @Override
55     public void setPending() {
56         //if(result != NO_RESULT) {
57             //new Exception("result = " + result).printStackTrace();
58         //}
59         statusOrException = PENDING;
60         result = REQUIRES_COMPUTATION;
61     }
62     
63     public ExternalReadEntry(ExternalRead<T> request, ReadGraphImpl graph) {
64         assert request != null;
65         this.request = request;
66         this.graph = graph;
67     }
68     
69     @Override
70     public void except(Throwable t) {
71         
72         if(DebugPolicy.QUERY_STATE) System.out.println("[QUERY STATE]: excepted " + this);
73         if(statusOrException != DISCARDED) {
74             statusOrException = EXCEPTED;
75             result = t;
76         } else {
77             result = t;
78         }
79         
80         assert(isExcepted());
81         
82     }
83     
84     @Override
85     public void setResult(Object result) {
86
87         super.setResult(result);
88         assert(!(result instanceof Throwable));
89         assert(!isExcepted());
90         
91     }
92     
93     @Override
94     public void setReady() {
95         super.setReady();
96     }
97
98     @Override
99     final public Query getQuery() {
100         
101         return new Query() {
102
103                         @Override
104                         public void recompute(ReadGraphImpl graph) {
105
106                                 synchronized(items) {
107
108                                         // Update
109                                         if(!items.isEmpty()) {
110                                             setReady();
111                                                 setResult(items.removeFirst());
112                                         }
113                                         // Reschedule
114                                         if(!items.isEmpty()) {
115                                                 graph.processor.updatePrimitive(request);
116                                         }
117
118                                 }
119                                 
120                         }
121
122                         @Override
123                         public void removeEntry(QueryProcessor processor) {
124                                 processor.cache.remove(ExternalReadEntry.this);
125                         }
126
127                         @Override
128                         public int type() {
129                                 return RequestFlags.IMMEDIATE_UPDATE;
130                         }
131                         
132                         @Override
133                         public String toString() {
134                                 if(request == null) return "DISCARDED ExternalRead";
135                                 else return request.toString();
136                         }
137                 
138         };
139         
140     }
141
142         @Override
143         public String toString() {
144                 if(request == null) return "DISCARDED ExternalRead " + System.identityHashCode(this);
145                 else return request.toString() + " " + + System.identityHashCode(this);
146         }
147
148     @Override
149     public Object performFromCache(ReadGraphImpl graph, AsyncProcedure<T> procedure) {
150         
151         AsyncProcedure<T> proc = (AsyncProcedure<T>)procedure;
152
153             if(isExcepted()) {
154             
155             proc.exception(graph, (Throwable)getResult());
156             
157         } else {
158             
159             proc.execute(graph, (T)getResult());
160
161         }
162             
163             return getResult();
164         
165     }
166     
167     @Override
168     void prepareRecompute(QuerySupport querySupport) {
169         // Do nothing - the state is already set and cannot be recomputed on demand
170     }
171
172     public Object compute(ReadGraphImpl graph, AsyncProcedure<T> procedure) throws DatabaseException {
173         
174                 try {
175                         
176                         ReadGraphImpl queryGraph = graph.withParent(this);
177
178                         if(!registered) {
179                                 request.register(graph, this);
180                                 registered = true;
181                         }
182                         
183                         queryGraph.asyncBarrier.waitBarrier(request, graph);
184
185                 } catch (Throwable t) {
186
187                         except(t);
188
189                 }
190
191                 performFromCache(graph, procedure);
192                 
193                 return getResult();
194
195     }
196
197         @Override
198         public void execute(T result) {
199
200         if(this.result == REQUIRES_COMPUTATION) {
201                 
202                         setResult(result);
203                         setReady();
204
205         } else {
206
207                 synchronized(items) {
208                         items.addLast(result);
209                                 graph.processor.updatePrimitive(request);
210                         // TODO: implement flags/logic in ExternalRead to state that all but the latest request result can be evaporated
211                         // In some cases where data is produced really fast this might be necessary but currently this queueing will do.
212                 }
213                 
214         }
215                 
216         }
217
218         @Override
219         public void exception(Throwable t) {
220                 except(t);
221         }
222
223         @Override
224         public boolean isDisposed() {
225                 return registered && (isDiscarded() || !graph.processor.isBound(this));
226         }
227     
228 }