1 /*******************************************************************************
2 * Copyright (c) 2007, 2011 Association for Decentralized Information Management in
4 * All rights reserved. This program and the accompanying materials
5 * are made available under the terms of the Eclipse Public License v1.0
6 * which accompanies this distribution, and is available at
7 * http://www.eclipse.org/legal/epl-v10.html
10 * VTT Technical Research Centre of Finland - initial API and implementation
11 *******************************************************************************/
12 package org.simantics.history.impl;
14 import gnu.trove.set.hash.THashSet;
16 import java.io.IOException;
17 import java.util.ArrayList;
18 import java.util.List;
20 import java.util.UUID;
21 import java.util.concurrent.CopyOnWriteArrayList;
22 import java.util.logging.Level;
23 import java.util.logging.Logger;
25 import org.simantics.databoard.Bindings;
26 import org.simantics.databoard.accessor.StreamAccessor;
27 import org.simantics.databoard.accessor.error.AccessorException;
28 import org.simantics.databoard.adapter.AdaptException;
29 import org.simantics.databoard.adapter.Adapter;
30 import org.simantics.databoard.adapter.AdapterConstructionException;
31 import org.simantics.databoard.binding.Binding;
32 import org.simantics.databoard.binding.DoubleBinding;
33 import org.simantics.databoard.binding.FloatBinding;
34 import org.simantics.databoard.binding.NumberBinding;
35 import org.simantics.databoard.binding.RecordBinding;
36 import org.simantics.databoard.binding.error.BindingConstructionException;
37 import org.simantics.databoard.binding.error.BindingException;
38 import org.simantics.databoard.binding.error.RuntimeBindingException;
39 import org.simantics.databoard.binding.impl.DoubleBindingDefault;
40 import org.simantics.databoard.binding.mutable.MutableVariant;
41 import org.simantics.databoard.binding.mutable.Variant;
42 import org.simantics.databoard.binding.reflection.VoidBinding;
43 import org.simantics.databoard.serialization.Serializer;
44 import org.simantics.databoard.type.Component;
45 import org.simantics.databoard.type.Datatype;
46 import org.simantics.databoard.type.NumberType;
47 import org.simantics.databoard.type.RecordType;
48 import org.simantics.databoard.util.Bean;
49 import org.simantics.history.Collector;
50 import org.simantics.history.HistoryException;
51 import org.simantics.history.HistoryManager;
52 import org.simantics.history.impl.CollectorState.Item;
53 import org.simantics.history.impl.CollectorState.VariableState;
54 import org.simantics.history.util.ValueBand;
55 import org.simantics.history.util.WeightedMedian;
56 import org.simantics.history.util.subscription.SubscriptionItem;
59 * Collector is used to record data to streams in history manager.
61 * For every item there is collector item state {@link CollectorState.Item},
62 * that contains data required in writing process. This state is read and
63 * written to history manager item (bean) as meta-data.
65 * Stream files are opened in the first {@link #beginStep(NumberBinding, Object)}.
67 * @author toni.kalajainen
// Collector implementation. This part of the class declares the logger, two
// tuning constants, the list of open streams, the associated history manager,
// the "items open" flag and the flush policy.
70 public class CollectorImpl implements Collector {
// java.util.logging logger named after this class.
73 static Logger logger = Logger.getLogger(CollectorImpl.class.getName());
// Argument passed to the WeightedMedian constructor in openStream(Item).
75 public static final int MEDIAN_LIMIT = 500;
// Tolerance factor; sampleNum() and isExactInterval() multiply it by state.dT.
77 public static final double EPSILON = 1e-2;
// NOTE(review): the field this comment documents (presumably the `state` member
// used throughout the class) is not visible in this listing.
79 /** Collector state */
82 /** Open streams, corresponds to subscription items */
83 CopyOnWriteArrayList<ActiveStream> items = new CopyOnWriteArrayList<ActiveStream>();
85 /** Associated history state */
86 HistoryManager history;
// openAllItems() returns immediately while this flag is true.
88 /** True when all items are open */
89 boolean itemsOpen = true;
// Compared against FlushPolicy.FlushOnEveryStep in endStep().
91 public FlushPolicy flushPolicy = FlushPolicy.FlushOnEveryStep;
// Getter for the flush policy. The body is not visible in this listing;
// presumably it returns the flushPolicy field - confirm against the full file.
93 public FlushPolicy getFlushPolicy() {
// Replaces the flush policy stored in the flushPolicy field, which endStep()
// compares against FlushPolicy.FlushOnEveryStep.
97 public void setFlushPolicy(FlushPolicy flushPolicy) {
98 this.flushPolicy = flushPolicy;
102 * Create a new collector that is associated with a history manager.
105 * @throws HistoryException
// Creates a collector bound to the given history manager, starting from a
// newly constructed CollectorState.
107 public CollectorImpl(HistoryManager history) {
108 this.history = history;
109 this.state = new CollectorState();
// The new state is identified by a random UUID rendered as a string.
111 this.state.id = UUID.randomUUID().toString();
115 * Create a new collector that is associated with a history manager.
118 * @param previousState
119 * @throws HistoryException
// Creates a collector bound to the given history manager that continues from
// an earlier state. The collector keeps a clone of previousState rather than
// the instance that was passed in.
121 public CollectorImpl(HistoryManager history, CollectorState previousState) {
122 this.history = history;
123 this.state = (CollectorState) previousState.clone();
// Registers a subscription item described by a bean.
//
// The item state is looked up in this order: the bean's "collectorState" field
// (if the bean has one), then the state already stored under the bean's id,
// and finally a newly constructed CollectorState.Item. The resolved state is
// handed to addItem(String, Item). A BindingException is rethrown wrapped in a
// HistoryException.
//
// NOTE(review): brace lines are missing from this listing, so it is not visible
// whether the readAvailableFields()/readItemState() calls below belong to the
// "new Item" branch only - confirm against the full file.
127 public synchronized void addItem(Bean item) throws HistoryException {
129 String id = (String) item.getField("id");
132 CollectorState.Item itemState = null;
// 1st choice: state embedded in the bean itself.
133 if ( item.hasField("collectorState") ) itemState = (CollectorState.Item) item.getField("collectorState", CollectorState.BINDING_ITEM);
// 2nd choice: state this collector already holds for the id.
134 if ( itemState == null ) itemState = state.itemStates.get( id );
// 3rd choice: start from a blank state.
135 if ( itemState == null ) {
137 itemState = new CollectorState.Item();
// Copy whatever fields the bean shares with the item state.
139 itemState.readAvailableFields( item );
140 // Guess values from the existing data
141 if (history.exists(id)) {
142 readItemState(history, item, itemState);
146 addItem(id, itemState);
147 } catch (BindingException e) {
148 throw new HistoryException( e );
// Stores the given item state under the id and, when the collector time is a
// numeric type, moves the collector time forward to the item's currentTime if
// that time is not NaN and is later than the collector time. An AdaptException
// from the time conversion is rethrown wrapped in a HistoryException.
152 public synchronized void addItem(String id, CollectorState.Item itemState) throws HistoryException {
155 state.itemStates.put(id, itemState);
159 if (state.time.type() instanceof NumberType) {
// Collector time converted to double for comparison.
160 double time = (Double) Bindings.adapt(state.time.getValue(), state.time.getBinding(), Bindings.DOUBLE);
161 if ( !Double.isNaN(itemState.currentTime) ) {
162 if ( itemState.currentTime > time ) {
163 state.time.setValue(Bindings.DOUBLE, itemState.currentTime);
167 } catch (AdaptException e) {
168 throw new HistoryException( e );
// Adds several subscription items by calling addItem(Bean) for each one in
// argument order; the first HistoryException thrown stops the loop.
172 public synchronized void addItems(Bean...items) throws HistoryException {
173 for (Bean item : items) addItem(item);
// Removes the item state stored under the id. If a state was stored and the
// history still has an item with that id, the state is written back to the
// history item's bean (via setItemState and history.modify). Failures of that
// write-back are only logged at FINE level in the visible handlers.
//
// Afterwards the matching active stream is looked up; what is done with it is
// not visible in this listing.
177 public synchronized void removeItem(String id) throws HistoryException {
178 CollectorState.Item itemState = state.itemStates.remove( id );
179 if (itemState!=null && history.exists(id)) {
180 Bean bean = history.getItem(id);
181 bean.readAvailableFields(itemState);
// setItemState may return a different bean instance than it was given.
183 bean = setItemState(bean, itemState);
184 history.modify(bean);
185 } catch (BindingException e) {
187 logger.log(Level.FINE, "Failed to update bean", e);
188 } catch (BindingConstructionException e) {
190 logger.log(Level.FINE, "Failed to update bean", e);
191 } catch (HistoryException e) {
192 logger.log(Level.FINE, "Failed to update history item meta-data", e);
196 ActiveStream as = getStream(id);
// Linear search over the open streams for the one whose item state id equals
// the given id. The value returned when nothing matches is not visible in this
// listing.
203 ActiveStream getStream(String id) {
204 for (ActiveStream as : items) {
205 if (as.itemState.id.equals(id)) return as;
211 * Set (or Add) item's configuration.
214 * @throws HistoryException
// Applies a bean's configuration to the item state stored under the bean's
// "id" field by copying the fields the two have in common. A BindingException
// is rethrown wrapped in a HistoryException.
//
// NOTE(review): the body of the "no state stored yet" branch is not visible in
// this listing; a comment in setState() suggests it delegates to addItem -
// confirm against the full file.
216 public synchronized void setItem(Bean item) throws HistoryException {
218 String id = (String) item.getField("id");
220 CollectorState.Item itemState = state.itemStates.get( id );
221 if ( itemState==null ) {
225 itemState.readAvailableFields(item);
226 } catch (BindingException e) {
227 throw new HistoryException( e );
233 * Set item state to bean. If the bean does not have field collectorState,
234 * a new bean class is created, and field added.
238 * @return same or new bean
239 * @throws BindingException
240 * @throws BindingConstructionException
// Stores the collector item state in the bean's "collectorState" field.
//
// When the bean has no such field, or the field's type differs from
// CollectorState.BINDING_ITEM's type, a new record type is built from the
// bean's existing components (minus any old "collectorState") plus a
// "collectorState" component of the expected type, and a default bean of that
// type is filled from the original.
//
// NOTE(review): the lines that switch `config` to the new bean and the return
// statement are not visible in this listing.
242 public static Bean setItemState(Bean config, CollectorState.Item state) throws BindingException, BindingConstructionException
244 Binding binding = config.getFieldBinding("collectorState");
245 if ( binding == null || !binding.type().equals(CollectorState.BINDING_ITEM.type()) ) {
246 RecordType rt = config.getBinding().type();
247 RecordType newType = new RecordType();
// Copy every component except a stale "collectorState".
248 for (int i=0; i<rt.getComponentCount(); i++) {
249 Component c = rt.getComponent(i);
250 if ( c.name.equals("collectorState") ) continue;
251 newType.addComponent(c.name, c.type);
// Append the field with the expected type.
253 newType.addComponent("collectorState", CollectorState.BINDING_ITEM.type());
254 Binding newBinding = Bindings.getBeanBinding(newType);
255 Bean newConfig = (Bean) newBinding.createDefault();
256 newConfig.readAvailableFields( config );
259 config.setField("collectorState", CollectorState.BINDING_ITEM, state);
// Builds an array with one _HistoryItem per stored item state. Each entry gets
// the fields it shares with the item state plus a copy of the state itself,
// produced by a serialize/deserialize round trip.
//
// NOTE(review): the local array `items` shadows the field of the same name.
// The lines that store `hi` into the array, handle the IOException and return
// the result are not visible in this listing.
263 public SubscriptionItem[] getItems() {
264 int c = state.itemStates.size();
265 SubscriptionItem[] items = new SubscriptionItem[ c ];
// One serializer is reused for all copies.
267 Serializer s = Bindings.getSerializerUnchecked( CollectorState.BINDING_ITEM );
268 for (CollectorState.Item item : state.itemStates.values()) {
269 _HistoryItem hi = new _HistoryItem();
271 hi.readAvailableFields(item);
272 //hi.collectorState = (Item) item.clone();
274 // Workaround (clone doesn't work for WeightedMedian)
276 byte[] data = s.serialize( item );
277 hi.collectorState = (Item) s.deserialize(data);
278 } catch (IOException e) {
// SubscriptionItem extended with the collector's per-item state; getItems()
// creates these and fills in collectorState.
286 public static class _HistoryItem extends SubscriptionItem {
// Copy of the item's collector state (assigned in getItems()).
287 public CollectorState.Item collectorState;
291 * Creates and loads active item. TimeSeries stream file is opened.
294 * @return new active item
295 * @throws HistoryException
// Creates an ActiveStream for the given item state and opens its history
// stream in "rw" mode.
//
// Steps visible here: fail with HistoryException if the history has no item
// with this id; record the stream's sample type in itemState.format; resolve
// the sample binding (bean binding first, generic binding as fallback); derive
// the value binding and the numeric/end-time flags; create double<->value
// adapters; load the last stored sample (if any) into a ValueBand; and create
// a WeightedMedian for numeric items whose sample has a median field and whose
// state has none yet.
//
// Accessor/binding failures while reading the last sample are rethrown as
// HistoryException. The return statement is not visible in this listing.
297 ActiveStream openStream(CollectorState.Item itemState) throws HistoryException
300 ActiveStream x = new ActiveStream();
301 x.itemState = itemState;
305 if ( !history.exists(x.itemState.id) ) {
306 throw new HistoryException("TimeSeries "+x.itemState.id+" does not exist.");
309 x.stream = history.openStream(x.itemState.id, "rw");
// The stream's element type is the sample format of this item.
313 Datatype sampleType = x.stream.type().componentType;
314 itemState.format = sampleType;
316 x.binding = (RecordBinding) Bindings.getBeanBinding( sampleType );
317 } catch ( RuntimeException e ) {
// NOTE(review): this diagnostic goes to System.err while the rest of the class
// uses `logger`.
318 System.err.println("Could not create bean binding for type "+sampleType);
319 x.binding = (RecordBinding) Bindings.getBinding( sampleType );
323 x.valueBinding = x.binding.getComponentBinding("value");
324 x.isNumeric = x.valueBinding instanceof NumberBinding;
// NOTE(review): this test uses ">0", so an "endTime" component at index 0 would
// count as absent; putValueToStream() tests "min"/"max" with ">=0". Confirm
// whether ">=0" was intended.
325 x.hasEndTime = x.binding.getComponentIndex("endTime")>0;
330 x.valueToDouble = Bindings.adapterFactory.getAdapter(x.valueBinding, Bindings.DOUBLE, true, false);
331 x.doubleToValue = Bindings.adapterFactory.getAdapter(Bindings.DOUBLE, x.valueBinding, true, false);
332 } catch (AdapterConstructionException e1) {
// No adapter available: leave the converter unset.
333 x.valueToDouble = null;
336 // Create valueband, and read last entry into memory
338 Object currentEntry = x.binding.createDefault();
339 if (x.stream.size() >= 1) {
340 x.stream.get(x.stream.size() - 1, x.binding, currentEntry);
342 x.current = new ValueBand(x.binding);
343 x.current.setSample(currentEntry);
344 } catch (AccessorException e) {
345 throw new HistoryException(
346 "Could not read sample from stream file, " + e, e);
347 } catch (BindingException e) {
348 throw new HistoryException(
349 "Could not read sample from stream file, " + e, e);
// Same assignment as at the top of the method.
353 x.itemState = itemState;
356 if (x.isNumeric && x.current.hasMedian()
357 && x.itemState.median == null) {
358 x.itemState.median = new WeightedMedian( MEDIAN_LIMIT );
365 * Read itemState from history. Item must exist.
367 * @param si subscription item
369 * @throws RuntimeBindingException
370 * @throws HistoryException
// Initializes an item state from an existing history stream.
//
// Copies shared fields from the subscription bean, opens the stream read-only,
// and rejects the item with a HistoryException if the stream's sample type
// differs from item.format. Otherwise it records the sample count and, when
// the stream is non-empty, the time/value of the first and last samples; for a
// DoubleBinding current value it also records whether that value is NaN.
//
// AccessorException and AdaptException are rethrown as HistoryException.
372 static void readItemState(HistoryManager history, Bean si, CollectorState.Item item) throws RuntimeBindingException, HistoryException {
374 item.readAvailableFields(si);
375 // Read state from items
376 StreamAccessor sa = history.openStream(item.id, "r");
378 Datatype sampleType = sa.type().componentType;
380 Binding sampleBinding = Bindings.getBinding(sampleType);
381 if (!sampleType.equals( item.format )) throw new HistoryException("Could not create subscription, item \""+item.id+"\" already exists and has wrong format (" + sampleType + " vs. " + item.format + ")");
// `v` wraps `sample`; each sa.get(...) below reads into that same object before
// `v` is queried.
382 Object sample = sampleBinding.createDefaultUnchecked();
383 ValueBand v = new ValueBand(sampleBinding, sample);
384 item.count = sa.size();
385 if ( item.count>0 ) {
// First sample in the stream.
386 sa.get(0, sampleBinding, sample);
387 item.firstTime = v.getTimeDouble();
388 Binding b = v.getValueBinding();
389 Object value = v.getValue(b);
390 item.firstValue.setValue(b, value);
// Last sample in the stream.
392 sa.get(item.count-1, sampleBinding, sample);
393 item.currentTime = v.getTimeDouble();
394 b = v.getValueBinding();
395 value = v.getValue(b);
396 item.currentValue.setValue(b, value);
398 if ( item.currentValue.getBinding() instanceof DoubleBinding ) {
399 DoubleBinding db = (DoubleBinding) item.currentValue.getBinding();
400 value = item.currentValue.getValue(db);
401 item.isNaN = Double.isNaN( (Double) value );
405 } catch (AccessorException e) {
406 throw new HistoryException( e );
407 } catch (AdaptException e) {
408 throw new HistoryException( e );
// NOTE(review): a failure to close the read accessor is silently ignored here.
410 try { sa.close(); } catch (AccessorException e) {}
// Returns a clone of the collector state rather than the internal instance.
415 public CollectorState getState() {
416 return (CollectorState) state.clone();
// Replaces the collector state with the contents of the given bean: a new
// CollectorState is created and the fields it shares with newState are copied
// into it.
//
// NOTE(review): the statement guarded by the null check, and the statement the
// trailing comment refers to (presumably resetting itemsOpen), are not visible
// in this listing - confirm against the full file.
420 public void setState(Bean newState) {
421 if (newState == null)
424 state = new CollectorState();
427 state.readAvailableFields(newState);
429 // Make sure that openAllItems ensures that all streams are opened properly.
430 // This is because setItem will not invoke addItem if collectorState exists
431 // for an item and therefore itemsOpen might not otherwise be set to false.
// Returns the collector time converted to the requested binding, or null when
// no time variant is set. A conversion failure (AdaptException) is rethrown
// wrapped in a HistoryException.
436 public Object getTime(Binding binding) throws HistoryException {
437 MutableVariant v = state.time;
438 if (v==null) return null;
440 return v.getValue(binding);
441 } catch (AdaptException e) {
442 throw new HistoryException(e);
// Returns the value stored for the variable id, converted to the requested
// binding. Returns null when no value is stored for the id or the stored value
// is flagged invalid. A conversion failure (AdaptException) is rethrown
// wrapped in a HistoryException.
447 public Object getValue(String id, Binding binding) throws HistoryException {
448 VariableState vs = state.values.get(id);
449 if (vs==null) return null;
451 if (!vs.isValid) return null;
454 return vs.value.getValue(binding);
455 } catch (AdaptException e) {
456 throw new HistoryException(e);
// Opens a stream for every item state that does not have one yet. Does nothing
// while itemsOpen is true.
//
// NOTE(review): the lines that add each opened stream to `opened` and that set
// itemsOpen afterwards are not visible in this listing.
460 void openAllItems() throws HistoryException {
461 if (itemsOpen) return;
463 // Assert all items are open
464 // 1. Collect all items that still need to be opened.
465 Set<String> itemsToOpen = new THashSet<String>( state.itemStates.keySet() );
// Drop the ids that already have an open stream.
466 for (ActiveStream as : items)
467 itemsToOpen.remove(as.itemState.id);
469 // 2. Open missing streams
470 List<ActiveStream> opened = new ArrayList<ActiveStream>(itemsToOpen.size());
471 for (String itemId : itemsToOpen) {
472 CollectorState.Item item = state.itemStates.get( itemId );
473 ActiveStream as = openStream( item );
477 // 3. Update list of open streams
// A single addAll call on the CopyOnWriteArrayList instead of one add per stream.
478 items.addAll(opened);
// Updates state.dT to the difference between the given time and the current
// collector time (both as doubles). Returns early when that difference is
// positive.
//
// NOTE(review): what happens for a non-positive difference, and inside the
// catch-all handler, is not visible in this listing. Catching Throwable also
// catches Errors - confirm that is intended.
482 private void updateDt(NumberBinding binding, Object time) {
484 Double current = (Double)getTime(Bindings.DOUBLE);
485 if(current != null) {
486 double t = binding.getValue(time).doubleValue();
487 state.dT = t-current;
488 if(state.dT > 0) return;
490 } catch (Throwable t) {
// Starts a collection step at the given time: the step size (state.dT) is
// updated first, while state.time still holds the previous time, and then the
// collector time is set to the new value. Any line between the signature and
// updateDt() is not visible in this listing.
496 public void beginStep(NumberBinding binding, Object time) throws HistoryException {
498 updateDt(binding, time);
499 state.time.setValue(binding, time);
// Records the current value of the variable id. A VariableState is created and
// stored on first use; for DoubleBinding and FloatBinding values the isNan
// flag is updated from the actual number.
//
// NOTE(review): the conditions that select between creating a new state,
// storing a void value and storing the given value, the bodies of the
// BindingException handlers, and any isValid bookkeeping are not visible in
// this listing.
503 public void setValue(String id, Binding binding, Object value) throws HistoryException {
504 VariableState vs = state.values.get( id );
// First value for this id: create and register its state.
506 vs = new VariableState();
507 vs.value = new MutableVariant(binding, value);
508 state.values.put(id, vs);
// Stores a void/null value in the variant.
512 vs.value.setValue(VoidBinding.VOID_BINDING, null);
518 vs.value.setValue(binding, value);
// Track NaN for floating-point values.
521 if (binding instanceof DoubleBinding) {
522 DoubleBinding db = (DoubleBinding) binding;
524 double _v = db.getValue_(value);
525 vs.isNan = Double.isNaN( _v );
526 } catch (BindingException e) {
530 } else if (binding instanceof FloatBinding) {
531 FloatBinding db = (FloatBinding) binding;
533 float _v = db.getValue_(value);
534 vs.isNan = Float.isNaN( _v );
535 } catch (BindingException e) {
// Copies the value stored for the variable id into `result`, keeping the
// stored binding. Returns false when no value is stored or the stored value is
// flagged invalid; the return after a successful copy is not visible in this
// listing.
545 public boolean getValue(String id, MutableVariant result) {
546 VariableState vs = state.values.get(id);
547 if (vs==null) return false;
549 if (!vs.isValid) return false;
551 result.setValue(vs.value.getBinding(), vs.value.getValue());
// Ends a collection step: iterates over the open streams to write the values
// of this step, then checks whether the flush policy is FlushOnEveryStep.
// AdaptException, AccessorException and BindingException are rethrown wrapped
// in a HistoryException.
//
// NOTE(review): the loop body (presumably a putValueToStream call) and the
// body of the flush-policy branch are not visible in this listing.
556 public void endStep() throws HistoryException {
557 // Write values to streams
558 for (ActiveStream i : items)
562 if (flushPolicy == FlushPolicy.FlushOnEveryStep) {
565 } catch (AdaptException e) {
566 throw new HistoryException( e );
567 // FileAccessor fa = (FileAccessor) i.stream;
568 // logger.log(Level.FINE, "Error writing a stream "+fa.file(), e);
569 } catch (AccessorException e) {
570 throw new HistoryException( e );
571 // FileAccessor fa = (FileAccessor) i.stream;
572 // logger.log(Level.FINE, "Error writing a stream "+fa.file(), e);
573 } catch (BindingException e) {
574 throw new HistoryException( e );
575 // FileAccessor fa = (FileAccessor) i.stream;
576 // logger.log(Level.FINE, "Error writing a stream "+fa.file(), e);
// Maps a time to the 1-based number of the sampling interval it falls in.
// The time is nudged forward by EPSILON*state.dT before dividing by the
// interval and truncating to int.
581 private int sampleNum(double time, double interval) {
582 // 1.0 is needed to cover the case of 0.0
583 return (int)(1.0 + (time+EPSILON*state.dT)/interval);
// Returns true when moving the time back by 2*EPSILON*state.dT lowers its
// sample number, i.e. when sampleNum() changes within that window ending at
// `time`.
586 private boolean isExactInterval(double time, double interval) {
587 return sampleNum(time, interval) > sampleNum(time-2*EPSILON*state.dT, interval);
591 * This method puts a time,value to a stream.
593 * The stream is "packed" when the new value is equal to previous, or the
594 * value is within deadband (in comparison to the first value), or if
595 * the interval zone is not exceeded.
597 * The stream can be packed two different ways, depending on the sample
598 * format. The sample format can be (a) single value or (b) a time range of
599 * multiple values, Ranged Sample.
601 * Single value is packed by removing middle values, in other words,
602 * moving the timecode of the last sample.
604 * Ranged sample is packed by extending the endTime code of the last sample.
605 * Other fields, such as avg, sum, count, lastValue are also updated.
608 * @throws AdaptException
609 * @throws AccessorException
610 * @throws BindingException
611 * @throws HistoryException
// Writes the collector's current time and the item's current value to the
// stream of `i`. The visible code either rewrites the last sample in place
// (stream.setNoflush at index size()-1) or appends a new sample
// (stream.addNoflush).
//
// NOTE(review): many structural lines (braces, else branches, some conditions)
// are missing from this listing, so the exact nesting of the sections below is
// not visible here. Section comments describe only the visible statements.
613 private void putValueToStream(ActiveStream i) throws AdaptException, AccessorException, BindingException, HistoryException
// --- Current time as double (NaN when the variant yields null) ---
616 Variant time = state.time;
617 double currentTime = Double.NaN;
619 Double x = (Double) time.getValue(Bindings.DOUBLE);
620 currentTime = x == null ? Double.NaN : x;
// True when nothing has been written to the stream yet.
624 boolean isFirstValue = i.stream.size() == 0;
625 // Is this item disabled
626 boolean isDisabled = !i.itemState.enabled;
627 // Discontinuity, first value after being disabled
628 boolean wasDisabled = !isFirstValue && i.itemState.isDisabled;
630 // Update currently disabled status
631 i.itemState.isDisabled = isDisabled;
// Track the disabled period; the enclosing condition is not visible in this
// listing.
636 i.itemState.lastDisabledTime = currentTime;
637 if (!wasDisabled) i.itemState.firstDisabledTime = currentTime;
641 // Is nan value, for doubles and floats
644 boolean isValidValue;
646 MutableVariant value;
647 // Get current value as double
648 double currentValueDouble;
// --- Current value: looked up by the item's variableId ---
650 VariableState vs = state.values.get( i.itemState.variableId );
652 isNanValue = vs.isNan;
653 isValidValue = vs.isValid;
655 if (i.isNumeric && isValidValue && !isNanValue) {
656 currentValueDouble = (Double) value.getValue( Bindings.DOUBLE );
// Apply gain/bias only when they differ from the identity transform, and
// re-wrap the scaled number in the stream's value binding.
657 if ( i.itemState.gain!=1.0 || i.itemState.bias!=0.0 ) {
658 currentValueDouble = i.itemState.gain * currentValueDouble + i.itemState.bias;
659 Object x = i.valueBinding instanceof DoubleBindingDefault ? currentValueDouble : i.doubleToValue.adapt(currentValueDouble);
660 value = new MutableVariant( i.valueBinding, x );
663 currentValueDouble = Double.NaN;
// Fallback branch (its condition is not visible in this listing): treat the
// value as invalid and substitute the value binding's default.
668 isValidValue = false;
669 value = new MutableVariant( i.valueBinding, i.valueBinding.createDefault() );
670 currentValueDouble = Double.NaN;
// --- Previously written value as double (NaN when not numeric) ---
674 double prevValueDouble = Double.NaN;
675 Variant pv = i.itemState.currentValue;
676 if (pv!=null && pv.getBinding() instanceof NumberBinding ) {
677 prevValueDouble = (Double) pv.getValue( Bindings.DOUBLE );
679 prevValueDouble = Double.NaN;
682 // The time of the first sample of region
683 double firstTime = i.itemState.firstTime;
685 double prevTime = i.itemState.currentTime;
// Same time as the previous write: this call repeats a step.
687 boolean restep = prevTime == currentTime;
689 // There was change in NaN
690 boolean isNaNChange = i.itemState.isNaN != isNanValue;
692 // There was change in validity of the value
693 boolean isValidChange = i.itemState.isValid != isValidValue;
695 //boolean wasValidValue = !i.itemState.isValid && isValidValue;
// Interval / deadband are considered configured when they are not NaN and
// greater than Double.MIN_VALUE.
697 boolean hasInterval = !Double.isNaN( i.itemState.interval ) && (i.itemState.interval> Double.MIN_VALUE );
699 boolean hasDeadband = !Double.isNaN( i.itemState.deadband ) && (i.itemState.deadband> Double.MIN_VALUE) && i.isNumeric;
// Min-max format: both interval and deadband are Double.MAX_VALUE and the
// sample type has "min" and "max" components.
701 boolean isMinMaxFormat = i.itemState.interval==Double.MAX_VALUE &&
702 i.itemState.deadband==Double.MAX_VALUE &&
703 i.binding.getComponentIndex("min")>=0 &&
704 i.binding.getComponentIndex("max")>=0;
// --- Deadband tracking: ooDeadband accumulates "value left the deadband" ---
707 if (isValidValue && i.isNumeric && i.itemState.count>=1) {
709 if (i.itemState.isValid) {
// Distance is measured from the first value of the current band.
710 double firstValueDouble = (Double) i.itemState.firstValue.getValue( Bindings.DOUBLE );
711 double diff = Math.abs( currentValueDouble - firstValueDouble );
713 i.itemState.ooDeadband |= diff >= i.itemState.deadband;
715 i.itemState.ooDeadband |= diff != 0.0;
717 i.itemState.ooDeadband = true;
720 // NaN change is deadband change
721 i.itemState.ooDeadband |= isNaNChange;
723 // The dead band cannot have been broken in the first sample
724 i.itemState.ooDeadband = false;
// --- Interval tracking: ooInterval stays true unless narrowed below ---
729 boolean ooInterval = true;
732 // System.err.println(i.itemState.id + " " + firstTime + " " + currentTime);
734 // First band is a special case
736 // If this is the first sample store the current time as first time
737 if(Double.isNaN(i.itemState.firstTime)) firstTime = i.itemState.firstTime = currentTime;
738 int sn1 = sampleNum(firstTime,i.itemState.interval);
739 int sn2 = sampleNum(currentTime,i.itemState.interval);
740 // Wait until next sample if this position is not already exactly at sample border
741 // Always start minmax immediately
742 if(sn1 == sn2 && !isExactInterval(currentTime, i.itemState.interval) && !isMinMaxFormat) return;
// Later bands: the interval is exceeded when the sample number advanced.
744 int sn1 = sampleNum(prevTime,i.itemState.interval);
745 int sn2 = sampleNum(currentTime,i.itemState.interval);
746 // System.err.println(" ==>" + sn1 + " " + sn2 + " " + prevTime + " " + currentTime);
747 ooInterval = sn2 > sn1;
752 // double n = (currentTime+EPSILON) / i.itemState.interval;
753 // double n2 = n-Math.floor(n);
756 // double deltaTime = currentTime - firstTime;
757 // ooInterval = (deltaTime - i.itemState.interval) > -EPSILON;
760 // Should new sample be created, or old sample extended / forwarded in time
761 boolean makeNewBand = ( i.itemState.ooDeadband && isValidValue && ooInterval ) || isFirstValue || wasDisabled || isValidChange;
763 // Update item min-max, avg, median
764 if ( isValidValue && i.isNumeric ) {
765 if (i.current.hasAvg() && i.isNumeric) {
766 if (Double.isNaN( i.itemState.sum )) i.itemState.sum = 0.0;
// NOTE(review): this condition repeats the enclosing check on isValidValue and
// i.isNumeric.
767 if (isValidValue && i.isNumeric) {
// Accumulates value * elapsed time since the previous write.
768 double timeDelta = currentTime - i.itemState.currentTime;
769 i.itemState.sum += timeDelta * currentValueDouble;
// The median is fed the previous value with the elapsed time as its first
// argument.
773 if (i.current.hasMedian() && !Double.isNaN(prevValueDouble) ) {
774 double deltaTime = currentTime - prevTime;
775 i.itemState.median.add( deltaTime, prevValueDouble );
779 // Increase the counter that tracks the number of samples acquired into the band
780 if (!restep) i.itemState.count++;
// --- Remember this step as the item's latest time/value ---
783 i.itemState.currentTime = currentTime;
784 i.itemState.currentValue.readFrom( value.getBinding(), value.getValue() );
785 i.itemState.isNaN = isNanValue;
786 i.itemState.isValid = isValidValue;
788 // Put discontinuation marker
789 if ( (wasDisabled && i.current.supportsNullValue())) {
// Last sample is already a null marker: stretch it over the disabled period and
// rewrite it in place.
791 if (i.current.isNullValue()) {
792 if (i.current.hasEndTime()) {
793 i.current.setEndTime( Bindings.DOUBLE, i.itemState.lastDisabledTime );
796 if (i.current.hasCount()) {
797 i.current.setCount( i.current.getCount()+1 );
800 // Add the band to the file stream
801 i.stream.setNoflush(i.stream.size()-1, i.binding, i.current.getSample() );
// Otherwise (branch line not visible here) append a null sample spanning the
// disabled period.
806 i.current.setValueNull();
807 i.current.setTime(Bindings.DOUBLE, i.itemState.firstDisabledTime);
808 i.current.setEndTime(Bindings.DOUBLE, i.itemState.lastDisabledTime);
809 i.stream.addNoflush( i.binding, i.current.getSample() );
// A new band is also forced for samples without endTime once the band holds
// exactly two samples.
812 boolean condition = makeNewBand || (!i.hasEndTime && i.itemState.count==2);
814 if(i.stream.size() > 0 && !i.itemState.ooDeadband) {
816 /// Extend existing band
817 // Extend endTime value
819 i.current.setTime( time.getBinding(), time.getValue() );
821 i.current.setEndTime( time.getBinding(), time.getValue() );
826 if (i.current.hasLastValue()) {
827 i.current.setLastValue( value.getBinding(), value.getValue() );
// Average = accumulated sum divided by the band duration; 0.0 is stored for
// durations below 1e-9.
831 if (i.current.hasAvg() && i.isNumeric && !isNanValue && !restep) {
832 double duration = (currentTime - i.itemState.firstTime);
833 if(duration < 1e-9) {
834 i.current.setAvg( Bindings.DOUBLE, 0.0 );
836 i.current.setAvg( Bindings.DOUBLE, i.itemState.sum / duration);
// Lower the stored min / raise the stored max when the current value exceeds
// them.
841 if (i.current.hasMin() && i.isNumeric && !isNanValue) {
842 Binding minBinding = i.current.getMinBinding();
843 Object prevMinValue = i.current.getMin();
844 Object currentValueWithMinBinding = value.getValue( minBinding );
845 int diff = minBinding.compare( prevMinValue, currentValueWithMinBinding );
846 if (diff>0) i.current.setMin( minBinding, currentValueWithMinBinding );
848 if (i.current.hasMax() && i.isNumeric && !isNanValue) {
849 Binding maxBinding = i.current.getMaxBinding();
850 Object prevMaxValue = i.current.getMax();
851 Object currentValueWithMaxBinding = value.getValue( maxBinding );
852 int diff = maxBinding.compare( prevMaxValue, currentValueWithMaxBinding );
853 if (diff<0) i.current.setMax( maxBinding, currentValueWithMaxBinding );
857 if (i.current.hasMedian() && i.isNumeric && !isNanValue) {
858 Double median = i.itemState.median.getMedian();
859 i.current.setMedian( Bindings.DOUBLE, median);
862 i.current.setValueNull();
866 if (i.current.hasCount()) {
867 i.current.setCount( i.itemState.count );
870 // Write entry to file stream
871 // if (i.current.getTimeDouble()<0)
872 // System.err.println("Error sample "+i.current.getBinding().toString(i.current.getSample()));
// Overwrites the last sample in the stream with the extended band.
873 i.stream.setNoflush( i.stream.size()-1, i.binding, i.current.getSample() );
878 // Start a new value band
879 i.itemState.firstValue.readFrom( value.getBinding(), value.getValue() );
880 i.itemState.firstTime = currentTime;
881 i.itemState.currentValue.readFrom( value.getBinding(), value.getValue() );
882 i.itemState.currentTime = currentTime;
883 i.itemState.isNaN = isNanValue;
884 i.itemState.isValid = isValidValue;
// Reset the per-band accumulators.
886 i.itemState.count = 1;
887 i.itemState.ooDeadband = false;
888 if (i.itemState.median != null) {
889 i.itemState.median.clear();
890 if (!Double.isNaN(prevValueDouble) ) {
891 double deltaTime = currentTime - prevTime;
892 i.itemState.median.add( deltaTime, prevValueDouble );
899 // Start new band with current value & time
900 i.current.setTime( time.getBinding(), time.getValue() );
903 if (i.current.hasEndTime()) {
904 i.current.setEndTime( time.getBinding(), time.getValue() );
// All value-derived fields of the new band start out equal to the current
// value.
908 i.current.setValue( value.getBinding(), value.getValue() );
909 i.current.setMax( value.getBinding(), value.getValue() );
910 i.current.setMin( value.getBinding(), value.getValue() );
911 i.current.setLastValue( value.getBinding(), value.getValue() );
912 i.current.setAvg( value.getBinding(), value.getValue() );
913 i.current.setMedian( value.getBinding(), value.getValue() );
915 i.current.setValueNull();
919 // Sample Count=1 for new bands (time, value) "added"
920 if (i.current.hasCount()) {
921 i.current.setCount( 1 );
924 // Add the band to the file stream
925 i.stream.addNoflush( i.binding, i.current.getSample() );
// Iterates over all open streams; an AccessorException is logged at FINE level
// and not propagated (the method deliberately declares no HistoryException).
// The per-stream call inside the loop (presumably a flush on the stream) is not
// visible in this listing.
932 public void flush() /*throws HistoryException*/ {
933 for (ActiveStream i : items) {
936 } catch (AccessorException e) {
937 logger.log(Level.FINE, "File history flush failed.", e);
938 // throw new HistoryException(e);
// Same as flush(), but restricted to streams whose item id is in `ids`; other
// streams are skipped. An AccessorException is logged at FINE level and not
// propagated. The per-stream call itself is not visible in this listing.
943 public void flush(Set<String> ids) /*throws HistoryException*/ {
944 for (ActiveStream i : items) {
945 if (!ids.contains(i.itemState.id)) continue;
948 } catch (AccessorException e) {
949 logger.log(Level.FINE, "File history flush failed.", e);
950 // throw new HistoryException(e);
// Shuts the collector down: writes every item state back to its history item
// as meta-data (in one history.modify call), clears the stored item states and
// then iterates over the open streams.
//
// Failures while preparing or writing the meta-data are logged at FINE level
// and do not abort the shutdown. The body of the final loop (presumably closing
// each stream) is not visible in this listing.
957 public synchronized void close()
959 // Write items back to history
960 List<Bean> beans = new ArrayList<Bean>( state.itemStates.size() );
961 for (CollectorState.Item itemState : state.itemStates.values()) {
// Items that no longer exist in the history are skipped.
963 if (!history.exists(itemState.id)) continue;
964 Bean bean = history.getItem(itemState.id);
965 bean.readAvailableFields(itemState);
966 beans.add( setItemState(bean, itemState) );
967 } catch (BindingException e) {
968 logger.log(Level.FINE, "Error writing a meta-data to history", e);
969 } catch (BindingConstructionException e) {
970 logger.log(Level.FINE, "Error writing a meta-data to history", e);
971 } catch (HistoryException e) {
972 logger.log(Level.FINE, "Error writing a meta-data to history", e);
// Single batched update for all collected beans.
976 history.modify(beans.toArray( new Bean[beans.size()] ));
977 } catch (HistoryException e) {
978 logger.log(Level.FINE, "Error writing a meta-data to history", e);
980 state.itemStates.clear();
983 for (ActiveStream item : items) {
// Runtime data for one open stream: the stream accessor, the bindings used to
// read and write its samples, flags derived from the sample type, the item's
// collector state and a ValueBand over the latest sample. Instances are
// created and populated by openStream(CollectorState.Item).
989 public class ActiveStream {
// Accessor of the history stream, opened in "rw" mode by openStream(Item).
992 public StreamAccessor stream;
994 /** Sample binding */
995 public RecordBinding binding;
997 /** Value binding (Generic) */
998 public Binding valueBinding;
1000 /** Set true if this item has numeric value type */
1001 public boolean isNumeric;
1003 /** Set true if this item has end time */
1004 public boolean hasEndTime;
1006 /** Item entry in subscription state */
1007 public CollectorState.Item itemState;
1009 /** Interface to current last sample (in the stream) */
1010 public ValueBand current;
1012 /** Adapter that converts value to double value, null if not numeric */
1013 public Adapter valueToDouble, doubleToValue;
// NOTE(review): the lines below belong to a method whose signature is not
// visible in this listing (presumably one that closes the stream). A failure is
// logged at FINE level and not propagated.
1017 if (stream != null) {
1021 } catch (AccessorException e) {
1022 logger.log(Level.FINE, "Error closing stream", e);
// Opens a stream of the associated history manager; simply forwards the item
// id and mode to history.openStream and returns its result.
1029 public StreamAccessor openStream(String itemId, String mode)
1030 throws HistoryException
1032 return history.openStream(itemId, mode);
1036 public HistoryManager getHistory() {