]> gerrit.simantics Code Review - simantics/platform.git/blobdiff - bundles/org.simantics.history/src/org/simantics/history/impl/CollectorImpl.java
Fixed history data collection minmax stream updates and plot rendering
[simantics/platform.git] / bundles / org.simantics.history / src / org / simantics / history / impl / CollectorImpl.java
index c001adc52a97cb4fe00763d9b5b1608f0b45d6a5..87062a9dabecae759aa0f5dffd442a496af156e5 100644 (file)
-/*******************************************************************************\r
- * Copyright (c) 2007, 2011 Association for Decentralized Information Management in\r
- * Industry THTH ry.\r
- * All rights reserved. This program and the accompanying materials\r
- * are made available under the terms of the Eclipse Public License v1.0\r
- * which accompanies this distribution, and is available at\r
- * http://www.eclipse.org/legal/epl-v10.html\r
- *\r
- * Contributors:\r
- *     VTT Technical Research Centre of Finland - initial API and implementation\r
- *******************************************************************************/\r
-package org.simantics.history.impl;\r
-\r
-import gnu.trove.set.hash.THashSet;\r
-\r
-import java.io.IOException;\r
-import java.util.ArrayList;\r
-import java.util.List;\r
-import java.util.Set;\r
-import java.util.UUID;\r
-import java.util.concurrent.CopyOnWriteArrayList;\r
-import java.util.logging.Level;\r
-import java.util.logging.Logger;\r
-\r
-import org.simantics.databoard.Bindings;\r
-import org.simantics.databoard.accessor.StreamAccessor;\r
-import org.simantics.databoard.accessor.error.AccessorException;\r
-import org.simantics.databoard.adapter.AdaptException;\r
-import org.simantics.databoard.adapter.Adapter;\r
-import org.simantics.databoard.adapter.AdapterConstructionException;\r
-import org.simantics.databoard.binding.Binding;\r
-import org.simantics.databoard.binding.DoubleBinding;\r
-import org.simantics.databoard.binding.FloatBinding;\r
-import org.simantics.databoard.binding.NumberBinding;\r
-import org.simantics.databoard.binding.RecordBinding;\r
-import org.simantics.databoard.binding.error.BindingConstructionException;\r
-import org.simantics.databoard.binding.error.BindingException;\r
-import org.simantics.databoard.binding.error.RuntimeBindingException;\r
-import org.simantics.databoard.binding.impl.DoubleBindingDefault;\r
-import org.simantics.databoard.binding.mutable.MutableVariant;\r
-import org.simantics.databoard.binding.mutable.Variant;\r
-import org.simantics.databoard.binding.reflection.VoidBinding;\r
-import org.simantics.databoard.serialization.Serializer;\r
-import org.simantics.databoard.type.Component;\r
-import org.simantics.databoard.type.Datatype;\r
-import org.simantics.databoard.type.NumberType;\r
-import org.simantics.databoard.type.RecordType;\r
-import org.simantics.databoard.util.Bean;\r
-import org.simantics.history.Collector;\r
-import org.simantics.history.HistoryException;\r
-import org.simantics.history.HistoryManager;\r
-import org.simantics.history.impl.CollectorState.Item;\r
-import org.simantics.history.impl.CollectorState.VariableState;\r
-import org.simantics.history.util.ValueBand;\r
-import org.simantics.history.util.WeightedMedian;\r
-import org.simantics.history.util.subscription.SubscriptionItem;\r
-\r
-/**\r
- * Collector is used to record data to streams in history manager.\r
- * \r
- * For every item there is collector item state {@link CollectorState.Item}, \r
- * that contains data required in writing process. This state is read and \r
- * written to history manager item (bean) as meta-data. \r
- * \r
- * Stream files are opened in the first {@link #beginStep(NumberBinding, Object)}. \r
- * \r
- * @author toni.kalajainen\r
- *\r
- */\r
-public class CollectorImpl implements Collector {\r
-\r
-       /** Logger */\r
-       static Logger logger = Logger.getLogger(CollectorImpl.class.getName());\r
-       \r
-       public static final int MEDIAN_LIMIT = 500;\r
-       \r
-       public static final double EPSILON = 1e-2;\r
-       \r
-       /** Collector state */\r
-       CollectorState state;\r
-\r
-       /** Open streams, corresponds to subscription items */\r
-       CopyOnWriteArrayList<ActiveStream> items = new CopyOnWriteArrayList<ActiveStream>();\r
-       \r
-       /** Associated history state */ \r
-       HistoryManager history;\r
-       \r
-       /** True when all items are open */\r
-       boolean itemsOpen = true;\r
-       \r
-       public FlushPolicy flushPolicy = FlushPolicy.FlushOnEveryStep;\r
-       \r
-       public FlushPolicy getFlushPolicy() {\r
-               return flushPolicy;\r
-       }\r
-\r
-       public void setFlushPolicy(FlushPolicy flushPolicy) {\r
-               this.flushPolicy = flushPolicy;\r
-       }\r
-\r
-       /**\r
-        * Create a new collector that is associated with a history manager.\r
-        * \r
-        * @param history\r
-        * @throws HistoryException\r
-        */\r
-       public CollectorImpl(HistoryManager history) {\r
-               this.history = history;\r
-               this.state = new CollectorState();\r
-               this.state.init();\r
-               this.state.id = UUID.randomUUID().toString();\r
-       }\r
-\r
-       /**\r
-        * Create a new collector that is associated with a history manager.\r
-        * \r
-        * @param history\r
-        * @param previousState\r
-        * @throws HistoryException\r
-        */\r
-       public CollectorImpl(HistoryManager history, CollectorState previousState) {\r
-               this.history = history;\r
-               this.state = (CollectorState) previousState.clone();\r
-       }\r
-       \r
-       @Override\r
-       public synchronized void addItem(Bean item) throws HistoryException {\r
-               try {\r
-                       String id = (String) item.getField("id");\r
-                       \r
-                       // Item State\r
-                       CollectorState.Item itemState = null;\r
-                       if ( item.hasField("collectorState") ) itemState = (CollectorState.Item) item.getField("collectorState", CollectorState.BINDING_ITEM);                  \r
-                       if ( itemState == null ) itemState = state.itemStates.get( id );\r
-                       if ( itemState == null ) {\r
-                               // Create new item\r
-                               itemState = new CollectorState.Item();\r
-                               itemState.init();\r
-                               itemState.readAvailableFields( item );\r
-                               // Guess values from the existing data  \r
-                               if (history.exists(id)) {\r
-                                       readItemState(history, item, itemState);\r
-                               }\r
-                       }\r
-                       \r
-                       addItem(id, itemState);                 \r
-               } catch (BindingException e) {\r
-                       throw new HistoryException( e ); \r
-               }\r
-       }\r
-       \r
-       public synchronized void addItem(String id, CollectorState.Item itemState) throws HistoryException {\r
-               try {                   \r
-                       // Item State\r
-                       state.itemStates.put(id, itemState);\r
-                       itemsOpen = false;\r
-                       \r
-                       // Time                 \r
-                       if (state.time.type() instanceof NumberType) {                          \r
-                               double time = (Double) Bindings.adapt(state.time.getValue(), state.time.getBinding(), Bindings.DOUBLE);\r
-                               if ( !Double.isNaN(itemState.currentTime) ) {\r
-                                       if ( itemState.currentTime > time ) {\r
-                                               state.time.setValue(Bindings.DOUBLE, itemState.currentTime);\r
-                                       }\r
-                               }                               \r
-                       }\r
-               } catch (AdaptException e) {\r
-                       throw new HistoryException( e ); \r
-               }\r
-       }\r
-       \r
-       public synchronized void addItems(Bean...items) throws HistoryException {\r
-               for (Bean item : items) addItem(item);\r
-       }\r
-       \r
-       @Override\r
-       public synchronized void removeItem(String id) throws HistoryException {\r
-               CollectorState.Item itemState = state.itemStates.remove( id );\r
-               if (itemState!=null && history.exists(id)) {\r
-                       Bean bean = history.getItem(id);\r
-                       bean.readAvailableFields(itemState);\r
-                       try {\r
-                               bean = setItemState(bean, itemState);\r
-                               history.modify(bean);\r
-                       } catch (BindingException e) {\r
-                               // Unexpected\r
-                               logger.log(Level.FINE, "Failed to update bean", e);                     \r
-                       } catch (BindingConstructionException e) {\r
-                               // Unexpected\r
-                               logger.log(Level.FINE, "Failed to update bean", e);\r
-                       } catch (HistoryException e) {\r
-                               logger.log(Level.FINE, "Failed to update history item meta-data", e);\r
-                       }\r
-               }\r
-               \r
-               ActiveStream as = getStream(id);\r
-               if ( as != null ) {\r
-                       items.remove( as );\r
-                       as.close();\r
-               }\r
-       }\r
-       \r
-       ActiveStream getStream(String id) {\r
-               for (ActiveStream as : items) {\r
-                       if (as.itemState.id.equals(id)) return as;\r
-               }\r
-               return null;\r
-       }\r
-       \r
-       /**\r
-        * Set (or Add) item's configuration. \r
-        * \r
-        * @param item item\r
-        * @throws HistoryException\r
-        */\r
-       public synchronized void setItem(Bean item) throws HistoryException {\r
-               try {\r
-                       String id = (String) item.getField("id");\r
-                       \r
-                       CollectorState.Item itemState = state.itemStates.get( id );\r
-                       if ( itemState==null ) {\r
-                               addItem(item);\r
-                               return;\r
-                       }\r
-                       itemState.readAvailableFields(item);\r
-               } catch (BindingException e) {\r
-                       throw new HistoryException( e ); \r
-               }\r
-       }\r
-       \r
-       \r
-       /**\r
-        * Set item state to bean. If the bean does not have field collectorState,\r
-        * a new bean class is created, and field added.\r
-        *  \r
-        * @param config\r
-        * @param state\r
-        * @return same or new bean\r
-        * @throws BindingException \r
-        * @throws BindingConstructionException \r
-        */\r
-       public static Bean setItemState(Bean config, CollectorState.Item state) throws BindingException, BindingConstructionException\r
-       {\r
-               Binding binding = config.getFieldBinding("collectorState");\r
-               if ( binding == null || !binding.type().equals(CollectorState.BINDING_ITEM.type()) ) {\r
-                       RecordType rt = config.getBinding().type();\r
-                       RecordType newType = new RecordType();\r
-                       for (int i=0; i<rt.getComponentCount(); i++) {\r
-                               Component c = rt.getComponent(i);\r
-                               if ( c.name.equals("collectorState") ) continue;\r
-                               newType.addComponent(c.name, c.type);\r
-                       }\r
-                       newType.addComponent("collectorState", CollectorState.BINDING_ITEM.type());\r
-                       Binding newBinding = Bindings.getBeanBinding(newType);\r
-                       Bean newConfig = (Bean) newBinding.createDefault();\r
-                       newConfig.readAvailableFields( config );\r
-                       config = newConfig;\r
-               }\r
-               config.setField("collectorState", CollectorState.BINDING_ITEM, state);\r
-               return config;\r
-       }\r
-       \r
-       public SubscriptionItem[] getItems() {\r
-               int c = state.itemStates.size();\r
-               SubscriptionItem[] items = new SubscriptionItem[ c ];\r
-               int ix = 0;\r
-               Serializer s = Bindings.getSerializerUnchecked( CollectorState.BINDING_ITEM );\r
-               for (CollectorState.Item item : state.itemStates.values()) {\r
-                       _HistoryItem hi = new _HistoryItem();\r
-                       hi.init();\r
-                       hi.readAvailableFields(item);\r
-                       //hi.collectorState = (Item) item.clone();\r
-                       \r
-                       // Workaround (clone doesn't work for WeightedMedian)\r
-                       try {\r
-                               byte[] data = s.serialize( item );                              \r
-                               hi.collectorState = (Item) s.deserialize(data);\r
-                       } catch (IOException e) {\r
-                       }\r
-                                       \r
-                       items[ix++] = hi;\r
-               }\r
-               return items;\r
-       }\r
-       \r
-       public static class _HistoryItem extends SubscriptionItem {\r
-               public CollectorState.Item collectorState;\r
-       }\r
-       \r
-       /**\r
-        * Creates and loads active item. TimeSeries stream file is opened.\r
-        * \r
-        * @param itemState\r
-        * @return new active item\r
-        * @throws HistoryException\r
-        */\r
-       ActiveStream openStream(CollectorState.Item itemState) throws HistoryException \r
-       {\r
-               // New Active Item\r
-               ActiveStream x = new ActiveStream();\r
-               x.itemState = itemState;\r
-\r
-               // Stream\r
-               {\r
-                       if ( !history.exists(x.itemState.id) ) {\r
-                               throw new HistoryException("TimeSeries "+x.itemState.id+" does not exist.");\r
-                       }\r
-\r
-                       x.stream = history.openStream(x.itemState.id, "rw");\r
-               }\r
-\r
-               // Sample Binding\r
-               Datatype sampleType = x.stream.type().componentType;\r
-               itemState.format = sampleType;\r
-               try { \r
-                       x.binding = (RecordBinding) Bindings.getBeanBinding( sampleType );\r
-               } catch ( RuntimeException e ) {\r
-                       System.err.println("Could not create bean binding for type "+sampleType);\r
-                       x.binding = (RecordBinding) Bindings.getBinding( sampleType );          \r
-               }\r
-\r
-               // Value binding\r
-               x.valueBinding = x.binding.getComponentBinding("value");\r
-               x.isNumeric = x.valueBinding instanceof NumberBinding;\r
-               x.hasEndTime = x.binding.getComponentIndex("endTime")>0;\r
-\r
-               // Value adapter\r
-               if ( x.isNumeric ) \r
-               try {\r
-                       x.valueToDouble = Bindings.adapterFactory.getAdapter(x.valueBinding, Bindings.DOUBLE, true, false);\r
-                       x.doubleToValue = Bindings.adapterFactory.getAdapter(Bindings.DOUBLE, x.valueBinding, true, false);\r
-               } catch (AdapterConstructionException e1) {\r
-                       x.valueToDouble = null;\r
-               }\r
-\r
-               // Create valueband, and read last entry into memory\r
-               try {\r
-                       Object currentEntry = x.binding.createDefault();\r
-                       if (x.stream.size() >= 1) {\r
-                               x.stream.get(x.stream.size() - 1, x.binding, currentEntry);\r
-                       }\r
-                       x.current = new ValueBand(x.binding);\r
-                       x.current.setSample(currentEntry);\r
-               } catch (AccessorException e) {\r
-                       throw new HistoryException(\r
-                                       "Could not read sample from stream file, " + e, e);\r
-               } catch (BindingException e) {\r
-                       throw new HistoryException(\r
-                                       "Could not read sample from stream file, " + e, e);\r
-               }\r
-\r
-               // Item State\r
-               x.itemState = itemState;\r
-\r
-               // Median\r
-               if (x.isNumeric && x.current.hasMedian()\r
-                               && x.itemState.median == null) {\r
-                       x.itemState.median = new WeightedMedian( MEDIAN_LIMIT );\r
-               }\r
-\r
-               return x;\r
-       }\r
-                       \r
-       /**\r
-        * Read itemState from history. Item must exist. \r
-        * \r
-        * @param si subscription item\r
-        * @return state item\r
-        * @throws RuntimeBindingException\r
-        * @throws HistoryException\r
-        */\r
-       static void readItemState(HistoryManager history, Bean si, CollectorState.Item item) throws RuntimeBindingException, HistoryException {\r
-               // Create item state\r
-               item.readAvailableFields(si);\r
-               // Read state from items\r
-               StreamAccessor sa = history.openStream(item.id, "r");\r
-               try {\r
-                       Datatype sampleType = sa.type().componentType;\r
-                       \r
-                       Binding sampleBinding = Bindings.getBinding(sampleType);\r
-                       if (!sampleType.equals( item.format )) throw new HistoryException("Could not create subscription, item \""+item.id+"\" already exists and has wrong format (" + sampleType + " vs. " + item.format + ")");\r
-                       Object sample = sampleBinding.createDefaultUnchecked();\r
-                       ValueBand v = new ValueBand(sampleBinding, sample);\r
-                       item.count = sa.size();\r
-                       if ( item.count>0 ) {\r
-                               sa.get(0, sampleBinding, sample);\r
-                               item.firstTime = v.getTimeDouble();\r
-                               Binding b = v.getValueBinding();\r
-                               Object value = v.getValue(b);\r
-                               item.firstValue.setValue(b, value);\r
-                                       \r
-                               sa.get(item.count-1, sampleBinding, sample);\r
-                               item.currentTime = v.getTimeDouble();\r
-                               b = v.getValueBinding();\r
-                               value = v.getValue(b);\r
-                               item.currentValue.setValue(b, value);\r
-                                       \r
-                               if ( item.currentValue.getBinding() instanceof DoubleBinding ) {\r
-                                       DoubleBinding db = (DoubleBinding) item.currentValue.getBinding();\r
-                                       value = item.currentValue.getValue(db);\r
-                                       item.isNaN = Double.isNaN( (Double) value );\r
-                               }\r
-                       }\r
-                               \r
-               } catch (AccessorException e) {\r
-                       throw new HistoryException( e );\r
-               } catch (AdaptException e) {\r
-                       throw new HistoryException( e );\r
-               } finally {\r
-                       try { sa.close(); } catch (AccessorException e) {}\r
-               }\r
-       }\r
-\r
-       @Override\r
-       public CollectorState getState() {\r
-               return (CollectorState) state.clone();\r
-       }\r
-\r
-       @Override\r
-       public void setState(Bean newState) {\r
-               if (newState == null)\r
-                       return;\r
-               if (state == null) {\r
-                       state = new CollectorState();\r
-                       state.init();\r
-               }\r
-               state.readAvailableFields(newState);\r
-\r
-               // Make sure that openAllItems ensures that all streams are opened properly.\r
-               // This is because setItem will not invoke addItem if collectorState exists\r
-               // for an item and therefore itemsOpen might not otherwise be set to false.\r
-               itemsOpen = false;\r
-       }\r
-\r
-       @Override\r
-       public Object getTime(Binding binding) throws HistoryException {\r
-               MutableVariant v = state.time;\r
-               if (v==null) return null;\r
-               try {\r
-                       return v.getValue(binding);\r
-               } catch (AdaptException e) {\r
-                       throw new HistoryException(e);\r
-               }\r
-       }\r
-\r
-       @Override\r
-       public Object getValue(String id, Binding binding) throws HistoryException {\r
-               VariableState vs = state.values.get(id);\r
-               if (vs==null) return null;\r
-               \r
-               if (!vs.isValid) return null;\r
-               \r
-               try {\r
-                       return vs.value.getValue(binding);\r
-               } catch (AdaptException e) {\r
-                       throw new HistoryException(e);\r
-               }\r
-       }\r
-       \r
-       void openAllItems() throws HistoryException {\r
-               if (itemsOpen) return;\r
-\r
-               // Assert all items are open\r
-               // 1. Collect all items that still need to be opened.\r
-               Set<String> itemsToOpen = new THashSet<String>( state.itemStates.keySet() );\r
-               for (ActiveStream as : items)\r
-                       itemsToOpen.remove(as.itemState.id);\r
-\r
-               // 2. Open missing streams\r
-               List<ActiveStream> opened = new ArrayList<ActiveStream>(itemsToOpen.size());\r
-               for (String itemId : itemsToOpen) {\r
-                       CollectorState.Item item = state.itemStates.get( itemId );\r
-                       ActiveStream as = openStream( item );\r
-                       opened.add(as);\r
-               }\r
-\r
-               // 3. Update list of open streams\r
-               items.addAll(opened);\r
-               itemsOpen = true;\r
-       }\r
-\r
-       private void updateDt(NumberBinding binding, Object time) {\r
-               try {\r
-                       Double current = (Double)getTime(Bindings.DOUBLE);\r
-                       if(current != null) {\r
-                               double t = binding.getValue(time).doubleValue();\r
-                               state.dT = t-current;\r
-                               if(state.dT > 0) return;\r
-                       }\r
-               } catch (Throwable t) {\r
-               }\r
-               state.dT = 1.0;\r
-       }\r
-       \r
-       @Override\r
-       public void beginStep(NumberBinding binding, Object time) throws HistoryException {\r
-               openAllItems();\r
-               updateDt(binding, time);\r
-               state.time.setValue(binding, time);\r
-       }\r
-\r
-       @Override\r
-       public void setValue(String id, Binding binding, Object value) throws HistoryException {\r
-               VariableState vs = state.values.get( id );\r
-               if (vs == null) {\r
-                       vs = new VariableState();\r
-                       vs.value = new MutableVariant(binding, value);\r
-                       state.values.put(id, vs);\r
-               }\r
-               \r
-               if (value==null) {\r
-                       vs.value.setValue(VoidBinding.VOID_BINDING, null);\r
-                       vs.isNan = true;\r
-                       vs.isValid = false;\r
-                       return;\r
-               } \r
-                                       \r
-               vs.value.setValue(binding, value);\r
-               vs.isValid = true;\r
-                       \r
-               if (binding instanceof DoubleBinding) {\r
-                       DoubleBinding db = (DoubleBinding) binding;\r
-                       try {\r
-                               double _v = db.getValue_(value);\r
-                               vs.isNan = Double.isNaN( _v );\r
-                       } catch (BindingException e) {\r
-                               vs.isNan = true;\r
-                               vs.isValid = false;\r
-                       }\r
-               } else if (binding instanceof FloatBinding) {\r
-                       FloatBinding db = (FloatBinding) binding;\r
-                       try {\r
-                               float _v = db.getValue_(value);\r
-                               vs.isNan = Float.isNaN( _v );\r
-                       } catch (BindingException e) {\r
-                               vs.isNan = true;\r
-                               vs.isValid = false;\r
-                       }\r
-               } else {\r
-                       vs.isNan = false;\r
-               }\r
-       }\r
-\r
-       @Override\r
-       public boolean getValue(String id, MutableVariant result) {\r
-               VariableState vs = state.values.get(id);\r
-               if (vs==null) return false;\r
-               \r
-               if (!vs.isValid) return false;\r
-               \r
-               result.setValue(vs.value.getBinding(), vs.value.getValue());\r
-               return true;\r
-       }\r
-\r
-       @Override\r
-       public void endStep() throws HistoryException {\r
-               // Write values to streams\r
-               for (ActiveStream i : items)\r
-               {\r
-                       try {\r
-                               putValueToStream(i);                            \r
-                               if (flushPolicy == FlushPolicy.FlushOnEveryStep) {\r
-                                       i.stream.flush();\r
-                               }\r
-                       } catch (AdaptException e) {\r
-                               throw new HistoryException( e );\r
-//                             FileAccessor fa = (FileAccessor) i.stream;\r
-//                             logger.log(Level.FINE, "Error writing a stream "+fa.file(), e);\r
-                       } catch (AccessorException e) {\r
-                               throw new HistoryException( e );\r
-//                             FileAccessor fa = (FileAccessor) i.stream;\r
-//                             logger.log(Level.FINE, "Error writing a stream "+fa.file(), e);\r
-                       } catch (BindingException e) {\r
-                               throw new HistoryException( e );\r
-//                             FileAccessor fa = (FileAccessor) i.stream;\r
-//                             logger.log(Level.FINE, "Error writing a stream "+fa.file(), e);\r
-                       }\r
-               }\r
-       }\r
-               \r
-       private int sampleNum(double time, double interval) {\r
-               // 1.0 is needed to cover the case of 0.0\r
-               return (int)(1.0 + (time+EPSILON*state.dT)/interval);\r
-       }\r
-\r
-       private boolean isExactInterval(double time, double interval) {\r
-               return sampleNum(time, interval) > sampleNum(time-2*EPSILON*state.dT, interval);\r
-       }\r
-\r
-       /**\r
-        * This method puts a time,value to a stream.\r
-        * \r
-        * The stream is "packed" when the new value is equal to previous, or the\r
-        * value is within deadband (in comparison to the first value), or if\r
-        * the interval zone is not exceeded.\r
-        * \r
-        * The stream can be packed two different ways, depending on the sample\r
-        * format. The sample format can be (a) single value or (b) a time range of\r
-        * multiple values, Ranged Sample. \r
-        * \r
-        * Single value is packed by removing middle values, in other words, \r
-        * moving the timecode of the last sample.\r
-        * \r
-        * Ranged sample is packed by extending the endTime code of the last sample.\r
-        * Other fields, such as avg, sum, count, lastValue are also updated.\r
-        * \r
-        * @param i\r
-        * @throws AdaptException\r
-        * @throws AccessorException \r
-        * @throws BindingException \r
-        * @throws HistoryException \r
-        */\r
-       private void putValueToStream(ActiveStream i) throws AdaptException, AccessorException, BindingException, HistoryException\r
-       {\r
-               // Current Time\r
-               Variant time = state.time;\r
-               double currentTime = Double.NaN;\r
-               if (time != null) {\r
-                       Double x = (Double) time.getValue(Bindings.DOUBLE);\r
-                       currentTime = x == null ? Double.NaN : x;\r
-               }\r
-\r
-               // Is first value\r
-               boolean isFirstValue = i.stream.size() == 0;\r
-               // Is this item disabled\r
-               boolean isDisabled = !i.itemState.enabled; \r
-               // Discontinuety, first value after being disabled\r
-               boolean wasDisabled = !isFirstValue && i.itemState.isDisabled;\r
-               \r
-               // Update currently disabled status\r
-               i.itemState.isDisabled = isDisabled;\r
-\r
-               // Update time codes\r
-               if (isDisabled) {\r
-                       i.itemState.count++;\r
-                       i.itemState.lastDisabledTime = currentTime;\r
-                       if (!wasDisabled) i.itemState.firstDisabledTime = currentTime;\r
-                       return;\r
-               }\r
-               \r
-               // Is nan value, for doubles and floats\r
-               boolean isNanValue;\r
-               // Is valid value\r
-               boolean isValidValue;\r
-               // Value as variant\r
-               MutableVariant value;\r
-               // Get current value as double\r
-               double currentValueDouble;\r
-               \r
-               VariableState vs = state.values.get( i.itemState.variableId );                  \r
-               if ( vs!=null ) {\r
-                       isNanValue = vs.isNan;\r
-                       isValidValue = vs.isValid;\r
-                       value = vs.value;\r
-                       if (i.isNumeric && isValidValue && !isNanValue) {\r
-                               currentValueDouble = (Double) value.getValue( Bindings.DOUBLE );\r
-                               if ( i.itemState.gain!=1.0 || i.itemState.bias!=0.0 ) {\r
-                                       currentValueDouble = i.itemState.gain * currentValueDouble + i.itemState.bias;\r
-                                       Object x = i.valueBinding instanceof DoubleBindingDefault ? currentValueDouble : i.doubleToValue.adapt(currentValueDouble);\r
-                                       value = new MutableVariant( i.valueBinding, x );\r
-                               }\r
-                       } else {\r
-                               currentValueDouble = Double.NaN;\r
-                       }\r
-                       \r
-               } else {\r
-                       isNanValue = true;\r
-                       isValidValue = false;\r
-                       value = new MutableVariant( i.valueBinding, i.valueBinding.createDefault() );\r
-                       currentValueDouble = Double.NaN;\r
-               }\r
-                               \r
-               // Prev Value\r
-               double prevValueDouble = Double.NaN;\r
-               Variant pv = i.itemState.currentValue;\r
-               if (pv!=null && pv.getBinding() instanceof NumberBinding ) {\r
-                       prevValueDouble = (Double) pv.getValue( Bindings.DOUBLE );\r
-               } else {\r
-                       prevValueDouble = Double.NaN;\r
-               }\r
-               \r
-               // The time of the first sample of region\r
-               double firstTime = i.itemState.firstTime;\r
-               // Prev time\r
-               double prevTime = i.itemState.currentTime;\r
-               \r
-               boolean restep = prevTime == currentTime;\r
-               \r
-               // There was change in NaN\r
-               boolean isNaNChange = i.itemState.isNaN != isNanValue;\r
-               \r
-               // There was change in validity of the value\r
-               boolean isValidChange = i.itemState.isValid != isValidValue;\r
-               \r
-               //boolean wasValidValue = !i.itemState.isValid && isValidValue;\r
-               \r
-               boolean hasInterval = !Double.isNaN( i.itemState.interval ) && (i.itemState.interval> Double.MIN_VALUE );\r
-               \r
-               boolean hasDeadband = !Double.isNaN( i.itemState.deadband ) && (i.itemState.deadband> Double.MIN_VALUE) && i.isNumeric; \r
-               \r
-               boolean isMinMaxFormat = i.itemState.interval==Double.MAX_VALUE &&\r
-                               i.itemState.deadband==Double.MAX_VALUE &&\r
-                               i.binding.getComponentIndex("min")>=0 &&\r
-                               i.binding.getComponentIndex("max")>=0;\r
-\r
-               // Deadband\r
-               if (isValidValue && i.isNumeric && i.itemState.count>=1) {\r
-                       \r
-                       if (i.itemState.isValid) {\r
-                               double firstValueDouble = (Double) i.itemState.firstValue.getValue( Bindings.DOUBLE );\r
-                               double diff = Math.abs( currentValueDouble - firstValueDouble );\r
-                               if( hasDeadband )\r
-                                       i.itemState.ooDeadband |= diff >= i.itemState.deadband;\r
-                               else\r
-                                       i.itemState.ooDeadband |= diff != 0.0;\r
-                       } else {\r
-                               i.itemState.ooDeadband = true;\r
-                       }\r
-                       \r
-                       // NaN change is deadband change\r
-                       i.itemState.ooDeadband |= isNaNChange;\r
-               } else {\r
-                       // The dead band cannot be have broken in first sample\r
-                       i.itemState.ooDeadband = false;\r
-               }\r
-\r
-\r
-               // Interval\r
-               boolean ooInterval = true;\r
-               if ( hasInterval ) {\r
-                       \r
-//                     System.err.println(i.itemState.id + " " + firstTime + " " + currentTime);\r
-\r
-                       // First band is a special case\r
-                       if(isFirstValue) {\r
-                               // If this is the first sample store the current time as first time\r
-                               if(Double.isNaN(i.itemState.firstTime)) firstTime = i.itemState.firstTime = currentTime;\r
-                               int sn1 = sampleNum(firstTime,i.itemState.interval);\r
-                               int sn2 = sampleNum(currentTime,i.itemState.interval);\r
-                               // Wait until next sample if this position is not already exactly at sample border\r
-                               // Always start minmax immediately\r
-                               if(sn1 == sn2 && !isExactInterval(currentTime, i.itemState.interval) && !isMinMaxFormat) return;\r
-                       } else {\r
-                               int sn1 = sampleNum(prevTime,i.itemState.interval);\r
-                               int sn2 = sampleNum(currentTime,i.itemState.interval);\r
-//                             System.err.println(" ==>" + sn1 + " " + sn2 + " " + prevTime + " " + currentTime);\r
-                               ooInterval = sn2 > sn1;\r
-                       }\r
-\r
-                       \r
-//                     \r
-//                     double n = (currentTime+EPSILON) / i.itemState.interval;\r
-//                     double n2 = n-Math.floor(n);\r
-//                     \r
-//                     \r
-//                     double deltaTime = currentTime - firstTime;\r
-//                     ooInterval = (deltaTime - i.itemState.interval) > -EPSILON;\r
-               }\r
-               \r
-               // Should new sample be created, or old sample extended / forwarded in time\r
-               boolean makeNewBand = ( i.itemState.ooDeadband && isValidValue && ooInterval ) || isFirstValue || wasDisabled || isValidChange;\r
-               \r
-               // Update item min-max, avg, median\r
-               if ( isValidValue && i.isNumeric ) {\r
-                       if (i.current.hasAvg() && i.isNumeric) {\r
-                               if (Double.isNaN( i.itemState.sum )) i.itemState.sum = 0.0;\r
-                               if (isValidValue && i.isNumeric) {\r
-                                       double timeDelta = currentTime - i.itemState.currentTime;                                                       \r
-                                       i.itemState.sum += timeDelta * currentValueDouble;\r
-                               }\r
-                       }\r
-\r
-                       if (i.current.hasMedian() && !Double.isNaN(prevValueDouble) ) {\r
-                               double deltaTime = currentTime - prevTime;\r
-                               i.itemState.median.add( deltaTime, prevValueDouble );\r
-                       }\r
-               }\r
-\r
-               // Increase the counter that tracks the number of samples acquired into the band\r
-               if (!restep) i.itemState.count++;\r
-\r
-               // Remember values \r
-               i.itemState.currentTime = currentTime;\r
-               i.itemState.currentValue.readFrom( value.getBinding(), value.getValue() );                                                      \r
-               i.itemState.isNaN = isNanValue;\r
-               i.itemState.isValid = isValidValue;\r
-               \r
-               // Put discontinuation marker\r
-               if ( (wasDisabled && i.current.supportsNullValue())) {\r
-                       // Extend marker\r
-                       if (i.current.isNullValue()) {\r
-                               if (i.current.hasEndTime()) {\r
-                                       i.current.setEndTime( Bindings.DOUBLE, i.itemState.lastDisabledTime );\r
-                               }\r
-                               \r
-                               if (i.current.hasCount()) {\r
-                                       i.current.setCount( i.current.getCount()+1 );\r
-                               }\r
-                               \r
-                               // Add the band to the file stream\r
-                               i.stream.setNoflush(i.stream.size()-1, i.binding, i.current.getSample() );\r
-                       }\r
-                       \r
-                       // Add marker\r
-                       i.current.reset();\r
-                       i.current.setValueNull();\r
-                       i.current.setTime(Bindings.DOUBLE, i.itemState.firstDisabledTime);\r
-                       i.current.setEndTime(Bindings.DOUBLE, i.itemState.lastDisabledTime);\r
-                       i.stream.addNoflush( i.binding, i.current.getSample() );\r
-               }\r
-               \r
-               boolean condition = makeNewBand || (!i.hasEndTime && i.itemState.count==2); \r
-\r
-               if(i.stream.size() > 0 && !i.itemState.ooDeadband) {\r
-\r
-                       /// Extend existing band\r
-                       // Extend endTime value\r
-                       if (!i.hasEndTime) {\r
-                               i.current.setTime( time.getBinding(), time.getValue() );\r
-                       } else {\r
-                               i.current.setEndTime( time.getBinding(), time.getValue() );\r
-                       }\r
-\r
-                       if (isValidValue) {\r
-                               // Set last value\r
-                               if (i.current.hasLastValue()) {\r
-                                       i.current.setLastValue( value.getBinding(), value.getValue() );\r
-                               }\r
-\r
-                               // Add sum\r
-                               if (i.current.hasAvg() && i.isNumeric && !isNanValue && !restep) {\r
-                                       double duration = (currentTime - i.itemState.firstTime);\r
-                                       if(duration < 1e-9) {\r
-                                               i.current.setAvg( Bindings.DOUBLE, 0.0 );                                               \r
-                                       } else {\r
-                                               i.current.setAvg( Bindings.DOUBLE, i.itemState.sum / duration);                                         \r
-                                       }\r
-                               }\r
-\r
-                               // Update min-max\r
-                               if (i.current.hasMin() && i.isNumeric && !isNanValue) {\r
-                                       Binding minBinding = i.current.getMinBinding();\r
-                                       Object prevMinValue = i.current.getMin();\r
-                                       Object currentValueWithMinBinding = value.getValue( minBinding );\r
-                                       int diff = minBinding.compare( prevMinValue, currentValueWithMinBinding );\r
-                                       if (diff>0) i.current.setMin( minBinding, currentValueWithMinBinding );\r
-                               }                               \r
-                               if (i.current.hasMax() && i.isNumeric && !isNanValue) {\r
-                                       Binding maxBinding = i.current.getMaxBinding();\r
-                                       Object prevMaxValue = i.current.getMax();\r
-                                       Object currentValueWithMaxBinding = value.getValue( maxBinding );\r
-                                       int diff = maxBinding.compare( prevMaxValue, currentValueWithMaxBinding );\r
-                                       if (diff<0) i.current.setMax( maxBinding, currentValueWithMaxBinding );\r
-                               }\r
-\r
-                               // Update median\r
-                               if (i.current.hasMedian() && i.isNumeric && !isNanValue) {\r
-                                       Double median = i.itemState.median.getMedian();\r
-                                       i.current.setMedian( Bindings.DOUBLE, median);\r
-                               }                               \r
-                       } else {\r
-                               i.current.setValueNull();\r
-                       }\r
-\r
-                       // Add count\r
-                       if (i.current.hasCount()) {\r
-                               i.current.setCount( i.itemState.count );\r
-                       }\r
-\r
-                       // Write entry to file stream\r
-                       //              if (i.current.getTimeDouble()<0) \r
-                       //                      System.err.println("Error sample "+i.current.getBinding().toString(i.current.getSample()));\r
-                       i.stream.setNoflush( i.stream.size()-1, i.binding, i.current.getSample() );                             \r
-               }\r
-               \r
-               if (condition) {\r
-                       \r
-                       // Start a new value band\r
-                       i.itemState.firstValue.readFrom( value.getBinding(), value.getValue() );\r
-                       i.itemState.firstTime = currentTime;\r
-                       i.itemState.currentValue.readFrom( value.getBinding(), value.getValue() );\r
-                       i.itemState.currentTime = currentTime;\r
-                       i.itemState.isNaN = isNanValue;\r
-                       i.itemState.isValid = isValidValue;\r
-                       i.itemState.sum = 0;\r
-                       i.itemState.count = 1;\r
-                       i.itemState.ooDeadband = false;\r
-                       if (i.itemState.median != null) {\r
-                               i.itemState.median.clear();\r
-                               if (!Double.isNaN(prevValueDouble) ) {\r
-                                       double deltaTime = currentTime - prevTime;\r
-                                       i.itemState.median.add( deltaTime, prevValueDouble );\r
-                               }\r
-                       }\r
-                       \r
-                       // New Sample\r
-                       i.current.reset();\r
-                       \r
-                       // Start new band with current value & time\r
-                       i.current.setTime( time.getBinding(), time.getValue() );                \r
-\r
-                       // Is valid value\r
-                       if (i.current.hasEndTime()) {\r
-                               i.current.setEndTime( time.getBinding(), time.getValue() );\r
-                       }\r
-                       \r
-                       if (isValidValue) {\r
-                               i.current.setValue( value.getBinding(), value.getValue() );\r
-                               i.current.setMax( value.getBinding(), value.getValue() );\r
-                               i.current.setMin( value.getBinding(), value.getValue() );\r
-                               i.current.setLastValue( value.getBinding(), value.getValue() );\r
-                               i.current.setAvg( value.getBinding(), value.getValue() );\r
-                               i.current.setMedian( value.getBinding(), value.getValue() );                                    \r
-                       } else {\r
-                               i.current.setValueNull();\r
-                       }\r
-                       \r
-                               \r
-                       // Sample Count=1 for new bands (time, value) "added"\r
-                       if (i.current.hasCount()) {\r
-                               i.current.setCount( 1 );\r
-                       }\r
-                       \r
-                       // Add the band to the file stream\r
-                       i.stream.addNoflush( i.binding, i.current.getSample() );\r
-                       \r
-               }               \r
-               \r
-       }\r
-\r
-       @Override\r
-       public void flush() /*throws HistoryException*/ {\r
-               for (ActiveStream i : items) {\r
-                       try {                                           \r
-                               i.stream.flush();\r
-                       } catch (AccessorException e) {\r
-                               logger.log(Level.FINE, "File history flush failed.", e);                        \r
-//                             throw new HistoryException(e);\r
-                       }\r
-               }\r
-       }\r
-\r
-       public void flush(Set<String> ids) /*throws HistoryException*/ {\r
-               for (ActiveStream i : items) {\r
-                       if (!ids.contains(i.itemState.id)) continue;\r
-                       try {                                           \r
-                               i.stream.flush();\r
-                       } catch (AccessorException e) {\r
-                               logger.log(Level.FINE, "File history flush failed.", e);                        \r
-//                             throw new HistoryException(e);\r
-                       }\r
-               }\r
-       }\r
-       \r
-       \r
-       @Override\r
-       public synchronized void close()\r
-       {\r
-               // Write items back to history\r
-               List<Bean> beans = new ArrayList<Bean>( state.itemStates.size() );\r
-               for (CollectorState.Item itemState : state.itemStates.values()) {\r
-                       try {\r
-                               if (!history.exists(itemState.id)) continue;\r
-                               Bean bean = history.getItem(itemState.id);\r
-                               bean.readAvailableFields(itemState);\r
-                               beans.add( setItemState(bean, itemState) );\r
-                       } catch (BindingException e) {\r
-                               logger.log(Level.FINE, "Error writing a meta-data to history", e);\r
-                       } catch (BindingConstructionException e) {\r
-                               logger.log(Level.FINE, "Error writing a meta-data to history", e);\r
-                       } catch (HistoryException e) {\r
-                               logger.log(Level.FINE, "Error writing a meta-data to history", e);\r
-                       }\r
-               }\r
-               try {\r
-                       history.modify(beans.toArray( new Bean[beans.size()] ));\r
-               } catch (HistoryException e) {\r
-                       logger.log(Level.FINE, "Error writing a meta-data to history", e);\r
-               }\r
-               state.itemStates.clear();\r
-               \r
-               // Close streams\r
-               for (ActiveStream item : items) {\r
-                       item.close();\r
-               }\r
-               items.clear();\r
-       }\r
-\r
-       public class ActiveStream {\r
-\r
-               /** Sample Stream */\r
-               public StreamAccessor stream;\r
-               \r
-               /** Sample binding */\r
-               public RecordBinding binding;\r
-               \r
-               /** Value binding (Generic) */\r
-               public Binding valueBinding;\r
-               \r
-               /** Set true if this item has numeric value type */\r
-               public boolean isNumeric;\r
-               \r
-               /** Set true if this item has end time */\r
-               public boolean hasEndTime;\r
-               \r
-               /** Item entry in subscription state */\r
-               public CollectorState.Item itemState;\r
-                               \r
-               /** Interface to current last sample (in the stream) */\r
-               public ValueBand current;\r
-               \r
-               /** Adapter that converts value to double value, null if not numeric */\r
-               public Adapter valueToDouble, doubleToValue;\r
-               \r
-               void close() {\r
-                       try {\r
-                               if (stream != null) { \r
-                                       stream.close();\r
-                               }\r
-                               stream = null;\r
-                       } catch (AccessorException e) {\r
-                               logger.log(Level.FINE, "Error closing stream", e);\r
-                       }\r
-               }\r
-\r
-       }\r
-\r
-       @Override\r
-       public StreamAccessor openStream(String itemId, String mode) \r
-       throws HistoryException \r
-       {\r
-               return history.openStream(itemId, mode);\r
-       }\r
-\r
-       @Override\r
-       public HistoryManager getHistory() {\r
-               return history;\r
-       }\r
-\r
-}\r
+/*******************************************************************************
+ * Copyright (c) 2007, 2011 Association for Decentralized Information Management in
+ * Industry THTH ry.
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * which accompanies this distribution, and is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Contributors:
+ *     VTT Technical Research Centre of Finland - initial API and implementation
+ *******************************************************************************/
+package org.simantics.history.impl;
+
+import gnu.trove.set.hash.THashSet;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.simantics.databoard.Bindings;
+import org.simantics.databoard.accessor.StreamAccessor;
+import org.simantics.databoard.accessor.error.AccessorException;
+import org.simantics.databoard.adapter.AdaptException;
+import org.simantics.databoard.adapter.Adapter;
+import org.simantics.databoard.adapter.AdapterConstructionException;
+import org.simantics.databoard.binding.Binding;
+import org.simantics.databoard.binding.DoubleBinding;
+import org.simantics.databoard.binding.FloatBinding;
+import org.simantics.databoard.binding.NumberBinding;
+import org.simantics.databoard.binding.RecordBinding;
+import org.simantics.databoard.binding.error.BindingConstructionException;
+import org.simantics.databoard.binding.error.BindingException;
+import org.simantics.databoard.binding.error.RuntimeBindingException;
+import org.simantics.databoard.binding.impl.DoubleBindingDefault;
+import org.simantics.databoard.binding.mutable.MutableVariant;
+import org.simantics.databoard.binding.mutable.Variant;
+import org.simantics.databoard.binding.reflection.VoidBinding;
+import org.simantics.databoard.serialization.Serializer;
+import org.simantics.databoard.type.Component;
+import org.simantics.databoard.type.Datatype;
+import org.simantics.databoard.type.NumberType;
+import org.simantics.databoard.type.RecordType;
+import org.simantics.databoard.util.Bean;
+import org.simantics.history.Collector;
+import org.simantics.history.HistoryException;
+import org.simantics.history.HistoryManager;
+import org.simantics.history.impl.CollectorState.Item;
+import org.simantics.history.impl.CollectorState.VariableState;
+import org.simantics.history.util.ValueBand;
+import org.simantics.history.util.WeightedMedian;
+import org.simantics.history.util.subscription.SubscriptionItem;
+
+/**
+ * Collector is used to record data to streams in history manager.
+ * 
+ * For every item there is collector item state {@link CollectorState.Item}, 
+ * that contains data required in writing process. This state is read and 
+ * written to history manager item (bean) as meta-data. 
+ * 
+ * Stream files are opened in the first {@link #beginStep(NumberBinding, Object)}. 
+ * 
+ * @author toni.kalajainen
+ *
+ */
+public class CollectorImpl implements Collector {
+
+       /** Logger */
+       static Logger logger = Logger.getLogger(CollectorImpl.class.getName());
+       
+       public static final int MEDIAN_LIMIT = 500;
+       
+       public static final double EPSILON = 1e-2;
+       
+       /** Collector state */
+       CollectorState state;
+
+       /** Open streams, corresponds to subscription items */
+       CopyOnWriteArrayList<ActiveStream> items = new CopyOnWriteArrayList<ActiveStream>();
+       
+       /** Associated history state */ 
+       HistoryManager history;
+       
+       /** True when all items are open */
+       boolean itemsOpen = true;
+       
+       public FlushPolicy flushPolicy = FlushPolicy.FlushOnEveryStep;
+       
+       public FlushPolicy getFlushPolicy() {
+               return flushPolicy;
+       }
+
+       public void setFlushPolicy(FlushPolicy flushPolicy) {
+               this.flushPolicy = flushPolicy;
+       }
+
+       /**
+        * Create a new collector that is associated with a history manager.
+        * 
+        * @param history
+        * @throws HistoryException
+        */
+       public CollectorImpl(HistoryManager history) {
+               this.history = history;
+               this.state = new CollectorState();
+               this.state.init();
+               this.state.id = UUID.randomUUID().toString();
+       }
+
+       /**
+        * Create a new collector that is associated with a history manager.
+        * 
+        * @param history
+        * @param previousState
+        * @throws HistoryException
+        */
+       public CollectorImpl(HistoryManager history, CollectorState previousState) {
+               this.history = history;
+               this.state = (CollectorState) previousState.clone();
+       }
+       
+       @Override
+       public synchronized void addItem(Bean item) throws HistoryException {
+               try {
+                       String id = (String) item.getField("id");
+                       
+                       // Item State
+                       CollectorState.Item itemState = null;
+                       if ( item.hasField("collectorState") ) itemState = (CollectorState.Item) item.getField("collectorState", CollectorState.BINDING_ITEM);                  
+                       if ( itemState == null ) itemState = state.itemStates.get( id );
+                       if ( itemState == null ) {
+                               // Create new item
+                               itemState = new CollectorState.Item();
+                               itemState.init();
+                               itemState.readAvailableFields( item );
+                               // Guess values from the existing data  
+                               if (history.exists(id)) {
+                                       readItemState(history, item, itemState);
+                               }
+                       }
+                       
+                       addItem(id, itemState);                 
+               } catch (BindingException e) {
+                       throw new HistoryException( e ); 
+               }
+       }
+       
+       public synchronized void addItem(String id, CollectorState.Item itemState) throws HistoryException {
+               try {                   
+                       // Item State
+                       state.itemStates.put(id, itemState);
+                       itemsOpen = false;
+                       
+                       // Time                 
+                       if (state.time.type() instanceof NumberType) {                          
+                               double time = (Double) Bindings.adapt(state.time.getValue(), state.time.getBinding(), Bindings.DOUBLE);
+                               if ( !Double.isNaN(itemState.currentTime) ) {
+                                       if ( itemState.currentTime > time ) {
+                                               state.time.setValue(Bindings.DOUBLE, itemState.currentTime);
+                                       }
+                               }                               
+                       }
+               } catch (AdaptException e) {
+                       throw new HistoryException( e ); 
+               }
+       }
+       
+       public synchronized void addItems(Bean...items) throws HistoryException {
+               for (Bean item : items) addItem(item);
+       }
+       
+       @Override
+       public synchronized void removeItem(String id) throws HistoryException {
+               CollectorState.Item itemState = state.itemStates.remove( id );
+               if (itemState!=null && history.exists(id)) {
+                       Bean bean = history.getItem(id);
+                       bean.readAvailableFields(itemState);
+                       try {
+                               bean = setItemState(bean, itemState);
+                               history.modify(bean);
+                       } catch (BindingException e) {
+                               // Unexpected
+                               logger.log(Level.FINE, "Failed to update bean", e);                     
+                       } catch (BindingConstructionException e) {
+                               // Unexpected
+                               logger.log(Level.FINE, "Failed to update bean", e);
+                       } catch (HistoryException e) {
+                               logger.log(Level.FINE, "Failed to update history item meta-data", e);
+                       }
+               }
+               
+               ActiveStream as = getStream(id);
+               if ( as != null ) {
+                       items.remove( as );
+                       as.close();
+               }
+       }
+       
+       ActiveStream getStream(String id) {
+               for (ActiveStream as : items) {
+                       if (as.itemState.id.equals(id)) return as;
+               }
+               return null;
+       }
+       
+       /**
+        * Set (or Add) item's configuration. 
+        * 
+        * @param item item
+        * @throws HistoryException
+        */
+       public synchronized void setItem(Bean item) throws HistoryException {
+               try {
+                       String id = (String) item.getField("id");
+                       
+                       CollectorState.Item itemState = state.itemStates.get( id );
+                       if ( itemState==null ) {
+                               addItem(item);
+                               return;
+                       }
+                       itemState.readAvailableFields(item);
+               } catch (BindingException e) {
+                       throw new HistoryException( e ); 
+               }
+       }
+       
+       
+       /**
+        * Set item state to bean. If the bean does not have field collectorState,
+        * a new bean class is created, and field added.
+        *  
+        * @param config
+        * @param state
+        * @return same or new bean
+        * @throws BindingException 
+        * @throws BindingConstructionException 
+        */
+       public static Bean setItemState(Bean config, CollectorState.Item state) throws BindingException, BindingConstructionException
+       {
+               Binding binding = config.getFieldBinding("collectorState");
+               if ( binding == null || !binding.type().equals(CollectorState.BINDING_ITEM.type()) ) {
+                       RecordType rt = config.getBinding().type();
+                       RecordType newType = new RecordType();
+                       for (int i=0; i<rt.getComponentCount(); i++) {
+                               Component c = rt.getComponent(i);
+                               if ( c.name.equals("collectorState") ) continue;
+                               newType.addComponent(c.name, c.type);
+                       }
+                       newType.addComponent("collectorState", CollectorState.BINDING_ITEM.type());
+                       Binding newBinding = Bindings.getBeanBinding(newType);
+                       Bean newConfig = (Bean) newBinding.createDefault();
+                       newConfig.readAvailableFields( config );
+                       config = newConfig;
+               }
+               config.setField("collectorState", CollectorState.BINDING_ITEM, state);
+               return config;
+       }
+       
+       public SubscriptionItem[] getItems() {
+               int c = state.itemStates.size();
+               SubscriptionItem[] items = new SubscriptionItem[ c ];
+               int ix = 0;
+               Serializer s = Bindings.getSerializerUnchecked( CollectorState.BINDING_ITEM );
+               for (CollectorState.Item item : state.itemStates.values()) {
+                       _HistoryItem hi = new _HistoryItem();
+                       hi.init();
+                       hi.readAvailableFields(item);
+                       //hi.collectorState = (Item) item.clone();
+                       
+                       // Workaround (clone doesn't work for WeightedMedian)
+                       try {
+                               byte[] data = s.serialize( item );                              
+                               hi.collectorState = (Item) s.deserialize(data);
+                       } catch (IOException e) {
+                       }
+                                       
+                       items[ix++] = hi;
+               }
+               return items;
+       }
+       
+       public static class _HistoryItem extends SubscriptionItem {
+               public CollectorState.Item collectorState;
+       }
+       
+       /**
+        * Creates and loads active item. TimeSeries stream file is opened.
+        * 
+        * @param itemState
+        * @return new active item
+        * @throws HistoryException
+        */
+       ActiveStream openStream(CollectorState.Item itemState) throws HistoryException 
+       {
+               // New Active Item
+               ActiveStream x = new ActiveStream();
+               x.itemState = itemState;
+
+               // Stream
+               {
+                       if ( !history.exists(x.itemState.id) ) {
+                               throw new HistoryException("TimeSeries "+x.itemState.id+" does not exist.");
+                       }
+
+                       x.stream = history.openStream(x.itemState.id, "rw");
+               }
+
+               // Sample Binding
+               Datatype sampleType = x.stream.type().componentType;
+               itemState.format = sampleType;
+               try { 
+                       x.binding = (RecordBinding) Bindings.getBeanBinding( sampleType );
+               } catch ( RuntimeException e ) {
+                       System.err.println("Could not create bean binding for type "+sampleType);
+                       x.binding = (RecordBinding) Bindings.getBinding( sampleType );          
+               }
+
+               // Value binding
+               x.valueBinding = x.binding.getComponentBinding("value");
+               x.isNumeric = x.valueBinding instanceof NumberBinding;
+               x.hasEndTime = x.binding.getComponentIndex("endTime")>0;
+
+               // Value adapter
+               if ( x.isNumeric ) 
+               try {
+                       x.valueToDouble = Bindings.adapterFactory.getAdapter(x.valueBinding, Bindings.DOUBLE, true, false);
+                       x.doubleToValue = Bindings.adapterFactory.getAdapter(Bindings.DOUBLE, x.valueBinding, true, false);
+               } catch (AdapterConstructionException e1) {
+                       x.valueToDouble = null;
+               }
+
+               // Create valueband, and read last entry into memory
+               try {
+                       Object currentEntry = x.binding.createDefault();
+                       if (x.stream.size() >= 1) {
+                               x.stream.get(x.stream.size() - 1, x.binding, currentEntry);
+                       }
+                       x.current = new ValueBand(x.binding);
+                       x.current.setSample(currentEntry);
+               } catch (AccessorException e) {
+                       throw new HistoryException(
+                                       "Could not read sample from stream file, " + e, e);
+               } catch (BindingException e) {
+                       throw new HistoryException(
+                                       "Could not read sample from stream file, " + e, e);
+               }
+
+               // Item State
+               x.itemState = itemState;
+
+               // Median
+               if (x.isNumeric && x.current.hasMedian()
+                               && x.itemState.median == null) {
+                       x.itemState.median = new WeightedMedian( MEDIAN_LIMIT );
+               }
+
+               return x;
+       }
+                       
+       /**
+        * Read itemState from history. Item must exist. 
+        * 
+        * @param si subscription item
+        * @return state item
+        * @throws RuntimeBindingException
+        * @throws HistoryException
+        */
+       static void readItemState(HistoryManager history, Bean si, CollectorState.Item item) throws RuntimeBindingException, HistoryException {
+               // Create item state
+               item.readAvailableFields(si);
+               // Read state from items
+               StreamAccessor sa = history.openStream(item.id, "r");
+               try {
+                       Datatype sampleType = sa.type().componentType;
+                       
+                       Binding sampleBinding = Bindings.getBinding(sampleType);
+                       if (!sampleType.equals( item.format )) throw new HistoryException("Could not create subscription, item \""+item.id+"\" already exists and has wrong format (" + sampleType + " vs. " + item.format + ")");
+                       Object sample = sampleBinding.createDefaultUnchecked();
+                       ValueBand v = new ValueBand(sampleBinding, sample);
+                       item.count = sa.size();
+                       if ( item.count>0 ) {
+                               sa.get(0, sampleBinding, sample);
+                               item.firstTime = v.getTimeDouble();
+                               Binding b = v.getValueBinding();
+                               Object value = v.getValue(b);
+                               item.firstValue.setValue(b, value);
+                                       
+                               sa.get(item.count-1, sampleBinding, sample);
+                               item.currentTime = v.getTimeDouble();
+                               b = v.getValueBinding();
+                               value = v.getValue(b);
+                               item.currentValue.setValue(b, value);
+                                       
+                               if ( item.currentValue.getBinding() instanceof DoubleBinding ) {
+                                       DoubleBinding db = (DoubleBinding) item.currentValue.getBinding();
+                                       value = item.currentValue.getValue(db);
+                                       item.isNaN = Double.isNaN( (Double) value );
+                               }
+                       }
+                               
+               } catch (AccessorException e) {
+                       throw new HistoryException( e );
+               } catch (AdaptException e) {
+                       throw new HistoryException( e );
+               } finally {
+                       try { sa.close(); } catch (AccessorException e) {}
+               }
+       }
+
+       @Override
+       public CollectorState getState() {
+               return (CollectorState) state.clone();
+       }
+
+       @Override
+       public void setState(Bean newState) {
+               if (newState == null)
+                       return;
+               if (state == null) {
+                       state = new CollectorState();
+                       state.init();
+               }
+               state.readAvailableFields(newState);
+
+               // Make sure that openAllItems ensures that all streams are opened properly.
+               // This is because setItem will not invoke addItem if collectorState exists
+               // for an item and therefore itemsOpen might not otherwise be set to false.
+               itemsOpen = false;
+       }
+
+       @Override
+       public Object getTime(Binding binding) throws HistoryException {
+               MutableVariant v = state.time;
+               if (v==null) return null;
+               try {
+                       return v.getValue(binding);
+               } catch (AdaptException e) {
+                       throw new HistoryException(e);
+               }
+       }
+
+       @Override
+       public Object getValue(String id, Binding binding) throws HistoryException {
+               VariableState vs = state.values.get(id);
+               if (vs==null) return null;
+               
+               if (!vs.isValid) return null;
+               
+               try {
+                       return vs.value.getValue(binding);
+               } catch (AdaptException e) {
+                       throw new HistoryException(e);
+               }
+       }
+       
+       void openAllItems() throws HistoryException {
+               if (itemsOpen) return;
+
+               // Assert all items are open
+               // 1. Collect all items that still need to be opened.
+               Set<String> itemsToOpen = new THashSet<String>( state.itemStates.keySet() );
+               for (ActiveStream as : items)
+                       itemsToOpen.remove(as.itemState.id);
+
+               // 2. Open missing streams
+               List<ActiveStream> opened = new ArrayList<ActiveStream>(itemsToOpen.size());
+               for (String itemId : itemsToOpen) {
+                       CollectorState.Item item = state.itemStates.get( itemId );
+                       ActiveStream as = openStream( item );
+                       opened.add(as);
+               }
+
+               // 3. Update list of open streams
+               items.addAll(opened);
+               itemsOpen = true;
+       }
+
+       private void updateDt(NumberBinding binding, Object time) {
+               try {
+                       Double current = (Double)getTime(Bindings.DOUBLE);
+                       if(current != null) {
+                               double t = binding.getValue(time).doubleValue();
+                               state.dT = t-current;
+                               if(state.dT > 0) return;
+                       }
+               } catch (Throwable t) {
+               }
+               state.dT = 1.0;
+       }
+       
+       @Override
+       public void beginStep(NumberBinding binding, Object time) throws HistoryException {
+               openAllItems();
+               updateDt(binding, time);
+               state.time.setValue(binding, time);
+       }
+
+       @Override
+       public void setValue(String id, Binding binding, Object value) throws HistoryException {
+               VariableState vs = state.values.get( id );
+               if (vs == null) {
+                       vs = new VariableState();
+                       vs.value = new MutableVariant(binding, value);
+                       state.values.put(id, vs);
+               }
+               
+               if (value==null) {
+                       vs.value.setValue(VoidBinding.VOID_BINDING, null);
+                       vs.isNan = true;
+                       vs.isValid = false;
+                       return;
+               } 
+                                       
+               vs.value.setValue(binding, value);
+               vs.isValid = true;
+                       
+               if (binding instanceof DoubleBinding) {
+                       DoubleBinding db = (DoubleBinding) binding;
+                       try {
+                               double _v = db.getValue_(value);
+                               vs.isNan = Double.isNaN( _v );
+                       } catch (BindingException e) {
+                               vs.isNan = true;
+                               vs.isValid = false;
+                       }
+               } else if (binding instanceof FloatBinding) {
+                       FloatBinding db = (FloatBinding) binding;
+                       try {
+                               float _v = db.getValue_(value);
+                               vs.isNan = Float.isNaN( _v );
+                       } catch (BindingException e) {
+                               vs.isNan = true;
+                               vs.isValid = false;
+                       }
+               } else {
+                       vs.isNan = false;
+               }
+       }
+
+       @Override
+       public boolean getValue(String id, MutableVariant result) {
+               VariableState vs = state.values.get(id);
+               if (vs==null) return false;
+               
+               if (!vs.isValid) return false;
+               
+               result.setValue(vs.value.getBinding(), vs.value.getValue());
+               return true;
+       }
+
+       @Override
+       public void endStep() throws HistoryException {
+               // Write values to streams
+               for (ActiveStream i : items)
+               {
+                       try {
+                               putValueToStream(i);                            
+                               if (flushPolicy == FlushPolicy.FlushOnEveryStep) {
+                                       i.stream.flush();
+                               }
+                       } catch (AdaptException e) {
+                               throw new HistoryException( e );
+//                             FileAccessor fa = (FileAccessor) i.stream;
+//                             logger.log(Level.FINE, "Error writing a stream "+fa.file(), e);
+                       } catch (AccessorException e) {
+                               throw new HistoryException( e );
+//                             FileAccessor fa = (FileAccessor) i.stream;
+//                             logger.log(Level.FINE, "Error writing a stream "+fa.file(), e);
+                       } catch (BindingException e) {
+                               throw new HistoryException( e );
+//                             FileAccessor fa = (FileAccessor) i.stream;
+//                             logger.log(Level.FINE, "Error writing a stream "+fa.file(), e);
+                       }
+               }
+       }
+               
+       private int sampleNum(double time, double interval) {
+               // 1.0 is needed to cover the case of 0.0
+               return (int)(1.0 + (time+EPSILON*state.dT)/interval);
+       }
+
+       private boolean isExactInterval(double time, double interval) {
+               return sampleNum(time, interval) > sampleNum(time-2*EPSILON*state.dT, interval);
+       }
+
+       /**
+        * This method puts a time,value to a stream.
+        * 
+        * The stream is "packed" when the new value is equal to previous, or the
+        * value is within deadband (in comparison to the first value), or if
+        * the interval zone is not exceeded.
+        * 
+        * The stream can be packed two different ways, depending on the sample
+        * format. The sample format can be (a) single value or (b) a time range of
+        * multiple values, Ranged Sample. 
+        * 
+        * Single value is packed by removing middle values, in other words, 
+        * moving the timecode of the last sample.
+        * 
+        * Ranged sample is packed by extending the endTime code of the last sample.
+        * Other fields, such as avg, sum, count, lastValue are also updated.
+        * 
+        * @param i
+        * @throws AdaptException
+        * @throws AccessorException 
+        * @throws BindingException 
+        * @throws HistoryException 
+        */
+       private void putValueToStream(ActiveStream i) throws AdaptException, AccessorException, BindingException, HistoryException
+       {
+               // Current Time
+               Variant time = state.time;
+               double currentTime = Double.NaN;
+               if (time != null) {
+                       Double x = (Double) time.getValue(Bindings.DOUBLE);
+                       currentTime = x == null ? Double.NaN : x;
+               }
+
+               // Is first value
+               boolean isFirstValue = i.stream.size() == 0;
+               // Is this item disabled
+               boolean isDisabled = !i.itemState.enabled; 
+               // Discontinuety, first value after being disabled
+               boolean wasDisabled = !isFirstValue && i.itemState.isDisabled;
+               
+               // Update currently disabled status
+               i.itemState.isDisabled = isDisabled;
+
+               // Update time codes
+               if (isDisabled) {
+                       i.itemState.count++;
+                       i.itemState.lastDisabledTime = currentTime;
+                       if (!wasDisabled) i.itemState.firstDisabledTime = currentTime;
+                       return;
+               }
+               
+               // Is nan value, for doubles and floats
+               boolean isNanValue;
+               // Is valid value
+               boolean isValidValue;
+               // Value as variant
+               MutableVariant value;
+               // Get current value as double
+               double currentValueDouble;
+               
+               VariableState vs = state.values.get( i.itemState.variableId );                  
+               if ( vs!=null ) {
+                       isNanValue = vs.isNan;
+                       isValidValue = vs.isValid;
+                       value = vs.value;
+                       if (i.isNumeric && isValidValue && !isNanValue) {
+                               currentValueDouble = (Double) value.getValue( Bindings.DOUBLE );
+                               if ( i.itemState.gain!=1.0 || i.itemState.bias!=0.0 ) {
+                                       currentValueDouble = i.itemState.gain * currentValueDouble + i.itemState.bias;
+                                       Object x = i.valueBinding instanceof DoubleBindingDefault ? currentValueDouble : i.doubleToValue.adapt(currentValueDouble);
+                                       value = new MutableVariant( i.valueBinding, x );
+                               }
+                       } else {
+                               currentValueDouble = Double.NaN;
+                       }
+                       
+               } else {
+                       isNanValue = true;
+                       isValidValue = false;
+                       value = new MutableVariant( i.valueBinding, i.valueBinding.createDefault() );
+                       currentValueDouble = Double.NaN;
+               }
+                               
+               // Prev Value
+               double prevValueDouble = Double.NaN;
+               Variant pv = i.itemState.currentValue;
+               if (pv!=null && pv.getBinding() instanceof NumberBinding ) {
+                       prevValueDouble = (Double) pv.getValue( Bindings.DOUBLE );
+               } else {
+                       prevValueDouble = Double.NaN;
+               }
+               
+               // The time of the first sample of region
+               double firstTime = i.itemState.firstTime;
+               // Prev time
+               double prevTime = i.itemState.currentTime;
+               
+               boolean restep = prevTime == currentTime;
+               
+               // There was change in NaN
+               boolean isNaNChange = i.itemState.isNaN != isNanValue;
+               
+               // There was change in validity of the value
+               boolean isValidChange = i.itemState.isValid != isValidValue;
+               
+               //boolean wasValidValue = !i.itemState.isValid && isValidValue;
+               
+               boolean hasInterval = !Double.isNaN( i.itemState.interval ) && (i.itemState.interval> Double.MIN_VALUE );
+               
+               boolean hasDeadband = !Double.isNaN( i.itemState.deadband ) && (i.itemState.deadband> Double.MIN_VALUE) && i.isNumeric; 
+               
+               boolean isMinMaxFormat = i.itemState.interval==Double.MAX_VALUE &&
+                               i.itemState.deadband==Double.MAX_VALUE &&
+                               i.binding.getComponentIndex("min")>=0 &&
+                               i.binding.getComponentIndex("max")>=0;
+
+               // Deadband
+               if (isValidValue && i.isNumeric && i.itemState.count>=1) {
+                       
+                       if (i.itemState.isValid) {
+                               double firstValueDouble = (Double) i.itemState.firstValue.getValue( Bindings.DOUBLE );
+                               double diff = Math.abs( currentValueDouble - firstValueDouble );
+                               if( hasDeadband )
+                                       i.itemState.ooDeadband |= diff >= i.itemState.deadband;
+                               else
+                                       i.itemState.ooDeadband |= diff != 0.0;
+                       } else {
+                               i.itemState.ooDeadband = true;
+                       }
+                       
+                       // NaN change is deadband change
+                       i.itemState.ooDeadband |= isNaNChange;
+               } else {
+                       // The dead band cannot be have broken in first sample
+                       i.itemState.ooDeadband = false;
+               }
+
+
+               // Interval
+               boolean ooInterval = true;
+               if ( hasInterval ) {
+                       
+//                     System.err.println(i.itemState.id + " " + firstTime + " " + currentTime);
+
+                       // First band is a special case
+                       if(isFirstValue) {
+                               // If this is the first sample store the current time as first time
+                               if(Double.isNaN(i.itemState.firstTime)) firstTime = i.itemState.firstTime = currentTime;
+                               int sn1 = sampleNum(firstTime,i.itemState.interval);
+                               int sn2 = sampleNum(currentTime,i.itemState.interval);
+                               // Wait until next sample if this position is not already exactly at sample border
+                               // Always start minmax immediately
+                               if(sn1 == sn2 && !isExactInterval(currentTime, i.itemState.interval) && !isMinMaxFormat) return;
+                       } else {
+                               int sn1 = sampleNum(prevTime,i.itemState.interval);
+                               int sn2 = sampleNum(currentTime,i.itemState.interval);
+//                             System.err.println(" ==>" + sn1 + " " + sn2 + " " + prevTime + " " + currentTime);
+                               ooInterval = sn2 > sn1;
+                       }
+
+                       
+//                     
+//                     double n = (currentTime+EPSILON) / i.itemState.interval;
+//                     double n2 = n-Math.floor(n);
+//                     
+//                     
+//                     double deltaTime = currentTime - firstTime;
+//                     ooInterval = (deltaTime - i.itemState.interval) > -EPSILON;
+               }
+               
+               // Should new sample be created, or old sample extended / forwarded in time
+               boolean makeNewBand = ( i.itemState.ooDeadband && isValidValue && ooInterval ) || isFirstValue || wasDisabled || isValidChange;
+               
+               // Update item min-max, avg, median
+               if ( isValidValue && i.isNumeric ) {
+                       if (i.current.hasAvg() && i.isNumeric) {
+                               if (Double.isNaN( i.itemState.sum )) i.itemState.sum = 0.0;
+                               if (isValidValue && i.isNumeric) {
+                                       double timeDelta = currentTime - i.itemState.currentTime;                                                       
+                                       i.itemState.sum += timeDelta * currentValueDouble;
+                               }
+                       }
+
+                       if (i.current.hasMedian() && !Double.isNaN(prevValueDouble) ) {
+                               double deltaTime = currentTime - prevTime;
+                               i.itemState.median.add( deltaTime, prevValueDouble );
+                       }
+               }
+
+               // Increase the counter that tracks the number of samples acquired into the band
+               if (!restep) i.itemState.count++;
+
+               // Remember values 
+               i.itemState.currentTime = currentTime;
+               i.itemState.currentValue.readFrom( value.getBinding(), value.getValue() );                                                      
+               i.itemState.isNaN = isNanValue;
+               i.itemState.isValid = isValidValue;
+               
+               // Put discontinuation marker
+               if ( (wasDisabled && i.current.supportsNullValue())) {
+                       // Extend marker
+                       if (i.current.isNullValue()) {
+                               if (i.current.hasEndTime()) {
+                                       i.current.setEndTime( Bindings.DOUBLE, i.itemState.lastDisabledTime );
+                               }
+                               
+                               if (i.current.hasCount()) {
+                                       i.current.setCount( i.current.getCount()+1 );
+                               }
+                               
+                               // Add the band to the file stream
+                               i.stream.setNoflush(i.stream.size()-1, i.binding, i.current.getSample() );
+                       }
+                       
+                       // Add marker
+                       i.current.reset();
+                       i.current.setValueNull();
+                       i.current.setTime(Bindings.DOUBLE, i.itemState.firstDisabledTime);
+                       i.current.setEndTime(Bindings.DOUBLE, i.itemState.lastDisabledTime);
+                       i.stream.addNoflush( i.binding, i.current.getSample() );
+               }
+               
+               boolean condition = makeNewBand || (!i.hasEndTime && i.itemState.count==2); 
+
+               if (i.stream.size() > 0) {
+
+                       /// Extend existing band
+                       // Extend endTime value
+                       if (!i.hasEndTime) {
+                               i.current.setTime( time.getBinding(), time.getValue() );
+                       } else {
+                               i.current.setEndTime( time.getBinding(), time.getValue() );
+                       }
+                       if (i.itemState.ooDeadband || isMinMaxFormat) {
+                               if (isValidValue) {
+                                       // Set last value
+                                       if (i.current.hasLastValue()) {
+                                               i.current.setLastValue( value.getBinding(), value.getValue() );
+                                       }
+
+                                       if (i.isNumeric && !isNanValue) {
+                                               // Add sum
+                                               if (i.current.hasAvg() && !restep) {
+                                                       double duration = (currentTime - i.itemState.firstTime);
+                                                       if(duration < 1e-9) {
+                                                               i.current.setAvg( Bindings.DOUBLE, 0.0 );
+                                                       } else {
+                                                               i.current.setAvg( Bindings.DOUBLE, i.itemState.sum / duration);
+                                                       }
+                                               }
+
+                                               // Update min-max
+                                               if (i.current.hasMin()) {
+                                                       Binding minBinding = i.current.getMinBinding();
+                                                       Object prevMinValue = i.current.getMin();
+                                                       Object currentValueWithMinBinding = value.getValue( minBinding );
+                                                       int diff = minBinding.compare( prevMinValue, currentValueWithMinBinding );
+                                                       if (diff>0) i.current.setMin( minBinding, currentValueWithMinBinding );
+                                               }
+                                               if (i.current.hasMax()) {
+                                                       Binding maxBinding = i.current.getMaxBinding();
+                                                       Object prevMaxValue = i.current.getMax();
+                                                       Object currentValueWithMaxBinding = value.getValue( maxBinding );
+                                                       int diff = maxBinding.compare( prevMaxValue, currentValueWithMaxBinding );
+                                                       if (diff<0) i.current.setMax( maxBinding, currentValueWithMaxBinding );
+                                               }
+
+                                               // Update median
+                                               if (i.current.hasMedian()) {
+                                                       Double median = i.itemState.median.getMedian();
+                                                       i.current.setMedian( Bindings.DOUBLE, median);
+                                               }
+                                       }
+                               } else {
+                                       i.current.setValueNull();
+                               }
+
+                               // Update count
+                               if (i.current.hasCount()) {
+                                       i.current.setCount( i.itemState.count );
+                               }
+                       }
+
+                       // Write entry to file stream
+                       //              if (i.current.getTimeDouble()<0) 
+                       //                      System.err.println("Error sample "+i.current.getBinding().toString(i.current.getSample()));
+                       i.stream.setNoflush( i.stream.size()-1, i.binding, i.current.getSample() );                             
+                       //System.err.println(i.itemState.id + " update " + i.current.getSample());
+               }
+               
+               if (condition) {
+                       
+                       // Start a new value band
+                       i.itemState.firstValue.readFrom( value.getBinding(), value.getValue() );
+                       i.itemState.firstTime = currentTime;
+                       i.itemState.currentValue.readFrom( value.getBinding(), value.getValue() );
+                       i.itemState.currentTime = currentTime;
+                       i.itemState.isNaN = isNanValue;
+                       i.itemState.isValid = isValidValue;
+                       i.itemState.sum = 0;
+                       i.itemState.count = 1;
+                       i.itemState.ooDeadband = false;
+                       if (i.itemState.median != null) {
+                               i.itemState.median.clear();
+                               if (!Double.isNaN(prevValueDouble) ) {
+                                       double deltaTime = currentTime - prevTime;
+                                       i.itemState.median.add( deltaTime, prevValueDouble );
+                               }
+                       }
+                       
+                       // New Sample
+                       i.current.reset();
+                       
+                       // Start new band with current value & time
+                       i.current.setTime( time.getBinding(), time.getValue() );                
+
+                       // Is valid value
+                       if (i.current.hasEndTime()) {
+                               i.current.setEndTime( time.getBinding(), time.getValue() );
+                       }
+                       
+                       if (isValidValue) {
+                               i.current.setValue( value.getBinding(), value.getValue() );
+                               i.current.setMax( value.getBinding(), value.getValue() );
+                               i.current.setMin( value.getBinding(), value.getValue() );
+                               i.current.setLastValue( value.getBinding(), value.getValue() );
+                               i.current.setAvg( value.getBinding(), value.getValue() );
+                               i.current.setMedian( value.getBinding(), value.getValue() );                                    
+                       } else {
+                               i.current.setValueNull();
+                       }
+                       
+                               
+                       // Sample Count=1 for new bands (time, value) "added"
+                       if (i.current.hasCount()) {
+                               i.current.setCount( 1 );
+                       }
+                       
+                       // Add the band to the file stream
+                       i.stream.addNoflush( i.binding, i.current.getSample() );
+                       //System.err.println(i.itemState.id + " add " + i.current.getSample());
+                       
+               }               
+               
+       }
+
+       @Override
+       public void flush() /*throws HistoryException*/ {
+               for (ActiveStream i : items) {
+                       try {                                           
+                               i.stream.flush();
+                       } catch (AccessorException e) {
+                               logger.log(Level.FINE, "File history flush failed.", e);                        
+//                             throw new HistoryException(e);
+                       }
+               }
+       }
+
+       public void flush(Set<String> ids) /*throws HistoryException*/ {
+               for (ActiveStream i : items) {
+                       if (!ids.contains(i.itemState.id)) continue;
+                       try {                                           
+                               i.stream.flush();
+                       } catch (AccessorException e) {
+                               logger.log(Level.FINE, "File history flush failed.", e);                        
+//                             throw new HistoryException(e);
+                       }
+               }
+       }
+       
+       
+       @Override
+       public synchronized void close()
+       {
+               // Write items back to history
+               List<Bean> beans = new ArrayList<Bean>( state.itemStates.size() );
+               for (CollectorState.Item itemState : state.itemStates.values()) {
+                       try {
+                               if (!history.exists(itemState.id)) continue;
+                               Bean bean = history.getItem(itemState.id);
+                               bean.readAvailableFields(itemState);
+                               beans.add( setItemState(bean, itemState) );
+                       } catch (BindingException e) {
+                               logger.log(Level.FINE, "Error writing a meta-data to history", e);
+                       } catch (BindingConstructionException e) {
+                               logger.log(Level.FINE, "Error writing a meta-data to history", e);
+                       } catch (HistoryException e) {
+                               logger.log(Level.FINE, "Error writing a meta-data to history", e);
+                       }
+               }
+               try {
+                       history.modify(beans.toArray( new Bean[beans.size()] ));
+               } catch (HistoryException e) {
+                       logger.log(Level.FINE, "Error writing a meta-data to history", e);
+               }
+               state.itemStates.clear();
+               
+               // Close streams
+               for (ActiveStream item : items) {
+                       item.close();
+               }
+               items.clear();
+       }
+
+       public class ActiveStream {
+
+               /** Sample Stream */
+               public StreamAccessor stream;
+               
+               /** Sample binding */
+               public RecordBinding binding;
+               
+               /** Value binding (Generic) */
+               public Binding valueBinding;
+               
+               /** Set true if this item has numeric value type */
+               public boolean isNumeric;
+               
+               /** Set true if this item has end time */
+               public boolean hasEndTime;
+               
+               /** Item entry in subscription state */
+               public CollectorState.Item itemState;
+                               
+               /** Interface to current last sample (in the stream) */
+               public ValueBand current;
+               
+               /** Adapter that converts value to double value, null if not numeric */
+               public Adapter valueToDouble, doubleToValue;
+               
+               void close() {
+                       try {
+                               if (stream != null) { 
+                                       stream.close();
+                               }
+                               stream = null;
+                       } catch (AccessorException e) {
+                               logger.log(Level.FINE, "Error closing stream", e);
+                       }
+               }
+
+       }
+
+       @Override
+       public StreamAccessor openStream(String itemId, String mode) 
+       throws HistoryException 
+       {
+               return history.openStream(itemId, mode);
+       }
+
+       @Override
+       public HistoryManager getHistory() {
+               return history;
+       }
+
+}