/******************************************************************************* * Copyright (c) 2007, 2011 Association for Decentralized Information Management in * Industry THTH ry. * All rights reserved. This program and the accompanying materials * are made available under the terms of the Eclipse Public License v1.0 * which accompanies this distribution, and is available at * http://www.eclipse.org/legal/epl-v10.html * * Contributors: * VTT Technical Research Centre of Finland - initial API and implementation *******************************************************************************/ package org.simantics.simulation.data; import java.util.ArrayList; import java.util.Collection; import java.util.HashSet; import java.util.List; import java.util.Set; import java.util.concurrent.Executor; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import org.simantics.databoard.Bindings; import org.simantics.databoard.accessor.error.AccessorException; import org.simantics.databoard.binding.Binding; import org.simantics.databoard.binding.NumberBinding; import org.simantics.databoard.type.Datatype; import org.simantics.history.Collector; import org.simantics.history.HistoryException; import org.simantics.history.util.subscription.SubscriptionItem; import org.simantics.simulation.data.Datasource.DatasourceListener; import org.simantics.utils.datastructures.Triple; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * This adapter reads data from Datasource and writes to an open Subscription. * This class is used as a listener. 
*
 * @author Toni Kalajainen
 */
public class DatasourceAdapter implements DatasourceListener {

    /** Logger of this adapter. */
    protected Logger logger = LoggerFactory.getLogger( DatasourceAdapter.class );

    /** Collector whose subscription items are read by {@link #load(Datasource)}. */
    protected Collector session;

    /** Set to true at the end of {@link #load(Datasource)}, back to false by {@link #reset()}. */
    protected boolean loaded = false;

    // handles, ids and bindings are parallel lists: load(Datasource) appends
    // exactly one entry to each of them per accepted variable, so the same
    // index refers to the same variable in all three.

    /** Handles opened from the datasource; {@link #reset()} disposes them. */
    protected List<VariableHandle> handles = new ArrayList<VariableHandle>();

    /** Variable ids, one per entry of {@link #handles}. */
    protected List<String> ids = new ArrayList<String>();

    /** Value bindings, one per entry of {@link #handles}. */
    protected List<Binding> bindings = new ArrayList<Binding>();

    /**
     * Variables whose {@link VariableHandle#getValue()} has previously failed
     * and has been reported to have failed through {@link #logger}. Resetting
     * the adapter will also reset this set.
     */
    protected Set<String> failedIds = new HashSet<String>();

    /** Lock handed out by {@link #stepLock()}. */
    protected Lock stepLock = new ReentrantLock();

    /**
     * Create new adapter. Subscribed items are read from collector.
     *
     * @param subscription the collector from which the subscribed items are read
     */
    public DatasourceAdapter(Collector subscription) {
        this.session = subscription;
    }

    /**
     * Replaces the collector used by this adapter.
     *
     * @param session the new collector
     */
    public void setSubscriptionSession(Collector session) {
        this.session = session;
    }

    /**
     * @return the collector currently used by this adapter
     */
    public Collector getSubscriptionSession() {
        return session;
    }

    /**
     * Flushes the current collector by delegating to its {@code flush()}.
     *
     * @throws HistoryException if the collector fails to flush
     */
    public void flush() throws HistoryException {
        session.flush();
    }

    /**
     * @return the lock that is used for synchronizing each
     *         {@link #onStep(Datasource)} invocation. The lock can be used
     *         elsewhere to guarantee that history collection steps are not
     *         taken meanwhile. For example, while setting up history
     *         collection.
     */
    public Lock stepLock() {
        return stepLock;
    }

    /**
     * Reset internal caches. Call this when subscribed items in collector
     * have changed. Disposes every open handle, empties the handle, id,
     * binding and failed-id collections and marks the adapter as not loaded.
*/ public void reset() { for (VariableHandle h : handles) if (h!=null) h.dispose(); bindings.clear(); handles.clear(); ids.clear(); failedIds.clear(); loaded = false; } protected void load(Datasource source) { reset(); // Read ids SubscriptionItem[] items = session.getItems(); Set idSet = new HashSet(items.length); for (SubscriptionItem bean : items) { String variableId = (String) bean.getFieldUnchecked("variableId"); if (!idSet.add( variableId )) continue; Datatype variableType = source.getType( variableId ); if (variableType == null) continue; Binding valueBinding = Bindings.getBinding( variableType ); VariableHandle handle = source.openHandle( bean, variableId, valueBinding ); handles.add( handle ); ids.add( variableId ); bindings.add( valueBinding ); } loaded = true; } protected void list(Collection> result, Collection graphHandles) { int c = ids.size(); for (int i=0; i