X-Git-Url: https://gerrit.simantics.org/r/gitweb?a=blobdiff_plain;f=bundles%2Forg.simantics.modeling%2Fsrc%2Forg%2Fsimantics%2Fmodeling%2Fsubscription%2FModelHistoryCollector.java;fp=bundles%2Forg.simantics.modeling%2Fsrc%2Forg%2Fsimantics%2Fmodeling%2Fsubscription%2FModelHistoryCollector.java;h=27c840467a96a9d56bbf17968b2d136cc848de7c;hb=0ae2b770234dfc3cbb18bd38f324125cf0faca07;hp=952a1e747b921b6ef384a7a380c03213730aacc3;hpb=24e2b34260f219f0d1644ca7a138894980e25b14;p=simantics%2Fplatform.git diff --git a/bundles/org.simantics.modeling/src/org/simantics/modeling/subscription/ModelHistoryCollector.java b/bundles/org.simantics.modeling/src/org/simantics/modeling/subscription/ModelHistoryCollector.java index 952a1e747..27c840467 100644 --- a/bundles/org.simantics.modeling/src/org/simantics/modeling/subscription/ModelHistoryCollector.java +++ b/bundles/org.simantics.modeling/src/org/simantics/modeling/subscription/ModelHistoryCollector.java @@ -1,455 +1,455 @@ -/******************************************************************************* - * Copyright (c) 2007, 2011 Association for Decentralized Information Management - * in Industry THTH ry. - * All rights reserved. This program and the accompanying materials - * are made available under the terms of the Eclipse Public License v1.0 - * which accompanies this distribution, and is available at - * http://www.eclipse.org/legal/epl-v10.html - * - * Contributors: - * VTT Technical Research Centre of Finland - initial API and implementation - *******************************************************************************/ -package org.simantics.modeling.subscription; - -import java.io.File; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.WeakHashMap; -import java.util.concurrent.Semaphore; -import java.util.function.Supplier; -import java.util.logging.Logger; - -import org.eclipse.core.runtime.ILog; -import org.eclipse.core.runtime.IStatus; -import org.eclipse.core.runtime.Status; -import org.simantics.databoard.Bindings; -import org.simantics.databoard.Datatypes; -import org.simantics.databoard.accessor.error.AccessorException; -import org.simantics.databoard.binding.Binding; -import org.simantics.databoard.type.BooleanType; -import org.simantics.databoard.type.Datatype; -import org.simantics.databoard.type.NumberType; -import org.simantics.databoard.type.RecordType; -import org.simantics.databoard.util.Bean; -import org.simantics.db.ReadGraph; -import org.simantics.db.RequestProcessor; -import org.simantics.db.Resource; -import org.simantics.db.Session; -import org.simantics.db.exception.DatabaseException; -import org.simantics.db.request.Read; -import org.simantics.history.Collector; -import org.simantics.history.History; -import org.simantics.history.HistoryException; -import org.simantics.history.HistoryManager; -import org.simantics.history.ItemManager; -import org.simantics.history.impl.FileHistory; -import org.simantics.history.impl.FlushPolicy; -import org.simantics.history.util.subscription.SamplingFormat; -import org.simantics.history.util.subscription.SubscriptionItem; -import org.simantics.scl.runtime.tuple.Tuple3; -import org.simantics.simulation.Activator; -import org.simantics.simulation.data.Datasource; -import org.simantics.simulation.data.DatasourceAdapter; -import org.simantics.simulation.data.VariableHandle; -import org.simantics.simulation.experiment.ExperimentState; -import org.simantics.simulation.experiment.IDynamicExperiment; -import org.simantics.trend.configuration.TrendSamplingFormats; -import org.simantics.utils.datastructures.Pair; -import org.simantics.utils.threads.IThreadWorkQueue; - -/** - * @author Tuukka Lehtonen - */ -public class ModelHistoryCollector { - - Logger log = Logger.getLogger( this.getClass().getName() ); - - //private static final boolean DEBUG_LOAD = false; - - /** - * For IStatus logging. - */ - static final String BUNDLE_ID = "org.simantics.modeling"; - - /** - * @param g - * @param dynamicExperiment - * @param logger - * @param subscriptionRequestFunction - * @param loadCallback - * a callback runnable that is invoked every time the - * subscription changes - * @return - * @throws DatabaseException - * @throws IOException - */ - public static ModelHistoryCollector createCollector( - IDynamicExperiment dynamicExperiment, - HistoryManager history, - ILog log, - Supplier> subscriptionRequestFunction, - Runnable loadCallback) - throws DatabaseException, IOException - { - return createCollector(dynamicExperiment, history, log, subscriptionRequestFunction, loadCallback, null); - } - - /** - * @param g - * @param dynamicExperiment - * @param logger - * @param subscriptionRequestFunction - * @param loadCallback - * a callback runnable that is invoked every time the - * subscription changes - * @param loadThread - * a possible thread to perform the subscription loads in or - * null to use any thread - * @return - * @throws DatabaseException - * @throws IOException - */ - public static ModelHistoryCollector createCollector( - IDynamicExperiment dynamicExperiment, - HistoryManager history, - ILog log, - Supplier> subscriptionRequestFunction, - Runnable loadCallback, - IThreadWorkQueue loadThread) - throws DatabaseException, IOException - { - ModelHistoryCollector data = new ModelHistoryCollector(); - data.history = history; - data.model = dynamicExperiment.getModel(); - data.dynamicExperiment = dynamicExperiment; - data.logger = log; - data.subscriptionFunction = subscriptionRequestFunction; - data.loadCallback = loadCallback; - data.loadThread = loadThread; - return data; - } - - private Session session; - private HistoryManager history; - private Resource model; - private IDynamicExperiment dynamicExperiment; - ILog logger; - private Supplier> subscriptionFunction; - Runnable loadCallback; - IThreadWorkQueue loadThread; - - private VariableSetListener variableListener; - - /** - * Released every time {@link #variableListener} either completes reloading - * subscriptions or fails with an exception. Used for synchronizing listener-based initialization - */ - Semaphore initMutex = new Semaphore(0); - - public static class ItemCollector { - - /** The experiment to collect the data from. */ - IDynamicExperiment experiment; - - /** The source of the collected data, retrieved from the experiment. */ - Datasource source; - - /** History manager that handles stream files */ - HistoryManager history; - - /** Collectors for each subscription */ - Collector collector; - - DatasourceAdapter adapter; - - Resource model; - - public synchronized void init(HistoryManager history, IDynamicExperiment experiment, double startTime, Resource model) { - this.experiment = experiment; - this.source = experiment.getDatasource(); - this.history = history; - //System.out.println("Debug: ModelHistoryCollector history="+System.identityHashCode(this.history)); - this.collector = History.createCollector(this.history, FlushPolicy.NoFlush); - this.model = model; - } - - /** - * Load collector with new settings - * - * @param workarea - * @param dynamicExperiment - * @param variables - * @throws HistoryException - */ - public synchronized void load(List items) - throws HistoryException, DatabaseException { - - if (isDisposed()) - return; - - ExperimentState oldState = experiment.getState(); - if (oldState == ExperimentState.RUNNING) - experiment.changeState(ExperimentState.STOPPED); - - try { - ItemManager im = new ItemManager(items); - - // Create and/or Update collector - if (collector==null) collector = History.createCollector(history); - - // Update old items - for (SubscriptionItem oldItem : collector.getItems() ) - { - String id = (String) oldItem.getFieldUnchecked("id"); - Bean newItem = im.get(id); - if ( newItem == null ) { - oldItem.enabled = false; - collector.setItem(oldItem); - //collector.removeItem(id); - } - } - // Add new items - for (Bean newItem : items) { - collector.setItem( newItem ); - } - - // Create files to history - history.modify(items.toArray( new Bean[items.size()] )); - - if (adapter==null) { - adapter = new DatasourceAdapter( collector ); - source.addListener( adapter ); - } else { - adapter.reset(); - } - } finally { - experiment.changeState(oldState); - } - } - - public synchronized void dispose() { - if (source!=null && adapter!=null) { - source.removeListener(adapter); - } - if (adapter!=null) adapter.reset(); - if (collector!=null) collector.close(); - if (history!=null) history.close(); - // Mark disposed - experiment = null; - } - - public HistoryManager getCollector() { - return history; - } - - public boolean isDisposed() { - return experiment == null; - } - - } - - final ItemCollector itemCollector = new ItemCollector(); - - private ModelHistoryCollector() { - } - - /** - * @param graph - * @param startTime - * @throws DatabaseException - * @throws HistoryException - * @throws InterruptedException - */ - public void initialize(RequestProcessor processor, double startTime) throws DatabaseException, HistoryException, InterruptedException { - initialize(processor, startTime, false); - } - - /** - * Initialize is invoked during starting of an experiment ( - * {@link ExperimentState#INITIALIZING}). - * - * @param graph - * @param startTime - * @param listenToVariableSet true to actively listen to changes in the set - * of history collected variables. - * @throws DatabaseException - * @throws HistoryException - * @throws InterruptedException - */ - public void initialize(RequestProcessor processor, double startTime, boolean listenToVariableSet) throws DatabaseException, HistoryException, InterruptedException { - this.session = processor.getSession(); - itemCollector.init(history, dynamicExperiment, startTime, model); - refresh0(processor, listenToVariableSet); - } - - /** - * @throws HistoryException - * @throws DatabaseException - * @throws InterruptedException - */ - public void refresh() throws HistoryException, DatabaseException, InterruptedException { - refresh0(session, false); - } - - /** - * @return - */ - public File getWorkarea() { - return history instanceof FileHistory ? ((FileHistory) history).getWorkarea() : null; - } - - /** - * @return - */ - public HistoryManager getHistoryManager() { - return itemCollector.history; - } - - /** - * Queries subscriptions, sets listener, loads the subscriptions. - * - * @param listenToVariableSet - * @throws HistoryException - * @throws DatabaseException - * @throws InterruptedException - */ - private void refresh0(RequestProcessor processor, boolean listenToVariableSet) throws HistoryException, DatabaseException, InterruptedException { - if (listenToVariableSet) { - if (variableListener == null) { - variableListener = new VariableSetListener(this); - initMutex = new Semaphore(0); - processor.asyncRequest( subscriptionFunction.get(), variableListener ); - // Force synchronous initialization. - initMutex.acquire(); - } - } else { - // Only refresh if there's no listener since the listener - // will take care of that. - if (variableListener == null) { - SubscriptionCollectionResult variables = processor.syncRequest( subscriptionFunction.get() ); - if (!variables.getStatus().isOK() && logger != null) - logger.log(variables.getStatus()); - itemCollector.load(variables.getSubscriptions()); - if (loadCallback != null) - loadCallback.run(); - } - } - } - - /** - * Stops history collection and disposes of all related resources. Collected - * data is left on disk. - */ - public void dispose() { - if (variableListener != null) { - variableListener.dispose(); - variableListener = null; - } - itemCollector.dispose(); - } - - /** - * Primes the initial subscription by attempting to read everything that is - * subscribed once. Any caching and fetching performed by the data session - * implementation should at least be initiated by this invocation. - * - * @param graph - * @throws DatabaseException - */ - public void primeInitialSubscription(ReadGraph graph) throws DatabaseException { - SubscriptionCollectionResult variables = graph.syncRequest( subscriptionFunction.get() ); - Set> variableIds = new HashSet>(); - for (Bean hi : variables.getSubscriptions()) { - String variableId = (String) hi.getFieldUnchecked("variableId"); - variableIds.add( Pair.make(hi, variableId) ); - } - - Datasource source = dynamicExperiment.getDatasource(); - for (Pair ids : variableIds) { - Datatype type = source.getType( ids.second ); - Binding valueBinding = Bindings.getBinding(type); - VariableHandle handle = source.openHandle(ids.first, ids.second, valueBinding); - try { - handle.getValue(); - } catch (AccessorException e) { - logger.log(new Status(IStatus.ERROR, Activator.PLUGIN_ID, e.getLocalizedMessage(), e)); - } finally { - handle.dispose(); - } - } - } - - private static Collection resolveSamplingFormats(Map> cache, SubscriptionItem item, Datatype type, String unit) { - if ( type instanceof BooleanType ) { - Double cacheKey = item.interval; - Collection formats = cache.get(cacheKey); - if (formats == null) { - formats = TrendSamplingFormats.createBinarySamplingFormats(item.interval); - cache.put(cacheKey, formats); - } - return formats; - } else if ( type instanceof NumberType ) { - Tuple3 cacheKey = new Tuple3(item.interval, item.deadband, unit); - Collection formats = cache.get(cacheKey); - if (formats == null) { - formats = TrendSamplingFormats.createAnalogSamplingFormats(item.interval, item.deadband, unit); - cache.put(cacheKey, formats); - } - return formats; - } - - SamplingFormat format = new SamplingFormat(); - format.formatId = "custom"; - RecordType f = new RecordType(); - format.format = f; - f.addComponent("time", Datatypes.DOUBLE); - f.addComponent("endTime", Datatypes.DOUBLE); - f.addComponent("quality", Datatypes.BYTE); - f.addComponent("value", type); - - format.interval = item.interval; - format.deadband = item.deadband; - return Collections.singleton(format); - } - - private static ThreadLocal>> samplingFormatCaches = new ThreadLocal>>() { - @Override - protected Map> initialValue() { - return new WeakHashMap>(); - } - }; - - public static List sampledSubscriptionItems(List rawItems) { - Map> cache = samplingFormatCaches.get(); - - List result = new ArrayList(rawItems.size() * 6); - for (SubscriptionItem item : rawItems) { - - // Apros-specific assumption: the unsampled item's id field contains the subscription item's name - String groupItemId = item.id; - // Apros-specific assumption: the unsampled item's format field contains the subscription item's basic datatype (boolean, number, etc.) - Datatype type = item.format; - // Apros-specific assumption: the unsampled item's formatId subscription item's unit - String unit = /*item.formatId.isEmpty() ? null :*/ item.formatId; - - Collection formats = resolveSamplingFormats(cache, item, type, unit); - SubscriptionItem[] his = SubscriptionItem.createItems(item, groupItemId, item.groupId, formats); - for (SubscriptionItem hi : his) { - hi.groupItemId = groupItemId; - result.add(hi); - } - } - return result; - } - - public Collector getCollector() { - return itemCollector.collector; - } - -} +/******************************************************************************* + * Copyright (c) 2007, 2011 Association for Decentralized Information Management + * in Industry THTH ry. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * VTT Technical Research Centre of Finland - initial API and implementation + *******************************************************************************/ +package org.simantics.modeling.subscription; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.WeakHashMap; +import java.util.concurrent.Semaphore; +import java.util.function.Supplier; +import java.util.logging.Logger; + +import org.eclipse.core.runtime.ILog; +import org.eclipse.core.runtime.IStatus; +import org.eclipse.core.runtime.Status; +import org.simantics.databoard.Bindings; +import org.simantics.databoard.Datatypes; +import org.simantics.databoard.accessor.error.AccessorException; +import org.simantics.databoard.binding.Binding; +import org.simantics.databoard.type.BooleanType; +import org.simantics.databoard.type.Datatype; +import org.simantics.databoard.type.NumberType; +import org.simantics.databoard.type.RecordType; +import org.simantics.databoard.util.Bean; +import org.simantics.db.ReadGraph; +import org.simantics.db.RequestProcessor; +import org.simantics.db.Resource; +import org.simantics.db.Session; +import org.simantics.db.exception.DatabaseException; +import org.simantics.db.request.Read; +import org.simantics.history.Collector; +import org.simantics.history.History; +import org.simantics.history.HistoryException; +import org.simantics.history.HistoryManager; +import org.simantics.history.ItemManager; +import org.simantics.history.impl.FileHistory; +import org.simantics.history.impl.FlushPolicy; +import org.simantics.history.util.subscription.SamplingFormat; +import org.simantics.history.util.subscription.SubscriptionItem; +import org.simantics.scl.runtime.tuple.Tuple3; +import org.simantics.simulation.Activator; +import org.simantics.simulation.data.Datasource; +import org.simantics.simulation.data.DatasourceAdapter; +import org.simantics.simulation.data.VariableHandle; +import org.simantics.simulation.experiment.ExperimentState; +import org.simantics.simulation.experiment.IDynamicExperiment; +import org.simantics.trend.configuration.TrendSamplingFormats; +import org.simantics.utils.datastructures.Pair; +import org.simantics.utils.threads.IThreadWorkQueue; + +/** + * @author Tuukka Lehtonen + */ +public class ModelHistoryCollector { + + Logger log = Logger.getLogger( this.getClass().getName() ); + + //private static final boolean DEBUG_LOAD = false; + + /** + * For IStatus logging. + */ + static final String BUNDLE_ID = "org.simantics.modeling"; + + /** + * @param g + * @param dynamicExperiment + * @param logger + * @param subscriptionRequestFunction + * @param loadCallback + * a callback runnable that is invoked every time the + * subscription changes + * @return + * @throws DatabaseException + * @throws IOException + */ + public static ModelHistoryCollector createCollector( + IDynamicExperiment dynamicExperiment, + HistoryManager history, + ILog log, + Supplier> subscriptionRequestFunction, + Runnable loadCallback) + throws DatabaseException, IOException + { + return createCollector(dynamicExperiment, history, log, subscriptionRequestFunction, loadCallback, null); + } + + /** + * @param g + * @param dynamicExperiment + * @param logger + * @param subscriptionRequestFunction + * @param loadCallback + * a callback runnable that is invoked every time the + * subscription changes + * @param loadThread + * a possible thread to perform the subscription loads in or + * null to use any thread + * @return + * @throws DatabaseException + * @throws IOException + */ + public static ModelHistoryCollector createCollector( + IDynamicExperiment dynamicExperiment, + HistoryManager history, + ILog log, + Supplier> subscriptionRequestFunction, + Runnable loadCallback, + IThreadWorkQueue loadThread) + throws DatabaseException, IOException + { + ModelHistoryCollector data = new ModelHistoryCollector(); + data.history = history; + data.model = dynamicExperiment.getModel(); + data.dynamicExperiment = dynamicExperiment; + data.logger = log; + data.subscriptionFunction = subscriptionRequestFunction; + data.loadCallback = loadCallback; + data.loadThread = loadThread; + return data; + } + + private Session session; + private HistoryManager history; + private Resource model; + private IDynamicExperiment dynamicExperiment; + ILog logger; + private Supplier> subscriptionFunction; + Runnable loadCallback; + IThreadWorkQueue loadThread; + + private VariableSetListener variableListener; + + /** + * Released every time {@link #variableListener} either completes reloading + * subscriptions or fails with an exception. Used for synchronizing listener-based initialization + */ + Semaphore initMutex = new Semaphore(0); + + public static class ItemCollector { + + /** The experiment to collect the data from. */ + IDynamicExperiment experiment; + + /** The source of the collected data, retrieved from the experiment. */ + Datasource source; + + /** History manager that handles stream files */ + HistoryManager history; + + /** Collectors for each subscription */ + Collector collector; + + DatasourceAdapter adapter; + + Resource model; + + public synchronized void init(HistoryManager history, IDynamicExperiment experiment, double startTime, Resource model) { + this.experiment = experiment; + this.source = experiment.getDatasource(); + this.history = history; + //System.out.println("Debug: ModelHistoryCollector history="+System.identityHashCode(this.history)); + this.collector = History.createCollector(this.history, FlushPolicy.NoFlush); + this.model = model; + } + + /** + * Load collector with new settings + * + * @param workarea + * @param dynamicExperiment + * @param variables + * @throws HistoryException + */ + public synchronized void load(List items) + throws HistoryException, DatabaseException { + + if (isDisposed()) + return; + + ExperimentState oldState = experiment.getState(); + if (oldState == ExperimentState.RUNNING) + experiment.changeState(ExperimentState.STOPPED); + + try { + ItemManager im = new ItemManager(items); + + // Create and/or Update collector + if (collector==null) collector = History.createCollector(history); + + // Update old items + for (SubscriptionItem oldItem : collector.getItems() ) + { + String id = (String) oldItem.getFieldUnchecked("id"); + Bean newItem = im.get(id); + if ( newItem == null ) { + oldItem.enabled = false; + collector.setItem(oldItem); + //collector.removeItem(id); + } + } + // Add new items + for (Bean newItem : items) { + collector.setItem( newItem ); + } + + // Create files to history + history.modify(items.toArray( new Bean[items.size()] )); + + if (adapter==null) { + adapter = new DatasourceAdapter( collector ); + source.addListener( adapter ); + } else { + adapter.reset(); + } + } finally { + experiment.changeState(oldState); + } + } + + public synchronized void dispose() { + if (source!=null && adapter!=null) { + source.removeListener(adapter); + } + if (adapter!=null) adapter.reset(); + if (collector!=null) collector.close(); + if (history!=null) history.close(); + // Mark disposed + experiment = null; + } + + public HistoryManager getCollector() { + return history; + } + + public boolean isDisposed() { + return experiment == null; + } + + } + + final ItemCollector itemCollector = new ItemCollector(); + + private ModelHistoryCollector() { + } + + /** + * @param graph + * @param startTime + * @throws DatabaseException + * @throws HistoryException + * @throws InterruptedException + */ + public void initialize(RequestProcessor processor, double startTime) throws DatabaseException, HistoryException, InterruptedException { + initialize(processor, startTime, false); + } + + /** + * Initialize is invoked during starting of an experiment ( + * {@link ExperimentState#INITIALIZING}). + * + * @param graph + * @param startTime + * @param listenToVariableSet true to actively listen to changes in the set + * of history collected variables. + * @throws DatabaseException + * @throws HistoryException + * @throws InterruptedException + */ + public void initialize(RequestProcessor processor, double startTime, boolean listenToVariableSet) throws DatabaseException, HistoryException, InterruptedException { + this.session = processor.getSession(); + itemCollector.init(history, dynamicExperiment, startTime, model); + refresh0(processor, listenToVariableSet); + } + + /** + * @throws HistoryException + * @throws DatabaseException + * @throws InterruptedException + */ + public void refresh() throws HistoryException, DatabaseException, InterruptedException { + refresh0(session, false); + } + + /** + * @return + */ + public File getWorkarea() { + return history instanceof FileHistory ? ((FileHistory) history).getWorkarea() : null; + } + + /** + * @return + */ + public HistoryManager getHistoryManager() { + return itemCollector.history; + } + + /** + * Queries subscriptions, sets listener, loads the subscriptions. + * + * @param listenToVariableSet + * @throws HistoryException + * @throws DatabaseException + * @throws InterruptedException + */ + private void refresh0(RequestProcessor processor, boolean listenToVariableSet) throws HistoryException, DatabaseException, InterruptedException { + if (listenToVariableSet) { + if (variableListener == null) { + variableListener = new VariableSetListener(this); + initMutex = new Semaphore(0); + processor.asyncRequest( subscriptionFunction.get(), variableListener ); + // Force synchronous initialization. + initMutex.acquire(); + } + } else { + // Only refresh if there's no listener since the listener + // will take care of that. + if (variableListener == null) { + SubscriptionCollectionResult variables = processor.syncRequest( subscriptionFunction.get() ); + if (!variables.getStatus().isOK() && logger != null) + logger.log(variables.getStatus()); + itemCollector.load(variables.getSubscriptions()); + if (loadCallback != null) + loadCallback.run(); + } + } + } + + /** + * Stops history collection and disposes of all related resources. Collected + * data is left on disk. + */ + public void dispose() { + if (variableListener != null) { + variableListener.dispose(); + variableListener = null; + } + itemCollector.dispose(); + } + + /** + * Primes the initial subscription by attempting to read everything that is + * subscribed once. Any caching and fetching performed by the data session + * implementation should at least be initiated by this invocation. + * + * @param graph + * @throws DatabaseException + */ + public void primeInitialSubscription(ReadGraph graph) throws DatabaseException { + SubscriptionCollectionResult variables = graph.syncRequest( subscriptionFunction.get() ); + Set> variableIds = new HashSet>(); + for (Bean hi : variables.getSubscriptions()) { + String variableId = (String) hi.getFieldUnchecked("variableId"); + variableIds.add( Pair.make(hi, variableId) ); + } + + Datasource source = dynamicExperiment.getDatasource(); + for (Pair ids : variableIds) { + Datatype type = source.getType( ids.second ); + Binding valueBinding = Bindings.getBinding(type); + VariableHandle handle = source.openHandle(ids.first, ids.second, valueBinding); + try { + handle.getValue(); + } catch (AccessorException e) { + logger.log(new Status(IStatus.ERROR, Activator.PLUGIN_ID, e.getLocalizedMessage(), e)); + } finally { + handle.dispose(); + } + } + } + + private static Collection resolveSamplingFormats(Map> cache, SubscriptionItem item, Datatype type, String unit) { + if ( type instanceof BooleanType ) { + Double cacheKey = item.interval; + Collection formats = cache.get(cacheKey); + if (formats == null) { + formats = TrendSamplingFormats.createBinarySamplingFormats(item.interval); + cache.put(cacheKey, formats); + } + return formats; + } else if ( type instanceof NumberType ) { + Tuple3 cacheKey = new Tuple3(item.interval, item.deadband, unit); + Collection formats = cache.get(cacheKey); + if (formats == null) { + formats = TrendSamplingFormats.createAnalogSamplingFormats(item.interval, item.deadband, unit); + cache.put(cacheKey, formats); + } + return formats; + } + + SamplingFormat format = new SamplingFormat(); + format.formatId = "custom"; + RecordType f = new RecordType(); + format.format = f; + f.addComponent("time", Datatypes.DOUBLE); + f.addComponent("endTime", Datatypes.DOUBLE); + f.addComponent("quality", Datatypes.BYTE); + f.addComponent("value", type); + + format.interval = item.interval; + format.deadband = item.deadband; + return Collections.singleton(format); + } + + private static ThreadLocal>> samplingFormatCaches = new ThreadLocal>>() { + @Override + protected Map> initialValue() { + return new WeakHashMap>(); + } + }; + + public static List sampledSubscriptionItems(List rawItems) { + Map> cache = samplingFormatCaches.get(); + + List result = new ArrayList(rawItems.size() * 6); + for (SubscriptionItem item : rawItems) { + + // Apros-specific assumption: the unsampled item's id field contains the subscription item's name + String groupItemId = item.id; + // Apros-specific assumption: the unsampled item's format field contains the subscription item's basic datatype (boolean, number, etc.) + Datatype type = item.format; + // Apros-specific assumption: the unsampled item's formatId subscription item's unit + String unit = /*item.formatId.isEmpty() ? null :*/ item.formatId; + + Collection formats = resolveSamplingFormats(cache, item, type, unit); + SubscriptionItem[] his = SubscriptionItem.createItems(item, groupItemId, item.groupId, formats); + for (SubscriptionItem hi : his) { + hi.groupItemId = groupItemId; + result.add(hi); + } + } + return result; + } + + public Collector getCollector() { + return itemCollector.collector; + } + +}