1 /*******************************************************************************
2 * Copyright (c) 2007, 2011 Association for Decentralized Information Management
4 * All rights reserved. This program and the accompanying materials
5 * are made available under the terms of the Eclipse Public License v1.0
6 * which accompanies this distribution, and is available at
7 * http://www.eclipse.org/legal/epl-v10.html
10 * VTT Technical Research Centre of Finland - initial API and implementation
11 *******************************************************************************/
12 package org.simantics.modeling.subscription;
15 import java.io.IOException;
16 import java.util.ArrayList;
17 import java.util.Collection;
18 import java.util.Collections;
19 import java.util.HashSet;
20 import java.util.List;
23 import java.util.WeakHashMap;
24 import java.util.concurrent.Semaphore;
25 import java.util.function.Supplier;
26 import java.util.logging.Logger;
28 import org.eclipse.core.runtime.ILog;
29 import org.eclipse.core.runtime.IStatus;
30 import org.eclipse.core.runtime.Status;
31 import org.simantics.databoard.Bindings;
32 import org.simantics.databoard.Datatypes;
33 import org.simantics.databoard.accessor.error.AccessorException;
34 import org.simantics.databoard.binding.Binding;
35 import org.simantics.databoard.type.BooleanType;
36 import org.simantics.databoard.type.Datatype;
37 import org.simantics.databoard.type.NumberType;
38 import org.simantics.databoard.type.RecordType;
39 import org.simantics.databoard.util.Bean;
40 import org.simantics.db.ReadGraph;
41 import org.simantics.db.RequestProcessor;
42 import org.simantics.db.Resource;
43 import org.simantics.db.Session;
44 import org.simantics.db.exception.DatabaseException;
45 import org.simantics.db.request.Read;
46 import org.simantics.history.Collector;
47 import org.simantics.history.History;
48 import org.simantics.history.HistoryException;
49 import org.simantics.history.HistoryManager;
50 import org.simantics.history.ItemManager;
51 import org.simantics.history.impl.FileHistory;
52 import org.simantics.history.impl.FlushPolicy;
53 import org.simantics.history.util.subscription.SamplingFormat;
54 import org.simantics.history.util.subscription.SubscriptionItem;
55 import org.simantics.scl.runtime.tuple.Tuple3;
56 import org.simantics.simulation.Activator;
57 import org.simantics.simulation.data.Datasource;
58 import org.simantics.simulation.data.DatasourceAdapter;
59 import org.simantics.simulation.data.VariableHandle;
60 import org.simantics.simulation.experiment.ExperimentState;
61 import org.simantics.simulation.experiment.IDynamicExperiment;
62 import org.simantics.trend.configuration.TrendSamplingFormats;
63 import org.simantics.utils.datastructures.Pair;
64 import org.simantics.utils.threads.IThreadWorkQueue;
67 * @author Tuukka Lehtonen
// Collects history data for the model of a dynamic experiment. Instances are
// obtained through the static createCollector(...) factories; the constructor
// is private.
69 public class ModelHistoryCollector {
// java.util.logging logger named after the runtime class of this instance.
// This is an instance field, so it is not reachable from the static factories.
71 Logger log = Logger.getLogger( this.getClass().getName() );
73 //private static final boolean DEBUG_LOAD = false;
76 * For IStatus logging.
// Bundle identifier string of this plug-in.
78 static final String BUNDLE_ID = "org.simantics.modeling";
82 * @param dynamicExperiment
84 * @param subscriptionRequestFunction
86 * a callback runnable that is invoked every time the
87 * subscription changes
89 * @throws DatabaseException
// Convenience factory: forwards its arguments to the createCollector overload
// that additionally takes an IThreadWorkQueue, supplying null for that argument.
// NOTE(review): `log` is forwarded below but is not among the parameters visible
// here; presumably a logging parameter sits between `history` and
// `subscriptionRequestFunction` -- confirm against the full signature.
92 public static ModelHistoryCollector createCollector(
93 IDynamicExperiment dynamicExperiment,
94 HistoryManager history,
96 Supplier<Read<SubscriptionCollectionResult>> subscriptionRequestFunction,
97 Runnable loadCallback)
98 throws DatabaseException, IOException
// The trailing null means no dedicated load thread is supplied.
100 return createCollector(dynamicExperiment, history, log, subscriptionRequestFunction, loadCallback, null);
105 * @param dynamicExperiment
107 * @param subscriptionRequestFunction
108 * @param loadCallback
109 * a callback runnable that is invoked every time the
110 * subscription changes
112 * a possible thread to perform the subscription loads in or
113 * <code>null</code> to use any thread
115 * @throws DatabaseException
116 * @throws IOException
// Factory that creates a new ModelHistoryCollector through the private
// constructor and fills its fields from the arguments. Nothing is initialized
// or subscribed here; that happens in initialize(...).
// NOTE(review): the sibling overload calls this method with six arguments
// (including `log`), but only five parameters are visible here, and no visible
// statement assigns the `logger` field or returns `data` -- confirm all three.
118 public static ModelHistoryCollector createCollector(
119 IDynamicExperiment dynamicExperiment,
120 HistoryManager history,
122 Supplier<Read<SubscriptionCollectionResult>> subscriptionRequestFunction,
123 Runnable loadCallback,
124 IThreadWorkQueue loadThread)
125 throws DatabaseException, IOException
127 ModelHistoryCollector data = new ModelHistoryCollector();
128 data.history = history;
// The model resource is read from the experiment rather than passed in separately.
129 data.model = dynamicExperiment.getModel();
130 data.dynamicExperiment = dynamicExperiment;
132 data.subscriptionFunction = subscriptionRequestFunction;
// Both of the following may be null: loadCallback is null-checked in refresh0,
// and the sibling overload passes null for loadThread.
133 data.loadCallback = loadCallback;
134 data.loadThread = loadThread;
// Session taken from the RequestProcessor in initialize(...); refresh() passes it to refresh0.
138 private Session session;
// History manager given to createCollector; handed to itemCollector.init(...) in initialize(...).
139 private HistoryManager history;
// Model resource obtained from dynamicExperiment.getModel() in createCollector.
140 private Resource model;
// Experiment whose datasource is read by primeInitialSubscription and by the item collector.
141 private IDynamicExperiment dynamicExperiment;
// Supplies the read request whose result (SubscriptionCollectionResult) lists the subscriptions.
143 private Supplier<Read<SubscriptionCollectionResult>> subscriptionFunction;
// Optional callback set by createCollector; null-checked in refresh0 before use.
144 Runnable loadCallback;
// Optional work queue set by createCollector; may be null.
145 IThreadWorkQueue loadThread;
// Listener created in refresh0 when listenToVariableSet is true; cleared again in dispose().
147 private VariableSetListener variableListener;
150 * Released every time {@link #variableListener} either completes reloading
151 * subscriptions or fails with an exception. Used for synchronizing listener-based initialization
// Starts with zero permits; refresh0 replaces it with a fresh Semaphore(0)
// each time it creates a new variableListener.
153 Semaphore initMutex = new Semaphore(0);
// Holds the per-experiment collection state: the experiment, the history
// manager and the datasource adapter, plus the `source` and `collector`
// members used by init(), load() and dispose().
155 public static class ItemCollector {
157 /** The experiment to collect the data from. */
158 IDynamicExperiment experiment;
160 /** The source of the collected data, retrieved from the experiment. */
// NOTE(review): the `source` member described by the comment above is assigned
// in init() but its declaration is not on a visible line -- confirm.
163 /** History manager that handles stream files */
164 HistoryManager history;
166 /** Collectors for each subscription */
// NOTE(review): likewise, the `collector` member is assigned in init()/load()
// but its declaration is not on a visible line -- confirm.
// Adapter wrapping `collector`; registered on `source` in load() and removed in dispose().
169 DatasourceAdapter adapter;
// Binds this item collector to an experiment: stores the experiment and the
// history manager, fetches the experiment's datasource, and creates a collector
// over the history manager with FlushPolicy.NoFlush.
// Synchronized on this ItemCollector instance, like load() and dispose().
// `startTime` and `model` are not referenced by any statement visible here.
173 public synchronized void init(HistoryManager history, IDynamicExperiment experiment, double startTime, Resource model) {
174 this.experiment = experiment;
175 this.source = experiment.getDatasource();
176 this.history = history;
177 //System.out.println("Debug: ModelHistoryCollector history="+System.identityHashCode(this.history));
178 this.collector = History.createCollector(this.history, FlushPolicy.NoFlush);
183 * Load collector with new settings
186 * @param dynamicExperiment
188 * @throws HistoryException
// Applies a new set of subscription items to the collector.
//
// Visible steps, in order:
//   1. remember the experiment state and stop the experiment if it is RUNNING;
//   2. disable collector items whose id is absent from `items`;
//   3. set every item of `items` on the collector;
//   4. pass the items to history.modify(...);
//   5. wrap the collector in a new DatasourceAdapter and register it on `source`;
//   6. change the experiment back to the remembered state.
//
// @param items the complete new set of subscription items
// @throws HistoryException, DatabaseException as declared; the visible body
//         does not show which call raises which
190 public synchronized void load(List<SubscriptionItem> items)
191 throws HistoryException, DatabaseException {
// Capture the current state so it can be restored at the end.
196 ExperimentState oldState = experiment.getState();
197 if (oldState == ExperimentState.RUNNING)
198 experiment.changeState(ExperimentState.STOPPED);
// Looked up by item id in the loop below.
201 ItemManager im = new ItemManager(items);
203 // Create and/or Update collector
// Unlike init(), this fallback creates the collector without an explicit FlushPolicy.
204 if (collector==null) collector = History.createCollector(history);
// Items already known to the collector but missing from the new set are
// disabled and written back, not removed (the removal call is commented out).
207 for (SubscriptionItem oldItem : collector.getItems() )
209 String id = (String) oldItem.getFieldUnchecked("id");
210 Bean newItem = im.get(id);
211 if ( newItem == null ) {
212 oldItem.enabled = false;
213 collector.setItem(oldItem);
214 //collector.removeItem(id);
// Add or update every item of the new set.
218 for (Bean newItem : items) {
219 collector.setItem( newItem );
222 // Create files to history
223 history.modify(items.toArray( new Bean[items.size()] ));
// NOTE(review): a new adapter is created and registered on each call; no
// removal of a previously registered adapter is visible here -- confirm.
226 adapter = new DatasourceAdapter( collector );
227 source.addListener( adapter );
// Restore the state captured at the top of the method.
// NOTE(review): presumably this runs in a finally block so the experiment is
// resumed even when an exception is thrown -- confirm.
232 experiment.changeState(oldState);
// Detaches the adapter from the datasource (when both exist), resets the
// adapter, and closes the collector and the history manager. Each step is
// null-guarded, so this can be called before init()/load() have run.
// NOTE(review): isDisposed() tests `experiment == null`, but no visible
// statement here clears `experiment` -- confirm it is nulled.
236 public synchronized void dispose() {
237 if (source!=null && adapter!=null) {
238 source.removeListener(adapter);
240 if (adapter!=null) adapter.reset();
241 if (collector!=null) collector.close();
242 if (history!=null) history.close();
// Accessor declared to return a HistoryManager.
// NOTE(review): the body is not visible here, and the method name says
// "collector" while the return type is HistoryManager -- confirm which member
// is returned.
247 public HistoryManager getCollector() {
// Reports whether this item collector has no experiment attached, i.e. whether
// `experiment` is null. This is also true before init() has been called.
251 public boolean isDisposed() {
252 return experiment == null;
// The single ItemCollector owned by this instance; created eagerly and never reassigned.
257 final ItemCollector itemCollector = new ItemCollector();
// Private constructor: instances are created only by the static createCollector(...) factories.
259 private ModelHistoryCollector() {
265 * @throws DatabaseException
266 * @throws HistoryException
267 * @throws InterruptedException
// Convenience overload: initializes without listening to changes in the set of
// collected variables (passes listenToVariableSet = false).
269 public void initialize(RequestProcessor processor, double startTime) throws DatabaseException, HistoryException, InterruptedException {
270 initialize(processor, startTime, false);
274 * Initialize is invoked during starting of an experiment (
275 * {@link ExperimentState#INITIALIZING}).
279 * @param listenToVariableSet true to actively listen to changes in the set
280 * of history collected variables.
281 * @throws DatabaseException
282 * @throws HistoryException
283 * @throws InterruptedException
// Stores the processor's session for later refresh() calls, initializes the
// item collector with the history manager, experiment, start time and model,
// and then performs the first subscription load through refresh0.
285 public void initialize(RequestProcessor processor, double startTime, boolean listenToVariableSet) throws DatabaseException, HistoryException, InterruptedException {
// Kept because refresh() has no processor argument of its own.
286 this.session = processor.getSession();
287 itemCollector.init(history, dynamicExperiment, startTime, model);
288 refresh0(processor, listenToVariableSet);
292 * @throws HistoryException
293 * @throws DatabaseException
294 * @throws InterruptedException
// Re-runs the subscription load using the session captured in initialize(...),
// without installing a variable-set listener. refresh0 does nothing in this
// mode when a listener already exists.
296 public void refresh() throws HistoryException, DatabaseException, InterruptedException {
297 refresh0(session, false);
// Returns the work area of the history manager when it is a FileHistory;
// returns null for any other HistoryManager implementation (or a null history).
303 public File getWorkarea() {
304 return history instanceof FileHistory ? ((FileHistory) history).getWorkarea() : null;
// Returns the history manager held by the item collector. That member is
// assigned in ItemCollector.init(...), so it is null before initialize(...).
310 public HistoryManager getHistoryManager() {
311 return itemCollector.history;
315 * Queries subscriptions, sets listener, loads the subscriptions.
317 * @param listenToVariableSet
318 * @throws HistoryException
319 * @throws DatabaseException
320 * @throws InterruptedException
// Shared implementation behind initialize(...) and refresh().
//
// With listenToVariableSet == true and no listener yet: creates a
// VariableSetListener, resets initMutex to a zero-permit semaphore, and issues
// the subscription request with that listener attached.
// Otherwise, when no listener exists: runs the subscription request
// synchronously, logs a non-OK status (if a logger is set) and loads the
// resulting subscriptions into the item collector.
322 private void refresh0(RequestProcessor processor, boolean listenToVariableSet) throws HistoryException, DatabaseException, InterruptedException {
323 if (listenToVariableSet) {
// The listener is installed at most once; later calls skip this branch.
324 if (variableListener == null) {
325 variableListener = new VariableSetListener(this);
// Fresh zero-permit semaphore for this listener's initialization.
326 initMutex = new Semaphore(0);
327 processor.syncRequest( subscriptionFunction.get(), variableListener );
328 // Force synchronous initialization.
// NOTE(review): no blocking call on initMutex is visible after the comment
// above; presumably initMutex.acquire() follows (it would explain the declared
// InterruptedException) -- confirm.
332 // Only refresh if there's no listener since the listener
333 // will take care of that.
334 if (variableListener == null) {
335 SubscriptionCollectionResult variables = processor.syncRequest( subscriptionFunction.get() );
// A non-OK status is logged but does not abort the load below.
336 if (!variables.getStatus().isOK() && logger != null)
337 logger.log(variables.getStatus());
338 itemCollector.load(variables.getSubscriptions());
// NOTE(review): the statement guarded by this check is not visible;
// presumably loadCallback.run() -- confirm.
339 if (loadCallback != null)
346 * Stops history collection and disposes of all related resources. Collected
347 * data is left on disk.
// Disposes the variable-set listener, if one was installed, clears the
// reference to it, and then disposes the item collector.
349 public void dispose() {
350 if (variableListener != null) {
351 variableListener.dispose();
// Cleared so that a later refresh0 sees no listener.
352 variableListener = null;
354 itemCollector.dispose();
358 * Primes the initial subscription by attempting to read everything that is
359 * subscribed once. Any caching and fetching performed by the data session
360 * implementation should at least be initiated by this invocation.
363 * @throws DatabaseException
// Runs the subscription request on the given graph, collects the distinct
// (item, variableId) pairs, and opens a handle on the experiment's datasource
// for each pair, using a binding derived from the variable's datatype.
// AccessorException is caught and logged as an error status, not rethrown.
365 public void primeInitialSubscription(ReadGraph graph) throws DatabaseException {
366 SubscriptionCollectionResult variables = graph.syncRequest( subscriptionFunction.get() );
// A set is used, so identical (item, variableId) pairs are handled once.
367 Set<Pair<Bean, String>> variableIds = new HashSet<Pair<Bean, String>>();
368 for (Bean hi : variables.getSubscriptions()) {
369 String variableId = (String) hi.getFieldUnchecked("variableId");
370 variableIds.add( Pair.make(hi, variableId) );
373 Datasource source = dynamicExperiment.getDatasource();
374 for (Pair<Bean, String> ids : variableIds) {
375 Datatype type = source.getType( ids.second );
376 Binding valueBinding = Bindings.getBinding(type);
377 VariableHandle handle = source.openHandle(ids.first, ids.second, valueBinding);
// NOTE(review): the statements between openHandle and this catch (the try body,
// presumably a read through `handle`) are not visible -- confirm.
380 } catch (AccessorException e) {
// NOTE(review): `logger` is used here without the null check that refresh0
// applies before logging -- confirm it cannot be null on this path.
381 logger.log(new Status(IStatus.ERROR, Activator.PLUGIN_ID, e.getLocalizedMessage(), e));
// Chooses the sampling formats for one subscription item from its datatype:
//   - BooleanType: binary sampling formats, cached by the item's interval;
//   - NumberType:  analog sampling formats, cached by (interval, deadband, unit);
//   - otherwise:   a single format with formatId "custom", not cached.
// `cache` is read and filled by the first two branches only.
388 private static Collection<SamplingFormat> resolveSamplingFormats(Map<Object, Collection<SamplingFormat>> cache, SubscriptionItem item, Datatype type, String unit) {
389 if ( type instanceof BooleanType ) {
// The interval alone determines the binary formats, so it serves as the cache key.
390 Double cacheKey = item.interval;
391 Collection<SamplingFormat> formats = cache.get(cacheKey);
392 if (formats == null) {
393 formats = TrendSamplingFormats.createBinarySamplingFormats(item.interval);
394 cache.put(cacheKey, formats);
// NOTE(review): the return of `formats` for this branch is not visible -- confirm.
397 } else if ( type instanceof NumberType ) {
// Analog formats depend on all three values, hence the composite key.
398 Tuple3 cacheKey = new Tuple3(item.interval, item.deadband, unit);
399 Collection<SamplingFormat> formats = cache.get(cacheKey);
400 if (formats == null) {
401 formats = TrendSamplingFormats.createAnalogSamplingFormats(item.interval, item.deadband, unit);
402 cache.put(cacheKey, formats);
// NOTE(review): the return of `formats` for this branch is not visible -- confirm.
// Fallback for all other datatypes: build a one-off "custom" format.
407 SamplingFormat format = new SamplingFormat();
408 format.formatId = "custom";
// Record layout of a sample: time, endTime, quality, then the value in the item's own datatype.
409 RecordType f = new RecordType();
411 f.addComponent("time", Datatypes.DOUBLE);
412 f.addComponent("endTime", Datatypes.DOUBLE);
413 f.addComponent("quality", Datatypes.BYTE);
414 f.addComponent("value", type);
// NOTE(review): no visible line attaches record type `f` to `format` -- confirm.
416 format.interval = item.interval;
417 format.deadband = item.deadband;
418 return Collections.singleton(format);
// Per-thread cache of sampling format collections, created on a thread's first
// access. Read by sampledSubscriptionItems and filled by resolveSamplingFormats.
// NOTE(review): the map is a WeakHashMap and the keys put into it are freshly
// created Double/Tuple3 objects, so entries may be dropped once a key is no
// longer strongly referenced -- confirm this short lifetime is intended.
421 private static ThreadLocal<Map<Object, Collection<SamplingFormat>>> samplingFormatCaches = new ThreadLocal<Map<Object, Collection<SamplingFormat>>>() {
423 protected Map<Object, Collection<SamplingFormat>> initialValue() {
424 return new WeakHashMap<Object, Collection<SamplingFormat>>();
// Expands raw (unsampled) subscription items into sampled subscription items:
// for each raw item the sampling formats are resolved from its datatype and
// unit, and SubscriptionItem.createItems(...) produces the items for those
// formats. Uses the calling thread's sampling format cache.
//
// @param rawItems unsampled items; see the Apros-specific assumptions below
//        for how their id, format and formatId fields are interpreted
428 public static List<SubscriptionItem> sampledSubscriptionItems(List<SubscriptionItem> rawItems) {
429 Map<Object, Collection<SamplingFormat>> cache = samplingFormatCaches.get();
// Pre-sized for six result items per raw item.
431 List<SubscriptionItem> result = new ArrayList<SubscriptionItem>(rawItems.size() * 6);
432 for (SubscriptionItem item : rawItems) {
434 // Apros-specific assumption: the unsampled item's id field contains the subscription item's name
435 String groupItemId = item.id;
436 // Apros-specific assumption: the unsampled item's format field contains the subscription item's basic datatype (boolean, number, etc.)
437 Datatype type = item.format;
438 // Apros-specific assumption: the unsampled item's formatId field contains the subscription item's unit
439 String unit = /*item.formatId.isEmpty() ? null :*/ item.formatId;
441 Collection<SamplingFormat> formats = resolveSamplingFormats(cache, item, type, unit);
442 SubscriptionItem[] his = SubscriptionItem.createItems(item, groupItemId, item.groupId, formats);
// Every generated item is tagged with the id of the raw item it came from.
// NOTE(review): the statements that add `hi` to `result` and return `result`
// are not visible -- confirm.
443 for (SubscriptionItem hi : his) {
444 hi.groupItemId = groupItemId;
// Returns the Collector held by the item collector. It is created in
// ItemCollector.init(...) or, as a fallback, in ItemCollector.load(...), so it
// is null before either has run.
451 public Collector getCollector() {
452 return itemCollector.collector;