gerrit.simantics Code Review - simantics/platform.git/blob - bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/ExternalReadEntry.java
Merge "Multiple reader thread support for db client"
[simantics/platform.git] / bundles / org.simantics.db.impl / src / org / simantics / db / impl / query / ExternalReadEntry.java
1 /*******************************************************************************
2  * Copyright (c) 2007, 2018 Association for Decentralized Information Management
3  * in Industry THTH ry.
4  * All rights reserved. This program and the accompanying materials
5  * are made available under the terms of the Eclipse Public License v1.0
6  * which accompanies this distribution, and is available at
7  * http://www.eclipse.org/legal/epl-v10.html
8  *
9  * Contributors:
10  *     VTT Technical Research Centre of Finland - initial API and implementation
11  *******************************************************************************/
12 package org.simantics.db.impl.query;
13
14 import java.util.LinkedList;
15
16 import org.simantics.db.exception.DatabaseException;
17 import org.simantics.db.impl.DebugPolicy;
18 import org.simantics.db.impl.graph.ReadGraphImpl;
19 import org.simantics.db.procedure.AsyncProcedure;
20 import org.simantics.db.request.ExternalRead;
21 import org.simantics.db.request.RequestFlags;
22
23 final public class ExternalReadEntry<T> extends CacheEntryBase<AsyncProcedure<T>> {
24
25     final LinkedList<T> items = new LinkedList<T>();
26
27     protected ExternalRead<T> request;
28
29     @Override
30     int makeHash() {
31         return request.hashCode();
32     }
33     
34     @Override
35     public Object getOriginalRequest() {
36         return request;
37     }
38     
39     @Override
40     public void clearResult(QuerySupport support) {
41     }
42     
43     @Override
44     public void discard() {
45         request.unregistered();
46         request = null;
47         super.discard();
48     }
49
50     @Override
51     public void setPending() {
52         //if(result != NO_RESULT) {
53             //new Exception("result = " + result).printStackTrace();
54         //}
55         statusOrException = PENDING;
56         result = REQUIRES_COMPUTATION;
57     }
58     
59     public ExternalReadEntry(ExternalRead<T> request) {
60         assert request != null;
61         this.request = request;
62     }
63     
64     final public void queue(T item) {
65         synchronized(items) {
66                 items.addLast(item);
67                 // TODO: implement flags/logic in ExternalRead to state that all but the latest request result can be evaporated
68                 // In some cases where data is produced really fast this might be necessary but currently this queueing will do.
69         }
70     }
71     
72     final public void addOrSet(QueryProcessor processor, Object item) {
73
74         try {
75         
76             assert(isPending());
77
78             //ArrayList<Procedure<T>> p = null;
79
80             synchronized(this) {
81
82                 setResult(item);
83                 setReady();
84 //                p = procs;
85 //                procs = null;
86
87             }
88
89 //            if(p != null)
90 //                for(Procedure proc : p) {
91 //                    proc.execute((T)item);
92 //                }
93
94         } catch (Throwable t) {
95             t.printStackTrace();
96         }
97         
98     }
99     
100     @Override
101     public void except(Throwable t) {
102         if(DebugPolicy.QUERY_STATE) System.out.println("[QUERY STATE]: excepted " + this);
103         if(statusOrException != DISCARDED) {
104             statusOrException = EXCEPTED;
105             result = t;
106         } else {
107             result = t;
108         }
109         assert(isExcepted());
110     }
111     
112     @Override
113     public void setResult(Object result) {
114         super.setResult(result);
115         assert(!(result instanceof Throwable));
116         assert(!isExcepted());
117     }
118
119     @Override
120     final public Query getQuery() {
121         
122         return new Query() {
123
124                         @Override
125                         public void recompute(ReadGraphImpl graph) {
126
127                                 synchronized(items) {
128
129
130                                         // Update
131                                         if(!items.isEmpty()) {
132                                             setReady();
133                                                 setResult(items.removeFirst());
134                                         }
135                                         // Reschedule
136                                         if(!items.isEmpty()) {
137                                                 graph.processor.updatePrimitive(request);
138                                         }
139
140                                 }
141                                 
142                         }
143
144                         @Override
145                         public void removeEntry(QueryProcessor processor) {
146                                 processor.cache.remove(ExternalReadEntry.this);
147                         }
148
149                         @Override
150                         public int type() {
151                                 return RequestFlags.IMMEDIATE_UPDATE;
152                         }
153                         
154                         @Override
155                         public String toString() {
156                                 if(request == null) return "DISCARDED ExternalRead";
157                                 else return request.toString();
158                         }
159                 
160         };
161         
162     }
163
164         @Override
165         public String toString() {
166                 if(request == null) return "DISCARDED ExternalRead " + System.identityHashCode(this);
167                 else return request.toString() + " " + + System.identityHashCode(this);
168         }
169
170     @Override
171     public Object performFromCache(ReadGraphImpl graph, AsyncProcedure<T> procedure) {
172         
173         AsyncProcedure<T> proc = (AsyncProcedure<T>)procedure;
174
175             if(isExcepted()) {
176             
177             proc.exception(graph, (Throwable)getResult());
178             
179         } else {
180             
181             proc.execute(graph, (T)getResult());
182
183         }
184             
185             return getResult();
186         
187     }
188     
189     @Override
190     void prepareRecompute(QuerySupport querySupport) {
191         // Do nothing - the state is already set and cannot be recomputed on demand
192     }
193
194     public Object compute(ReadGraphImpl graph, AsyncProcedure<T> procedure) throws DatabaseException {
195         return graph.processor.cache.performQuery(graph, request, this, procedure);
196     }
197
198 }