]> gerrit.simantics Code Review - simantics/platform.git/blob - bundles/org.simantics.history/src/org/simantics/history/impl/CollectorImpl.java
c202f733ca6af88c284351de3cfadea3eee9899d
[simantics/platform.git] / bundles / org.simantics.history / src / org / simantics / history / impl / CollectorImpl.java
1 /*******************************************************************************
2  * Copyright (c) 2007, 2011 Association for Decentralized Information Management in
3  * Industry THTH ry.
4  * All rights reserved. This program and the accompanying materials
5  * are made available under the terms of the Eclipse Public License v1.0
6  * which accompanies this distribution, and is available at
7  * http://www.eclipse.org/legal/epl-v10.html
8  *
9  * Contributors:
10  *     VTT Technical Research Centre of Finland - initial API and implementation
11  *******************************************************************************/
12 package org.simantics.history.impl;
13
14 import gnu.trove.set.hash.THashSet;
15
16 import java.io.IOException;
17 import java.util.ArrayList;
18 import java.util.List;
19 import java.util.Set;
20 import java.util.UUID;
21 import java.util.concurrent.CopyOnWriteArrayList;
22 import java.util.logging.Level;
23 import java.util.logging.Logger;
24
25 import org.simantics.databoard.Bindings;
26 import org.simantics.databoard.accessor.StreamAccessor;
27 import org.simantics.databoard.accessor.error.AccessorException;
28 import org.simantics.databoard.adapter.AdaptException;
29 import org.simantics.databoard.adapter.Adapter;
30 import org.simantics.databoard.adapter.AdapterConstructionException;
31 import org.simantics.databoard.binding.Binding;
32 import org.simantics.databoard.binding.DoubleBinding;
33 import org.simantics.databoard.binding.FloatBinding;
34 import org.simantics.databoard.binding.NumberBinding;
35 import org.simantics.databoard.binding.RecordBinding;
36 import org.simantics.databoard.binding.error.BindingConstructionException;
37 import org.simantics.databoard.binding.error.BindingException;
38 import org.simantics.databoard.binding.error.RuntimeBindingException;
39 import org.simantics.databoard.binding.impl.DoubleBindingDefault;
40 import org.simantics.databoard.binding.mutable.MutableVariant;
41 import org.simantics.databoard.binding.mutable.Variant;
42 import org.simantics.databoard.binding.reflection.VoidBinding;
43 import org.simantics.databoard.serialization.Serializer;
44 import org.simantics.databoard.type.Component;
45 import org.simantics.databoard.type.Datatype;
46 import org.simantics.databoard.type.NumberType;
47 import org.simantics.databoard.type.RecordType;
48 import org.simantics.databoard.util.Bean;
49 import org.simantics.history.Collector;
50 import org.simantics.history.HistoryException;
51 import org.simantics.history.HistoryManager;
52 import org.simantics.history.impl.CollectorState.Item;
53 import org.simantics.history.impl.CollectorState.VariableState;
54 import org.simantics.history.util.ValueBand;
55 import org.simantics.history.util.WeightedMedian;
56 import org.simantics.history.util.subscription.SubscriptionItem;
57
58 /**
59  * Collector is used to record data to streams in history manager.
60  * 
61  * For every item there is collector item state {@link CollectorState.Item}, 
62  * that contains data required in writing process. This state is read and 
63  * written to history manager item (bean) as meta-data. 
64  * 
65  * Stream files are opened in the first {@link #beginStep(NumberBinding, Object)}. 
66  * 
67  * @author toni.kalajainen
68  *
69  */
70 public class CollectorImpl implements Collector {
71
72         /** Logger */
73         static Logger logger = Logger.getLogger(CollectorImpl.class.getName());
74         
75         public static final int MEDIAN_LIMIT = 500;
76         
77         public static final double EPSILON = 1e-2;
78         
79         /** Collector state */
80         CollectorState state;
81
82         /** Open streams, corresponds to subscription items */
83         CopyOnWriteArrayList<ActiveStream> items = new CopyOnWriteArrayList<ActiveStream>();
84         
85         /** Associated history state */ 
86         HistoryManager history;
87         
88         /** True when all items are open */
89         boolean itemsOpen = true;
90
91         /** True when history collection will write data to disk */
92         boolean enabled = true;
93
94         /**
95          * Transient info only set in {@link #beginStep(NumberBinding, Object)} based on
96          * {@link #enabled}
97          */
98         private transient boolean stepEnabled = true;
99         
100         public FlushPolicy flushPolicy = FlushPolicy.FlushOnEveryStep;
101         
102         public FlushPolicy getFlushPolicy() {
103                 return flushPolicy;
104         }
105
106         public void setFlushPolicy(FlushPolicy flushPolicy) {
107                 this.flushPolicy = flushPolicy;
108         }
109
110         /**
111          * Create a new collector that is associated with a history manager.
112          * 
113          * @param history
114          * @throws HistoryException
115          */
116         public CollectorImpl(HistoryManager history) {
117                 this.history = history;
118                 this.state = new CollectorState();
119                 this.state.init();
120                 this.state.id = UUID.randomUUID().toString();
121         }
122
123         /**
124          * Create a new collector that is associated with a history manager.
125          * 
126          * @param history
127          * @param previousState
128          * @throws HistoryException
129          */
130         public CollectorImpl(HistoryManager history, CollectorState previousState) {
131                 this.history = history;
132                 this.state = (CollectorState) previousState.clone();
133         }
134
135         public void setEnabled(boolean enabled) {
136                 this.enabled = enabled;
137         }
138
139         @Override
140         public boolean isEnabled() {
141                 return enabled;
142         }
143
144         @Override
145         public synchronized void addItem(Bean item) throws HistoryException {
146                 try {
147                         String id = (String) item.getField("id");
148                         
149                         // Item State
150                         CollectorState.Item itemState = null;
151                         if ( item.hasField("collectorState") ) itemState = (CollectorState.Item) item.getField("collectorState", CollectorState.BINDING_ITEM);                  
152                         if ( itemState == null ) itemState = state.itemStates.get( id );
153                         if ( itemState == null ) {
154                                 // Create new item
155                                 itemState = new CollectorState.Item();
156                                 itemState.init();
157                                 itemState.readAvailableFields( item );
158                                 // Guess values from the existing data  
159                                 if (history.exists(id)) {
160                                         readItemState(history, item, itemState);
161                                 }
162                         }
163                         
164                         addItem(id, itemState);                 
165                 } catch (BindingException e) {
166                         throw new HistoryException( e ); 
167                 }
168         }
169         
170         public synchronized void addItem(String id, CollectorState.Item itemState) throws HistoryException {
171                 try {                   
172                         // Item State
173                         state.itemStates.put(id, itemState);
174                         itemsOpen = false;
175                         
176                         // Time                 
177                         if (state.time.type() instanceof NumberType) {                          
178                                 double time = (Double) Bindings.adapt(state.time.getValue(), state.time.getBinding(), Bindings.DOUBLE);
179                                 if ( !Double.isNaN(itemState.currentTime) ) {
180                                         if ( itemState.currentTime > time ) {
181                                                 state.time.setValue(Bindings.DOUBLE, itemState.currentTime);
182                                         }
183                                 }                               
184                         }
185                 } catch (AdaptException e) {
186                         throw new HistoryException( e ); 
187                 }
188         }
189         
190         public synchronized void addItems(Bean...items) throws HistoryException {
191                 for (Bean item : items) addItem(item);
192         }
193         
194         @Override
195         public synchronized void removeItem(String id) throws HistoryException {
196                 CollectorState.Item itemState = state.itemStates.remove( id );
197                 if (itemState!=null && history.exists(id)) {
198                         Bean bean = history.getItem(id);
199                         bean.readAvailableFields(itemState);
200                         try {
201                                 bean = setItemState(bean, itemState);
202                                 history.modify(bean);
203                         } catch (BindingException e) {
204                                 // Unexpected
205                                 logger.log(Level.FINE, "Failed to update bean", e);                     
206                         } catch (BindingConstructionException e) {
207                                 // Unexpected
208                                 logger.log(Level.FINE, "Failed to update bean", e);
209                         } catch (HistoryException e) {
210                                 logger.log(Level.FINE, "Failed to update history item meta-data", e);
211                         }
212                 }
213                 
214                 ActiveStream as = getStream(id);
215                 if ( as != null ) {
216                         items.remove( as );
217                         as.close();
218                 }
219         }
220         
221         ActiveStream getStream(String id) {
222                 for (ActiveStream as : items) {
223                         if (as.itemState.id.equals(id)) return as;
224                 }
225                 return null;
226         }
227         
228         /**
229          * Set (or Add) item's configuration. 
230          * 
231          * @param item item
232          * @throws HistoryException
233          */
234         public synchronized void setItem(Bean item) throws HistoryException {
235                 try {
236                         String id = (String) item.getField("id");
237                         
238                         CollectorState.Item itemState = state.itemStates.get( id );
239                         if ( itemState==null ) {
240                                 addItem(item);
241                                 return;
242                         }
243                         itemState.readAvailableFields(item);
244                 } catch (BindingException e) {
245                         throw new HistoryException( e ); 
246                 }
247         }
248         
249         
250         /**
251          * Set item state to bean. If the bean does not have field collectorState,
252          * a new bean class is created, and field added.
253          *  
254          * @param config
255          * @param state
256          * @return same or new bean
257          * @throws BindingException 
258          * @throws BindingConstructionException 
259          */
260         public static Bean setItemState(Bean config, CollectorState.Item state) throws BindingException, BindingConstructionException
261         {
262                 Binding binding = config.getFieldBinding("collectorState");
263                 if ( binding == null || !binding.type().equals(CollectorState.BINDING_ITEM.type()) ) {
264                         RecordType rt = config.getBinding().type();
265                         RecordType newType = new RecordType();
266                         for (int i=0; i<rt.getComponentCount(); i++) {
267                                 Component c = rt.getComponent(i);
268                                 if ( c.name.equals("collectorState") ) continue;
269                                 newType.addComponent(c.name, c.type);
270                         }
271                         newType.addComponent("collectorState", CollectorState.BINDING_ITEM.type());
272                         Binding newBinding = Bindings.getBeanBinding(newType);
273                         Bean newConfig = (Bean) newBinding.createDefault();
274                         newConfig.readAvailableFields( config );
275                         config = newConfig;
276                 }
277                 config.setField("collectorState", CollectorState.BINDING_ITEM, state);
278                 return config;
279         }
280         
281         public SubscriptionItem[] getItems() {
282                 int c = state.itemStates.size();
283                 SubscriptionItem[] items = new SubscriptionItem[ c ];
284                 int ix = 0;
285                 Serializer s = Bindings.getSerializerUnchecked( CollectorState.BINDING_ITEM );
286                 for (CollectorState.Item item : state.itemStates.values()) {
287                         _HistoryItem hi = new _HistoryItem();
288                         hi.init();
289                         hi.readAvailableFields(item);
290                         //hi.collectorState = (Item) item.clone();
291                         
292                         // Workaround (clone doesn't work for WeightedMedian)
293                         try {
294                                 byte[] data = s.serialize( item );                              
295                                 hi.collectorState = (Item) s.deserialize(data);
296                         } catch (IOException e) {
297                         }
298                                         
299                         items[ix++] = hi;
300                 }
301                 return items;
302         }
303         
304         public static class _HistoryItem extends SubscriptionItem {
305                 public CollectorState.Item collectorState;
306         }
307         
308         /**
309          * Creates and loads active item. TimeSeries stream file is opened.
310          * 
311          * @param itemState
312          * @return new active item
313          * @throws HistoryException
314          */
315         ActiveStream openStream(CollectorState.Item itemState) throws HistoryException 
316         {
317                 // New Active Item
318                 ActiveStream x = new ActiveStream();
319                 x.itemState = itemState;
320
321                 // Stream
322                 {
323                         if ( !history.exists(x.itemState.id) ) {
324                                 throw new HistoryException("TimeSeries "+x.itemState.id+" does not exist.");
325                         }
326
327                         x.stream = history.openStream(x.itemState.id, "rw");
328                 }
329
330                 // Sample Binding
331                 Datatype sampleType = x.stream.type().componentType;
332                 itemState.format = sampleType;
333                 try { 
334                         x.binding = (RecordBinding) Bindings.getBeanBinding( sampleType );
335                 } catch ( RuntimeException e ) {
336                         System.err.println("Could not create bean binding for type "+sampleType);
337                         x.binding = (RecordBinding) Bindings.getBinding( sampleType );          
338                 }
339
340                 // Value binding
341                 x.valueBinding = x.binding.getComponentBinding("value");
342                 x.isNumeric = x.valueBinding instanceof NumberBinding;
343                 x.hasEndTime = x.binding.getComponentIndex("endTime")>0;
344
345                 // Value adapter
346                 if ( x.isNumeric ) 
347                 try {
348                         x.valueToDouble = Bindings.adapterFactory.getAdapter(x.valueBinding, Bindings.DOUBLE, true, false);
349                         x.doubleToValue = Bindings.adapterFactory.getAdapter(Bindings.DOUBLE, x.valueBinding, true, false);
350                 } catch (AdapterConstructionException e1) {
351                         x.valueToDouble = null;
352                 }
353
354                 // Create valueband, and read last entry into memory
355                 try {
356                         Object currentEntry = x.binding.createDefault();
357                         if (x.stream.size() >= 1) {
358                                 x.stream.get(x.stream.size() - 1, x.binding, currentEntry);
359                         }
360                         x.current = new ValueBand(x.binding);
361                         x.current.setSample(currentEntry);
362                 } catch (AccessorException e) {
363                         throw new HistoryException(
364                                         "Could not read sample from stream file, " + e, e);
365                 } catch (BindingException e) {
366                         throw new HistoryException(
367                                         "Could not read sample from stream file, " + e, e);
368                 }
369
370                 // Item State
371                 x.itemState = itemState;
372
373                 // Median
374                 if (x.isNumeric && x.current.hasMedian()
375                                 && x.itemState.median == null) {
376                         x.itemState.median = new WeightedMedian( MEDIAN_LIMIT );
377                 }
378
379                 return x;
380         }
381                         
382         /**
383          * Read itemState from history. Item must exist. 
384          * 
385          * @param si subscription item
386          * @return state item
387          * @throws RuntimeBindingException
388          * @throws HistoryException
389          */
390         static void readItemState(HistoryManager history, Bean si, CollectorState.Item item) throws RuntimeBindingException, HistoryException {
391                 // Create item state
392                 item.readAvailableFields(si);
393                 // Read state from items
394                 StreamAccessor sa = history.openStream(item.id, "r");
395                 try {
396                         Datatype sampleType = sa.type().componentType;
397                         
398                         Binding sampleBinding = Bindings.getBinding(sampleType);
399                         if (!sampleType.equals( item.format )) throw new HistoryException("Could not create subscription, item \""+item.id+"\" already exists and has wrong format (" + sampleType + " vs. " + item.format + ")");
400                         Object sample = sampleBinding.createDefaultUnchecked();
401                         ValueBand v = new ValueBand(sampleBinding, sample);
402                         item.count = sa.size();
403                         if ( item.count>0 ) {
404                                 sa.get(0, sampleBinding, sample);
405                                 item.firstTime = v.getTimeDouble();
406                                 Binding b = v.getValueBinding();
407                                 Object value = v.getValue(b);
408                                 item.firstValue.setValue(b, value);
409                                         
410                                 sa.get(item.count-1, sampleBinding, sample);
411                                 item.currentTime = v.getTimeDouble();
412                                 b = v.getValueBinding();
413                                 value = v.getValue(b);
414                                 item.currentValue.setValue(b, value);
415                                         
416                                 if ( item.currentValue.getBinding() instanceof DoubleBinding ) {
417                                         DoubleBinding db = (DoubleBinding) item.currentValue.getBinding();
418                                         value = item.currentValue.getValue(db);
419                                         item.isNaN = Double.isNaN( (Double) value );
420                                 }
421                         }
422                                 
423                 } catch (AccessorException e) {
424                         throw new HistoryException( e );
425                 } catch (AdaptException e) {
426                         throw new HistoryException( e );
427                 } finally {
428                         try { sa.close(); } catch (AccessorException e) {}
429                 }
430         }
431
432         @Override
433         public CollectorState getState() {
434                 return (CollectorState) state.clone();
435         }
436
437         @Override
438         public void setState(Bean newState) {
439                 if (newState == null)
440                         return;
441                 if (state == null) {
442                         state = new CollectorState();
443                         state.init();
444                 }
445                 state.readAvailableFields(newState);
446
447                 // Make sure that openAllItems ensures that all streams are opened properly.
448                 // This is because setItem will not invoke addItem if collectorState exists
449                 // for an item and therefore itemsOpen might not otherwise be set to false.
450                 itemsOpen = false;
451         }
452
453         @Override
454         public Object getTime(Binding binding) throws HistoryException {
455                 MutableVariant v = state.time;
456                 if (v==null) return null;
457                 try {
458                         return v.getValue(binding);
459                 } catch (AdaptException e) {
460                         throw new HistoryException(e);
461                 }
462         }
463
464         @Override
465         public Object getValue(String id, Binding binding) throws HistoryException {
466                 VariableState vs = state.values.get(id);
467                 if (vs==null) return null;
468                 
469                 if (!vs.isValid) return null;
470                 
471                 try {
472                         return vs.value.getValue(binding);
473                 } catch (AdaptException e) {
474                         throw new HistoryException(e);
475                 }
476         }
477         
478         void openAllItems() throws HistoryException {
479                 if (itemsOpen) return;
480
481                 // Assert all items are open
482                 // 1. Collect all items that still need to be opened.
483                 Set<String> itemsToOpen = new THashSet<String>( state.itemStates.keySet() );
484                 for (ActiveStream as : items)
485                         itemsToOpen.remove(as.itemState.id);
486
487                 // 2. Open missing streams
488                 List<ActiveStream> opened = new ArrayList<ActiveStream>(itemsToOpen.size());
489                 for (String itemId : itemsToOpen) {
490                         CollectorState.Item item = state.itemStates.get( itemId );
491                         ActiveStream as = openStream( item );
492                         opened.add(as);
493                 }
494
495                 // 3. Update list of open streams
496                 items.addAll(opened);
497                 itemsOpen = true;
498         }
499
500         private void updateDt(NumberBinding binding, Object time) {
501                 try {
502                         Double current = (Double)getTime(Bindings.DOUBLE);
503                         if(current != null) {
504                                 double t = binding.getValue(time).doubleValue();
505                                 state.dT = t-current;
506                                 if(state.dT > 0) return;
507                         }
508                 } catch (Throwable t) {
509                 }
510                 state.dT = 1.0;
511         }
512         
513         @Override
514         public void beginStep(NumberBinding binding, Object time) throws HistoryException {
515                 stepEnabled = enabled;
516                 updateDt(binding, time);
517                 state.time.setValue(binding, time);
518                 if (enabled) {
519                         openAllItems();
520                 }
521         }
522
523         @Override
524         public void setValue(String id, Binding binding, Object value) throws HistoryException {
525                 // Short-circuit calls when collection not enabled
526                 if (!stepEnabled)
527                         return;
528
529                 VariableState vs = state.values.get( id );
530                 if (vs == null) {
531                         vs = new VariableState();
532                         vs.value = new MutableVariant(binding, value);
533                         state.values.put(id, vs);
534                 }
535                 
536                 if (value==null) {
537                         vs.value.setValue(VoidBinding.VOID_BINDING, null);
538                         vs.isNan = true;
539                         vs.isValid = false;
540                         return;
541                 } 
542                                         
543                 vs.value.setValue(binding, value);
544                 vs.isValid = true;
545                         
546                 if (binding instanceof DoubleBinding) {
547                         DoubleBinding db = (DoubleBinding) binding;
548                         try {
549                                 double _v = db.getValue_(value);
550                                 vs.isNan = Double.isNaN( _v );
551                         } catch (BindingException e) {
552                                 vs.isNan = true;
553                                 vs.isValid = false;
554                         }
555                 } else if (binding instanceof FloatBinding) {
556                         FloatBinding db = (FloatBinding) binding;
557                         try {
558                                 float _v = db.getValue_(value);
559                                 vs.isNan = Float.isNaN( _v );
560                         } catch (BindingException e) {
561                                 vs.isNan = true;
562                                 vs.isValid = false;
563                         }
564                 } else {
565                         vs.isNan = false;
566                 }
567         }
568
569         @Override
570         public boolean getValue(String id, MutableVariant result) {
571                 VariableState vs = state.values.get(id);
572                 if (vs==null) return false;
573                 
574                 if (!vs.isValid) return false;
575                 
576                 result.setValue(vs.value.getBinding(), vs.value.getValue());
577                 return true;
578         }
579
580         @Override
581         public void endStep() throws HistoryException {
582                 if (!stepEnabled)
583                         return;
584                 // Write values to streams
585                 for (ActiveStream i : items)
586                 {
587                         try {
588                                 putValueToStream(i);                            
589                                 if (flushPolicy == FlushPolicy.FlushOnEveryStep) {
590                                         i.stream.flush();
591                                 }
592                         } catch (AdaptException e) {
593                                 throw new HistoryException( e );
594 //                              FileAccessor fa = (FileAccessor) i.stream;
595 //                              logger.log(Level.FINE, "Error writing a stream "+fa.file(), e);
596                         } catch (AccessorException e) {
597                                 throw new HistoryException( e );
598 //                              FileAccessor fa = (FileAccessor) i.stream;
599 //                              logger.log(Level.FINE, "Error writing a stream "+fa.file(), e);
600                         } catch (BindingException e) {
601                                 throw new HistoryException( e );
602 //                              FileAccessor fa = (FileAccessor) i.stream;
603 //                              logger.log(Level.FINE, "Error writing a stream "+fa.file(), e);
604                         }
605                 }
606         }
607                 
608         private int sampleNum(double time, double interval) {
609                 // 1.0 is needed to cover the case of 0.0
610                 return (int)(1.0 + (time+EPSILON*state.dT)/interval);
611         }
612
613         private boolean isExactInterval(double time, double interval) {
614                 return sampleNum(time, interval) > sampleNum(time-2*EPSILON*state.dT, interval);
615         }
616
617         /**
618          * This method puts a time,value to a stream.
619          * 
620          * The stream is "packed" when the new value is equal to previous, or the
621          * value is within deadband (in comparison to the first value), or if
622          * the interval zone is not exceeded.
623          * 
624          * The stream can be packed two different ways, depending on the sample
625          * format. The sample format can be (a) single value or (b) a time range of
626          * multiple values, Ranged Sample. 
627          * 
628          * Single value is packed by removing middle values, in other words, 
629          * moving the timecode of the last sample.
630          * 
631          * Ranged sample is packed by extending the endTime code of the last sample.
632          * Other fields, such as avg, sum, count, lastValue are also updated.
633          * 
634          * @param i
635          * @throws AdaptException
636          * @throws AccessorException 
637          * @throws BindingException 
638          * @throws HistoryException 
639          */
640         private void putValueToStream(ActiveStream i) throws AdaptException, AccessorException, BindingException, HistoryException
641         { // Writes the current value of item i to its stream: the last sample is updated in place and/or a new sample (band) is appended
642                 // Current time as double; stays NaN when state.time is null or holds no value
643                 Variant time = state.time;
644                 double currentTime = Double.NaN;
645                 if (time != null) {
646                         Double x = (Double) time.getValue(Bindings.DOUBLE);
647                         currentTime = x == null ? Double.NaN : x;
648                 }
649                 // NOTE(review): time is dereferenced further below without a null check; presumably state.time is always set when this runs - confirm
650                 // Is first value (the stream holds no samples yet)
651                 boolean isFirstValue = i.stream.size() == 0;
652                 // Is this item disabled
653                 boolean isDisabled = !i.itemState.enabled; 
654                 // Discontinuity, first value after being disabled
655                 boolean wasDisabled = !isFirstValue && i.itemState.isDisabled;
656                 
657                 // Update currently disabled status
658                 i.itemState.isDisabled = isDisabled;
659
660                 // Disabled item: only track the sample count and the disabled time range, nothing is written to the stream
661                 if (isDisabled) {
662                         i.itemState.count++;
663                         i.itemState.lastDisabledTime = currentTime;
664                         if (!wasDisabled) i.itemState.firstDisabledTime = currentTime;
665                         return;
666                 }
667                 
668                 // Is nan value, for doubles and floats
669                 boolean isNanValue;
670                 // Is valid value
671                 boolean isValidValue;
672                 // Value as variant
673                 MutableVariant value;
674                 // Get current value as double
675                 double currentValueDouble;
676                 
677                 VariableState vs = state.values.get( i.itemState.variableId );                  
678                 if ( vs!=null ) {
679                         isNanValue = vs.isNan;
680                         isValidValue = vs.isValid;
681                         value = vs.value;
682                         if (i.isNumeric && isValidValue && !isNanValue) {
683                                 currentValueDouble = (Double) value.getValue( Bindings.DOUBLE );
684                                 if ( i.itemState.gain!=1.0 || i.itemState.bias!=0.0 ) { // linear scaling configured: value = gain * value + bias, re-wrapped with the item's value binding
685                                         currentValueDouble = i.itemState.gain * currentValueDouble + i.itemState.bias;
686                                         Object x = i.valueBinding instanceof DoubleBindingDefault ? currentValueDouble : i.doubleToValue.adapt(currentValueDouble);
687                                         value = new MutableVariant( i.valueBinding, x );
688                                 }
689                         } else {
690                                 currentValueDouble = Double.NaN;
691                         }
692                         
693                 } else { // no variable state for this item: treat it as an invalid NaN value carrying the binding's default
694                         isNanValue = true;
695                         isValidValue = false;
696                         value = new MutableVariant( i.valueBinding, i.valueBinding.createDefault() );
697                         currentValueDouble = Double.NaN;
698                 }
699                                 
700                 // Previous value as double; NaN when there is none or its binding is not a NumberBinding
701                 double prevValueDouble = Double.NaN;
702                 Variant pv = i.itemState.currentValue;
703                 if (pv!=null && pv.getBinding() instanceof NumberBinding ) {
704                         prevValueDouble = (Double) pv.getValue( Bindings.DOUBLE );
705                 } else {
706                         prevValueDouble = Double.NaN;
707                 }
708                 
709                 // The time of the first sample of region
710                 double firstTime = i.itemState.firstTime;
711                 // Prev time
712                 double prevTime = i.itemState.currentTime;
713                 
714                 boolean restep = prevTime == currentTime; // same time code as on the previous call
715                 
716                 // There was change in NaN
717                 boolean isNaNChange = i.itemState.isNaN != isNanValue;
718                 
719                 // There was change in validity of the value
720                 boolean isValidChange = i.itemState.isValid != isValidValue;
721                 
722                 //boolean wasValidValue = !i.itemState.isValid && isValidValue;
723                 // Interval and deadband are in use only when non-NaN and greater than Double.MIN_VALUE (deadband also requires a numeric item)
724                 boolean hasInterval = !Double.isNaN( i.itemState.interval ) && (i.itemState.interval> Double.MIN_VALUE );
725                 
726                 boolean hasDeadband = !Double.isNaN( i.itemState.deadband ) && (i.itemState.deadband> Double.MIN_VALUE) && i.isNumeric; 
727                 // Min-max format: interval and deadband are both Double.MAX_VALUE and the sample record has "min" and "max" components
728                 boolean isMinMaxFormat = i.itemState.interval==Double.MAX_VALUE &&
729                                 i.itemState.deadband==Double.MAX_VALUE &&
730                                 i.binding.getComponentIndex("min")>=0 &&
731                                 i.binding.getComponentIndex("max")>=0;
732
733                 // Deadband: accumulate (|=) whether the value has left the deadband around the first value of the band
734                 if (isValidValue && i.isNumeric && i.itemState.count>=1) {
735                         
736                         if (i.itemState.isValid) {
737                                 double firstValueDouble = (Double) i.itemState.firstValue.getValue( Bindings.DOUBLE );
738                                 double diff = Math.abs( currentValueDouble - firstValueDouble );
739                                 if( hasDeadband )
740                                         i.itemState.ooDeadband |= diff >= i.itemState.deadband;
741                                 else
742                                         i.itemState.ooDeadband |= diff != 0.0;
743                         } else {
744                                 i.itemState.ooDeadband = true;
745                         }
746                         
747                         // NaN change is deadband change
748                         i.itemState.ooDeadband |= isNaNChange;
749                 } else {
750                         // Invalid or non-numeric value, or no samples in the band yet: the deadband cannot have been broken
751                         i.itemState.ooDeadband = false;
752                 }
753
754
755                 // Interval: without a configured interval every step counts as out of interval
756                 boolean ooInterval = true;
757                 if ( hasInterval ) {
758                         
759 //                      System.err.println(i.itemState.id + " " + firstTime + " " + currentTime);
760
761                         // First band is a special case
762                         if(isFirstValue) {
763                                 // If this is the first sample store the current time as first time
764                                 if(Double.isNaN(i.itemState.firstTime)) firstTime = i.itemState.firstTime = currentTime;
765                                 int sn1 = sampleNum(firstTime,i.itemState.interval);
766                                 int sn2 = sampleNum(currentTime,i.itemState.interval);
767                                 // Wait until next sample if this position is not already exactly at sample border
768                                 // Always start minmax immediately
769                                 if(sn1 == sn2 && !isExactInterval(currentTime, i.itemState.interval) && !isMinMaxFormat) return;
770                         } else {
771                                 int sn1 = sampleNum(prevTime,i.itemState.interval);
772                                 int sn2 = sampleNum(currentTime,i.itemState.interval);
773 //                              System.err.println(" ==>" + sn1 + " " + sn2 + " " + prevTime + " " + currentTime);
774                                 ooInterval = sn2 > sn1;
775                         }
776
777                         
778 //                      
779 //                      double n = (currentTime+EPSILON) / i.itemState.interval;
780 //                      double n2 = n-Math.floor(n);
781 //                      
782 //                      
783 //                      double deltaTime = currentTime - firstTime;
784 //                      ooInterval = (deltaTime - i.itemState.interval) > -EPSILON;
785                 }
786                 
787                 // Should new sample be created, or old sample extended / forwarded in time
788                 boolean makeNewBand = ( i.itemState.ooDeadband && isValidValue && ooInterval ) || isFirstValue || wasDisabled || isValidChange;
789                 
790                 // Accumulate the time-weighted sum (used for avg) and the median input of the current band
791                 if ( isValidValue && i.isNumeric ) {
792                         if (i.current.hasAvg() && i.isNumeric) {
793                                 if (Double.isNaN( i.itemState.sum )) i.itemState.sum = 0.0;
794                                 if (isValidValue && i.isNumeric) {
795                                         double timeDelta = currentTime - i.itemState.currentTime; // itemState.currentTime still holds the previous time here
796                                         i.itemState.sum += timeDelta * currentValueDouble;
797                                 }
798                         }
799
800                         if (i.current.hasMedian() && !Double.isNaN(prevValueDouble) ) {
801                                 double deltaTime = currentTime - prevTime;
802                                 i.itemState.median.add( deltaTime, prevValueDouble );
803                         }
804                 }
805
806                 // Increase the counter that tracks the number of samples acquired into the band
807                 if (!restep) i.itemState.count++;
808
809                 // Remember values 
810                 i.itemState.currentTime = currentTime;
811                 i.itemState.currentValue.readFrom( value.getBinding(), value.getValue() );                                                      
812                 i.itemState.isNaN = isNanValue;
813                 i.itemState.isValid = isValidValue;
814                 
815                 // Put discontinuation marker: a null-valued sample that spans the disabled period
816                 if ( (wasDisabled && i.current.supportsNullValue())) {
817                         // Extend marker
818                         if (i.current.isNullValue()) {
819                                 if (i.current.hasEndTime()) {
820                                         i.current.setEndTime( Bindings.DOUBLE, i.itemState.lastDisabledTime );
821                                 }
822                                 
823                                 if (i.current.hasCount()) {
824                                         i.current.setCount( i.current.getCount()+1 );
825                                 }
826                                 
827                                 // Write the extended marker over the last sample of the stream
828                                 i.stream.setNoflush(i.stream.size()-1, i.binding, i.current.getSample() );
829                         }
830                         
831                         // Add marker
832                         i.current.reset();
833                         i.current.setValueNull();
834                         i.current.setTime(Bindings.DOUBLE, i.itemState.firstDisabledTime);
835                         i.current.setEndTime(Bindings.DOUBLE, i.itemState.lastDisabledTime);
836                         i.stream.addNoflush( i.binding, i.current.getSample() );
837                 }
838                 
839                 boolean condition = makeNewBand || (!i.hasEndTime && i.itemState.count==2); // without an end time field a new sample is also started at the band's second sample
840
841                 if (i.stream.size() > 0) {
842
843                         /// Extend existing band
844                         // Extend endTime value
845
846                         // This ensures that a value band's end time isn't the same
847                         // as the next band's start time which makes it possible to
848                         // know what are the timestamps between which the item value
849                         // changed from a to b. This allows us to more correctly render
850                         // steps and sloped lines according to the actual data.
851                         if (!i.itemState.ooDeadband || isMinMaxFormat) {
852                                 if (!i.hasEndTime) {
853                                         i.current.setTime( time.getBinding(), time.getValue() );
854                                 } else {
855                                         i.current.setEndTime( time.getBinding(), time.getValue() );
856                                 }
857                         }
858
859                         if (i.itemState.ooDeadband || isMinMaxFormat) {
860                                 if (isValidValue) {
861                                         // Set last value
862                                         if (i.current.hasLastValue()) {
863                                                 i.current.setLastValue( value.getBinding(), value.getValue() );
864                                         }
865
866                                         if (i.isNumeric && !isNanValue) {
867                                                 // Add sum
868                                                 if (i.current.hasAvg() && !restep) {
869                                                         double duration = (currentTime - i.itemState.firstTime);
870                                                         if(duration < 1e-9) {
871                                                                 i.current.setAvg( Bindings.DOUBLE, 0.0 );
872                                                         } else {
873                                                                 i.current.setAvg( Bindings.DOUBLE, i.itemState.sum / duration);
874                                                         }
875                                                 }
876
877                                                 // Update min-max
878                                                 if (i.current.hasMin()) {
879                                                         Binding minBinding = i.current.getMinBinding();
880                                                         Object prevMinValue = i.current.getMin();
881                                                         Object currentValueWithMinBinding = value.getValue( minBinding );
882                                                         int diff = minBinding.compare( prevMinValue, currentValueWithMinBinding );
883                                                         if (diff>0) i.current.setMin( minBinding, currentValueWithMinBinding );
884                                                 }
885                                                 if (i.current.hasMax()) {
886                                                         Binding maxBinding = i.current.getMaxBinding();
887                                                         Object prevMaxValue = i.current.getMax();
888                                                         Object currentValueWithMaxBinding = value.getValue( maxBinding );
889                                                         int diff = maxBinding.compare( prevMaxValue, currentValueWithMaxBinding );
890                                                         if (diff<0) i.current.setMax( maxBinding, currentValueWithMaxBinding );
891                                                 }
892
893                                                 // Update median
894                                                 if (i.current.hasMedian()) {
895                                                         Double median = i.itemState.median.getMedian();
896                                                         i.current.setMedian( Bindings.DOUBLE, median);
897                                                 }
898                                         }
899                                 } else {
900                                         i.current.setValueNull();
901                                 }
902
903                                 // Update count
904                                 if (i.current.hasCount()) {
905                                         i.current.setCount( i.itemState.count );
906                                 }
907                         }
908
909                         // Write entry to file stream
910                         //              if (i.current.getTimeDouble()<0) 
911                         //                      System.err.println("Error sample "+i.current.getBinding().toString(i.current.getSample()));
912                         i.stream.setNoflush( i.stream.size()-1, i.binding, i.current.getSample() );                             
913                         //System.err.println(i.itemState.id + " update " + i.current.getSample());
914                 }
915                 
916                 if (condition) {
917                         
918                         // Start a new value band
919                         i.itemState.firstValue.readFrom( value.getBinding(), value.getValue() );
920                         i.itemState.firstTime = currentTime;
921                         i.itemState.currentValue.readFrom( value.getBinding(), value.getValue() );
922                         i.itemState.currentTime = currentTime;
923                         i.itemState.isNaN = isNanValue;
924                         i.itemState.isValid = isValidValue;
925                         i.itemState.sum = 0;
926                         i.itemState.count = 1;
927                         i.itemState.ooDeadband = false;
928                         if (i.itemState.median != null) {
929                                 i.itemState.median.clear();
930                                 if (!Double.isNaN(prevValueDouble) ) {
931                                         double deltaTime = currentTime - prevTime;
932                                         i.itemState.median.add( deltaTime, prevValueDouble );
933                                 }
934                         }
935                         
936                         // New Sample
937                         i.current.reset();
938                         
939                         // Start new band with current value & time
940                         i.current.setTime( time.getBinding(), time.getValue() );                
941
942                         // The new band also ends at the current time when the sample format has an end time
943                         if (i.current.hasEndTime()) {
944                                 i.current.setEndTime( time.getBinding(), time.getValue() );
945                         }
946                         
947                         if (isValidValue) {
948                                 i.current.setValue( value.getBinding(), value.getValue() );
949                                 i.current.setMax( value.getBinding(), value.getValue() );
950                                 i.current.setMin( value.getBinding(), value.getValue() );
951                                 i.current.setLastValue( value.getBinding(), value.getValue() );
952                                 i.current.setAvg( value.getBinding(), value.getValue() );
953                                 i.current.setMedian( value.getBinding(), value.getValue() );                                    
954                         } else {
955                                 i.current.setValueNull();
956                         }
957                         
958                                 
959                         // Sample Count=1 for new bands (time, value) "added"
960                         if (i.current.hasCount()) {
961                                 i.current.setCount( 1 );
962                         }
963                         
964                         // Add the band to the file stream
965                         i.stream.addNoflush( i.binding, i.current.getSample() );
966                         //System.err.println(i.itemState.id + " add " + i.current.getSample());
967                         
968                 }               
969                 
970         }
971
972         @Override
973         public void flush() /*throws HistoryException*/ {
974                 for (ActiveStream i : items) {
975                         try {                                           
976                                 i.stream.flush();
977                         } catch (AccessorException e) {
978                                 logger.log(Level.FINE, "File history flush failed.", e);                        
979 //                              throw new HistoryException(e);
980                         }
981                 }
982         }
983
984         public void flush(Set<String> ids) /*throws HistoryException*/ {
985                 for (ActiveStream i : items) {
986                         if (!ids.contains(i.itemState.id)) continue;
987                         try {                                           
988                                 i.stream.flush();
989                         } catch (AccessorException e) {
990                                 logger.log(Level.FINE, "File history flush failed.", e);                        
991 //                              throw new HistoryException(e);
992                         }
993                 }
994         }
995         
996         
997         @Override
998         public synchronized void close()
999         {
1000                 // Write items back to history
1001                 List<Bean> beans = new ArrayList<Bean>( state.itemStates.size() );
1002                 for (CollectorState.Item itemState : state.itemStates.values()) {
1003                         try {
1004                                 if (!history.exists(itemState.id)) continue;
1005                                 Bean bean = history.getItem(itemState.id);
1006                                 bean.readAvailableFields(itemState);
1007                                 beans.add( setItemState(bean, itemState) );
1008                         } catch (BindingException e) {
1009                                 logger.log(Level.FINE, "Error writing a meta-data to history", e);
1010                         } catch (BindingConstructionException e) {
1011                                 logger.log(Level.FINE, "Error writing a meta-data to history", e);
1012                         } catch (HistoryException e) {
1013                                 logger.log(Level.FINE, "Error writing a meta-data to history", e);
1014                         }
1015                 }
1016                 try {
1017                         history.modify(beans.toArray( new Bean[beans.size()] ));
1018                 } catch (HistoryException e) {
1019                         logger.log(Level.FINE, "Error writing a meta-data to history", e);
1020                 }
1021                 state.itemStates.clear();
1022                 
1023                 // Close streams
1024                 for (ActiveStream item : items) {
1025                         item.close();
1026                 }
1027                 items.clear();
1028         }
1029
1030         public class ActiveStream {
1031
1032                 /** Sample Stream */
1033                 public StreamAccessor stream;
1034                 
1035                 /** Sample binding */
1036                 public RecordBinding binding;
1037                 
1038                 /** Value binding (Generic) */
1039                 public Binding valueBinding;
1040                 
1041                 /** Set true if this item has numeric value type */
1042                 public boolean isNumeric;
1043                 
1044                 /** Set true if this item has end time */
1045                 public boolean hasEndTime;
1046                 
1047                 /** Item entry in subscription state */
1048                 public CollectorState.Item itemState;
1049                                 
1050                 /** Interface to current last sample (in the stream) */
1051                 public ValueBand current;
1052                 
1053                 /** Adapter that converts value to double value, null if not numeric */
1054                 public Adapter valueToDouble, doubleToValue;
1055                 
1056                 void close() {
1057                         try {
1058                                 if (stream != null) { 
1059                                         stream.close();
1060                                 }
1061                                 stream = null;
1062                         } catch (AccessorException e) {
1063                                 logger.log(Level.FINE, "Error closing stream", e);
1064                         }
1065                 }
1066
1067         }
1068
1069         @Override
1070         public StreamAccessor openStream(String itemId, String mode) 
1071         throws HistoryException 
1072         {
1073                 return history.openStream(itemId, mode);
1074         }
1075
1076         @Override
1077         public HistoryManager getHistory() {
1078                 return history;
1079         }
1080
1081 }