]> gerrit.simantics Code Review - simantics/platform.git/blob - bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/AsyncReadEntry.java
72582ee605802bfe32fac38d10119f6200602586
[simantics/platform.git] / bundles / org.simantics.db.impl / src / org / simantics / db / impl / query / AsyncReadEntry.java
1 /*******************************************************************************
2  * Copyright (c) 2007, 2018 Association for Decentralized Information Management
3  * in Industry THTH ry.
4  * All rights reserved. This program and the accompanying materials
5  * are made available under the terms of the Eclipse Public License v1.0
6  * which accompanies this distribution, and is available at
7  * http://www.eclipse.org/legal/epl-v10.html
8  *
9  * Contributors:
10  *     VTT Technical Research Centre of Finland - initial API and implementation
11  *******************************************************************************/
12 package org.simantics.db.impl.query;
13
14 import org.simantics.databoard.Bindings;
15 import org.simantics.db.AsyncReadGraph;
16 import org.simantics.db.DevelopmentKeys;
17 import org.simantics.db.exception.DatabaseException;
18 import org.simantics.db.impl.BlockingAsyncProcedure;
19 import org.simantics.db.impl.graph.AsyncBarrierImpl;
20 import org.simantics.db.impl.graph.BarrierTracing;
21 import org.simantics.db.impl.graph.ReadGraphImpl;
22 import org.simantics.db.impl.query.QueryProcessor.SessionTask;
23 import org.simantics.db.procedure.AsyncProcedure;
24 import org.simantics.db.request.AsyncRead;
25 import org.simantics.utils.Development;
26 import org.slf4j.Logger;
27 import org.slf4j.LoggerFactory;
28
29 final public class AsyncReadEntry<T> extends CacheEntryBase<AsyncProcedure<T>> implements AsyncProcedure<T> {
30
31     private static final Logger LOGGER = LoggerFactory.getLogger(AsyncReadEntry.class);
32
33     protected AsyncRead<T> id;
34
35     AsyncReadEntry(AsyncRead<T> request) {
36         this.id = request;
37                 if (Development.DEVELOPMENT) {
38                         if(Development.<Boolean>getProperty(DevelopmentKeys.CACHE_ENTRY_STATE, Bindings.BOOLEAN)) {
39                         System.err.println("[QUERY STATE]: created " + this);
40                         }
41                 }
42     }
43
44     @Override
45     int makeHash() {
46         return id.hashCode();
47     }
48
49     @Override
50     public Object getOriginalRequest() {
51         return id;
52     }
53
54     @Override
55     public void discard() {
56         super.discard();
57         setResult(null);
58     }
59
60     public void except(AsyncReadGraph graph, Throwable t) {
61
62         assert (isPending());
63
64         synchronized (this) {
65             except(t);
66         }
67
68     }
69
70     @Override
71     final public Query getQuery() {
72
73         return new Query() {
74
75             @Override
76             public void recompute(ReadGraphImpl graph) {
77
78                 try {
79
80                     BlockingAsyncProcedure<T> proc = new BlockingAsyncProcedure<>(graph.asyncBarrier, graph, new AsyncProcedure<T>() {
81
82                         @Override
83                         public void execute(AsyncReadGraph graph, T result) {
84                             setResult(result);
85                             setReady();
86                         }
87
88                         @Override
89                         public void exception(AsyncReadGraph graph, Throwable t) {
90                             except(t);
91                         }
92
93                     }, id);
94
95                     id.perform(graph, proc);
96
97                     proc.get();
98
99                 } catch (Throwable t) {
100                     except(t);
101                 }
102
103             }
104
105             @Override
106             public void removeEntry(QueryProcessor qp) {
107                 qp.cache.remove(AsyncReadEntry.this);
108             }
109
110             @Override
111             public int type() {
112                 return id.getFlags();
113             }
114
115             @Override
116             public String toString() {
117                 if (id == null)
118                     return "DISCARDED";
119                 else if (isExcepted())
120                     return id.toString() + " " + getResult();
121                 else
122                     return id.toString() + " " + statusOrException;
123             }
124
125         };
126
127     }
128
129     @Override
130     public Object performFromCache(ReadGraphImpl graph, AsyncProcedure<T> proc) {
131
132         if (isExcepted()) {
133
134             try {
135                 proc.exception(graph, (Throwable) getResult());
136             } catch (Throwable t) {
137                 LOGGER.error("performFromCache proc.exception failed", t);
138             }
139
140         } else {
141
142             try {
143                 T result = (T) getResult();
144                 proc.execute(graph, result);
145             } catch (Throwable t) {
146                 LOGGER.error("performFromCache proc.execute failed", t);
147             }
148
149         }
150
151         return getResult();
152
153     }
154
155     public static <T> T computeForEach(ReadGraphImpl graph, AsyncRead<T> request, AsyncReadEntry<T> entry,
156             AsyncProcedure<T> procedure_, boolean needsToBlock) throws DatabaseException {
157
158         AsyncProcedure<T> procedure = entry != null ? entry : procedure_;
159
160         ReadGraphImpl queryGraph = graph.withParent(entry);
161         queryGraph.asyncBarrier.inc();
162         
163         BlockingAsyncProcedure<T> proc = new BlockingAsyncProcedure<>(queryGraph.asyncBarrier, graph, null, request);
164         
165         class AsyncTask extends SessionTask {
166
167             int counter = 0;
168             T result;
169             DatabaseException exception;
170             
171             public AsyncTask(ReadGraphImpl graph) {
172                 super(graph);
173             }
174
175             @Override
176             public void run0(int thread) {
177                 if(needsToBlock) proc.waitBarrier();
178                 if(proc.isDone()) {
179                     ReadGraphImpl executeGraph = graph.withParent(graph.parent);
180                     executeGraph.asyncBarrier.inc();
181                     try {
182                         result = (T)proc.get();
183                         if(procedure != null) {
184                             procedure.execute(executeGraph, result);
185                         }
186                     } catch (DatabaseException e) {
187                         if(procedure != null) procedure.exception(executeGraph, e);
188                         exception = e;
189                     } catch (Throwable t) {
190                         DatabaseException dbe = new DatabaseException(t);
191                         if(procedure != null) procedure.exception(executeGraph, dbe);
192                         exception = dbe;
193                     } finally {
194                         if (entry != null) {
195                             // This does not throw
196                             entry.performFromCache(executeGraph, procedure_);
197                         }
198                         executeGraph.asyncBarrier.dec();
199                         executeGraph.asyncBarrier.waitBarrier(procedure, executeGraph);
200                     }
201                 } else {
202                     if(counter++ > 10000) {
203                         if(BarrierTracing.BOOKKEEPING) {
204                             AsyncBarrierImpl.printReverse(queryGraph.asyncBarrier, 2);
205                             AsyncBarrierImpl caller = queryGraph.asyncBarrier.caller;
206                             while(caller != null) {
207                                 System.err.println("called by " + AsyncBarrierImpl.report(caller));
208                                 caller = caller.caller;
209                             }
210                             for(AsyncBarrierImpl ab : BarrierTracing.debuggerMap.keySet()) {
211                                 AsyncBarrierImpl.printReverse(ab, 2);
212                             }
213                         }
214                         throw new IllegalStateException("Eternal loop in queries.");
215                     }
216                     graph.processor.schedule(new AsyncTask(graph));
217                 }
218             }
219             
220         }
221
222         try {
223             request.perform(queryGraph, proc);
224         } finally {
225             queryGraph.asyncBarrier.dec();
226         }
227
228         AsyncTask task = new AsyncTask(graph);
229
230         if(needsToBlock) task.run(0);
231         else if (proc.isDone()) task.run(0);
232         else  {
233             graph.processor.schedule(task);
234             return null;
235         }
236
237         if(task.exception != null) throw task.exception;
238         else return task.result;
239
240     }
241
242     @Override
243     public String toString() {
244         if (isDiscarded())
245             return "DISCARDED " + id.toString();
246         else if (isExcepted())
247             return id.toString() + " " + getResult();
248         else
249             return id.toString() + " " + statusOrException;
250     }
251
252     @Override
253     public void execute(AsyncReadGraph graph, T result) {
254         setResult(result);
255         setReady();
256     }
257
258     @Override
259     public void exception(AsyncReadGraph graph, Throwable throwable) {
260         except(throwable);
261     }
262
263 }