X-Git-Url: https://gerrit.simantics.org/r/gitweb?a=blobdiff_plain;f=bundles%2Forg.simantics.modeling%2Fsrc%2Forg%2Fsimantics%2Fmodeling%2Fsubscription%2FModelHistoryCollector.java;fp=bundles%2Forg.simantics.modeling%2Fsrc%2Forg%2Fsimantics%2Fmodeling%2Fsubscription%2FModelHistoryCollector.java;h=952a1e747b921b6ef384a7a380c03213730aacc3;hb=969bd23cab98a79ca9101af33334000879fb60c5;hp=0000000000000000000000000000000000000000;hpb=866dba5cd5a3929bbeae85991796acb212338a08;p=simantics%2Fplatform.git diff --git a/bundles/org.simantics.modeling/src/org/simantics/modeling/subscription/ModelHistoryCollector.java b/bundles/org.simantics.modeling/src/org/simantics/modeling/subscription/ModelHistoryCollector.java new file mode 100644 index 000000000..952a1e747 --- /dev/null +++ b/bundles/org.simantics.modeling/src/org/simantics/modeling/subscription/ModelHistoryCollector.java @@ -0,0 +1,455 @@ +/******************************************************************************* + * Copyright (c) 2007, 2011 Association for Decentralized Information Management + * in Industry THTH ry. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * VTT Technical Research Centre of Finland - initial API and implementation + *******************************************************************************/ +package org.simantics.modeling.subscription; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.WeakHashMap; +import java.util.concurrent.Semaphore; +import java.util.function.Supplier; +import java.util.logging.Logger; + +import org.eclipse.core.runtime.ILog; +import org.eclipse.core.runtime.IStatus; +import org.eclipse.core.runtime.Status; +import org.simantics.databoard.Bindings; +import org.simantics.databoard.Datatypes; +import org.simantics.databoard.accessor.error.AccessorException; +import org.simantics.databoard.binding.Binding; +import org.simantics.databoard.type.BooleanType; +import org.simantics.databoard.type.Datatype; +import org.simantics.databoard.type.NumberType; +import org.simantics.databoard.type.RecordType; +import org.simantics.databoard.util.Bean; +import org.simantics.db.ReadGraph; +import org.simantics.db.RequestProcessor; +import org.simantics.db.Resource; +import org.simantics.db.Session; +import org.simantics.db.exception.DatabaseException; +import org.simantics.db.request.Read; +import org.simantics.history.Collector; +import org.simantics.history.History; +import org.simantics.history.HistoryException; +import org.simantics.history.HistoryManager; +import org.simantics.history.ItemManager; +import org.simantics.history.impl.FileHistory; +import org.simantics.history.impl.FlushPolicy; +import org.simantics.history.util.subscription.SamplingFormat; +import org.simantics.history.util.subscription.SubscriptionItem; +import org.simantics.scl.runtime.tuple.Tuple3; +import org.simantics.simulation.Activator; +import org.simantics.simulation.data.Datasource; +import org.simantics.simulation.data.DatasourceAdapter; +import org.simantics.simulation.data.VariableHandle; +import org.simantics.simulation.experiment.ExperimentState; +import org.simantics.simulation.experiment.IDynamicExperiment; +import org.simantics.trend.configuration.TrendSamplingFormats; +import org.simantics.utils.datastructures.Pair; +import org.simantics.utils.threads.IThreadWorkQueue; + +/** + * @author Tuukka Lehtonen + */ +public class ModelHistoryCollector { + + Logger log = Logger.getLogger( this.getClass().getName() ); + + //private static final boolean DEBUG_LOAD = false; + + /** + * For IStatus logging. + */ + static final String BUNDLE_ID = "org.simantics.modeling"; + + /** + * @param g + * @param dynamicExperiment + * @param logger + * @param subscriptionRequestFunction + * @param loadCallback + * a callback runnable that is invoked every time the + * subscription changes + * @return + * @throws DatabaseException + * @throws IOException + */ + public static ModelHistoryCollector createCollector( + IDynamicExperiment dynamicExperiment, + HistoryManager history, + ILog log, + Supplier> subscriptionRequestFunction, + Runnable loadCallback) + throws DatabaseException, IOException + { + return createCollector(dynamicExperiment, history, log, subscriptionRequestFunction, loadCallback, null); + } + + /** + * @param g + * @param dynamicExperiment + * @param logger + * @param subscriptionRequestFunction + * @param loadCallback + * a callback runnable that is invoked every time the + * subscription changes + * @param loadThread + * a possible thread to perform the subscription loads in or + * null to use any thread + * @return + * @throws DatabaseException + * @throws IOException + */ + public static ModelHistoryCollector createCollector( + IDynamicExperiment dynamicExperiment, + HistoryManager history, + ILog log, + Supplier> subscriptionRequestFunction, + Runnable loadCallback, + IThreadWorkQueue loadThread) + throws DatabaseException, IOException + { + ModelHistoryCollector data = new ModelHistoryCollector(); + data.history = history; + data.model = dynamicExperiment.getModel(); + data.dynamicExperiment = dynamicExperiment; + data.logger = log; + data.subscriptionFunction = subscriptionRequestFunction; + data.loadCallback = loadCallback; + data.loadThread = loadThread; + return data; + } + + private Session session; + private HistoryManager history; + private Resource model; + private IDynamicExperiment dynamicExperiment; + ILog logger; + private Supplier> subscriptionFunction; + Runnable loadCallback; + IThreadWorkQueue loadThread; + + private VariableSetListener variableListener; + + /** + * Released every time {@link #variableListener} either completes reloading + * subscriptions or fails with an exception. Used for synchronizing listener-based initialization + */ + Semaphore initMutex = new Semaphore(0); + + public static class ItemCollector { + + /** The experiment to collect the data from. */ + IDynamicExperiment experiment; + + /** The source of the collected data, retrieved from the experiment. */ + Datasource source; + + /** History manager that handles stream files */ + HistoryManager history; + + /** Collectors for each subscription */ + Collector collector; + + DatasourceAdapter adapter; + + Resource model; + + public synchronized void init(HistoryManager history, IDynamicExperiment experiment, double startTime, Resource model) { + this.experiment = experiment; + this.source = experiment.getDatasource(); + this.history = history; + //System.out.println("Debug: ModelHistoryCollector history="+System.identityHashCode(this.history)); + this.collector = History.createCollector(this.history, FlushPolicy.NoFlush); + this.model = model; + } + + /** + * Load collector with new settings + * + * @param workarea + * @param dynamicExperiment + * @param variables + * @throws HistoryException + */ + public synchronized void load(List items) + throws HistoryException, DatabaseException { + + if (isDisposed()) + return; + + ExperimentState oldState = experiment.getState(); + if (oldState == ExperimentState.RUNNING) + experiment.changeState(ExperimentState.STOPPED); + + try { + ItemManager im = new ItemManager(items); + + // Create and/or Update collector + if (collector==null) collector = History.createCollector(history); + + // Update old items + for (SubscriptionItem oldItem : collector.getItems() ) + { + String id = (String) oldItem.getFieldUnchecked("id"); + Bean newItem = im.get(id); + if ( newItem == null ) { + oldItem.enabled = false; + collector.setItem(oldItem); + //collector.removeItem(id); + } + } + // Add new items + for (Bean newItem : items) { + collector.setItem( newItem ); + } + + // Create files to history + history.modify(items.toArray( new Bean[items.size()] )); + + if (adapter==null) { + adapter = new DatasourceAdapter( collector ); + source.addListener( adapter ); + } else { + adapter.reset(); + } + } finally { + experiment.changeState(oldState); + } + } + + public synchronized void dispose() { + if (source!=null && adapter!=null) { + source.removeListener(adapter); + } + if (adapter!=null) adapter.reset(); + if (collector!=null) collector.close(); + if (history!=null) history.close(); + // Mark disposed + experiment = null; + } + + public HistoryManager getCollector() { + return history; + } + + public boolean isDisposed() { + return experiment == null; + } + + } + + final ItemCollector itemCollector = new ItemCollector(); + + private ModelHistoryCollector() { + } + + /** + * @param graph + * @param startTime + * @throws DatabaseException + * @throws HistoryException + * @throws InterruptedException + */ + public void initialize(RequestProcessor processor, double startTime) throws DatabaseException, HistoryException, InterruptedException { + initialize(processor, startTime, false); + } + + /** + * Initialize is invoked during starting of an experiment ( + * {@link ExperimentState#INITIALIZING}). + * + * @param graph + * @param startTime + * @param listenToVariableSet true to actively listen to changes in the set + * of history collected variables. + * @throws DatabaseException + * @throws HistoryException + * @throws InterruptedException + */ + public void initialize(RequestProcessor processor, double startTime, boolean listenToVariableSet) throws DatabaseException, HistoryException, InterruptedException { + this.session = processor.getSession(); + itemCollector.init(history, dynamicExperiment, startTime, model); + refresh0(processor, listenToVariableSet); + } + + /** + * @throws HistoryException + * @throws DatabaseException + * @throws InterruptedException + */ + public void refresh() throws HistoryException, DatabaseException, InterruptedException { + refresh0(session, false); + } + + /** + * @return + */ + public File getWorkarea() { + return history instanceof FileHistory ? ((FileHistory) history).getWorkarea() : null; + } + + /** + * @return + */ + public HistoryManager getHistoryManager() { + return itemCollector.history; + } + + /** + * Queries subscriptions, sets listener, loads the subscriptions. + * + * @param listenToVariableSet + * @throws HistoryException + * @throws DatabaseException + * @throws InterruptedException + */ + private void refresh0(RequestProcessor processor, boolean listenToVariableSet) throws HistoryException, DatabaseException, InterruptedException { + if (listenToVariableSet) { + if (variableListener == null) { + variableListener = new VariableSetListener(this); + initMutex = new Semaphore(0); + processor.asyncRequest( subscriptionFunction.get(), variableListener ); + // Force synchronous initialization. + initMutex.acquire(); + } + } else { + // Only refresh if there's no listener since the listener + // will take care of that. + if (variableListener == null) { + SubscriptionCollectionResult variables = processor.syncRequest( subscriptionFunction.get() ); + if (!variables.getStatus().isOK() && logger != null) + logger.log(variables.getStatus()); + itemCollector.load(variables.getSubscriptions()); + if (loadCallback != null) + loadCallback.run(); + } + } + } + + /** + * Stops history collection and disposes of all related resources. Collected + * data is left on disk. + */ + public void dispose() { + if (variableListener != null) { + variableListener.dispose(); + variableListener = null; + } + itemCollector.dispose(); + } + + /** + * Primes the initial subscription by attempting to read everything that is + * subscribed once. Any caching and fetching performed by the data session + * implementation should at least be initiated by this invocation. + * + * @param graph + * @throws DatabaseException + */ + public void primeInitialSubscription(ReadGraph graph) throws DatabaseException { + SubscriptionCollectionResult variables = graph.syncRequest( subscriptionFunction.get() ); + Set> variableIds = new HashSet>(); + for (Bean hi : variables.getSubscriptions()) { + String variableId = (String) hi.getFieldUnchecked("variableId"); + variableIds.add( Pair.make(hi, variableId) ); + } + + Datasource source = dynamicExperiment.getDatasource(); + for (Pair ids : variableIds) { + Datatype type = source.getType( ids.second ); + Binding valueBinding = Bindings.getBinding(type); + VariableHandle handle = source.openHandle(ids.first, ids.second, valueBinding); + try { + handle.getValue(); + } catch (AccessorException e) { + logger.log(new Status(IStatus.ERROR, Activator.PLUGIN_ID, e.getLocalizedMessage(), e)); + } finally { + handle.dispose(); + } + } + } + + private static Collection resolveSamplingFormats(Map> cache, SubscriptionItem item, Datatype type, String unit) { + if ( type instanceof BooleanType ) { + Double cacheKey = item.interval; + Collection formats = cache.get(cacheKey); + if (formats == null) { + formats = TrendSamplingFormats.createBinarySamplingFormats(item.interval); + cache.put(cacheKey, formats); + } + return formats; + } else if ( type instanceof NumberType ) { + Tuple3 cacheKey = new Tuple3(item.interval, item.deadband, unit); + Collection formats = cache.get(cacheKey); + if (formats == null) { + formats = TrendSamplingFormats.createAnalogSamplingFormats(item.interval, item.deadband, unit); + cache.put(cacheKey, formats); + } + return formats; + } + + SamplingFormat format = new SamplingFormat(); + format.formatId = "custom"; + RecordType f = new RecordType(); + format.format = f; + f.addComponent("time", Datatypes.DOUBLE); + f.addComponent("endTime", Datatypes.DOUBLE); + f.addComponent("quality", Datatypes.BYTE); + f.addComponent("value", type); + + format.interval = item.interval; + format.deadband = item.deadband; + return Collections.singleton(format); + } + + private static ThreadLocal>> samplingFormatCaches = new ThreadLocal>>() { + @Override + protected Map> initialValue() { + return new WeakHashMap>(); + } + }; + + public static List sampledSubscriptionItems(List rawItems) { + Map> cache = samplingFormatCaches.get(); + + List result = new ArrayList(rawItems.size() * 6); + for (SubscriptionItem item : rawItems) { + + // Apros-specific assumption: the unsampled item's id field contains the subscription item's name + String groupItemId = item.id; + // Apros-specific assumption: the unsampled item's format field contains the subscription item's basic datatype (boolean, number, etc.) + Datatype type = item.format; + // Apros-specific assumption: the unsampled item's formatId subscription item's unit + String unit = /*item.formatId.isEmpty() ? null :*/ item.formatId; + + Collection formats = resolveSamplingFormats(cache, item, type, unit); + SubscriptionItem[] his = SubscriptionItem.createItems(item, groupItemId, item.groupId, formats); + for (SubscriptionItem hi : his) { + hi.groupItemId = groupItemId; + result.add(hi); + } + } + return result; + } + + public Collector getCollector() { + return itemCollector.collector; + } + +}