]> gerrit.simantics Code Review - simantics/platform.git/blobdiff - bundles/org.simantics.modeling/src/org/simantics/modeling/subscription/ModelHistoryCollector.java
Migrated source code from Simantics SVN
[simantics/platform.git] / bundles / org.simantics.modeling / src / org / simantics / modeling / subscription / ModelHistoryCollector.java
diff --git a/bundles/org.simantics.modeling/src/org/simantics/modeling/subscription/ModelHistoryCollector.java b/bundles/org.simantics.modeling/src/org/simantics/modeling/subscription/ModelHistoryCollector.java
new file mode 100644 (file)
index 0000000..952a1e7
--- /dev/null
@@ -0,0 +1,455 @@
+/*******************************************************************************\r
+ * Copyright (c) 2007, 2011 Association for Decentralized Information Management\r
+ * in Industry THTH ry.\r
+ * All rights reserved. This program and the accompanying materials\r
+ * are made available under the terms of the Eclipse Public License v1.0\r
+ * which accompanies this distribution, and is available at\r
+ * http://www.eclipse.org/legal/epl-v10.html\r
+ *\r
+ * Contributors:\r
+ *     VTT Technical Research Centre of Finland - initial API and implementation\r
+ *******************************************************************************/\r
+package org.simantics.modeling.subscription;\r
+\r
+import java.io.File;\r
+import java.io.IOException;\r
+import java.util.ArrayList;\r
+import java.util.Collection;\r
+import java.util.Collections;\r
+import java.util.HashSet;\r
+import java.util.List;\r
+import java.util.Map;\r
+import java.util.Set;\r
+import java.util.WeakHashMap;\r
+import java.util.concurrent.Semaphore;\r
+import java.util.function.Supplier;\r
+import java.util.logging.Logger;\r
+\r
+import org.eclipse.core.runtime.ILog;\r
+import org.eclipse.core.runtime.IStatus;\r
+import org.eclipse.core.runtime.Status;\r
+import org.simantics.databoard.Bindings;\r
+import org.simantics.databoard.Datatypes;\r
+import org.simantics.databoard.accessor.error.AccessorException;\r
+import org.simantics.databoard.binding.Binding;\r
+import org.simantics.databoard.type.BooleanType;\r
+import org.simantics.databoard.type.Datatype;\r
+import org.simantics.databoard.type.NumberType;\r
+import org.simantics.databoard.type.RecordType;\r
+import org.simantics.databoard.util.Bean;\r
+import org.simantics.db.ReadGraph;\r
+import org.simantics.db.RequestProcessor;\r
+import org.simantics.db.Resource;\r
+import org.simantics.db.Session;\r
+import org.simantics.db.exception.DatabaseException;\r
+import org.simantics.db.request.Read;\r
+import org.simantics.history.Collector;\r
+import org.simantics.history.History;\r
+import org.simantics.history.HistoryException;\r
+import org.simantics.history.HistoryManager;\r
+import org.simantics.history.ItemManager;\r
+import org.simantics.history.impl.FileHistory;\r
+import org.simantics.history.impl.FlushPolicy;\r
+import org.simantics.history.util.subscription.SamplingFormat;\r
+import org.simantics.history.util.subscription.SubscriptionItem;\r
+import org.simantics.scl.runtime.tuple.Tuple3;\r
+import org.simantics.simulation.Activator;\r
+import org.simantics.simulation.data.Datasource;\r
+import org.simantics.simulation.data.DatasourceAdapter;\r
+import org.simantics.simulation.data.VariableHandle;\r
+import org.simantics.simulation.experiment.ExperimentState;\r
+import org.simantics.simulation.experiment.IDynamicExperiment;\r
+import org.simantics.trend.configuration.TrendSamplingFormats;\r
+import org.simantics.utils.datastructures.Pair;\r
+import org.simantics.utils.threads.IThreadWorkQueue;\r
+\r
+/**\r
+ * @author Tuukka Lehtonen\r
+ */\r
+public class ModelHistoryCollector {\r
+\r
+    Logger log = Logger.getLogger( this.getClass().getName() );\r
+\r
+    //private static final boolean DEBUG_LOAD = false;\r
+\r
+    /**\r
+     * For IStatus logging.\r
+     */\r
+    static final String BUNDLE_ID = "org.simantics.modeling";\r
+\r
+    /**\r
+     * @param g\r
+     * @param dynamicExperiment\r
+     * @param logger\r
+     * @param subscriptionRequestFunction\r
+     * @param loadCallback\r
+     *            a callback runnable that is invoked every time the\r
+     *            subscription changes\r
+     * @return\r
+     * @throws DatabaseException\r
+     * @throws IOException\r
+     */\r
+    public static ModelHistoryCollector createCollector(\r
+            IDynamicExperiment dynamicExperiment,\r
+            HistoryManager history,\r
+            ILog log,\r
+            Supplier<Read<SubscriptionCollectionResult>> subscriptionRequestFunction,\r
+            Runnable loadCallback)\r
+            throws DatabaseException, IOException\r
+    {\r
+        return createCollector(dynamicExperiment, history, log, subscriptionRequestFunction, loadCallback, null);\r
+    }\r
+\r
+    /**\r
+     * @param g\r
+     * @param dynamicExperiment\r
+     * @param logger\r
+     * @param subscriptionRequestFunction\r
+     * @param loadCallback\r
+     *            a callback runnable that is invoked every time the\r
+     *            subscription changes\r
+     * @param loadThread\r
+     *            a possible thread to perform the subscription loads in or\r
+     *            <code>null</code> to use any thread\r
+     * @return\r
+     * @throws DatabaseException\r
+     * @throws IOException\r
+     */\r
+    public static ModelHistoryCollector createCollector(\r
+            IDynamicExperiment dynamicExperiment,\r
+            HistoryManager history,\r
+            ILog log,\r
+            Supplier<Read<SubscriptionCollectionResult>> subscriptionRequestFunction,\r
+            Runnable loadCallback,\r
+            IThreadWorkQueue loadThread)\r
+            throws DatabaseException, IOException\r
+    {\r
+        ModelHistoryCollector data = new ModelHistoryCollector();\r
+        data.history = history;\r
+        data.model = dynamicExperiment.getModel();\r
+        data.dynamicExperiment = dynamicExperiment;\r
+        data.logger = log;\r
+        data.subscriptionFunction = subscriptionRequestFunction;\r
+        data.loadCallback = loadCallback;\r
+        data.loadThread = loadThread;\r
+        return data;\r
+    }\r
+\r
+    private Session                                      session;\r
+    private HistoryManager                               history;\r
+    private Resource                                     model;\r
+    private IDynamicExperiment                           dynamicExperiment;\r
+    ILog                                         logger;\r
+    private Supplier<Read<SubscriptionCollectionResult>> subscriptionFunction;\r
+    Runnable                                     loadCallback;\r
+    IThreadWorkQueue                             loadThread;\r
+\r
+    private VariableSetListener                          variableListener;\r
+\r
+    /**\r
+     * Released every time {@link #variableListener} either completes reloading\r
+     * subscriptions or fails with an exception. Used for synchronizing listener-based initialization \r
+     */\r
+    Semaphore                                    initMutex       = new Semaphore(0);\r
+\r
+    public static class ItemCollector {\r
+\r
+        /** The experiment to collect the data from. */\r
+        IDynamicExperiment experiment;\r
+\r
+        /** The source of the collected data, retrieved from the experiment. */\r
+        Datasource         source;\r
+\r
+        /** History manager that handles stream files */\r
+        HistoryManager     history;\r
+\r
+        /** Collectors for each subscription */\r
+        Collector          collector;\r
+\r
+        DatasourceAdapter  adapter;\r
+\r
+        Resource           model;\r
+\r
+        public synchronized void init(HistoryManager history, IDynamicExperiment experiment, double startTime, Resource model) {\r
+            this.experiment = experiment;\r
+            this.source = experiment.getDatasource();\r
+            this.history = history; \r
+            //System.out.println("Debug: ModelHistoryCollector history="+System.identityHashCode(this.history));\r
+            this.collector = History.createCollector(this.history, FlushPolicy.NoFlush);            \r
+            this.model = model;\r
+        }\r
+\r
+        /**\r
+         * Load collector with new settings\r
+         * \r
+         * @param workarea \r
+         * @param dynamicExperiment\r
+         * @param variables\r
+         * @throws HistoryException \r
+         */\r
+        public synchronized void load(List<SubscriptionItem> items)\r
+                throws HistoryException, DatabaseException {\r
+\r
+            if (isDisposed())\r
+                return;\r
+\r
+            ExperimentState oldState = experiment.getState();\r
+            if (oldState == ExperimentState.RUNNING)\r
+                experiment.changeState(ExperimentState.STOPPED);\r
+\r
+            try {\r
+               ItemManager im = new ItemManager(items);\r
+               \r
+               // Create and/or Update collector\r
+                if (collector==null) collector = History.createCollector(history);\r
+                \r
+                // Update old items\r
+               for (SubscriptionItem oldItem : collector.getItems() )\r
+               {\r
+                       String id = (String) oldItem.getFieldUnchecked("id");\r
+                       Bean newItem = im.get(id);\r
+                       if ( newItem == null ) {\r
+                               oldItem.enabled = false;\r
+                               collector.setItem(oldItem);\r
+                               //collector.removeItem(id);\r
+                       }\r
+               }    \r
+               // Add new items\r
+               for (Bean newItem : items) {\r
+                       collector.setItem( newItem );\r
+               }\r
+               \r
+               // Create files to history\r
+               history.modify(items.toArray( new Bean[items.size()] ));\r
+               \r
+                if (adapter==null) {\r
+                       adapter = new DatasourceAdapter( collector );\r
+                    source.addListener( adapter );\r
+                } else {\r
+                       adapter.reset();\r
+                }\r
+            } finally {\r
+                experiment.changeState(oldState);\r
+            }\r
+        }\r
+\r
+        public synchronized void dispose() {\r
+               if (source!=null && adapter!=null) {\r
+                       source.removeListener(adapter);\r
+               }\r
+               if (adapter!=null) adapter.reset();\r
+               if (collector!=null) collector.close();\r
+               if (history!=null) history.close();\r
+            // Mark disposed\r
+            experiment = null;\r
+        }\r
+\r
+        public HistoryManager getCollector() {\r
+            return history;\r
+        }\r
+\r
+        public boolean isDisposed() {\r
+            return experiment == null;\r
+        }\r
+\r
+    }\r
+\r
+    final ItemCollector itemCollector = new ItemCollector();\r
+\r
+    private ModelHistoryCollector() {\r
+    }\r
+\r
+    /**\r
+     * @param graph\r
+     * @param startTime\r
+     * @throws DatabaseException\r
+     * @throws HistoryException\r
+     * @throws InterruptedException\r
+     */\r
+    public void initialize(RequestProcessor processor, double startTime) throws DatabaseException, HistoryException, InterruptedException {\r
+        initialize(processor, startTime, false);\r
+    }\r
+\r
+    /**\r
+     * Initialize is invoked during starting of an experiment (\r
+     * {@link ExperimentState#INITIALIZING}).\r
+     * \r
+     * @param graph\r
+     * @param startTime\r
+     * @param listenToVariableSet true to actively listen to changes in the set\r
+     *        of history collected variables.\r
+     * @throws DatabaseException\r
+     * @throws HistoryException\r
+     * @throws InterruptedException\r
+     */\r
+    public void initialize(RequestProcessor processor, double startTime, boolean listenToVariableSet) throws DatabaseException, HistoryException, InterruptedException {\r
+        this.session = processor.getSession();\r
+        itemCollector.init(history, dynamicExperiment, startTime, model);\r
+        refresh0(processor, listenToVariableSet);\r
+    }\r
+\r
+    /**\r
+     * @throws HistoryException\r
+     * @throws DatabaseException\r
+     * @throws InterruptedException \r
+     */\r
+    public void refresh() throws HistoryException, DatabaseException, InterruptedException {\r
+        refresh0(session, false);\r
+    }\r
+\r
+    /**\r
+     * @return\r
+     */\r
+    public File getWorkarea() {\r
+        return history instanceof FileHistory ? ((FileHistory) history).getWorkarea() : null;\r
+    }\r
+\r
+    /**\r
+     * @return\r
+     */\r
+    public HistoryManager getHistoryManager() {\r
+        return itemCollector.history;\r
+    }\r
+\r
+    /**\r
+     * Queries subscriptions, sets listener, loads the subscriptions. \r
+     * \r
+     * @param listenToVariableSet \r
+     * @throws HistoryException\r
+     * @throws DatabaseException\r
+     * @throws InterruptedException \r
+     */\r
+    private void refresh0(RequestProcessor processor, boolean listenToVariableSet) throws HistoryException, DatabaseException, InterruptedException {\r
+        if (listenToVariableSet) {\r
+            if (variableListener == null) {\r
+                variableListener = new VariableSetListener(this);\r
+                initMutex = new Semaphore(0);\r
+                processor.asyncRequest( subscriptionFunction.get(), variableListener );\r
+                // Force synchronous initialization.\r
+                initMutex.acquire();\r
+            }\r
+        } else {\r
+            // Only refresh if there's no listener since the listener\r
+            // will take care of that.\r
+            if (variableListener == null) {\r
+                SubscriptionCollectionResult variables = processor.syncRequest( subscriptionFunction.get() );\r
+                if (!variables.getStatus().isOK() && logger != null)\r
+                    logger.log(variables.getStatus());\r
+                itemCollector.load(variables.getSubscriptions());\r
+                if (loadCallback != null)\r
+                    loadCallback.run();\r
+            }\r
+        }\r
+    }\r
+\r
+    /**\r
+     * Stops history collection and disposes of all related resources. Collected\r
+     * data is left on disk.\r
+     */\r
+    public void dispose() {\r
+        if (variableListener != null) {\r
+            variableListener.dispose();\r
+            variableListener = null;\r
+        }\r
+        itemCollector.dispose();\r
+    }\r
+\r
+    /**\r
+     * Primes the initial subscription by attempting to read everything that is\r
+     * subscribed once. Any caching and fetching performed by the data session\r
+     * implementation should at least be initiated by this invocation.\r
+     * \r
+     * @param graph\r
+     * @throws DatabaseException\r
+     */\r
+    public void primeInitialSubscription(ReadGraph graph) throws DatabaseException {\r
+        SubscriptionCollectionResult variables = graph.syncRequest( subscriptionFunction.get() );\r
+        Set<Pair<Bean, String>> variableIds = new HashSet<Pair<Bean, String>>();\r
+        for (Bean hi : variables.getSubscriptions()) {\r
+               String variableId = (String) hi.getFieldUnchecked("variableId");\r
+               variableIds.add( Pair.make(hi, variableId) );\r
+        }\r
+        \r
+        Datasource source = dynamicExperiment.getDatasource();\r
+        for (Pair<Bean, String> ids : variableIds) {\r
+            Datatype type = source.getType( ids.second );\r
+            Binding valueBinding = Bindings.getBinding(type);\r
+            VariableHandle handle = source.openHandle(ids.first, ids.second, valueBinding);\r
+            try {\r
+                handle.getValue();\r
+            } catch (AccessorException e) {\r
+               logger.log(new Status(IStatus.ERROR, Activator.PLUGIN_ID, e.getLocalizedMessage(), e));\r
+            } finally {\r
+                handle.dispose();\r
+            }\r
+        }\r
+    }\r
+\r
+    private static Collection<SamplingFormat> resolveSamplingFormats(Map<Object, Collection<SamplingFormat>> cache, SubscriptionItem item, Datatype type, String unit) {\r
+        if ( type instanceof BooleanType ) {\r
+            Double cacheKey = item.interval;\r
+            Collection<SamplingFormat> formats = cache.get(cacheKey);\r
+            if (formats == null) {\r
+                formats = TrendSamplingFormats.createBinarySamplingFormats(item.interval);\r
+                cache.put(cacheKey, formats);\r
+            }\r
+            return formats;\r
+        } else if ( type instanceof NumberType ) {\r
+            Tuple3 cacheKey = new Tuple3(item.interval, item.deadband, unit);\r
+            Collection<SamplingFormat> formats = cache.get(cacheKey);\r
+            if (formats == null) {\r
+                formats = TrendSamplingFormats.createAnalogSamplingFormats(item.interval, item.deadband, unit);\r
+                cache.put(cacheKey, formats);\r
+            }\r
+            return formats;\r
+        }\r
+\r
+        SamplingFormat format = new SamplingFormat();\r
+        format.formatId = "custom";\r
+        RecordType f = new RecordType();\r
+        format.format = f;\r
+        f.addComponent("time", Datatypes.DOUBLE);\r
+        f.addComponent("endTime", Datatypes.DOUBLE);\r
+        f.addComponent("quality", Datatypes.BYTE);\r
+        f.addComponent("value", type);\r
+\r
+        format.interval = item.interval;\r
+        format.deadband = item.deadband;\r
+        return Collections.singleton(format);\r
+    }\r
+    \r
+    private static ThreadLocal<Map<Object, Collection<SamplingFormat>>> samplingFormatCaches = new ThreadLocal<Map<Object, Collection<SamplingFormat>>>() {\r
+        @Override\r
+        protected Map<Object, Collection<SamplingFormat>> initialValue() {\r
+            return new WeakHashMap<Object, Collection<SamplingFormat>>();\r
+        }\r
+    };\r
+\r
+    public static List<SubscriptionItem> sampledSubscriptionItems(List<SubscriptionItem> rawItems) {\r
+        Map<Object, Collection<SamplingFormat>> cache = samplingFormatCaches.get();\r
+\r
+        List<SubscriptionItem> result = new ArrayList<SubscriptionItem>(rawItems.size() * 6);\r
+        for (SubscriptionItem item : rawItems) {\r
+               \r
+            // Apros-specific assumption: the unsampled item's id field contains the subscription item's name\r
+            String groupItemId = item.id;\r
+            // Apros-specific assumption: the unsampled item's format field contains the subscription item's basic datatype (boolean, number, etc.)\r
+            Datatype type = item.format;\r
+            // Apros-specific assumption: the unsampled item's formatId subscription item's unit\r
+            String unit = /*item.formatId.isEmpty() ? null :*/ item.formatId;\r
+\r
+            Collection<SamplingFormat> formats = resolveSamplingFormats(cache, item, type, unit);\r
+            SubscriptionItem[] his = SubscriptionItem.createItems(item, groupItemId, item.groupId, formats);\r
+            for (SubscriptionItem hi : his) {\r
+                hi.groupItemId = groupItemId;\r
+                result.add(hi);\r
+            }\r
+        }\r
+        return result;\r
+    }\r
+    \r
+    public Collector getCollector() {\r
+               return itemCollector.collector;\r
+       }\r
+\r
+}\r