]> gerrit.simantics Code Review - simantics/platform.git/blob - bundles/org.simantics.modeling/src/org/simantics/modeling/subscription/ModelHistoryCollector.java
Migrated source code from Simantics SVN
[simantics/platform.git] / bundles / org.simantics.modeling / src / org / simantics / modeling / subscription / ModelHistoryCollector.java
1 /*******************************************************************************\r
2  * Copyright (c) 2007, 2011 Association for Decentralized Information Management\r
3  * in Industry THTH ry.\r
4  * All rights reserved. This program and the accompanying materials\r
5  * are made available under the terms of the Eclipse Public License v1.0\r
6  * which accompanies this distribution, and is available at\r
7  * http://www.eclipse.org/legal/epl-v10.html\r
8  *\r
9  * Contributors:\r
10  *     VTT Technical Research Centre of Finland - initial API and implementation\r
11  *******************************************************************************/\r
12 package org.simantics.modeling.subscription;\r
13 \r
14 import java.io.File;\r
15 import java.io.IOException;\r
16 import java.util.ArrayList;\r
17 import java.util.Collection;\r
18 import java.util.Collections;\r
19 import java.util.HashSet;\r
20 import java.util.List;\r
21 import java.util.Map;\r
22 import java.util.Set;\r
23 import java.util.WeakHashMap;\r
24 import java.util.concurrent.Semaphore;\r
25 import java.util.function.Supplier;\r
26 import java.util.logging.Logger;\r
27 \r
28 import org.eclipse.core.runtime.ILog;\r
29 import org.eclipse.core.runtime.IStatus;\r
30 import org.eclipse.core.runtime.Status;\r
31 import org.simantics.databoard.Bindings;\r
32 import org.simantics.databoard.Datatypes;\r
33 import org.simantics.databoard.accessor.error.AccessorException;\r
34 import org.simantics.databoard.binding.Binding;\r
35 import org.simantics.databoard.type.BooleanType;\r
36 import org.simantics.databoard.type.Datatype;\r
37 import org.simantics.databoard.type.NumberType;\r
38 import org.simantics.databoard.type.RecordType;\r
39 import org.simantics.databoard.util.Bean;\r
40 import org.simantics.db.ReadGraph;\r
41 import org.simantics.db.RequestProcessor;\r
42 import org.simantics.db.Resource;\r
43 import org.simantics.db.Session;\r
44 import org.simantics.db.exception.DatabaseException;\r
45 import org.simantics.db.request.Read;\r
46 import org.simantics.history.Collector;\r
47 import org.simantics.history.History;\r
48 import org.simantics.history.HistoryException;\r
49 import org.simantics.history.HistoryManager;\r
50 import org.simantics.history.ItemManager;\r
51 import org.simantics.history.impl.FileHistory;\r
52 import org.simantics.history.impl.FlushPolicy;\r
53 import org.simantics.history.util.subscription.SamplingFormat;\r
54 import org.simantics.history.util.subscription.SubscriptionItem;\r
55 import org.simantics.scl.runtime.tuple.Tuple3;\r
56 import org.simantics.simulation.Activator;\r
57 import org.simantics.simulation.data.Datasource;\r
58 import org.simantics.simulation.data.DatasourceAdapter;\r
59 import org.simantics.simulation.data.VariableHandle;\r
60 import org.simantics.simulation.experiment.ExperimentState;\r
61 import org.simantics.simulation.experiment.IDynamicExperiment;\r
62 import org.simantics.trend.configuration.TrendSamplingFormats;\r
63 import org.simantics.utils.datastructures.Pair;\r
64 import org.simantics.utils.threads.IThreadWorkQueue;\r
65 \r
66 /**\r
67  * @author Tuukka Lehtonen\r
68  */\r
69 public class ModelHistoryCollector {\r
70 \r
    /**
     * A <code>java.util.logging</code> logger named after the runtime class of
     * this instance. Note that this is separate from the Eclipse {@link ILog}
     * held in {@link #logger}.
     */
    Logger log = Logger.getLogger( this.getClass().getName() );

    //private static final boolean DEBUG_LOAD = false;

    /**
     * For IStatus logging.
     */
    static final String BUNDLE_ID = "org.simantics.modeling";
79 \r
80     /**\r
81      * @param g\r
82      * @param dynamicExperiment\r
83      * @param logger\r
84      * @param subscriptionRequestFunction\r
85      * @param loadCallback\r
86      *            a callback runnable that is invoked every time the\r
87      *            subscription changes\r
88      * @return\r
89      * @throws DatabaseException\r
90      * @throws IOException\r
91      */\r
92     public static ModelHistoryCollector createCollector(\r
93             IDynamicExperiment dynamicExperiment,\r
94             HistoryManager history,\r
95             ILog log,\r
96             Supplier<Read<SubscriptionCollectionResult>> subscriptionRequestFunction,\r
97             Runnable loadCallback)\r
98             throws DatabaseException, IOException\r
99     {\r
100         return createCollector(dynamicExperiment, history, log, subscriptionRequestFunction, loadCallback, null);\r
101     }\r
102 \r
103     /**\r
104      * @param g\r
105      * @param dynamicExperiment\r
106      * @param logger\r
107      * @param subscriptionRequestFunction\r
108      * @param loadCallback\r
109      *            a callback runnable that is invoked every time the\r
110      *            subscription changes\r
111      * @param loadThread\r
112      *            a possible thread to perform the subscription loads in or\r
113      *            <code>null</code> to use any thread\r
114      * @return\r
115      * @throws DatabaseException\r
116      * @throws IOException\r
117      */\r
118     public static ModelHistoryCollector createCollector(\r
119             IDynamicExperiment dynamicExperiment,\r
120             HistoryManager history,\r
121             ILog log,\r
122             Supplier<Read<SubscriptionCollectionResult>> subscriptionRequestFunction,\r
123             Runnable loadCallback,\r
124             IThreadWorkQueue loadThread)\r
125             throws DatabaseException, IOException\r
126     {\r
127         ModelHistoryCollector data = new ModelHistoryCollector();\r
128         data.history = history;\r
129         data.model = dynamicExperiment.getModel();\r
130         data.dynamicExperiment = dynamicExperiment;\r
131         data.logger = log;\r
132         data.subscriptionFunction = subscriptionRequestFunction;\r
133         data.loadCallback = loadCallback;\r
134         data.loadThread = loadThread;\r
135         return data;\r
136     }\r
137 \r
    /** Database session; assigned in <code>initialize</code> and reused by {@link #refresh()}. */
    private Session                                      session;
    /** History manager received from <code>createCollector</code> and handed to the item collector on initialization. */
    private HistoryManager                               history;
    /** The model of {@link #dynamicExperiment}, read once in <code>createCollector</code>. */
    private Resource                                     model;
    /** The experiment whose data source is collected from. */
    private IDynamicExperiment                           dynamicExperiment;
    /** Eclipse log for status reporting; may be <code>null</code> (<code>refresh0</code> checks for that). */
    ILog                                         logger;
    /** Supplies the read request that resolves the current subscriptions. */
    private Supplier<Read<SubscriptionCollectionResult>> subscriptionFunction;
    /** Run after subscriptions have been loaded; may be <code>null</code>. */
    Runnable                                     loadCallback;
    /** Thread for subscription loads or <code>null</code> for any thread; not read within this class. */
    IThreadWorkQueue                             loadThread;

    /** Created lazily by <code>refresh0</code> when listening is requested; cleared again in {@link #dispose()}. */
    private VariableSetListener                          variableListener;

    /**
     * Released every time {@link #variableListener} either completes reloading
     * subscriptions or fails with an exception. Used for synchronizing listener-based initialization 
     */
    Semaphore                                    initMutex       = new Semaphore(0);
154 \r
155     public static class ItemCollector {\r
156 \r
157         /** The experiment to collect the data from. */\r
158         IDynamicExperiment experiment;\r
159 \r
160         /** The source of the collected data, retrieved from the experiment. */\r
161         Datasource         source;\r
162 \r
163         /** History manager that handles stream files */\r
164         HistoryManager     history;\r
165 \r
166         /** Collectors for each subscription */\r
167         Collector          collector;\r
168 \r
169         DatasourceAdapter  adapter;\r
170 \r
171         Resource           model;\r
172 \r
173         public synchronized void init(HistoryManager history, IDynamicExperiment experiment, double startTime, Resource model) {\r
174             this.experiment = experiment;\r
175             this.source = experiment.getDatasource();\r
176             this.history = history; \r
177             //System.out.println("Debug: ModelHistoryCollector history="+System.identityHashCode(this.history));\r
178             this.collector = History.createCollector(this.history, FlushPolicy.NoFlush);            \r
179             this.model = model;\r
180         }\r
181 \r
182         /**\r
183          * Load collector with new settings\r
184          * \r
185          * @param workarea \r
186          * @param dynamicExperiment\r
187          * @param variables\r
188          * @throws HistoryException \r
189          */\r
190         public synchronized void load(List<SubscriptionItem> items)\r
191                 throws HistoryException, DatabaseException {\r
192 \r
193             if (isDisposed())\r
194                 return;\r
195 \r
196             ExperimentState oldState = experiment.getState();\r
197             if (oldState == ExperimentState.RUNNING)\r
198                 experiment.changeState(ExperimentState.STOPPED);\r
199 \r
200             try {\r
201                 ItemManager im = new ItemManager(items);\r
202                 \r
203                 // Create and/or Update collector\r
204                 if (collector==null) collector = History.createCollector(history);\r
205                 \r
206                 // Update old items\r
207                 for (SubscriptionItem oldItem : collector.getItems() )\r
208                 {\r
209                         String id = (String) oldItem.getFieldUnchecked("id");\r
210                         Bean newItem = im.get(id);\r
211                         if ( newItem == null ) {\r
212                                 oldItem.enabled = false;\r
213                                 collector.setItem(oldItem);\r
214                                 //collector.removeItem(id);\r
215                         }\r
216                 }    \r
217                 // Add new items\r
218                 for (Bean newItem : items) {\r
219                         collector.setItem( newItem );\r
220                 }\r
221                 \r
222                 // Create files to history\r
223                 history.modify(items.toArray( new Bean[items.size()] ));\r
224                 \r
225                 if (adapter==null) {\r
226                         adapter = new DatasourceAdapter( collector );\r
227                     source.addListener( adapter );\r
228                 } else {\r
229                         adapter.reset();\r
230                 }\r
231             } finally {\r
232                 experiment.changeState(oldState);\r
233             }\r
234         }\r
235 \r
236         public synchronized void dispose() {\r
237                 if (source!=null && adapter!=null) {\r
238                         source.removeListener(adapter);\r
239                 }\r
240                 if (adapter!=null) adapter.reset();\r
241                 if (collector!=null) collector.close();\r
242                 if (history!=null) history.close();\r
243             // Mark disposed\r
244             experiment = null;\r
245         }\r
246 \r
247         public HistoryManager getCollector() {\r
248             return history;\r
249         }\r
250 \r
251         public boolean isDisposed() {\r
252             return experiment == null;\r
253         }\r
254 \r
255     }\r
256 \r
    /** Performs the actual item bookkeeping and data collection on behalf of this instance. */
    final ItemCollector itemCollector = new ItemCollector();

    /**
     * Private: instances are obtained through the static
     * <code>createCollector</code> factory methods.
     */
    private ModelHistoryCollector() {
    }
261 \r
262     /**\r
263      * @param graph\r
264      * @param startTime\r
265      * @throws DatabaseException\r
266      * @throws HistoryException\r
267      * @throws InterruptedException\r
268      */\r
269     public void initialize(RequestProcessor processor, double startTime) throws DatabaseException, HistoryException, InterruptedException {\r
270         initialize(processor, startTime, false);\r
271     }\r
272 \r
273     /**\r
274      * Initialize is invoked during starting of an experiment (\r
275      * {@link ExperimentState#INITIALIZING}).\r
276      * \r
277      * @param graph\r
278      * @param startTime\r
279      * @param listenToVariableSet true to actively listen to changes in the set\r
280      *        of history collected variables.\r
281      * @throws DatabaseException\r
282      * @throws HistoryException\r
283      * @throws InterruptedException\r
284      */\r
285     public void initialize(RequestProcessor processor, double startTime, boolean listenToVariableSet) throws DatabaseException, HistoryException, InterruptedException {\r
286         this.session = processor.getSession();\r
287         itemCollector.init(history, dynamicExperiment, startTime, model);\r
288         refresh0(processor, listenToVariableSet);\r
289     }\r
290 \r
291     /**\r
292      * @throws HistoryException\r
293      * @throws DatabaseException\r
294      * @throws InterruptedException \r
295      */\r
296     public void refresh() throws HistoryException, DatabaseException, InterruptedException {\r
297         refresh0(session, false);\r
298     }\r
299 \r
300     /**\r
301      * @return\r
302      */\r
303     public File getWorkarea() {\r
304         return history instanceof FileHistory ? ((FileHistory) history).getWorkarea() : null;\r
305     }\r
306 \r
307     /**\r
308      * @return\r
309      */\r
310     public HistoryManager getHistoryManager() {\r
311         return itemCollector.history;\r
312     }\r
313 \r
314     /**\r
315      * Queries subscriptions, sets listener, loads the subscriptions. \r
316      * \r
317      * @param listenToVariableSet \r
318      * @throws HistoryException\r
319      * @throws DatabaseException\r
320      * @throws InterruptedException \r
321      */\r
322     private void refresh0(RequestProcessor processor, boolean listenToVariableSet) throws HistoryException, DatabaseException, InterruptedException {\r
323         if (listenToVariableSet) {\r
324             if (variableListener == null) {\r
325                 variableListener = new VariableSetListener(this);\r
326                 initMutex = new Semaphore(0);\r
327                 processor.asyncRequest( subscriptionFunction.get(), variableListener );\r
328                 // Force synchronous initialization.\r
329                 initMutex.acquire();\r
330             }\r
331         } else {\r
332             // Only refresh if there's no listener since the listener\r
333             // will take care of that.\r
334             if (variableListener == null) {\r
335                 SubscriptionCollectionResult variables = processor.syncRequest( subscriptionFunction.get() );\r
336                 if (!variables.getStatus().isOK() && logger != null)\r
337                     logger.log(variables.getStatus());\r
338                 itemCollector.load(variables.getSubscriptions());\r
339                 if (loadCallback != null)\r
340                     loadCallback.run();\r
341             }\r
342         }\r
343     }\r
344 \r
345     /**\r
346      * Stops history collection and disposes of all related resources. Collected\r
347      * data is left on disk.\r
348      */\r
349     public void dispose() {\r
350         if (variableListener != null) {\r
351             variableListener.dispose();\r
352             variableListener = null;\r
353         }\r
354         itemCollector.dispose();\r
355     }\r
356 \r
357     /**\r
358      * Primes the initial subscription by attempting to read everything that is\r
359      * subscribed once. Any caching and fetching performed by the data session\r
360      * implementation should at least be initiated by this invocation.\r
361      * \r
362      * @param graph\r
363      * @throws DatabaseException\r
364      */\r
365     public void primeInitialSubscription(ReadGraph graph) throws DatabaseException {\r
366         SubscriptionCollectionResult variables = graph.syncRequest( subscriptionFunction.get() );\r
367         Set<Pair<Bean, String>> variableIds = new HashSet<Pair<Bean, String>>();\r
368         for (Bean hi : variables.getSubscriptions()) {\r
369                 String variableId = (String) hi.getFieldUnchecked("variableId");\r
370                 variableIds.add( Pair.make(hi, variableId) );\r
371         }\r
372         \r
373         Datasource source = dynamicExperiment.getDatasource();\r
374         for (Pair<Bean, String> ids : variableIds) {\r
375             Datatype type = source.getType( ids.second );\r
376             Binding valueBinding = Bindings.getBinding(type);\r
377             VariableHandle handle = source.openHandle(ids.first, ids.second, valueBinding);\r
378             try {\r
379                 handle.getValue();\r
380             } catch (AccessorException e) {\r
381                logger.log(new Status(IStatus.ERROR, Activator.PLUGIN_ID, e.getLocalizedMessage(), e));\r
382             } finally {\r
383                 handle.dispose();\r
384             }\r
385         }\r
386     }\r
387 \r
388     private static Collection<SamplingFormat> resolveSamplingFormats(Map<Object, Collection<SamplingFormat>> cache, SubscriptionItem item, Datatype type, String unit) {\r
389         if ( type instanceof BooleanType ) {\r
390             Double cacheKey = item.interval;\r
391             Collection<SamplingFormat> formats = cache.get(cacheKey);\r
392             if (formats == null) {\r
393                 formats = TrendSamplingFormats.createBinarySamplingFormats(item.interval);\r
394                 cache.put(cacheKey, formats);\r
395             }\r
396             return formats;\r
397         } else if ( type instanceof NumberType ) {\r
398             Tuple3 cacheKey = new Tuple3(item.interval, item.deadband, unit);\r
399             Collection<SamplingFormat> formats = cache.get(cacheKey);\r
400             if (formats == null) {\r
401                 formats = TrendSamplingFormats.createAnalogSamplingFormats(item.interval, item.deadband, unit);\r
402                 cache.put(cacheKey, formats);\r
403             }\r
404             return formats;\r
405         }\r
406 \r
407         SamplingFormat format = new SamplingFormat();\r
408         format.formatId = "custom";\r
409         RecordType f = new RecordType();\r
410         format.format = f;\r
411         f.addComponent("time", Datatypes.DOUBLE);\r
412         f.addComponent("endTime", Datatypes.DOUBLE);\r
413         f.addComponent("quality", Datatypes.BYTE);\r
414         f.addComponent("value", type);\r
415 \r
416         format.interval = item.interval;\r
417         format.deadband = item.deadband;\r
418         return Collections.singleton(format);\r
419     }\r
420     \r
421     private static ThreadLocal<Map<Object, Collection<SamplingFormat>>> samplingFormatCaches = new ThreadLocal<Map<Object, Collection<SamplingFormat>>>() {\r
422         @Override\r
423         protected Map<Object, Collection<SamplingFormat>> initialValue() {\r
424             return new WeakHashMap<Object, Collection<SamplingFormat>>();\r
425         }\r
426     };\r
427 \r
428     public static List<SubscriptionItem> sampledSubscriptionItems(List<SubscriptionItem> rawItems) {\r
429         Map<Object, Collection<SamplingFormat>> cache = samplingFormatCaches.get();\r
430 \r
431         List<SubscriptionItem> result = new ArrayList<SubscriptionItem>(rawItems.size() * 6);\r
432         for (SubscriptionItem item : rawItems) {\r
433                 \r
434             // Apros-specific assumption: the unsampled item's id field contains the subscription item's name\r
435             String groupItemId = item.id;\r
436             // Apros-specific assumption: the unsampled item's format field contains the subscription item's basic datatype (boolean, number, etc.)\r
437             Datatype type = item.format;\r
438             // Apros-specific assumption: the unsampled item's formatId subscription item's unit\r
439             String unit = /*item.formatId.isEmpty() ? null :*/ item.formatId;\r
440 \r
441             Collection<SamplingFormat> formats = resolveSamplingFormats(cache, item, type, unit);\r
442             SubscriptionItem[] his = SubscriptionItem.createItems(item, groupItemId, item.groupId, formats);\r
443             for (SubscriptionItem hi : his) {\r
444                 hi.groupItemId = groupItemId;\r
445                 result.add(hi);\r
446             }\r
447         }\r
448         return result;\r
449     }\r
450     \r
451     public Collector getCollector() {\r
452                 return itemCollector.collector;\r
453         }\r
454 \r
455 }\r