-/*******************************************************************************\r
- * Copyright (c) 2007, 2011 Association for Decentralized Information Management\r
- * in Industry THTH ry.\r
- * All rights reserved. This program and the accompanying materials\r
- * are made available under the terms of the Eclipse Public License v1.0\r
- * which accompanies this distribution, and is available at\r
- * http://www.eclipse.org/legal/epl-v10.html\r
- *\r
- * Contributors:\r
- * VTT Technical Research Centre of Finland - initial API and implementation\r
- *******************************************************************************/\r
-package org.simantics.modeling.subscription;\r
-\r
-import java.io.File;\r
-import java.io.IOException;\r
-import java.util.ArrayList;\r
-import java.util.Collection;\r
-import java.util.Collections;\r
-import java.util.HashSet;\r
-import java.util.List;\r
-import java.util.Map;\r
-import java.util.Set;\r
-import java.util.WeakHashMap;\r
-import java.util.concurrent.Semaphore;\r
-import java.util.function.Supplier;\r
-import java.util.logging.Logger;\r
-\r
-import org.eclipse.core.runtime.ILog;\r
-import org.eclipse.core.runtime.IStatus;\r
-import org.eclipse.core.runtime.Status;\r
-import org.simantics.databoard.Bindings;\r
-import org.simantics.databoard.Datatypes;\r
-import org.simantics.databoard.accessor.error.AccessorException;\r
-import org.simantics.databoard.binding.Binding;\r
-import org.simantics.databoard.type.BooleanType;\r
-import org.simantics.databoard.type.Datatype;\r
-import org.simantics.databoard.type.NumberType;\r
-import org.simantics.databoard.type.RecordType;\r
-import org.simantics.databoard.util.Bean;\r
-import org.simantics.db.ReadGraph;\r
-import org.simantics.db.RequestProcessor;\r
-import org.simantics.db.Resource;\r
-import org.simantics.db.Session;\r
-import org.simantics.db.exception.DatabaseException;\r
-import org.simantics.db.request.Read;\r
-import org.simantics.history.Collector;\r
-import org.simantics.history.History;\r
-import org.simantics.history.HistoryException;\r
-import org.simantics.history.HistoryManager;\r
-import org.simantics.history.ItemManager;\r
-import org.simantics.history.impl.FileHistory;\r
-import org.simantics.history.impl.FlushPolicy;\r
-import org.simantics.history.util.subscription.SamplingFormat;\r
-import org.simantics.history.util.subscription.SubscriptionItem;\r
-import org.simantics.scl.runtime.tuple.Tuple3;\r
-import org.simantics.simulation.Activator;\r
-import org.simantics.simulation.data.Datasource;\r
-import org.simantics.simulation.data.DatasourceAdapter;\r
-import org.simantics.simulation.data.VariableHandle;\r
-import org.simantics.simulation.experiment.ExperimentState;\r
-import org.simantics.simulation.experiment.IDynamicExperiment;\r
-import org.simantics.trend.configuration.TrendSamplingFormats;\r
-import org.simantics.utils.datastructures.Pair;\r
-import org.simantics.utils.threads.IThreadWorkQueue;\r
-\r
-/**\r
- * @author Tuukka Lehtonen\r
- */\r
-public class ModelHistoryCollector {\r
-\r
- Logger log = Logger.getLogger( this.getClass().getName() );\r
-\r
- //private static final boolean DEBUG_LOAD = false;\r
-\r
- /**\r
- * For IStatus logging.\r
- */\r
- static final String BUNDLE_ID = "org.simantics.modeling";\r
-\r
- /**\r
- * @param g\r
- * @param dynamicExperiment\r
- * @param logger\r
- * @param subscriptionRequestFunction\r
- * @param loadCallback\r
- * a callback runnable that is invoked every time the\r
- * subscription changes\r
- * @return\r
- * @throws DatabaseException\r
- * @throws IOException\r
- */\r
- public static ModelHistoryCollector createCollector(\r
- IDynamicExperiment dynamicExperiment,\r
- HistoryManager history,\r
- ILog log,\r
- Supplier<Read<SubscriptionCollectionResult>> subscriptionRequestFunction,\r
- Runnable loadCallback)\r
- throws DatabaseException, IOException\r
- {\r
- return createCollector(dynamicExperiment, history, log, subscriptionRequestFunction, loadCallback, null);\r
- }\r
-\r
- /**\r
- * @param g\r
- * @param dynamicExperiment\r
- * @param logger\r
- * @param subscriptionRequestFunction\r
- * @param loadCallback\r
- * a callback runnable that is invoked every time the\r
- * subscription changes\r
- * @param loadThread\r
- * a possible thread to perform the subscription loads in or\r
- * <code>null</code> to use any thread\r
- * @return\r
- * @throws DatabaseException\r
- * @throws IOException\r
- */\r
- public static ModelHistoryCollector createCollector(\r
- IDynamicExperiment dynamicExperiment,\r
- HistoryManager history,\r
- ILog log,\r
- Supplier<Read<SubscriptionCollectionResult>> subscriptionRequestFunction,\r
- Runnable loadCallback,\r
- IThreadWorkQueue loadThread)\r
- throws DatabaseException, IOException\r
- {\r
- ModelHistoryCollector data = new ModelHistoryCollector();\r
- data.history = history;\r
- data.model = dynamicExperiment.getModel();\r
- data.dynamicExperiment = dynamicExperiment;\r
- data.logger = log;\r
- data.subscriptionFunction = subscriptionRequestFunction;\r
- data.loadCallback = loadCallback;\r
- data.loadThread = loadThread;\r
- return data;\r
- }\r
-\r
- private Session session;\r
- private HistoryManager history;\r
- private Resource model;\r
- private IDynamicExperiment dynamicExperiment;\r
- ILog logger;\r
- private Supplier<Read<SubscriptionCollectionResult>> subscriptionFunction;\r
- Runnable loadCallback;\r
- IThreadWorkQueue loadThread;\r
-\r
- private VariableSetListener variableListener;\r
-\r
- /**\r
- * Released every time {@link #variableListener} either completes reloading\r
- * subscriptions or fails with an exception. Used for synchronizing listener-based initialization \r
- */\r
- Semaphore initMutex = new Semaphore(0);\r
-\r
- public static class ItemCollector {\r
-\r
- /** The experiment to collect the data from. */\r
- IDynamicExperiment experiment;\r
-\r
- /** The source of the collected data, retrieved from the experiment. */\r
- Datasource source;\r
-\r
- /** History manager that handles stream files */\r
- HistoryManager history;\r
-\r
- /** Collectors for each subscription */\r
- Collector collector;\r
-\r
- DatasourceAdapter adapter;\r
-\r
- Resource model;\r
-\r
- public synchronized void init(HistoryManager history, IDynamicExperiment experiment, double startTime, Resource model) {\r
- this.experiment = experiment;\r
- this.source = experiment.getDatasource();\r
- this.history = history; \r
- //System.out.println("Debug: ModelHistoryCollector history="+System.identityHashCode(this.history));\r
- this.collector = History.createCollector(this.history, FlushPolicy.NoFlush); \r
- this.model = model;\r
- }\r
-\r
- /**\r
- * Load collector with new settings\r
- * \r
- * @param workarea \r
- * @param dynamicExperiment\r
- * @param variables\r
- * @throws HistoryException \r
- */\r
- public synchronized void load(List<SubscriptionItem> items)\r
- throws HistoryException, DatabaseException {\r
-\r
- if (isDisposed())\r
- return;\r
-\r
- ExperimentState oldState = experiment.getState();\r
- if (oldState == ExperimentState.RUNNING)\r
- experiment.changeState(ExperimentState.STOPPED);\r
-\r
- try {\r
- ItemManager im = new ItemManager(items);\r
- \r
- // Create and/or Update collector\r
- if (collector==null) collector = History.createCollector(history);\r
- \r
- // Update old items\r
- for (SubscriptionItem oldItem : collector.getItems() )\r
- {\r
- String id = (String) oldItem.getFieldUnchecked("id");\r
- Bean newItem = im.get(id);\r
- if ( newItem == null ) {\r
- oldItem.enabled = false;\r
- collector.setItem(oldItem);\r
- //collector.removeItem(id);\r
- }\r
- } \r
- // Add new items\r
- for (Bean newItem : items) {\r
- collector.setItem( newItem );\r
- }\r
- \r
- // Create files to history\r
- history.modify(items.toArray( new Bean[items.size()] ));\r
- \r
- if (adapter==null) {\r
- adapter = new DatasourceAdapter( collector );\r
- source.addListener( adapter );\r
- } else {\r
- adapter.reset();\r
- }\r
- } finally {\r
- experiment.changeState(oldState);\r
- }\r
- }\r
-\r
- public synchronized void dispose() {\r
- if (source!=null && adapter!=null) {\r
- source.removeListener(adapter);\r
- }\r
- if (adapter!=null) adapter.reset();\r
- if (collector!=null) collector.close();\r
- if (history!=null) history.close();\r
- // Mark disposed\r
- experiment = null;\r
- }\r
-\r
- public HistoryManager getCollector() {\r
- return history;\r
- }\r
-\r
- public boolean isDisposed() {\r
- return experiment == null;\r
- }\r
-\r
- }\r
-\r
- final ItemCollector itemCollector = new ItemCollector();\r
-\r
- private ModelHistoryCollector() {\r
- }\r
-\r
- /**\r
- * @param graph\r
- * @param startTime\r
- * @throws DatabaseException\r
- * @throws HistoryException\r
- * @throws InterruptedException\r
- */\r
- public void initialize(RequestProcessor processor, double startTime) throws DatabaseException, HistoryException, InterruptedException {\r
- initialize(processor, startTime, false);\r
- }\r
-\r
- /**\r
- * Initialize is invoked during starting of an experiment (\r
- * {@link ExperimentState#INITIALIZING}).\r
- * \r
- * @param graph\r
- * @param startTime\r
- * @param listenToVariableSet true to actively listen to changes in the set\r
- * of history collected variables.\r
- * @throws DatabaseException\r
- * @throws HistoryException\r
- * @throws InterruptedException\r
- */\r
- public void initialize(RequestProcessor processor, double startTime, boolean listenToVariableSet) throws DatabaseException, HistoryException, InterruptedException {\r
- this.session = processor.getSession();\r
- itemCollector.init(history, dynamicExperiment, startTime, model);\r
- refresh0(processor, listenToVariableSet);\r
- }\r
-\r
- /**\r
- * @throws HistoryException\r
- * @throws DatabaseException\r
- * @throws InterruptedException \r
- */\r
- public void refresh() throws HistoryException, DatabaseException, InterruptedException {\r
- refresh0(session, false);\r
- }\r
-\r
- /**\r
- * @return\r
- */\r
- public File getWorkarea() {\r
- return history instanceof FileHistory ? ((FileHistory) history).getWorkarea() : null;\r
- }\r
-\r
- /**\r
- * @return\r
- */\r
- public HistoryManager getHistoryManager() {\r
- return itemCollector.history;\r
- }\r
-\r
- /**\r
- * Queries subscriptions, sets listener, loads the subscriptions. \r
- * \r
- * @param listenToVariableSet \r
- * @throws HistoryException\r
- * @throws DatabaseException\r
- * @throws InterruptedException \r
- */\r
- private void refresh0(RequestProcessor processor, boolean listenToVariableSet) throws HistoryException, DatabaseException, InterruptedException {\r
- if (listenToVariableSet) {\r
- if (variableListener == null) {\r
- variableListener = new VariableSetListener(this);\r
- initMutex = new Semaphore(0);\r
- processor.asyncRequest( subscriptionFunction.get(), variableListener );\r
- // Force synchronous initialization.\r
- initMutex.acquire();\r
- }\r
- } else {\r
- // Only refresh if there's no listener since the listener\r
- // will take care of that.\r
- if (variableListener == null) {\r
- SubscriptionCollectionResult variables = processor.syncRequest( subscriptionFunction.get() );\r
- if (!variables.getStatus().isOK() && logger != null)\r
- logger.log(variables.getStatus());\r
- itemCollector.load(variables.getSubscriptions());\r
- if (loadCallback != null)\r
- loadCallback.run();\r
- }\r
- }\r
- }\r
-\r
- /**\r
- * Stops history collection and disposes of all related resources. Collected\r
- * data is left on disk.\r
- */\r
- public void dispose() {\r
- if (variableListener != null) {\r
- variableListener.dispose();\r
- variableListener = null;\r
- }\r
- itemCollector.dispose();\r
- }\r
-\r
- /**\r
- * Primes the initial subscription by attempting to read everything that is\r
- * subscribed once. Any caching and fetching performed by the data session\r
- * implementation should at least be initiated by this invocation.\r
- * \r
- * @param graph\r
- * @throws DatabaseException\r
- */\r
- public void primeInitialSubscription(ReadGraph graph) throws DatabaseException {\r
- SubscriptionCollectionResult variables = graph.syncRequest( subscriptionFunction.get() );\r
- Set<Pair<Bean, String>> variableIds = new HashSet<Pair<Bean, String>>();\r
- for (Bean hi : variables.getSubscriptions()) {\r
- String variableId = (String) hi.getFieldUnchecked("variableId");\r
- variableIds.add( Pair.make(hi, variableId) );\r
- }\r
- \r
- Datasource source = dynamicExperiment.getDatasource();\r
- for (Pair<Bean, String> ids : variableIds) {\r
- Datatype type = source.getType( ids.second );\r
- Binding valueBinding = Bindings.getBinding(type);\r
- VariableHandle handle = source.openHandle(ids.first, ids.second, valueBinding);\r
- try {\r
- handle.getValue();\r
- } catch (AccessorException e) {\r
- logger.log(new Status(IStatus.ERROR, Activator.PLUGIN_ID, e.getLocalizedMessage(), e));\r
- } finally {\r
- handle.dispose();\r
- }\r
- }\r
- }\r
-\r
- private static Collection<SamplingFormat> resolveSamplingFormats(Map<Object, Collection<SamplingFormat>> cache, SubscriptionItem item, Datatype type, String unit) {\r
- if ( type instanceof BooleanType ) {\r
- Double cacheKey = item.interval;\r
- Collection<SamplingFormat> formats = cache.get(cacheKey);\r
- if (formats == null) {\r
- formats = TrendSamplingFormats.createBinarySamplingFormats(item.interval);\r
- cache.put(cacheKey, formats);\r
- }\r
- return formats;\r
- } else if ( type instanceof NumberType ) {\r
- Tuple3 cacheKey = new Tuple3(item.interval, item.deadband, unit);\r
- Collection<SamplingFormat> formats = cache.get(cacheKey);\r
- if (formats == null) {\r
- formats = TrendSamplingFormats.createAnalogSamplingFormats(item.interval, item.deadband, unit);\r
- cache.put(cacheKey, formats);\r
- }\r
- return formats;\r
- }\r
-\r
- SamplingFormat format = new SamplingFormat();\r
- format.formatId = "custom";\r
- RecordType f = new RecordType();\r
- format.format = f;\r
- f.addComponent("time", Datatypes.DOUBLE);\r
- f.addComponent("endTime", Datatypes.DOUBLE);\r
- f.addComponent("quality", Datatypes.BYTE);\r
- f.addComponent("value", type);\r
-\r
- format.interval = item.interval;\r
- format.deadband = item.deadband;\r
- return Collections.singleton(format);\r
- }\r
- \r
- private static ThreadLocal<Map<Object, Collection<SamplingFormat>>> samplingFormatCaches = new ThreadLocal<Map<Object, Collection<SamplingFormat>>>() {\r
- @Override\r
- protected Map<Object, Collection<SamplingFormat>> initialValue() {\r
- return new WeakHashMap<Object, Collection<SamplingFormat>>();\r
- }\r
- };\r
-\r
- public static List<SubscriptionItem> sampledSubscriptionItems(List<SubscriptionItem> rawItems) {\r
- Map<Object, Collection<SamplingFormat>> cache = samplingFormatCaches.get();\r
-\r
- List<SubscriptionItem> result = new ArrayList<SubscriptionItem>(rawItems.size() * 6);\r
- for (SubscriptionItem item : rawItems) {\r
- \r
- // Apros-specific assumption: the unsampled item's id field contains the subscription item's name\r
- String groupItemId = item.id;\r
- // Apros-specific assumption: the unsampled item's format field contains the subscription item's basic datatype (boolean, number, etc.)\r
- Datatype type = item.format;\r
- // Apros-specific assumption: the unsampled item's formatId subscription item's unit\r
- String unit = /*item.formatId.isEmpty() ? null :*/ item.formatId;\r
-\r
- Collection<SamplingFormat> formats = resolveSamplingFormats(cache, item, type, unit);\r
- SubscriptionItem[] his = SubscriptionItem.createItems(item, groupItemId, item.groupId, formats);\r
- for (SubscriptionItem hi : his) {\r
- hi.groupItemId = groupItemId;\r
- result.add(hi);\r
- }\r
- }\r
- return result;\r
- }\r
- \r
- public Collector getCollector() {\r
- return itemCollector.collector;\r
- }\r
-\r
-}\r
+/*******************************************************************************
+ * Copyright (c) 2007, 2011 Association for Decentralized Information Management
+ * in Industry THTH ry.
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * which accompanies this distribution, and is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Contributors:
+ * VTT Technical Research Centre of Finland - initial API and implementation
+ *******************************************************************************/
+package org.simantics.modeling.subscription;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.WeakHashMap;
+import java.util.concurrent.Semaphore;
+import java.util.function.Supplier;
+import java.util.logging.Logger;
+
+import org.eclipse.core.runtime.ILog;
+import org.eclipse.core.runtime.IStatus;
+import org.eclipse.core.runtime.Status;
+import org.simantics.databoard.Bindings;
+import org.simantics.databoard.Datatypes;
+import org.simantics.databoard.accessor.error.AccessorException;
+import org.simantics.databoard.binding.Binding;
+import org.simantics.databoard.type.BooleanType;
+import org.simantics.databoard.type.Datatype;
+import org.simantics.databoard.type.NumberType;
+import org.simantics.databoard.type.RecordType;
+import org.simantics.databoard.util.Bean;
+import org.simantics.db.ReadGraph;
+import org.simantics.db.RequestProcessor;
+import org.simantics.db.Resource;
+import org.simantics.db.Session;
+import org.simantics.db.exception.DatabaseException;
+import org.simantics.db.request.Read;
+import org.simantics.history.Collector;
+import org.simantics.history.History;
+import org.simantics.history.HistoryException;
+import org.simantics.history.HistoryManager;
+import org.simantics.history.ItemManager;
+import org.simantics.history.impl.FileHistory;
+import org.simantics.history.impl.FlushPolicy;
+import org.simantics.history.util.subscription.SamplingFormat;
+import org.simantics.history.util.subscription.SubscriptionItem;
+import org.simantics.scl.runtime.tuple.Tuple3;
+import org.simantics.simulation.Activator;
+import org.simantics.simulation.data.Datasource;
+import org.simantics.simulation.data.DatasourceAdapter;
+import org.simantics.simulation.data.VariableHandle;
+import org.simantics.simulation.experiment.ExperimentState;
+import org.simantics.simulation.experiment.IDynamicExperiment;
+import org.simantics.trend.configuration.TrendSamplingFormats;
+import org.simantics.utils.datastructures.Pair;
+import org.simantics.utils.threads.IThreadWorkQueue;
+
+/**
+ * @author Tuukka Lehtonen
+ */
+public class ModelHistoryCollector {
+
+ Logger log = Logger.getLogger( this.getClass().getName() );
+
+ //private static final boolean DEBUG_LOAD = false;
+
+ /**
+ * For IStatus logging.
+ */
+ static final String BUNDLE_ID = "org.simantics.modeling";
+
+ /**
+ * @param g
+ * @param dynamicExperiment
+ * @param logger
+ * @param subscriptionRequestFunction
+ * @param loadCallback
+ * a callback runnable that is invoked every time the
+ * subscription changes
+ * @return
+ * @throws DatabaseException
+ * @throws IOException
+ */
+ public static ModelHistoryCollector createCollector(
+ IDynamicExperiment dynamicExperiment,
+ HistoryManager history,
+ ILog log,
+ Supplier<Read<SubscriptionCollectionResult>> subscriptionRequestFunction,
+ Runnable loadCallback)
+ throws DatabaseException, IOException
+ {
+ return createCollector(dynamicExperiment, history, log, subscriptionRequestFunction, loadCallback, null);
+ }
+
+ /**
+ * @param g
+ * @param dynamicExperiment
+ * @param logger
+ * @param subscriptionRequestFunction
+ * @param loadCallback
+ * a callback runnable that is invoked every time the
+ * subscription changes
+ * @param loadThread
+ * a possible thread to perform the subscription loads in or
+ * <code>null</code> to use any thread
+ * @return
+ * @throws DatabaseException
+ * @throws IOException
+ */
+ public static ModelHistoryCollector createCollector(
+ IDynamicExperiment dynamicExperiment,
+ HistoryManager history,
+ ILog log,
+ Supplier<Read<SubscriptionCollectionResult>> subscriptionRequestFunction,
+ Runnable loadCallback,
+ IThreadWorkQueue loadThread)
+ throws DatabaseException, IOException
+ {
+ ModelHistoryCollector data = new ModelHistoryCollector();
+ data.history = history;
+ data.model = dynamicExperiment.getModel();
+ data.dynamicExperiment = dynamicExperiment;
+ data.logger = log;
+ data.subscriptionFunction = subscriptionRequestFunction;
+ data.loadCallback = loadCallback;
+ data.loadThread = loadThread;
+ return data;
+ }
+
+ private Session session;
+ private HistoryManager history;
+ private Resource model;
+ private IDynamicExperiment dynamicExperiment;
+ ILog logger;
+ private Supplier<Read<SubscriptionCollectionResult>> subscriptionFunction;
+ Runnable loadCallback;
+ IThreadWorkQueue loadThread;
+
+ private VariableSetListener variableListener;
+
+ /**
+ * Released every time {@link #variableListener} either completes reloading
+ * subscriptions or fails with an exception. Used for synchronizing listener-based initialization
+ */
+ Semaphore initMutex = new Semaphore(0);
+
+ public static class ItemCollector {
+
+ /** The experiment to collect the data from. */
+ IDynamicExperiment experiment;
+
+ /** The source of the collected data, retrieved from the experiment. */
+ Datasource source;
+
+ /** History manager that handles stream files */
+ HistoryManager history;
+
+ /** Collectors for each subscription */
+ Collector collector;
+
+ DatasourceAdapter adapter;
+
+ Resource model;
+
+ public synchronized void init(HistoryManager history, IDynamicExperiment experiment, double startTime, Resource model) {
+ this.experiment = experiment;
+ this.source = experiment.getDatasource();
+ this.history = history;
+ //System.out.println("Debug: ModelHistoryCollector history="+System.identityHashCode(this.history));
+ this.collector = History.createCollector(this.history, FlushPolicy.NoFlush);
+ this.model = model;
+ }
+
+ /**
+ * Load collector with new settings
+ *
+ * @param workarea
+ * @param dynamicExperiment
+ * @param variables
+ * @throws HistoryException
+ */
+ public synchronized void load(List<SubscriptionItem> items)
+ throws HistoryException, DatabaseException {
+
+ if (isDisposed())
+ return;
+
+ ExperimentState oldState = experiment.getState();
+ if (oldState == ExperimentState.RUNNING)
+ experiment.changeState(ExperimentState.STOPPED);
+
+ try {
+ ItemManager im = new ItemManager(items);
+
+ // Create and/or Update collector
+ if (collector==null) collector = History.createCollector(history);
+
+ // Update old items
+ for (SubscriptionItem oldItem : collector.getItems() )
+ {
+ String id = (String) oldItem.getFieldUnchecked("id");
+ Bean newItem = im.get(id);
+ if ( newItem == null ) {
+ oldItem.enabled = false;
+ collector.setItem(oldItem);
+ //collector.removeItem(id);
+ }
+ }
+ // Add new items
+ for (Bean newItem : items) {
+ collector.setItem( newItem );
+ }
+
+ // Create files to history
+ history.modify(items.toArray( new Bean[items.size()] ));
+
+ if (adapter==null) {
+ adapter = new DatasourceAdapter( collector );
+ source.addListener( adapter );
+ } else {
+ adapter.reset();
+ }
+ } finally {
+ experiment.changeState(oldState);
+ }
+ }
+
+ public synchronized void dispose() {
+ if (source!=null && adapter!=null) {
+ source.removeListener(adapter);
+ }
+ if (adapter!=null) adapter.reset();
+ if (collector!=null) collector.close();
+ if (history!=null) history.close();
+ // Mark disposed
+ experiment = null;
+ }
+
+ public HistoryManager getCollector() {
+ return history;
+ }
+
+ public boolean isDisposed() {
+ return experiment == null;
+ }
+
+ }
+
+ final ItemCollector itemCollector = new ItemCollector();
+
+ private ModelHistoryCollector() {
+ }
+
+ /**
+ * @param graph
+ * @param startTime
+ * @throws DatabaseException
+ * @throws HistoryException
+ * @throws InterruptedException
+ */
+ public void initialize(RequestProcessor processor, double startTime) throws DatabaseException, HistoryException, InterruptedException {
+ initialize(processor, startTime, false);
+ }
+
+ /**
+ * Initialize is invoked during starting of an experiment (
+ * {@link ExperimentState#INITIALIZING}).
+ *
+ * @param graph
+ * @param startTime
+ * @param listenToVariableSet true to actively listen to changes in the set
+ * of history collected variables.
+ * @throws DatabaseException
+ * @throws HistoryException
+ * @throws InterruptedException
+ */
+ public void initialize(RequestProcessor processor, double startTime, boolean listenToVariableSet) throws DatabaseException, HistoryException, InterruptedException {
+ this.session = processor.getSession();
+ itemCollector.init(history, dynamicExperiment, startTime, model);
+ refresh0(processor, listenToVariableSet);
+ }
+
+ /**
+ * @throws HistoryException
+ * @throws DatabaseException
+ * @throws InterruptedException
+ */
+ public void refresh() throws HistoryException, DatabaseException, InterruptedException {
+ refresh0(session, false);
+ }
+
+ /**
+ * @return
+ */
+ public File getWorkarea() {
+ return history instanceof FileHistory ? ((FileHistory) history).getWorkarea() : null;
+ }
+
+ /**
+ * @return
+ */
+ public HistoryManager getHistoryManager() {
+ return itemCollector.history;
+ }
+
+ /**
+ * Queries subscriptions, sets listener, loads the subscriptions.
+ *
+ * @param listenToVariableSet
+ * @throws HistoryException
+ * @throws DatabaseException
+ * @throws InterruptedException
+ */
+ private void refresh0(RequestProcessor processor, boolean listenToVariableSet) throws HistoryException, DatabaseException, InterruptedException {
+ if (listenToVariableSet) {
+ if (variableListener == null) {
+ variableListener = new VariableSetListener(this);
+ initMutex = new Semaphore(0);
+ processor.asyncRequest( subscriptionFunction.get(), variableListener );
+ // Force synchronous initialization.
+ initMutex.acquire();
+ }
+ } else {
+ // Only refresh if there's no listener since the listener
+ // will take care of that.
+ if (variableListener == null) {
+ SubscriptionCollectionResult variables = processor.syncRequest( subscriptionFunction.get() );
+ if (!variables.getStatus().isOK() && logger != null)
+ logger.log(variables.getStatus());
+ itemCollector.load(variables.getSubscriptions());
+ if (loadCallback != null)
+ loadCallback.run();
+ }
+ }
+ }
+
+ /**
+ * Stops history collection and disposes of all related resources. Collected
+ * data is left on disk.
+ */
+ public void dispose() {
+ if (variableListener != null) {
+ variableListener.dispose();
+ variableListener = null;
+ }
+ itemCollector.dispose();
+ }
+
+ /**
+ * Primes the initial subscription by attempting to read everything that is
+ * subscribed once. Any caching and fetching performed by the data session
+ * implementation should at least be initiated by this invocation.
+ *
+ * @param graph
+ * @throws DatabaseException
+ */
+ public void primeInitialSubscription(ReadGraph graph) throws DatabaseException {
+ SubscriptionCollectionResult variables = graph.syncRequest( subscriptionFunction.get() );
+ Set<Pair<Bean, String>> variableIds = new HashSet<Pair<Bean, String>>();
+ for (Bean hi : variables.getSubscriptions()) {
+ String variableId = (String) hi.getFieldUnchecked("variableId");
+ variableIds.add( Pair.make(hi, variableId) );
+ }
+
+ Datasource source = dynamicExperiment.getDatasource();
+ for (Pair<Bean, String> ids : variableIds) {
+ Datatype type = source.getType( ids.second );
+ Binding valueBinding = Bindings.getBinding(type);
+ VariableHandle handle = source.openHandle(ids.first, ids.second, valueBinding);
+ try {
+ handle.getValue();
+ } catch (AccessorException e) {
+ logger.log(new Status(IStatus.ERROR, Activator.PLUGIN_ID, e.getLocalizedMessage(), e));
+ } finally {
+ handle.dispose();
+ }
+ }
+ }
+
+ private static Collection<SamplingFormat> resolveSamplingFormats(Map<Object, Collection<SamplingFormat>> cache, SubscriptionItem item, Datatype type, String unit) {
+ if ( type instanceof BooleanType ) {
+ Double cacheKey = item.interval;
+ Collection<SamplingFormat> formats = cache.get(cacheKey);
+ if (formats == null) {
+ formats = TrendSamplingFormats.createBinarySamplingFormats(item.interval);
+ cache.put(cacheKey, formats);
+ }
+ return formats;
+ } else if ( type instanceof NumberType ) {
+ Tuple3 cacheKey = new Tuple3(item.interval, item.deadband, unit);
+ Collection<SamplingFormat> formats = cache.get(cacheKey);
+ if (formats == null) {
+ formats = TrendSamplingFormats.createAnalogSamplingFormats(item.interval, item.deadband, unit);
+ cache.put(cacheKey, formats);
+ }
+ return formats;
+ }
+
+ SamplingFormat format = new SamplingFormat();
+ format.formatId = "custom";
+ RecordType f = new RecordType();
+ format.format = f;
+ f.addComponent("time", Datatypes.DOUBLE);
+ f.addComponent("endTime", Datatypes.DOUBLE);
+ f.addComponent("quality", Datatypes.BYTE);
+ f.addComponent("value", type);
+
+ format.interval = item.interval;
+ format.deadband = item.deadband;
+ return Collections.singleton(format);
+ }
+
+ private static ThreadLocal<Map<Object, Collection<SamplingFormat>>> samplingFormatCaches = new ThreadLocal<Map<Object, Collection<SamplingFormat>>>() {
+ @Override
+ protected Map<Object, Collection<SamplingFormat>> initialValue() {
+ return new WeakHashMap<Object, Collection<SamplingFormat>>();
+ }
+ };
+
+ public static List<SubscriptionItem> sampledSubscriptionItems(List<SubscriptionItem> rawItems) {
+ Map<Object, Collection<SamplingFormat>> cache = samplingFormatCaches.get();
+
+ List<SubscriptionItem> result = new ArrayList<SubscriptionItem>(rawItems.size() * 6);
+ for (SubscriptionItem item : rawItems) {
+
+ // Apros-specific assumption: the unsampled item's id field contains the subscription item's name
+ String groupItemId = item.id;
+ // Apros-specific assumption: the unsampled item's format field contains the subscription item's basic datatype (boolean, number, etc.)
+ Datatype type = item.format;
+ // Apros-specific assumption: the unsampled item's formatId subscription item's unit
+ String unit = /*item.formatId.isEmpty() ? null :*/ item.formatId;
+
+ Collection<SamplingFormat> formats = resolveSamplingFormats(cache, item, type, unit);
+ SubscriptionItem[] his = SubscriptionItem.createItems(item, groupItemId, item.groupId, formats);
+ for (SubscriptionItem hi : his) {
+ hi.groupItemId = groupItemId;
+ result.add(hi);
+ }
+ }
+ return result;
+ }
+
+ public Collector getCollector() {
+ return itemCollector.collector;
+ }
+
+}