]> gerrit.simantics Code Review - simantics/platform.git/blob - bundles/org.simantics.modeling/src/org/simantics/modeling/subscription/ModelHistoryCollector.java
Merge "Multiple reader thread support for db client"
[simantics/platform.git] / bundles / org.simantics.modeling / src / org / simantics / modeling / subscription / ModelHistoryCollector.java
1 /*******************************************************************************
2  * Copyright (c) 2007, 2011 Association for Decentralized Information Management
3  * in Industry THTH ry.
4  * All rights reserved. This program and the accompanying materials
5  * are made available under the terms of the Eclipse Public License v1.0
6  * which accompanies this distribution, and is available at
7  * http://www.eclipse.org/legal/epl-v10.html
8  *
9  * Contributors:
10  *     VTT Technical Research Centre of Finland - initial API and implementation
11  *******************************************************************************/
12 package org.simantics.modeling.subscription;
13
14 import java.io.File;
15 import java.io.IOException;
16 import java.util.ArrayList;
17 import java.util.Collection;
18 import java.util.Collections;
19 import java.util.HashSet;
20 import java.util.List;
21 import java.util.Map;
22 import java.util.Set;
23 import java.util.WeakHashMap;
24 import java.util.concurrent.Semaphore;
25 import java.util.function.Supplier;
26 import java.util.logging.Logger;
27
28 import org.eclipse.core.runtime.ILog;
29 import org.eclipse.core.runtime.IStatus;
30 import org.eclipse.core.runtime.Status;
31 import org.simantics.databoard.Bindings;
32 import org.simantics.databoard.Datatypes;
33 import org.simantics.databoard.accessor.error.AccessorException;
34 import org.simantics.databoard.binding.Binding;
35 import org.simantics.databoard.type.BooleanType;
36 import org.simantics.databoard.type.Datatype;
37 import org.simantics.databoard.type.NumberType;
38 import org.simantics.databoard.type.RecordType;
39 import org.simantics.databoard.util.Bean;
40 import org.simantics.db.ReadGraph;
41 import org.simantics.db.RequestProcessor;
42 import org.simantics.db.Resource;
43 import org.simantics.db.Session;
44 import org.simantics.db.exception.DatabaseException;
45 import org.simantics.db.request.Read;
46 import org.simantics.history.Collector;
47 import org.simantics.history.History;
48 import org.simantics.history.HistoryException;
49 import org.simantics.history.HistoryManager;
50 import org.simantics.history.ItemManager;
51 import org.simantics.history.impl.FileHistory;
52 import org.simantics.history.impl.FlushPolicy;
53 import org.simantics.history.util.subscription.SamplingFormat;
54 import org.simantics.history.util.subscription.SubscriptionItem;
55 import org.simantics.scl.runtime.tuple.Tuple3;
56 import org.simantics.simulation.Activator;
57 import org.simantics.simulation.data.Datasource;
58 import org.simantics.simulation.data.DatasourceAdapter;
59 import org.simantics.simulation.data.VariableHandle;
60 import org.simantics.simulation.experiment.ExperimentState;
61 import org.simantics.simulation.experiment.IDynamicExperiment;
62 import org.simantics.trend.configuration.TrendSamplingFormats;
63 import org.simantics.utils.datastructures.Pair;
64 import org.simantics.utils.threads.IThreadWorkQueue;
65
66 /**
67  * @author Tuukka Lehtonen
68  */
69 public class ModelHistoryCollector {
70
71     Logger log = Logger.getLogger( this.getClass().getName() );
72
73     //private static final boolean DEBUG_LOAD = false;
74
75     /**
76      * For IStatus logging.
77      */
78     static final String BUNDLE_ID = "org.simantics.modeling";
79
80     /**
81      * @param g
82      * @param dynamicExperiment
83      * @param logger
84      * @param subscriptionRequestFunction
85      * @param loadCallback
86      *            a callback runnable that is invoked every time the
87      *            subscription changes
88      * @return
89      * @throws DatabaseException
90      * @throws IOException
91      */
92     public static ModelHistoryCollector createCollector(
93             IDynamicExperiment dynamicExperiment,
94             HistoryManager history,
95             ILog log,
96             Supplier<Read<SubscriptionCollectionResult>> subscriptionRequestFunction,
97             Runnable loadCallback)
98             throws DatabaseException, IOException
99     {
100         return createCollector(dynamicExperiment, history, log, subscriptionRequestFunction, loadCallback, null);
101     }
102
103     /**
104      * @param g
105      * @param dynamicExperiment
106      * @param logger
107      * @param subscriptionRequestFunction
108      * @param loadCallback
109      *            a callback runnable that is invoked every time the
110      *            subscription changes
111      * @param loadThread
112      *            a possible thread to perform the subscription loads in or
113      *            <code>null</code> to use any thread
114      * @return
115      * @throws DatabaseException
116      * @throws IOException
117      */
118     public static ModelHistoryCollector createCollector(
119             IDynamicExperiment dynamicExperiment,
120             HistoryManager history,
121             ILog log,
122             Supplier<Read<SubscriptionCollectionResult>> subscriptionRequestFunction,
123             Runnable loadCallback,
124             IThreadWorkQueue loadThread)
125             throws DatabaseException, IOException
126     {
127         ModelHistoryCollector data = new ModelHistoryCollector();
128         data.history = history;
129         data.model = dynamicExperiment.getModel();
130         data.dynamicExperiment = dynamicExperiment;
131         data.logger = log;
132         data.subscriptionFunction = subscriptionRequestFunction;
133         data.loadCallback = loadCallback;
134         data.loadThread = loadThread;
135         return data;
136     }
137
138     private Session                                      session;
139     private HistoryManager                               history;
140     private Resource                                     model;
141     private IDynamicExperiment                           dynamicExperiment;
142     ILog                                         logger;
143     private Supplier<Read<SubscriptionCollectionResult>> subscriptionFunction;
144     Runnable                                     loadCallback;
145     IThreadWorkQueue                             loadThread;
146
147     private VariableSetListener                          variableListener;
148
149     /**
150      * Released every time {@link #variableListener} either completes reloading
151      * subscriptions or fails with an exception. Used for synchronizing listener-based initialization 
152      */
153     Semaphore                                    initMutex       = new Semaphore(0);
154
155     public static class ItemCollector {
156
157         /** The experiment to collect the data from. */
158         IDynamicExperiment experiment;
159
160         /** The source of the collected data, retrieved from the experiment. */
161         Datasource         source;
162
163         /** History manager that handles stream files */
164         HistoryManager     history;
165
166         /** Collectors for each subscription */
167         Collector          collector;
168
169         DatasourceAdapter  adapter;
170
171         Resource           model;
172
173         public synchronized void init(HistoryManager history, IDynamicExperiment experiment, double startTime, Resource model) {
174             this.experiment = experiment;
175             this.source = experiment.getDatasource();
176             this.history = history; 
177             //System.out.println("Debug: ModelHistoryCollector history="+System.identityHashCode(this.history));
178             this.collector = History.createCollector(this.history, FlushPolicy.NoFlush);            
179             this.model = model;
180         }
181
182         /**
183          * Load collector with new settings
184          * 
185          * @param workarea 
186          * @param dynamicExperiment
187          * @param variables
188          * @throws HistoryException 
189          */
190         public synchronized void load(List<SubscriptionItem> items)
191                 throws HistoryException, DatabaseException {
192
193             if (isDisposed())
194                 return;
195
196             ExperimentState oldState = experiment.getState();
197             if (oldState == ExperimentState.RUNNING)
198                 experiment.changeState(ExperimentState.STOPPED);
199
200             try {
201                 ItemManager im = new ItemManager(items);
202                 
203                 // Create and/or Update collector
204                 if (collector==null) collector = History.createCollector(history);
205                 
206                 // Update old items
207                 for (SubscriptionItem oldItem : collector.getItems() )
208                 {
209                         String id = (String) oldItem.getFieldUnchecked("id");
210                         Bean newItem = im.get(id);
211                         if ( newItem == null ) {
212                                 oldItem.enabled = false;
213                                 collector.setItem(oldItem);
214                                 //collector.removeItem(id);
215                         }
216                 }    
217                 // Add new items
218                 for (Bean newItem : items) {
219                         collector.setItem( newItem );
220                 }
221                 
222                 // Create files to history
223                 history.modify(items.toArray( new Bean[items.size()] ));
224                 
225                 if (adapter==null) {
226                         adapter = new DatasourceAdapter( collector );
227                     source.addListener( adapter );
228                 } else {
229                         adapter.reset();
230                 }
231             } finally {
232                 experiment.changeState(oldState);
233             }
234         }
235
236         public synchronized void dispose() {
237                 if (source!=null && adapter!=null) {
238                         source.removeListener(adapter);
239                 }
240                 if (adapter!=null) adapter.reset();
241                 if (collector!=null) collector.close();
242                 if (history!=null) history.close();
243             // Mark disposed
244             experiment = null;
245         }
246
247         public HistoryManager getCollector() {
248             return history;
249         }
250
251         public boolean isDisposed() {
252             return experiment == null;
253         }
254
255     }
256
257     final ItemCollector itemCollector = new ItemCollector();
258
259     private ModelHistoryCollector() {
260     }
261
262     /**
263      * @param graph
264      * @param startTime
265      * @throws DatabaseException
266      * @throws HistoryException
267      * @throws InterruptedException
268      */
269     public void initialize(RequestProcessor processor, double startTime) throws DatabaseException, HistoryException, InterruptedException {
270         initialize(processor, startTime, false);
271     }
272
273     /**
274      * Initialize is invoked during starting of an experiment (
275      * {@link ExperimentState#INITIALIZING}).
276      * 
277      * @param graph
278      * @param startTime
279      * @param listenToVariableSet true to actively listen to changes in the set
280      *        of history collected variables.
281      * @throws DatabaseException
282      * @throws HistoryException
283      * @throws InterruptedException
284      */
285     public void initialize(RequestProcessor processor, double startTime, boolean listenToVariableSet) throws DatabaseException, HistoryException, InterruptedException {
286         this.session = processor.getSession();
287         itemCollector.init(history, dynamicExperiment, startTime, model);
288         refresh0(processor, listenToVariableSet);
289     }
290
291     /**
292      * @throws HistoryException
293      * @throws DatabaseException
294      * @throws InterruptedException 
295      */
296     public void refresh() throws HistoryException, DatabaseException, InterruptedException {
297         refresh0(session, false);
298     }
299
300     /**
301      * @return
302      */
303     public File getWorkarea() {
304         return history instanceof FileHistory ? ((FileHistory) history).getWorkarea() : null;
305     }
306
307     /**
308      * @return
309      */
310     public HistoryManager getHistoryManager() {
311         return itemCollector.history;
312     }
313
314     /**
315      * Queries subscriptions, sets listener, loads the subscriptions. 
316      * 
317      * @param listenToVariableSet 
318      * @throws HistoryException
319      * @throws DatabaseException
320      * @throws InterruptedException 
321      */
322     private void refresh0(RequestProcessor processor, boolean listenToVariableSet) throws HistoryException, DatabaseException, InterruptedException {
323         if (listenToVariableSet) {
324             if (variableListener == null) {
325                 variableListener = new VariableSetListener(this);
326                 initMutex = new Semaphore(0);
327                 processor.syncRequest( subscriptionFunction.get(), variableListener );
328                 // Force synchronous initialization.
329                 initMutex.acquire();
330             }
331         } else {
332             // Only refresh if there's no listener since the listener
333             // will take care of that.
334             if (variableListener == null) {
335                 SubscriptionCollectionResult variables = processor.syncRequest( subscriptionFunction.get() );
336                 if (!variables.getStatus().isOK() && logger != null)
337                     logger.log(variables.getStatus());
338                 itemCollector.load(variables.getSubscriptions());
339                 if (loadCallback != null)
340                     loadCallback.run();
341             }
342         }
343     }
344
345     /**
346      * Stops history collection and disposes of all related resources. Collected
347      * data is left on disk.
348      */
349     public void dispose() {
350         if (variableListener != null) {
351             variableListener.dispose();
352             variableListener = null;
353         }
354         itemCollector.dispose();
355     }
356
357     /**
358      * Primes the initial subscription by attempting to read everything that is
359      * subscribed once. Any caching and fetching performed by the data session
360      * implementation should at least be initiated by this invocation.
361      * 
362      * @param graph
363      * @throws DatabaseException
364      */
365     public void primeInitialSubscription(ReadGraph graph) throws DatabaseException {
366         SubscriptionCollectionResult variables = graph.syncRequest( subscriptionFunction.get() );
367         Set<Pair<Bean, String>> variableIds = new HashSet<Pair<Bean, String>>();
368         for (Bean hi : variables.getSubscriptions()) {
369                 String variableId = (String) hi.getFieldUnchecked("variableId");
370                 variableIds.add( Pair.make(hi, variableId) );
371         }
372         
373         Datasource source = dynamicExperiment.getDatasource();
374         for (Pair<Bean, String> ids : variableIds) {
375             Datatype type = source.getType( ids.second );
376             Binding valueBinding = Bindings.getBinding(type);
377             VariableHandle handle = source.openHandle(ids.first, ids.second, valueBinding);
378             try {
379                 handle.getValue();
380             } catch (AccessorException e) {
381                logger.log(new Status(IStatus.ERROR, Activator.PLUGIN_ID, e.getLocalizedMessage(), e));
382             } finally {
383                 handle.dispose();
384             }
385         }
386     }
387
388     private static Collection<SamplingFormat> resolveSamplingFormats(Map<Object, Collection<SamplingFormat>> cache, SubscriptionItem item, Datatype type, String unit) {
389         if ( type instanceof BooleanType ) {
390             Double cacheKey = item.interval;
391             Collection<SamplingFormat> formats = cache.get(cacheKey);
392             if (formats == null) {
393                 formats = TrendSamplingFormats.createBinarySamplingFormats(item.interval);
394                 cache.put(cacheKey, formats);
395             }
396             return formats;
397         } else if ( type instanceof NumberType ) {
398             Tuple3 cacheKey = new Tuple3(item.interval, item.deadband, unit);
399             Collection<SamplingFormat> formats = cache.get(cacheKey);
400             if (formats == null) {
401                 formats = TrendSamplingFormats.createAnalogSamplingFormats(item.interval, item.deadband, unit);
402                 cache.put(cacheKey, formats);
403             }
404             return formats;
405         }
406
407         SamplingFormat format = new SamplingFormat();
408         format.formatId = "custom";
409         RecordType f = new RecordType();
410         format.format = f;
411         f.addComponent("time", Datatypes.DOUBLE);
412         f.addComponent("endTime", Datatypes.DOUBLE);
413         f.addComponent("quality", Datatypes.BYTE);
414         f.addComponent("value", type);
415
416         format.interval = item.interval;
417         format.deadband = item.deadband;
418         return Collections.singleton(format);
419     }
420     
421     private static ThreadLocal<Map<Object, Collection<SamplingFormat>>> samplingFormatCaches = new ThreadLocal<Map<Object, Collection<SamplingFormat>>>() {
422         @Override
423         protected Map<Object, Collection<SamplingFormat>> initialValue() {
424             return new WeakHashMap<Object, Collection<SamplingFormat>>();
425         }
426     };
427
428     public static List<SubscriptionItem> sampledSubscriptionItems(List<SubscriptionItem> rawItems) {
429         Map<Object, Collection<SamplingFormat>> cache = samplingFormatCaches.get();
430
431         List<SubscriptionItem> result = new ArrayList<SubscriptionItem>(rawItems.size() * 6);
432         for (SubscriptionItem item : rawItems) {
433                 
434             // Apros-specific assumption: the unsampled item's id field contains the subscription item's name
435             String groupItemId = item.id;
436             // Apros-specific assumption: the unsampled item's format field contains the subscription item's basic datatype (boolean, number, etc.)
437             Datatype type = item.format;
438             // Apros-specific assumption: the unsampled item's formatId subscription item's unit
439             String unit = /*item.formatId.isEmpty() ? null :*/ item.formatId;
440
441             Collection<SamplingFormat> formats = resolveSamplingFormats(cache, item, type, unit);
442             SubscriptionItem[] his = SubscriptionItem.createItems(item, groupItemId, item.groupId, formats);
443             for (SubscriptionItem hi : his) {
444                 hi.groupItemId = groupItemId;
445                 result.add(hi);
446             }
447         }
448         return result;
449     }
450     
451     public Collector getCollector() {
452                 return itemCollector.collector;
453         }
454
455 }