--- /dev/null
+/*******************************************************************************\r
+ * Copyright (c) 2007, 2011 Association for Decentralized Information Management in\r
+ * Industry THTH ry.\r
+ * All rights reserved. This program and the accompanying materials\r
+ * are made available under the terms of the Eclipse Public License v1.0\r
+ * which accompanies this distribution, and is available at\r
+ * http://www.eclipse.org/legal/epl-v10.html\r
+ *\r
+ * Contributors:\r
+ * VTT Technical Research Centre of Finland - initial API and implementation\r
+ *******************************************************************************/\r
+package org.simantics.history.impl;\r
+\r
+import gnu.trove.set.hash.THashSet;\r
+\r
+import java.io.IOException;\r
+import java.util.ArrayList;\r
+import java.util.List;\r
+import java.util.Set;\r
+import java.util.UUID;\r
+import java.util.concurrent.CopyOnWriteArrayList;\r
+import java.util.logging.Level;\r
+import java.util.logging.Logger;\r
+\r
+import org.simantics.databoard.Bindings;\r
+import org.simantics.databoard.accessor.StreamAccessor;\r
+import org.simantics.databoard.accessor.error.AccessorException;\r
+import org.simantics.databoard.adapter.AdaptException;\r
+import org.simantics.databoard.adapter.Adapter;\r
+import org.simantics.databoard.adapter.AdapterConstructionException;\r
+import org.simantics.databoard.binding.Binding;\r
+import org.simantics.databoard.binding.DoubleBinding;\r
+import org.simantics.databoard.binding.FloatBinding;\r
+import org.simantics.databoard.binding.NumberBinding;\r
+import org.simantics.databoard.binding.RecordBinding;\r
+import org.simantics.databoard.binding.error.BindingConstructionException;\r
+import org.simantics.databoard.binding.error.BindingException;\r
+import org.simantics.databoard.binding.error.RuntimeBindingException;\r
+import org.simantics.databoard.binding.impl.DoubleBindingDefault;\r
+import org.simantics.databoard.binding.mutable.MutableVariant;\r
+import org.simantics.databoard.binding.mutable.Variant;\r
+import org.simantics.databoard.binding.reflection.VoidBinding;\r
+import org.simantics.databoard.serialization.Serializer;\r
+import org.simantics.databoard.type.Component;\r
+import org.simantics.databoard.type.Datatype;\r
+import org.simantics.databoard.type.NumberType;\r
+import org.simantics.databoard.type.RecordType;\r
+import org.simantics.databoard.util.Bean;\r
+import org.simantics.history.Collector;\r
+import org.simantics.history.HistoryException;\r
+import org.simantics.history.HistoryManager;\r
+import org.simantics.history.impl.CollectorState.Item;\r
+import org.simantics.history.impl.CollectorState.VariableState;\r
+import org.simantics.history.util.ValueBand;\r
+import org.simantics.history.util.WeightedMedian;\r
+import org.simantics.history.util.subscription.SubscriptionItem;\r
+\r
+/**\r
+ * Collector is used to record data to streams in history manager.\r
+ * \r
+ * For every item there is collector item state {@link CollectorState.Item}, \r
+ * that contains data required in writing process. This state is read and \r
+ * written to history manager item (bean) as meta-data. \r
+ * \r
+ * Stream files are opened in the first {@link #beginStep(NumberBinding, Object)}. \r
+ * \r
+ * @author toni.kalajainen\r
+ *\r
+ */\r
+public class CollectorImpl implements Collector {\r
+\r
+ /** Logger */\r
+ static Logger logger = Logger.getLogger(CollectorImpl.class.getName());\r
+ \r
+ public static final int MEDIAN_LIMIT = 500;\r
+ \r
+ public static final double EPSILON = 1e-2;\r
+ \r
+ /** Collector state */\r
+ CollectorState state;\r
+\r
+ /** Open streams, corresponds to subscription items */\r
+ CopyOnWriteArrayList<ActiveStream> items = new CopyOnWriteArrayList<ActiveStream>();\r
+ \r
+ /** Associated history state */ \r
+ HistoryManager history;\r
+ \r
+ /** True when all items are open */\r
+ boolean itemsOpen = true;\r
+ \r
+ public FlushPolicy flushPolicy = FlushPolicy.FlushOnEveryStep;\r
+ \r
+ public FlushPolicy getFlushPolicy() {\r
+ return flushPolicy;\r
+ }\r
+\r
+ public void setFlushPolicy(FlushPolicy flushPolicy) {\r
+ this.flushPolicy = flushPolicy;\r
+ }\r
+\r
+ /**\r
+ * Create a new collector that is associated with a history manager.\r
+ * \r
+ * @param history\r
+ * @throws HistoryException\r
+ */\r
+ public CollectorImpl(HistoryManager history) {\r
+ this.history = history;\r
+ this.state = new CollectorState();\r
+ this.state.init();\r
+ this.state.id = UUID.randomUUID().toString();\r
+ }\r
+\r
+ /**\r
+ * Create a new collector that is associated with a history manager.\r
+ * \r
+ * @param history\r
+ * @param previousState\r
+ * @throws HistoryException\r
+ */\r
+ public CollectorImpl(HistoryManager history, CollectorState previousState) {\r
+ this.history = history;\r
+ this.state = (CollectorState) previousState.clone();\r
+ }\r
+ \r
+ @Override\r
+ public synchronized void addItem(Bean item) throws HistoryException {\r
+ try {\r
+ String id = (String) item.getField("id");\r
+ \r
+ // Item State\r
+ CollectorState.Item itemState = null;\r
+ if ( item.hasField("collectorState") ) itemState = (CollectorState.Item) item.getField("collectorState", CollectorState.BINDING_ITEM); \r
+ if ( itemState == null ) itemState = state.itemStates.get( id );\r
+ if ( itemState == null ) {\r
+ // Create new item\r
+ itemState = new CollectorState.Item();\r
+ itemState.init();\r
+ itemState.readAvailableFields( item );\r
+ // Guess values from the existing data \r
+ if (history.exists(id)) {\r
+ readItemState(history, item, itemState);\r
+ }\r
+ }\r
+ \r
+ addItem(id, itemState); \r
+ } catch (BindingException e) {\r
+ throw new HistoryException( e ); \r
+ }\r
+ }\r
+ \r
+ public synchronized void addItem(String id, CollectorState.Item itemState) throws HistoryException {\r
+ try { \r
+ // Item State\r
+ state.itemStates.put(id, itemState);\r
+ itemsOpen = false;\r
+ \r
+ // Time \r
+ if (state.time.type() instanceof NumberType) { \r
+ double time = (Double) Bindings.adapt(state.time.getValue(), state.time.getBinding(), Bindings.DOUBLE);\r
+ if ( !Double.isNaN(itemState.currentTime) ) {\r
+ if ( itemState.currentTime > time ) {\r
+ state.time.setValue(Bindings.DOUBLE, itemState.currentTime);\r
+ }\r
+ } \r
+ }\r
+ } catch (AdaptException e) {\r
+ throw new HistoryException( e ); \r
+ }\r
+ }\r
+ \r
+ public synchronized void addItems(Bean...items) throws HistoryException {\r
+ for (Bean item : items) addItem(item);\r
+ }\r
+ \r
+ @Override\r
+ public synchronized void removeItem(String id) throws HistoryException {\r
+ CollectorState.Item itemState = state.itemStates.remove( id );\r
+ if (itemState!=null && history.exists(id)) {\r
+ Bean bean = history.getItem(id);\r
+ bean.readAvailableFields(itemState);\r
+ try {\r
+ bean = setItemState(bean, itemState);\r
+ history.modify(bean);\r
+ } catch (BindingException e) {\r
+ // Unexpected\r
+ logger.log(Level.FINE, "Failed to update bean", e); \r
+ } catch (BindingConstructionException e) {\r
+ // Unexpected\r
+ logger.log(Level.FINE, "Failed to update bean", e);\r
+ } catch (HistoryException e) {\r
+ logger.log(Level.FINE, "Failed to update history item meta-data", e);\r
+ }\r
+ }\r
+ \r
+ ActiveStream as = getStream(id);\r
+ if ( as != null ) {\r
+ items.remove( as );\r
+ as.close();\r
+ }\r
+ }\r
+ \r
+ ActiveStream getStream(String id) {\r
+ for (ActiveStream as : items) {\r
+ if (as.itemState.id.equals(id)) return as;\r
+ }\r
+ return null;\r
+ }\r
+ \r
+ /**\r
+ * Set (or Add) item's configuration. \r
+ * \r
+ * @param item item\r
+ * @throws HistoryException\r
+ */\r
+ public synchronized void setItem(Bean item) throws HistoryException {\r
+ try {\r
+ String id = (String) item.getField("id");\r
+ \r
+ CollectorState.Item itemState = state.itemStates.get( id );\r
+ if ( itemState==null ) {\r
+ addItem(item);\r
+ return;\r
+ }\r
+ itemState.readAvailableFields(item);\r
+ } catch (BindingException e) {\r
+ throw new HistoryException( e ); \r
+ }\r
+ }\r
+ \r
+ \r
+ /**\r
+ * Set item state to bean. If the bean does not have field collectorState,\r
+ * a new bean class is created, and field added.\r
+ * \r
+ * @param config\r
+ * @param state\r
+ * @return same or new bean\r
+ * @throws BindingException \r
+ * @throws BindingConstructionException \r
+ */\r
+ public static Bean setItemState(Bean config, CollectorState.Item state) throws BindingException, BindingConstructionException\r
+ {\r
+ Binding binding = config.getFieldBinding("collectorState");\r
+ if ( binding == null || !binding.type().equals(CollectorState.BINDING_ITEM.type()) ) {\r
+ RecordType rt = config.getBinding().type();\r
+ RecordType newType = new RecordType();\r
+ for (int i=0; i<rt.getComponentCount(); i++) {\r
+ Component c = rt.getComponent(i);\r
+ if ( c.name.equals("collectorState") ) continue;\r
+ newType.addComponent(c.name, c.type);\r
+ }\r
+ newType.addComponent("collectorState", CollectorState.BINDING_ITEM.type());\r
+ Binding newBinding = Bindings.getBeanBinding(newType);\r
+ Bean newConfig = (Bean) newBinding.createDefault();\r
+ newConfig.readAvailableFields( config );\r
+ config = newConfig;\r
+ }\r
+ config.setField("collectorState", CollectorState.BINDING_ITEM, state);\r
+ return config;\r
+ }\r
+ \r
+ public SubscriptionItem[] getItems() {\r
+ int c = state.itemStates.size();\r
+ SubscriptionItem[] items = new SubscriptionItem[ c ];\r
+ int ix = 0;\r
+ Serializer s = Bindings.getSerializerUnchecked( CollectorState.BINDING_ITEM );\r
+ for (CollectorState.Item item : state.itemStates.values()) {\r
+ _HistoryItem hi = new _HistoryItem();\r
+ hi.init();\r
+ hi.readAvailableFields(item);\r
+ //hi.collectorState = (Item) item.clone();\r
+ \r
+ // Workaround (clone doesn't work for WeightedMedian)\r
+ try {\r
+ byte[] data = s.serialize( item ); \r
+ hi.collectorState = (Item) s.deserialize(data);\r
+ } catch (IOException e) {\r
+ }\r
+ \r
+ items[ix++] = hi;\r
+ }\r
+ return items;\r
+ }\r
+ \r
+ public static class _HistoryItem extends SubscriptionItem {\r
+ public CollectorState.Item collectorState;\r
+ }\r
+ \r
+ /**\r
+ * Creates and loads active item. TimeSeries stream file is opened.\r
+ * \r
+ * @param itemState\r
+ * @return new active item\r
+ * @throws HistoryException\r
+ */\r
+ ActiveStream openStream(CollectorState.Item itemState) throws HistoryException \r
+ {\r
+ // New Active Item\r
+ ActiveStream x = new ActiveStream();\r
+ x.itemState = itemState;\r
+\r
+ // Stream\r
+ {\r
+ if ( !history.exists(x.itemState.id) ) {\r
+ throw new HistoryException("TimeSeries "+x.itemState.id+" does not exist.");\r
+ }\r
+\r
+ x.stream = history.openStream(x.itemState.id, "rw");\r
+ }\r
+\r
+ // Sample Binding\r
+ Datatype sampleType = x.stream.type().componentType;\r
+ itemState.format = sampleType;\r
+ try { \r
+ x.binding = (RecordBinding) Bindings.getBeanBinding( sampleType );\r
+ } catch ( RuntimeException e ) {\r
+ System.err.println("Could not create bean binding for type "+sampleType);\r
+ x.binding = (RecordBinding) Bindings.getBinding( sampleType ); \r
+ }\r
+\r
+ // Value binding\r
+ x.valueBinding = x.binding.getComponentBinding("value");\r
+ x.isNumeric = x.valueBinding instanceof NumberBinding;\r
+ x.hasEndTime = x.binding.getComponentIndex("endTime")>0;\r
+\r
+ // Value adapter\r
+ if ( x.isNumeric ) \r
+ try {\r
+ x.valueToDouble = Bindings.adapterFactory.getAdapter(x.valueBinding, Bindings.DOUBLE, true, false);\r
+ x.doubleToValue = Bindings.adapterFactory.getAdapter(Bindings.DOUBLE, x.valueBinding, true, false);\r
+ } catch (AdapterConstructionException e1) {\r
+ x.valueToDouble = null;\r
+ }\r
+\r
+ // Create valueband, and read last entry into memory\r
+ try {\r
+ Object currentEntry = x.binding.createDefault();\r
+ if (x.stream.size() >= 1) {\r
+ x.stream.get(x.stream.size() - 1, x.binding, currentEntry);\r
+ }\r
+ x.current = new ValueBand(x.binding);\r
+ x.current.setSample(currentEntry);\r
+ } catch (AccessorException e) {\r
+ throw new HistoryException(\r
+ "Could not read sample from stream file, " + e, e);\r
+ } catch (BindingException e) {\r
+ throw new HistoryException(\r
+ "Could not read sample from stream file, " + e, e);\r
+ }\r
+\r
+ // Item State\r
+ x.itemState = itemState;\r
+\r
+ // Median\r
+ if (x.isNumeric && x.current.hasMedian()\r
+ && x.itemState.median == null) {\r
+ x.itemState.median = new WeightedMedian( MEDIAN_LIMIT );\r
+ }\r
+\r
+ return x;\r
+ }\r
+ \r
+ /**\r
+ * Read itemState from history. Item must exist. \r
+ * \r
+ * @param si subscription item\r
+ * @return state item\r
+ * @throws RuntimeBindingException\r
+ * @throws HistoryException\r
+ */\r
+ static void readItemState(HistoryManager history, Bean si, CollectorState.Item item) throws RuntimeBindingException, HistoryException {\r
+ // Create item state\r
+ item.readAvailableFields(si);\r
+ // Read state from items\r
+ StreamAccessor sa = history.openStream(item.id, "r");\r
+ try {\r
+ Datatype sampleType = sa.type().componentType;\r
+ \r
+ Binding sampleBinding = Bindings.getBinding(sampleType);\r
+ if (!sampleType.equals( item.format )) throw new HistoryException("Could not create subscription, item \""+item.id+"\" already exists and has wrong format (" + sampleType + " vs. " + item.format + ")");\r
+ Object sample = sampleBinding.createDefaultUnchecked();\r
+ ValueBand v = new ValueBand(sampleBinding, sample);\r
+ item.count = sa.size();\r
+ if ( item.count>0 ) {\r
+ sa.get(0, sampleBinding, sample);\r
+ item.firstTime = v.getTimeDouble();\r
+ Binding b = v.getValueBinding();\r
+ Object value = v.getValue(b);\r
+ item.firstValue.setValue(b, value);\r
+ \r
+ sa.get(item.count-1, sampleBinding, sample);\r
+ item.currentTime = v.getTimeDouble();\r
+ b = v.getValueBinding();\r
+ value = v.getValue(b);\r
+ item.currentValue.setValue(b, value);\r
+ \r
+ if ( item.currentValue.getBinding() instanceof DoubleBinding ) {\r
+ DoubleBinding db = (DoubleBinding) item.currentValue.getBinding();\r
+ value = item.currentValue.getValue(db);\r
+ item.isNaN = Double.isNaN( (Double) value );\r
+ }\r
+ }\r
+ \r
+ } catch (AccessorException e) {\r
+ throw new HistoryException( e );\r
+ } catch (AdaptException e) {\r
+ throw new HistoryException( e );\r
+ } finally {\r
+ try { sa.close(); } catch (AccessorException e) {}\r
+ }\r
+ }\r
+\r
+ @Override\r
+ public CollectorState getState() {\r
+ return (CollectorState) state.clone();\r
+ }\r
+\r
+ @Override\r
+ public void setState(Bean newState) {\r
+ if (newState == null)\r
+ return;\r
+ if (state == null) {\r
+ state = new CollectorState();\r
+ state.init();\r
+ }\r
+ state.readAvailableFields(newState);\r
+\r
+ // Make sure that openAllItems ensures that all streams are opened properly.\r
+ // This is because setItem will not invoke addItem if collectorState exists\r
+ // for an item and therefore itemsOpen might not otherwise be set to false.\r
+ itemsOpen = false;\r
+ }\r
+\r
+ @Override\r
+ public Object getTime(Binding binding) throws HistoryException {\r
+ MutableVariant v = state.time;\r
+ if (v==null) return null;\r
+ try {\r
+ return v.getValue(binding);\r
+ } catch (AdaptException e) {\r
+ throw new HistoryException(e);\r
+ }\r
+ }\r
+\r
+ @Override\r
+ public Object getValue(String id, Binding binding) throws HistoryException {\r
+ VariableState vs = state.values.get(id);\r
+ if (vs==null) return null;\r
+ \r
+ if (!vs.isValid) return null;\r
+ \r
+ try {\r
+ return vs.value.getValue(binding);\r
+ } catch (AdaptException e) {\r
+ throw new HistoryException(e);\r
+ }\r
+ }\r
+ \r
+ void openAllItems() throws HistoryException {\r
+ if (itemsOpen) return;\r
+\r
+ // Assert all items are open\r
+ // 1. Collect all items that still need to be opened.\r
+ Set<String> itemsToOpen = new THashSet<String>( state.itemStates.keySet() );\r
+ for (ActiveStream as : items)\r
+ itemsToOpen.remove(as.itemState.id);\r
+\r
+ // 2. Open missing streams\r
+ List<ActiveStream> opened = new ArrayList<ActiveStream>(itemsToOpen.size());\r
+ for (String itemId : itemsToOpen) {\r
+ CollectorState.Item item = state.itemStates.get( itemId );\r
+ ActiveStream as = openStream( item );\r
+ opened.add(as);\r
+ }\r
+\r
+ // 3. Update list of open streams\r
+ items.addAll(opened);\r
+ itemsOpen = true;\r
+ }\r
+\r
+ private void updateDt(NumberBinding binding, Object time) {\r
+ try {\r
+ Double current = (Double)getTime(Bindings.DOUBLE);\r
+ if(current != null) {\r
+ double t = binding.getValue(time).doubleValue();\r
+ state.dT = t-current;\r
+ if(state.dT > 0) return;\r
+ }\r
+ } catch (Throwable t) {\r
+ }\r
+ state.dT = 1.0;\r
+ }\r
+ \r
+ @Override\r
+ public void beginStep(NumberBinding binding, Object time) throws HistoryException {\r
+ openAllItems();\r
+ updateDt(binding, time);\r
+ state.time.setValue(binding, time);\r
+ }\r
+\r
+ @Override\r
+ public void setValue(String id, Binding binding, Object value) throws HistoryException {\r
+ VariableState vs = state.values.get( id );\r
+ if (vs == null) {\r
+ vs = new VariableState();\r
+ vs.value = new MutableVariant(binding, value);\r
+ state.values.put(id, vs);\r
+ }\r
+ \r
+ if (value==null) {\r
+ vs.value.setValue(VoidBinding.VOID_BINDING, null);\r
+ vs.isNan = true;\r
+ vs.isValid = false;\r
+ return;\r
+ } \r
+ \r
+ vs.value.setValue(binding, value);\r
+ vs.isValid = true;\r
+ \r
+ if (binding instanceof DoubleBinding) {\r
+ DoubleBinding db = (DoubleBinding) binding;\r
+ try {\r
+ double _v = db.getValue_(value);\r
+ vs.isNan = Double.isNaN( _v );\r
+ } catch (BindingException e) {\r
+ vs.isNan = true;\r
+ vs.isValid = false;\r
+ }\r
+ } else if (binding instanceof FloatBinding) {\r
+ FloatBinding db = (FloatBinding) binding;\r
+ try {\r
+ float _v = db.getValue_(value);\r
+ vs.isNan = Float.isNaN( _v );\r
+ } catch (BindingException e) {\r
+ vs.isNan = true;\r
+ vs.isValid = false;\r
+ }\r
+ } else {\r
+ vs.isNan = false;\r
+ }\r
+ }\r
+\r
+ @Override\r
+ public boolean getValue(String id, MutableVariant result) {\r
+ VariableState vs = state.values.get(id);\r
+ if (vs==null) return false;\r
+ \r
+ if (!vs.isValid) return false;\r
+ \r
+ result.setValue(vs.value.getBinding(), vs.value.getValue());\r
+ return true;\r
+ }\r
+\r
+ @Override\r
+ public void endStep() throws HistoryException {\r
+ // Write values to streams\r
+ for (ActiveStream i : items)\r
+ {\r
+ try {\r
+ putValueToStream(i); \r
+ if (flushPolicy == FlushPolicy.FlushOnEveryStep) {\r
+ i.stream.flush();\r
+ }\r
+ } catch (AdaptException e) {\r
+ throw new HistoryException( e );\r
+// FileAccessor fa = (FileAccessor) i.stream;\r
+// logger.log(Level.FINE, "Error writing a stream "+fa.file(), e);\r
+ } catch (AccessorException e) {\r
+ throw new HistoryException( e );\r
+// FileAccessor fa = (FileAccessor) i.stream;\r
+// logger.log(Level.FINE, "Error writing a stream "+fa.file(), e);\r
+ } catch (BindingException e) {\r
+ throw new HistoryException( e );\r
+// FileAccessor fa = (FileAccessor) i.stream;\r
+// logger.log(Level.FINE, "Error writing a stream "+fa.file(), e);\r
+ }\r
+ }\r
+ }\r
+ \r
+ private int sampleNum(double time, double interval) {\r
+ // 1.0 is needed to cover the case of 0.0\r
+ return (int)(1.0 + (time+EPSILON*state.dT)/interval);\r
+ }\r
+\r
+ private boolean isExactInterval(double time, double interval) {\r
+ return sampleNum(time, interval) > sampleNum(time-2*EPSILON*state.dT, interval);\r
+ }\r
+\r
+ /**\r
+ * This method puts a time,value to a stream.\r
+ * \r
+ * The stream is "packed" when the new value is equal to previous, or the\r
+ * value is within deadband (in comparison to the first value), or if\r
+ * the interval zone is not exceeded.\r
+ * \r
+ * The stream can be packed two different ways, depending on the sample\r
+ * format. The sample format can be (a) single value or (b) a time range of\r
+ * multiple values, Ranged Sample. \r
+ * \r
+ * Single value is packed by removing middle values, in other words, \r
+ * moving the timecode of the last sample.\r
+ * \r
+ * Ranged sample is packed by extending the endTime code of the last sample.\r
+ * Other fields, such as avg, sum, count, lastValue are also updated.\r
+ * \r
+ * @param i\r
+ * @throws AdaptException\r
+ * @throws AccessorException \r
+ * @throws BindingException \r
+ * @throws HistoryException \r
+ */\r
+ private void putValueToStream(ActiveStream i) throws AdaptException, AccessorException, BindingException, HistoryException\r
+ {\r
+ // Current Time\r
+ Variant time = state.time;\r
+ double currentTime = Double.NaN;\r
+ if (time != null) {\r
+ Double x = (Double) time.getValue(Bindings.DOUBLE);\r
+ currentTime = x == null ? Double.NaN : x;\r
+ }\r
+\r
+ // Is first value\r
+ boolean isFirstValue = i.stream.size() == 0;\r
+ // Is this item disabled\r
+ boolean isDisabled = !i.itemState.enabled; \r
+ // Discontinuety, first value after being disabled\r
+ boolean wasDisabled = !isFirstValue && i.itemState.isDisabled;\r
+ \r
+ // Update currently disabled status\r
+ i.itemState.isDisabled = isDisabled;\r
+\r
+ // Update time codes\r
+ if (isDisabled) {\r
+ i.itemState.count++;\r
+ i.itemState.lastDisabledTime = currentTime;\r
+ if (!wasDisabled) i.itemState.firstDisabledTime = currentTime;\r
+ return;\r
+ }\r
+ \r
+ // Is nan value, for doubles and floats\r
+ boolean isNanValue;\r
+ // Is valid value\r
+ boolean isValidValue;\r
+ // Value as variant\r
+ MutableVariant value;\r
+ // Get current value as double\r
+ double currentValueDouble;\r
+ \r
+ VariableState vs = state.values.get( i.itemState.variableId ); \r
+ if ( vs!=null ) {\r
+ isNanValue = vs.isNan;\r
+ isValidValue = vs.isValid;\r
+ value = vs.value;\r
+ if (i.isNumeric && isValidValue && !isNanValue) {\r
+ currentValueDouble = (Double) value.getValue( Bindings.DOUBLE );\r
+ if ( i.itemState.gain!=1.0 || i.itemState.bias!=0.0 ) {\r
+ currentValueDouble = i.itemState.gain * currentValueDouble + i.itemState.bias;\r
+ Object x = i.valueBinding instanceof DoubleBindingDefault ? currentValueDouble : i.doubleToValue.adapt(currentValueDouble);\r
+ value = new MutableVariant( i.valueBinding, x );\r
+ }\r
+ } else {\r
+ currentValueDouble = Double.NaN;\r
+ }\r
+ \r
+ } else {\r
+ isNanValue = true;\r
+ isValidValue = false;\r
+ value = new MutableVariant( i.valueBinding, i.valueBinding.createDefault() );\r
+ currentValueDouble = Double.NaN;\r
+ }\r
+ \r
+ // Prev Value\r
+ double prevValueDouble = Double.NaN;\r
+ Variant pv = i.itemState.currentValue;\r
+ if (pv!=null && pv.getBinding() instanceof NumberBinding ) {\r
+ prevValueDouble = (Double) pv.getValue( Bindings.DOUBLE );\r
+ } else {\r
+ prevValueDouble = Double.NaN;\r
+ }\r
+ \r
+ // The time of the first sample of region\r
+ double firstTime = i.itemState.firstTime;\r
+ // Prev time\r
+ double prevTime = i.itemState.currentTime;\r
+ \r
+ boolean restep = prevTime == currentTime;\r
+ \r
+ // There was change in NaN\r
+ boolean isNaNChange = i.itemState.isNaN != isNanValue;\r
+ \r
+ // There was change in validity of the value\r
+ boolean isValidChange = i.itemState.isValid != isValidValue;\r
+ \r
+ //boolean wasValidValue = !i.itemState.isValid && isValidValue;\r
+ \r
+ boolean hasInterval = !Double.isNaN( i.itemState.interval ) && (i.itemState.interval> Double.MIN_VALUE );\r
+ \r
+ boolean hasDeadband = !Double.isNaN( i.itemState.deadband ) && (i.itemState.deadband> Double.MIN_VALUE) && i.isNumeric; \r
+ \r
+ boolean isMinMaxFormat = i.itemState.interval==Double.MAX_VALUE &&\r
+ i.itemState.deadband==Double.MAX_VALUE &&\r
+ i.binding.getComponentIndex("min")>=0 &&\r
+ i.binding.getComponentIndex("max")>=0;\r
+\r
+ // Deadband\r
+ if (isValidValue && i.isNumeric && i.itemState.count>=1) {\r
+ \r
+ if (i.itemState.isValid) {\r
+ double firstValueDouble = (Double) i.itemState.firstValue.getValue( Bindings.DOUBLE );\r
+ double diff = Math.abs( currentValueDouble - firstValueDouble );\r
+ if( hasDeadband )\r
+ i.itemState.ooDeadband |= diff >= i.itemState.deadband;\r
+ else\r
+ i.itemState.ooDeadband |= diff != 0.0;\r
+ } else {\r
+ i.itemState.ooDeadband = true;\r
+ }\r
+ \r
+ // NaN change is deadband change\r
+ i.itemState.ooDeadband |= isNaNChange;\r
+ } else {\r
+ // The dead band cannot be have broken in first sample\r
+ i.itemState.ooDeadband = false;\r
+ }\r
+\r
+\r
+ // Interval\r
+ boolean ooInterval = true;\r
+ if ( hasInterval ) {\r
+ \r
+// System.err.println(i.itemState.id + " " + firstTime + " " + currentTime);\r
+\r
+ // First band is a special case\r
+ if(isFirstValue) {\r
+ // If this is the first sample store the current time as first time\r
+ if(Double.isNaN(i.itemState.firstTime)) firstTime = i.itemState.firstTime = currentTime;\r
+ int sn1 = sampleNum(firstTime,i.itemState.interval);\r
+ int sn2 = sampleNum(currentTime,i.itemState.interval);\r
+ // Wait until next sample if this position is not already exactly at sample border\r
+ // Always start minmax immediately\r
+ if(sn1 == sn2 && !isExactInterval(currentTime, i.itemState.interval) && !isMinMaxFormat) return;\r
+ } else {\r
+ int sn1 = sampleNum(prevTime,i.itemState.interval);\r
+ int sn2 = sampleNum(currentTime,i.itemState.interval);\r
+// System.err.println(" ==>" + sn1 + " " + sn2 + " " + prevTime + " " + currentTime);\r
+ ooInterval = sn2 > sn1;\r
+ }\r
+\r
+ \r
+// \r
+// double n = (currentTime+EPSILON) / i.itemState.interval;\r
+// double n2 = n-Math.floor(n);\r
+// \r
+// \r
+// double deltaTime = currentTime - firstTime;\r
+// ooInterval = (deltaTime - i.itemState.interval) > -EPSILON;\r
+ }\r
+ \r
+ // Should new sample be created, or old sample extended / forwarded in time\r
+ boolean makeNewBand = ( i.itemState.ooDeadband && isValidValue && ooInterval ) || isFirstValue || wasDisabled || isValidChange;\r
+ \r
+ // Update item min-max, avg, median\r
+ if ( isValidValue && i.isNumeric ) {\r
+ if (i.current.hasAvg() && i.isNumeric) {\r
+ if (Double.isNaN( i.itemState.sum )) i.itemState.sum = 0.0;\r
+ if (isValidValue && i.isNumeric) {\r
+ double timeDelta = currentTime - i.itemState.currentTime; \r
+ i.itemState.sum += timeDelta * currentValueDouble;\r
+ }\r
+ }\r
+\r
+ if (i.current.hasMedian() && !Double.isNaN(prevValueDouble) ) {\r
+ double deltaTime = currentTime - prevTime;\r
+ i.itemState.median.add( deltaTime, prevValueDouble );\r
+ }\r
+ }\r
+\r
+ // Increase the counter that tracks the number of samples acquired into the band\r
+ if (!restep) i.itemState.count++;\r
+\r
+ // Remember values \r
+ i.itemState.currentTime = currentTime;\r
+ i.itemState.currentValue.readFrom( value.getBinding(), value.getValue() ); \r
+ i.itemState.isNaN = isNanValue;\r
+ i.itemState.isValid = isValidValue;\r
+ \r
+ // Put discontinuation marker\r
+ if ( (wasDisabled && i.current.supportsNullValue())) {\r
+ // Extend marker\r
+ if (i.current.isNullValue()) {\r
+ if (i.current.hasEndTime()) {\r
+ i.current.setEndTime( Bindings.DOUBLE, i.itemState.lastDisabledTime );\r
+ }\r
+ \r
+ if (i.current.hasCount()) {\r
+ i.current.setCount( i.current.getCount()+1 );\r
+ }\r
+ \r
+ // Add the band to the file stream\r
+ i.stream.setNoflush(i.stream.size()-1, i.binding, i.current.getSample() );\r
+ }\r
+ \r
+ // Add marker\r
+ i.current.reset();\r
+ i.current.setValueNull();\r
+ i.current.setTime(Bindings.DOUBLE, i.itemState.firstDisabledTime);\r
+ i.current.setEndTime(Bindings.DOUBLE, i.itemState.lastDisabledTime);\r
+ i.stream.addNoflush( i.binding, i.current.getSample() );\r
+ }\r
+ \r
+ boolean condition = makeNewBand || (!i.hasEndTime && i.itemState.count==2); \r
+\r
+ if(i.stream.size() > 0 && !i.itemState.ooDeadband) {\r
+\r
+ /// Extend existing band\r
+ // Extend endTime value\r
+ if (!i.hasEndTime) {\r
+ i.current.setTime( time.getBinding(), time.getValue() );\r
+ } else {\r
+ i.current.setEndTime( time.getBinding(), time.getValue() );\r
+ }\r
+\r
+ if (isValidValue) {\r
+ // Set last value\r
+ if (i.current.hasLastValue()) {\r
+ i.current.setLastValue( value.getBinding(), value.getValue() );\r
+ }\r
+\r
+ // Add sum\r
+ if (i.current.hasAvg() && i.isNumeric && !isNanValue && !restep) {\r
+ double duration = (currentTime - i.itemState.firstTime);\r
+ if(duration < 1e-9) {\r
+ i.current.setAvg( Bindings.DOUBLE, 0.0 ); \r
+ } else {\r
+ i.current.setAvg( Bindings.DOUBLE, i.itemState.sum / duration); \r
+ }\r
+ }\r
+\r
+ // Update min-max\r
+ if (i.current.hasMin() && i.isNumeric && !isNanValue) {\r
+ Binding minBinding = i.current.getMinBinding();\r
+ Object prevMinValue = i.current.getMin();\r
+ Object currentValueWithMinBinding = value.getValue( minBinding );\r
+ int diff = minBinding.compare( prevMinValue, currentValueWithMinBinding );\r
+ if (diff>0) i.current.setMin( minBinding, currentValueWithMinBinding );\r
+ } \r
+ if (i.current.hasMax() && i.isNumeric && !isNanValue) {\r
+ Binding maxBinding = i.current.getMaxBinding();\r
+ Object prevMaxValue = i.current.getMax();\r
+ Object currentValueWithMaxBinding = value.getValue( maxBinding );\r
+ int diff = maxBinding.compare( prevMaxValue, currentValueWithMaxBinding );\r
+ if (diff<0) i.current.setMax( maxBinding, currentValueWithMaxBinding );\r
+ }\r
+\r
+ // Update median\r
+ if (i.current.hasMedian() && i.isNumeric && !isNanValue) {\r
+ Double median = i.itemState.median.getMedian();\r
+ i.current.setMedian( Bindings.DOUBLE, median);\r
+ } \r
+ } else {\r
+ i.current.setValueNull();\r
+ }\r
+\r
+ // Add count\r
+ if (i.current.hasCount()) {\r
+ i.current.setCount( i.itemState.count );\r
+ }\r
+\r
+ // Write entry to file stream\r
+ // if (i.current.getTimeDouble()<0) \r
+ // System.err.println("Error sample "+i.current.getBinding().toString(i.current.getSample()));\r
+ i.stream.setNoflush( i.stream.size()-1, i.binding, i.current.getSample() ); \r
+ }\r
+ \r
+ if (condition) {\r
+ \r
+ // Start a new value band\r
+ i.itemState.firstValue.readFrom( value.getBinding(), value.getValue() );\r
+ i.itemState.firstTime = currentTime;\r
+ i.itemState.currentValue.readFrom( value.getBinding(), value.getValue() );\r
+ i.itemState.currentTime = currentTime;\r
+ i.itemState.isNaN = isNanValue;\r
+ i.itemState.isValid = isValidValue;\r
+ i.itemState.sum = 0;\r
+ i.itemState.count = 1;\r
+ i.itemState.ooDeadband = false;\r
+ if (i.itemState.median != null) {\r
+ i.itemState.median.clear();\r
+ if (!Double.isNaN(prevValueDouble) ) {\r
+ double deltaTime = currentTime - prevTime;\r
+ i.itemState.median.add( deltaTime, prevValueDouble );\r
+ }\r
+ }\r
+ \r
+ // New Sample\r
+ i.current.reset();\r
+ \r
+ // Start new band with current value & time\r
+ i.current.setTime( time.getBinding(), time.getValue() ); \r
+\r
+ // Is valid value\r
+ if (i.current.hasEndTime()) {\r
+ i.current.setEndTime( time.getBinding(), time.getValue() );\r
+ }\r
+ \r
+ if (isValidValue) {\r
+ i.current.setValue( value.getBinding(), value.getValue() );\r
+ i.current.setMax( value.getBinding(), value.getValue() );\r
+ i.current.setMin( value.getBinding(), value.getValue() );\r
+ i.current.setLastValue( value.getBinding(), value.getValue() );\r
+ i.current.setAvg( value.getBinding(), value.getValue() );\r
+ i.current.setMedian( value.getBinding(), value.getValue() ); \r
+ } else {\r
+ i.current.setValueNull();\r
+ }\r
+ \r
+ \r
+ // Sample Count=1 for new bands (time, value) "added"\r
+ if (i.current.hasCount()) {\r
+ i.current.setCount( 1 );\r
+ }\r
+ \r
+ // Add the band to the file stream\r
+ i.stream.addNoflush( i.binding, i.current.getSample() );\r
+ \r
+ } \r
+ \r
+ }\r
+\r
+ @Override\r
+ public void flush() /*throws HistoryException*/ {\r
+ for (ActiveStream i : items) {\r
+ try { \r
+ i.stream.flush();\r
+ } catch (AccessorException e) {\r
+ logger.log(Level.FINE, "File history flush failed.", e); \r
+// throw new HistoryException(e);\r
+ }\r
+ }\r
+ }\r
+\r
+ public void flush(Set<String> ids) /*throws HistoryException*/ {\r
+ for (ActiveStream i : items) {\r
+ if (!ids.contains(i.itemState.id)) continue;\r
+ try { \r
+ i.stream.flush();\r
+ } catch (AccessorException e) {\r
+ logger.log(Level.FINE, "File history flush failed.", e); \r
+// throw new HistoryException(e);\r
+ }\r
+ }\r
+ }\r
+ \r
+ \r
+ @Override\r
+ public synchronized void close()\r
+ {\r
+ // Write items back to history\r
+ List<Bean> beans = new ArrayList<Bean>( state.itemStates.size() );\r
+ for (CollectorState.Item itemState : state.itemStates.values()) {\r
+ try {\r
+ if (!history.exists(itemState.id)) continue;\r
+ Bean bean = history.getItem(itemState.id);\r
+ bean.readAvailableFields(itemState);\r
+ beans.add( setItemState(bean, itemState) );\r
+ } catch (BindingException e) {\r
+ logger.log(Level.FINE, "Error writing a meta-data to history", e);\r
+ } catch (BindingConstructionException e) {\r
+ logger.log(Level.FINE, "Error writing a meta-data to history", e);\r
+ } catch (HistoryException e) {\r
+ logger.log(Level.FINE, "Error writing a meta-data to history", e);\r
+ }\r
+ }\r
+ try {\r
+ history.modify(beans.toArray( new Bean[beans.size()] ));\r
+ } catch (HistoryException e) {\r
+ logger.log(Level.FINE, "Error writing a meta-data to history", e);\r
+ }\r
+ state.itemStates.clear();\r
+ \r
+ // Close streams\r
+ for (ActiveStream item : items) {\r
+ item.close();\r
+ }\r
+ items.clear();\r
+ }\r
+\r
+ public class ActiveStream {\r
+\r
+ /** Sample Stream */\r
+ public StreamAccessor stream;\r
+ \r
+ /** Sample binding */\r
+ public RecordBinding binding;\r
+ \r
+ /** Value binding (Generic) */\r
+ public Binding valueBinding;\r
+ \r
+ /** Set true if this item has numeric value type */\r
+ public boolean isNumeric;\r
+ \r
+ /** Set true if this item has end time */\r
+ public boolean hasEndTime;\r
+ \r
+ /** Item entry in subscription state */\r
+ public CollectorState.Item itemState;\r
+ \r
+ /** Interface to current last sample (in the stream) */\r
+ public ValueBand current;\r
+ \r
+ /** Adapter that converts value to double value, null if not numeric */\r
+ public Adapter valueToDouble, doubleToValue;\r
+ \r
+ void close() {\r
+ try {\r
+ if (stream != null) { \r
+ stream.close();\r
+ }\r
+ stream = null;\r
+ } catch (AccessorException e) {\r
+ logger.log(Level.FINE, "Error closing stream", e);\r
+ }\r
+ }\r
+\r
+ }\r
+\r
+ @Override\r
+ public StreamAccessor openStream(String itemId, String mode) \r
+ throws HistoryException \r
+ {\r
+ return history.openStream(itemId, mode);\r
+ }\r
+\r
+ @Override\r
+ public HistoryManager getHistory() {\r
+ return history;\r
+ }\r
+\r
+}\r