]> gerrit.simantics Code Review - simantics/platform.git/blobdiff - bundles/org.simantics.modeling/src/org/simantics/modeling/subscription/ModelHistoryCollector.java
Fixed all line endings of the repository
[simantics/platform.git] / bundles / org.simantics.modeling / src / org / simantics / modeling / subscription / ModelHistoryCollector.java
index 952a1e747b921b6ef384a7a380c03213730aacc3..27c840467a96a9d56bbf17968b2d136cc848de7c 100644 (file)
-/*******************************************************************************\r
- * Copyright (c) 2007, 2011 Association for Decentralized Information Management\r
- * in Industry THTH ry.\r
- * All rights reserved. This program and the accompanying materials\r
- * are made available under the terms of the Eclipse Public License v1.0\r
- * which accompanies this distribution, and is available at\r
- * http://www.eclipse.org/legal/epl-v10.html\r
- *\r
- * Contributors:\r
- *     VTT Technical Research Centre of Finland - initial API and implementation\r
- *******************************************************************************/\r
-package org.simantics.modeling.subscription;\r
-\r
-import java.io.File;\r
-import java.io.IOException;\r
-import java.util.ArrayList;\r
-import java.util.Collection;\r
-import java.util.Collections;\r
-import java.util.HashSet;\r
-import java.util.List;\r
-import java.util.Map;\r
-import java.util.Set;\r
-import java.util.WeakHashMap;\r
-import java.util.concurrent.Semaphore;\r
-import java.util.function.Supplier;\r
-import java.util.logging.Logger;\r
-\r
-import org.eclipse.core.runtime.ILog;\r
-import org.eclipse.core.runtime.IStatus;\r
-import org.eclipse.core.runtime.Status;\r
-import org.simantics.databoard.Bindings;\r
-import org.simantics.databoard.Datatypes;\r
-import org.simantics.databoard.accessor.error.AccessorException;\r
-import org.simantics.databoard.binding.Binding;\r
-import org.simantics.databoard.type.BooleanType;\r
-import org.simantics.databoard.type.Datatype;\r
-import org.simantics.databoard.type.NumberType;\r
-import org.simantics.databoard.type.RecordType;\r
-import org.simantics.databoard.util.Bean;\r
-import org.simantics.db.ReadGraph;\r
-import org.simantics.db.RequestProcessor;\r
-import org.simantics.db.Resource;\r
-import org.simantics.db.Session;\r
-import org.simantics.db.exception.DatabaseException;\r
-import org.simantics.db.request.Read;\r
-import org.simantics.history.Collector;\r
-import org.simantics.history.History;\r
-import org.simantics.history.HistoryException;\r
-import org.simantics.history.HistoryManager;\r
-import org.simantics.history.ItemManager;\r
-import org.simantics.history.impl.FileHistory;\r
-import org.simantics.history.impl.FlushPolicy;\r
-import org.simantics.history.util.subscription.SamplingFormat;\r
-import org.simantics.history.util.subscription.SubscriptionItem;\r
-import org.simantics.scl.runtime.tuple.Tuple3;\r
-import org.simantics.simulation.Activator;\r
-import org.simantics.simulation.data.Datasource;\r
-import org.simantics.simulation.data.DatasourceAdapter;\r
-import org.simantics.simulation.data.VariableHandle;\r
-import org.simantics.simulation.experiment.ExperimentState;\r
-import org.simantics.simulation.experiment.IDynamicExperiment;\r
-import org.simantics.trend.configuration.TrendSamplingFormats;\r
-import org.simantics.utils.datastructures.Pair;\r
-import org.simantics.utils.threads.IThreadWorkQueue;\r
-\r
-/**\r
- * @author Tuukka Lehtonen\r
- */\r
-public class ModelHistoryCollector {\r
-\r
-    Logger log = Logger.getLogger( this.getClass().getName() );\r
-\r
-    //private static final boolean DEBUG_LOAD = false;\r
-\r
-    /**\r
-     * For IStatus logging.\r
-     */\r
-    static final String BUNDLE_ID = "org.simantics.modeling";\r
-\r
-    /**\r
-     * @param g\r
-     * @param dynamicExperiment\r
-     * @param logger\r
-     * @param subscriptionRequestFunction\r
-     * @param loadCallback\r
-     *            a callback runnable that is invoked every time the\r
-     *            subscription changes\r
-     * @return\r
-     * @throws DatabaseException\r
-     * @throws IOException\r
-     */\r
-    public static ModelHistoryCollector createCollector(\r
-            IDynamicExperiment dynamicExperiment,\r
-            HistoryManager history,\r
-            ILog log,\r
-            Supplier<Read<SubscriptionCollectionResult>> subscriptionRequestFunction,\r
-            Runnable loadCallback)\r
-            throws DatabaseException, IOException\r
-    {\r
-        return createCollector(dynamicExperiment, history, log, subscriptionRequestFunction, loadCallback, null);\r
-    }\r
-\r
-    /**\r
-     * @param g\r
-     * @param dynamicExperiment\r
-     * @param logger\r
-     * @param subscriptionRequestFunction\r
-     * @param loadCallback\r
-     *            a callback runnable that is invoked every time the\r
-     *            subscription changes\r
-     * @param loadThread\r
-     *            a possible thread to perform the subscription loads in or\r
-     *            <code>null</code> to use any thread\r
-     * @return\r
-     * @throws DatabaseException\r
-     * @throws IOException\r
-     */\r
-    public static ModelHistoryCollector createCollector(\r
-            IDynamicExperiment dynamicExperiment,\r
-            HistoryManager history,\r
-            ILog log,\r
-            Supplier<Read<SubscriptionCollectionResult>> subscriptionRequestFunction,\r
-            Runnable loadCallback,\r
-            IThreadWorkQueue loadThread)\r
-            throws DatabaseException, IOException\r
-    {\r
-        ModelHistoryCollector data = new ModelHistoryCollector();\r
-        data.history = history;\r
-        data.model = dynamicExperiment.getModel();\r
-        data.dynamicExperiment = dynamicExperiment;\r
-        data.logger = log;\r
-        data.subscriptionFunction = subscriptionRequestFunction;\r
-        data.loadCallback = loadCallback;\r
-        data.loadThread = loadThread;\r
-        return data;\r
-    }\r
-\r
-    private Session                                      session;\r
-    private HistoryManager                               history;\r
-    private Resource                                     model;\r
-    private IDynamicExperiment                           dynamicExperiment;\r
-    ILog                                         logger;\r
-    private Supplier<Read<SubscriptionCollectionResult>> subscriptionFunction;\r
-    Runnable                                     loadCallback;\r
-    IThreadWorkQueue                             loadThread;\r
-\r
-    private VariableSetListener                          variableListener;\r
-\r
-    /**\r
-     * Released every time {@link #variableListener} either completes reloading\r
-     * subscriptions or fails with an exception. Used for synchronizing listener-based initialization \r
-     */\r
-    Semaphore                                    initMutex       = new Semaphore(0);\r
-\r
-    public static class ItemCollector {\r
-\r
-        /** The experiment to collect the data from. */\r
-        IDynamicExperiment experiment;\r
-\r
-        /** The source of the collected data, retrieved from the experiment. */\r
-        Datasource         source;\r
-\r
-        /** History manager that handles stream files */\r
-        HistoryManager     history;\r
-\r
-        /** Collectors for each subscription */\r
-        Collector          collector;\r
-\r
-        DatasourceAdapter  adapter;\r
-\r
-        Resource           model;\r
-\r
-        public synchronized void init(HistoryManager history, IDynamicExperiment experiment, double startTime, Resource model) {\r
-            this.experiment = experiment;\r
-            this.source = experiment.getDatasource();\r
-            this.history = history; \r
-            //System.out.println("Debug: ModelHistoryCollector history="+System.identityHashCode(this.history));\r
-            this.collector = History.createCollector(this.history, FlushPolicy.NoFlush);            \r
-            this.model = model;\r
-        }\r
-\r
-        /**\r
-         * Load collector with new settings\r
-         * \r
-         * @param workarea \r
-         * @param dynamicExperiment\r
-         * @param variables\r
-         * @throws HistoryException \r
-         */\r
-        public synchronized void load(List<SubscriptionItem> items)\r
-                throws HistoryException, DatabaseException {\r
-\r
-            if (isDisposed())\r
-                return;\r
-\r
-            ExperimentState oldState = experiment.getState();\r
-            if (oldState == ExperimentState.RUNNING)\r
-                experiment.changeState(ExperimentState.STOPPED);\r
-\r
-            try {\r
-               ItemManager im = new ItemManager(items);\r
-               \r
-               // Create and/or Update collector\r
-                if (collector==null) collector = History.createCollector(history);\r
-                \r
-                // Update old items\r
-               for (SubscriptionItem oldItem : collector.getItems() )\r
-               {\r
-                       String id = (String) oldItem.getFieldUnchecked("id");\r
-                       Bean newItem = im.get(id);\r
-                       if ( newItem == null ) {\r
-                               oldItem.enabled = false;\r
-                               collector.setItem(oldItem);\r
-                               //collector.removeItem(id);\r
-                       }\r
-               }    \r
-               // Add new items\r
-               for (Bean newItem : items) {\r
-                       collector.setItem( newItem );\r
-               }\r
-               \r
-               // Create files to history\r
-               history.modify(items.toArray( new Bean[items.size()] ));\r
-               \r
-                if (adapter==null) {\r
-                       adapter = new DatasourceAdapter( collector );\r
-                    source.addListener( adapter );\r
-                } else {\r
-                       adapter.reset();\r
-                }\r
-            } finally {\r
-                experiment.changeState(oldState);\r
-            }\r
-        }\r
-\r
-        public synchronized void dispose() {\r
-               if (source!=null && adapter!=null) {\r
-                       source.removeListener(adapter);\r
-               }\r
-               if (adapter!=null) adapter.reset();\r
-               if (collector!=null) collector.close();\r
-               if (history!=null) history.close();\r
-            // Mark disposed\r
-            experiment = null;\r
-        }\r
-\r
-        public HistoryManager getCollector() {\r
-            return history;\r
-        }\r
-\r
-        public boolean isDisposed() {\r
-            return experiment == null;\r
-        }\r
-\r
-    }\r
-\r
-    final ItemCollector itemCollector = new ItemCollector();\r
-\r
-    private ModelHistoryCollector() {\r
-    }\r
-\r
-    /**\r
-     * @param graph\r
-     * @param startTime\r
-     * @throws DatabaseException\r
-     * @throws HistoryException\r
-     * @throws InterruptedException\r
-     */\r
-    public void initialize(RequestProcessor processor, double startTime) throws DatabaseException, HistoryException, InterruptedException {\r
-        initialize(processor, startTime, false);\r
-    }\r
-\r
-    /**\r
-     * Initialize is invoked during starting of an experiment (\r
-     * {@link ExperimentState#INITIALIZING}).\r
-     * \r
-     * @param graph\r
-     * @param startTime\r
-     * @param listenToVariableSet true to actively listen to changes in the set\r
-     *        of history collected variables.\r
-     * @throws DatabaseException\r
-     * @throws HistoryException\r
-     * @throws InterruptedException\r
-     */\r
-    public void initialize(RequestProcessor processor, double startTime, boolean listenToVariableSet) throws DatabaseException, HistoryException, InterruptedException {\r
-        this.session = processor.getSession();\r
-        itemCollector.init(history, dynamicExperiment, startTime, model);\r
-        refresh0(processor, listenToVariableSet);\r
-    }\r
-\r
-    /**\r
-     * @throws HistoryException\r
-     * @throws DatabaseException\r
-     * @throws InterruptedException \r
-     */\r
-    public void refresh() throws HistoryException, DatabaseException, InterruptedException {\r
-        refresh0(session, false);\r
-    }\r
-\r
-    /**\r
-     * @return\r
-     */\r
-    public File getWorkarea() {\r
-        return history instanceof FileHistory ? ((FileHistory) history).getWorkarea() : null;\r
-    }\r
-\r
-    /**\r
-     * @return\r
-     */\r
-    public HistoryManager getHistoryManager() {\r
-        return itemCollector.history;\r
-    }\r
-\r
-    /**\r
-     * Queries subscriptions, sets listener, loads the subscriptions. \r
-     * \r
-     * @param listenToVariableSet \r
-     * @throws HistoryException\r
-     * @throws DatabaseException\r
-     * @throws InterruptedException \r
-     */\r
-    private void refresh0(RequestProcessor processor, boolean listenToVariableSet) throws HistoryException, DatabaseException, InterruptedException {\r
-        if (listenToVariableSet) {\r
-            if (variableListener == null) {\r
-                variableListener = new VariableSetListener(this);\r
-                initMutex = new Semaphore(0);\r
-                processor.asyncRequest( subscriptionFunction.get(), variableListener );\r
-                // Force synchronous initialization.\r
-                initMutex.acquire();\r
-            }\r
-        } else {\r
-            // Only refresh if there's no listener since the listener\r
-            // will take care of that.\r
-            if (variableListener == null) {\r
-                SubscriptionCollectionResult variables = processor.syncRequest( subscriptionFunction.get() );\r
-                if (!variables.getStatus().isOK() && logger != null)\r
-                    logger.log(variables.getStatus());\r
-                itemCollector.load(variables.getSubscriptions());\r
-                if (loadCallback != null)\r
-                    loadCallback.run();\r
-            }\r
-        }\r
-    }\r
-\r
-    /**\r
-     * Stops history collection and disposes of all related resources. Collected\r
-     * data is left on disk.\r
-     */\r
-    public void dispose() {\r
-        if (variableListener != null) {\r
-            variableListener.dispose();\r
-            variableListener = null;\r
-        }\r
-        itemCollector.dispose();\r
-    }\r
-\r
-    /**\r
-     * Primes the initial subscription by attempting to read everything that is\r
-     * subscribed once. Any caching and fetching performed by the data session\r
-     * implementation should at least be initiated by this invocation.\r
-     * \r
-     * @param graph\r
-     * @throws DatabaseException\r
-     */\r
-    public void primeInitialSubscription(ReadGraph graph) throws DatabaseException {\r
-        SubscriptionCollectionResult variables = graph.syncRequest( subscriptionFunction.get() );\r
-        Set<Pair<Bean, String>> variableIds = new HashSet<Pair<Bean, String>>();\r
-        for (Bean hi : variables.getSubscriptions()) {\r
-               String variableId = (String) hi.getFieldUnchecked("variableId");\r
-               variableIds.add( Pair.make(hi, variableId) );\r
-        }\r
-        \r
-        Datasource source = dynamicExperiment.getDatasource();\r
-        for (Pair<Bean, String> ids : variableIds) {\r
-            Datatype type = source.getType( ids.second );\r
-            Binding valueBinding = Bindings.getBinding(type);\r
-            VariableHandle handle = source.openHandle(ids.first, ids.second, valueBinding);\r
-            try {\r
-                handle.getValue();\r
-            } catch (AccessorException e) {\r
-               logger.log(new Status(IStatus.ERROR, Activator.PLUGIN_ID, e.getLocalizedMessage(), e));\r
-            } finally {\r
-                handle.dispose();\r
-            }\r
-        }\r
-    }\r
-\r
-    private static Collection<SamplingFormat> resolveSamplingFormats(Map<Object, Collection<SamplingFormat>> cache, SubscriptionItem item, Datatype type, String unit) {\r
-        if ( type instanceof BooleanType ) {\r
-            Double cacheKey = item.interval;\r
-            Collection<SamplingFormat> formats = cache.get(cacheKey);\r
-            if (formats == null) {\r
-                formats = TrendSamplingFormats.createBinarySamplingFormats(item.interval);\r
-                cache.put(cacheKey, formats);\r
-            }\r
-            return formats;\r
-        } else if ( type instanceof NumberType ) {\r
-            Tuple3 cacheKey = new Tuple3(item.interval, item.deadband, unit);\r
-            Collection<SamplingFormat> formats = cache.get(cacheKey);\r
-            if (formats == null) {\r
-                formats = TrendSamplingFormats.createAnalogSamplingFormats(item.interval, item.deadband, unit);\r
-                cache.put(cacheKey, formats);\r
-            }\r
-            return formats;\r
-        }\r
-\r
-        SamplingFormat format = new SamplingFormat();\r
-        format.formatId = "custom";\r
-        RecordType f = new RecordType();\r
-        format.format = f;\r
-        f.addComponent("time", Datatypes.DOUBLE);\r
-        f.addComponent("endTime", Datatypes.DOUBLE);\r
-        f.addComponent("quality", Datatypes.BYTE);\r
-        f.addComponent("value", type);\r
-\r
-        format.interval = item.interval;\r
-        format.deadband = item.deadband;\r
-        return Collections.singleton(format);\r
-    }\r
-    \r
-    private static ThreadLocal<Map<Object, Collection<SamplingFormat>>> samplingFormatCaches = new ThreadLocal<Map<Object, Collection<SamplingFormat>>>() {\r
-        @Override\r
-        protected Map<Object, Collection<SamplingFormat>> initialValue() {\r
-            return new WeakHashMap<Object, Collection<SamplingFormat>>();\r
-        }\r
-    };\r
-\r
-    public static List<SubscriptionItem> sampledSubscriptionItems(List<SubscriptionItem> rawItems) {\r
-        Map<Object, Collection<SamplingFormat>> cache = samplingFormatCaches.get();\r
-\r
-        List<SubscriptionItem> result = new ArrayList<SubscriptionItem>(rawItems.size() * 6);\r
-        for (SubscriptionItem item : rawItems) {\r
-               \r
-            // Apros-specific assumption: the unsampled item's id field contains the subscription item's name\r
-            String groupItemId = item.id;\r
-            // Apros-specific assumption: the unsampled item's format field contains the subscription item's basic datatype (boolean, number, etc.)\r
-            Datatype type = item.format;\r
-            // Apros-specific assumption: the unsampled item's formatId subscription item's unit\r
-            String unit = /*item.formatId.isEmpty() ? null :*/ item.formatId;\r
-\r
-            Collection<SamplingFormat> formats = resolveSamplingFormats(cache, item, type, unit);\r
-            SubscriptionItem[] his = SubscriptionItem.createItems(item, groupItemId, item.groupId, formats);\r
-            for (SubscriptionItem hi : his) {\r
-                hi.groupItemId = groupItemId;\r
-                result.add(hi);\r
-            }\r
-        }\r
-        return result;\r
-    }\r
-    \r
-    public Collector getCollector() {\r
-               return itemCollector.collector;\r
-       }\r
-\r
-}\r
+/*******************************************************************************
+ * Copyright (c) 2007, 2011 Association for Decentralized Information Management
+ * in Industry THTH ry.
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * which accompanies this distribution, and is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Contributors:
+ *     VTT Technical Research Centre of Finland - initial API and implementation
+ *******************************************************************************/
+package org.simantics.modeling.subscription;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.WeakHashMap;
+import java.util.concurrent.Semaphore;
+import java.util.function.Supplier;
+import java.util.logging.Logger;
+
+import org.eclipse.core.runtime.ILog;
+import org.eclipse.core.runtime.IStatus;
+import org.eclipse.core.runtime.Status;
+import org.simantics.databoard.Bindings;
+import org.simantics.databoard.Datatypes;
+import org.simantics.databoard.accessor.error.AccessorException;
+import org.simantics.databoard.binding.Binding;
+import org.simantics.databoard.type.BooleanType;
+import org.simantics.databoard.type.Datatype;
+import org.simantics.databoard.type.NumberType;
+import org.simantics.databoard.type.RecordType;
+import org.simantics.databoard.util.Bean;
+import org.simantics.db.ReadGraph;
+import org.simantics.db.RequestProcessor;
+import org.simantics.db.Resource;
+import org.simantics.db.Session;
+import org.simantics.db.exception.DatabaseException;
+import org.simantics.db.request.Read;
+import org.simantics.history.Collector;
+import org.simantics.history.History;
+import org.simantics.history.HistoryException;
+import org.simantics.history.HistoryManager;
+import org.simantics.history.ItemManager;
+import org.simantics.history.impl.FileHistory;
+import org.simantics.history.impl.FlushPolicy;
+import org.simantics.history.util.subscription.SamplingFormat;
+import org.simantics.history.util.subscription.SubscriptionItem;
+import org.simantics.scl.runtime.tuple.Tuple3;
+import org.simantics.simulation.Activator;
+import org.simantics.simulation.data.Datasource;
+import org.simantics.simulation.data.DatasourceAdapter;
+import org.simantics.simulation.data.VariableHandle;
+import org.simantics.simulation.experiment.ExperimentState;
+import org.simantics.simulation.experiment.IDynamicExperiment;
+import org.simantics.trend.configuration.TrendSamplingFormats;
+import org.simantics.utils.datastructures.Pair;
+import org.simantics.utils.threads.IThreadWorkQueue;
+
+/**
+ * @author Tuukka Lehtonen
+ */
+public class ModelHistoryCollector {
+
+    Logger log = Logger.getLogger( this.getClass().getName() );
+
+    //private static final boolean DEBUG_LOAD = false;
+
+    /**
+     * For IStatus logging.
+     */
+    static final String BUNDLE_ID = "org.simantics.modeling";
+
+    /**
+     * @param g
+     * @param dynamicExperiment
+     * @param logger
+     * @param subscriptionRequestFunction
+     * @param loadCallback
+     *            a callback runnable that is invoked every time the
+     *            subscription changes
+     * @return
+     * @throws DatabaseException
+     * @throws IOException
+     */
+    public static ModelHistoryCollector createCollector(
+            IDynamicExperiment dynamicExperiment,
+            HistoryManager history,
+            ILog log,
+            Supplier<Read<SubscriptionCollectionResult>> subscriptionRequestFunction,
+            Runnable loadCallback)
+            throws DatabaseException, IOException
+    {
+        return createCollector(dynamicExperiment, history, log, subscriptionRequestFunction, loadCallback, null);
+    }
+
+    /**
+     * @param g
+     * @param dynamicExperiment
+     * @param logger
+     * @param subscriptionRequestFunction
+     * @param loadCallback
+     *            a callback runnable that is invoked every time the
+     *            subscription changes
+     * @param loadThread
+     *            a possible thread to perform the subscription loads in or
+     *            <code>null</code> to use any thread
+     * @return
+     * @throws DatabaseException
+     * @throws IOException
+     */
+    public static ModelHistoryCollector createCollector(
+            IDynamicExperiment dynamicExperiment,
+            HistoryManager history,
+            ILog log,
+            Supplier<Read<SubscriptionCollectionResult>> subscriptionRequestFunction,
+            Runnable loadCallback,
+            IThreadWorkQueue loadThread)
+            throws DatabaseException, IOException
+    {
+        ModelHistoryCollector data = new ModelHistoryCollector();
+        data.history = history;
+        data.model = dynamicExperiment.getModel();
+        data.dynamicExperiment = dynamicExperiment;
+        data.logger = log;
+        data.subscriptionFunction = subscriptionRequestFunction;
+        data.loadCallback = loadCallback;
+        data.loadThread = loadThread;
+        return data;
+    }
+
+    private Session                                      session;
+    private HistoryManager                               history;
+    private Resource                                     model;
+    private IDynamicExperiment                           dynamicExperiment;
+    ILog                                         logger;
+    private Supplier<Read<SubscriptionCollectionResult>> subscriptionFunction;
+    Runnable                                     loadCallback;
+    IThreadWorkQueue                             loadThread;
+
+    private VariableSetListener                          variableListener;
+
+    /**
+     * Released every time {@link #variableListener} either completes reloading
+     * subscriptions or fails with an exception. Used for synchronizing listener-based initialization 
+     */
+    Semaphore                                    initMutex       = new Semaphore(0);
+
+    public static class ItemCollector {
+
+        /** The experiment to collect the data from. */
+        IDynamicExperiment experiment;
+
+        /** The source of the collected data, retrieved from the experiment. */
+        Datasource         source;
+
+        /** History manager that handles stream files */
+        HistoryManager     history;
+
+        /** Collectors for each subscription */
+        Collector          collector;
+
+        DatasourceAdapter  adapter;
+
+        Resource           model;
+
+        public synchronized void init(HistoryManager history, IDynamicExperiment experiment, double startTime, Resource model) {
+            this.experiment = experiment;
+            this.source = experiment.getDatasource();
+            this.history = history; 
+            //System.out.println("Debug: ModelHistoryCollector history="+System.identityHashCode(this.history));
+            this.collector = History.createCollector(this.history, FlushPolicy.NoFlush);            
+            this.model = model;
+        }
+
+        /**
+         * Load collector with new settings
+         * 
+         * @param workarea 
+         * @param dynamicExperiment
+         * @param variables
+         * @throws HistoryException 
+         */
+        public synchronized void load(List<SubscriptionItem> items)
+                throws HistoryException, DatabaseException {
+
+            if (isDisposed())
+                return;
+
+            ExperimentState oldState = experiment.getState();
+            if (oldState == ExperimentState.RUNNING)
+                experiment.changeState(ExperimentState.STOPPED);
+
+            try {
+               ItemManager im = new ItemManager(items);
+               
+               // Create and/or Update collector
+                if (collector==null) collector = History.createCollector(history);
+                
+                // Update old items
+               for (SubscriptionItem oldItem : collector.getItems() )
+               {
+                       String id = (String) oldItem.getFieldUnchecked("id");
+                       Bean newItem = im.get(id);
+                       if ( newItem == null ) {
+                               oldItem.enabled = false;
+                               collector.setItem(oldItem);
+                               //collector.removeItem(id);
+                       }
+               }    
+               // Add new items
+               for (Bean newItem : items) {
+                       collector.setItem( newItem );
+               }
+               
+               // Create files to history
+               history.modify(items.toArray( new Bean[items.size()] ));
+               
+                if (adapter==null) {
+                       adapter = new DatasourceAdapter( collector );
+                    source.addListener( adapter );
+                } else {
+                       adapter.reset();
+                }
+            } finally {
+                experiment.changeState(oldState);
+            }
+        }
+
+        public synchronized void dispose() {
+               if (source!=null && adapter!=null) {
+                       source.removeListener(adapter);
+               }
+               if (adapter!=null) adapter.reset();
+               if (collector!=null) collector.close();
+               if (history!=null) history.close();
+            // Mark disposed
+            experiment = null;
+        }
+
+        public HistoryManager getCollector() {
+            return history;
+        }
+
+        public boolean isDisposed() {
+            return experiment == null;
+        }
+
+    }
+
+    final ItemCollector itemCollector = new ItemCollector();
+
+    private ModelHistoryCollector() {
+    }
+
+    /**
+     * @param graph
+     * @param startTime
+     * @throws DatabaseException
+     * @throws HistoryException
+     * @throws InterruptedException
+     */
+    public void initialize(RequestProcessor processor, double startTime) throws DatabaseException, HistoryException, InterruptedException {
+        initialize(processor, startTime, false);
+    }
+
+    /**
+     * Initialize is invoked during starting of an experiment (
+     * {@link ExperimentState#INITIALIZING}).
+     * 
+     * @param graph
+     * @param startTime
+     * @param listenToVariableSet true to actively listen to changes in the set
+     *        of history collected variables.
+     * @throws DatabaseException
+     * @throws HistoryException
+     * @throws InterruptedException
+     */
+    public void initialize(RequestProcessor processor, double startTime, boolean listenToVariableSet) throws DatabaseException, HistoryException, InterruptedException {
+        this.session = processor.getSession();
+        itemCollector.init(history, dynamicExperiment, startTime, model);
+        refresh0(processor, listenToVariableSet);
+    }
+
+    /**
+     * @throws HistoryException
+     * @throws DatabaseException
+     * @throws InterruptedException 
+     */
+    public void refresh() throws HistoryException, DatabaseException, InterruptedException {
+        refresh0(session, false);
+    }
+
+    /**
+     * @return
+     */
+    public File getWorkarea() {
+        return history instanceof FileHistory ? ((FileHistory) history).getWorkarea() : null;
+    }
+
+    /**
+     * @return
+     */
+    public HistoryManager getHistoryManager() {
+        return itemCollector.history;
+    }
+
+    /**
+     * Queries subscriptions, sets listener, loads the subscriptions. 
+     * 
+     * @param listenToVariableSet 
+     * @throws HistoryException
+     * @throws DatabaseException
+     * @throws InterruptedException 
+     */
+    private void refresh0(RequestProcessor processor, boolean listenToVariableSet) throws HistoryException, DatabaseException, InterruptedException {
+        if (listenToVariableSet) {
+            if (variableListener == null) {
+                variableListener = new VariableSetListener(this);
+                initMutex = new Semaphore(0);
+                processor.asyncRequest( subscriptionFunction.get(), variableListener );
+                // Force synchronous initialization.
+                initMutex.acquire();
+            }
+        } else {
+            // Only refresh if there's no listener since the listener
+            // will take care of that.
+            if (variableListener == null) {
+                SubscriptionCollectionResult variables = processor.syncRequest( subscriptionFunction.get() );
+                if (!variables.getStatus().isOK() && logger != null)
+                    logger.log(variables.getStatus());
+                itemCollector.load(variables.getSubscriptions());
+                if (loadCallback != null)
+                    loadCallback.run();
+            }
+        }
+    }
+
+    /**
+     * Stops history collection and disposes of all related resources. Collected
+     * data is left on disk.
+     */
+    public void dispose() {
+        if (variableListener != null) {
+            variableListener.dispose();
+            variableListener = null;
+        }
+        itemCollector.dispose();
+    }
+
+    /**
+     * Primes the initial subscription by attempting to read everything that is
+     * subscribed once. Any caching and fetching performed by the data session
+     * implementation should at least be initiated by this invocation.
+     * 
+     * @param graph
+     * @throws DatabaseException
+     */
+    public void primeInitialSubscription(ReadGraph graph) throws DatabaseException {
+        SubscriptionCollectionResult variables = graph.syncRequest( subscriptionFunction.get() );
+        Set<Pair<Bean, String>> variableIds = new HashSet<Pair<Bean, String>>();
+        for (Bean hi : variables.getSubscriptions()) {
+               String variableId = (String) hi.getFieldUnchecked("variableId");
+               variableIds.add( Pair.make(hi, variableId) );
+        }
+        
+        Datasource source = dynamicExperiment.getDatasource();
+        for (Pair<Bean, String> ids : variableIds) {
+            Datatype type = source.getType( ids.second );
+            Binding valueBinding = Bindings.getBinding(type);
+            VariableHandle handle = source.openHandle(ids.first, ids.second, valueBinding);
+            try {
+                handle.getValue();
+            } catch (AccessorException e) {
+               logger.log(new Status(IStatus.ERROR, Activator.PLUGIN_ID, e.getLocalizedMessage(), e));
+            } finally {
+                handle.dispose();
+            }
+        }
+    }
+
+    private static Collection<SamplingFormat> resolveSamplingFormats(Map<Object, Collection<SamplingFormat>> cache, SubscriptionItem item, Datatype type, String unit) {
+        if ( type instanceof BooleanType ) {
+            Double cacheKey = item.interval;
+            Collection<SamplingFormat> formats = cache.get(cacheKey);
+            if (formats == null) {
+                formats = TrendSamplingFormats.createBinarySamplingFormats(item.interval);
+                cache.put(cacheKey, formats);
+            }
+            return formats;
+        } else if ( type instanceof NumberType ) {
+            Tuple3 cacheKey = new Tuple3(item.interval, item.deadband, unit);
+            Collection<SamplingFormat> formats = cache.get(cacheKey);
+            if (formats == null) {
+                formats = TrendSamplingFormats.createAnalogSamplingFormats(item.interval, item.deadband, unit);
+                cache.put(cacheKey, formats);
+            }
+            return formats;
+        }
+
+        SamplingFormat format = new SamplingFormat();
+        format.formatId = "custom";
+        RecordType f = new RecordType();
+        format.format = f;
+        f.addComponent("time", Datatypes.DOUBLE);
+        f.addComponent("endTime", Datatypes.DOUBLE);
+        f.addComponent("quality", Datatypes.BYTE);
+        f.addComponent("value", type);
+
+        format.interval = item.interval;
+        format.deadband = item.deadband;
+        return Collections.singleton(format);
+    }
+    
+    private static ThreadLocal<Map<Object, Collection<SamplingFormat>>> samplingFormatCaches = new ThreadLocal<Map<Object, Collection<SamplingFormat>>>() {
+        @Override
+        protected Map<Object, Collection<SamplingFormat>> initialValue() {
+            return new WeakHashMap<Object, Collection<SamplingFormat>>();
+        }
+    };
+
+    public static List<SubscriptionItem> sampledSubscriptionItems(List<SubscriptionItem> rawItems) {
+        Map<Object, Collection<SamplingFormat>> cache = samplingFormatCaches.get();
+
+        List<SubscriptionItem> result = new ArrayList<SubscriptionItem>(rawItems.size() * 6);
+        for (SubscriptionItem item : rawItems) {
+               
+            // Apros-specific assumption: the unsampled item's id field contains the subscription item's name
+            String groupItemId = item.id;
+            // Apros-specific assumption: the unsampled item's format field contains the subscription item's basic datatype (boolean, number, etc.)
+            Datatype type = item.format;
+            // Apros-specific assumption: the unsampled item's formatId subscription item's unit
+            String unit = /*item.formatId.isEmpty() ? null :*/ item.formatId;
+
+            Collection<SamplingFormat> formats = resolveSamplingFormats(cache, item, type, unit);
+            SubscriptionItem[] his = SubscriptionItem.createItems(item, groupItemId, item.groupId, formats);
+            for (SubscriptionItem hi : his) {
+                hi.groupItemId = groupItemId;
+                result.add(hi);
+            }
+        }
+        return result;
+    }
+    
+    public Collector getCollector() {
+               return itemCollector.collector;
+       }
+
+}