1 /*******************************************************************************
\r
2 * Copyright (c) 2007, 2011 Association for Decentralized Information Management
\r
3 * in Industry THTH ry.
\r
4 * All rights reserved. This program and the accompanying materials
\r
5 * are made available under the terms of the Eclipse Public License v1.0
\r
6 * which accompanies this distribution, and is available at
\r
7 * http://www.eclipse.org/legal/epl-v10.html
\r
10 * VTT Technical Research Centre of Finland - initial API and implementation
\r
11 *******************************************************************************/
\r
12 package org.simantics.modeling.subscription;
\r
14 import java.io.File;
\r
15 import java.io.IOException;
\r
16 import java.util.ArrayList;
\r
17 import java.util.Collection;
\r
18 import java.util.Collections;
\r
19 import java.util.HashSet;
\r
20 import java.util.List;
\r
21 import java.util.Map;
\r
22 import java.util.Set;
\r
23 import java.util.WeakHashMap;
\r
24 import java.util.concurrent.Semaphore;
\r
25 import java.util.function.Supplier;
\r
26 import java.util.logging.Logger;
\r
28 import org.eclipse.core.runtime.ILog;
\r
29 import org.eclipse.core.runtime.IStatus;
\r
30 import org.eclipse.core.runtime.Status;
\r
31 import org.simantics.databoard.Bindings;
\r
32 import org.simantics.databoard.Datatypes;
\r
33 import org.simantics.databoard.accessor.error.AccessorException;
\r
34 import org.simantics.databoard.binding.Binding;
\r
35 import org.simantics.databoard.type.BooleanType;
\r
36 import org.simantics.databoard.type.Datatype;
\r
37 import org.simantics.databoard.type.NumberType;
\r
38 import org.simantics.databoard.type.RecordType;
\r
39 import org.simantics.databoard.util.Bean;
\r
40 import org.simantics.db.ReadGraph;
\r
41 import org.simantics.db.RequestProcessor;
\r
42 import org.simantics.db.Resource;
\r
43 import org.simantics.db.Session;
\r
44 import org.simantics.db.exception.DatabaseException;
\r
45 import org.simantics.db.request.Read;
\r
46 import org.simantics.history.Collector;
\r
47 import org.simantics.history.History;
\r
48 import org.simantics.history.HistoryException;
\r
49 import org.simantics.history.HistoryManager;
\r
50 import org.simantics.history.ItemManager;
\r
51 import org.simantics.history.impl.FileHistory;
\r
52 import org.simantics.history.impl.FlushPolicy;
\r
53 import org.simantics.history.util.subscription.SamplingFormat;
\r
54 import org.simantics.history.util.subscription.SubscriptionItem;
\r
55 import org.simantics.scl.runtime.tuple.Tuple3;
\r
56 import org.simantics.simulation.Activator;
\r
57 import org.simantics.simulation.data.Datasource;
\r
58 import org.simantics.simulation.data.DatasourceAdapter;
\r
59 import org.simantics.simulation.data.VariableHandle;
\r
60 import org.simantics.simulation.experiment.ExperimentState;
\r
61 import org.simantics.simulation.experiment.IDynamicExperiment;
\r
62 import org.simantics.trend.configuration.TrendSamplingFormats;
\r
63 import org.simantics.utils.datastructures.Pair;
\r
64 import org.simantics.utils.threads.IThreadWorkQueue;
\r
67 * @author Tuukka Lehtonen
\r
69 public class ModelHistoryCollector {
\r
71 Logger log = Logger.getLogger( this.getClass().getName() );
\r
73 //private static final boolean DEBUG_LOAD = false;
\r
76 * For IStatus logging.
\r
78 static final String BUNDLE_ID = "org.simantics.modeling";
\r
82 * @param dynamicExperiment
\r
84 * @param subscriptionRequestFunction
\r
85 * @param loadCallback
\r
86 * a callback runnable that is invoked every time the
\r
87 * subscription changes
\r
89 * @throws DatabaseException
\r
90 * @throws IOException
\r
92 public static ModelHistoryCollector createCollector(
\r
93 IDynamicExperiment dynamicExperiment,
\r
94 HistoryManager history,
\r
96 Supplier<Read<SubscriptionCollectionResult>> subscriptionRequestFunction,
\r
97 Runnable loadCallback)
\r
98 throws DatabaseException, IOException
\r
100 return createCollector(dynamicExperiment, history, log, subscriptionRequestFunction, loadCallback, null);
\r
105 * @param dynamicExperiment
\r
107 * @param subscriptionRequestFunction
\r
108 * @param loadCallback
\r
109 * a callback runnable that is invoked every time the
\r
110 * subscription changes
\r
111 * @param loadThread
\r
112 * a possible thread to perform the subscription loads in or
\r
113 * <code>null</code> to use any thread
\r
115 * @throws DatabaseException
\r
116 * @throws IOException
\r
118 public static ModelHistoryCollector createCollector(
\r
119 IDynamicExperiment dynamicExperiment,
\r
120 HistoryManager history,
\r
122 Supplier<Read<SubscriptionCollectionResult>> subscriptionRequestFunction,
\r
123 Runnable loadCallback,
\r
124 IThreadWorkQueue loadThread)
\r
125 throws DatabaseException, IOException
\r
127 ModelHistoryCollector data = new ModelHistoryCollector();
\r
128 data.history = history;
\r
129 data.model = dynamicExperiment.getModel();
\r
130 data.dynamicExperiment = dynamicExperiment;
\r
132 data.subscriptionFunction = subscriptionRequestFunction;
\r
133 data.loadCallback = loadCallback;
\r
134 data.loadThread = loadThread;
\r
138 private Session session;
\r
139 private HistoryManager history;
\r
140 private Resource model;
\r
141 private IDynamicExperiment dynamicExperiment;
\r
143 private Supplier<Read<SubscriptionCollectionResult>> subscriptionFunction;
\r
144 Runnable loadCallback;
\r
145 IThreadWorkQueue loadThread;
\r
147 private VariableSetListener variableListener;
\r
150 * Released every time {@link #variableListener} either completes reloading
\r
151 * subscriptions or fails with an exception. Used for synchronizing listener-based initialization
\r
153 Semaphore initMutex = new Semaphore(0);
\r
155 public static class ItemCollector {
\r
157 /** The experiment to collect the data from. */
\r
158 IDynamicExperiment experiment;
\r
160 /** The source of the collected data, retrieved from the experiment. */
\r
163 /** History manager that handles stream files */
\r
164 HistoryManager history;
\r
166 /** Collectors for each subscription */
\r
167 Collector collector;
\r
169 DatasourceAdapter adapter;
\r
173 public synchronized void init(HistoryManager history, IDynamicExperiment experiment, double startTime, Resource model) {
\r
174 this.experiment = experiment;
\r
175 this.source = experiment.getDatasource();
\r
176 this.history = history;
\r
177 //System.out.println("Debug: ModelHistoryCollector history="+System.identityHashCode(this.history));
\r
178 this.collector = History.createCollector(this.history, FlushPolicy.NoFlush);
\r
179 this.model = model;
\r
183 * Load collector with new settings
\r
186 * @param dynamicExperiment
\r
188 * @throws HistoryException
\r
190 public synchronized void load(List<SubscriptionItem> items)
\r
191 throws HistoryException, DatabaseException {
\r
196 ExperimentState oldState = experiment.getState();
\r
197 if (oldState == ExperimentState.RUNNING)
\r
198 experiment.changeState(ExperimentState.STOPPED);
\r
201 ItemManager im = new ItemManager(items);
\r
203 // Create and/or Update collector
\r
204 if (collector==null) collector = History.createCollector(history);
\r
206 // Update old items
\r
207 for (SubscriptionItem oldItem : collector.getItems() )
\r
209 String id = (String) oldItem.getFieldUnchecked("id");
\r
210 Bean newItem = im.get(id);
\r
211 if ( newItem == null ) {
\r
212 oldItem.enabled = false;
\r
213 collector.setItem(oldItem);
\r
214 //collector.removeItem(id);
\r
218 for (Bean newItem : items) {
\r
219 collector.setItem( newItem );
\r
222 // Create files to history
\r
223 history.modify(items.toArray( new Bean[items.size()] ));
\r
225 if (adapter==null) {
\r
226 adapter = new DatasourceAdapter( collector );
\r
227 source.addListener( adapter );
\r
232 experiment.changeState(oldState);
\r
236 public synchronized void dispose() {
\r
237 if (source!=null && adapter!=null) {
\r
238 source.removeListener(adapter);
\r
240 if (adapter!=null) adapter.reset();
\r
241 if (collector!=null) collector.close();
\r
242 if (history!=null) history.close();
\r
247 public HistoryManager getCollector() {
\r
251 public boolean isDisposed() {
\r
252 return experiment == null;
\r
257 final ItemCollector itemCollector = new ItemCollector();
\r
259 private ModelHistoryCollector() {
\r
265 * @throws DatabaseException
\r
266 * @throws HistoryException
\r
267 * @throws InterruptedException
\r
269 public void initialize(RequestProcessor processor, double startTime) throws DatabaseException, HistoryException, InterruptedException {
\r
270 initialize(processor, startTime, false);
\r
274 * Initialize is invoked during starting of an experiment (
\r
275 * {@link ExperimentState#INITIALIZING}).
\r
279 * @param listenToVariableSet true to actively listen to changes in the set
\r
280 * of history collected variables.
\r
281 * @throws DatabaseException
\r
282 * @throws HistoryException
\r
283 * @throws InterruptedException
\r
285 public void initialize(RequestProcessor processor, double startTime, boolean listenToVariableSet) throws DatabaseException, HistoryException, InterruptedException {
\r
286 this.session = processor.getSession();
\r
287 itemCollector.init(history, dynamicExperiment, startTime, model);
\r
288 refresh0(processor, listenToVariableSet);
\r
292 * @throws HistoryException
\r
293 * @throws DatabaseException
\r
294 * @throws InterruptedException
\r
296 public void refresh() throws HistoryException, DatabaseException, InterruptedException {
\r
297 refresh0(session, false);
\r
303 public File getWorkarea() {
\r
304 return history instanceof FileHistory ? ((FileHistory) history).getWorkarea() : null;
\r
310 public HistoryManager getHistoryManager() {
\r
311 return itemCollector.history;
\r
315 * Queries subscriptions, sets listener, loads the subscriptions.
\r
317 * @param listenToVariableSet
\r
318 * @throws HistoryException
\r
319 * @throws DatabaseException
\r
320 * @throws InterruptedException
\r
322 private void refresh0(RequestProcessor processor, boolean listenToVariableSet) throws HistoryException, DatabaseException, InterruptedException {
\r
323 if (listenToVariableSet) {
\r
324 if (variableListener == null) {
\r
325 variableListener = new VariableSetListener(this);
\r
326 initMutex = new Semaphore(0);
\r
327 processor.asyncRequest( subscriptionFunction.get(), variableListener );
\r
328 // Force synchronous initialization.
\r
329 initMutex.acquire();
\r
332 // Only refresh if there's no listener since the listener
\r
333 // will take care of that.
\r
334 if (variableListener == null) {
\r
335 SubscriptionCollectionResult variables = processor.syncRequest( subscriptionFunction.get() );
\r
336 if (!variables.getStatus().isOK() && logger != null)
\r
337 logger.log(variables.getStatus());
\r
338 itemCollector.load(variables.getSubscriptions());
\r
339 if (loadCallback != null)
\r
340 loadCallback.run();
\r
346 * Stops history collection and disposes of all related resources. Collected
\r
347 * data is left on disk.
\r
349 public void dispose() {
\r
350 if (variableListener != null) {
\r
351 variableListener.dispose();
\r
352 variableListener = null;
\r
354 itemCollector.dispose();
\r
358 * Primes the initial subscription by attempting to read everything that is
\r
359 * subscribed once. Any caching and fetching performed by the data session
\r
360 * implementation should at least be initiated by this invocation.
\r
363 * @throws DatabaseException
\r
365 public void primeInitialSubscription(ReadGraph graph) throws DatabaseException {
\r
366 SubscriptionCollectionResult variables = graph.syncRequest( subscriptionFunction.get() );
\r
367 Set<Pair<Bean, String>> variableIds = new HashSet<Pair<Bean, String>>();
\r
368 for (Bean hi : variables.getSubscriptions()) {
\r
369 String variableId = (String) hi.getFieldUnchecked("variableId");
\r
370 variableIds.add( Pair.make(hi, variableId) );
\r
373 Datasource source = dynamicExperiment.getDatasource();
\r
374 for (Pair<Bean, String> ids : variableIds) {
\r
375 Datatype type = source.getType( ids.second );
\r
376 Binding valueBinding = Bindings.getBinding(type);
\r
377 VariableHandle handle = source.openHandle(ids.first, ids.second, valueBinding);
\r
380 } catch (AccessorException e) {
\r
381 logger.log(new Status(IStatus.ERROR, Activator.PLUGIN_ID, e.getLocalizedMessage(), e));
\r
388 private static Collection<SamplingFormat> resolveSamplingFormats(Map<Object, Collection<SamplingFormat>> cache, SubscriptionItem item, Datatype type, String unit) {
\r
389 if ( type instanceof BooleanType ) {
\r
390 Double cacheKey = item.interval;
\r
391 Collection<SamplingFormat> formats = cache.get(cacheKey);
\r
392 if (formats == null) {
\r
393 formats = TrendSamplingFormats.createBinarySamplingFormats(item.interval);
\r
394 cache.put(cacheKey, formats);
\r
397 } else if ( type instanceof NumberType ) {
\r
398 Tuple3 cacheKey = new Tuple3(item.interval, item.deadband, unit);
\r
399 Collection<SamplingFormat> formats = cache.get(cacheKey);
\r
400 if (formats == null) {
\r
401 formats = TrendSamplingFormats.createAnalogSamplingFormats(item.interval, item.deadband, unit);
\r
402 cache.put(cacheKey, formats);
\r
407 SamplingFormat format = new SamplingFormat();
\r
408 format.formatId = "custom";
\r
409 RecordType f = new RecordType();
\r
411 f.addComponent("time", Datatypes.DOUBLE);
\r
412 f.addComponent("endTime", Datatypes.DOUBLE);
\r
413 f.addComponent("quality", Datatypes.BYTE);
\r
414 f.addComponent("value", type);
\r
416 format.interval = item.interval;
\r
417 format.deadband = item.deadband;
\r
418 return Collections.singleton(format);
\r
421 private static ThreadLocal<Map<Object, Collection<SamplingFormat>>> samplingFormatCaches = new ThreadLocal<Map<Object, Collection<SamplingFormat>>>() {
\r
423 protected Map<Object, Collection<SamplingFormat>> initialValue() {
\r
424 return new WeakHashMap<Object, Collection<SamplingFormat>>();
\r
428 public static List<SubscriptionItem> sampledSubscriptionItems(List<SubscriptionItem> rawItems) {
\r
429 Map<Object, Collection<SamplingFormat>> cache = samplingFormatCaches.get();
\r
431 List<SubscriptionItem> result = new ArrayList<SubscriptionItem>(rawItems.size() * 6);
\r
432 for (SubscriptionItem item : rawItems) {
\r
434 // Apros-specific assumption: the unsampled item's id field contains the subscription item's name
\r
435 String groupItemId = item.id;
\r
436 // Apros-specific assumption: the unsampled item's format field contains the subscription item's basic datatype (boolean, number, etc.)
\r
437 Datatype type = item.format;
\r
438 // Apros-specific assumption: the unsampled item's formatId subscription item's unit
\r
439 String unit = /*item.formatId.isEmpty() ? null :*/ item.formatId;
\r
441 Collection<SamplingFormat> formats = resolveSamplingFormats(cache, item, type, unit);
\r
442 SubscriptionItem[] his = SubscriptionItem.createItems(item, groupItemId, item.groupId, formats);
\r
443 for (SubscriptionItem hi : his) {
\r
444 hi.groupItemId = groupItemId;
\r
451 public Collector getCollector() {
\r
452 return itemCollector.collector;
\r