1 /*******************************************************************************
\r
2 * Copyright (c) 2007, 2011 Association for Decentralized Information Management in
\r
4 * All rights reserved. This program and the accompanying materials
\r
5 * are made available under the terms of the Eclipse Public License v1.0
\r
6 * which accompanies this distribution, and is available at
\r
7 * http://www.eclipse.org/legal/epl-v10.html
\r
10 * VTT Technical Research Centre of Finland - initial API and implementation
\r
11 *******************************************************************************/
\r
12 package org.simantics.history.impl;
\r
14 import gnu.trove.set.hash.THashSet;
\r
16 import java.io.IOException;
\r
17 import java.util.ArrayList;
\r
18 import java.util.List;
\r
19 import java.util.Set;
\r
20 import java.util.UUID;
\r
21 import java.util.concurrent.CopyOnWriteArrayList;
\r
22 import java.util.logging.Level;
\r
23 import java.util.logging.Logger;
\r
25 import org.simantics.databoard.Bindings;
\r
26 import org.simantics.databoard.accessor.StreamAccessor;
\r
27 import org.simantics.databoard.accessor.error.AccessorException;
\r
28 import org.simantics.databoard.adapter.AdaptException;
\r
29 import org.simantics.databoard.adapter.Adapter;
\r
30 import org.simantics.databoard.adapter.AdapterConstructionException;
\r
31 import org.simantics.databoard.binding.Binding;
\r
32 import org.simantics.databoard.binding.DoubleBinding;
\r
33 import org.simantics.databoard.binding.FloatBinding;
\r
34 import org.simantics.databoard.binding.NumberBinding;
\r
35 import org.simantics.databoard.binding.RecordBinding;
\r
36 import org.simantics.databoard.binding.error.BindingConstructionException;
\r
37 import org.simantics.databoard.binding.error.BindingException;
\r
38 import org.simantics.databoard.binding.error.RuntimeBindingException;
\r
39 import org.simantics.databoard.binding.impl.DoubleBindingDefault;
\r
40 import org.simantics.databoard.binding.mutable.MutableVariant;
\r
41 import org.simantics.databoard.binding.mutable.Variant;
\r
42 import org.simantics.databoard.binding.reflection.VoidBinding;
\r
43 import org.simantics.databoard.serialization.Serializer;
\r
44 import org.simantics.databoard.type.Component;
\r
45 import org.simantics.databoard.type.Datatype;
\r
46 import org.simantics.databoard.type.NumberType;
\r
47 import org.simantics.databoard.type.RecordType;
\r
48 import org.simantics.databoard.util.Bean;
\r
49 import org.simantics.history.Collector;
\r
50 import org.simantics.history.HistoryException;
\r
51 import org.simantics.history.HistoryManager;
\r
52 import org.simantics.history.impl.CollectorState.Item;
\r
53 import org.simantics.history.impl.CollectorState.VariableState;
\r
54 import org.simantics.history.util.ValueBand;
\r
55 import org.simantics.history.util.WeightedMedian;
\r
56 import org.simantics.history.util.subscription.SubscriptionItem;
\r
59 * Collector is used to record data to streams in history manager.
\r
61 * For every item there is collector item state {@link CollectorState.Item},
\r
62 * that contains data required in writing process. This state is read and
\r
63 * written to history manager item (bean) as meta-data.
\r
65 * Stream files are opened in the first {@link #beginStep(NumberBinding, Object)}.
\r
67 * @author toni.kalajainen
\r
70 public class CollectorImpl implements Collector {
\r
73 static Logger logger = Logger.getLogger(CollectorImpl.class.getName());
\r
75 public static final int MEDIAN_LIMIT = 500;
\r
77 public static final double EPSILON = 1e-2;
\r
79 /** Collector state */
\r
80 CollectorState state;
\r
82 /** Open streams, corresponds to subscription items */
\r
83 CopyOnWriteArrayList<ActiveStream> items = new CopyOnWriteArrayList<ActiveStream>();
\r
85 /** Associated history state */
\r
86 HistoryManager history;
\r
88 /** True when all items are open */
\r
89 boolean itemsOpen = true;
\r
91 public FlushPolicy flushPolicy = FlushPolicy.FlushOnEveryStep;
\r
93 public FlushPolicy getFlushPolicy() {
\r
97 public void setFlushPolicy(FlushPolicy flushPolicy) {
\r
98 this.flushPolicy = flushPolicy;
\r
102 * Create a new collector that is associated with a history manager.
\r
105 * @throws HistoryException
\r
107 public CollectorImpl(HistoryManager history) {
\r
108 this.history = history;
\r
109 this.state = new CollectorState();
\r
111 this.state.id = UUID.randomUUID().toString();
\r
115 * Create a new collector that is associated with a history manager.
\r
118 * @param previousState
\r
119 * @throws HistoryException
\r
121 public CollectorImpl(HistoryManager history, CollectorState previousState) {
\r
122 this.history = history;
\r
123 this.state = (CollectorState) previousState.clone();
\r
127 public synchronized void addItem(Bean item) throws HistoryException {
\r
129 String id = (String) item.getField("id");
\r
132 CollectorState.Item itemState = null;
\r
133 if ( item.hasField("collectorState") ) itemState = (CollectorState.Item) item.getField("collectorState", CollectorState.BINDING_ITEM);
\r
134 if ( itemState == null ) itemState = state.itemStates.get( id );
\r
135 if ( itemState == null ) {
\r
137 itemState = new CollectorState.Item();
\r
139 itemState.readAvailableFields( item );
\r
140 // Guess values from the existing data
\r
141 if (history.exists(id)) {
\r
142 readItemState(history, item, itemState);
\r
146 addItem(id, itemState);
\r
147 } catch (BindingException e) {
\r
148 throw new HistoryException( e );
\r
152 public synchronized void addItem(String id, CollectorState.Item itemState) throws HistoryException {
\r
155 state.itemStates.put(id, itemState);
\r
159 if (state.time.type() instanceof NumberType) {
\r
160 double time = (Double) Bindings.adapt(state.time.getValue(), state.time.getBinding(), Bindings.DOUBLE);
\r
161 if ( !Double.isNaN(itemState.currentTime) ) {
\r
162 if ( itemState.currentTime > time ) {
\r
163 state.time.setValue(Bindings.DOUBLE, itemState.currentTime);
\r
167 } catch (AdaptException e) {
\r
168 throw new HistoryException( e );
\r
172 public synchronized void addItems(Bean...items) throws HistoryException {
\r
173 for (Bean item : items) addItem(item);
\r
177 public synchronized void removeItem(String id) throws HistoryException {
\r
178 CollectorState.Item itemState = state.itemStates.remove( id );
\r
179 if (itemState!=null && history.exists(id)) {
\r
180 Bean bean = history.getItem(id);
\r
181 bean.readAvailableFields(itemState);
\r
183 bean = setItemState(bean, itemState);
\r
184 history.modify(bean);
\r
185 } catch (BindingException e) {
\r
187 logger.log(Level.FINE, "Failed to update bean", e);
\r
188 } catch (BindingConstructionException e) {
\r
190 logger.log(Level.FINE, "Failed to update bean", e);
\r
191 } catch (HistoryException e) {
\r
192 logger.log(Level.FINE, "Failed to update history item meta-data", e);
\r
196 ActiveStream as = getStream(id);
\r
197 if ( as != null ) {
\r
198 items.remove( as );
\r
203 ActiveStream getStream(String id) {
\r
204 for (ActiveStream as : items) {
\r
205 if (as.itemState.id.equals(id)) return as;
\r
211 * Set (or Add) item's configuration.
\r
214 * @throws HistoryException
\r
216 public synchronized void setItem(Bean item) throws HistoryException {
\r
218 String id = (String) item.getField("id");
\r
220 CollectorState.Item itemState = state.itemStates.get( id );
\r
221 if ( itemState==null ) {
\r
225 itemState.readAvailableFields(item);
\r
226 } catch (BindingException e) {
\r
227 throw new HistoryException( e );
\r
233 * Set item state to bean. If the bean does not have field collectorState,
\r
234 * a new bean class is created, and field added.
\r
238 * @return same or new bean
\r
239 * @throws BindingException
\r
240 * @throws BindingConstructionException
\r
242 public static Bean setItemState(Bean config, CollectorState.Item state) throws BindingException, BindingConstructionException
\r
244 Binding binding = config.getFieldBinding("collectorState");
\r
245 if ( binding == null || !binding.type().equals(CollectorState.BINDING_ITEM.type()) ) {
\r
246 RecordType rt = config.getBinding().type();
\r
247 RecordType newType = new RecordType();
\r
248 for (int i=0; i<rt.getComponentCount(); i++) {
\r
249 Component c = rt.getComponent(i);
\r
250 if ( c.name.equals("collectorState") ) continue;
\r
251 newType.addComponent(c.name, c.type);
\r
253 newType.addComponent("collectorState", CollectorState.BINDING_ITEM.type());
\r
254 Binding newBinding = Bindings.getBeanBinding(newType);
\r
255 Bean newConfig = (Bean) newBinding.createDefault();
\r
256 newConfig.readAvailableFields( config );
\r
257 config = newConfig;
\r
259 config.setField("collectorState", CollectorState.BINDING_ITEM, state);
\r
263 public SubscriptionItem[] getItems() {
\r
264 int c = state.itemStates.size();
\r
265 SubscriptionItem[] items = new SubscriptionItem[ c ];
\r
267 Serializer s = Bindings.getSerializerUnchecked( CollectorState.BINDING_ITEM );
\r
268 for (CollectorState.Item item : state.itemStates.values()) {
\r
269 _HistoryItem hi = new _HistoryItem();
\r
271 hi.readAvailableFields(item);
\r
272 //hi.collectorState = (Item) item.clone();
\r
274 // Workaround (clone doesn't work for WeightedMedian)
\r
276 byte[] data = s.serialize( item );
\r
277 hi.collectorState = (Item) s.deserialize(data);
\r
278 } catch (IOException e) {
\r
286 public static class _HistoryItem extends SubscriptionItem {
\r
287 public CollectorState.Item collectorState;
\r
291 * Creates and loads active item. TimeSeries stream file is opened.
\r
294 * @return new active item
\r
295 * @throws HistoryException
\r
297 ActiveStream openStream(CollectorState.Item itemState) throws HistoryException
\r
300 ActiveStream x = new ActiveStream();
\r
301 x.itemState = itemState;
\r
305 if ( !history.exists(x.itemState.id) ) {
\r
306 throw new HistoryException("TimeSeries "+x.itemState.id+" does not exist.");
\r
309 x.stream = history.openStream(x.itemState.id, "rw");
\r
313 Datatype sampleType = x.stream.type().componentType;
\r
314 itemState.format = sampleType;
\r
316 x.binding = (RecordBinding) Bindings.getBeanBinding( sampleType );
\r
317 } catch ( RuntimeException e ) {
\r
318 System.err.println("Could not create bean binding for type "+sampleType);
\r
319 x.binding = (RecordBinding) Bindings.getBinding( sampleType );
\r
323 x.valueBinding = x.binding.getComponentBinding("value");
\r
324 x.isNumeric = x.valueBinding instanceof NumberBinding;
\r
325 x.hasEndTime = x.binding.getComponentIndex("endTime")>0;
\r
328 if ( x.isNumeric )
\r
330 x.valueToDouble = Bindings.adapterFactory.getAdapter(x.valueBinding, Bindings.DOUBLE, true, false);
\r
331 x.doubleToValue = Bindings.adapterFactory.getAdapter(Bindings.DOUBLE, x.valueBinding, true, false);
\r
332 } catch (AdapterConstructionException e1) {
\r
333 x.valueToDouble = null;
\r
336 // Create valueband, and read last entry into memory
\r
338 Object currentEntry = x.binding.createDefault();
\r
339 if (x.stream.size() >= 1) {
\r
340 x.stream.get(x.stream.size() - 1, x.binding, currentEntry);
\r
342 x.current = new ValueBand(x.binding);
\r
343 x.current.setSample(currentEntry);
\r
344 } catch (AccessorException e) {
\r
345 throw new HistoryException(
\r
346 "Could not read sample from stream file, " + e, e);
\r
347 } catch (BindingException e) {
\r
348 throw new HistoryException(
\r
349 "Could not read sample from stream file, " + e, e);
\r
353 x.itemState = itemState;
\r
356 if (x.isNumeric && x.current.hasMedian()
\r
357 && x.itemState.median == null) {
\r
358 x.itemState.median = new WeightedMedian( MEDIAN_LIMIT );
\r
365 * Read itemState from history. Item must exist.
\r
367 * @param si subscription item
\r
368 * @return state item
\r
369 * @throws RuntimeBindingException
\r
370 * @throws HistoryException
\r
372 static void readItemState(HistoryManager history, Bean si, CollectorState.Item item) throws RuntimeBindingException, HistoryException {
\r
373 // Create item state
\r
374 item.readAvailableFields(si);
\r
375 // Read state from items
\r
376 StreamAccessor sa = history.openStream(item.id, "r");
\r
378 Datatype sampleType = sa.type().componentType;
\r
380 Binding sampleBinding = Bindings.getBinding(sampleType);
\r
381 if (!sampleType.equals( item.format )) throw new HistoryException("Could not create subscription, item \""+item.id+"\" already exists and has wrong format (" + sampleType + " vs. " + item.format + ")");
\r
382 Object sample = sampleBinding.createDefaultUnchecked();
\r
383 ValueBand v = new ValueBand(sampleBinding, sample);
\r
384 item.count = sa.size();
\r
385 if ( item.count>0 ) {
\r
386 sa.get(0, sampleBinding, sample);
\r
387 item.firstTime = v.getTimeDouble();
\r
388 Binding b = v.getValueBinding();
\r
389 Object value = v.getValue(b);
\r
390 item.firstValue.setValue(b, value);
\r
392 sa.get(item.count-1, sampleBinding, sample);
\r
393 item.currentTime = v.getTimeDouble();
\r
394 b = v.getValueBinding();
\r
395 value = v.getValue(b);
\r
396 item.currentValue.setValue(b, value);
\r
398 if ( item.currentValue.getBinding() instanceof DoubleBinding ) {
\r
399 DoubleBinding db = (DoubleBinding) item.currentValue.getBinding();
\r
400 value = item.currentValue.getValue(db);
\r
401 item.isNaN = Double.isNaN( (Double) value );
\r
405 } catch (AccessorException e) {
\r
406 throw new HistoryException( e );
\r
407 } catch (AdaptException e) {
\r
408 throw new HistoryException( e );
\r
410 try { sa.close(); } catch (AccessorException e) {}
\r
415 public CollectorState getState() {
\r
416 return (CollectorState) state.clone();
\r
420 public void setState(Bean newState) {
\r
421 if (newState == null)
\r
423 if (state == null) {
\r
424 state = new CollectorState();
\r
427 state.readAvailableFields(newState);
\r
429 // Make sure that openAllItems ensures that all streams are opened properly.
\r
430 // This is because setItem will not invoke addItem if collectorState exists
\r
431 // for an item and therefore itemsOpen might not otherwise be set to false.
\r
436 public Object getTime(Binding binding) throws HistoryException {
\r
437 MutableVariant v = state.time;
\r
438 if (v==null) return null;
\r
440 return v.getValue(binding);
\r
441 } catch (AdaptException e) {
\r
442 throw new HistoryException(e);
\r
447 public Object getValue(String id, Binding binding) throws HistoryException {
\r
448 VariableState vs = state.values.get(id);
\r
449 if (vs==null) return null;
\r
451 if (!vs.isValid) return null;
\r
454 return vs.value.getValue(binding);
\r
455 } catch (AdaptException e) {
\r
456 throw new HistoryException(e);
\r
460 void openAllItems() throws HistoryException {
\r
461 if (itemsOpen) return;
\r
463 // Assert all items are open
\r
464 // 1. Collect all items that still need to be opened.
\r
465 Set<String> itemsToOpen = new THashSet<String>( state.itemStates.keySet() );
\r
466 for (ActiveStream as : items)
\r
467 itemsToOpen.remove(as.itemState.id);
\r
469 // 2. Open missing streams
\r
470 List<ActiveStream> opened = new ArrayList<ActiveStream>(itemsToOpen.size());
\r
471 for (String itemId : itemsToOpen) {
\r
472 CollectorState.Item item = state.itemStates.get( itemId );
\r
473 ActiveStream as = openStream( item );
\r
477 // 3. Update list of open streams
\r
478 items.addAll(opened);
\r
482 private void updateDt(NumberBinding binding, Object time) {
\r
484 Double current = (Double)getTime(Bindings.DOUBLE);
\r
485 if(current != null) {
\r
486 double t = binding.getValue(time).doubleValue();
\r
487 state.dT = t-current;
\r
488 if(state.dT > 0) return;
\r
490 } catch (Throwable t) {
\r
496 public void beginStep(NumberBinding binding, Object time) throws HistoryException {
\r
498 updateDt(binding, time);
\r
499 state.time.setValue(binding, time);
\r
503 public void setValue(String id, Binding binding, Object value) throws HistoryException {
\r
504 VariableState vs = state.values.get( id );
\r
506 vs = new VariableState();
\r
507 vs.value = new MutableVariant(binding, value);
\r
508 state.values.put(id, vs);
\r
512 vs.value.setValue(VoidBinding.VOID_BINDING, null);
\r
514 vs.isValid = false;
\r
518 vs.value.setValue(binding, value);
\r
521 if (binding instanceof DoubleBinding) {
\r
522 DoubleBinding db = (DoubleBinding) binding;
\r
524 double _v = db.getValue_(value);
\r
525 vs.isNan = Double.isNaN( _v );
\r
526 } catch (BindingException e) {
\r
528 vs.isValid = false;
\r
530 } else if (binding instanceof FloatBinding) {
\r
531 FloatBinding db = (FloatBinding) binding;
\r
533 float _v = db.getValue_(value);
\r
534 vs.isNan = Float.isNaN( _v );
\r
535 } catch (BindingException e) {
\r
537 vs.isValid = false;
\r
545 public boolean getValue(String id, MutableVariant result) {
\r
546 VariableState vs = state.values.get(id);
\r
547 if (vs==null) return false;
\r
549 if (!vs.isValid) return false;
\r
551 result.setValue(vs.value.getBinding(), vs.value.getValue());
\r
556 public void endStep() throws HistoryException {
\r
557 // Write values to streams
\r
558 for (ActiveStream i : items)
\r
561 putValueToStream(i);
\r
562 if (flushPolicy == FlushPolicy.FlushOnEveryStep) {
\r
565 } catch (AdaptException e) {
\r
566 throw new HistoryException( e );
\r
567 // FileAccessor fa = (FileAccessor) i.stream;
\r
568 // logger.log(Level.FINE, "Error writing a stream "+fa.file(), e);
\r
569 } catch (AccessorException e) {
\r
570 throw new HistoryException( e );
\r
571 // FileAccessor fa = (FileAccessor) i.stream;
\r
572 // logger.log(Level.FINE, "Error writing a stream "+fa.file(), e);
\r
573 } catch (BindingException e) {
\r
574 throw new HistoryException( e );
\r
575 // FileAccessor fa = (FileAccessor) i.stream;
\r
576 // logger.log(Level.FINE, "Error writing a stream "+fa.file(), e);
\r
581 private int sampleNum(double time, double interval) {
\r
582 // 1.0 is needed to cover the case of 0.0
\r
583 return (int)(1.0 + (time+EPSILON*state.dT)/interval);
\r
586 private boolean isExactInterval(double time, double interval) {
\r
587 return sampleNum(time, interval) > sampleNum(time-2*EPSILON*state.dT, interval);
\r
591 * This method puts a time,value to a stream.
\r
593 * The stream is "packed" when the new value is equal to previous, or the
\r
594 * value is within deadband (in comparison to the first value), or if
\r
595 * the interval zone is not exceeded.
\r
597 * The stream can be packed two different ways, depending on the sample
\r
598 * format. The sample format can be (a) single value or (b) a time range of
\r
599 * multiple values, Ranged Sample.
\r
601 * Single value is packed by removing middle values, in other words,
\r
602 * moving the timecode of the last sample.
\r
604 * Ranged sample is packed by extending the endTime code of the last sample.
\r
605 * Other fields, such as avg, sum, count, lastValue are also updated.
\r
608 * @throws AdaptException
\r
609 * @throws AccessorException
\r
610 * @throws BindingException
\r
611 * @throws HistoryException
\r
613 private void putValueToStream(ActiveStream i) throws AdaptException, AccessorException, BindingException, HistoryException
\r
616 Variant time = state.time;
\r
617 double currentTime = Double.NaN;
\r
618 if (time != null) {
\r
619 Double x = (Double) time.getValue(Bindings.DOUBLE);
\r
620 currentTime = x == null ? Double.NaN : x;
\r
624 boolean isFirstValue = i.stream.size() == 0;
\r
625 // Is this item disabled
\r
626 boolean isDisabled = !i.itemState.enabled;
\r
627 // Discontinuety, first value after being disabled
\r
628 boolean wasDisabled = !isFirstValue && i.itemState.isDisabled;
\r
630 // Update currently disabled status
\r
631 i.itemState.isDisabled = isDisabled;
\r
633 // Update time codes
\r
635 i.itemState.count++;
\r
636 i.itemState.lastDisabledTime = currentTime;
\r
637 if (!wasDisabled) i.itemState.firstDisabledTime = currentTime;
\r
641 // Is nan value, for doubles and floats
\r
642 boolean isNanValue;
\r
644 boolean isValidValue;
\r
645 // Value as variant
\r
646 MutableVariant value;
\r
647 // Get current value as double
\r
648 double currentValueDouble;
\r
650 VariableState vs = state.values.get( i.itemState.variableId );
\r
652 isNanValue = vs.isNan;
\r
653 isValidValue = vs.isValid;
\r
655 if (i.isNumeric && isValidValue && !isNanValue) {
\r
656 currentValueDouble = (Double) value.getValue( Bindings.DOUBLE );
\r
657 if ( i.itemState.gain!=1.0 || i.itemState.bias!=0.0 ) {
\r
658 currentValueDouble = i.itemState.gain * currentValueDouble + i.itemState.bias;
\r
659 Object x = i.valueBinding instanceof DoubleBindingDefault ? currentValueDouble : i.doubleToValue.adapt(currentValueDouble);
\r
660 value = new MutableVariant( i.valueBinding, x );
\r
663 currentValueDouble = Double.NaN;
\r
668 isValidValue = false;
\r
669 value = new MutableVariant( i.valueBinding, i.valueBinding.createDefault() );
\r
670 currentValueDouble = Double.NaN;
\r
674 double prevValueDouble = Double.NaN;
\r
675 Variant pv = i.itemState.currentValue;
\r
676 if (pv!=null && pv.getBinding() instanceof NumberBinding ) {
\r
677 prevValueDouble = (Double) pv.getValue( Bindings.DOUBLE );
\r
679 prevValueDouble = Double.NaN;
\r
682 // The time of the first sample of region
\r
683 double firstTime = i.itemState.firstTime;
\r
685 double prevTime = i.itemState.currentTime;
\r
687 boolean restep = prevTime == currentTime;
\r
689 // There was change in NaN
\r
690 boolean isNaNChange = i.itemState.isNaN != isNanValue;
\r
692 // There was change in validity of the value
\r
693 boolean isValidChange = i.itemState.isValid != isValidValue;
\r
695 //boolean wasValidValue = !i.itemState.isValid && isValidValue;
\r
697 boolean hasInterval = !Double.isNaN( i.itemState.interval ) && (i.itemState.interval> Double.MIN_VALUE );
\r
699 boolean hasDeadband = !Double.isNaN( i.itemState.deadband ) && (i.itemState.deadband> Double.MIN_VALUE) && i.isNumeric;
\r
701 boolean isMinMaxFormat = i.itemState.interval==Double.MAX_VALUE &&
\r
702 i.itemState.deadband==Double.MAX_VALUE &&
\r
703 i.binding.getComponentIndex("min")>=0 &&
\r
704 i.binding.getComponentIndex("max")>=0;
\r
707 if (isValidValue && i.isNumeric && i.itemState.count>=1) {
\r
709 if (i.itemState.isValid) {
\r
710 double firstValueDouble = (Double) i.itemState.firstValue.getValue( Bindings.DOUBLE );
\r
711 double diff = Math.abs( currentValueDouble - firstValueDouble );
\r
713 i.itemState.ooDeadband |= diff >= i.itemState.deadband;
\r
715 i.itemState.ooDeadband |= diff != 0.0;
\r
717 i.itemState.ooDeadband = true;
\r
720 // NaN change is deadband change
\r
721 i.itemState.ooDeadband |= isNaNChange;
\r
723 // The dead band cannot be have broken in first sample
\r
724 i.itemState.ooDeadband = false;
\r
729 boolean ooInterval = true;
\r
730 if ( hasInterval ) {
\r
732 // System.err.println(i.itemState.id + " " + firstTime + " " + currentTime);
\r
734 // First band is a special case
\r
736 // If this is the first sample store the current time as first time
\r
737 if(Double.isNaN(i.itemState.firstTime)) firstTime = i.itemState.firstTime = currentTime;
\r
738 int sn1 = sampleNum(firstTime,i.itemState.interval);
\r
739 int sn2 = sampleNum(currentTime,i.itemState.interval);
\r
740 // Wait until next sample if this position is not already exactly at sample border
\r
741 // Always start minmax immediately
\r
742 if(sn1 == sn2 && !isExactInterval(currentTime, i.itemState.interval) && !isMinMaxFormat) return;
\r
744 int sn1 = sampleNum(prevTime,i.itemState.interval);
\r
745 int sn2 = sampleNum(currentTime,i.itemState.interval);
\r
746 // System.err.println(" ==>" + sn1 + " " + sn2 + " " + prevTime + " " + currentTime);
\r
747 ooInterval = sn2 > sn1;
\r
752 // double n = (currentTime+EPSILON) / i.itemState.interval;
\r
753 // double n2 = n-Math.floor(n);
\r
756 // double deltaTime = currentTime - firstTime;
\r
757 // ooInterval = (deltaTime - i.itemState.interval) > -EPSILON;
\r
760 // Should new sample be created, or old sample extended / forwarded in time
\r
761 boolean makeNewBand = ( i.itemState.ooDeadband && isValidValue && ooInterval ) || isFirstValue || wasDisabled || isValidChange;
\r
763 // Update item min-max, avg, median
\r
764 if ( isValidValue && i.isNumeric ) {
\r
765 if (i.current.hasAvg() && i.isNumeric) {
\r
766 if (Double.isNaN( i.itemState.sum )) i.itemState.sum = 0.0;
\r
767 if (isValidValue && i.isNumeric) {
\r
768 double timeDelta = currentTime - i.itemState.currentTime;
\r
769 i.itemState.sum += timeDelta * currentValueDouble;
\r
773 if (i.current.hasMedian() && !Double.isNaN(prevValueDouble) ) {
\r
774 double deltaTime = currentTime - prevTime;
\r
775 i.itemState.median.add( deltaTime, prevValueDouble );
\r
779 // Increase the counter that tracks the number of samples acquired into the band
\r
780 if (!restep) i.itemState.count++;
\r
782 // Remember values
\r
783 i.itemState.currentTime = currentTime;
\r
784 i.itemState.currentValue.readFrom( value.getBinding(), value.getValue() );
\r
785 i.itemState.isNaN = isNanValue;
\r
786 i.itemState.isValid = isValidValue;
\r
788 // Put discontinuation marker
\r
789 if ( (wasDisabled && i.current.supportsNullValue())) {
\r
791 if (i.current.isNullValue()) {
\r
792 if (i.current.hasEndTime()) {
\r
793 i.current.setEndTime( Bindings.DOUBLE, i.itemState.lastDisabledTime );
\r
796 if (i.current.hasCount()) {
\r
797 i.current.setCount( i.current.getCount()+1 );
\r
800 // Add the band to the file stream
\r
801 i.stream.setNoflush(i.stream.size()-1, i.binding, i.current.getSample() );
\r
806 i.current.setValueNull();
\r
807 i.current.setTime(Bindings.DOUBLE, i.itemState.firstDisabledTime);
\r
808 i.current.setEndTime(Bindings.DOUBLE, i.itemState.lastDisabledTime);
\r
809 i.stream.addNoflush( i.binding, i.current.getSample() );
\r
812 boolean condition = makeNewBand || (!i.hasEndTime && i.itemState.count==2);
\r
814 if(i.stream.size() > 0 && !i.itemState.ooDeadband) {
\r
816 /// Extend existing band
\r
817 // Extend endTime value
\r
818 if (!i.hasEndTime) {
\r
819 i.current.setTime( time.getBinding(), time.getValue() );
\r
821 i.current.setEndTime( time.getBinding(), time.getValue() );
\r
824 if (isValidValue) {
\r
826 if (i.current.hasLastValue()) {
\r
827 i.current.setLastValue( value.getBinding(), value.getValue() );
\r
831 if (i.current.hasAvg() && i.isNumeric && !isNanValue && !restep) {
\r
832 double duration = (currentTime - i.itemState.firstTime);
\r
833 if(duration < 1e-9) {
\r
834 i.current.setAvg( Bindings.DOUBLE, 0.0 );
\r
836 i.current.setAvg( Bindings.DOUBLE, i.itemState.sum / duration);
\r
841 if (i.current.hasMin() && i.isNumeric && !isNanValue) {
\r
842 Binding minBinding = i.current.getMinBinding();
\r
843 Object prevMinValue = i.current.getMin();
\r
844 Object currentValueWithMinBinding = value.getValue( minBinding );
\r
845 int diff = minBinding.compare( prevMinValue, currentValueWithMinBinding );
\r
846 if (diff>0) i.current.setMin( minBinding, currentValueWithMinBinding );
\r
848 if (i.current.hasMax() && i.isNumeric && !isNanValue) {
\r
849 Binding maxBinding = i.current.getMaxBinding();
\r
850 Object prevMaxValue = i.current.getMax();
\r
851 Object currentValueWithMaxBinding = value.getValue( maxBinding );
\r
852 int diff = maxBinding.compare( prevMaxValue, currentValueWithMaxBinding );
\r
853 if (diff<0) i.current.setMax( maxBinding, currentValueWithMaxBinding );
\r
857 if (i.current.hasMedian() && i.isNumeric && !isNanValue) {
\r
858 Double median = i.itemState.median.getMedian();
\r
859 i.current.setMedian( Bindings.DOUBLE, median);
\r
862 i.current.setValueNull();
\r
866 if (i.current.hasCount()) {
\r
867 i.current.setCount( i.itemState.count );
\r
870 // Write entry to file stream
\r
871 // if (i.current.getTimeDouble()<0)
\r
872 // System.err.println("Error sample "+i.current.getBinding().toString(i.current.getSample()));
\r
873 i.stream.setNoflush( i.stream.size()-1, i.binding, i.current.getSample() );
\r
878 // Start a new value band
\r
879 i.itemState.firstValue.readFrom( value.getBinding(), value.getValue() );
\r
880 i.itemState.firstTime = currentTime;
\r
881 i.itemState.currentValue.readFrom( value.getBinding(), value.getValue() );
\r
882 i.itemState.currentTime = currentTime;
\r
883 i.itemState.isNaN = isNanValue;
\r
884 i.itemState.isValid = isValidValue;
\r
885 i.itemState.sum = 0;
\r
886 i.itemState.count = 1;
\r
887 i.itemState.ooDeadband = false;
\r
888 if (i.itemState.median != null) {
\r
889 i.itemState.median.clear();
\r
890 if (!Double.isNaN(prevValueDouble) ) {
\r
891 double deltaTime = currentTime - prevTime;
\r
892 i.itemState.median.add( deltaTime, prevValueDouble );
\r
899 // Start new band with current value & time
\r
900 i.current.setTime( time.getBinding(), time.getValue() );
\r
903 if (i.current.hasEndTime()) {
\r
904 i.current.setEndTime( time.getBinding(), time.getValue() );
\r
907 if (isValidValue) {
\r
908 i.current.setValue( value.getBinding(), value.getValue() );
\r
909 i.current.setMax( value.getBinding(), value.getValue() );
\r
910 i.current.setMin( value.getBinding(), value.getValue() );
\r
911 i.current.setLastValue( value.getBinding(), value.getValue() );
\r
912 i.current.setAvg( value.getBinding(), value.getValue() );
\r
913 i.current.setMedian( value.getBinding(), value.getValue() );
\r
915 i.current.setValueNull();
\r
919 // Sample Count=1 for new bands (time, value) "added"
\r
920 if (i.current.hasCount()) {
\r
921 i.current.setCount( 1 );
\r
924 // Add the band to the file stream
\r
925 i.stream.addNoflush( i.binding, i.current.getSample() );
\r
932 public void flush() /*throws HistoryException*/ {
\r
933 for (ActiveStream i : items) {
\r
936 } catch (AccessorException e) {
\r
937 logger.log(Level.FINE, "File history flush failed.", e);
\r
938 // throw new HistoryException(e);
\r
943 public void flush(Set<String> ids) /*throws HistoryException*/ {
\r
944 for (ActiveStream i : items) {
\r
945 if (!ids.contains(i.itemState.id)) continue;
\r
948 } catch (AccessorException e) {
\r
949 logger.log(Level.FINE, "File history flush failed.", e);
\r
950 // throw new HistoryException(e);
\r
957 public synchronized void close()
\r
959 // Write items back to history
\r
960 List<Bean> beans = new ArrayList<Bean>( state.itemStates.size() );
\r
961 for (CollectorState.Item itemState : state.itemStates.values()) {
\r
963 if (!history.exists(itemState.id)) continue;
\r
964 Bean bean = history.getItem(itemState.id);
\r
965 bean.readAvailableFields(itemState);
\r
966 beans.add( setItemState(bean, itemState) );
\r
967 } catch (BindingException e) {
\r
968 logger.log(Level.FINE, "Error writing a meta-data to history", e);
\r
969 } catch (BindingConstructionException e) {
\r
970 logger.log(Level.FINE, "Error writing a meta-data to history", e);
\r
971 } catch (HistoryException e) {
\r
972 logger.log(Level.FINE, "Error writing a meta-data to history", e);
\r
976 history.modify(beans.toArray( new Bean[beans.size()] ));
\r
977 } catch (HistoryException e) {
\r
978 logger.log(Level.FINE, "Error writing a meta-data to history", e);
\r
980 state.itemStates.clear();
\r
983 for (ActiveStream item : items) {
\r
989 public class ActiveStream {
\r
991 /** Sample Stream */
\r
992 public StreamAccessor stream;
\r
994 /** Sample binding */
\r
995 public RecordBinding binding;
\r
997 /** Value binding (Generic) */
\r
998 public Binding valueBinding;
\r
1000 /** Set true if this item has numeric value type */
\r
1001 public boolean isNumeric;
\r
1003 /** Set true if this item has end time */
\r
1004 public boolean hasEndTime;
\r
1006 /** Item entry in subscription state */
\r
1007 public CollectorState.Item itemState;
\r
1009 /** Interface to current last sample (in the stream) */
\r
1010 public ValueBand current;
\r
1012 /** Adapter that converts value to double value, null if not numeric */
\r
1013 public Adapter valueToDouble, doubleToValue;
\r
1017 if (stream != null) {
\r
1021 } catch (AccessorException e) {
\r
1022 logger.log(Level.FINE, "Error closing stream", e);
\r
1029 public StreamAccessor openStream(String itemId, String mode)
\r
1030 throws HistoryException
\r
1032 return history.openStream(itemId, mode);
\r
1036 public HistoryManager getHistory() {
\r