/******************************************************************************* * Copyright (c) 2007, 2011 Association for Decentralized Information Management in * Industry THTH ry. * All rights reserved. This program and the accompanying materials * are made available under the terms of the Eclipse Public License v1.0 * which accompanies this distribution, and is available at * http://www.eclipse.org/legal/epl-v10.html * * Contributors: * VTT Technical Research Centre of Finland - initial API and implementation *******************************************************************************/ package org.simantics.history.impl; import gnu.trove.set.hash.THashSet; import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.Set; import java.util.UUID; import java.util.concurrent.CopyOnWriteArrayList; import java.util.logging.Level; import java.util.logging.Logger; import org.simantics.databoard.Bindings; import org.simantics.databoard.accessor.StreamAccessor; import org.simantics.databoard.accessor.error.AccessorException; import org.simantics.databoard.adapter.AdaptException; import org.simantics.databoard.adapter.Adapter; import org.simantics.databoard.adapter.AdapterConstructionException; import org.simantics.databoard.binding.Binding; import org.simantics.databoard.binding.DoubleBinding; import org.simantics.databoard.binding.FloatBinding; import org.simantics.databoard.binding.NumberBinding; import org.simantics.databoard.binding.RecordBinding; import org.simantics.databoard.binding.error.BindingConstructionException; import org.simantics.databoard.binding.error.BindingException; import org.simantics.databoard.binding.error.RuntimeBindingException; import org.simantics.databoard.binding.impl.DoubleBindingDefault; import org.simantics.databoard.binding.mutable.MutableVariant; import org.simantics.databoard.binding.mutable.Variant; import org.simantics.databoard.binding.reflection.VoidBinding; import org.simantics.databoard.serialization.Serializer; import org.simantics.databoard.type.Component; import org.simantics.databoard.type.Datatype; import org.simantics.databoard.type.NumberType; import org.simantics.databoard.type.RecordType; import org.simantics.databoard.util.Bean; import org.simantics.history.Collector; import org.simantics.history.HistoryException; import org.simantics.history.HistoryManager; import org.simantics.history.impl.CollectorState.Item; import org.simantics.history.impl.CollectorState.VariableState; import org.simantics.history.util.ValueBand; import org.simantics.history.util.WeightedMedian; import org.simantics.history.util.subscription.SubscriptionItem; /** * Collector is used to record data to streams in history manager. * * For every item there is collector item state {@link CollectorState.Item}, * that contains data required in writing process. This state is read and * written to history manager item (bean) as meta-data. * * Stream files are opened in the first {@link #beginStep(NumberBinding, Object)}. * * @author toni.kalajainen * */ public class CollectorImpl implements Collector { /** Logger */ static Logger logger = Logger.getLogger(CollectorImpl.class.getName()); public static final int MEDIAN_LIMIT = 500; public static final double EPSILON = 1e-2; /** Collector state */ CollectorState state; /** Open streams, corresponds to subscription items */ CopyOnWriteArrayList items = new CopyOnWriteArrayList(); /** Associated history state */ HistoryManager history; /** True when all items are open */ boolean itemsOpen = true; public FlushPolicy flushPolicy = FlushPolicy.FlushOnEveryStep; public FlushPolicy getFlushPolicy() { return flushPolicy; } public void setFlushPolicy(FlushPolicy flushPolicy) { this.flushPolicy = flushPolicy; } /** * Create a new collector that is associated with a history manager. * * @param history * @throws HistoryException */ public CollectorImpl(HistoryManager history) { this.history = history; this.state = new CollectorState(); this.state.init(); this.state.id = UUID.randomUUID().toString(); } /** * Create a new collector that is associated with a history manager. * * @param history * @param previousState * @throws HistoryException */ public CollectorImpl(HistoryManager history, CollectorState previousState) { this.history = history; this.state = (CollectorState) previousState.clone(); } @Override public synchronized void addItem(Bean item) throws HistoryException { try { String id = (String) item.getField("id"); // Item State CollectorState.Item itemState = null; if ( item.hasField("collectorState") ) itemState = (CollectorState.Item) item.getField("collectorState", CollectorState.BINDING_ITEM); if ( itemState == null ) itemState = state.itemStates.get( id ); if ( itemState == null ) { // Create new item itemState = new CollectorState.Item(); itemState.init(); itemState.readAvailableFields( item ); // Guess values from the existing data if (history.exists(id)) { readItemState(history, item, itemState); } } addItem(id, itemState); } catch (BindingException e) { throw new HistoryException( e ); } } public synchronized void addItem(String id, CollectorState.Item itemState) throws HistoryException { try { // Item State state.itemStates.put(id, itemState); itemsOpen = false; // Time if (state.time.type() instanceof NumberType) { double time = (Double) Bindings.adapt(state.time.getValue(), state.time.getBinding(), Bindings.DOUBLE); if ( !Double.isNaN(itemState.currentTime) ) { if ( itemState.currentTime > time ) { state.time.setValue(Bindings.DOUBLE, itemState.currentTime); } } } } catch (AdaptException e) { throw new HistoryException( e ); } } public synchronized void addItems(Bean...items) throws HistoryException { for (Bean item : items) addItem(item); } @Override public synchronized void removeItem(String id) throws HistoryException { CollectorState.Item itemState = state.itemStates.remove( id ); if (itemState!=null && history.exists(id)) { Bean bean = history.getItem(id); bean.readAvailableFields(itemState); try { bean = setItemState(bean, itemState); history.modify(bean); } catch (BindingException e) { // Unexpected logger.log(Level.FINE, "Failed to update bean", e); } catch (BindingConstructionException e) { // Unexpected logger.log(Level.FINE, "Failed to update bean", e); } catch (HistoryException e) { logger.log(Level.FINE, "Failed to update history item meta-data", e); } } ActiveStream as = getStream(id); if ( as != null ) { items.remove( as ); as.close(); } } ActiveStream getStream(String id) { for (ActiveStream as : items) { if (as.itemState.id.equals(id)) return as; } return null; } /** * Set (or Add) item's configuration. * * @param item item * @throws HistoryException */ public synchronized void setItem(Bean item) throws HistoryException { try { String id = (String) item.getField("id"); CollectorState.Item itemState = state.itemStates.get( id ); if ( itemState==null ) { addItem(item); return; } itemState.readAvailableFields(item); } catch (BindingException e) { throw new HistoryException( e ); } } /** * Set item state to bean. If the bean does not have field collectorState, * a new bean class is created, and field added. * * @param config * @param state * @return same or new bean * @throws BindingException * @throws BindingConstructionException */ public static Bean setItemState(Bean config, CollectorState.Item state) throws BindingException, BindingConstructionException { Binding binding = config.getFieldBinding("collectorState"); if ( binding == null || !binding.type().equals(CollectorState.BINDING_ITEM.type()) ) { RecordType rt = config.getBinding().type(); RecordType newType = new RecordType(); for (int i=0; i0; // Value adapter if ( x.isNumeric ) try { x.valueToDouble = Bindings.adapterFactory.getAdapter(x.valueBinding, Bindings.DOUBLE, true, false); x.doubleToValue = Bindings.adapterFactory.getAdapter(Bindings.DOUBLE, x.valueBinding, true, false); } catch (AdapterConstructionException e1) { x.valueToDouble = null; } // Create valueband, and read last entry into memory try { Object currentEntry = x.binding.createDefault(); if (x.stream.size() >= 1) { x.stream.get(x.stream.size() - 1, x.binding, currentEntry); } x.current = new ValueBand(x.binding); x.current.setSample(currentEntry); } catch (AccessorException e) { throw new HistoryException( "Could not read sample from stream file, " + e, e); } catch (BindingException e) { throw new HistoryException( "Could not read sample from stream file, " + e, e); } // Item State x.itemState = itemState; // Median if (x.isNumeric && x.current.hasMedian() && x.itemState.median == null) { x.itemState.median = new WeightedMedian( MEDIAN_LIMIT ); } return x; } /** * Read itemState from history. Item must exist. * * @param si subscription item * @return state item * @throws RuntimeBindingException * @throws HistoryException */ static void readItemState(HistoryManager history, Bean si, CollectorState.Item item) throws RuntimeBindingException, HistoryException { // Create item state item.readAvailableFields(si); // Read state from items StreamAccessor sa = history.openStream(item.id, "r"); try { Datatype sampleType = sa.type().componentType; Binding sampleBinding = Bindings.getBinding(sampleType); if (!sampleType.equals( item.format )) throw new HistoryException("Could not create subscription, item \""+item.id+"\" already exists and has wrong format (" + sampleType + " vs. " + item.format + ")"); Object sample = sampleBinding.createDefaultUnchecked(); ValueBand v = new ValueBand(sampleBinding, sample); item.count = sa.size(); if ( item.count>0 ) { sa.get(0, sampleBinding, sample); item.firstTime = v.getTimeDouble(); Binding b = v.getValueBinding(); Object value = v.getValue(b); item.firstValue.setValue(b, value); sa.get(item.count-1, sampleBinding, sample); item.currentTime = v.getTimeDouble(); b = v.getValueBinding(); value = v.getValue(b); item.currentValue.setValue(b, value); if ( item.currentValue.getBinding() instanceof DoubleBinding ) { DoubleBinding db = (DoubleBinding) item.currentValue.getBinding(); value = item.currentValue.getValue(db); item.isNaN = Double.isNaN( (Double) value ); } } } catch (AccessorException e) { throw new HistoryException( e ); } catch (AdaptException e) { throw new HistoryException( e ); } finally { try { sa.close(); } catch (AccessorException e) {} } } @Override public CollectorState getState() { return (CollectorState) state.clone(); } @Override public void setState(Bean newState) { if (newState == null) return; if (state == null) { state = new CollectorState(); state.init(); } state.readAvailableFields(newState); // Make sure that openAllItems ensures that all streams are opened properly. // This is because setItem will not invoke addItem if collectorState exists // for an item and therefore itemsOpen might not otherwise be set to false. itemsOpen = false; } @Override public Object getTime(Binding binding) throws HistoryException { MutableVariant v = state.time; if (v==null) return null; try { return v.getValue(binding); } catch (AdaptException e) { throw new HistoryException(e); } } @Override public Object getValue(String id, Binding binding) throws HistoryException { VariableState vs = state.values.get(id); if (vs==null) return null; if (!vs.isValid) return null; try { return vs.value.getValue(binding); } catch (AdaptException e) { throw new HistoryException(e); } } void openAllItems() throws HistoryException { if (itemsOpen) return; // Assert all items are open // 1. Collect all items that still need to be opened. Set itemsToOpen = new THashSet( state.itemStates.keySet() ); for (ActiveStream as : items) itemsToOpen.remove(as.itemState.id); // 2. Open missing streams List opened = new ArrayList(itemsToOpen.size()); for (String itemId : itemsToOpen) { CollectorState.Item item = state.itemStates.get( itemId ); ActiveStream as = openStream( item ); opened.add(as); } // 3. Update list of open streams items.addAll(opened); itemsOpen = true; } private void updateDt(NumberBinding binding, Object time) { try { Double current = (Double)getTime(Bindings.DOUBLE); if(current != null) { double t = binding.getValue(time).doubleValue(); state.dT = t-current; if(state.dT > 0) return; } } catch (Throwable t) { } state.dT = 1.0; } @Override public void beginStep(NumberBinding binding, Object time) throws HistoryException { openAllItems(); updateDt(binding, time); state.time.setValue(binding, time); } @Override public void setValue(String id, Binding binding, Object value) throws HistoryException { VariableState vs = state.values.get( id ); if (vs == null) { vs = new VariableState(); vs.value = new MutableVariant(binding, value); state.values.put(id, vs); } if (value==null) { vs.value.setValue(VoidBinding.VOID_BINDING, null); vs.isNan = true; vs.isValid = false; return; } vs.value.setValue(binding, value); vs.isValid = true; if (binding instanceof DoubleBinding) { DoubleBinding db = (DoubleBinding) binding; try { double _v = db.getValue_(value); vs.isNan = Double.isNaN( _v ); } catch (BindingException e) { vs.isNan = true; vs.isValid = false; } } else if (binding instanceof FloatBinding) { FloatBinding db = (FloatBinding) binding; try { float _v = db.getValue_(value); vs.isNan = Float.isNaN( _v ); } catch (BindingException e) { vs.isNan = true; vs.isValid = false; } } else { vs.isNan = false; } } @Override public boolean getValue(String id, MutableVariant result) { VariableState vs = state.values.get(id); if (vs==null) return false; if (!vs.isValid) return false; result.setValue(vs.value.getBinding(), vs.value.getValue()); return true; } @Override public void endStep() throws HistoryException { // Write values to streams for (ActiveStream i : items) { try { putValueToStream(i); if (flushPolicy == FlushPolicy.FlushOnEveryStep) { i.stream.flush(); } } catch (AdaptException e) { throw new HistoryException( e ); // FileAccessor fa = (FileAccessor) i.stream; // logger.log(Level.FINE, "Error writing a stream "+fa.file(), e); } catch (AccessorException e) { throw new HistoryException( e ); // FileAccessor fa = (FileAccessor) i.stream; // logger.log(Level.FINE, "Error writing a stream "+fa.file(), e); } catch (BindingException e) { throw new HistoryException( e ); // FileAccessor fa = (FileAccessor) i.stream; // logger.log(Level.FINE, "Error writing a stream "+fa.file(), e); } } } private int sampleNum(double time, double interval) { // 1.0 is needed to cover the case of 0.0 return (int)(1.0 + (time+EPSILON*state.dT)/interval); } private boolean isExactInterval(double time, double interval) { return sampleNum(time, interval) > sampleNum(time-2*EPSILON*state.dT, interval); } /** * This method puts a time,value to a stream. * * The stream is "packed" when the new value is equal to previous, or the * value is within deadband (in comparison to the first value), or if * the interval zone is not exceeded. * * The stream can be packed two different ways, depending on the sample * format. The sample format can be (a) single value or (b) a time range of * multiple values, Ranged Sample. * * Single value is packed by removing middle values, in other words, * moving the timecode of the last sample. * * Ranged sample is packed by extending the endTime code of the last sample. * Other fields, such as avg, sum, count, lastValue are also updated. * * @param i * @throws AdaptException * @throws AccessorException * @throws BindingException * @throws HistoryException */ private void putValueToStream(ActiveStream i) throws AdaptException, AccessorException, BindingException, HistoryException { // Current Time Variant time = state.time; double currentTime = Double.NaN; if (time != null) { Double x = (Double) time.getValue(Bindings.DOUBLE); currentTime = x == null ? Double.NaN : x; } // Is first value boolean isFirstValue = i.stream.size() == 0; // Is this item disabled boolean isDisabled = !i.itemState.enabled; // Discontinuety, first value after being disabled boolean wasDisabled = !isFirstValue && i.itemState.isDisabled; // Update currently disabled status i.itemState.isDisabled = isDisabled; // Update time codes if (isDisabled) { i.itemState.count++; i.itemState.lastDisabledTime = currentTime; if (!wasDisabled) i.itemState.firstDisabledTime = currentTime; return; } // Is nan value, for doubles and floats boolean isNanValue; // Is valid value boolean isValidValue; // Value as variant MutableVariant value; // Get current value as double double currentValueDouble; VariableState vs = state.values.get( i.itemState.variableId ); if ( vs!=null ) { isNanValue = vs.isNan; isValidValue = vs.isValid; value = vs.value; if (i.isNumeric && isValidValue && !isNanValue) { currentValueDouble = (Double) value.getValue( Bindings.DOUBLE ); if ( i.itemState.gain!=1.0 || i.itemState.bias!=0.0 ) { currentValueDouble = i.itemState.gain * currentValueDouble + i.itemState.bias; Object x = i.valueBinding instanceof DoubleBindingDefault ? currentValueDouble : i.doubleToValue.adapt(currentValueDouble); value = new MutableVariant( i.valueBinding, x ); } } else { currentValueDouble = Double.NaN; } } else { isNanValue = true; isValidValue = false; value = new MutableVariant( i.valueBinding, i.valueBinding.createDefault() ); currentValueDouble = Double.NaN; } // Prev Value double prevValueDouble = Double.NaN; Variant pv = i.itemState.currentValue; if (pv!=null && pv.getBinding() instanceof NumberBinding ) { prevValueDouble = (Double) pv.getValue( Bindings.DOUBLE ); } else { prevValueDouble = Double.NaN; } // The time of the first sample of region double firstTime = i.itemState.firstTime; // Prev time double prevTime = i.itemState.currentTime; boolean restep = prevTime == currentTime; // There was change in NaN boolean isNaNChange = i.itemState.isNaN != isNanValue; // There was change in validity of the value boolean isValidChange = i.itemState.isValid != isValidValue; //boolean wasValidValue = !i.itemState.isValid && isValidValue; boolean hasInterval = !Double.isNaN( i.itemState.interval ) && (i.itemState.interval> Double.MIN_VALUE ); boolean hasDeadband = !Double.isNaN( i.itemState.deadband ) && (i.itemState.deadband> Double.MIN_VALUE) && i.isNumeric; boolean isMinMaxFormat = i.itemState.interval==Double.MAX_VALUE && i.itemState.deadband==Double.MAX_VALUE && i.binding.getComponentIndex("min")>=0 && i.binding.getComponentIndex("max")>=0; // Deadband if (isValidValue && i.isNumeric && i.itemState.count>=1) { if (i.itemState.isValid) { double firstValueDouble = (Double) i.itemState.firstValue.getValue( Bindings.DOUBLE ); double diff = Math.abs( currentValueDouble - firstValueDouble ); if( hasDeadband ) i.itemState.ooDeadband |= diff >= i.itemState.deadband; else i.itemState.ooDeadband |= diff != 0.0; } else { i.itemState.ooDeadband = true; } // NaN change is deadband change i.itemState.ooDeadband |= isNaNChange; } else { // The dead band cannot be have broken in first sample i.itemState.ooDeadband = false; } // Interval boolean ooInterval = true; if ( hasInterval ) { // System.err.println(i.itemState.id + " " + firstTime + " " + currentTime); // First band is a special case if(isFirstValue) { // If this is the first sample store the current time as first time if(Double.isNaN(i.itemState.firstTime)) firstTime = i.itemState.firstTime = currentTime; int sn1 = sampleNum(firstTime,i.itemState.interval); int sn2 = sampleNum(currentTime,i.itemState.interval); // Wait until next sample if this position is not already exactly at sample border // Always start minmax immediately if(sn1 == sn2 && !isExactInterval(currentTime, i.itemState.interval) && !isMinMaxFormat) return; } else { int sn1 = sampleNum(prevTime,i.itemState.interval); int sn2 = sampleNum(currentTime,i.itemState.interval); // System.err.println(" ==>" + sn1 + " " + sn2 + " " + prevTime + " " + currentTime); ooInterval = sn2 > sn1; } // // double n = (currentTime+EPSILON) / i.itemState.interval; // double n2 = n-Math.floor(n); // // // double deltaTime = currentTime - firstTime; // ooInterval = (deltaTime - i.itemState.interval) > -EPSILON; } // Should new sample be created, or old sample extended / forwarded in time boolean makeNewBand = ( i.itemState.ooDeadband && isValidValue && ooInterval ) || isFirstValue || wasDisabled || isValidChange; // Update item min-max, avg, median if ( isValidValue && i.isNumeric ) { if (i.current.hasAvg() && i.isNumeric) { if (Double.isNaN( i.itemState.sum )) i.itemState.sum = 0.0; if (isValidValue && i.isNumeric) { double timeDelta = currentTime - i.itemState.currentTime; i.itemState.sum += timeDelta * currentValueDouble; } } if (i.current.hasMedian() && !Double.isNaN(prevValueDouble) ) { double deltaTime = currentTime - prevTime; i.itemState.median.add( deltaTime, prevValueDouble ); } } // Increase the counter that tracks the number of samples acquired into the band if (!restep) i.itemState.count++; // Remember values i.itemState.currentTime = currentTime; i.itemState.currentValue.readFrom( value.getBinding(), value.getValue() ); i.itemState.isNaN = isNanValue; i.itemState.isValid = isValidValue; // Put discontinuation marker if ( (wasDisabled && i.current.supportsNullValue())) { // Extend marker if (i.current.isNullValue()) { if (i.current.hasEndTime()) { i.current.setEndTime( Bindings.DOUBLE, i.itemState.lastDisabledTime ); } if (i.current.hasCount()) { i.current.setCount( i.current.getCount()+1 ); } // Add the band to the file stream i.stream.setNoflush(i.stream.size()-1, i.binding, i.current.getSample() ); } // Add marker i.current.reset(); i.current.setValueNull(); i.current.setTime(Bindings.DOUBLE, i.itemState.firstDisabledTime); i.current.setEndTime(Bindings.DOUBLE, i.itemState.lastDisabledTime); i.stream.addNoflush( i.binding, i.current.getSample() ); } boolean condition = makeNewBand || (!i.hasEndTime && i.itemState.count==2); if(i.stream.size() > 0 && !i.itemState.ooDeadband) { /// Extend existing band // Extend endTime value if (!i.hasEndTime) { i.current.setTime( time.getBinding(), time.getValue() ); } else { i.current.setEndTime( time.getBinding(), time.getValue() ); } if (isValidValue) { // Set last value if (i.current.hasLastValue()) { i.current.setLastValue( value.getBinding(), value.getValue() ); } // Add sum if (i.current.hasAvg() && i.isNumeric && !isNanValue && !restep) { double duration = (currentTime - i.itemState.firstTime); if(duration < 1e-9) { i.current.setAvg( Bindings.DOUBLE, 0.0 ); } else { i.current.setAvg( Bindings.DOUBLE, i.itemState.sum / duration); } } // Update min-max if (i.current.hasMin() && i.isNumeric && !isNanValue) { Binding minBinding = i.current.getMinBinding(); Object prevMinValue = i.current.getMin(); Object currentValueWithMinBinding = value.getValue( minBinding ); int diff = minBinding.compare( prevMinValue, currentValueWithMinBinding ); if (diff>0) i.current.setMin( minBinding, currentValueWithMinBinding ); } if (i.current.hasMax() && i.isNumeric && !isNanValue) { Binding maxBinding = i.current.getMaxBinding(); Object prevMaxValue = i.current.getMax(); Object currentValueWithMaxBinding = value.getValue( maxBinding ); int diff = maxBinding.compare( prevMaxValue, currentValueWithMaxBinding ); if (diff<0) i.current.setMax( maxBinding, currentValueWithMaxBinding ); } // Update median if (i.current.hasMedian() && i.isNumeric && !isNanValue) { Double median = i.itemState.median.getMedian(); i.current.setMedian( Bindings.DOUBLE, median); } } else { i.current.setValueNull(); } // Add count if (i.current.hasCount()) { i.current.setCount( i.itemState.count ); } // Write entry to file stream // if (i.current.getTimeDouble()<0) // System.err.println("Error sample "+i.current.getBinding().toString(i.current.getSample())); i.stream.setNoflush( i.stream.size()-1, i.binding, i.current.getSample() ); } if (condition) { // Start a new value band i.itemState.firstValue.readFrom( value.getBinding(), value.getValue() ); i.itemState.firstTime = currentTime; i.itemState.currentValue.readFrom( value.getBinding(), value.getValue() ); i.itemState.currentTime = currentTime; i.itemState.isNaN = isNanValue; i.itemState.isValid = isValidValue; i.itemState.sum = 0; i.itemState.count = 1; i.itemState.ooDeadband = false; if (i.itemState.median != null) { i.itemState.median.clear(); if (!Double.isNaN(prevValueDouble) ) { double deltaTime = currentTime - prevTime; i.itemState.median.add( deltaTime, prevValueDouble ); } } // New Sample i.current.reset(); // Start new band with current value & time i.current.setTime( time.getBinding(), time.getValue() ); // Is valid value if (i.current.hasEndTime()) { i.current.setEndTime( time.getBinding(), time.getValue() ); } if (isValidValue) { i.current.setValue( value.getBinding(), value.getValue() ); i.current.setMax( value.getBinding(), value.getValue() ); i.current.setMin( value.getBinding(), value.getValue() ); i.current.setLastValue( value.getBinding(), value.getValue() ); i.current.setAvg( value.getBinding(), value.getValue() ); i.current.setMedian( value.getBinding(), value.getValue() ); } else { i.current.setValueNull(); } // Sample Count=1 for new bands (time, value) "added" if (i.current.hasCount()) { i.current.setCount( 1 ); } // Add the band to the file stream i.stream.addNoflush( i.binding, i.current.getSample() ); } } @Override public void flush() /*throws HistoryException*/ { for (ActiveStream i : items) { try { i.stream.flush(); } catch (AccessorException e) { logger.log(Level.FINE, "File history flush failed.", e); // throw new HistoryException(e); } } } public void flush(Set ids) /*throws HistoryException*/ { for (ActiveStream i : items) { if (!ids.contains(i.itemState.id)) continue; try { i.stream.flush(); } catch (AccessorException e) { logger.log(Level.FINE, "File history flush failed.", e); // throw new HistoryException(e); } } } @Override public synchronized void close() { // Write items back to history List beans = new ArrayList( state.itemStates.size() ); for (CollectorState.Item itemState : state.itemStates.values()) { try { if (!history.exists(itemState.id)) continue; Bean bean = history.getItem(itemState.id); bean.readAvailableFields(itemState); beans.add( setItemState(bean, itemState) ); } catch (BindingException e) { logger.log(Level.FINE, "Error writing a meta-data to history", e); } catch (BindingConstructionException e) { logger.log(Level.FINE, "Error writing a meta-data to history", e); } catch (HistoryException e) { logger.log(Level.FINE, "Error writing a meta-data to history", e); } } try { history.modify(beans.toArray( new Bean[beans.size()] )); } catch (HistoryException e) { logger.log(Level.FINE, "Error writing a meta-data to history", e); } state.itemStates.clear(); // Close streams for (ActiveStream item : items) { item.close(); } items.clear(); } public class ActiveStream { /** Sample Stream */ public StreamAccessor stream; /** Sample binding */ public RecordBinding binding; /** Value binding (Generic) */ public Binding valueBinding; /** Set true if this item has numeric value type */ public boolean isNumeric; /** Set true if this item has end time */ public boolean hasEndTime; /** Item entry in subscription state */ public CollectorState.Item itemState; /** Interface to current last sample (in the stream) */ public ValueBand current; /** Adapter that converts value to double value, null if not numeric */ public Adapter valueToDouble, doubleToValue; void close() { try { if (stream != null) { stream.close(); } stream = null; } catch (AccessorException e) { logger.log(Level.FINE, "Error closing stream", e); } } } @Override public StreamAccessor openStream(String itemId, String mode) throws HistoryException { return history.openStream(itemId, mode); } @Override public HistoryManager getHistory() { return history; } }