--- /dev/null
+/*******************************************************************************\r
+ * Copyright (c) 2007, 2011 Association for Decentralized Information Management\r
+ * in Industry THTH ry.\r
+ * All rights reserved. This program and the accompanying materials\r
+ * are made available under the terms of the Eclipse Public License v1.0\r
+ * which accompanies this distribution, and is available at\r
+ * http://www.eclipse.org/legal/epl-v10.html\r
+ *\r
+ * Contributors:\r
+ * VTT Technical Research Centre of Finland - initial API and implementation\r
+ *******************************************************************************/\r
+package org.simantics.modeling.subscription;\r
+\r
+import java.io.File;\r
+import java.io.IOException;\r
+import java.util.ArrayList;\r
+import java.util.Collection;\r
+import java.util.Collections;\r
+import java.util.HashSet;\r
+import java.util.List;\r
+import java.util.Map;\r
+import java.util.Set;\r
+import java.util.WeakHashMap;\r
+import java.util.concurrent.Semaphore;\r
+import java.util.function.Supplier;\r
+import java.util.logging.Logger;\r
+\r
+import org.eclipse.core.runtime.ILog;\r
+import org.eclipse.core.runtime.IStatus;\r
+import org.eclipse.core.runtime.Status;\r
+import org.simantics.databoard.Bindings;\r
+import org.simantics.databoard.Datatypes;\r
+import org.simantics.databoard.accessor.error.AccessorException;\r
+import org.simantics.databoard.binding.Binding;\r
+import org.simantics.databoard.type.BooleanType;\r
+import org.simantics.databoard.type.Datatype;\r
+import org.simantics.databoard.type.NumberType;\r
+import org.simantics.databoard.type.RecordType;\r
+import org.simantics.databoard.util.Bean;\r
+import org.simantics.db.ReadGraph;\r
+import org.simantics.db.RequestProcessor;\r
+import org.simantics.db.Resource;\r
+import org.simantics.db.Session;\r
+import org.simantics.db.exception.DatabaseException;\r
+import org.simantics.db.request.Read;\r
+import org.simantics.history.Collector;\r
+import org.simantics.history.History;\r
+import org.simantics.history.HistoryException;\r
+import org.simantics.history.HistoryManager;\r
+import org.simantics.history.ItemManager;\r
+import org.simantics.history.impl.FileHistory;\r
+import org.simantics.history.impl.FlushPolicy;\r
+import org.simantics.history.util.subscription.SamplingFormat;\r
+import org.simantics.history.util.subscription.SubscriptionItem;\r
+import org.simantics.scl.runtime.tuple.Tuple3;\r
+import org.simantics.simulation.Activator;\r
+import org.simantics.simulation.data.Datasource;\r
+import org.simantics.simulation.data.DatasourceAdapter;\r
+import org.simantics.simulation.data.VariableHandle;\r
+import org.simantics.simulation.experiment.ExperimentState;\r
+import org.simantics.simulation.experiment.IDynamicExperiment;\r
+import org.simantics.trend.configuration.TrendSamplingFormats;\r
+import org.simantics.utils.datastructures.Pair;\r
+import org.simantics.utils.threads.IThreadWorkQueue;\r
+\r
+/**\r
+ * @author Tuukka Lehtonen\r
+ */\r
+public class ModelHistoryCollector {\r
+\r
+ Logger log = Logger.getLogger( this.getClass().getName() );\r
+\r
+ //private static final boolean DEBUG_LOAD = false;\r
+\r
+ /**\r
+ * For IStatus logging.\r
+ */\r
+ static final String BUNDLE_ID = "org.simantics.modeling";\r
+\r
+ /**\r
+ * @param g\r
+ * @param dynamicExperiment\r
+ * @param logger\r
+ * @param subscriptionRequestFunction\r
+ * @param loadCallback\r
+ * a callback runnable that is invoked every time the\r
+ * subscription changes\r
+ * @return\r
+ * @throws DatabaseException\r
+ * @throws IOException\r
+ */\r
+ public static ModelHistoryCollector createCollector(\r
+ IDynamicExperiment dynamicExperiment,\r
+ HistoryManager history,\r
+ ILog log,\r
+ Supplier<Read<SubscriptionCollectionResult>> subscriptionRequestFunction,\r
+ Runnable loadCallback)\r
+ throws DatabaseException, IOException\r
+ {\r
+ return createCollector(dynamicExperiment, history, log, subscriptionRequestFunction, loadCallback, null);\r
+ }\r
+\r
+ /**\r
+ * @param g\r
+ * @param dynamicExperiment\r
+ * @param logger\r
+ * @param subscriptionRequestFunction\r
+ * @param loadCallback\r
+ * a callback runnable that is invoked every time the\r
+ * subscription changes\r
+ * @param loadThread\r
+ * a possible thread to perform the subscription loads in or\r
+ * <code>null</code> to use any thread\r
+ * @return\r
+ * @throws DatabaseException\r
+ * @throws IOException\r
+ */\r
+ public static ModelHistoryCollector createCollector(\r
+ IDynamicExperiment dynamicExperiment,\r
+ HistoryManager history,\r
+ ILog log,\r
+ Supplier<Read<SubscriptionCollectionResult>> subscriptionRequestFunction,\r
+ Runnable loadCallback,\r
+ IThreadWorkQueue loadThread)\r
+ throws DatabaseException, IOException\r
+ {\r
+ ModelHistoryCollector data = new ModelHistoryCollector();\r
+ data.history = history;\r
+ data.model = dynamicExperiment.getModel();\r
+ data.dynamicExperiment = dynamicExperiment;\r
+ data.logger = log;\r
+ data.subscriptionFunction = subscriptionRequestFunction;\r
+ data.loadCallback = loadCallback;\r
+ data.loadThread = loadThread;\r
+ return data;\r
+ }\r
+\r
+ private Session session;\r
+ private HistoryManager history;\r
+ private Resource model;\r
+ private IDynamicExperiment dynamicExperiment;\r
+ ILog logger;\r
+ private Supplier<Read<SubscriptionCollectionResult>> subscriptionFunction;\r
+ Runnable loadCallback;\r
+ IThreadWorkQueue loadThread;\r
+\r
+ private VariableSetListener variableListener;\r
+\r
+ /**\r
+ * Released every time {@link #variableListener} either completes reloading\r
+ * subscriptions or fails with an exception. Used for synchronizing listener-based initialization \r
+ */\r
+ Semaphore initMutex = new Semaphore(0);\r
+\r
+ public static class ItemCollector {\r
+\r
+ /** The experiment to collect the data from. */\r
+ IDynamicExperiment experiment;\r
+\r
+ /** The source of the collected data, retrieved from the experiment. */\r
+ Datasource source;\r
+\r
+ /** History manager that handles stream files */\r
+ HistoryManager history;\r
+\r
+ /** Collectors for each subscription */\r
+ Collector collector;\r
+\r
+ DatasourceAdapter adapter;\r
+\r
+ Resource model;\r
+\r
+ public synchronized void init(HistoryManager history, IDynamicExperiment experiment, double startTime, Resource model) {\r
+ this.experiment = experiment;\r
+ this.source = experiment.getDatasource();\r
+ this.history = history; \r
+ //System.out.println("Debug: ModelHistoryCollector history="+System.identityHashCode(this.history));\r
+ this.collector = History.createCollector(this.history, FlushPolicy.NoFlush); \r
+ this.model = model;\r
+ }\r
+\r
+ /**\r
+ * Load collector with new settings\r
+ * \r
+ * @param workarea \r
+ * @param dynamicExperiment\r
+ * @param variables\r
+ * @throws HistoryException \r
+ */\r
+ public synchronized void load(List<SubscriptionItem> items)\r
+ throws HistoryException, DatabaseException {\r
+\r
+ if (isDisposed())\r
+ return;\r
+\r
+ ExperimentState oldState = experiment.getState();\r
+ if (oldState == ExperimentState.RUNNING)\r
+ experiment.changeState(ExperimentState.STOPPED);\r
+\r
+ try {\r
+ ItemManager im = new ItemManager(items);\r
+ \r
+ // Create and/or Update collector\r
+ if (collector==null) collector = History.createCollector(history);\r
+ \r
+ // Update old items\r
+ for (SubscriptionItem oldItem : collector.getItems() )\r
+ {\r
+ String id = (String) oldItem.getFieldUnchecked("id");\r
+ Bean newItem = im.get(id);\r
+ if ( newItem == null ) {\r
+ oldItem.enabled = false;\r
+ collector.setItem(oldItem);\r
+ //collector.removeItem(id);\r
+ }\r
+ } \r
+ // Add new items\r
+ for (Bean newItem : items) {\r
+ collector.setItem( newItem );\r
+ }\r
+ \r
+ // Create files to history\r
+ history.modify(items.toArray( new Bean[items.size()] ));\r
+ \r
+ if (adapter==null) {\r
+ adapter = new DatasourceAdapter( collector );\r
+ source.addListener( adapter );\r
+ } else {\r
+ adapter.reset();\r
+ }\r
+ } finally {\r
+ experiment.changeState(oldState);\r
+ }\r
+ }\r
+\r
+ public synchronized void dispose() {\r
+ if (source!=null && adapter!=null) {\r
+ source.removeListener(adapter);\r
+ }\r
+ if (adapter!=null) adapter.reset();\r
+ if (collector!=null) collector.close();\r
+ if (history!=null) history.close();\r
+ // Mark disposed\r
+ experiment = null;\r
+ }\r
+\r
+ public HistoryManager getCollector() {\r
+ return history;\r
+ }\r
+\r
+ public boolean isDisposed() {\r
+ return experiment == null;\r
+ }\r
+\r
+ }\r
+\r
+ final ItemCollector itemCollector = new ItemCollector();\r
+\r
+ private ModelHistoryCollector() {\r
+ }\r
+\r
+ /**\r
+ * @param graph\r
+ * @param startTime\r
+ * @throws DatabaseException\r
+ * @throws HistoryException\r
+ * @throws InterruptedException\r
+ */\r
+ public void initialize(RequestProcessor processor, double startTime) throws DatabaseException, HistoryException, InterruptedException {\r
+ initialize(processor, startTime, false);\r
+ }\r
+\r
+ /**\r
+ * Initialize is invoked during starting of an experiment (\r
+ * {@link ExperimentState#INITIALIZING}).\r
+ * \r
+ * @param graph\r
+ * @param startTime\r
+ * @param listenToVariableSet true to actively listen to changes in the set\r
+ * of history collected variables.\r
+ * @throws DatabaseException\r
+ * @throws HistoryException\r
+ * @throws InterruptedException\r
+ */\r
+ public void initialize(RequestProcessor processor, double startTime, boolean listenToVariableSet) throws DatabaseException, HistoryException, InterruptedException {\r
+ this.session = processor.getSession();\r
+ itemCollector.init(history, dynamicExperiment, startTime, model);\r
+ refresh0(processor, listenToVariableSet);\r
+ }\r
+\r
+ /**\r
+ * @throws HistoryException\r
+ * @throws DatabaseException\r
+ * @throws InterruptedException \r
+ */\r
+ public void refresh() throws HistoryException, DatabaseException, InterruptedException {\r
+ refresh0(session, false);\r
+ }\r
+\r
+ /**\r
+ * @return\r
+ */\r
+ public File getWorkarea() {\r
+ return history instanceof FileHistory ? ((FileHistory) history).getWorkarea() : null;\r
+ }\r
+\r
+ /**\r
+ * @return\r
+ */\r
+ public HistoryManager getHistoryManager() {\r
+ return itemCollector.history;\r
+ }\r
+\r
+ /**\r
+ * Queries subscriptions, sets listener, loads the subscriptions. \r
+ * \r
+ * @param listenToVariableSet \r
+ * @throws HistoryException\r
+ * @throws DatabaseException\r
+ * @throws InterruptedException \r
+ */\r
+ private void refresh0(RequestProcessor processor, boolean listenToVariableSet) throws HistoryException, DatabaseException, InterruptedException {\r
+ if (listenToVariableSet) {\r
+ if (variableListener == null) {\r
+ variableListener = new VariableSetListener(this);\r
+ initMutex = new Semaphore(0);\r
+ processor.asyncRequest( subscriptionFunction.get(), variableListener );\r
+ // Force synchronous initialization.\r
+ initMutex.acquire();\r
+ }\r
+ } else {\r
+ // Only refresh if there's no listener since the listener\r
+ // will take care of that.\r
+ if (variableListener == null) {\r
+ SubscriptionCollectionResult variables = processor.syncRequest( subscriptionFunction.get() );\r
+ if (!variables.getStatus().isOK() && logger != null)\r
+ logger.log(variables.getStatus());\r
+ itemCollector.load(variables.getSubscriptions());\r
+ if (loadCallback != null)\r
+ loadCallback.run();\r
+ }\r
+ }\r
+ }\r
+\r
+ /**\r
+ * Stops history collection and disposes of all related resources. Collected\r
+ * data is left on disk.\r
+ */\r
+ public void dispose() {\r
+ if (variableListener != null) {\r
+ variableListener.dispose();\r
+ variableListener = null;\r
+ }\r
+ itemCollector.dispose();\r
+ }\r
+\r
+ /**\r
+ * Primes the initial subscription by attempting to read everything that is\r
+ * subscribed once. Any caching and fetching performed by the data session\r
+ * implementation should at least be initiated by this invocation.\r
+ * \r
+ * @param graph\r
+ * @throws DatabaseException\r
+ */\r
+ public void primeInitialSubscription(ReadGraph graph) throws DatabaseException {\r
+ SubscriptionCollectionResult variables = graph.syncRequest( subscriptionFunction.get() );\r
+ Set<Pair<Bean, String>> variableIds = new HashSet<Pair<Bean, String>>();\r
+ for (Bean hi : variables.getSubscriptions()) {\r
+ String variableId = (String) hi.getFieldUnchecked("variableId");\r
+ variableIds.add( Pair.make(hi, variableId) );\r
+ }\r
+ \r
+ Datasource source = dynamicExperiment.getDatasource();\r
+ for (Pair<Bean, String> ids : variableIds) {\r
+ Datatype type = source.getType( ids.second );\r
+ Binding valueBinding = Bindings.getBinding(type);\r
+ VariableHandle handle = source.openHandle(ids.first, ids.second, valueBinding);\r
+ try {\r
+ handle.getValue();\r
+ } catch (AccessorException e) {\r
+ logger.log(new Status(IStatus.ERROR, Activator.PLUGIN_ID, e.getLocalizedMessage(), e));\r
+ } finally {\r
+ handle.dispose();\r
+ }\r
+ }\r
+ }\r
+\r
+ private static Collection<SamplingFormat> resolveSamplingFormats(Map<Object, Collection<SamplingFormat>> cache, SubscriptionItem item, Datatype type, String unit) {\r
+ if ( type instanceof BooleanType ) {\r
+ Double cacheKey = item.interval;\r
+ Collection<SamplingFormat> formats = cache.get(cacheKey);\r
+ if (formats == null) {\r
+ formats = TrendSamplingFormats.createBinarySamplingFormats(item.interval);\r
+ cache.put(cacheKey, formats);\r
+ }\r
+ return formats;\r
+ } else if ( type instanceof NumberType ) {\r
+ Tuple3 cacheKey = new Tuple3(item.interval, item.deadband, unit);\r
+ Collection<SamplingFormat> formats = cache.get(cacheKey);\r
+ if (formats == null) {\r
+ formats = TrendSamplingFormats.createAnalogSamplingFormats(item.interval, item.deadband, unit);\r
+ cache.put(cacheKey, formats);\r
+ }\r
+ return formats;\r
+ }\r
+\r
+ SamplingFormat format = new SamplingFormat();\r
+ format.formatId = "custom";\r
+ RecordType f = new RecordType();\r
+ format.format = f;\r
+ f.addComponent("time", Datatypes.DOUBLE);\r
+ f.addComponent("endTime", Datatypes.DOUBLE);\r
+ f.addComponent("quality", Datatypes.BYTE);\r
+ f.addComponent("value", type);\r
+\r
+ format.interval = item.interval;\r
+ format.deadband = item.deadband;\r
+ return Collections.singleton(format);\r
+ }\r
+ \r
+ private static ThreadLocal<Map<Object, Collection<SamplingFormat>>> samplingFormatCaches = new ThreadLocal<Map<Object, Collection<SamplingFormat>>>() {\r
+ @Override\r
+ protected Map<Object, Collection<SamplingFormat>> initialValue() {\r
+ return new WeakHashMap<Object, Collection<SamplingFormat>>();\r
+ }\r
+ };\r
+\r
+ public static List<SubscriptionItem> sampledSubscriptionItems(List<SubscriptionItem> rawItems) {\r
+ Map<Object, Collection<SamplingFormat>> cache = samplingFormatCaches.get();\r
+\r
+ List<SubscriptionItem> result = new ArrayList<SubscriptionItem>(rawItems.size() * 6);\r
+ for (SubscriptionItem item : rawItems) {\r
+ \r
+ // Apros-specific assumption: the unsampled item's id field contains the subscription item's name\r
+ String groupItemId = item.id;\r
+ // Apros-specific assumption: the unsampled item's format field contains the subscription item's basic datatype (boolean, number, etc.)\r
+ Datatype type = item.format;\r
+ // Apros-specific assumption: the unsampled item's formatId subscription item's unit\r
+ String unit = /*item.formatId.isEmpty() ? null :*/ item.formatId;\r
+\r
+ Collection<SamplingFormat> formats = resolveSamplingFormats(cache, item, type, unit);\r
+ SubscriptionItem[] his = SubscriptionItem.createItems(item, groupItemId, item.groupId, formats);\r
+ for (SubscriptionItem hi : his) {\r
+ hi.groupItemId = groupItemId;\r
+ result.add(hi);\r
+ }\r
+ }\r
+ return result;\r
+ }\r
+ \r
+ public Collector getCollector() {\r
+ return itemCollector.collector;\r
+ }\r
+\r
+}\r