/******************************************************************************* * Copyright (c) 2007, 2011 Association for Decentralized Information Management * in Industry THTH ry. * All rights reserved. This program and the accompanying materials * are made available under the terms of the Eclipse Public License v1.0 * which accompanies this distribution, and is available at * http://www.eclipse.org/legal/epl-v10.html * * Contributors: * VTT Technical Research Centre of Finland - initial API and implementation *******************************************************************************/ package org.simantics.modeling.subscription; import java.io.File; import java.io.IOException; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; import java.util.WeakHashMap; import java.util.concurrent.Semaphore; import java.util.function.Supplier; import java.util.logging.Logger; import org.eclipse.core.runtime.ILog; import org.eclipse.core.runtime.IStatus; import org.eclipse.core.runtime.Status; import org.simantics.databoard.Bindings; import org.simantics.databoard.Datatypes; import org.simantics.databoard.accessor.error.AccessorException; import org.simantics.databoard.binding.Binding; import org.simantics.databoard.type.BooleanType; import org.simantics.databoard.type.Datatype; import org.simantics.databoard.type.NumberType; import org.simantics.databoard.type.RecordType; import org.simantics.databoard.util.Bean; import org.simantics.db.ReadGraph; import org.simantics.db.RequestProcessor; import org.simantics.db.Resource; import org.simantics.db.Session; import org.simantics.db.exception.DatabaseException; import org.simantics.db.request.Read; import org.simantics.history.Collector; import org.simantics.history.History; import org.simantics.history.HistoryException; import org.simantics.history.HistoryManager; import org.simantics.history.ItemManager; import org.simantics.history.impl.FileHistory; import org.simantics.history.impl.FlushPolicy; import org.simantics.history.util.subscription.SamplingFormat; import org.simantics.history.util.subscription.SubscriptionItem; import org.simantics.scl.runtime.tuple.Tuple3; import org.simantics.simulation.Activator; import org.simantics.simulation.data.Datasource; import org.simantics.simulation.data.DatasourceAdapter; import org.simantics.simulation.data.VariableHandle; import org.simantics.simulation.experiment.ExperimentState; import org.simantics.simulation.experiment.IDynamicExperiment; import org.simantics.trend.configuration.TrendSamplingFormats; import org.simantics.utils.datastructures.Pair; import org.simantics.utils.threads.IThreadWorkQueue; /** * @author Tuukka Lehtonen */ public class ModelHistoryCollector { Logger log = Logger.getLogger( this.getClass().getName() ); //private static final boolean DEBUG_LOAD = false; /** * For IStatus logging. */ static final String BUNDLE_ID = "org.simantics.modeling"; /** * @param g * @param dynamicExperiment * @param logger * @param subscriptionRequestFunction * @param loadCallback * a callback runnable that is invoked every time the * subscription changes * @return * @throws DatabaseException * @throws IOException */ public static ModelHistoryCollector createCollector( IDynamicExperiment dynamicExperiment, HistoryManager history, ILog log, Supplier> subscriptionRequestFunction, Runnable loadCallback) throws DatabaseException, IOException { return createCollector(dynamicExperiment, history, log, subscriptionRequestFunction, loadCallback, null); } /** * @param g * @param dynamicExperiment * @param logger * @param subscriptionRequestFunction * @param loadCallback * a callback runnable that is invoked every time the * subscription changes * @param loadThread * a possible thread to perform the subscription loads in or * null to use any thread * @return * @throws DatabaseException * @throws IOException */ public static ModelHistoryCollector createCollector( IDynamicExperiment dynamicExperiment, HistoryManager history, ILog log, Supplier> subscriptionRequestFunction, Runnable loadCallback, IThreadWorkQueue loadThread) throws DatabaseException, IOException { ModelHistoryCollector data = new ModelHistoryCollector(); data.history = history; data.model = dynamicExperiment.getModel(); data.dynamicExperiment = dynamicExperiment; data.logger = log; data.subscriptionFunction = subscriptionRequestFunction; data.loadCallback = loadCallback; data.loadThread = loadThread; return data; } private Session session; private HistoryManager history; private Resource model; private IDynamicExperiment dynamicExperiment; ILog logger; private Supplier> subscriptionFunction; Runnable loadCallback; IThreadWorkQueue loadThread; private VariableSetListener variableListener; /** * Released every time {@link #variableListener} either completes reloading * subscriptions or fails with an exception. Used for synchronizing listener-based initialization */ Semaphore initMutex = new Semaphore(0); public static class ItemCollector { /** The experiment to collect the data from. */ IDynamicExperiment experiment; /** The source of the collected data, retrieved from the experiment. */ Datasource source; /** History manager that handles stream files */ HistoryManager history; /** Collectors for each subscription */ Collector collector; DatasourceAdapter adapter; Resource model; public synchronized void init(HistoryManager history, IDynamicExperiment experiment, double startTime, Resource model) { this.experiment = experiment; this.source = experiment.getDatasource(); this.history = history; //System.out.println("Debug: ModelHistoryCollector history="+System.identityHashCode(this.history)); this.collector = History.createCollector(this.history, FlushPolicy.NoFlush); this.model = model; } /** * Load collector with new settings * * @param workarea * @param dynamicExperiment * @param variables * @throws HistoryException */ public synchronized void load(List items) throws HistoryException, DatabaseException { if (isDisposed()) return; ExperimentState oldState = experiment.getState(); if (oldState == ExperimentState.RUNNING) experiment.changeState(ExperimentState.STOPPED); try { ItemManager im = new ItemManager(items); // Create and/or Update collector if (collector==null) collector = History.createCollector(history); // Update old items for (SubscriptionItem oldItem : collector.getItems() ) { String id = (String) oldItem.getFieldUnchecked("id"); Bean newItem = im.get(id); if ( newItem == null ) { oldItem.enabled = false; collector.setItem(oldItem); //collector.removeItem(id); } } // Add new items for (Bean newItem : items) { collector.setItem( newItem ); } // Create files to history history.modify(items.toArray( new Bean[items.size()] )); if (adapter==null) { adapter = new DatasourceAdapter( collector ); source.addListener( adapter ); } else { adapter.reset(); } } finally { experiment.changeState(oldState); } } public synchronized void dispose() { if (source!=null && adapter!=null) { source.removeListener(adapter); } if (adapter!=null) adapter.reset(); if (collector!=null) collector.close(); if (history!=null) history.close(); // Mark disposed experiment = null; } public HistoryManager getCollector() { return history; } public boolean isDisposed() { return experiment == null; } } final ItemCollector itemCollector = new ItemCollector(); private ModelHistoryCollector() { } /** * @param graph * @param startTime * @throws DatabaseException * @throws HistoryException * @throws InterruptedException */ public void initialize(RequestProcessor processor, double startTime) throws DatabaseException, HistoryException, InterruptedException { initialize(processor, startTime, false); } /** * Initialize is invoked during starting of an experiment ( * {@link ExperimentState#INITIALIZING}). * * @param graph * @param startTime * @param listenToVariableSet true to actively listen to changes in the set * of history collected variables. * @throws DatabaseException * @throws HistoryException * @throws InterruptedException */ public void initialize(RequestProcessor processor, double startTime, boolean listenToVariableSet) throws DatabaseException, HistoryException, InterruptedException { this.session = processor.getSession(); itemCollector.init(history, dynamicExperiment, startTime, model); refresh0(processor, listenToVariableSet); } /** * @throws HistoryException * @throws DatabaseException * @throws InterruptedException */ public void refresh() throws HistoryException, DatabaseException, InterruptedException { refresh0(session, false); } /** * @return */ public File getWorkarea() { return history instanceof FileHistory ? ((FileHistory) history).getWorkarea() : null; } /** * @return */ public HistoryManager getHistoryManager() { return itemCollector.history; } /** * Queries subscriptions, sets listener, loads the subscriptions. * * @param listenToVariableSet * @throws HistoryException * @throws DatabaseException * @throws InterruptedException */ private void refresh0(RequestProcessor processor, boolean listenToVariableSet) throws HistoryException, DatabaseException, InterruptedException { if (listenToVariableSet) { if (variableListener == null) { variableListener = new VariableSetListener(this); initMutex = new Semaphore(0); processor.syncRequest( subscriptionFunction.get(), variableListener ); // Force synchronous initialization. initMutex.acquire(); } } else { // Only refresh if there's no listener since the listener // will take care of that. if (variableListener == null) { SubscriptionCollectionResult variables = processor.syncRequest( subscriptionFunction.get() ); if (!variables.getStatus().isOK() && logger != null) logger.log(variables.getStatus()); itemCollector.load(variables.getSubscriptions()); if (loadCallback != null) loadCallback.run(); } } } /** * Stops history collection and disposes of all related resources. Collected * data is left on disk. */ public void dispose() { if (variableListener != null) { variableListener.dispose(); variableListener = null; } itemCollector.dispose(); } /** * Primes the initial subscription by attempting to read everything that is * subscribed once. Any caching and fetching performed by the data session * implementation should at least be initiated by this invocation. * * @param graph * @throws DatabaseException */ public void primeInitialSubscription(ReadGraph graph) throws DatabaseException { SubscriptionCollectionResult variables = graph.syncRequest( subscriptionFunction.get() ); Set> variableIds = new HashSet>(); for (Bean hi : variables.getSubscriptions()) { String variableId = (String) hi.getFieldUnchecked("variableId"); variableIds.add( Pair.make(hi, variableId) ); } Datasource source = dynamicExperiment.getDatasource(); for (Pair ids : variableIds) { Datatype type = source.getType( ids.second ); Binding valueBinding = Bindings.getBinding(type); VariableHandle handle = source.openHandle(ids.first, ids.second, valueBinding); try { handle.getValue(); } catch (AccessorException e) { logger.log(new Status(IStatus.ERROR, Activator.PLUGIN_ID, e.getLocalizedMessage(), e)); } finally { handle.dispose(); } } } private static Collection resolveSamplingFormats(Map> cache, SubscriptionItem item, Datatype type, String unit) { if ( type instanceof BooleanType ) { Double cacheKey = item.interval; Collection formats = cache.get(cacheKey); if (formats == null) { formats = TrendSamplingFormats.createBinarySamplingFormats(item.interval); cache.put(cacheKey, formats); } return formats; } else if ( type instanceof NumberType ) { Tuple3 cacheKey = new Tuple3(item.interval, item.deadband, unit); Collection formats = cache.get(cacheKey); if (formats == null) { formats = TrendSamplingFormats.createAnalogSamplingFormats(item.interval, item.deadband, unit); cache.put(cacheKey, formats); } return formats; } SamplingFormat format = new SamplingFormat(); format.formatId = "custom"; RecordType f = new RecordType(); format.format = f; f.addComponent("time", Datatypes.DOUBLE); f.addComponent("endTime", Datatypes.DOUBLE); f.addComponent("quality", Datatypes.BYTE); f.addComponent("value", type); format.interval = item.interval; format.deadband = item.deadband; return Collections.singleton(format); } private static ThreadLocal>> samplingFormatCaches = new ThreadLocal>>() { @Override protected Map> initialValue() { return new WeakHashMap>(); } }; public static List sampledSubscriptionItems(List rawItems) { Map> cache = samplingFormatCaches.get(); List result = new ArrayList(rawItems.size() * 6); for (SubscriptionItem item : rawItems) { // Apros-specific assumption: the unsampled item's id field contains the subscription item's name String groupItemId = item.id; // Apros-specific assumption: the unsampled item's format field contains the subscription item's basic datatype (boolean, number, etc.) Datatype type = item.format; // Apros-specific assumption: the unsampled item's formatId subscription item's unit String unit = /*item.formatId.isEmpty() ? null :*/ item.formatId; Collection formats = resolveSamplingFormats(cache, item, type, unit); SubscriptionItem[] his = SubscriptionItem.createItems(item, groupItemId, item.groupId, formats); for (SubscriptionItem hi : his) { hi.groupItemId = groupItemId; result.add(hi); } } return result; } public Collector getCollector() { return itemCollector.collector; } }