]> gerrit.simantics Code Review - simantics/platform.git/blobdiff - bundles/org.simantics.history/src/org/simantics/history/impl/CollectorImpl.java
Migrated source code from Simantics SVN
[simantics/platform.git] / bundles / org.simantics.history / src / org / simantics / history / impl / CollectorImpl.java
diff --git a/bundles/org.simantics.history/src/org/simantics/history/impl/CollectorImpl.java b/bundles/org.simantics.history/src/org/simantics/history/impl/CollectorImpl.java
new file mode 100644 (file)
index 0000000..c001adc
--- /dev/null
@@ -0,0 +1,1040 @@
+/*******************************************************************************\r
+ * Copyright (c) 2007, 2011 Association for Decentralized Information Management in\r
+ * Industry THTH ry.\r
+ * All rights reserved. This program and the accompanying materials\r
+ * are made available under the terms of the Eclipse Public License v1.0\r
+ * which accompanies this distribution, and is available at\r
+ * http://www.eclipse.org/legal/epl-v10.html\r
+ *\r
+ * Contributors:\r
+ *     VTT Technical Research Centre of Finland - initial API and implementation\r
+ *******************************************************************************/\r
+package org.simantics.history.impl;\r
+\r
+import gnu.trove.set.hash.THashSet;\r
+\r
+import java.io.IOException;\r
+import java.util.ArrayList;\r
+import java.util.List;\r
+import java.util.Set;\r
+import java.util.UUID;\r
+import java.util.concurrent.CopyOnWriteArrayList;\r
+import java.util.logging.Level;\r
+import java.util.logging.Logger;\r
+\r
+import org.simantics.databoard.Bindings;\r
+import org.simantics.databoard.accessor.StreamAccessor;\r
+import org.simantics.databoard.accessor.error.AccessorException;\r
+import org.simantics.databoard.adapter.AdaptException;\r
+import org.simantics.databoard.adapter.Adapter;\r
+import org.simantics.databoard.adapter.AdapterConstructionException;\r
+import org.simantics.databoard.binding.Binding;\r
+import org.simantics.databoard.binding.DoubleBinding;\r
+import org.simantics.databoard.binding.FloatBinding;\r
+import org.simantics.databoard.binding.NumberBinding;\r
+import org.simantics.databoard.binding.RecordBinding;\r
+import org.simantics.databoard.binding.error.BindingConstructionException;\r
+import org.simantics.databoard.binding.error.BindingException;\r
+import org.simantics.databoard.binding.error.RuntimeBindingException;\r
+import org.simantics.databoard.binding.impl.DoubleBindingDefault;\r
+import org.simantics.databoard.binding.mutable.MutableVariant;\r
+import org.simantics.databoard.binding.mutable.Variant;\r
+import org.simantics.databoard.binding.reflection.VoidBinding;\r
+import org.simantics.databoard.serialization.Serializer;\r
+import org.simantics.databoard.type.Component;\r
+import org.simantics.databoard.type.Datatype;\r
+import org.simantics.databoard.type.NumberType;\r
+import org.simantics.databoard.type.RecordType;\r
+import org.simantics.databoard.util.Bean;\r
+import org.simantics.history.Collector;\r
+import org.simantics.history.HistoryException;\r
+import org.simantics.history.HistoryManager;\r
+import org.simantics.history.impl.CollectorState.Item;\r
+import org.simantics.history.impl.CollectorState.VariableState;\r
+import org.simantics.history.util.ValueBand;\r
+import org.simantics.history.util.WeightedMedian;\r
+import org.simantics.history.util.subscription.SubscriptionItem;\r
+\r
+/**\r
+ * Collector is used to record data to streams in history manager.\r
+ * \r
+ * For every item there is collector item state {@link CollectorState.Item}, \r
+ * that contains data required in writing process. This state is read and \r
+ * written to history manager item (bean) as meta-data. \r
+ * \r
+ * Stream files are opened in the first {@link #beginStep(NumberBinding, Object)}. \r
+ * \r
+ * @author toni.kalajainen\r
+ *\r
+ */\r
+public class CollectorImpl implements Collector {\r
+\r
+       /** Logger */\r
+       static Logger logger = Logger.getLogger(CollectorImpl.class.getName());\r
+       \r
+       public static final int MEDIAN_LIMIT = 500;\r
+       \r
+       public static final double EPSILON = 1e-2;\r
+       \r
+       /** Collector state */\r
+       CollectorState state;\r
+\r
+       /** Open streams, corresponds to subscription items */\r
+       CopyOnWriteArrayList<ActiveStream> items = new CopyOnWriteArrayList<ActiveStream>();\r
+       \r
+       /** Associated history state */ \r
+       HistoryManager history;\r
+       \r
+       /** True when all items are open */\r
+       boolean itemsOpen = true;\r
+       \r
+       public FlushPolicy flushPolicy = FlushPolicy.FlushOnEveryStep;\r
+       \r
+       public FlushPolicy getFlushPolicy() {\r
+               return flushPolicy;\r
+       }\r
+\r
+       public void setFlushPolicy(FlushPolicy flushPolicy) {\r
+               this.flushPolicy = flushPolicy;\r
+       }\r
+\r
+       /**\r
+        * Create a new collector that is associated with a history manager.\r
+        * \r
+        * @param history\r
+        * @throws HistoryException\r
+        */\r
+       public CollectorImpl(HistoryManager history) {\r
+               this.history = history;\r
+               this.state = new CollectorState();\r
+               this.state.init();\r
+               this.state.id = UUID.randomUUID().toString();\r
+       }\r
+\r
+       /**\r
+        * Create a new collector that is associated with a history manager.\r
+        * \r
+        * @param history\r
+        * @param previousState\r
+        * @throws HistoryException\r
+        */\r
+       public CollectorImpl(HistoryManager history, CollectorState previousState) {\r
+               this.history = history;\r
+               this.state = (CollectorState) previousState.clone();\r
+       }\r
+       \r
+       @Override\r
+       public synchronized void addItem(Bean item) throws HistoryException {\r
+               try {\r
+                       String id = (String) item.getField("id");\r
+                       \r
+                       // Item State\r
+                       CollectorState.Item itemState = null;\r
+                       if ( item.hasField("collectorState") ) itemState = (CollectorState.Item) item.getField("collectorState", CollectorState.BINDING_ITEM);                  \r
+                       if ( itemState == null ) itemState = state.itemStates.get( id );\r
+                       if ( itemState == null ) {\r
+                               // Create new item\r
+                               itemState = new CollectorState.Item();\r
+                               itemState.init();\r
+                               itemState.readAvailableFields( item );\r
+                               // Guess values from the existing data  \r
+                               if (history.exists(id)) {\r
+                                       readItemState(history, item, itemState);\r
+                               }\r
+                       }\r
+                       \r
+                       addItem(id, itemState);                 \r
+               } catch (BindingException e) {\r
+                       throw new HistoryException( e ); \r
+               }\r
+       }\r
+       \r
+       public synchronized void addItem(String id, CollectorState.Item itemState) throws HistoryException {\r
+               try {                   \r
+                       // Item State\r
+                       state.itemStates.put(id, itemState);\r
+                       itemsOpen = false;\r
+                       \r
+                       // Time                 \r
+                       if (state.time.type() instanceof NumberType) {                          \r
+                               double time = (Double) Bindings.adapt(state.time.getValue(), state.time.getBinding(), Bindings.DOUBLE);\r
+                               if ( !Double.isNaN(itemState.currentTime) ) {\r
+                                       if ( itemState.currentTime > time ) {\r
+                                               state.time.setValue(Bindings.DOUBLE, itemState.currentTime);\r
+                                       }\r
+                               }                               \r
+                       }\r
+               } catch (AdaptException e) {\r
+                       throw new HistoryException( e ); \r
+               }\r
+       }\r
+       \r
+       public synchronized void addItems(Bean...items) throws HistoryException {\r
+               for (Bean item : items) addItem(item);\r
+       }\r
+       \r
+       @Override\r
+       public synchronized void removeItem(String id) throws HistoryException {\r
+               CollectorState.Item itemState = state.itemStates.remove( id );\r
+               if (itemState!=null && history.exists(id)) {\r
+                       Bean bean = history.getItem(id);\r
+                       bean.readAvailableFields(itemState);\r
+                       try {\r
+                               bean = setItemState(bean, itemState);\r
+                               history.modify(bean);\r
+                       } catch (BindingException e) {\r
+                               // Unexpected\r
+                               logger.log(Level.FINE, "Failed to update bean", e);                     \r
+                       } catch (BindingConstructionException e) {\r
+                               // Unexpected\r
+                               logger.log(Level.FINE, "Failed to update bean", e);\r
+                       } catch (HistoryException e) {\r
+                               logger.log(Level.FINE, "Failed to update history item meta-data", e);\r
+                       }\r
+               }\r
+               \r
+               ActiveStream as = getStream(id);\r
+               if ( as != null ) {\r
+                       items.remove( as );\r
+                       as.close();\r
+               }\r
+       }\r
+       \r
+       ActiveStream getStream(String id) {\r
+               for (ActiveStream as : items) {\r
+                       if (as.itemState.id.equals(id)) return as;\r
+               }\r
+               return null;\r
+       }\r
+       \r
+       /**\r
+        * Set (or Add) item's configuration. \r
+        * \r
+        * @param item item\r
+        * @throws HistoryException\r
+        */\r
+       public synchronized void setItem(Bean item) throws HistoryException {\r
+               try {\r
+                       String id = (String) item.getField("id");\r
+                       \r
+                       CollectorState.Item itemState = state.itemStates.get( id );\r
+                       if ( itemState==null ) {\r
+                               addItem(item);\r
+                               return;\r
+                       }\r
+                       itemState.readAvailableFields(item);\r
+               } catch (BindingException e) {\r
+                       throw new HistoryException( e ); \r
+               }\r
+       }\r
+       \r
+       \r
+       /**\r
+        * Set item state to bean. If the bean does not have field collectorState,\r
+        * a new bean class is created, and field added.\r
+        *  \r
+        * @param config\r
+        * @param state\r
+        * @return same or new bean\r
+        * @throws BindingException \r
+        * @throws BindingConstructionException \r
+        */\r
+       public static Bean setItemState(Bean config, CollectorState.Item state) throws BindingException, BindingConstructionException\r
+       {\r
+               Binding binding = config.getFieldBinding("collectorState");\r
+               if ( binding == null || !binding.type().equals(CollectorState.BINDING_ITEM.type()) ) {\r
+                       RecordType rt = config.getBinding().type();\r
+                       RecordType newType = new RecordType();\r
+                       for (int i=0; i<rt.getComponentCount(); i++) {\r
+                               Component c = rt.getComponent(i);\r
+                               if ( c.name.equals("collectorState") ) continue;\r
+                               newType.addComponent(c.name, c.type);\r
+                       }\r
+                       newType.addComponent("collectorState", CollectorState.BINDING_ITEM.type());\r
+                       Binding newBinding = Bindings.getBeanBinding(newType);\r
+                       Bean newConfig = (Bean) newBinding.createDefault();\r
+                       newConfig.readAvailableFields( config );\r
+                       config = newConfig;\r
+               }\r
+               config.setField("collectorState", CollectorState.BINDING_ITEM, state);\r
+               return config;\r
+       }\r
+       \r
+       public SubscriptionItem[] getItems() {\r
+               int c = state.itemStates.size();\r
+               SubscriptionItem[] items = new SubscriptionItem[ c ];\r
+               int ix = 0;\r
+               Serializer s = Bindings.getSerializerUnchecked( CollectorState.BINDING_ITEM );\r
+               for (CollectorState.Item item : state.itemStates.values()) {\r
+                       _HistoryItem hi = new _HistoryItem();\r
+                       hi.init();\r
+                       hi.readAvailableFields(item);\r
+                       //hi.collectorState = (Item) item.clone();\r
+                       \r
+                       // Workaround (clone doesn't work for WeightedMedian)\r
+                       try {\r
+                               byte[] data = s.serialize( item );                              \r
+                               hi.collectorState = (Item) s.deserialize(data);\r
+                       } catch (IOException e) {\r
+                       }\r
+                                       \r
+                       items[ix++] = hi;\r
+               }\r
+               return items;\r
+       }\r
+       \r
+       public static class _HistoryItem extends SubscriptionItem {\r
+               public CollectorState.Item collectorState;\r
+       }\r
+       \r
+       /**\r
+        * Creates and loads active item. TimeSeries stream file is opened.\r
+        * \r
+        * @param itemState\r
+        * @return new active item\r
+        * @throws HistoryException\r
+        */\r
+       ActiveStream openStream(CollectorState.Item itemState) throws HistoryException \r
+       {\r
+               // New Active Item\r
+               ActiveStream x = new ActiveStream();\r
+               x.itemState = itemState;\r
+\r
+               // Stream\r
+               {\r
+                       if ( !history.exists(x.itemState.id) ) {\r
+                               throw new HistoryException("TimeSeries "+x.itemState.id+" does not exist.");\r
+                       }\r
+\r
+                       x.stream = history.openStream(x.itemState.id, "rw");\r
+               }\r
+\r
+               // Sample Binding\r
+               Datatype sampleType = x.stream.type().componentType;\r
+               itemState.format = sampleType;\r
+               try { \r
+                       x.binding = (RecordBinding) Bindings.getBeanBinding( sampleType );\r
+               } catch ( RuntimeException e ) {\r
+                       System.err.println("Could not create bean binding for type "+sampleType);\r
+                       x.binding = (RecordBinding) Bindings.getBinding( sampleType );          \r
+               }\r
+\r
+               // Value binding\r
+               x.valueBinding = x.binding.getComponentBinding("value");\r
+               x.isNumeric = x.valueBinding instanceof NumberBinding;\r
+               x.hasEndTime = x.binding.getComponentIndex("endTime")>0;\r
+\r
+               // Value adapter\r
+               if ( x.isNumeric ) \r
+               try {\r
+                       x.valueToDouble = Bindings.adapterFactory.getAdapter(x.valueBinding, Bindings.DOUBLE, true, false);\r
+                       x.doubleToValue = Bindings.adapterFactory.getAdapter(Bindings.DOUBLE, x.valueBinding, true, false);\r
+               } catch (AdapterConstructionException e1) {\r
+                       x.valueToDouble = null;\r
+               }\r
+\r
+               // Create valueband, and read last entry into memory\r
+               try {\r
+                       Object currentEntry = x.binding.createDefault();\r
+                       if (x.stream.size() >= 1) {\r
+                               x.stream.get(x.stream.size() - 1, x.binding, currentEntry);\r
+                       }\r
+                       x.current = new ValueBand(x.binding);\r
+                       x.current.setSample(currentEntry);\r
+               } catch (AccessorException e) {\r
+                       throw new HistoryException(\r
+                                       "Could not read sample from stream file, " + e, e);\r
+               } catch (BindingException e) {\r
+                       throw new HistoryException(\r
+                                       "Could not read sample from stream file, " + e, e);\r
+               }\r
+\r
+               // Item State\r
+               x.itemState = itemState;\r
+\r
+               // Median\r
+               if (x.isNumeric && x.current.hasMedian()\r
+                               && x.itemState.median == null) {\r
+                       x.itemState.median = new WeightedMedian( MEDIAN_LIMIT );\r
+               }\r
+\r
+               return x;\r
+       }\r
+                       \r
+       /**\r
+        * Read itemState from history. Item must exist. \r
+        * \r
+        * @param si subscription item\r
+        * @return state item\r
+        * @throws RuntimeBindingException\r
+        * @throws HistoryException\r
+        */\r
+       static void readItemState(HistoryManager history, Bean si, CollectorState.Item item) throws RuntimeBindingException, HistoryException {\r
+               // Create item state\r
+               item.readAvailableFields(si);\r
+               // Read state from items\r
+               StreamAccessor sa = history.openStream(item.id, "r");\r
+               try {\r
+                       Datatype sampleType = sa.type().componentType;\r
+                       \r
+                       Binding sampleBinding = Bindings.getBinding(sampleType);\r
+                       if (!sampleType.equals( item.format )) throw new HistoryException("Could not create subscription, item \""+item.id+"\" already exists and has wrong format (" + sampleType + " vs. " + item.format + ")");\r
+                       Object sample = sampleBinding.createDefaultUnchecked();\r
+                       ValueBand v = new ValueBand(sampleBinding, sample);\r
+                       item.count = sa.size();\r
+                       if ( item.count>0 ) {\r
+                               sa.get(0, sampleBinding, sample);\r
+                               item.firstTime = v.getTimeDouble();\r
+                               Binding b = v.getValueBinding();\r
+                               Object value = v.getValue(b);\r
+                               item.firstValue.setValue(b, value);\r
+                                       \r
+                               sa.get(item.count-1, sampleBinding, sample);\r
+                               item.currentTime = v.getTimeDouble();\r
+                               b = v.getValueBinding();\r
+                               value = v.getValue(b);\r
+                               item.currentValue.setValue(b, value);\r
+                                       \r
+                               if ( item.currentValue.getBinding() instanceof DoubleBinding ) {\r
+                                       DoubleBinding db = (DoubleBinding) item.currentValue.getBinding();\r
+                                       value = item.currentValue.getValue(db);\r
+                                       item.isNaN = Double.isNaN( (Double) value );\r
+                               }\r
+                       }\r
+                               \r
+               } catch (AccessorException e) {\r
+                       throw new HistoryException( e );\r
+               } catch (AdaptException e) {\r
+                       throw new HistoryException( e );\r
+               } finally {\r
+                       try { sa.close(); } catch (AccessorException e) {}\r
+               }\r
+       }\r
+\r
+       @Override\r
+       public CollectorState getState() {\r
+               return (CollectorState) state.clone();\r
+       }\r
+\r
+       @Override\r
+       public void setState(Bean newState) {\r
+               if (newState == null)\r
+                       return;\r
+               if (state == null) {\r
+                       state = new CollectorState();\r
+                       state.init();\r
+               }\r
+               state.readAvailableFields(newState);\r
+\r
+               // Make sure that openAllItems ensures that all streams are opened properly.\r
+               // This is because setItem will not invoke addItem if collectorState exists\r
+               // for an item and therefore itemsOpen might not otherwise be set to false.\r
+               itemsOpen = false;\r
+       }\r
+\r
+       @Override\r
+       public Object getTime(Binding binding) throws HistoryException {\r
+               MutableVariant v = state.time;\r
+               if (v==null) return null;\r
+               try {\r
+                       return v.getValue(binding);\r
+               } catch (AdaptException e) {\r
+                       throw new HistoryException(e);\r
+               }\r
+       }\r
+\r
+       @Override\r
+       public Object getValue(String id, Binding binding) throws HistoryException {\r
+               VariableState vs = state.values.get(id);\r
+               if (vs==null) return null;\r
+               \r
+               if (!vs.isValid) return null;\r
+               \r
+               try {\r
+                       return vs.value.getValue(binding);\r
+               } catch (AdaptException e) {\r
+                       throw new HistoryException(e);\r
+               }\r
+       }\r
+       \r
+       void openAllItems() throws HistoryException {\r
+               if (itemsOpen) return;\r
+\r
+               // Assert all items are open\r
+               // 1. Collect all items that still need to be opened.\r
+               Set<String> itemsToOpen = new THashSet<String>( state.itemStates.keySet() );\r
+               for (ActiveStream as : items)\r
+                       itemsToOpen.remove(as.itemState.id);\r
+\r
+               // 2. Open missing streams\r
+               List<ActiveStream> opened = new ArrayList<ActiveStream>(itemsToOpen.size());\r
+               for (String itemId : itemsToOpen) {\r
+                       CollectorState.Item item = state.itemStates.get( itemId );\r
+                       ActiveStream as = openStream( item );\r
+                       opened.add(as);\r
+               }\r
+\r
+               // 3. Update list of open streams\r
+               items.addAll(opened);\r
+               itemsOpen = true;\r
+       }\r
+\r
+       private void updateDt(NumberBinding binding, Object time) {\r
+               try {\r
+                       Double current = (Double)getTime(Bindings.DOUBLE);\r
+                       if(current != null) {\r
+                               double t = binding.getValue(time).doubleValue();\r
+                               state.dT = t-current;\r
+                               if(state.dT > 0) return;\r
+                       }\r
+               } catch (Throwable t) {\r
+               }\r
+               state.dT = 1.0;\r
+       }\r
+       \r
+       @Override\r
+       public void beginStep(NumberBinding binding, Object time) throws HistoryException {\r
+               openAllItems();\r
+               updateDt(binding, time);\r
+               state.time.setValue(binding, time);\r
+       }\r
+\r
+       @Override\r
+       public void setValue(String id, Binding binding, Object value) throws HistoryException {\r
+               VariableState vs = state.values.get( id );\r
+               if (vs == null) {\r
+                       vs = new VariableState();\r
+                       vs.value = new MutableVariant(binding, value);\r
+                       state.values.put(id, vs);\r
+               }\r
+               \r
+               if (value==null) {\r
+                       vs.value.setValue(VoidBinding.VOID_BINDING, null);\r
+                       vs.isNan = true;\r
+                       vs.isValid = false;\r
+                       return;\r
+               } \r
+                                       \r
+               vs.value.setValue(binding, value);\r
+               vs.isValid = true;\r
+                       \r
+               if (binding instanceof DoubleBinding) {\r
+                       DoubleBinding db = (DoubleBinding) binding;\r
+                       try {\r
+                               double _v = db.getValue_(value);\r
+                               vs.isNan = Double.isNaN( _v );\r
+                       } catch (BindingException e) {\r
+                               vs.isNan = true;\r
+                               vs.isValid = false;\r
+                       }\r
+               } else if (binding instanceof FloatBinding) {\r
+                       FloatBinding db = (FloatBinding) binding;\r
+                       try {\r
+                               float _v = db.getValue_(value);\r
+                               vs.isNan = Float.isNaN( _v );\r
+                       } catch (BindingException e) {\r
+                               vs.isNan = true;\r
+                               vs.isValid = false;\r
+                       }\r
+               } else {\r
+                       vs.isNan = false;\r
+               }\r
+       }\r
+\r
+       @Override\r
+       public boolean getValue(String id, MutableVariant result) {\r
+               VariableState vs = state.values.get(id);\r
+               if (vs==null) return false;\r
+               \r
+               if (!vs.isValid) return false;\r
+               \r
+               result.setValue(vs.value.getBinding(), vs.value.getValue());\r
+               return true;\r
+       }\r
+\r
+       @Override\r
+       public void endStep() throws HistoryException {\r
+               // Write values to streams\r
+               for (ActiveStream i : items)\r
+               {\r
+                       try {\r
+                               putValueToStream(i);                            \r
+                               if (flushPolicy == FlushPolicy.FlushOnEveryStep) {\r
+                                       i.stream.flush();\r
+                               }\r
+                       } catch (AdaptException e) {\r
+                               throw new HistoryException( e );\r
+//                             FileAccessor fa = (FileAccessor) i.stream;\r
+//                             logger.log(Level.FINE, "Error writing a stream "+fa.file(), e);\r
+                       } catch (AccessorException e) {\r
+                               throw new HistoryException( e );\r
+//                             FileAccessor fa = (FileAccessor) i.stream;\r
+//                             logger.log(Level.FINE, "Error writing a stream "+fa.file(), e);\r
+                       } catch (BindingException e) {\r
+                               throw new HistoryException( e );\r
+//                             FileAccessor fa = (FileAccessor) i.stream;\r
+//                             logger.log(Level.FINE, "Error writing a stream "+fa.file(), e);\r
+                       }\r
+               }\r
+       }\r
+               \r
+       private int sampleNum(double time, double interval) {\r
+               // 1.0 is needed to cover the case of 0.0\r
+               return (int)(1.0 + (time+EPSILON*state.dT)/interval);\r
+       }\r
+\r
+       private boolean isExactInterval(double time, double interval) {\r
+               return sampleNum(time, interval) > sampleNum(time-2*EPSILON*state.dT, interval);\r
+       }\r
+\r
+       /**\r
+        * This method puts a time,value to a stream.\r
+        * \r
+        * The stream is "packed" when the new value is equal to previous, or the\r
+        * value is within deadband (in comparison to the first value), or if\r
+        * the interval zone is not exceeded.\r
+        * \r
+        * The stream can be packed two different ways, depending on the sample\r
+        * format. The sample format can be (a) single value or (b) a time range of\r
+        * multiple values, Ranged Sample. \r
+        * \r
+        * Single value is packed by removing middle values, in other words, \r
+        * moving the timecode of the last sample.\r
+        * \r
+        * Ranged sample is packed by extending the endTime code of the last sample.\r
+        * Other fields, such as avg, sum, count, lastValue are also updated.\r
+        * \r
+        * @param i\r
+        * @throws AdaptException\r
+        * @throws AccessorException \r
+        * @throws BindingException \r
+        * @throws HistoryException \r
+        */\r
+       private void putValueToStream(ActiveStream i) throws AdaptException, AccessorException, BindingException, HistoryException\r
+       {\r
+               // Current Time\r
+               Variant time = state.time;\r
+               double currentTime = Double.NaN;\r
+               if (time != null) {\r
+                       Double x = (Double) time.getValue(Bindings.DOUBLE);\r
+                       currentTime = x == null ? Double.NaN : x;\r
+               }\r
+\r
+               // Is first value\r
+               boolean isFirstValue = i.stream.size() == 0;\r
+               // Is this item disabled\r
+               boolean isDisabled = !i.itemState.enabled; \r
+               // Discontinuety, first value after being disabled\r
+               boolean wasDisabled = !isFirstValue && i.itemState.isDisabled;\r
+               \r
+               // Update currently disabled status\r
+               i.itemState.isDisabled = isDisabled;\r
+\r
+               // Update time codes\r
+               if (isDisabled) {\r
+                       i.itemState.count++;\r
+                       i.itemState.lastDisabledTime = currentTime;\r
+                       if (!wasDisabled) i.itemState.firstDisabledTime = currentTime;\r
+                       return;\r
+               }\r
+               \r
+               // Is nan value, for doubles and floats\r
+               boolean isNanValue;\r
+               // Is valid value\r
+               boolean isValidValue;\r
+               // Value as variant\r
+               MutableVariant value;\r
+               // Get current value as double\r
+               double currentValueDouble;\r
+               \r
+               VariableState vs = state.values.get( i.itemState.variableId );                  \r
+               if ( vs!=null ) {\r
+                       isNanValue = vs.isNan;\r
+                       isValidValue = vs.isValid;\r
+                       value = vs.value;\r
+                       if (i.isNumeric && isValidValue && !isNanValue) {\r
+                               currentValueDouble = (Double) value.getValue( Bindings.DOUBLE );\r
+                               if ( i.itemState.gain!=1.0 || i.itemState.bias!=0.0 ) {\r
+                                       currentValueDouble = i.itemState.gain * currentValueDouble + i.itemState.bias;\r
+                                       Object x = i.valueBinding instanceof DoubleBindingDefault ? currentValueDouble : i.doubleToValue.adapt(currentValueDouble);\r
+                                       value = new MutableVariant( i.valueBinding, x );\r
+                               }\r
+                       } else {\r
+                               currentValueDouble = Double.NaN;\r
+                       }\r
+                       \r
+               } else {\r
+                       isNanValue = true;\r
+                       isValidValue = false;\r
+                       value = new MutableVariant( i.valueBinding, i.valueBinding.createDefault() );\r
+                       currentValueDouble = Double.NaN;\r
+               }\r
+                               \r
+               // Prev Value\r
+               double prevValueDouble = Double.NaN;\r
+               Variant pv = i.itemState.currentValue;\r
+               if (pv!=null && pv.getBinding() instanceof NumberBinding ) {\r
+                       prevValueDouble = (Double) pv.getValue( Bindings.DOUBLE );\r
+               } else {\r
+                       prevValueDouble = Double.NaN;\r
+               }\r
+               \r
+               // The time of the first sample of region\r
+               double firstTime = i.itemState.firstTime;\r
+               // Prev time\r
+               double prevTime = i.itemState.currentTime;\r
+               \r
+               boolean restep = prevTime == currentTime;\r
+               \r
+               // There was change in NaN\r
+               boolean isNaNChange = i.itemState.isNaN != isNanValue;\r
+               \r
+               // There was change in validity of the value\r
+               boolean isValidChange = i.itemState.isValid != isValidValue;\r
+               \r
+               //boolean wasValidValue = !i.itemState.isValid && isValidValue;\r
+               \r
+               boolean hasInterval = !Double.isNaN( i.itemState.interval ) && (i.itemState.interval> Double.MIN_VALUE );\r
+               \r
+               boolean hasDeadband = !Double.isNaN( i.itemState.deadband ) && (i.itemState.deadband> Double.MIN_VALUE) && i.isNumeric; \r
+               \r
+               boolean isMinMaxFormat = i.itemState.interval==Double.MAX_VALUE &&\r
+                               i.itemState.deadband==Double.MAX_VALUE &&\r
+                               i.binding.getComponentIndex("min")>=0 &&\r
+                               i.binding.getComponentIndex("max")>=0;\r
+\r
+               // Deadband\r
+               if (isValidValue && i.isNumeric && i.itemState.count>=1) {\r
+                       \r
+                       if (i.itemState.isValid) {\r
+                               double firstValueDouble = (Double) i.itemState.firstValue.getValue( Bindings.DOUBLE );\r
+                               double diff = Math.abs( currentValueDouble - firstValueDouble );\r
+                               if( hasDeadband )\r
+                                       i.itemState.ooDeadband |= diff >= i.itemState.deadband;\r
+                               else\r
+                                       i.itemState.ooDeadband |= diff != 0.0;\r
+                       } else {\r
+                               i.itemState.ooDeadband = true;\r
+                       }\r
+                       \r
+                       // NaN change is deadband change\r
+                       i.itemState.ooDeadband |= isNaNChange;\r
+               } else {\r
+                       // The dead band cannot be have broken in first sample\r
+                       i.itemState.ooDeadband = false;\r
+               }\r
+\r
+\r
+               // Interval\r
+               boolean ooInterval = true;\r
+               if ( hasInterval ) {\r
+                       \r
+//                     System.err.println(i.itemState.id + " " + firstTime + " " + currentTime);\r
+\r
+                       // First band is a special case\r
+                       if(isFirstValue) {\r
+                               // If this is the first sample store the current time as first time\r
+                               if(Double.isNaN(i.itemState.firstTime)) firstTime = i.itemState.firstTime = currentTime;\r
+                               int sn1 = sampleNum(firstTime,i.itemState.interval);\r
+                               int sn2 = sampleNum(currentTime,i.itemState.interval);\r
+                               // Wait until next sample if this position is not already exactly at sample border\r
+                               // Always start minmax immediately\r
+                               if(sn1 == sn2 && !isExactInterval(currentTime, i.itemState.interval) && !isMinMaxFormat) return;\r
+                       } else {\r
+                               int sn1 = sampleNum(prevTime,i.itemState.interval);\r
+                               int sn2 = sampleNum(currentTime,i.itemState.interval);\r
+//                             System.err.println(" ==>" + sn1 + " " + sn2 + " " + prevTime + " " + currentTime);\r
+                               ooInterval = sn2 > sn1;\r
+                       }\r
+\r
+                       \r
+//                     \r
+//                     double n = (currentTime+EPSILON) / i.itemState.interval;\r
+//                     double n2 = n-Math.floor(n);\r
+//                     \r
+//                     \r
+//                     double deltaTime = currentTime - firstTime;\r
+//                     ooInterval = (deltaTime - i.itemState.interval) > -EPSILON;\r
+               }\r
+               \r
+               // Should new sample be created, or old sample extended / forwarded in time\r
+               boolean makeNewBand = ( i.itemState.ooDeadband && isValidValue && ooInterval ) || isFirstValue || wasDisabled || isValidChange;\r
+               \r
+               // Update item min-max, avg, median\r
+               if ( isValidValue && i.isNumeric ) {\r
+                       if (i.current.hasAvg() && i.isNumeric) {\r
+                               if (Double.isNaN( i.itemState.sum )) i.itemState.sum = 0.0;\r
+                               if (isValidValue && i.isNumeric) {\r
+                                       double timeDelta = currentTime - i.itemState.currentTime;                                                       \r
+                                       i.itemState.sum += timeDelta * currentValueDouble;\r
+                               }\r
+                       }\r
+\r
+                       if (i.current.hasMedian() && !Double.isNaN(prevValueDouble) ) {\r
+                               double deltaTime = currentTime - prevTime;\r
+                               i.itemState.median.add( deltaTime, prevValueDouble );\r
+                       }\r
+               }\r
+\r
+               // Increase the counter that tracks the number of samples acquired into the band\r
+               if (!restep) i.itemState.count++;\r
+\r
+               // Remember values \r
+               i.itemState.currentTime = currentTime;\r
+               i.itemState.currentValue.readFrom( value.getBinding(), value.getValue() );                                                      \r
+               i.itemState.isNaN = isNanValue;\r
+               i.itemState.isValid = isValidValue;\r
+               \r
+               // Put discontinuation marker\r
+               if ( (wasDisabled && i.current.supportsNullValue())) {\r
+                       // Extend marker\r
+                       if (i.current.isNullValue()) {\r
+                               if (i.current.hasEndTime()) {\r
+                                       i.current.setEndTime( Bindings.DOUBLE, i.itemState.lastDisabledTime );\r
+                               }\r
+                               \r
+                               if (i.current.hasCount()) {\r
+                                       i.current.setCount( i.current.getCount()+1 );\r
+                               }\r
+                               \r
+                               // Add the band to the file stream\r
+                               i.stream.setNoflush(i.stream.size()-1, i.binding, i.current.getSample() );\r
+                       }\r
+                       \r
+                       // Add marker\r
+                       i.current.reset();\r
+                       i.current.setValueNull();\r
+                       i.current.setTime(Bindings.DOUBLE, i.itemState.firstDisabledTime);\r
+                       i.current.setEndTime(Bindings.DOUBLE, i.itemState.lastDisabledTime);\r
+                       i.stream.addNoflush( i.binding, i.current.getSample() );\r
+               }\r
+               \r
+               boolean condition = makeNewBand || (!i.hasEndTime && i.itemState.count==2); \r
+\r
+               if(i.stream.size() > 0 && !i.itemState.ooDeadband) {\r
+\r
+                       /// Extend existing band\r
+                       // Extend endTime value\r
+                       if (!i.hasEndTime) {\r
+                               i.current.setTime( time.getBinding(), time.getValue() );\r
+                       } else {\r
+                               i.current.setEndTime( time.getBinding(), time.getValue() );\r
+                       }\r
+\r
+                       if (isValidValue) {\r
+                               // Set last value\r
+                               if (i.current.hasLastValue()) {\r
+                                       i.current.setLastValue( value.getBinding(), value.getValue() );\r
+                               }\r
+\r
+                               // Add sum\r
+                               if (i.current.hasAvg() && i.isNumeric && !isNanValue && !restep) {\r
+                                       double duration = (currentTime - i.itemState.firstTime);\r
+                                       if(duration < 1e-9) {\r
+                                               i.current.setAvg( Bindings.DOUBLE, 0.0 );                                               \r
+                                       } else {\r
+                                               i.current.setAvg( Bindings.DOUBLE, i.itemState.sum / duration);                                         \r
+                                       }\r
+                               }\r
+\r
+                               // Update min-max\r
+                               if (i.current.hasMin() && i.isNumeric && !isNanValue) {\r
+                                       Binding minBinding = i.current.getMinBinding();\r
+                                       Object prevMinValue = i.current.getMin();\r
+                                       Object currentValueWithMinBinding = value.getValue( minBinding );\r
+                                       int diff = minBinding.compare( prevMinValue, currentValueWithMinBinding );\r
+                                       if (diff>0) i.current.setMin( minBinding, currentValueWithMinBinding );\r
+                               }                               \r
+                               if (i.current.hasMax() && i.isNumeric && !isNanValue) {\r
+                                       Binding maxBinding = i.current.getMaxBinding();\r
+                                       Object prevMaxValue = i.current.getMax();\r
+                                       Object currentValueWithMaxBinding = value.getValue( maxBinding );\r
+                                       int diff = maxBinding.compare( prevMaxValue, currentValueWithMaxBinding );\r
+                                       if (diff<0) i.current.setMax( maxBinding, currentValueWithMaxBinding );\r
+                               }\r
+\r
+                               // Update median\r
+                               if (i.current.hasMedian() && i.isNumeric && !isNanValue) {\r
+                                       Double median = i.itemState.median.getMedian();\r
+                                       i.current.setMedian( Bindings.DOUBLE, median);\r
+                               }                               \r
+                       } else {\r
+                               i.current.setValueNull();\r
+                       }\r
+\r
+                       // Add count\r
+                       if (i.current.hasCount()) {\r
+                               i.current.setCount( i.itemState.count );\r
+                       }\r
+\r
+                       // Write entry to file stream\r
+                       //              if (i.current.getTimeDouble()<0) \r
+                       //                      System.err.println("Error sample "+i.current.getBinding().toString(i.current.getSample()));\r
+                       i.stream.setNoflush( i.stream.size()-1, i.binding, i.current.getSample() );                             \r
+               }\r
+               \r
+               if (condition) {\r
+                       \r
+                       // Start a new value band\r
+                       i.itemState.firstValue.readFrom( value.getBinding(), value.getValue() );\r
+                       i.itemState.firstTime = currentTime;\r
+                       i.itemState.currentValue.readFrom( value.getBinding(), value.getValue() );\r
+                       i.itemState.currentTime = currentTime;\r
+                       i.itemState.isNaN = isNanValue;\r
+                       i.itemState.isValid = isValidValue;\r
+                       i.itemState.sum = 0;\r
+                       i.itemState.count = 1;\r
+                       i.itemState.ooDeadband = false;\r
+                       if (i.itemState.median != null) {\r
+                               i.itemState.median.clear();\r
+                               if (!Double.isNaN(prevValueDouble) ) {\r
+                                       double deltaTime = currentTime - prevTime;\r
+                                       i.itemState.median.add( deltaTime, prevValueDouble );\r
+                               }\r
+                       }\r
+                       \r
+                       // New Sample\r
+                       i.current.reset();\r
+                       \r
+                       // Start new band with current value & time\r
+                       i.current.setTime( time.getBinding(), time.getValue() );                \r
+\r
+                       // Is valid value\r
+                       if (i.current.hasEndTime()) {\r
+                               i.current.setEndTime( time.getBinding(), time.getValue() );\r
+                       }\r
+                       \r
+                       if (isValidValue) {\r
+                               i.current.setValue( value.getBinding(), value.getValue() );\r
+                               i.current.setMax( value.getBinding(), value.getValue() );\r
+                               i.current.setMin( value.getBinding(), value.getValue() );\r
+                               i.current.setLastValue( value.getBinding(), value.getValue() );\r
+                               i.current.setAvg( value.getBinding(), value.getValue() );\r
+                               i.current.setMedian( value.getBinding(), value.getValue() );                                    \r
+                       } else {\r
+                               i.current.setValueNull();\r
+                       }\r
+                       \r
+                               \r
+                       // Sample Count=1 for new bands (time, value) "added"\r
+                       if (i.current.hasCount()) {\r
+                               i.current.setCount( 1 );\r
+                       }\r
+                       \r
+                       // Add the band to the file stream\r
+                       i.stream.addNoflush( i.binding, i.current.getSample() );\r
+                       \r
+               }               \r
+               \r
+       }\r
+\r
+       @Override\r
+       public void flush() /*throws HistoryException*/ {\r
+               for (ActiveStream i : items) {\r
+                       try {                                           \r
+                               i.stream.flush();\r
+                       } catch (AccessorException e) {\r
+                               logger.log(Level.FINE, "File history flush failed.", e);                        \r
+//                             throw new HistoryException(e);\r
+                       }\r
+               }\r
+       }\r
+\r
+       public void flush(Set<String> ids) /*throws HistoryException*/ {\r
+               for (ActiveStream i : items) {\r
+                       if (!ids.contains(i.itemState.id)) continue;\r
+                       try {                                           \r
+                               i.stream.flush();\r
+                       } catch (AccessorException e) {\r
+                               logger.log(Level.FINE, "File history flush failed.", e);                        \r
+//                             throw new HistoryException(e);\r
+                       }\r
+               }\r
+       }\r
+       \r
+       \r
+       @Override\r
+       public synchronized void close()\r
+       {\r
+               // Write items back to history\r
+               List<Bean> beans = new ArrayList<Bean>( state.itemStates.size() );\r
+               for (CollectorState.Item itemState : state.itemStates.values()) {\r
+                       try {\r
+                               if (!history.exists(itemState.id)) continue;\r
+                               Bean bean = history.getItem(itemState.id);\r
+                               bean.readAvailableFields(itemState);\r
+                               beans.add( setItemState(bean, itemState) );\r
+                       } catch (BindingException e) {\r
+                               logger.log(Level.FINE, "Error writing a meta-data to history", e);\r
+                       } catch (BindingConstructionException e) {\r
+                               logger.log(Level.FINE, "Error writing a meta-data to history", e);\r
+                       } catch (HistoryException e) {\r
+                               logger.log(Level.FINE, "Error writing a meta-data to history", e);\r
+                       }\r
+               }\r
+               try {\r
+                       history.modify(beans.toArray( new Bean[beans.size()] ));\r
+               } catch (HistoryException e) {\r
+                       logger.log(Level.FINE, "Error writing a meta-data to history", e);\r
+               }\r
+               state.itemStates.clear();\r
+               \r
+               // Close streams\r
+               for (ActiveStream item : items) {\r
+                       item.close();\r
+               }\r
+               items.clear();\r
+       }\r
+\r
+       public class ActiveStream {\r
+\r
+               /** Sample Stream */\r
+               public StreamAccessor stream;\r
+               \r
+               /** Sample binding */\r
+               public RecordBinding binding;\r
+               \r
+               /** Value binding (Generic) */\r
+               public Binding valueBinding;\r
+               \r
+               /** Set true if this item has numeric value type */\r
+               public boolean isNumeric;\r
+               \r
+               /** Set true if this item has end time */\r
+               public boolean hasEndTime;\r
+               \r
+               /** Item entry in subscription state */\r
+               public CollectorState.Item itemState;\r
+                               \r
+               /** Interface to current last sample (in the stream) */\r
+               public ValueBand current;\r
+               \r
+               /** Adapter that converts value to double value, null if not numeric */\r
+               public Adapter valueToDouble, doubleToValue;\r
+               \r
+               void close() {\r
+                       try {\r
+                               if (stream != null) { \r
+                                       stream.close();\r
+                               }\r
+                               stream = null;\r
+                       } catch (AccessorException e) {\r
+                               logger.log(Level.FINE, "Error closing stream", e);\r
+                       }\r
+               }\r
+\r
+       }\r
+\r
+       @Override\r
+       public StreamAccessor openStream(String itemId, String mode) \r
+       throws HistoryException \r
+       {\r
+               return history.openStream(itemId, mode);\r
+       }\r
+\r
+       @Override\r
+       public HistoryManager getHistory() {\r
+               return history;\r
+       }\r
+\r
+}\r