1 /*******************************************************************************
2 * Copyright (c) 2007, 2011 Association for Decentralized Information Management in
4 * All rights reserved. This program and the accompanying materials
5 * are made available under the terms of the Eclipse Public License v1.0
6 * which accompanies this distribution, and is available at
7 * http://www.eclipse.org/legal/epl-v10.html
10 * VTT Technical Research Centre of Finland - initial API and implementation
11 *******************************************************************************/
12 package org.simantics.history.impl;
14 import gnu.trove.set.hash.THashSet;
16 import java.io.IOException;
17 import java.util.ArrayList;
18 import java.util.List;
20 import java.util.UUID;
21 import java.util.concurrent.CopyOnWriteArrayList;
22 import java.util.logging.Level;
23 import java.util.logging.Logger;
25 import org.simantics.databoard.Bindings;
26 import org.simantics.databoard.accessor.StreamAccessor;
27 import org.simantics.databoard.accessor.error.AccessorException;
28 import org.simantics.databoard.adapter.AdaptException;
29 import org.simantics.databoard.adapter.Adapter;
30 import org.simantics.databoard.adapter.AdapterConstructionException;
31 import org.simantics.databoard.binding.Binding;
32 import org.simantics.databoard.binding.DoubleBinding;
33 import org.simantics.databoard.binding.FloatBinding;
34 import org.simantics.databoard.binding.NumberBinding;
35 import org.simantics.databoard.binding.RecordBinding;
36 import org.simantics.databoard.binding.error.BindingConstructionException;
37 import org.simantics.databoard.binding.error.BindingException;
38 import org.simantics.databoard.binding.error.RuntimeBindingException;
39 import org.simantics.databoard.binding.impl.DoubleBindingDefault;
40 import org.simantics.databoard.binding.mutable.MutableVariant;
41 import org.simantics.databoard.binding.mutable.Variant;
42 import org.simantics.databoard.binding.reflection.VoidBinding;
43 import org.simantics.databoard.serialization.Serializer;
44 import org.simantics.databoard.type.Component;
45 import org.simantics.databoard.type.Datatype;
46 import org.simantics.databoard.type.NumberType;
47 import org.simantics.databoard.type.RecordType;
48 import org.simantics.databoard.util.Bean;
49 import org.simantics.history.Collector;
50 import org.simantics.history.HistoryException;
51 import org.simantics.history.HistoryManager;
52 import org.simantics.history.impl.CollectorState.Item;
53 import org.simantics.history.impl.CollectorState.VariableState;
54 import org.simantics.history.util.ValueBand;
55 import org.simantics.history.util.WeightedMedian;
56 import org.simantics.history.util.subscription.SubscriptionItem;
59 * Collector is used to record data to streams in history manager.
61 * For every item there is collector item state {@link CollectorState.Item},
62 * that contains data required in writing process. This state is read and
63 * written to history manager item (bean) as meta-data.
65 * Stream files are opened in the first {@link #beginStep(NumberBinding, Object)}.
67 * @author toni.kalajainen
70 public class CollectorImpl implements Collector {
// Logger named after this class; the visible code uses it for FINE-level failure reports.
73 static Logger logger = Logger.getLogger(CollectorImpl.class.getName());
// Passed to the WeightedMedian constructor when openStream creates a median for a numeric item.
75 public static final int MEDIAN_LIMIT = 500;
// Tolerance factor; sampleNum and isExactInterval multiply it by state.dT.
77 public static final double EPSILON = 1e-2;
79 /** Collector state */
// NOTE(review): the declaration of the `state` field described above is not visible in this listing.
82 /** Open streams, corresponds to subscription items */
83 CopyOnWriteArrayList<ActiveStream> items = new CopyOnWriteArrayList<ActiveStream>();
85 /** Associated history state */
86 HistoryManager history;
88 /** True when all items are open */
// openAllItems returns immediately while this flag is true.
89 boolean itemsOpen = true;
91 /** True when history collection will write data to disk */
// Copied into stepEnabled at the start of beginStep.
92 boolean enabled = true;
95 * Transient info only set in {@link #beginStep(NumberBinding, Object)} based on
98 private transient boolean stepEnabled = true;
// Compared against FlushPolicy.FlushOnEveryStep in endStep. The field is public and is
// also exposed through getFlushPolicy/setFlushPolicy.
100 public FlushPolicy flushPolicy = FlushPolicy.FlushOnEveryStep;
/**
 * Accessor for the collector's flush policy.
 * NOTE(review): the method body is not visible in this listing; presumably it
 * returns the flushPolicy field - confirm.
 */
102 public FlushPolicy getFlushPolicy() {
/**
 * Replaces the flush policy stored in the flushPolicy field.
 *
 * @param flushPolicy the new policy; stored as-is, no null check is performed
 */
106 public void setFlushPolicy(FlushPolicy flushPolicy) {
107 this.flushPolicy = flushPolicy;
111 * Create a new collector that is associated with a history manager.
114 * @throws HistoryException
// Starts from a freshly constructed CollectorState and assigns it a random UUID string as id.
116 public CollectorImpl(HistoryManager history) {
117 this.history = history;
118 this.state = new CollectorState();
// NOTE(review): an original line between these two statements is not visible in this listing.
120 this.state.id = UUID.randomUUID().toString();
124 * Create a new collector that is associated with a history manager.
127 * @param previousState
128 * @throws HistoryException
// Stores a clone of previousState rather than the passed instance.
// previousState must be non-null because clone() is invoked on it directly.
130 public CollectorImpl(HistoryManager history, CollectorState previousState) {
131 this.history = history;
132 this.state = (CollectorState) previousState.clone();
/**
 * Sets the enabled flag of this collector. The flag is copied into the
 * per-step stepEnabled field when beginStep is called.
 *
 * @param enabled new value of the enabled flag
 */
135 public void setEnabled(boolean enabled) {
136 this.enabled = enabled;
/**
 * Accessor for the enabled flag.
 * NOTE(review): the method body is not visible in this listing; presumably it
 * returns the enabled field - confirm.
 */
140 public boolean isEnabled() {
/**
 * Registers a subscription item given as a bean.
 *
 * The item state is looked up in this order: the bean's "collectorState" field,
 * then the state already stored in this collector under the bean's "id", and
 * finally a newly created CollectorState.Item. The result is handed to
 * addItem(String, CollectorState.Item).
 *
 * NOTE(review): some structural lines (try / closing braces) are not visible in
 * this listing; the new-item initialisation below appears to belong to the
 * "itemState == null" branch - confirm.
 *
 * @param item bean that must carry a String field "id"
 * @throws HistoryException wraps a BindingException raised while reading the bean
 */
145 public synchronized void addItem(Bean item) throws HistoryException {
147 String id = (String) item.getField("id");
150 CollectorState.Item itemState = null;
// 1st choice: state embedded in the bean itself.
151 if ( item.hasField("collectorState") ) itemState = (CollectorState.Item) item.getField("collectorState", CollectorState.BINDING_ITEM);
// 2nd choice: state this collector already holds for the id.
152 if ( itemState == null ) itemState = state.itemStates.get( id );
// Last resort: build a new state from the bean's own fields.
153 if ( itemState == null ) {
155 itemState = new CollectorState.Item();
157 itemState.readAvailableFields( item );
158 // Guess values from the existing data
159 if (history.exists(id)) {
160 readItemState(history, item, itemState);
164 addItem(id, itemState);
165 } catch (BindingException e) {
166 throw new HistoryException( e );
/**
 * Stores the given item state under the given id and, when the collector time is
 * numeric, moves the collector time forward to the item's currentTime if that is
 * later.
 *
 * NOTE(review): some original lines of this method are not visible in this listing.
 *
 * @param id key under which the state is stored in state.itemStates
 * @param itemState state to store; its currentTime is read
 * @throws HistoryException wraps an AdaptException from converting the collector time to double
 */
170 public synchronized void addItem(String id, CollectorState.Item itemState) throws HistoryException {
173 state.itemStates.put(id, itemState);
177 if (state.time.type() instanceof NumberType) {
178 double time = (Double) Bindings.adapt(state.time.getValue(), state.time.getBinding(), Bindings.DOUBLE);
// A NaN currentTime is skipped; otherwise the later of the two times wins.
179 if ( !Double.isNaN(itemState.currentTime) ) {
180 if ( itemState.currentTime > time ) {
181 state.time.setValue(Bindings.DOUBLE, itemState.currentTime);
185 } catch (AdaptException e) {
186 throw new HistoryException( e );
/**
 * Adds every given bean through addItem(Bean), in argument order. The first
 * HistoryException thrown by addItem propagates, so later beans are not added.
 *
 * @param items beans to add
 * @throws HistoryException propagated from addItem(Bean)
 */
190 public synchronized void addItems(Bean...items) throws HistoryException {
191 for (Bean item : items) addItem(item);
/**
 * Removes the item state stored under the given id. If a state was stored and the
 * history manager still has the item, the removed state is written back to the
 * history item's bean (best effort: failures are only logged at FINE level).
 *
 * NOTE(review): the lines following the getStream call are not visible in this
 * listing; presumably the active stream is closed and removed - confirm.
 *
 * @param id id of the item to remove
 * @throws HistoryException declared by the signature; see the note in the body about
 *         where it can actually originate
 */
195 public synchronized void removeItem(String id) throws HistoryException {
196 CollectorState.Item itemState = state.itemStates.remove( id );
197 if (itemState!=null && history.exists(id)) {
198 Bean bean = history.getItem(id);
199 bean.readAvailableFields(itemState);
// NOTE(review): the try that pairs with the catch clauses below is not visible, so it is
// unclear whether the history calls above are covered by them - confirm.
201 bean = setItemState(bean, itemState);
202 history.modify(bean);
203 } catch (BindingException e) {
205 logger.log(Level.FINE, "Failed to update bean", e);
206 } catch (BindingConstructionException e) {
208 logger.log(Level.FINE, "Failed to update bean", e);
209 } catch (HistoryException e) {
210 logger.log(Level.FINE, "Failed to update history item meta-data", e);
// Look up the open stream (if any) that belongs to the removed item.
214 ActiveStream as = getStream(id);
/**
 * Searches the list of open streams for the one whose item state has the given id.
 * The search is a linear scan over the items list.
 * NOTE(review): the result for "not found" is not visible in this listing;
 * presumably null - confirm.
 *
 * @param id item id to look for; as.itemState.id must be non-null for every open stream
 * @return the matching active stream
 */
221 ActiveStream getStream(String id) {
222 for (ActiveStream as : items) {
223 if (as.itemState.id.equals(id)) return as;
229 * Set (or Add) item's configuration.
232 * @throws HistoryException
// Reads the bean's "id", looks up the stored item state and refreshes it from the bean
// via readAvailableFields. A BindingException is rethrown wrapped in a HistoryException.
// NOTE(review): the body of the "itemState==null" branch is not visible in this listing;
// per the Javadoc ("Set (or Add)") it presumably adds the item - confirm.
234 public synchronized void setItem(Bean item) throws HistoryException {
236 String id = (String) item.getField("id");
238 CollectorState.Item itemState = state.itemStates.get( id );
239 if ( itemState==null ) {
243 itemState.readAvailableFields(item);
244 } catch (BindingException e) {
245 throw new HistoryException( e );
251 * Sets the item state on a bean. If the bean has no collectorState field, or the
252 * field has a different type, a new bean class with that field is created.
256 * @return same or new bean
257 * @throws BindingException
258 * @throws BindingConstructionException
// Stores `state` in the bean's "collectorState" field. When the bean has no such field,
// or the field's type differs from CollectorState.BINDING_ITEM's type, a new record type
// is built from the bean's other components plus "collectorState", and a new bean of that
// type is filled from the old one.
// NOTE(review): the return statements and the reassignment of `config` (if any) are not
// visible in this listing.
260 public static Bean setItemState(Bean config, CollectorState.Item state) throws BindingException, BindingConstructionException
262 Binding binding = config.getFieldBinding("collectorState");
263 if ( binding == null || !binding.type().equals(CollectorState.BINDING_ITEM.type()) ) {
264 RecordType rt = config.getBinding().type();
265 RecordType newType = new RecordType();
// Copy every component except an existing, wrongly typed "collectorState".
266 for (int i=0; i<rt.getComponentCount(); i++) {
267 Component c = rt.getComponent(i);
268 if ( c.name.equals("collectorState") ) continue;
269 newType.addComponent(c.name, c.type);
271 newType.addComponent("collectorState", CollectorState.BINDING_ITEM.type());
272 Binding newBinding = Bindings.getBeanBinding(newType);
273 Bean newConfig = (Bean) newBinding.createDefault();
274 newConfig.readAvailableFields( config );
277 config.setField("collectorState", CollectorState.BINDING_ITEM, state);
/**
 * Builds an array with one _HistoryItem per stored item state. Each entry gets its
 * fields from the item state and a copy of the state produced by serializing and
 * deserializing it.
 *
 * NOTE(review): the array assignment, the IOException handling and the return
 * statement are not visible in this listing.
 *
 * @return subscription items describing the stored item states
 */
281 public SubscriptionItem[] getItems() {
282 int c = state.itemStates.size();
// Note: this local array shadows the `items` field (the list of open streams).
283 SubscriptionItem[] items = new SubscriptionItem[ c ];
285 Serializer s = Bindings.getSerializerUnchecked( CollectorState.BINDING_ITEM );
286 for (CollectorState.Item item : state.itemStates.values()) {
287 _HistoryItem hi = new _HistoryItem();
289 hi.readAvailableFields(item);
290 //hi.collectorState = (Item) item.clone();
292 // Workaround (clone doesn't work for WeightedMedian)
294 byte[] data = s.serialize( item );
295 hi.collectorState = (Item) s.deserialize(data);
296 } catch (IOException e) {
/**
 * SubscriptionItem extended with the collector's per-item state; instances are
 * created by getItems().
 */
304 public static class _HistoryItem extends SubscriptionItem {
// Copy of the collector's state for this item (filled in by getItems()).
305 public CollectorState.Item collectorState;
309 * Creates and loads active item. TimeSeries stream file is opened.
312 * @return new active item
313 * @throws HistoryException
// Opens the history stream of the given item in "rw" mode and prepares an ActiveStream:
// sample/value bindings, numeric flags, double adapters, the last stored sample (wrapped
// in a ValueBand) and, where applicable, a WeightedMedian.
// Throws HistoryException when the item does not exist in the history manager or the last
// sample cannot be read.
// NOTE(review): try / closing-brace lines and the return statement are not visible in
// this listing.
315 ActiveStream openStream(CollectorState.Item itemState) throws HistoryException
318 ActiveStream x = new ActiveStream();
319 x.itemState = itemState;
323 if ( !history.exists(x.itemState.id) ) {
324 throw new HistoryException("TimeSeries "+x.itemState.id+" does not exist.");
327 x.stream = history.openStream(x.itemState.id, "rw");
// The stream's component type is the sample format; it is also recorded in the item state.
331 Datatype sampleType = x.stream.type().componentType;
332 itemState.format = sampleType;
// Prefer a bean binding; fall back to the general binding if that fails at runtime.
334 x.binding = (RecordBinding) Bindings.getBeanBinding( sampleType );
335 } catch ( RuntimeException e ) {
// NOTE(review): this message goes to System.err although the class has a logger.
336 System.err.println("Could not create bean binding for type "+sampleType);
337 x.binding = (RecordBinding) Bindings.getBinding( sampleType );
341 x.valueBinding = x.binding.getComponentBinding("value");
342 x.isNumeric = x.valueBinding instanceof NumberBinding;
// NOTE(review): this test uses ">0" while putValueToStream tests "min"/"max" with ">=0";
// an "endTime" component at index 0 would be reported as absent - confirm intent.
343 x.hasEndTime = x.binding.getComponentIndex("endTime")>0;
// Adapters between the value type and double.
348 x.valueToDouble = Bindings.adapterFactory.getAdapter(x.valueBinding, Bindings.DOUBLE, true, false);
349 x.doubleToValue = Bindings.adapterFactory.getAdapter(Bindings.DOUBLE, x.valueBinding, true, false);
350 } catch (AdapterConstructionException e1) {
351 x.valueToDouble = null;
354 // Create valueband, and read last entry into memory
356 Object currentEntry = x.binding.createDefault();
357 if (x.stream.size() >= 1) {
358 x.stream.get(x.stream.size() - 1, x.binding, currentEntry);
360 x.current = new ValueBand(x.binding);
361 x.current.setSample(currentEntry);
362 } catch (AccessorException e) {
363 throw new HistoryException(
364 "Could not read sample from stream file, " + e, e);
365 } catch (BindingException e) {
366 throw new HistoryException(
367 "Could not read sample from stream file, " + e, e);
// NOTE(review): same assignment as at the top of the method; appears redundant.
371 x.itemState = itemState;
// Numeric streams whose sample format has a median get a WeightedMedian if none is stored yet.
374 if (x.isNumeric && x.current.hasMedian()
375 && x.itemState.median == null) {
376 x.itemState.median = new WeightedMedian( MEDIAN_LIMIT );
383 * Read itemState from history. Item must exist.
385 * @param si subscription item
387 * @throws RuntimeBindingException
388 * @throws HistoryException
// Fills `item` from the subscription bean and from the stream already stored in history:
// sample count, time and value of the first sample, time and value of the last sample,
// and the isNaN flag for double-valued items.
// Throws HistoryException when the stored sample type differs from item.format, or when
// reading the stream fails (AccessorException / AdaptException are wrapped).
// NOTE(review): try / finally / closing-brace lines are not visible in this listing.
390 static void readItemState(HistoryManager history, Bean si, CollectorState.Item item) throws RuntimeBindingException, HistoryException {
392 item.readAvailableFields(si);
393 // Read state from items
394 StreamAccessor sa = history.openStream(item.id, "r");
396 Datatype sampleType = sa.type().componentType;
398 Binding sampleBinding = Bindings.getBinding(sampleType);
399 if (!sampleType.equals( item.format )) throw new HistoryException("Could not create subscription, item \""+item.id+"\" already exists and has wrong format (" + sampleType + " vs. " + item.format + ")");
// `v` is a ValueBand over `sample`; each sa.get(...) below reads a stream entry into `sample`.
400 Object sample = sampleBinding.createDefaultUnchecked();
401 ValueBand v = new ValueBand(sampleBinding, sample);
402 item.count = sa.size();
403 if ( item.count>0 ) {
// First sample in the stream.
404 sa.get(0, sampleBinding, sample);
405 item.firstTime = v.getTimeDouble();
406 Binding b = v.getValueBinding();
407 Object value = v.getValue(b);
408 item.firstValue.setValue(b, value);
// Last sample in the stream.
410 sa.get(item.count-1, sampleBinding, sample);
411 item.currentTime = v.getTimeDouble();
412 b = v.getValueBinding();
413 value = v.getValue(b);
414 item.currentValue.setValue(b, value);
// The NaN flag is derived only when the current value has a DoubleBinding.
416 if ( item.currentValue.getBinding() instanceof DoubleBinding ) {
417 DoubleBinding db = (DoubleBinding) item.currentValue.getBinding();
418 value = item.currentValue.getValue(db);
419 item.isNaN = Double.isNaN( (Double) value );
423 } catch (AccessorException e) {
424 throw new HistoryException( e );
425 } catch (AdaptException e) {
426 throw new HistoryException( e );
// A failure to close the read-only stream is ignored (empty catch block).
428 try { sa.close(); } catch (AccessorException e) {}
/**
 * Returns a clone of the collector state, not the internal instance.
 *
 * @return cloned collector state
 */
433 public CollectorState getState() {
434 return (CollectorState) state.clone();
/**
 * Replaces the collector state with a new CollectorState filled from the given
 * bean via readAvailableFields.
 *
 * NOTE(review): the handling of a null argument and the statement that the
 * trailing comment refers to are not visible in this listing; presumably
 * itemsOpen is reset to false at the end - confirm.
 *
 * @param newState bean to read the new state from
 */
438 public void setState(Bean newState) {
439 if (newState == null)
442 state = new CollectorState();
445 state.readAvailableFields(newState);
447 // Make sure that openAllItems ensures that all streams are opened properly.
448 // This is because setItem will not invoke addItem if collectorState exists
449 // for an item and therefore itemsOpen might not otherwise be set to false.
/**
 * Returns the collector's current time converted to the requested binding.
 *
 * @param binding binding the time value is requested in
 * @return the time value, or null when state.time is null
 * @throws HistoryException wraps an AdaptException raised by the conversion
 */
454 public Object getTime(Binding binding) throws HistoryException {
455 MutableVariant v = state.time;
456 if (v==null) return null;
458 return v.getValue(binding);
459 } catch (AdaptException e) {
460 throw new HistoryException(e);
/**
 * Returns the value last stored for the given variable id, converted to the
 * requested binding.
 *
 * @param id variable id used as key in state.values
 * @param binding binding the value is requested in
 * @return the value, or null when no state exists for the id or the state is not valid
 * @throws HistoryException wraps an AdaptException raised by the conversion
 */
465 public Object getValue(String id, Binding binding) throws HistoryException {
466 VariableState vs = state.values.get(id);
467 if (vs==null) return null;
469 if (!vs.isValid) return null;
472 return vs.value.getValue(binding);
473 } catch (AdaptException e) {
474 throw new HistoryException(e);
/**
 * Opens a stream for every stored item state that has no open stream yet and
 * appends the newly opened streams to the items list. Does nothing while the
 * itemsOpen flag is true.
 *
 * NOTE(review): the lines that add each stream to `opened` and that update
 * itemsOpen afterwards are not visible in this listing. The import of
 * java.util.Set is also not among the visible import lines.
 *
 * @throws HistoryException propagated from openStream
 */
478 void openAllItems() throws HistoryException {
479 if (itemsOpen) return;
481 // Assert all items are open
482 // 1. Collect all items that still need to be opened.
483 Set<String> itemsToOpen = new THashSet<String>( state.itemStates.keySet() );
484 for (ActiveStream as : items)
485 itemsToOpen.remove(as.itemState.id);
487 // 2. Open missing streams
488 List<ActiveStream> opened = new ArrayList<ActiveStream>(itemsToOpen.size());
489 for (String itemId : itemsToOpen) {
490 CollectorState.Item item = state.itemStates.get( itemId );
491 ActiveStream as = openStream( item );
495 // 3. Update list of open streams
// A single addAll keeps this to one copy of the CopyOnWriteArrayList's backing array.
496 items.addAll(opened);
/**
 * Updates state.dT to the difference between the given time and the collector's
 * current time (read as a double through getTime).
 *
 * NOTE(review): what happens when no current time exists, when the difference is
 * not positive, or inside the catch block is not visible in this listing.
 *
 * @param binding binding of the time argument
 * @param time new time value
 */
500 private void updateDt(NumberBinding binding, Object time) {
502 Double current = (Double)getTime(Bindings.DOUBLE);
503 if(current != null) {
504 double t = binding.getValue(time).doubleValue();
505 state.dT = t-current;
// A positive step is accepted as-is.
506 if(state.dT > 0) return;
// NOTE(review): Throwable is caught here, which also covers Errors.
508 } catch (Throwable t) {
/**
 * Starts a collection step at the given time: latches the enabled flag into
 * stepEnabled, updates state.dT and then stores the new time in state.time.
 * updateDt must run before the time is overwritten because it reads the old time.
 *
 * NOTE(review): the remaining lines of this method are not visible in this listing.
 *
 * @param binding binding of the time argument
 * @param time time of the step
 * @throws HistoryException declared by the signature; no throwing statement is visible here
 */
514 public void beginStep(NumberBinding binding, Object time) throws HistoryException {
515 stepEnabled = enabled;
516 updateDt(binding, time);
517 state.time.setValue(binding, time);
/**
 * Stores the current value of a variable in state.values, creating the
 * VariableState on first use, and records whether a double or float value is NaN.
 *
 * NOTE(review): the conditions guarding the individual statements (including the
 * short-circuit announced by the first comment and the branch that stores a void
 * value) are not visible in this listing.
 *
 * @param id variable id used as key in state.values
 * @param binding binding of the value
 * @param value value to store
 * @throws HistoryException declared by the signature; no throwing statement is visible here
 */
524 public void setValue(String id, Binding binding, Object value) throws HistoryException {
525 // Short-circuit calls when collection not enabled
529 VariableState vs = state.values.get( id );
531 vs = new VariableState();
532 vs.value = new MutableVariant(binding, value);
533 state.values.put(id, vs);
537 vs.value.setValue(VoidBinding.VOID_BINDING, null);
543 vs.value.setValue(binding, value);
// NaN detection applies only to double and float bindings.
546 if (binding instanceof DoubleBinding) {
547 DoubleBinding db = (DoubleBinding) binding;
549 double _v = db.getValue_(value);
550 vs.isNan = Double.isNaN( _v );
551 } catch (BindingException e) {
555 } else if (binding instanceof FloatBinding) {
556 FloatBinding db = (FloatBinding) binding;
558 float _v = db.getValue_(value);
559 vs.isNan = Float.isNaN( _v );
560 } catch (BindingException e) {
/**
 * Copies the value last stored for the given variable id into `result`.
 *
 * NOTE(review): the final return statement is not visible in this listing;
 * presumably true after a successful copy - confirm.
 *
 * @param id variable id used as key in state.values
 * @param result variant that receives the stored binding and value
 * @return false when no state exists for the id or the state is not valid
 */
570 public boolean getValue(String id, MutableVariant result) {
571 VariableState vs = state.values.get(id);
572 if (vs==null) return false;
574 if (!vs.isValid) return false;
576 result.setValue(vs.value.getBinding(), vs.value.getValue());
/**
 * Ends a collection step: iterates over the open streams to write values and
 * checks the flush policy. AdaptException, AccessorException and BindingException
 * are rethrown wrapped in HistoryException.
 *
 * NOTE(review): the loop body and the body of the flush-policy branch are not
 * visible in this listing; presumably putValueToStream and flush are called -
 * confirm.
 *
 * @throws HistoryException wraps failures raised while writing
 */
581 public void endStep() throws HistoryException {
584 // Write values to streams
585 for (ActiveStream i : items)
589 if (flushPolicy == FlushPolicy.FlushOnEveryStep) {
592 } catch (AdaptException e) {
593 throw new HistoryException( e );
594 // FileAccessor fa = (FileAccessor) i.stream;
595 // logger.log(Level.FINE, "Error writing a stream "+fa.file(), e);
596 } catch (AccessorException e) {
597 throw new HistoryException( e );
598 // FileAccessor fa = (FileAccessor) i.stream;
599 // logger.log(Level.FINE, "Error writing a stream "+fa.file(), e);
600 } catch (BindingException e) {
601 throw new HistoryException( e );
602 // FileAccessor fa = (FileAccessor) i.stream;
603 // logger.log(Level.FINE, "Error writing a stream "+fa.file(), e);
/**
 * Maps a time to a sample number for the given interval. The time is first
 * shifted forward by EPSILON*state.dT, then divided by the interval, offset by
 * 1.0 and cast to int. The cast truncates toward zero.
 *
 * @param time time to classify
 * @param interval sampling interval; not validated here
 * @return sample number of the (shifted) time
 */
608 private int sampleNum(double time, double interval) {
609 // 1.0 is needed to cover the case of 0.0
610 return (int)(1.0 + (time+EPSILON*state.dT)/interval);
/**
 * Tests whether a sample-number boundary lies within the tolerance window around
 * `time`: the sample number of `time` is compared with the sample number of
 * `time - 2*EPSILON*state.dT`. Since sampleNum itself adds EPSILON*state.dT, the
 * window is time +/- EPSILON*state.dT.
 *
 * @param time time to test
 * @param interval sampling interval
 * @return true when the two sample numbers differ (the later one is greater)
 */
613 private boolean isExactInterval(double time, double interval) {
614 return sampleNum(time, interval) > sampleNum(time-2*EPSILON*state.dT, interval);
618 * This method puts a (time, value) pair into a stream.
620 * The stream is "packed" when the new value is equal to previous, or the
621 * value is within deadband (in comparison to the first value), or if
622 * the interval zone is not exceeded.
624 * The stream can be packed in two different ways, depending on the sample
625 * format. The sample format can be (a) single value or (b) a time range of
626 * multiple values, Ranged Sample.
628 * Single value is packed by removing middle values, in other words,
629 * moving the timecode of the last sample.
631 * Ranged sample is packed by extending the endTime code of the last sample.
632 * Other fields, such as avg, sum, count, lastValue are also updated.
635 * @throws AdaptException
636 * @throws AccessorException
637 * @throws BindingException
638 * @throws HistoryException
// Writes the current collector time and the item's current variable value into the
// item's stream, either by extending the last band (sample) or by starting a new one.
// NOTE(review): many structural lines (braces, else branches, some conditions and
// declarations) are not visible in this listing; the comments added below describe only
// the statements that are visible.
640 private void putValueToStream(ActiveStream i) throws AdaptException, AccessorException, BindingException, HistoryException
// Collector time as double; NaN when no double value is available.
643 Variant time = state.time;
644 double currentTime = Double.NaN;
646 Double x = (Double) time.getValue(Bindings.DOUBLE);
647 currentTime = x == null ? Double.NaN : x;
// True when nothing has been written to the stream yet.
651 boolean isFirstValue = i.stream.size() == 0;
652 // Is this item disabled
653 boolean isDisabled = !i.itemState.enabled;
654 // Discontinuity, first value after being disabled
655 boolean wasDisabled = !isFirstValue && i.itemState.isDisabled;
657 // Update currently disabled status
658 i.itemState.isDisabled = isDisabled;
663 i.itemState.lastDisabledTime = currentTime;
664 if (!wasDisabled) i.itemState.firstDisabledTime = currentTime;
668 // Is nan value, for doubles and floats
671 boolean isValidValue;
673 MutableVariant value;
674 // Get current value as double
675 double currentValueDouble;
// The value comes from the variable state registered under the item's variableId.
677 VariableState vs = state.values.get( i.itemState.variableId );
679 isNanValue = vs.isNan;
680 isValidValue = vs.isValid;
682 if (i.isNumeric && isValidValue && !isNanValue) {
683 currentValueDouble = (Double) value.getValue( Bindings.DOUBLE );
// Gain and bias are applied only when they differ from the identity (gain 1, bias 0);
// the scaled value is re-wrapped in the stream's value binding.
684 if ( i.itemState.gain!=1.0 || i.itemState.bias!=0.0 ) {
685 currentValueDouble = i.itemState.gain * currentValueDouble + i.itemState.bias;
686 Object x = i.valueBinding instanceof DoubleBindingDefault ? currentValueDouble : i.doubleToValue.adapt(currentValueDouble);
687 value = new MutableVariant( i.valueBinding, x );
690 currentValueDouble = Double.NaN;
695 isValidValue = false;
696 value = new MutableVariant( i.valueBinding, i.valueBinding.createDefault() );
697 currentValueDouble = Double.NaN;
// Previously written value as double; NaN when there is none or it is not numeric.
701 double prevValueDouble = Double.NaN;
702 Variant pv = i.itemState.currentValue;
703 if (pv!=null && pv.getBinding() instanceof NumberBinding ) {
704 prevValueDouble = (Double) pv.getValue( Bindings.DOUBLE );
706 prevValueDouble = Double.NaN;
709 // The time of the first sample of region
710 double firstTime = i.itemState.firstTime;
712 double prevTime = i.itemState.currentTime;
// Re-step: the same time is being written again.
714 boolean restep = prevTime == currentTime;
716 // There was change in NaN
717 boolean isNaNChange = i.itemState.isNaN != isNanValue;
719 // There was change in validity of the value
720 boolean isValidChange = i.itemState.isValid != isValidValue;
722 //boolean wasValidValue = !i.itemState.isValid && isValidValue;
// Interval and deadband count as configured only when not NaN and above Double.MIN_VALUE.
724 boolean hasInterval = !Double.isNaN( i.itemState.interval ) && (i.itemState.interval> Double.MIN_VALUE );
726 boolean hasDeadband = !Double.isNaN( i.itemState.deadband ) && (i.itemState.deadband> Double.MIN_VALUE) && i.isNumeric;
// Min-max format: interval and deadband are both Double.MAX_VALUE and the sample
// record has "min" and "max" components.
728 boolean isMinMaxFormat = i.itemState.interval==Double.MAX_VALUE &&
729 i.itemState.deadband==Double.MAX_VALUE &&
730 i.binding.getComponentIndex("min")>=0 &&
731 i.binding.getComponentIndex("max")>=0;
// Deadband tracking: compares the current value with the first value of the band.
734 if (isValidValue && i.isNumeric && i.itemState.count>=1) {
736 if (i.itemState.isValid) {
737 double firstValueDouble = (Double) i.itemState.firstValue.getValue( Bindings.DOUBLE );
738 double diff = Math.abs( currentValueDouble - firstValueDouble );
740 i.itemState.ooDeadband |= diff >= i.itemState.deadband;
742 i.itemState.ooDeadband |= diff != 0.0;
744 i.itemState.ooDeadband = true;
747 // NaN change is deadband change
748 i.itemState.ooDeadband |= isNaNChange;
750 // The deadband cannot have been broken in the first sample
751 i.itemState.ooDeadband = false;
756 boolean ooInterval = true;
759 // System.err.println(i.itemState.id + " " + firstTime + " " + currentTime);
761 // First band is a special case
763 // If this is the first sample store the current time as first time
764 if(Double.isNaN(i.itemState.firstTime)) firstTime = i.itemState.firstTime = currentTime;
765 int sn1 = sampleNum(firstTime,i.itemState.interval);
766 int sn2 = sampleNum(currentTime,i.itemState.interval);
767 // Wait until next sample if this position is not already exactly at sample border
768 // Always start minmax immediately
769 if(sn1 == sn2 && !isExactInterval(currentTime, i.itemState.interval) && !isMinMaxFormat) return;
771 int sn1 = sampleNum(prevTime,i.itemState.interval);
772 int sn2 = sampleNum(currentTime,i.itemState.interval);
773 // System.err.println(" ==>" + sn1 + " " + sn2 + " " + prevTime + " " + currentTime);
774 ooInterval = sn2 > sn1;
779 // double n = (currentTime+EPSILON) / i.itemState.interval;
780 // double n2 = n-Math.floor(n);
783 // double deltaTime = currentTime - firstTime;
784 // ooInterval = (deltaTime - i.itemState.interval) > -EPSILON;
787 // Should new sample be created, or old sample extended / forwarded in time
788 boolean makeNewBand = ( i.itemState.ooDeadband && isValidValue && ooInterval ) || isFirstValue || wasDisabled || isValidChange;
790 // Update item min-max, avg, median
791 if ( isValidValue && i.isNumeric ) {
792 if (i.current.hasAvg() && i.isNumeric) {
793 if (Double.isNaN( i.itemState.sum )) i.itemState.sum = 0.0;
794 if (isValidValue && i.isNumeric) {
// Time-weighted sum; the average below is this sum divided by the band duration.
795 double timeDelta = currentTime - i.itemState.currentTime;
796 i.itemState.sum += timeDelta * currentValueDouble;
// The median is weighted by how long the previous value was held.
800 if (i.current.hasMedian() && !Double.isNaN(prevValueDouble) ) {
801 double deltaTime = currentTime - prevTime;
802 i.itemState.median.add( deltaTime, prevValueDouble );
806 // Increase the counter that tracks the number of samples acquired into the band
807 if (!restep) i.itemState.count++;
810 i.itemState.currentTime = currentTime;
811 i.itemState.currentValue.readFrom( value.getBinding(), value.getValue() );
812 i.itemState.isNaN = isNanValue;
813 i.itemState.isValid = isValidValue;
815 // Put discontinuation marker
816 if ( (wasDisabled && i.current.supportsNullValue())) {
818 if (i.current.isNullValue()) {
819 if (i.current.hasEndTime()) {
820 i.current.setEndTime( Bindings.DOUBLE, i.itemState.lastDisabledTime );
823 if (i.current.hasCount()) {
824 i.current.setCount( i.current.getCount()+1 );
827 // Add the band to the file stream
828 i.stream.setNoflush(i.stream.size()-1, i.binding, i.current.getSample() );
833 i.current.setValueNull();
834 i.current.setTime(Bindings.DOUBLE, i.itemState.firstDisabledTime);
835 i.current.setEndTime(Bindings.DOUBLE, i.itemState.lastDisabledTime);
836 i.stream.addNoflush( i.binding, i.current.getSample() );
// Besides makeNewBand, this is also true for the second sample of a band whose format
// has no end time.
839 boolean condition = makeNewBand || (!i.hasEndTime && i.itemState.count==2);
841 if (i.stream.size() > 0) {
843 /// Extend existing band
844 // Extend endTime value
846 // This ensures that a value band's end time isn't the same
847 // as the next band's start time which makes it possible to
848 // know what are the timestamps between which the item value
849 // changed from a to b. This allows us to more correctly render
850 // steps and sloped lines according to the actual data.
851 if (!i.itemState.ooDeadband || isMinMaxFormat) {
853 i.current.setTime( time.getBinding(), time.getValue() );
855 i.current.setEndTime( time.getBinding(), time.getValue() );
859 if (i.itemState.ooDeadband || isMinMaxFormat) {
862 if (i.current.hasLastValue()) {
863 i.current.setLastValue( value.getBinding(), value.getValue() );
866 if (i.isNumeric && !isNanValue) {
868 if (i.current.hasAvg() && !restep) {
869 double duration = (currentTime - i.itemState.firstTime);
// Guards the division below against a (near) zero band duration.
870 if(duration < 1e-9) {
871 i.current.setAvg( Bindings.DOUBLE, 0.0 );
873 i.current.setAvg( Bindings.DOUBLE, i.itemState.sum / duration);
// Lower the band minimum / raise the band maximum when the current value exceeds them.
878 if (i.current.hasMin()) {
879 Binding minBinding = i.current.getMinBinding();
880 Object prevMinValue = i.current.getMin();
881 Object currentValueWithMinBinding = value.getValue( minBinding );
882 int diff = minBinding.compare( prevMinValue, currentValueWithMinBinding );
883 if (diff>0) i.current.setMin( minBinding, currentValueWithMinBinding );
885 if (i.current.hasMax()) {
886 Binding maxBinding = i.current.getMaxBinding();
887 Object prevMaxValue = i.current.getMax();
888 Object currentValueWithMaxBinding = value.getValue( maxBinding );
889 int diff = maxBinding.compare( prevMaxValue, currentValueWithMaxBinding );
890 if (diff<0) i.current.setMax( maxBinding, currentValueWithMaxBinding );
894 if (i.current.hasMedian()) {
895 Double median = i.itemState.median.getMedian();
896 i.current.setMedian( Bindings.DOUBLE, median);
900 i.current.setValueNull();
904 if (i.current.hasCount()) {
905 i.current.setCount( i.itemState.count );
909 // Write entry to file stream
910 // if (i.current.getTimeDouble()<0)
911 // System.err.println("Error sample "+i.current.getBinding().toString(i.current.getSample()));
// Overwrites the last entry of the stream in place.
912 i.stream.setNoflush( i.stream.size()-1, i.binding, i.current.getSample() );
913 //System.err.println(i.itemState.id + " update " + i.current.getSample());
918 // Start a new value band
919 i.itemState.firstValue.readFrom( value.getBinding(), value.getValue() );
920 i.itemState.firstTime = currentTime;
921 i.itemState.currentValue.readFrom( value.getBinding(), value.getValue() );
922 i.itemState.currentTime = currentTime;
923 i.itemState.isNaN = isNanValue;
924 i.itemState.isValid = isValidValue;
926 i.itemState.count = 1;
927 i.itemState.ooDeadband = false;
// The median restarts for the new band, seeded with the previous value if there was one.
928 if (i.itemState.median != null) {
929 i.itemState.median.clear();
930 if (!Double.isNaN(prevValueDouble) ) {
931 double deltaTime = currentTime - prevTime;
932 i.itemState.median.add( deltaTime, prevValueDouble );
939 // Start new band with current value & time
940 i.current.setTime( time.getBinding(), time.getValue() );
943 if (i.current.hasEndTime()) {
944 i.current.setEndTime( time.getBinding(), time.getValue() );
// All aggregate fields of the new band start from the current value.
948 i.current.setValue( value.getBinding(), value.getValue() );
949 i.current.setMax( value.getBinding(), value.getValue() );
950 i.current.setMin( value.getBinding(), value.getValue() );
951 i.current.setLastValue( value.getBinding(), value.getValue() );
952 i.current.setAvg( value.getBinding(), value.getValue() );
953 i.current.setMedian( value.getBinding(), value.getValue() );
955 i.current.setValueNull();
959 // Sample Count=1 for new bands (time, value) "added"
960 if (i.current.hasCount()) {
961 i.current.setCount( 1 );
964 // Add the band to the file stream
965 i.stream.addNoflush( i.binding, i.current.getSample() );
966 //System.err.println(i.itemState.id + " add " + i.current.getSample());
/**
 * Iterates over all open streams; an AccessorException is logged at FINE level
 * and not rethrown.
 * NOTE(review): the statement that performs the flush is not visible in this
 * listing; presumably i.stream.flush() - confirm.
 */
973 public void flush() /*throws HistoryException*/ {
974 for (ActiveStream i : items) {
977 } catch (AccessorException e) {
978 logger.log(Level.FINE, "File history flush failed.", e);
979 // throw new HistoryException(e);
/**
 * Same as flush(), restricted to the open streams whose item id is contained in
 * the given set. An AccessorException is logged at FINE level and not rethrown.
 * NOTE(review): the statement that performs the flush is not visible in this listing.
 *
 * @param ids ids of the items to flush; must be non-null (contains is called on it)
 */
984 public void flush(Set<String> ids) /*throws HistoryException*/ {
985 for (ActiveStream i : items) {
986 if (!ids.contains(i.itemState.id)) continue;
989 } catch (AccessorException e) {
990 logger.log(Level.FINE, "File history flush failed.", e);
991 // throw new HistoryException(e);
/**
 * Closes the collector: writes every stored item state back to its history item
 * (skipping items that no longer exist in history), clears the stored item
 * states and then iterates over the open streams.
 *
 * All failures while writing meta-data are logged at FINE level and otherwise
 * ignored, so close is best effort.
 *
 * NOTE(review): the body of the final loop is not visible in this listing;
 * presumably each active stream is closed - confirm.
 */
998 public synchronized void close()
1000 // Write items back to history
1001 List<Bean> beans = new ArrayList<Bean>( state.itemStates.size() );
1002 for (CollectorState.Item itemState : state.itemStates.values()) {
1004 if (!history.exists(itemState.id)) continue;
1005 Bean bean = history.getItem(itemState.id);
1006 bean.readAvailableFields(itemState);
1007 beans.add( setItemState(bean, itemState) );
1008 } catch (BindingException e) {
1009 logger.log(Level.FINE, "Error writing a meta-data to history", e);
1010 } catch (BindingConstructionException e) {
1011 logger.log(Level.FINE, "Error writing a meta-data to history", e);
1012 } catch (HistoryException e) {
1013 logger.log(Level.FINE, "Error writing a meta-data to history", e);
// All collected beans are written with a single modify call.
1017 history.modify(beans.toArray( new Bean[beans.size()] ));
1018 } catch (HistoryException e) {
1019 logger.log(Level.FINE, "Error writing a meta-data to history", e);
1021 state.itemStates.clear();
1024 for (ActiveStream item : items) {
// Per-item runtime data of an open stream; instances are created and filled by
// openStream(CollectorState.Item). Declared without `static`, i.e. as an inner class
// of CollectorImpl.
1030 public class ActiveStream {
1032 /** Sample Stream */
1033 public StreamAccessor stream;
1035 /** Sample binding */
1036 public RecordBinding binding;
1038 /** Value binding (Generic) */
1039 public Binding valueBinding;
1041 /** Set true if this item has numeric value type */
1042 public boolean isNumeric;
1044 /** Set true if this item has end time */
1045 public boolean hasEndTime;
1047 /** Item entry in subscription state */
1048 public CollectorState.Item itemState;
1050 /** Interface to current last sample (in the stream) */
1051 public ValueBand current;
1053 /** Adapter that converts value to double value, null if not numeric */
1054 public Adapter valueToDouble, doubleToValue;
// NOTE(review): the signature of the method containing the lines below is not visible in
// this listing; it acts only when a stream is set and logs an AccessorException at FINE
// level ("Error closing stream").
1058 if (stream != null) {
1062 } catch (AccessorException e) {
1063 logger.log(Level.FINE, "Error closing stream", e);
/**
 * Opens a stream of the associated history manager; delegates directly to
 * history.openStream.
 *
 * @param itemId id of the history item
 * @param mode access mode string passed through unchanged (this class itself uses "r" and "rw")
 * @return the stream accessor returned by the history manager
 * @throws HistoryException propagated from the history manager
 */
1070 public StreamAccessor openStream(String itemId, String mode)
1071 throws HistoryException
1073 return history.openStream(itemId, mode);
1077 public HistoryManager getHistory() {