X-Git-Url: https://gerrit.simantics.org/r/gitweb?p=simantics%2Fplatform.git;a=blobdiff_plain;f=bundles%2Forg.simantics.history%2Fsrc%2Forg%2Fsimantics%2Fhistory%2Fimpl%2FCollectorImpl.java;h=87062a9dabecae759aa0f5dffd442a496af156e5;hp=c001adc52a97cb4fe00763d9b5b1608f0b45d6a5;hb=fee871302ad4c53ca2c2307bd13366d99116d930;hpb=969bd23cab98a79ca9101af33334000879fb60c5 diff --git a/bundles/org.simantics.history/src/org/simantics/history/impl/CollectorImpl.java b/bundles/org.simantics.history/src/org/simantics/history/impl/CollectorImpl.java index c001adc52..87062a9da 100644 --- a/bundles/org.simantics.history/src/org/simantics/history/impl/CollectorImpl.java +++ b/bundles/org.simantics.history/src/org/simantics/history/impl/CollectorImpl.java @@ -1,1040 +1,1045 @@ -/******************************************************************************* - * Copyright (c) 2007, 2011 Association for Decentralized Information Management in - * Industry THTH ry. - * All rights reserved. This program and the accompanying materials - * are made available under the terms of the Eclipse Public License v1.0 - * which accompanies this distribution, and is available at - * http://www.eclipse.org/legal/epl-v10.html - * - * Contributors: - * VTT Technical Research Centre of Finland - initial API and implementation - *******************************************************************************/ -package org.simantics.history.impl; - -import gnu.trove.set.hash.THashSet; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.Set; -import java.util.UUID; -import java.util.concurrent.CopyOnWriteArrayList; -import java.util.logging.Level; -import java.util.logging.Logger; - -import org.simantics.databoard.Bindings; -import org.simantics.databoard.accessor.StreamAccessor; -import org.simantics.databoard.accessor.error.AccessorException; -import org.simantics.databoard.adapter.AdaptException; -import org.simantics.databoard.adapter.Adapter; -import org.simantics.databoard.adapter.AdapterConstructionException; -import org.simantics.databoard.binding.Binding; -import org.simantics.databoard.binding.DoubleBinding; -import org.simantics.databoard.binding.FloatBinding; -import org.simantics.databoard.binding.NumberBinding; -import org.simantics.databoard.binding.RecordBinding; -import org.simantics.databoard.binding.error.BindingConstructionException; -import org.simantics.databoard.binding.error.BindingException; -import org.simantics.databoard.binding.error.RuntimeBindingException; -import org.simantics.databoard.binding.impl.DoubleBindingDefault; -import org.simantics.databoard.binding.mutable.MutableVariant; -import org.simantics.databoard.binding.mutable.Variant; -import org.simantics.databoard.binding.reflection.VoidBinding; -import org.simantics.databoard.serialization.Serializer; -import org.simantics.databoard.type.Component; -import org.simantics.databoard.type.Datatype; -import org.simantics.databoard.type.NumberType; -import org.simantics.databoard.type.RecordType; -import org.simantics.databoard.util.Bean; -import org.simantics.history.Collector; -import org.simantics.history.HistoryException; -import org.simantics.history.HistoryManager; -import org.simantics.history.impl.CollectorState.Item; -import org.simantics.history.impl.CollectorState.VariableState; -import org.simantics.history.util.ValueBand; -import org.simantics.history.util.WeightedMedian; -import org.simantics.history.util.subscription.SubscriptionItem; - -/** - * Collector is used to record data to streams in history manager. - * - * For every item there is collector item state {@link CollectorState.Item}, - * that contains data required in writing process. This state is read and - * written to history manager item (bean) as meta-data. - * - * Stream files are opened in the first {@link #beginStep(NumberBinding, Object)}. - * - * @author toni.kalajainen - * - */ -public class CollectorImpl implements Collector { - - /** Logger */ - static Logger logger = Logger.getLogger(CollectorImpl.class.getName()); - - public static final int MEDIAN_LIMIT = 500; - - public static final double EPSILON = 1e-2; - - /** Collector state */ - CollectorState state; - - /** Open streams, corresponds to subscription items */ - CopyOnWriteArrayList items = new CopyOnWriteArrayList(); - - /** Associated history state */ - HistoryManager history; - - /** True when all items are open */ - boolean itemsOpen = true; - - public FlushPolicy flushPolicy = FlushPolicy.FlushOnEveryStep; - - public FlushPolicy getFlushPolicy() { - return flushPolicy; - } - - public void setFlushPolicy(FlushPolicy flushPolicy) { - this.flushPolicy = flushPolicy; - } - - /** - * Create a new collector that is associated with a history manager. - * - * @param history - * @throws HistoryException - */ - public CollectorImpl(HistoryManager history) { - this.history = history; - this.state = new CollectorState(); - this.state.init(); - this.state.id = UUID.randomUUID().toString(); - } - - /** - * Create a new collector that is associated with a history manager. - * - * @param history - * @param previousState - * @throws HistoryException - */ - public CollectorImpl(HistoryManager history, CollectorState previousState) { - this.history = history; - this.state = (CollectorState) previousState.clone(); - } - - @Override - public synchronized void addItem(Bean item) throws HistoryException { - try { - String id = (String) item.getField("id"); - - // Item State - CollectorState.Item itemState = null; - if ( item.hasField("collectorState") ) itemState = (CollectorState.Item) item.getField("collectorState", CollectorState.BINDING_ITEM); - if ( itemState == null ) itemState = state.itemStates.get( id ); - if ( itemState == null ) { - // Create new item - itemState = new CollectorState.Item(); - itemState.init(); - itemState.readAvailableFields( item ); - // Guess values from the existing data - if (history.exists(id)) { - readItemState(history, item, itemState); - } - } - - addItem(id, itemState); - } catch (BindingException e) { - throw new HistoryException( e ); - } - } - - public synchronized void addItem(String id, CollectorState.Item itemState) throws HistoryException { - try { - // Item State - state.itemStates.put(id, itemState); - itemsOpen = false; - - // Time - if (state.time.type() instanceof NumberType) { - double time = (Double) Bindings.adapt(state.time.getValue(), state.time.getBinding(), Bindings.DOUBLE); - if ( !Double.isNaN(itemState.currentTime) ) { - if ( itemState.currentTime > time ) { - state.time.setValue(Bindings.DOUBLE, itemState.currentTime); - } - } - } - } catch (AdaptException e) { - throw new HistoryException( e ); - } - } - - public synchronized void addItems(Bean...items) throws HistoryException { - for (Bean item : items) addItem(item); - } - - @Override - public synchronized void removeItem(String id) throws HistoryException { - CollectorState.Item itemState = state.itemStates.remove( id ); - if (itemState!=null && history.exists(id)) { - Bean bean = history.getItem(id); - bean.readAvailableFields(itemState); - try { - bean = setItemState(bean, itemState); - history.modify(bean); - } catch (BindingException e) { - // Unexpected - logger.log(Level.FINE, "Failed to update bean", e); - } catch (BindingConstructionException e) { - // Unexpected - logger.log(Level.FINE, "Failed to update bean", e); - } catch (HistoryException e) { - logger.log(Level.FINE, "Failed to update history item meta-data", e); - } - } - - ActiveStream as = getStream(id); - if ( as != null ) { - items.remove( as ); - as.close(); - } - } - - ActiveStream getStream(String id) { - for (ActiveStream as : items) { - if (as.itemState.id.equals(id)) return as; - } - return null; - } - - /** - * Set (or Add) item's configuration. - * - * @param item item - * @throws HistoryException - */ - public synchronized void setItem(Bean item) throws HistoryException { - try { - String id = (String) item.getField("id"); - - CollectorState.Item itemState = state.itemStates.get( id ); - if ( itemState==null ) { - addItem(item); - return; - } - itemState.readAvailableFields(item); - } catch (BindingException e) { - throw new HistoryException( e ); - } - } - - - /** - * Set item state to bean. If the bean does not have field collectorState, - * a new bean class is created, and field added. - * - * @param config - * @param state - * @return same or new bean - * @throws BindingException - * @throws BindingConstructionException - */ - public static Bean setItemState(Bean config, CollectorState.Item state) throws BindingException, BindingConstructionException - { - Binding binding = config.getFieldBinding("collectorState"); - if ( binding == null || !binding.type().equals(CollectorState.BINDING_ITEM.type()) ) { - RecordType rt = config.getBinding().type(); - RecordType newType = new RecordType(); - for (int i=0; i0; - - // Value adapter - if ( x.isNumeric ) - try { - x.valueToDouble = Bindings.adapterFactory.getAdapter(x.valueBinding, Bindings.DOUBLE, true, false); - x.doubleToValue = Bindings.adapterFactory.getAdapter(Bindings.DOUBLE, x.valueBinding, true, false); - } catch (AdapterConstructionException e1) { - x.valueToDouble = null; - } - - // Create valueband, and read last entry into memory - try { - Object currentEntry = x.binding.createDefault(); - if (x.stream.size() >= 1) { - x.stream.get(x.stream.size() - 1, x.binding, currentEntry); - } - x.current = new ValueBand(x.binding); - x.current.setSample(currentEntry); - } catch (AccessorException e) { - throw new HistoryException( - "Could not read sample from stream file, " + e, e); - } catch (BindingException e) { - throw new HistoryException( - "Could not read sample from stream file, " + e, e); - } - - // Item State - x.itemState = itemState; - - // Median - if (x.isNumeric && x.current.hasMedian() - && x.itemState.median == null) { - x.itemState.median = new WeightedMedian( MEDIAN_LIMIT ); - } - - return x; - } - - /** - * Read itemState from history. Item must exist. - * - * @param si subscription item - * @return state item - * @throws RuntimeBindingException - * @throws HistoryException - */ - static void readItemState(HistoryManager history, Bean si, CollectorState.Item item) throws RuntimeBindingException, HistoryException { - // Create item state - item.readAvailableFields(si); - // Read state from items - StreamAccessor sa = history.openStream(item.id, "r"); - try { - Datatype sampleType = sa.type().componentType; - - Binding sampleBinding = Bindings.getBinding(sampleType); - if (!sampleType.equals( item.format )) throw new HistoryException("Could not create subscription, item \""+item.id+"\" already exists and has wrong format (" + sampleType + " vs. " + item.format + ")"); - Object sample = sampleBinding.createDefaultUnchecked(); - ValueBand v = new ValueBand(sampleBinding, sample); - item.count = sa.size(); - if ( item.count>0 ) { - sa.get(0, sampleBinding, sample); - item.firstTime = v.getTimeDouble(); - Binding b = v.getValueBinding(); - Object value = v.getValue(b); - item.firstValue.setValue(b, value); - - sa.get(item.count-1, sampleBinding, sample); - item.currentTime = v.getTimeDouble(); - b = v.getValueBinding(); - value = v.getValue(b); - item.currentValue.setValue(b, value); - - if ( item.currentValue.getBinding() instanceof DoubleBinding ) { - DoubleBinding db = (DoubleBinding) item.currentValue.getBinding(); - value = item.currentValue.getValue(db); - item.isNaN = Double.isNaN( (Double) value ); - } - } - - } catch (AccessorException e) { - throw new HistoryException( e ); - } catch (AdaptException e) { - throw new HistoryException( e ); - } finally { - try { sa.close(); } catch (AccessorException e) {} - } - } - - @Override - public CollectorState getState() { - return (CollectorState) state.clone(); - } - - @Override - public void setState(Bean newState) { - if (newState == null) - return; - if (state == null) { - state = new CollectorState(); - state.init(); - } - state.readAvailableFields(newState); - - // Make sure that openAllItems ensures that all streams are opened properly. - // This is because setItem will not invoke addItem if collectorState exists - // for an item and therefore itemsOpen might not otherwise be set to false. - itemsOpen = false; - } - - @Override - public Object getTime(Binding binding) throws HistoryException { - MutableVariant v = state.time; - if (v==null) return null; - try { - return v.getValue(binding); - } catch (AdaptException e) { - throw new HistoryException(e); - } - } - - @Override - public Object getValue(String id, Binding binding) throws HistoryException { - VariableState vs = state.values.get(id); - if (vs==null) return null; - - if (!vs.isValid) return null; - - try { - return vs.value.getValue(binding); - } catch (AdaptException e) { - throw new HistoryException(e); - } - } - - void openAllItems() throws HistoryException { - if (itemsOpen) return; - - // Assert all items are open - // 1. Collect all items that still need to be opened. - Set itemsToOpen = new THashSet( state.itemStates.keySet() ); - for (ActiveStream as : items) - itemsToOpen.remove(as.itemState.id); - - // 2. Open missing streams - List opened = new ArrayList(itemsToOpen.size()); - for (String itemId : itemsToOpen) { - CollectorState.Item item = state.itemStates.get( itemId ); - ActiveStream as = openStream( item ); - opened.add(as); - } - - // 3. Update list of open streams - items.addAll(opened); - itemsOpen = true; - } - - private void updateDt(NumberBinding binding, Object time) { - try { - Double current = (Double)getTime(Bindings.DOUBLE); - if(current != null) { - double t = binding.getValue(time).doubleValue(); - state.dT = t-current; - if(state.dT > 0) return; - } - } catch (Throwable t) { - } - state.dT = 1.0; - } - - @Override - public void beginStep(NumberBinding binding, Object time) throws HistoryException { - openAllItems(); - updateDt(binding, time); - state.time.setValue(binding, time); - } - - @Override - public void setValue(String id, Binding binding, Object value) throws HistoryException { - VariableState vs = state.values.get( id ); - if (vs == null) { - vs = new VariableState(); - vs.value = new MutableVariant(binding, value); - state.values.put(id, vs); - } - - if (value==null) { - vs.value.setValue(VoidBinding.VOID_BINDING, null); - vs.isNan = true; - vs.isValid = false; - return; - } - - vs.value.setValue(binding, value); - vs.isValid = true; - - if (binding instanceof DoubleBinding) { - DoubleBinding db = (DoubleBinding) binding; - try { - double _v = db.getValue_(value); - vs.isNan = Double.isNaN( _v ); - } catch (BindingException e) { - vs.isNan = true; - vs.isValid = false; - } - } else if (binding instanceof FloatBinding) { - FloatBinding db = (FloatBinding) binding; - try { - float _v = db.getValue_(value); - vs.isNan = Float.isNaN( _v ); - } catch (BindingException e) { - vs.isNan = true; - vs.isValid = false; - } - } else { - vs.isNan = false; - } - } - - @Override - public boolean getValue(String id, MutableVariant result) { - VariableState vs = state.values.get(id); - if (vs==null) return false; - - if (!vs.isValid) return false; - - result.setValue(vs.value.getBinding(), vs.value.getValue()); - return true; - } - - @Override - public void endStep() throws HistoryException { - // Write values to streams - for (ActiveStream i : items) - { - try { - putValueToStream(i); - if (flushPolicy == FlushPolicy.FlushOnEveryStep) { - i.stream.flush(); - } - } catch (AdaptException e) { - throw new HistoryException( e ); -// FileAccessor fa = (FileAccessor) i.stream; -// logger.log(Level.FINE, "Error writing a stream "+fa.file(), e); - } catch (AccessorException e) { - throw new HistoryException( e ); -// FileAccessor fa = (FileAccessor) i.stream; -// logger.log(Level.FINE, "Error writing a stream "+fa.file(), e); - } catch (BindingException e) { - throw new HistoryException( e ); -// FileAccessor fa = (FileAccessor) i.stream; -// logger.log(Level.FINE, "Error writing a stream "+fa.file(), e); - } - } - } - - private int sampleNum(double time, double interval) { - // 1.0 is needed to cover the case of 0.0 - return (int)(1.0 + (time+EPSILON*state.dT)/interval); - } - - private boolean isExactInterval(double time, double interval) { - return sampleNum(time, interval) > sampleNum(time-2*EPSILON*state.dT, interval); - } - - /** - * This method puts a time,value to a stream. - * - * The stream is "packed" when the new value is equal to previous, or the - * value is within deadband (in comparison to the first value), or if - * the interval zone is not exceeded. - * - * The stream can be packed two different ways, depending on the sample - * format. The sample format can be (a) single value or (b) a time range of - * multiple values, Ranged Sample. - * - * Single value is packed by removing middle values, in other words, - * moving the timecode of the last sample. - * - * Ranged sample is packed by extending the endTime code of the last sample. - * Other fields, such as avg, sum, count, lastValue are also updated. - * - * @param i - * @throws AdaptException - * @throws AccessorException - * @throws BindingException - * @throws HistoryException - */ - private void putValueToStream(ActiveStream i) throws AdaptException, AccessorException, BindingException, HistoryException - { - // Current Time - Variant time = state.time; - double currentTime = Double.NaN; - if (time != null) { - Double x = (Double) time.getValue(Bindings.DOUBLE); - currentTime = x == null ? Double.NaN : x; - } - - // Is first value - boolean isFirstValue = i.stream.size() == 0; - // Is this item disabled - boolean isDisabled = !i.itemState.enabled; - // Discontinuety, first value after being disabled - boolean wasDisabled = !isFirstValue && i.itemState.isDisabled; - - // Update currently disabled status - i.itemState.isDisabled = isDisabled; - - // Update time codes - if (isDisabled) { - i.itemState.count++; - i.itemState.lastDisabledTime = currentTime; - if (!wasDisabled) i.itemState.firstDisabledTime = currentTime; - return; - } - - // Is nan value, for doubles and floats - boolean isNanValue; - // Is valid value - boolean isValidValue; - // Value as variant - MutableVariant value; - // Get current value as double - double currentValueDouble; - - VariableState vs = state.values.get( i.itemState.variableId ); - if ( vs!=null ) { - isNanValue = vs.isNan; - isValidValue = vs.isValid; - value = vs.value; - if (i.isNumeric && isValidValue && !isNanValue) { - currentValueDouble = (Double) value.getValue( Bindings.DOUBLE ); - if ( i.itemState.gain!=1.0 || i.itemState.bias!=0.0 ) { - currentValueDouble = i.itemState.gain * currentValueDouble + i.itemState.bias; - Object x = i.valueBinding instanceof DoubleBindingDefault ? currentValueDouble : i.doubleToValue.adapt(currentValueDouble); - value = new MutableVariant( i.valueBinding, x ); - } - } else { - currentValueDouble = Double.NaN; - } - - } else { - isNanValue = true; - isValidValue = false; - value = new MutableVariant( i.valueBinding, i.valueBinding.createDefault() ); - currentValueDouble = Double.NaN; - } - - // Prev Value - double prevValueDouble = Double.NaN; - Variant pv = i.itemState.currentValue; - if (pv!=null && pv.getBinding() instanceof NumberBinding ) { - prevValueDouble = (Double) pv.getValue( Bindings.DOUBLE ); - } else { - prevValueDouble = Double.NaN; - } - - // The time of the first sample of region - double firstTime = i.itemState.firstTime; - // Prev time - double prevTime = i.itemState.currentTime; - - boolean restep = prevTime == currentTime; - - // There was change in NaN - boolean isNaNChange = i.itemState.isNaN != isNanValue; - - // There was change in validity of the value - boolean isValidChange = i.itemState.isValid != isValidValue; - - //boolean wasValidValue = !i.itemState.isValid && isValidValue; - - boolean hasInterval = !Double.isNaN( i.itemState.interval ) && (i.itemState.interval> Double.MIN_VALUE ); - - boolean hasDeadband = !Double.isNaN( i.itemState.deadband ) && (i.itemState.deadband> Double.MIN_VALUE) && i.isNumeric; - - boolean isMinMaxFormat = i.itemState.interval==Double.MAX_VALUE && - i.itemState.deadband==Double.MAX_VALUE && - i.binding.getComponentIndex("min")>=0 && - i.binding.getComponentIndex("max")>=0; - - // Deadband - if (isValidValue && i.isNumeric && i.itemState.count>=1) { - - if (i.itemState.isValid) { - double firstValueDouble = (Double) i.itemState.firstValue.getValue( Bindings.DOUBLE ); - double diff = Math.abs( currentValueDouble - firstValueDouble ); - if( hasDeadband ) - i.itemState.ooDeadband |= diff >= i.itemState.deadband; - else - i.itemState.ooDeadband |= diff != 0.0; - } else { - i.itemState.ooDeadband = true; - } - - // NaN change is deadband change - i.itemState.ooDeadband |= isNaNChange; - } else { - // The dead band cannot be have broken in first sample - i.itemState.ooDeadband = false; - } - - - // Interval - boolean ooInterval = true; - if ( hasInterval ) { - -// System.err.println(i.itemState.id + " " + firstTime + " " + currentTime); - - // First band is a special case - if(isFirstValue) { - // If this is the first sample store the current time as first time - if(Double.isNaN(i.itemState.firstTime)) firstTime = i.itemState.firstTime = currentTime; - int sn1 = sampleNum(firstTime,i.itemState.interval); - int sn2 = sampleNum(currentTime,i.itemState.interval); - // Wait until next sample if this position is not already exactly at sample border - // Always start minmax immediately - if(sn1 == sn2 && !isExactInterval(currentTime, i.itemState.interval) && !isMinMaxFormat) return; - } else { - int sn1 = sampleNum(prevTime,i.itemState.interval); - int sn2 = sampleNum(currentTime,i.itemState.interval); -// System.err.println(" ==>" + sn1 + " " + sn2 + " " + prevTime + " " + currentTime); - ooInterval = sn2 > sn1; - } - - -// -// double n = (currentTime+EPSILON) / i.itemState.interval; -// double n2 = n-Math.floor(n); -// -// -// double deltaTime = currentTime - firstTime; -// ooInterval = (deltaTime - i.itemState.interval) > -EPSILON; - } - - // Should new sample be created, or old sample extended / forwarded in time - boolean makeNewBand = ( i.itemState.ooDeadband && isValidValue && ooInterval ) || isFirstValue || wasDisabled || isValidChange; - - // Update item min-max, avg, median - if ( isValidValue && i.isNumeric ) { - if (i.current.hasAvg() && i.isNumeric) { - if (Double.isNaN( i.itemState.sum )) i.itemState.sum = 0.0; - if (isValidValue && i.isNumeric) { - double timeDelta = currentTime - i.itemState.currentTime; - i.itemState.sum += timeDelta * currentValueDouble; - } - } - - if (i.current.hasMedian() && !Double.isNaN(prevValueDouble) ) { - double deltaTime = currentTime - prevTime; - i.itemState.median.add( deltaTime, prevValueDouble ); - } - } - - // Increase the counter that tracks the number of samples acquired into the band - if (!restep) i.itemState.count++; - - // Remember values - i.itemState.currentTime = currentTime; - i.itemState.currentValue.readFrom( value.getBinding(), value.getValue() ); - i.itemState.isNaN = isNanValue; - i.itemState.isValid = isValidValue; - - // Put discontinuation marker - if ( (wasDisabled && i.current.supportsNullValue())) { - // Extend marker - if (i.current.isNullValue()) { - if (i.current.hasEndTime()) { - i.current.setEndTime( Bindings.DOUBLE, i.itemState.lastDisabledTime ); - } - - if (i.current.hasCount()) { - i.current.setCount( i.current.getCount()+1 ); - } - - // Add the band to the file stream - i.stream.setNoflush(i.stream.size()-1, i.binding, i.current.getSample() ); - } - - // Add marker - i.current.reset(); - i.current.setValueNull(); - i.current.setTime(Bindings.DOUBLE, i.itemState.firstDisabledTime); - i.current.setEndTime(Bindings.DOUBLE, i.itemState.lastDisabledTime); - i.stream.addNoflush( i.binding, i.current.getSample() ); - } - - boolean condition = makeNewBand || (!i.hasEndTime && i.itemState.count==2); - - if(i.stream.size() > 0 && !i.itemState.ooDeadband) { - - /// Extend existing band - // Extend endTime value - if (!i.hasEndTime) { - i.current.setTime( time.getBinding(), time.getValue() ); - } else { - i.current.setEndTime( time.getBinding(), time.getValue() ); - } - - if (isValidValue) { - // Set last value - if (i.current.hasLastValue()) { - i.current.setLastValue( value.getBinding(), value.getValue() ); - } - - // Add sum - if (i.current.hasAvg() && i.isNumeric && !isNanValue && !restep) { - double duration = (currentTime - i.itemState.firstTime); - if(duration < 1e-9) { - i.current.setAvg( Bindings.DOUBLE, 0.0 ); - } else { - i.current.setAvg( Bindings.DOUBLE, i.itemState.sum / duration); - } - } - - // Update min-max - if (i.current.hasMin() && i.isNumeric && !isNanValue) { - Binding minBinding = i.current.getMinBinding(); - Object prevMinValue = i.current.getMin(); - Object currentValueWithMinBinding = value.getValue( minBinding ); - int diff = minBinding.compare( prevMinValue, currentValueWithMinBinding ); - if (diff>0) i.current.setMin( minBinding, currentValueWithMinBinding ); - } - if (i.current.hasMax() && i.isNumeric && !isNanValue) { - Binding maxBinding = i.current.getMaxBinding(); - Object prevMaxValue = i.current.getMax(); - Object currentValueWithMaxBinding = value.getValue( maxBinding ); - int diff = maxBinding.compare( prevMaxValue, currentValueWithMaxBinding ); - if (diff<0) i.current.setMax( maxBinding, currentValueWithMaxBinding ); - } - - // Update median - if (i.current.hasMedian() && i.isNumeric && !isNanValue) { - Double median = i.itemState.median.getMedian(); - i.current.setMedian( Bindings.DOUBLE, median); - } - } else { - i.current.setValueNull(); - } - - // Add count - if (i.current.hasCount()) { - i.current.setCount( i.itemState.count ); - } - - // Write entry to file stream - // if (i.current.getTimeDouble()<0) - // System.err.println("Error sample "+i.current.getBinding().toString(i.current.getSample())); - i.stream.setNoflush( i.stream.size()-1, i.binding, i.current.getSample() ); - } - - if (condition) { - - // Start a new value band - i.itemState.firstValue.readFrom( value.getBinding(), value.getValue() ); - i.itemState.firstTime = currentTime; - i.itemState.currentValue.readFrom( value.getBinding(), value.getValue() ); - i.itemState.currentTime = currentTime; - i.itemState.isNaN = isNanValue; - i.itemState.isValid = isValidValue; - i.itemState.sum = 0; - i.itemState.count = 1; - i.itemState.ooDeadband = false; - if (i.itemState.median != null) { - i.itemState.median.clear(); - if (!Double.isNaN(prevValueDouble) ) { - double deltaTime = currentTime - prevTime; - i.itemState.median.add( deltaTime, prevValueDouble ); - } - } - - // New Sample - i.current.reset(); - - // Start new band with current value & time - i.current.setTime( time.getBinding(), time.getValue() ); - - // Is valid value - if (i.current.hasEndTime()) { - i.current.setEndTime( time.getBinding(), time.getValue() ); - } - - if (isValidValue) { - i.current.setValue( value.getBinding(), value.getValue() ); - i.current.setMax( value.getBinding(), value.getValue() ); - i.current.setMin( value.getBinding(), value.getValue() ); - i.current.setLastValue( value.getBinding(), value.getValue() ); - i.current.setAvg( value.getBinding(), value.getValue() ); - i.current.setMedian( value.getBinding(), value.getValue() ); - } else { - i.current.setValueNull(); - } - - - // Sample Count=1 for new bands (time, value) "added" - if (i.current.hasCount()) { - i.current.setCount( 1 ); - } - - // Add the band to the file stream - i.stream.addNoflush( i.binding, i.current.getSample() ); - - } - - } - - @Override - public void flush() /*throws HistoryException*/ { - for (ActiveStream i : items) { - try { - i.stream.flush(); - } catch (AccessorException e) { - logger.log(Level.FINE, "File history flush failed.", e); -// throw new HistoryException(e); - } - } - } - - public void flush(Set ids) /*throws HistoryException*/ { - for (ActiveStream i : items) { - if (!ids.contains(i.itemState.id)) continue; - try { - i.stream.flush(); - } catch (AccessorException e) { - logger.log(Level.FINE, "File history flush failed.", e); -// throw new HistoryException(e); - } - } - } - - - @Override - public synchronized void close() - { - // Write items back to history - List beans = new ArrayList( state.itemStates.size() ); - for (CollectorState.Item itemState : state.itemStates.values()) { - try { - if (!history.exists(itemState.id)) continue; - Bean bean = history.getItem(itemState.id); - bean.readAvailableFields(itemState); - beans.add( setItemState(bean, itemState) ); - } catch (BindingException e) { - logger.log(Level.FINE, "Error writing a meta-data to history", e); - } catch (BindingConstructionException e) { - logger.log(Level.FINE, "Error writing a meta-data to history", e); - } catch (HistoryException e) { - logger.log(Level.FINE, "Error writing a meta-data to history", e); - } - } - try { - history.modify(beans.toArray( new Bean[beans.size()] )); - } catch (HistoryException e) { - logger.log(Level.FINE, "Error writing a meta-data to history", e); - } - state.itemStates.clear(); - - // Close streams - for (ActiveStream item : items) { - item.close(); - } - items.clear(); - } - - public class ActiveStream { - - /** Sample Stream */ - public StreamAccessor stream; - - /** Sample binding */ - public RecordBinding binding; - - /** Value binding (Generic) */ - public Binding valueBinding; - - /** Set true if this item has numeric value type */ - public boolean isNumeric; - - /** Set true if this item has end time */ - public boolean hasEndTime; - - /** Item entry in subscription state */ - public CollectorState.Item itemState; - - /** Interface to current last sample (in the stream) */ - public ValueBand current; - - /** Adapter that converts value to double value, null if not numeric */ - public Adapter valueToDouble, doubleToValue; - - void close() { - try { - if (stream != null) { - stream.close(); - } - stream = null; - } catch (AccessorException e) { - logger.log(Level.FINE, "Error closing stream", e); - } - } - - } - - @Override - public StreamAccessor openStream(String itemId, String mode) - throws HistoryException - { - return history.openStream(itemId, mode); - } - - @Override - public HistoryManager getHistory() { - return history; - } - -} +/******************************************************************************* + * Copyright (c) 2007, 2011 Association for Decentralized Information Management in + * Industry THTH ry. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * VTT Technical Research Centre of Finland - initial API and implementation + *******************************************************************************/ +package org.simantics.history.impl; + +import gnu.trove.set.hash.THashSet; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.logging.Level; +import java.util.logging.Logger; + +import org.simantics.databoard.Bindings; +import org.simantics.databoard.accessor.StreamAccessor; +import org.simantics.databoard.accessor.error.AccessorException; +import org.simantics.databoard.adapter.AdaptException; +import org.simantics.databoard.adapter.Adapter; +import org.simantics.databoard.adapter.AdapterConstructionException; +import org.simantics.databoard.binding.Binding; +import org.simantics.databoard.binding.DoubleBinding; +import org.simantics.databoard.binding.FloatBinding; +import org.simantics.databoard.binding.NumberBinding; +import org.simantics.databoard.binding.RecordBinding; +import org.simantics.databoard.binding.error.BindingConstructionException; +import org.simantics.databoard.binding.error.BindingException; +import org.simantics.databoard.binding.error.RuntimeBindingException; +import org.simantics.databoard.binding.impl.DoubleBindingDefault; +import org.simantics.databoard.binding.mutable.MutableVariant; +import org.simantics.databoard.binding.mutable.Variant; +import org.simantics.databoard.binding.reflection.VoidBinding; +import org.simantics.databoard.serialization.Serializer; +import org.simantics.databoard.type.Component; +import org.simantics.databoard.type.Datatype; +import org.simantics.databoard.type.NumberType; +import org.simantics.databoard.type.RecordType; +import org.simantics.databoard.util.Bean; +import org.simantics.history.Collector; +import org.simantics.history.HistoryException; +import org.simantics.history.HistoryManager; +import org.simantics.history.impl.CollectorState.Item; +import org.simantics.history.impl.CollectorState.VariableState; +import org.simantics.history.util.ValueBand; +import org.simantics.history.util.WeightedMedian; +import org.simantics.history.util.subscription.SubscriptionItem; + +/** + * Collector is used to record data to streams in history manager. + * + * For every item there is collector item state {@link CollectorState.Item}, + * that contains data required in writing process. This state is read and + * written to history manager item (bean) as meta-data. + * + * Stream files are opened in the first {@link #beginStep(NumberBinding, Object)}. + * + * @author toni.kalajainen + * + */ +public class CollectorImpl implements Collector { + + /** Logger */ + static Logger logger = Logger.getLogger(CollectorImpl.class.getName()); + + public static final int MEDIAN_LIMIT = 500; + + public static final double EPSILON = 1e-2; + + /** Collector state */ + CollectorState state; + + /** Open streams, corresponds to subscription items */ + CopyOnWriteArrayList items = new CopyOnWriteArrayList(); + + /** Associated history state */ + HistoryManager history; + + /** True when all items are open */ + boolean itemsOpen = true; + + public FlushPolicy flushPolicy = FlushPolicy.FlushOnEveryStep; + + public FlushPolicy getFlushPolicy() { + return flushPolicy; + } + + public void setFlushPolicy(FlushPolicy flushPolicy) { + this.flushPolicy = flushPolicy; + } + + /** + * Create a new collector that is associated with a history manager. + * + * @param history + * @throws HistoryException + */ + public CollectorImpl(HistoryManager history) { + this.history = history; + this.state = new CollectorState(); + this.state.init(); + this.state.id = UUID.randomUUID().toString(); + } + + /** + * Create a new collector that is associated with a history manager. + * + * @param history + * @param previousState + * @throws HistoryException + */ + public CollectorImpl(HistoryManager history, CollectorState previousState) { + this.history = history; + this.state = (CollectorState) previousState.clone(); + } + + @Override + public synchronized void addItem(Bean item) throws HistoryException { + try { + String id = (String) item.getField("id"); + + // Item State + CollectorState.Item itemState = null; + if ( item.hasField("collectorState") ) itemState = (CollectorState.Item) item.getField("collectorState", CollectorState.BINDING_ITEM); + if ( itemState == null ) itemState = state.itemStates.get( id ); + if ( itemState == null ) { + // Create new item + itemState = new CollectorState.Item(); + itemState.init(); + itemState.readAvailableFields( item ); + // Guess values from the existing data + if (history.exists(id)) { + readItemState(history, item, itemState); + } + } + + addItem(id, itemState); + } catch (BindingException e) { + throw new HistoryException( e ); + } + } + + public synchronized void addItem(String id, CollectorState.Item itemState) throws HistoryException { + try { + // Item State + state.itemStates.put(id, itemState); + itemsOpen = false; + + // Time + if (state.time.type() instanceof NumberType) { + double time = (Double) Bindings.adapt(state.time.getValue(), state.time.getBinding(), Bindings.DOUBLE); + if ( !Double.isNaN(itemState.currentTime) ) { + if ( itemState.currentTime > time ) { + state.time.setValue(Bindings.DOUBLE, itemState.currentTime); + } + } + } + } catch (AdaptException e) { + throw new HistoryException( e ); + } + } + + public synchronized void addItems(Bean...items) throws HistoryException { + for (Bean item : items) addItem(item); + } + + @Override + public synchronized void removeItem(String id) throws HistoryException { + CollectorState.Item itemState = state.itemStates.remove( id ); + if (itemState!=null && history.exists(id)) { + Bean bean = history.getItem(id); + bean.readAvailableFields(itemState); + try { + bean = setItemState(bean, itemState); + history.modify(bean); + } catch (BindingException e) { + // Unexpected + logger.log(Level.FINE, "Failed to update bean", e); + } catch (BindingConstructionException e) { + // Unexpected + logger.log(Level.FINE, "Failed to update bean", e); + } catch (HistoryException e) { + logger.log(Level.FINE, "Failed to update history item meta-data", e); + } + } + + ActiveStream as = getStream(id); + if ( as != null ) { + items.remove( as ); + as.close(); + } + } + + ActiveStream getStream(String id) { + for (ActiveStream as : items) { + if (as.itemState.id.equals(id)) return as; + } + return null; + } + + /** + * Set (or Add) item's configuration. + * + * @param item item + * @throws HistoryException + */ + public synchronized void setItem(Bean item) throws HistoryException { + try { + String id = (String) item.getField("id"); + + CollectorState.Item itemState = state.itemStates.get( id ); + if ( itemState==null ) { + addItem(item); + return; + } + itemState.readAvailableFields(item); + } catch (BindingException e) { + throw new HistoryException( e ); + } + } + + + /** + * Set item state to bean. If the bean does not have field collectorState, + * a new bean class is created, and field added. + * + * @param config + * @param state + * @return same or new bean + * @throws BindingException + * @throws BindingConstructionException + */ + public static Bean setItemState(Bean config, CollectorState.Item state) throws BindingException, BindingConstructionException + { + Binding binding = config.getFieldBinding("collectorState"); + if ( binding == null || !binding.type().equals(CollectorState.BINDING_ITEM.type()) ) { + RecordType rt = config.getBinding().type(); + RecordType newType = new RecordType(); + for (int i=0; i0; + + // Value adapter + if ( x.isNumeric ) + try { + x.valueToDouble = Bindings.adapterFactory.getAdapter(x.valueBinding, Bindings.DOUBLE, true, false); + x.doubleToValue = Bindings.adapterFactory.getAdapter(Bindings.DOUBLE, x.valueBinding, true, false); + } catch (AdapterConstructionException e1) { + x.valueToDouble = null; + } + + // Create valueband, and read last entry into memory + try { + Object currentEntry = x.binding.createDefault(); + if (x.stream.size() >= 1) { + x.stream.get(x.stream.size() - 1, x.binding, currentEntry); + } + x.current = new ValueBand(x.binding); + x.current.setSample(currentEntry); + } catch (AccessorException e) { + throw new HistoryException( + "Could not read sample from stream file, " + e, e); + } catch (BindingException e) { + throw new HistoryException( + "Could not read sample from stream file, " + e, e); + } + + // Item State + x.itemState = itemState; + + // Median + if (x.isNumeric && x.current.hasMedian() + && x.itemState.median == null) { + x.itemState.median = new WeightedMedian( MEDIAN_LIMIT ); + } + + return x; + } + + /** + * Read itemState from history. Item must exist. + * + * @param si subscription item + * @return state item + * @throws RuntimeBindingException + * @throws HistoryException + */ + static void readItemState(HistoryManager history, Bean si, CollectorState.Item item) throws RuntimeBindingException, HistoryException { + // Create item state + item.readAvailableFields(si); + // Read state from items + StreamAccessor sa = history.openStream(item.id, "r"); + try { + Datatype sampleType = sa.type().componentType; + + Binding sampleBinding = Bindings.getBinding(sampleType); + if (!sampleType.equals( item.format )) throw new HistoryException("Could not create subscription, item \""+item.id+"\" already exists and has wrong format (" + sampleType + " vs. " + item.format + ")"); + Object sample = sampleBinding.createDefaultUnchecked(); + ValueBand v = new ValueBand(sampleBinding, sample); + item.count = sa.size(); + if ( item.count>0 ) { + sa.get(0, sampleBinding, sample); + item.firstTime = v.getTimeDouble(); + Binding b = v.getValueBinding(); + Object value = v.getValue(b); + item.firstValue.setValue(b, value); + + sa.get(item.count-1, sampleBinding, sample); + item.currentTime = v.getTimeDouble(); + b = v.getValueBinding(); + value = v.getValue(b); + item.currentValue.setValue(b, value); + + if ( item.currentValue.getBinding() instanceof DoubleBinding ) { + DoubleBinding db = (DoubleBinding) item.currentValue.getBinding(); + value = item.currentValue.getValue(db); + item.isNaN = Double.isNaN( (Double) value ); + } + } + + } catch (AccessorException e) { + throw new HistoryException( e ); + } catch (AdaptException e) { + throw new HistoryException( e ); + } finally { + try { sa.close(); } catch (AccessorException e) {} + } + } + + @Override + public CollectorState getState() { + return (CollectorState) state.clone(); + } + + @Override + public void setState(Bean newState) { + if (newState == null) + return; + if (state == null) { + state = new CollectorState(); + state.init(); + } + state.readAvailableFields(newState); + + // Make sure that openAllItems ensures that all streams are opened properly. + // This is because setItem will not invoke addItem if collectorState exists + // for an item and therefore itemsOpen might not otherwise be set to false. + itemsOpen = false; + } + + @Override + public Object getTime(Binding binding) throws HistoryException { + MutableVariant v = state.time; + if (v==null) return null; + try { + return v.getValue(binding); + } catch (AdaptException e) { + throw new HistoryException(e); + } + } + + @Override + public Object getValue(String id, Binding binding) throws HistoryException { + VariableState vs = state.values.get(id); + if (vs==null) return null; + + if (!vs.isValid) return null; + + try { + return vs.value.getValue(binding); + } catch (AdaptException e) { + throw new HistoryException(e); + } + } + + void openAllItems() throws HistoryException { + if (itemsOpen) return; + + // Assert all items are open + // 1. Collect all items that still need to be opened. + Set itemsToOpen = new THashSet( state.itemStates.keySet() ); + for (ActiveStream as : items) + itemsToOpen.remove(as.itemState.id); + + // 2. Open missing streams + List opened = new ArrayList(itemsToOpen.size()); + for (String itemId : itemsToOpen) { + CollectorState.Item item = state.itemStates.get( itemId ); + ActiveStream as = openStream( item ); + opened.add(as); + } + + // 3. Update list of open streams + items.addAll(opened); + itemsOpen = true; + } + + private void updateDt(NumberBinding binding, Object time) { + try { + Double current = (Double)getTime(Bindings.DOUBLE); + if(current != null) { + double t = binding.getValue(time).doubleValue(); + state.dT = t-current; + if(state.dT > 0) return; + } + } catch (Throwable t) { + } + state.dT = 1.0; + } + + @Override + public void beginStep(NumberBinding binding, Object time) throws HistoryException { + openAllItems(); + updateDt(binding, time); + state.time.setValue(binding, time); + } + + @Override + public void setValue(String id, Binding binding, Object value) throws HistoryException { + VariableState vs = state.values.get( id ); + if (vs == null) { + vs = new VariableState(); + vs.value = new MutableVariant(binding, value); + state.values.put(id, vs); + } + + if (value==null) { + vs.value.setValue(VoidBinding.VOID_BINDING, null); + vs.isNan = true; + vs.isValid = false; + return; + } + + vs.value.setValue(binding, value); + vs.isValid = true; + + if (binding instanceof DoubleBinding) { + DoubleBinding db = (DoubleBinding) binding; + try { + double _v = db.getValue_(value); + vs.isNan = Double.isNaN( _v ); + } catch (BindingException e) { + vs.isNan = true; + vs.isValid = false; + } + } else if (binding instanceof FloatBinding) { + FloatBinding db = (FloatBinding) binding; + try { + float _v = db.getValue_(value); + vs.isNan = Float.isNaN( _v ); + } catch (BindingException e) { + vs.isNan = true; + vs.isValid = false; + } + } else { + vs.isNan = false; + } + } + + @Override + public boolean getValue(String id, MutableVariant result) { + VariableState vs = state.values.get(id); + if (vs==null) return false; + + if (!vs.isValid) return false; + + result.setValue(vs.value.getBinding(), vs.value.getValue()); + return true; + } + + @Override + public void endStep() throws HistoryException { + // Write values to streams + for (ActiveStream i : items) + { + try { + putValueToStream(i); + if (flushPolicy == FlushPolicy.FlushOnEveryStep) { + i.stream.flush(); + } + } catch (AdaptException e) { + throw new HistoryException( e ); +// FileAccessor fa = (FileAccessor) i.stream; +// logger.log(Level.FINE, "Error writing a stream "+fa.file(), e); + } catch (AccessorException e) { + throw new HistoryException( e ); +// FileAccessor fa = (FileAccessor) i.stream; +// logger.log(Level.FINE, "Error writing a stream "+fa.file(), e); + } catch (BindingException e) { + throw new HistoryException( e ); +// FileAccessor fa = (FileAccessor) i.stream; +// logger.log(Level.FINE, "Error writing a stream "+fa.file(), e); + } + } + } + + private int sampleNum(double time, double interval) { + // 1.0 is needed to cover the case of 0.0 + return (int)(1.0 + (time+EPSILON*state.dT)/interval); + } + + private boolean isExactInterval(double time, double interval) { + return sampleNum(time, interval) > sampleNum(time-2*EPSILON*state.dT, interval); + } + + /** + * This method puts a time,value to a stream. + * + * The stream is "packed" when the new value is equal to previous, or the + * value is within deadband (in comparison to the first value), or if + * the interval zone is not exceeded. + * + * The stream can be packed two different ways, depending on the sample + * format. The sample format can be (a) single value or (b) a time range of + * multiple values, Ranged Sample. + * + * Single value is packed by removing middle values, in other words, + * moving the timecode of the last sample. + * + * Ranged sample is packed by extending the endTime code of the last sample. + * Other fields, such as avg, sum, count, lastValue are also updated. + * + * @param i + * @throws AdaptException + * @throws AccessorException + * @throws BindingException + * @throws HistoryException + */ + private void putValueToStream(ActiveStream i) throws AdaptException, AccessorException, BindingException, HistoryException + { + // Current Time + Variant time = state.time; + double currentTime = Double.NaN; + if (time != null) { + Double x = (Double) time.getValue(Bindings.DOUBLE); + currentTime = x == null ? Double.NaN : x; + } + + // Is first value + boolean isFirstValue = i.stream.size() == 0; + // Is this item disabled + boolean isDisabled = !i.itemState.enabled; + // Discontinuety, first value after being disabled + boolean wasDisabled = !isFirstValue && i.itemState.isDisabled; + + // Update currently disabled status + i.itemState.isDisabled = isDisabled; + + // Update time codes + if (isDisabled) { + i.itemState.count++; + i.itemState.lastDisabledTime = currentTime; + if (!wasDisabled) i.itemState.firstDisabledTime = currentTime; + return; + } + + // Is nan value, for doubles and floats + boolean isNanValue; + // Is valid value + boolean isValidValue; + // Value as variant + MutableVariant value; + // Get current value as double + double currentValueDouble; + + VariableState vs = state.values.get( i.itemState.variableId ); + if ( vs!=null ) { + isNanValue = vs.isNan; + isValidValue = vs.isValid; + value = vs.value; + if (i.isNumeric && isValidValue && !isNanValue) { + currentValueDouble = (Double) value.getValue( Bindings.DOUBLE ); + if ( i.itemState.gain!=1.0 || i.itemState.bias!=0.0 ) { + currentValueDouble = i.itemState.gain * currentValueDouble + i.itemState.bias; + Object x = i.valueBinding instanceof DoubleBindingDefault ? currentValueDouble : i.doubleToValue.adapt(currentValueDouble); + value = new MutableVariant( i.valueBinding, x ); + } + } else { + currentValueDouble = Double.NaN; + } + + } else { + isNanValue = true; + isValidValue = false; + value = new MutableVariant( i.valueBinding, i.valueBinding.createDefault() ); + currentValueDouble = Double.NaN; + } + + // Prev Value + double prevValueDouble = Double.NaN; + Variant pv = i.itemState.currentValue; + if (pv!=null && pv.getBinding() instanceof NumberBinding ) { + prevValueDouble = (Double) pv.getValue( Bindings.DOUBLE ); + } else { + prevValueDouble = Double.NaN; + } + + // The time of the first sample of region + double firstTime = i.itemState.firstTime; + // Prev time + double prevTime = i.itemState.currentTime; + + boolean restep = prevTime == currentTime; + + // There was change in NaN + boolean isNaNChange = i.itemState.isNaN != isNanValue; + + // There was change in validity of the value + boolean isValidChange = i.itemState.isValid != isValidValue; + + //boolean wasValidValue = !i.itemState.isValid && isValidValue; + + boolean hasInterval = !Double.isNaN( i.itemState.interval ) && (i.itemState.interval> Double.MIN_VALUE ); + + boolean hasDeadband = !Double.isNaN( i.itemState.deadband ) && (i.itemState.deadband> Double.MIN_VALUE) && i.isNumeric; + + boolean isMinMaxFormat = i.itemState.interval==Double.MAX_VALUE && + i.itemState.deadband==Double.MAX_VALUE && + i.binding.getComponentIndex("min")>=0 && + i.binding.getComponentIndex("max")>=0; + + // Deadband + if (isValidValue && i.isNumeric && i.itemState.count>=1) { + + if (i.itemState.isValid) { + double firstValueDouble = (Double) i.itemState.firstValue.getValue( Bindings.DOUBLE ); + double diff = Math.abs( currentValueDouble - firstValueDouble ); + if( hasDeadband ) + i.itemState.ooDeadband |= diff >= i.itemState.deadband; + else + i.itemState.ooDeadband |= diff != 0.0; + } else { + i.itemState.ooDeadband = true; + } + + // NaN change is deadband change + i.itemState.ooDeadband |= isNaNChange; + } else { + // The dead band cannot be have broken in first sample + i.itemState.ooDeadband = false; + } + + + // Interval + boolean ooInterval = true; + if ( hasInterval ) { + +// System.err.println(i.itemState.id + " " + firstTime + " " + currentTime); + + // First band is a special case + if(isFirstValue) { + // If this is the first sample store the current time as first time + if(Double.isNaN(i.itemState.firstTime)) firstTime = i.itemState.firstTime = currentTime; + int sn1 = sampleNum(firstTime,i.itemState.interval); + int sn2 = sampleNum(currentTime,i.itemState.interval); + // Wait until next sample if this position is not already exactly at sample border + // Always start minmax immediately + if(sn1 == sn2 && !isExactInterval(currentTime, i.itemState.interval) && !isMinMaxFormat) return; + } else { + int sn1 = sampleNum(prevTime,i.itemState.interval); + int sn2 = sampleNum(currentTime,i.itemState.interval); +// System.err.println(" ==>" + sn1 + " " + sn2 + " " + prevTime + " " + currentTime); + ooInterval = sn2 > sn1; + } + + +// +// double n = (currentTime+EPSILON) / i.itemState.interval; +// double n2 = n-Math.floor(n); +// +// +// double deltaTime = currentTime - firstTime; +// ooInterval = (deltaTime - i.itemState.interval) > -EPSILON; + } + + // Should new sample be created, or old sample extended / forwarded in time + boolean makeNewBand = ( i.itemState.ooDeadband && isValidValue && ooInterval ) || isFirstValue || wasDisabled || isValidChange; + + // Update item min-max, avg, median + if ( isValidValue && i.isNumeric ) { + if (i.current.hasAvg() && i.isNumeric) { + if (Double.isNaN( i.itemState.sum )) i.itemState.sum = 0.0; + if (isValidValue && i.isNumeric) { + double timeDelta = currentTime - i.itemState.currentTime; + i.itemState.sum += timeDelta * currentValueDouble; + } + } + + if (i.current.hasMedian() && !Double.isNaN(prevValueDouble) ) { + double deltaTime = currentTime - prevTime; + i.itemState.median.add( deltaTime, prevValueDouble ); + } + } + + // Increase the counter that tracks the number of samples acquired into the band + if (!restep) i.itemState.count++; + + // Remember values + i.itemState.currentTime = currentTime; + i.itemState.currentValue.readFrom( value.getBinding(), value.getValue() ); + i.itemState.isNaN = isNanValue; + i.itemState.isValid = isValidValue; + + // Put discontinuation marker + if ( (wasDisabled && i.current.supportsNullValue())) { + // Extend marker + if (i.current.isNullValue()) { + if (i.current.hasEndTime()) { + i.current.setEndTime( Bindings.DOUBLE, i.itemState.lastDisabledTime ); + } + + if (i.current.hasCount()) { + i.current.setCount( i.current.getCount()+1 ); + } + + // Add the band to the file stream + i.stream.setNoflush(i.stream.size()-1, i.binding, i.current.getSample() ); + } + + // Add marker + i.current.reset(); + i.current.setValueNull(); + i.current.setTime(Bindings.DOUBLE, i.itemState.firstDisabledTime); + i.current.setEndTime(Bindings.DOUBLE, i.itemState.lastDisabledTime); + i.stream.addNoflush( i.binding, i.current.getSample() ); + } + + boolean condition = makeNewBand || (!i.hasEndTime && i.itemState.count==2); + + if (i.stream.size() > 0) { + + /// Extend existing band + // Extend endTime value + if (!i.hasEndTime) { + i.current.setTime( time.getBinding(), time.getValue() ); + } else { + i.current.setEndTime( time.getBinding(), time.getValue() ); + } + if (i.itemState.ooDeadband || isMinMaxFormat) { + if (isValidValue) { + // Set last value + if (i.current.hasLastValue()) { + i.current.setLastValue( value.getBinding(), value.getValue() ); + } + + if (i.isNumeric && !isNanValue) { + // Add sum + if (i.current.hasAvg() && !restep) { + double duration = (currentTime - i.itemState.firstTime); + if(duration < 1e-9) { + i.current.setAvg( Bindings.DOUBLE, 0.0 ); + } else { + i.current.setAvg( Bindings.DOUBLE, i.itemState.sum / duration); + } + } + + // Update min-max + if (i.current.hasMin()) { + Binding minBinding = i.current.getMinBinding(); + Object prevMinValue = i.current.getMin(); + Object currentValueWithMinBinding = value.getValue( minBinding ); + int diff = minBinding.compare( prevMinValue, currentValueWithMinBinding ); + if (diff>0) i.current.setMin( minBinding, currentValueWithMinBinding ); + } + if (i.current.hasMax()) { + Binding maxBinding = i.current.getMaxBinding(); + Object prevMaxValue = i.current.getMax(); + Object currentValueWithMaxBinding = value.getValue( maxBinding ); + int diff = maxBinding.compare( prevMaxValue, currentValueWithMaxBinding ); + if (diff<0) i.current.setMax( maxBinding, currentValueWithMaxBinding ); + } + + // Update median + if (i.current.hasMedian()) { + Double median = i.itemState.median.getMedian(); + i.current.setMedian( Bindings.DOUBLE, median); + } + } + } else { + i.current.setValueNull(); + } + + // Update count + if (i.current.hasCount()) { + i.current.setCount( i.itemState.count ); + } + } + + // Write entry to file stream + // if (i.current.getTimeDouble()<0) + // System.err.println("Error sample "+i.current.getBinding().toString(i.current.getSample())); + i.stream.setNoflush( i.stream.size()-1, i.binding, i.current.getSample() ); + //System.err.println(i.itemState.id + " update " + i.current.getSample()); + } + + if (condition) { + + // Start a new value band + i.itemState.firstValue.readFrom( value.getBinding(), value.getValue() ); + i.itemState.firstTime = currentTime; + i.itemState.currentValue.readFrom( value.getBinding(), value.getValue() ); + i.itemState.currentTime = currentTime; + i.itemState.isNaN = isNanValue; + i.itemState.isValid = isValidValue; + i.itemState.sum = 0; + i.itemState.count = 1; + i.itemState.ooDeadband = false; + if (i.itemState.median != null) { + i.itemState.median.clear(); + if (!Double.isNaN(prevValueDouble) ) { + double deltaTime = currentTime - prevTime; + i.itemState.median.add( deltaTime, prevValueDouble ); + } + } + + // New Sample + i.current.reset(); + + // Start new band with current value & time + i.current.setTime( time.getBinding(), time.getValue() ); + + // Is valid value + if (i.current.hasEndTime()) { + i.current.setEndTime( time.getBinding(), time.getValue() ); + } + + if (isValidValue) { + i.current.setValue( value.getBinding(), value.getValue() ); + i.current.setMax( value.getBinding(), value.getValue() ); + i.current.setMin( value.getBinding(), value.getValue() ); + i.current.setLastValue( value.getBinding(), value.getValue() ); + i.current.setAvg( value.getBinding(), value.getValue() ); + i.current.setMedian( value.getBinding(), value.getValue() ); + } else { + i.current.setValueNull(); + } + + + // Sample Count=1 for new bands (time, value) "added" + if (i.current.hasCount()) { + i.current.setCount( 1 ); + } + + // Add the band to the file stream + i.stream.addNoflush( i.binding, i.current.getSample() ); + //System.err.println(i.itemState.id + " add " + i.current.getSample()); + + } + + } + + @Override + public void flush() /*throws HistoryException*/ { + for (ActiveStream i : items) { + try { + i.stream.flush(); + } catch (AccessorException e) { + logger.log(Level.FINE, "File history flush failed.", e); +// throw new HistoryException(e); + } + } + } + + public void flush(Set ids) /*throws HistoryException*/ { + for (ActiveStream i : items) { + if (!ids.contains(i.itemState.id)) continue; + try { + i.stream.flush(); + } catch (AccessorException e) { + logger.log(Level.FINE, "File history flush failed.", e); +// throw new HistoryException(e); + } + } + } + + + @Override + public synchronized void close() + { + // Write items back to history + List beans = new ArrayList( state.itemStates.size() ); + for (CollectorState.Item itemState : state.itemStates.values()) { + try { + if (!history.exists(itemState.id)) continue; + Bean bean = history.getItem(itemState.id); + bean.readAvailableFields(itemState); + beans.add( setItemState(bean, itemState) ); + } catch (BindingException e) { + logger.log(Level.FINE, "Error writing a meta-data to history", e); + } catch (BindingConstructionException e) { + logger.log(Level.FINE, "Error writing a meta-data to history", e); + } catch (HistoryException e) { + logger.log(Level.FINE, "Error writing a meta-data to history", e); + } + } + try { + history.modify(beans.toArray( new Bean[beans.size()] )); + } catch (HistoryException e) { + logger.log(Level.FINE, "Error writing a meta-data to history", e); + } + state.itemStates.clear(); + + // Close streams + for (ActiveStream item : items) { + item.close(); + } + items.clear(); + } + + public class ActiveStream { + + /** Sample Stream */ + public StreamAccessor stream; + + /** Sample binding */ + public RecordBinding binding; + + /** Value binding (Generic) */ + public Binding valueBinding; + + /** Set true if this item has numeric value type */ + public boolean isNumeric; + + /** Set true if this item has end time */ + public boolean hasEndTime; + + /** Item entry in subscription state */ + public CollectorState.Item itemState; + + /** Interface to current last sample (in the stream) */ + public ValueBand current; + + /** Adapter that converts value to double value, null if not numeric */ + public Adapter valueToDouble, doubleToValue; + + void close() { + try { + if (stream != null) { + stream.close(); + } + stream = null; + } catch (AccessorException e) { + logger.log(Level.FINE, "Error closing stream", e); + } + } + + } + + @Override + public StreamAccessor openStream(String itemId, String mode) + throws HistoryException + { + return history.openStream(itemId, mode); + } + + @Override + public HistoryManager getHistory() { + return history; + } + +}