History data collector did not collect min/max/avg values correctly
[simantics/platform.git] / bundles / org.simantics.history / src / org / simantics / history / impl / CollectorImpl.java
1 /*******************************************************************************
2  * Copyright (c) 2007, 2011 Association for Decentralized Information Management in
3  * Industry THTH ry.
4  * All rights reserved. This program and the accompanying materials
5  * are made available under the terms of the Eclipse Public License v1.0
6  * which accompanies this distribution, and is available at
7  * http://www.eclipse.org/legal/epl-v10.html
8  *
9  * Contributors:
10  *     VTT Technical Research Centre of Finland - initial API and implementation
11  *******************************************************************************/
12 package org.simantics.history.impl;
13
14 import gnu.trove.set.hash.THashSet;
15
16 import java.io.IOException;
17 import java.util.ArrayList;
18 import java.util.List;
19 import java.util.Set;
20 import java.util.UUID;
21 import java.util.concurrent.CopyOnWriteArrayList;
22 import java.util.logging.Level;
23 import java.util.logging.Logger;
24
25 import org.simantics.databoard.Bindings;
26 import org.simantics.databoard.accessor.StreamAccessor;
27 import org.simantics.databoard.accessor.error.AccessorException;
28 import org.simantics.databoard.adapter.AdaptException;
29 import org.simantics.databoard.adapter.Adapter;
30 import org.simantics.databoard.adapter.AdapterConstructionException;
31 import org.simantics.databoard.binding.Binding;
32 import org.simantics.databoard.binding.DoubleBinding;
33 import org.simantics.databoard.binding.FloatBinding;
34 import org.simantics.databoard.binding.NumberBinding;
35 import org.simantics.databoard.binding.RecordBinding;
36 import org.simantics.databoard.binding.error.BindingConstructionException;
37 import org.simantics.databoard.binding.error.BindingException;
38 import org.simantics.databoard.binding.error.RuntimeBindingException;
39 import org.simantics.databoard.binding.impl.DoubleBindingDefault;
40 import org.simantics.databoard.binding.mutable.MutableVariant;
41 import org.simantics.databoard.binding.mutable.Variant;
42 import org.simantics.databoard.binding.reflection.VoidBinding;
43 import org.simantics.databoard.serialization.Serializer;
44 import org.simantics.databoard.type.Component;
45 import org.simantics.databoard.type.Datatype;
46 import org.simantics.databoard.type.NumberType;
47 import org.simantics.databoard.type.RecordType;
48 import org.simantics.databoard.util.Bean;
49 import org.simantics.history.Collector;
50 import org.simantics.history.HistoryException;
51 import org.simantics.history.HistoryManager;
52 import org.simantics.history.impl.CollectorState.Item;
53 import org.simantics.history.impl.CollectorState.VariableState;
54 import org.simantics.history.util.ValueBand;
55 import org.simantics.history.util.WeightedMedian;
56 import org.simantics.history.util.subscription.SubscriptionItem;
57
58 /**
59  * Collector is used to record data to streams in history manager.
60  * 
61  * For every item there is collector item state {@link CollectorState.Item}, 
62  * that contains data required in writing process. This state is read and 
63  * written to history manager item (bean) as meta-data. 
64  * 
65  * Stream files are opened in the first {@link #beginStep(NumberBinding, Object)}. 
66  * 
67  * @author toni.kalajainen
68  *
69  */
70 public class CollectorImpl implements Collector {
71
72         /** Logger */
73         static Logger logger = Logger.getLogger(CollectorImpl.class.getName());
74         
75         public static final int MEDIAN_LIMIT = 500;
76         
77         public static final double EPSILON = 1e-2;
78         
79         /** Collector state */
80         CollectorState state;
81
82         /** Open streams, corresponds to subscription items */
83         CopyOnWriteArrayList<ActiveStream> items = new CopyOnWriteArrayList<ActiveStream>();
84         
85         /** Associated history state */ 
86         HistoryManager history;
87         
88         /** True when all items are open */
89         boolean itemsOpen = true;
90         
91         public FlushPolicy flushPolicy = FlushPolicy.FlushOnEveryStep;
92         
93         public FlushPolicy getFlushPolicy() {
94                 return flushPolicy;
95         }
96
97         public void setFlushPolicy(FlushPolicy flushPolicy) {
98                 this.flushPolicy = flushPolicy;
99         }
100
101         /**
102          * Create a new collector that is associated with a history manager.
103          * 
104          * @param history
105          * @throws HistoryException
106          */
107         public CollectorImpl(HistoryManager history) {
108                 this.history = history;
109                 this.state = new CollectorState();
110                 this.state.init();
111                 this.state.id = UUID.randomUUID().toString();
112         }
113
114         /**
115          * Create a new collector that is associated with a history manager.
116          * 
117          * @param history
118          * @param previousState
119          * @throws HistoryException
120          */
121         public CollectorImpl(HistoryManager history, CollectorState previousState) {
122                 this.history = history;
123                 this.state = (CollectorState) previousState.clone();
124         }
125         
126         @Override
127         public synchronized void addItem(Bean item) throws HistoryException {
128                 try {
129                         String id = (String) item.getField("id");
130                         
131                         // Item State
132                         CollectorState.Item itemState = null;
133                         if ( item.hasField("collectorState") ) itemState = (CollectorState.Item) item.getField("collectorState", CollectorState.BINDING_ITEM);                  
134                         if ( itemState == null ) itemState = state.itemStates.get( id );
135                         if ( itemState == null ) {
136                                 // Create new item
137                                 itemState = new CollectorState.Item();
138                                 itemState.init();
139                                 itemState.readAvailableFields( item );
140                                 // Guess values from the existing data  
141                                 if (history.exists(id)) {
142                                         readItemState(history, item, itemState);
143                                 }
144                         }
145                         
146                         addItem(id, itemState);                 
147                 } catch (BindingException e) {
148                         throw new HistoryException( e ); 
149                 }
150         }
151         
152         public synchronized void addItem(String id, CollectorState.Item itemState) throws HistoryException {
153                 try {                   
154                         // Item State
155                         state.itemStates.put(id, itemState);
156                         itemsOpen = false;
157                         
158                         // Time                 
159                         if (state.time.type() instanceof NumberType) {                          
160                                 double time = (Double) Bindings.adapt(state.time.getValue(), state.time.getBinding(), Bindings.DOUBLE);
161                                 if ( !Double.isNaN(itemState.currentTime) ) {
162                                         if ( itemState.currentTime > time ) {
163                                                 state.time.setValue(Bindings.DOUBLE, itemState.currentTime);
164                                         }
165                                 }                               
166                         }
167                 } catch (AdaptException e) {
168                         throw new HistoryException( e ); 
169                 }
170         }
171         
172         public synchronized void addItems(Bean...items) throws HistoryException {
173                 for (Bean item : items) addItem(item);
174         }
175         
176         @Override
177         public synchronized void removeItem(String id) throws HistoryException {
178                 CollectorState.Item itemState = state.itemStates.remove( id );
179                 if (itemState!=null && history.exists(id)) {
180                         Bean bean = history.getItem(id);
181                         bean.readAvailableFields(itemState);
182                         try {
183                                 bean = setItemState(bean, itemState);
184                                 history.modify(bean);
185                         } catch (BindingException e) {
186                                 // Unexpected
187                                 logger.log(Level.FINE, "Failed to update bean", e);                     
188                         } catch (BindingConstructionException e) {
189                                 // Unexpected
190                                 logger.log(Level.FINE, "Failed to update bean", e);
191                         } catch (HistoryException e) {
192                                 logger.log(Level.FINE, "Failed to update history item meta-data", e);
193                         }
194                 }
195                 
196                 ActiveStream as = getStream(id);
197                 if ( as != null ) {
198                         items.remove( as );
199                         as.close();
200                 }
201         }
202         
203         ActiveStream getStream(String id) {
204                 for (ActiveStream as : items) {
205                         if (as.itemState.id.equals(id)) return as;
206                 }
207                 return null;
208         }
209         
210         /**
211          * Set (or Add) item's configuration. 
212          * 
213          * @param item item
214          * @throws HistoryException
215          */
216         public synchronized void setItem(Bean item) throws HistoryException {
217                 try {
218                         String id = (String) item.getField("id");
219                         
220                         CollectorState.Item itemState = state.itemStates.get( id );
221                         if ( itemState==null ) {
222                                 addItem(item);
223                                 return;
224                         }
225                         itemState.readAvailableFields(item);
226                 } catch (BindingException e) {
227                         throw new HistoryException( e ); 
228                 }
229         }
230         
231         
232         /**
233          * Set item state to bean. If the bean does not have field collectorState,
234          * a new bean class is created, and field added.
235          *  
236          * @param config
237          * @param state
238          * @return same or new bean
239          * @throws BindingException 
240          * @throws BindingConstructionException 
241          */
242         public static Bean setItemState(Bean config, CollectorState.Item state) throws BindingException, BindingConstructionException
243         {
244                 Binding binding = config.getFieldBinding("collectorState");
245                 if ( binding == null || !binding.type().equals(CollectorState.BINDING_ITEM.type()) ) {
246                         RecordType rt = config.getBinding().type();
247                         RecordType newType = new RecordType();
248                         for (int i=0; i<rt.getComponentCount(); i++) {
249                                 Component c = rt.getComponent(i);
250                                 if ( c.name.equals("collectorState") ) continue;
251                                 newType.addComponent(c.name, c.type);
252                         }
253                         newType.addComponent("collectorState", CollectorState.BINDING_ITEM.type());
254                         Binding newBinding = Bindings.getBeanBinding(newType);
255                         Bean newConfig = (Bean) newBinding.createDefault();
256                         newConfig.readAvailableFields( config );
257                         config = newConfig;
258                 }
259                 config.setField("collectorState", CollectorState.BINDING_ITEM, state);
260                 return config;
261         }
262         
263         public SubscriptionItem[] getItems() {
264                 int c = state.itemStates.size();
265                 SubscriptionItem[] items = new SubscriptionItem[ c ];
266                 int ix = 0;
267                 Serializer s = Bindings.getSerializerUnchecked( CollectorState.BINDING_ITEM );
268                 for (CollectorState.Item item : state.itemStates.values()) {
269                         _HistoryItem hi = new _HistoryItem();
270                         hi.init();
271                         hi.readAvailableFields(item);
272                         //hi.collectorState = (Item) item.clone();
273                         
274                         // Workaround (clone doesn't work for WeightedMedian)
275                         try {
276                                 byte[] data = s.serialize( item );                              
277                                 hi.collectorState = (Item) s.deserialize(data);
278                         } catch (IOException e) {
279                         }
280                                         
281                         items[ix++] = hi;
282                 }
283                 return items;
284         }
285         
286         public static class _HistoryItem extends SubscriptionItem {
287                 public CollectorState.Item collectorState;
288         }
289         
290         /**
291          * Creates and loads active item. TimeSeries stream file is opened.
292          * 
293          * @param itemState
294          * @return new active item
295          * @throws HistoryException
296          */
297         ActiveStream openStream(CollectorState.Item itemState) throws HistoryException 
298         {
299                 // New Active Item
300                 ActiveStream x = new ActiveStream();
301                 x.itemState = itemState;
302
303                 // Stream
304                 {
305                         if ( !history.exists(x.itemState.id) ) {
306                                 throw new HistoryException("TimeSeries "+x.itemState.id+" does not exist.");
307                         }
308
309                         x.stream = history.openStream(x.itemState.id, "rw");
310                 }
311
312                 // Sample Binding
313                 Datatype sampleType = x.stream.type().componentType;
314                 itemState.format = sampleType;
315                 try { 
316                         x.binding = (RecordBinding) Bindings.getBeanBinding( sampleType );
317                 } catch ( RuntimeException e ) {
318                         System.err.println("Could not create bean binding for type "+sampleType);
319                         x.binding = (RecordBinding) Bindings.getBinding( sampleType );          
320                 }
321
322                 // Value binding
323                 x.valueBinding = x.binding.getComponentBinding("value");
324                 x.isNumeric = x.valueBinding instanceof NumberBinding;
325                 x.hasEndTime = x.binding.getComponentIndex("endTime")>0;
326
327                 // Value adapter
328                 if ( x.isNumeric ) 
329                 try {
330                         x.valueToDouble = Bindings.adapterFactory.getAdapter(x.valueBinding, Bindings.DOUBLE, true, false);
331                         x.doubleToValue = Bindings.adapterFactory.getAdapter(Bindings.DOUBLE, x.valueBinding, true, false);
332                 } catch (AdapterConstructionException e1) {
333                         x.valueToDouble = null;
334                 }
335
336                 // Create valueband, and read last entry into memory
337                 try {
338                         Object currentEntry = x.binding.createDefault();
339                         if (x.stream.size() >= 1) {
340                                 x.stream.get(x.stream.size() - 1, x.binding, currentEntry);
341                         }
342                         x.current = new ValueBand(x.binding);
343                         x.current.setSample(currentEntry);
344                 } catch (AccessorException e) {
345                         throw new HistoryException(
346                                         "Could not read sample from stream file, " + e, e);
347                 } catch (BindingException e) {
348                         throw new HistoryException(
349                                         "Could not read sample from stream file, " + e, e);
350                 }
351
352                 // Item State
353                 x.itemState = itemState;
354
355                 // Median
356                 if (x.isNumeric && x.current.hasMedian()
357                                 && x.itemState.median == null) {
358                         x.itemState.median = new WeightedMedian( MEDIAN_LIMIT );
359                 }
360
361                 return x;
362         }
363                         
364         /**
365          * Read itemState from history. Item must exist. 
366          * 
367          * @param si subscription item
368          * @return state item
369          * @throws RuntimeBindingException
370          * @throws HistoryException
371          */
372         static void readItemState(HistoryManager history, Bean si, CollectorState.Item item) throws RuntimeBindingException, HistoryException {
373                 // Create item state
374                 item.readAvailableFields(si);
375                 // Read state from items
376                 StreamAccessor sa = history.openStream(item.id, "r");
377                 try {
378                         Datatype sampleType = sa.type().componentType;
379                         
380                         Binding sampleBinding = Bindings.getBinding(sampleType);
381                         if (!sampleType.equals( item.format )) throw new HistoryException("Could not create subscription, item \""+item.id+"\" already exists and has wrong format (" + sampleType + " vs. " + item.format + ")");
382                         Object sample = sampleBinding.createDefaultUnchecked();
383                         ValueBand v = new ValueBand(sampleBinding, sample);
384                         item.count = sa.size();
385                         if ( item.count>0 ) {
386                                 sa.get(0, sampleBinding, sample);
387                                 item.firstTime = v.getTimeDouble();
388                                 Binding b = v.getValueBinding();
389                                 Object value = v.getValue(b);
390                                 item.firstValue.setValue(b, value);
391                                         
392                                 sa.get(item.count-1, sampleBinding, sample);
393                                 item.currentTime = v.getTimeDouble();
394                                 b = v.getValueBinding();
395                                 value = v.getValue(b);
396                                 item.currentValue.setValue(b, value);
397                                         
398                                 if ( item.currentValue.getBinding() instanceof DoubleBinding ) {
399                                         DoubleBinding db = (DoubleBinding) item.currentValue.getBinding();
400                                         value = item.currentValue.getValue(db);
401                                         item.isNaN = Double.isNaN( (Double) value );
402                                 }
403                         }
404                                 
405                 } catch (AccessorException e) {
406                         throw new HistoryException( e );
407                 } catch (AdaptException e) {
408                         throw new HistoryException( e );
409                 } finally {
410                         try { sa.close(); } catch (AccessorException e) {}
411                 }
412         }
413
414         @Override
415         public CollectorState getState() {
416                 return (CollectorState) state.clone();
417         }
418
419         @Override
420         public void setState(Bean newState) {
421                 if (newState == null)
422                         return;
423                 if (state == null) {
424                         state = new CollectorState();
425                         state.init();
426                 }
427                 state.readAvailableFields(newState);
428
429                 // Make sure that openAllItems ensures that all streams are opened properly.
430                 // This is because setItem will not invoke addItem if collectorState exists
431                 // for an item and therefore itemsOpen might not otherwise be set to false.
432                 itemsOpen = false;
433         }
434
435         @Override
436         public Object getTime(Binding binding) throws HistoryException {
437                 MutableVariant v = state.time;
438                 if (v==null) return null;
439                 try {
440                         return v.getValue(binding);
441                 } catch (AdaptException e) {
442                         throw new HistoryException(e);
443                 }
444         }
445
446         @Override
447         public Object getValue(String id, Binding binding) throws HistoryException {
448                 VariableState vs = state.values.get(id);
449                 if (vs==null) return null;
450                 
451                 if (!vs.isValid) return null;
452                 
453                 try {
454                         return vs.value.getValue(binding);
455                 } catch (AdaptException e) {
456                         throw new HistoryException(e);
457                 }
458         }
459         
460         void openAllItems() throws HistoryException {
461                 if (itemsOpen) return;
462
463                 // Assert all items are open
464                 // 1. Collect all items that still need to be opened.
465                 Set<String> itemsToOpen = new THashSet<String>( state.itemStates.keySet() );
466                 for (ActiveStream as : items)
467                         itemsToOpen.remove(as.itemState.id);
468
469                 // 2. Open missing streams
470                 List<ActiveStream> opened = new ArrayList<ActiveStream>(itemsToOpen.size());
471                 for (String itemId : itemsToOpen) {
472                         CollectorState.Item item = state.itemStates.get( itemId );
473                         ActiveStream as = openStream( item );
474                         opened.add(as);
475                 }
476
477                 // 3. Update list of open streams
478                 items.addAll(opened);
479                 itemsOpen = true;
480         }
481
482         private void updateDt(NumberBinding binding, Object time) {
483                 try {
484                         Double current = (Double)getTime(Bindings.DOUBLE);
485                         if(current != null) {
486                                 double t = binding.getValue(time).doubleValue();
487                                 state.dT = t-current;
488                                 if(state.dT > 0) return;
489                         }
490                 } catch (Throwable t) {
491                 }
492                 state.dT = 1.0;
493         }
494         
495         @Override
496         public void beginStep(NumberBinding binding, Object time) throws HistoryException {
497                 openAllItems();
498                 updateDt(binding, time);
499                 state.time.setValue(binding, time);
500         }
501
502         @Override
503         public void setValue(String id, Binding binding, Object value) throws HistoryException {
504                 VariableState vs = state.values.get( id );
505                 if (vs == null) {
506                         vs = new VariableState();
507                         vs.value = new MutableVariant(binding, value);
508                         state.values.put(id, vs);
509                 }
510                 
511                 if (value==null) {
512                         vs.value.setValue(VoidBinding.VOID_BINDING, null);
513                         vs.isNan = true;
514                         vs.isValid = false;
515                         return;
516                 } 
517                                         
518                 vs.value.setValue(binding, value);
519                 vs.isValid = true;
520                         
521                 if (binding instanceof DoubleBinding) {
522                         DoubleBinding db = (DoubleBinding) binding;
523                         try {
524                                 double _v = db.getValue_(value);
525                                 vs.isNan = Double.isNaN( _v );
526                         } catch (BindingException e) {
527                                 vs.isNan = true;
528                                 vs.isValid = false;
529                         }
530                 } else if (binding instanceof FloatBinding) {
531                         FloatBinding db = (FloatBinding) binding;
532                         try {
533                                 float _v = db.getValue_(value);
534                                 vs.isNan = Float.isNaN( _v );
535                         } catch (BindingException e) {
536                                 vs.isNan = true;
537                                 vs.isValid = false;
538                         }
539                 } else {
540                         vs.isNan = false;
541                 }
542         }
543
544         @Override
545         public boolean getValue(String id, MutableVariant result) {
546                 VariableState vs = state.values.get(id);
547                 if (vs==null) return false;
548                 
549                 if (!vs.isValid) return false;
550                 
551                 result.setValue(vs.value.getBinding(), vs.value.getValue());
552                 return true;
553         }
554
555         @Override
556         public void endStep() throws HistoryException {
557                 // Write values to streams
558                 for (ActiveStream i : items)
559                 {
560                         try {
561                                 putValueToStream(i);                            
562                                 if (flushPolicy == FlushPolicy.FlushOnEveryStep) {
563                                         i.stream.flush();
564                                 }
565                         } catch (AdaptException e) {
566                                 throw new HistoryException( e );
567 //                              FileAccessor fa = (FileAccessor) i.stream;
568 //                              logger.log(Level.FINE, "Error writing a stream "+fa.file(), e);
569                         } catch (AccessorException e) {
570                                 throw new HistoryException( e );
571 //                              FileAccessor fa = (FileAccessor) i.stream;
572 //                              logger.log(Level.FINE, "Error writing a stream "+fa.file(), e);
573                         } catch (BindingException e) {
574                                 throw new HistoryException( e );
575 //                              FileAccessor fa = (FileAccessor) i.stream;
576 //                              logger.log(Level.FINE, "Error writing a stream "+fa.file(), e);
577                         }
578                 }
579         }
580                 
581         private int sampleNum(double time, double interval) {
582                 // 1.0 is needed to cover the case of 0.0
583                 return (int)(1.0 + (time+EPSILON*state.dT)/interval);
584         }
585
586         private boolean isExactInterval(double time, double interval) {
587                 return sampleNum(time, interval) > sampleNum(time-2*EPSILON*state.dT, interval);
588         }
589
590         /**
591          * This method puts a time,value to a stream.
592          * 
593          * The stream is "packed" when the new value is equal to previous, or the
594          * value is within deadband (in comparison to the first value), or if
595          * the interval zone is not exceeded.
596          * 
597          * The stream can be packed two different ways, depending on the sample
598          * format. The sample format can be (a) single value or (b) a time range of
599          * multiple values, Ranged Sample. 
600          * 
601          * Single value is packed by removing middle values, in other words, 
602          * moving the timecode of the last sample.
603          * 
604          * Ranged sample is packed by extending the endTime code of the last sample.
605          * Other fields, such as avg, sum, count, lastValue are also updated.
606          * 
607          * @param i
608          * @throws AdaptException
609          * @throws AccessorException 
610          * @throws BindingException 
611          * @throws HistoryException 
612          */
613         private void putValueToStream(ActiveStream i) throws AdaptException, AccessorException, BindingException, HistoryException
614         {
615                 // Current Time
616                 Variant time = state.time;
617                 double currentTime = Double.NaN;
618                 if (time != null) {
619                         Double x = (Double) time.getValue(Bindings.DOUBLE);
620                         currentTime = x == null ? Double.NaN : x;
621                 }
622
623                 // Is first value
624                 boolean isFirstValue = i.stream.size() == 0;
625                 // Is this item disabled
626                 boolean isDisabled = !i.itemState.enabled; 
627                 // Discontinuety, first value after being disabled
628                 boolean wasDisabled = !isFirstValue && i.itemState.isDisabled;
629                 
630                 // Update currently disabled status
631                 i.itemState.isDisabled = isDisabled;
632
633                 // Update time codes
634                 if (isDisabled) {
635                         i.itemState.count++;
636                         i.itemState.lastDisabledTime = currentTime;
637                         if (!wasDisabled) i.itemState.firstDisabledTime = currentTime;
638                         return;
639                 }
640                 
641                 // Is nan value, for doubles and floats
642                 boolean isNanValue;
643                 // Is valid value
644                 boolean isValidValue;
645                 // Value as variant
646                 MutableVariant value;
647                 // Get current value as double
648                 double currentValueDouble;
649                 
650                 VariableState vs = state.values.get( i.itemState.variableId );                  
651                 if ( vs!=null ) {
652                         isNanValue = vs.isNan;
653                         isValidValue = vs.isValid;
654                         value = vs.value;
655                         if (i.isNumeric && isValidValue && !isNanValue) {
656                                 currentValueDouble = (Double) value.getValue( Bindings.DOUBLE );
657                                 if ( i.itemState.gain!=1.0 || i.itemState.bias!=0.0 ) {
658                                         currentValueDouble = i.itemState.gain * currentValueDouble + i.itemState.bias;
659                                         Object x = i.valueBinding instanceof DoubleBindingDefault ? currentValueDouble : i.doubleToValue.adapt(currentValueDouble);
660                                         value = new MutableVariant( i.valueBinding, x );
661                                 }
662                         } else {
663                                 currentValueDouble = Double.NaN;
664                         }
665                         
666                 } else {
667                         isNanValue = true;
668                         isValidValue = false;
669                         value = new MutableVariant( i.valueBinding, i.valueBinding.createDefault() );
670                         currentValueDouble = Double.NaN;
671                 }
672                                 
673                 // Prev Value
674                 double prevValueDouble = Double.NaN;
675                 Variant pv = i.itemState.currentValue;
676                 if (pv!=null && pv.getBinding() instanceof NumberBinding ) {
677                         prevValueDouble = (Double) pv.getValue( Bindings.DOUBLE );
678                 } else {
679                         prevValueDouble = Double.NaN;
680                 }
681                 
682                 // The time of the first sample of region
683                 double firstTime = i.itemState.firstTime;
684                 // Prev time
685                 double prevTime = i.itemState.currentTime;
686                 
687                 boolean restep = prevTime == currentTime;
688                 
689                 // There was change in NaN
690                 boolean isNaNChange = i.itemState.isNaN != isNanValue;
691                 
692                 // There was change in validity of the value
693                 boolean isValidChange = i.itemState.isValid != isValidValue;
694                 
695                 //boolean wasValidValue = !i.itemState.isValid && isValidValue;
696                 
697                 boolean hasInterval = !Double.isNaN( i.itemState.interval ) && (i.itemState.interval> Double.MIN_VALUE );
698                 
699                 boolean hasDeadband = !Double.isNaN( i.itemState.deadband ) && (i.itemState.deadband> Double.MIN_VALUE) && i.isNumeric; 
700                 
701                 boolean isMinMaxFormat = i.itemState.interval==Double.MAX_VALUE &&
702                                 i.itemState.deadband==Double.MAX_VALUE &&
703                                 i.binding.getComponentIndex("min")>=0 &&
704                                 i.binding.getComponentIndex("max")>=0;
705
706                 // Deadband
707                 if (isValidValue && i.isNumeric && i.itemState.count>=1) {
708                         
709                         if (i.itemState.isValid) {
710                                 double firstValueDouble = (Double) i.itemState.firstValue.getValue( Bindings.DOUBLE );
711                                 double diff = Math.abs( currentValueDouble - firstValueDouble );
712                                 if( hasDeadband )
713                                         i.itemState.ooDeadband |= diff >= i.itemState.deadband;
714                                 else
715                                         i.itemState.ooDeadband |= diff != 0.0;
716                         } else {
717                                 i.itemState.ooDeadband = true;
718                         }
719                         
720                         // NaN change is deadband change
721                         i.itemState.ooDeadband |= isNaNChange;
722                 } else {
723                         // The deadband cannot have been broken on the first sample
724                         i.itemState.ooDeadband = false;
725                 }
726
727
728                 // Interval
729                 boolean ooInterval = true;
730                 if ( hasInterval ) {
731                         
732 //                      System.err.println(i.itemState.id + " " + firstTime + " " + currentTime);
733
734                         // First band is a special case
735                         if(isFirstValue) {
736                                 // If this is the first sample store the current time as first time
737                                 if(Double.isNaN(i.itemState.firstTime)) firstTime = i.itemState.firstTime = currentTime;
738                                 int sn1 = sampleNum(firstTime,i.itemState.interval);
739                                 int sn2 = sampleNum(currentTime,i.itemState.interval);
740                                 // Wait until next sample if this position is not already exactly at sample border
741                                 // Always start minmax immediately
742                                 if(sn1 == sn2 && !isExactInterval(currentTime, i.itemState.interval) && !isMinMaxFormat) return;
743                         } else {
744                                 int sn1 = sampleNum(prevTime,i.itemState.interval);
745                                 int sn2 = sampleNum(currentTime,i.itemState.interval);
746 //                              System.err.println(" ==>" + sn1 + " " + sn2 + " " + prevTime + " " + currentTime);
747                                 ooInterval = sn2 > sn1;
748                         }
749
750                         
751 //                      
752 //                      double n = (currentTime+EPSILON) / i.itemState.interval;
753 //                      double n2 = n-Math.floor(n);
754 //                      
755 //                      
756 //                      double deltaTime = currentTime - firstTime;
757 //                      ooInterval = (deltaTime - i.itemState.interval) > -EPSILON;
758                 }
759                 
760                 // Should new sample be created, or old sample extended / forwarded in time
761                 boolean makeNewBand = ( i.itemState.ooDeadband && isValidValue && ooInterval ) || isFirstValue || wasDisabled || isValidChange;
762                 
763                 // Update item min-max, avg, median
764                 if ( isValidValue && i.isNumeric ) {
765                         if (i.current.hasAvg() && i.isNumeric) {
766                                 if (Double.isNaN( i.itemState.sum )) i.itemState.sum = 0.0;
767                                 if (isValidValue && i.isNumeric) {
768                                         double timeDelta = currentTime - i.itemState.currentTime;                                                       
769                                         i.itemState.sum += timeDelta * currentValueDouble;
770                                 }
771                         }
772
773                         if (i.current.hasMedian() && !Double.isNaN(prevValueDouble) ) {
774                                 double deltaTime = currentTime - prevTime;
775                                 i.itemState.median.add( deltaTime, prevValueDouble );
776                         }
777                 }
778
779                 // Increase the counter that tracks the number of samples acquired into the band
780                 if (!restep) i.itemState.count++;
781
782                 // Remember values 
783                 i.itemState.currentTime = currentTime;
784                 i.itemState.currentValue.readFrom( value.getBinding(), value.getValue() );                                                      
785                 i.itemState.isNaN = isNanValue;
786                 i.itemState.isValid = isValidValue;
787                 
788                 // Put discontinuation marker
789                 if ( (wasDisabled && i.current.supportsNullValue())) {
790                         // Extend marker
791                         if (i.current.isNullValue()) {
792                                 if (i.current.hasEndTime()) {
793                                         i.current.setEndTime( Bindings.DOUBLE, i.itemState.lastDisabledTime );
794                                 }
795                                 
796                                 if (i.current.hasCount()) {
797                                         i.current.setCount( i.current.getCount()+1 );
798                                 }
799                                 
800                                 // Add the band to the file stream
801                                 i.stream.setNoflush(i.stream.size()-1, i.binding, i.current.getSample() );
802                         }
803                         
804                         // Add marker
805                         i.current.reset();
806                         i.current.setValueNull();
807                         i.current.setTime(Bindings.DOUBLE, i.itemState.firstDisabledTime);
808                         i.current.setEndTime(Bindings.DOUBLE, i.itemState.lastDisabledTime);
809                         i.stream.addNoflush( i.binding, i.current.getSample() );
810                 }
811                 
812                 boolean condition = makeNewBand || (!i.hasEndTime && i.itemState.count==2); 
813
814                 if(i.stream.size() > 0 ) { //!i.itemState.ooDeadband) {
815
816                         /// Extend existing band
817                         // Extend endTime value
818                         if (!i.hasEndTime) {
819                                 i.current.setTime( time.getBinding(), time.getValue() );
820                         } else {
821                                 i.current.setEndTime( time.getBinding(), time.getValue() );
822                         }
823                         if  (i.itemState.ooDeadband) {
824                                 if (isValidValue) {
825                                         // Set last value
826                                         if (i.current.hasLastValue()) {
827                                                 i.current.setLastValue( value.getBinding(), value.getValue() );
828                                         }
829
830                                         // Add sum
831                                         if (i.current.hasAvg() && i.isNumeric && !isNanValue && !restep) {
832                                                 double duration = (currentTime - i.itemState.firstTime);
833                                                 if(duration < 1e-9) {
834                                                         i.current.setAvg( Bindings.DOUBLE, 0.0 );
835                                                 } else {
836                                                         i.current.setAvg( Bindings.DOUBLE, i.itemState.sum / duration);
837                                                 }
838                                         }
839
840                                         // Update min-max
841                                         if (i.current.hasMin() && i.isNumeric && !isNanValue) {
842                                                 Binding minBinding = i.current.getMinBinding();
843                                                 Object prevMinValue = i.current.getMin();
844                                                 Object currentValueWithMinBinding = value.getValue( minBinding );
845                                                 int diff = minBinding.compare( prevMinValue, currentValueWithMinBinding );
846                                                 if (diff>0) i.current.setMin( minBinding, currentValueWithMinBinding );
847                                         }
848                                         if (i.current.hasMax() && i.isNumeric && !isNanValue) {
849                                                 Binding maxBinding = i.current.getMaxBinding();
850                                                 Object prevMaxValue = i.current.getMax();
851                                                 Object currentValueWithMaxBinding = value.getValue( maxBinding );
852                                                 int diff = maxBinding.compare( prevMaxValue, currentValueWithMaxBinding );
853                                                 if (diff<0) i.current.setMax( maxBinding, currentValueWithMaxBinding );
854                                         }
855
856                                         // Update median
857                                         if (i.current.hasMedian() && i.isNumeric && !isNanValue) {
858                                                 Double median = i.itemState.median.getMedian();
859                                                 i.current.setMedian( Bindings.DOUBLE, median);
860                                         }
861                                 } else {
862                                         i.current.setValueNull();
863                                 }
864
865                                 // Add count
866                                 if (i.current.hasCount()) {
867                                         i.current.setCount( i.itemState.count );
868                                 }
869                         }
870
871                         // Write entry to file stream
872                         //              if (i.current.getTimeDouble()<0) 
873                         //                      System.err.println("Error sample "+i.current.getBinding().toString(i.current.getSample()));
874                         i.stream.setNoflush( i.stream.size()-1, i.binding, i.current.getSample() );                             
875                         //System.err.println(i.itemState.id + " update " + i.current.getSample());
876                 }
877                 
878                 if (condition) {
879                         
880                         // Start a new value band
881                         i.itemState.firstValue.readFrom( value.getBinding(), value.getValue() );
882                         i.itemState.firstTime = currentTime;
883                         i.itemState.currentValue.readFrom( value.getBinding(), value.getValue() );
884                         i.itemState.currentTime = currentTime;
885                         i.itemState.isNaN = isNanValue;
886                         i.itemState.isValid = isValidValue;
887                         i.itemState.sum = 0;
888                         i.itemState.count = 1;
889                         i.itemState.ooDeadband = false;
890                         if (i.itemState.median != null) {
891                                 i.itemState.median.clear();
892                                 if (!Double.isNaN(prevValueDouble) ) {
893                                         double deltaTime = currentTime - prevTime;
894                                         i.itemState.median.add( deltaTime, prevValueDouble );
895                                 }
896                         }
897                         
898                         // New Sample
899                         i.current.reset();
900                         
901                         // Start new band with current value & time
902                         i.current.setTime( time.getBinding(), time.getValue() );                
903
904                         // Is valid value
905                         if (i.current.hasEndTime()) {
906                                 i.current.setEndTime( time.getBinding(), time.getValue() );
907                         }
908                         
909                         if (isValidValue) {
910                                 i.current.setValue( value.getBinding(), value.getValue() );
911                                 i.current.setMax( value.getBinding(), value.getValue() );
912                                 i.current.setMin( value.getBinding(), value.getValue() );
913                                 i.current.setLastValue( value.getBinding(), value.getValue() );
914                                 i.current.setAvg( value.getBinding(), value.getValue() );
915                                 i.current.setMedian( value.getBinding(), value.getValue() );                                    
916                         } else {
917                                 i.current.setValueNull();
918                         }
919                         
920                                 
921                         // Sample Count=1 for new bands (time, value) "added"
922                         if (i.current.hasCount()) {
923                                 i.current.setCount( 1 );
924                         }
925                         
926                         // Add the band to the file stream
927                         i.stream.addNoflush( i.binding, i.current.getSample() );
928                         //System.err.println(i.itemState.id + " add " + i.current.getSample());
929                         
930                 }               
931                 
932         }
933
934         @Override
935         public void flush() /*throws HistoryException*/ {
936                 for (ActiveStream i : items) {
937                         try {                                           
938                                 i.stream.flush();
939                         } catch (AccessorException e) {
940                                 logger.log(Level.FINE, "File history flush failed.", e);                        
941 //                              throw new HistoryException(e);
942                         }
943                 }
944         }
945
946         public void flush(Set<String> ids) /*throws HistoryException*/ {
947                 for (ActiveStream i : items) {
948                         if (!ids.contains(i.itemState.id)) continue;
949                         try {                                           
950                                 i.stream.flush();
951                         } catch (AccessorException e) {
952                                 logger.log(Level.FINE, "File history flush failed.", e);                        
953 //                              throw new HistoryException(e);
954                         }
955                 }
956         }
957         
958         
959         @Override
960         public synchronized void close()
961         {
962                 // Write items back to history
963                 List<Bean> beans = new ArrayList<Bean>( state.itemStates.size() );
964                 for (CollectorState.Item itemState : state.itemStates.values()) {
965                         try {
966                                 if (!history.exists(itemState.id)) continue;
967                                 Bean bean = history.getItem(itemState.id);
968                                 bean.readAvailableFields(itemState);
969                                 beans.add( setItemState(bean, itemState) );
970                         } catch (BindingException e) {
971                                 logger.log(Level.FINE, "Error writing a meta-data to history", e);
972                         } catch (BindingConstructionException e) {
973                                 logger.log(Level.FINE, "Error writing a meta-data to history", e);
974                         } catch (HistoryException e) {
975                                 logger.log(Level.FINE, "Error writing a meta-data to history", e);
976                         }
977                 }
978                 try {
979                         history.modify(beans.toArray( new Bean[beans.size()] ));
980                 } catch (HistoryException e) {
981                         logger.log(Level.FINE, "Error writing a meta-data to history", e);
982                 }
983                 state.itemStates.clear();
984                 
985                 // Close streams
986                 for (ActiveStream item : items) {
987                         item.close();
988                 }
989                 items.clear();
990         }
991
992         public class ActiveStream {
993
994                 /** Sample Stream */
995                 public StreamAccessor stream;
996                 
997                 /** Sample binding */
998                 public RecordBinding binding;
999                 
1000                 /** Value binding (Generic) */
1001                 public Binding valueBinding;
1002                 
1003                 /** Set true if this item has numeric value type */
1004                 public boolean isNumeric;
1005                 
1006                 /** Set true if this item has end time */
1007                 public boolean hasEndTime;
1008                 
1009                 /** Item entry in subscription state */
1010                 public CollectorState.Item itemState;
1011                                 
1012                 /** Interface to current last sample (in the stream) */
1013                 public ValueBand current;
1014                 
1015                 /** Adapter that converts value to double value, null if not numeric */
1016                 public Adapter valueToDouble, doubleToValue;
1017                 
1018                 void close() {
1019                         try {
1020                                 if (stream != null) { 
1021                                         stream.close();
1022                                 }
1023                                 stream = null;
1024                         } catch (AccessorException e) {
1025                                 logger.log(Level.FINE, "Error closing stream", e);
1026                         }
1027                 }
1028
1029         }
1030
1031         @Override
1032         public StreamAccessor openStream(String itemId, String mode) 
1033         throws HistoryException 
1034         {
1035                 return history.openStream(itemId, mode);
1036         }
1037
1038         @Override
1039         public HistoryManager getHistory() {
1040                 return history;
1041         }
1042
1043 }