]> gerrit.simantics Code Review - simantics/platform.git/blob - bundles/org.simantics.db.layer0/src/org/simantics/db/layer0/variable/NodeValueRequest.java
Merge "QueryListening sync is slow"
[simantics/platform.git] / bundles / org.simantics.db.layer0 / src / org / simantics / db / layer0 / variable / NodeValueRequest.java
1 package org.simantics.db.layer0.variable;
2
3 import org.simantics.databoard.binding.Binding;
4 import org.simantics.databoard.binding.error.BindingException;
5 import org.simantics.databoard.binding.mutable.Variant;
6 import org.simantics.databoard.util.ObjectUtils;
7 import org.simantics.db.ReadGraph;
8 import org.simantics.db.common.request.ParametrizedPrimitiveRead;
9 import org.simantics.db.exception.DatabaseException;
10 import org.simantics.db.procedure.Listener;
11 import org.simantics.simulator.variable.NodeManager;
12 import org.simantics.simulator.variable.exceptions.NodeIsNotValidAnymoreException;
13 import org.simantics.simulator.variable.exceptions.NodeManagerException;
14 import org.simantics.utils.datastructures.Pair;
15 import org.slf4j.Logger;
16 import org.slf4j.LoggerFactory;
17
18 @SuppressWarnings("rawtypes")
19 class NodeValueRequest extends ParametrizedPrimitiveRead<Pair<VariableNode,Binding>, Variant> implements VariableNodeReadRunnable {
20
21     private static final Logger LOGGER = LoggerFactory.getLogger(NodeValueRequest.class);
22
23     private Listener<Variant> listener = null;
24     private Variant value = Variables.PENDING_NODE_VALUE;
25     private boolean wasRun = false;
26
27     static class Probe implements Runnable {
28
29         private Pair<VariableNode,Binding> parameter;
30         public Variant result;
31
32         public Probe(Pair<VariableNode,Binding> parameter) {
33             this.parameter = parameter;
34         }
35
36         @SuppressWarnings("unchecked")
37         @Override
38         public void run() {
39             try {
40                 result = NodeValueRequest.get(parameter);
41                 parameter.first.support.valueCache.put(parameter.first.node, result, 1000000000L);
42             } catch (NodeManagerException e) {
43                 e.printStackTrace();
44             } catch (BindingException e) {
45                 e.printStackTrace();
46             }
47         }
48
49     }
50
51     public NodeValueRequest(VariableNode node) {
52         super(Pair.<VariableNode, Binding>make(node, null));
53     }
54
55     public NodeValueRequest(VariableNode node, Binding binding) {
56         super(Pair.<VariableNode, Binding>make(node, binding));
57     }
58
59     @SuppressWarnings("unchecked")
60     @Override
61     public void register(ReadGraph graph, final Listener<Variant> procedure) {
62
63         VariableNode node = parameter.first;
64
65         if(procedure.isDisposed()) {
66
67             // We are not listening
68             Variant result = (Variant)node.support.valueCache.get(node.node);
69
70             if(result != null) {
71                 // Return cached value immediately
72                 procedure.execute(result);
73             } else {
74
75 //              
76 //            listener = procedure;
77 //            
78 //            if(graph.getSynchronous()) {
79 //                try {
80 //                    parameter.support.manager.getRealm().syncExec(this);
81 //                } catch (InterruptedException e) {
82 //                    if (!wasRun) procedure.exception(e);
83 //                } catch (Throwable e) {
84 //                    if (!wasRun) procedure.exception(e);
85 //                }
86 //            } else {
87 //                parameter.support.manager.getRealm().asyncExec(this);
88 //            
89 //                if(value == Variables.PENDING_NODE_VALUE) {
90 //                    procedure.execute(Variables.PENDING_NODE_VALUE);
91 //                }
92 //            }
93 //            return;
94
95                 NodeValueRequest.Probe probe = new Probe(parameter);
96                 node.support.manager.getRealm().asyncExec(probe);
97                 if(probe.result != null) {
98                     procedure.execute(probe.result);
99                 } else {
100                     procedure.execute(Variables.PENDING_NODE_VALUE);
101                 }
102
103             }
104
105             return;            
106         }
107
108         // We need to listen
109         listener = procedure;
110         // Register listening
111         node.support.manager.addNodeListener(node.node, this);
112         synchronized(this) {
113             if(wasRun) {
114                 procedure.execute(value);
115             } else {
116                 Variant result = (Variant)node.support.valueCache.get(node.node);
117                 if(result != null) {
118                     procedure.execute(result);
119                 } else {
120                     procedure.execute(Variables.PENDING_NODE_VALUE);
121                 }
122             }
123         }
124
125 //        if(listener != null) {
126 //            throw new UnsupportedOperationException();
127 //        }
128 //        listener = procedure;
129 //        if(graph.getSynchronous()) {
130 //            try {
131 //                parameter.support.manager.getRealm().syncExec(new VariableNodeReadRunnable() {
132 //                    @Override
133 //                    public void run() {
134 //                        parameter.support.manager.addNodeListener(parameter.node, NodeValueRequest.this);
135 //                    }
136 //                    @Override
137 //                    public String toString() {
138 //                        return "NodeValueRequest.register.sync.addNodeListener @ " + System.identityHashCode(NodeValueRequest.this);
139 //                    }
140 //                });
141 //                
142 //                if (!wasRun) procedure.exception(new InternalException("No invocation of listener from node manager " + parameter.support.manager.getClass().getName()));
143 //            } catch (InterruptedException e) {
144 //                if (!wasRun) procedure.exception(e);
145 //            } catch (Throwable e) {
146 //                if (!wasRun) procedure.exception(e);
147 //            }
148 //        } else {
149 //            parameter.support.manager.addNodeListener(parameter.node, this);
150 //            if(value == Variables.PENDING_NODE_VALUE) procedure.execute(Variables.PENDING_NODE_VALUE);
151 //        }
152
153     }
154
155     static class NodeListener implements VariableNodeReadRunnable {
156
157         private VariableNode node;
158         private NodeValueRequest request;
159
160         public NodeListener(VariableNode node, NodeValueRequest request) {
161             this.node = node;
162             this.request = request;
163         }
164
165         @SuppressWarnings("unchecked")
166         @Override
167         public void run() {
168             node.support.manager.addNodeListener(node.node, request);
169         }
170
171     }
172
173
174     @SuppressWarnings("unchecked")
175     @Override
176     public void unregistered() {
177         VariableNode node = parameter.first;
178         node.support.manager.removeNodeListener(node.node, this);
179         node.support.valueCache.removeListening(node.node);
180         listener = null;
181     }
182
183     @SuppressWarnings("unchecked")
184     public static Variant get(Pair<VariableNode,Binding> parameter) throws NodeManagerException, BindingException {
185
186         VariableNode node = parameter.first;
187         Binding binding = parameter.second;
188
189         if (binding != null) {
190             Object raw = node.support.manager.getValue(node.node, binding);
191             if(raw == null)
192                 return null;
193             else if(NodeManager.PENDING_NODE_VALUE == raw)
194                 return NodeManager.PENDING_NODE_VALUE;
195             else return new Variant(binding, raw);
196         } else {
197             return node.support.manager.getValue(node.node);
198         }
199
200     }
201
202     @SuppressWarnings("unchecked")
203     @Override
204     public synchronized void run() {
205
206         VariableNode node = parameter.first;
207
208         try {
209             Variant newValue = get(parameter);
210             if (wasRun && ObjectUtils.objectEquals(value, newValue)) {
211                 //System.out.println("CACHE VALUE MATCH (" + newValue + ") for " + node.node);
212                 return;
213             }
214             value = newValue;
215             node.support.valueCache.put(node.node, value);
216         } catch (Throwable e) {
217             // Must catch everything to prevent DB client from getting stuck.
218             if(!(e instanceof NodeIsNotValidAnymoreException))
219                 LOGGER.error("Error while computing node value", e);
220             // Invoke the exception method of the listener
221             Listener<Variant> listener = this.listener;
222             if (listener != null) listener.exception(new DatabaseException("External data access error", e));
223             wasRun = true;
224             return;
225         }
226         // Must always invoke an existing listener, regardless of earlier errors.
227         Listener<Variant> listener = this.listener;
228         if (listener != null) {
229             //System.out.println("LISTENER " + listener + " invoked with value " + value);
230             listener.execute(value);
231         }
232         wasRun = true;
233     }
234
235     @Override
236     public String toString() {
237         return "NodeValueRequest.run " + parameter.first.node + " " + parameter.first.support.manager + " " + System.identityHashCode(this);
238     }
239
240 }