--- /dev/null
+/*******************************************************************************\r
+ * Copyright (c) 2007, 2011 Association for Decentralized Information Management in\r
+ * Industry THTH ry.\r
+ * All rights reserved. This program and the accompanying materials\r
+ * are made available under the terms of the Eclipse Public License v1.0\r
+ * which accompanies this distribution, and is available at\r
+ * http://www.eclipse.org/legal/epl-v10.html\r
+ *\r
+ * Contributors:\r
+ * VTT Technical Research Centre of Finland - initial API and implementation\r
+ *******************************************************************************/\r
+package org.simantics.simulation.data;\r
+\r
+import java.util.ArrayList;\r
+import java.util.Collection;\r
+import java.util.HashSet;\r
+import java.util.List;\r
+import java.util.Set;\r
+import java.util.concurrent.Executor;\r
+import java.util.concurrent.locks.Lock;\r
+import java.util.concurrent.locks.ReentrantLock;\r
+import java.util.logging.Level;\r
+import java.util.logging.Logger;\r
+\r
+import org.simantics.databoard.Bindings;\r
+import org.simantics.databoard.accessor.error.AccessorException;\r
+import org.simantics.databoard.binding.Binding;\r
+import org.simantics.databoard.binding.NumberBinding;\r
+import org.simantics.databoard.type.Datatype;\r
+import org.simantics.history.Collector;\r
+import org.simantics.history.HistoryException;\r
+import org.simantics.history.util.subscription.SubscriptionItem;\r
+import org.simantics.simulation.data.Datasource.DatasourceListener;\r
+import org.simantics.utils.datastructures.Triple;\r
+\r
+/**\r
+ * This adapter reads data from Datasource and writes to an open Subscription.\r
+ * This class is used as a listener. \r
+ * \r
+ * @author Toni Kalajainen <toni.kalajainen@semantum.fi>\r
+ */\r
+public class DatasourceAdapter implements DatasourceListener {\r
+\r
+ protected Logger logger = Logger.getLogger( DatasourceAdapter.class.toString() );\r
+ protected Collector session;\r
+ protected boolean loaded = false;\r
+ protected List<VariableHandle> handles = new ArrayList<VariableHandle>();\r
+ protected List<String> ids = new ArrayList<String>();\r
+ protected List<Binding> bindings = new ArrayList<Binding>();\r
+\r
+ /**\r
+ * Variables whose {@link VariableHandle#getValue()} has previously failed\r
+ * and has been reported to have failed through {@link #logger}. Resetting\r
+ * the adapter will also reset this set.\r
+ */\r
+ protected Set<String> failedIds = new HashSet<String>();\r
+\r
+ protected Lock stepLock = new ReentrantLock();\r
+ \r
+ /**\r
+ * Create new adapter. Subscribed items are read from collector. \r
+ * \r
+ * @param subscription\r
+ */\r
+ public DatasourceAdapter(Collector subscription) {\r
+ this.session = subscription;\r
+ }\r
+ \r
+ public void setSubscriptionSession(Collector session) {\r
+ this.session = session;\r
+ }\r
+ \r
+ public Collector getSubscriptionSession() {\r
+ return session;\r
+ }\r
+ \r
+ public void flush() throws HistoryException {\r
+ session.flush();\r
+ }\r
+\r
+ /**\r
+ * @return the lock that is used for synchronizing each\r
+ * {@link #onStep(Datasource)} invocation. The lock can be used\r
+ * elsewhere to guarantee that history collection steps are not\r
+ * taken meanwhile. For example, while setting up history\r
+ * collection.\r
+ */\r
+ public Lock stepLock() {\r
+ return stepLock;\r
+ }\r
+\r
+ /**\r
+ * Reset internal caches. Call this when subscribed items in collector\r
+ * have changed.\r
+ */\r
+ public void reset() {\r
+ for (VariableHandle h : handles) if (h!=null) h.dispose();\r
+ bindings.clear();\r
+ handles.clear();\r
+ ids.clear();\r
+ failedIds.clear();\r
+ loaded = false;\r
+ }\r
+\r
+ protected void load(Datasource source) {\r
+ reset();\r
+\r
+ // Read ids \r
+ SubscriptionItem[] items = session.getItems();\r
+ Set<String> idSet = new HashSet<String>(items.length);\r
+ for (SubscriptionItem bean : items) {\r
+ String variableId = (String) bean.getFieldUnchecked("variableId");\r
+ if (!idSet.add( variableId )) continue;\r
+ Datatype variableType = source.getType( variableId );\r
+ if (variableType == null) continue;\r
+ Binding valueBinding = Bindings.getBinding( variableType );\r
+ VariableHandle handle = source.openHandle( bean, variableId, valueBinding );\r
+ handles.add( handle );\r
+ ids.add( variableId );\r
+ bindings.add( valueBinding );\r
+ }\r
+ loaded = true;\r
+ }\r
+\r
+ protected void list(Collection<Triple<String,Binding,Object>> result, Collection<GraphHandle> graphHandles) {\r
+ \r
+ int c = ids.size();\r
+ for (int i=0; i<c; i++) {\r
+ String key = ids.get(i);\r
+ VariableHandle handle = handles.get(i);\r
+ Object value = null;\r
+ if (handle != null) {\r
+ if (handle instanceof GraphHandle) {\r
+ graphHandles.add((GraphHandle)handle);\r
+ } else {\r
+ try {\r
+ value = handle.getValue();\r
+ Binding binding = bindings.get(i);\r
+ result.add(Triple.make(key, binding, value));\r
+ } catch (AccessorException e) {\r
+ if (failedIds.add(key))\r
+ logger.log(Level.SEVERE, e.toString(), e);\r
+ continue;\r
+ }\r
+ }\r
+ } else {\r
+ Binding binding = bindings.get(i);\r
+ result.add(Triple.make(key, binding, value));\r
+ }\r
+ }\r
+ }\r
+ \r
+ @Override\r
+ public void onStep(Datasource source) {\r
+ stepLock.lock();\r
+ try {\r
+ NumberBinding timeBinding = Bindings.DOUBLE;\r
+ Object time = source.getTime( timeBinding );\r
+ session.beginStep( timeBinding, time );\r
+\r
+ if (!loaded) load( source );\r
+\r
+ try {\r
+\r
+ int c = ids.size();\r
+ for (int i=0; i<c; i++) {\r
+ String key = ids.get(i);\r
+ VariableHandle handle = handles.get(i);\r
+ Object value = null;\r
+ if (handle != null) {\r
+ try {\r
+ value = handle.getValue();\r
+ } catch (AccessorException e) {\r
+ if (failedIds.add(key))\r
+ logger.log(Level.SEVERE, e.toString(), e);\r
+ continue;\r
+ }\r
+ Binding binding = handle.binding();\r
+ try {\r
+ session.setValue( key, binding, value );\r
+ } catch (HistoryException e) {\r
+ logger.log(Level.SEVERE, e.toString(), e);\r
+ }\r
+ } else {\r
+ Binding binding = bindings.get(i);\r
+ if (binding != null) { \r
+ session.setValue( key, binding, value );\r
+ } \r
+ }\r
+ }\r
+\r
+ } finally {\r
+ try {\r
+ session.endStep();\r
+ } catch (HistoryException e) {\r
+ logger.log(Level.SEVERE, e.toString(), e);\r
+ }\r
+ }\r
+ } catch (HistoryException e) {\r
+ logger.log(Level.SEVERE, e.toString(), e);\r
+ } finally {\r
+ stepLock.unlock();\r
+ }\r
+ }\r
+\r
+ @Override\r
+ public Executor getExecutor() {\r
+ return null;\r
+ }\r
+ \r
+\r
+}\r