/*******************************************************************************
 * Copyright (c) 2007, 2018 Association for Decentralized Information Management
 * in Industry THTH ry.
 * All rights reserved. This program and the accompanying materials
 * are made available under the terms of the Eclipse Public License v1.0
 * which accompanies this distribution, and is available at
 * http://www.eclipse.org/legal/epl-v10.html
 *
 * Contributors:
 *     VTT Technical Research Centre of Finland - initial API and implementation
 *     Semantum Oy - archive implementation, gitlab simantics/platform#227
 *******************************************************************************/
package org.simantics.simulation.history;

import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
import org.apache.commons.compress.utils.IOUtils;
import org.eclipse.core.runtime.IStatus;
import org.eclipse.core.runtime.Status;
import org.simantics.databoard.Bindings;
import org.simantics.databoard.accessor.StreamAccessor;
import org.simantics.databoard.accessor.binary.BinaryObject;
import org.simantics.databoard.accessor.error.AccessorException;
import org.simantics.databoard.binding.ArrayBinding;
import org.simantics.databoard.binding.Binding;
import org.simantics.databoard.binding.RecordBinding;
import org.simantics.databoard.binding.error.BindingConstructionException;
import org.simantics.databoard.binding.error.BindingException;
import org.simantics.databoard.binding.impl.ObjectArrayBinding;
import org.simantics.databoard.binding.mutable.MutableVariant;
import org.simantics.databoard.binding.mutable.Variant;
import org.simantics.databoard.container.DataContainer;
import org.simantics.databoard.container.DataContainers;
import org.simantics.databoard.serialization.Serializer;
import org.simantics.databoard.util.Bean;
import org.simantics.databoard.util.binary.BinaryFile;
import org.simantics.databoard.util.binary.OutputStreamWriteable;
import org.simantics.databoard.util.binary.RandomAccessBinary;
import org.simantics.db.ReadGraph;
import org.simantics.db.Resource;
import org.simantics.db.WriteGraph;
import org.simantics.db.common.request.UniqueRead;
import org.simantics.db.common.request.WriteRequest;
import org.simantics.db.exception.DatabaseException;
import org.simantics.db.exception.ServiceNotFoundException;
import org.simantics.db.request.Read;
import org.simantics.fastlz.FastLZ;
import org.simantics.history.HistoryException;
import org.simantics.history.HistoryManager;
import org.simantics.history.ItemManager;
import org.simantics.history.impl.CollectorImpl;
import org.simantics.history.impl.CollectorState;
import org.simantics.history.impl.CollectorState.VariableState;
import org.simantics.history.util.Stream;
import org.simantics.history.util.ValueBand;
import org.simantics.history.util.WeightedMedian;
import org.simantics.layer0.Layer0;
import org.simantics.simulation.Activator;
import org.simantics.simulation.ontology.HistoryResource;
import org.simantics.simulation.ontology.SimulationResource;
import org.simantics.utils.FileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import gnu.trove.map.TObjectLongMap;
import gnu.trove.map.hash.TObjectLongHashMap;
import net.jpountz.lz4.LZ4BlockInputStream;
import net.jpountz.lz4.LZ4BlockOutputStream;
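/*
 * Layout of a history archive as produced and consumed by this class
 * (a sketch inferred from the export/import code below):
 *
 *   [4-byte length prefix]   only when the archive is embedded in an
 *                            L0.ByteArray literal in the database
 *   [DataContainer header]   format id (application/x-flz, application/x-lz4
 *                            or application/x-tar) and version
 *   [payload]                FastLZ-compressed, LZ4-compressed or raw,
 *                            according to the Compression enum; the payload
 *                            itself is always a TAR archive
 */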
/**
 * @author Toni Kalajainen
 * @author Tuukka Lehtonen
 */
public class HistoryUtil {

    private static final Logger LOGGER = LoggerFactory.getLogger(HistoryUtil.class);

    private static final boolean DEBUG = false;
    private static final boolean PROFILE = true;

    public enum Compression {
        FLZ,
        LZ4,
        NONE,
    }

    private static final Compression USED_COMPRESSION = Compression.FLZ;

    private static final String APPLICATION_X_TAR = "application/x-tar";
    private static final String APPLICATION_X_LZ4 = "application/x-lz4";
    private static final String APPLICATION_X_FLZ = "application/x-flz";

    private static final int ARCHIVE_VERSION_1 = 1; // TarArchiveInput/OutputStream

    public static class ArchivedHistory {
        public long totalSampleSize = 0L;
        public TObjectLongMap<Bean> itemSizes = new TObjectLongHashMap<>();
    }

    /**
     * Create a request that exports history to the graph.
     *
     * If removeOldUnused is <code>true</code>, items that do not exist in the
     * source history are deleted from the graph.
     *
     * If an item already exists in the graph, it is overwritten.
     *
     * @param history source history
     * @param historyResource Instance of HIS.History to write data to
     * @param removeOldUnused remove graph items that are missing from the source history
     * @param timeShift adjust time values by this value
     * @param from time
     * @param end time
     * @return WriteRequest
     */
    public static WriteRequest export(final HistoryManager history,
            final Resource historyResource,
            final boolean removeOldUnused,
            final Double timeShift,
            final double from,
            final double end) {
        return new WriteRequest() {
            @Override
            public void perform(WriteGraph graph) throws DatabaseException {
                HistoryResource HIS = HistoryResource.getInstance(graph);
                Layer0 L0 = Layer0.getInstance(graph);
                long totalSampleSize = 0L;
                long s = System.nanoTime();
                try {
                    Bean[] items = history.getItems();
                    if (PROFILE)
                        profile(null, "exporting " + items.length + " history items to database");
                    ItemManager im = new ItemManager(items);
                    Map<String, Resource> oldResourceMap = new HashMap<>();
                    for (Resource oldItem : graph.getObjects(historyResource, L0.ConsistsOf)) {
                        if (!graph.isInstanceOf(oldItem, HIS.History_Item))
                            continue;
                        Bean bean = graph.getPossibleRelatedValue(oldItem, HIS.History_Item_Info, Bindings.BEAN);
                        if (bean == null)
                            continue;
                        String id = (String) bean.getField("id");
                        if (!im.exists(id)) {
                            // The graph contains an item that does not exist
                            // in the source history.
                            if (removeOldUnused) {
                                graph.deny(oldItem);
                            }
                        } else {
                            // Item already exists, to-be-overwritten
                            oldResourceMap.put(id, oldItem);
                        }
                    }

                    for (Bean newItem : im.values()) {
                        //System.out.println("WRITING ITEM: " + newItem);
                        String id = (String) newItem.getField("id");
                        Resource historyItemResource = oldResourceMap.get(id);
                        Resource data = null;
                        if (historyItemResource == null) {
                            historyItemResource = graph.newResource();
                            graph.claim(historyResource, L0.ConsistsOf, historyItemResource);
                            graph.claim(historyItemResource, L0.InstanceOf, HIS.History_Item);
                            graph.claimLiteral(historyItemResource, L0.HasName, id);
                        } else {
                            data = graph.getPossibleObject(historyItemResource, HIS.History_Item_Series);
                        }

                        Variant v = new Variant(newItem.getBinding(), newItem);
                        graph.claimLiteral(historyItemResource, HIS.History_Item_Info, v, Bindings.VARIANT);

                        if (data == null) {
                            data = graph.newResource();
                            graph.claim(data, L0.InstanceOf, L0.Variant);
                            graph.claim(historyItemResource, HIS.History_Item_Series, data);
                        } else {
                            graph.denyValue(data);
                        }

                        // Write stream
                        StreamAccessor sa = history.openStream(id, "r");
                        try {
                            //System.out.println("WRITING stream of size=" + sa.size());
                            RecordBinding sampleBinding = (RecordBinding) Bindings.getBeanBinding(sa.type().componentType);
                            Serializer serializer = Bindings.getSerializerUnchecked(sampleBinding);
                            Integer constantSampleSize = serializer.getConstantSize();
                            ValueBand vb = new ValueBand(sampleBinding);
                            Stream stream = new Stream(sa, sampleBinding);
                            ArrayBinding arrayBinding = new ObjectArrayBinding(sa.type(), sampleBinding);
                            Object array = null;
                            //System.out.println("WRITING stream: sample size=" + constantSampleSize);
                            int count = 0;
                            readArray: {
                                // Start index
                                int startIndex = stream.binarySearch(Bindings.DOUBLE, from);
                                if (startIndex < -stream.count())
                                    break readArray;
                                if (startIndex < 0)
                                    startIndex = -2 - startIndex;
                                if (startIndex == -1)
                                    startIndex = 0;
                                // End index
                                int endIndex = stream.binarySearch(Bindings.DOUBLE, end);
                                if (endIndex == -1)
                                    break readArray;
                                if (endIndex < 0)
                                    endIndex = -1 - endIndex;
                                if (endIndex == sa.size())
                                    endIndex = sa.size() - 1;
                                if (endIndex < startIndex)
                                    break readArray;
                                // ... (copying of the [startIndex, endIndex] sample
                                // range into 'array' with optional time shifting
                                // elided)
                            }
                            // ... (claiming 'array' as the value of the 'data'
                            // literal and totalSampleSize accounting elided)
                        } finally {
                            try {
                                sa.close();
                            } catch (AccessorException e) {
                            }
                        }
                    }
                } catch (HistoryException e) {
                    throw new DatabaseException(e);
                } catch (AccessorException e) {
                    throw new DatabaseException(e);
                } catch (BindingException e) {
                    throw new DatabaseException(e);
                }
            }
        };
    }
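    /*
     * Usage sketch for export(...) above (illustrative only; 'session' is an
     * assumed org.simantics.db.Session, not part of this class):
     *
     *   session.syncRequest(HistoryUtil.export(
     *           history, historyResource,
     *           true,                    // remove graph items missing from 'history'
     *           null,                    // no time shift
     *           0.0, Double.MAX_VALUE)); // full time range
     */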
    /**
     * Create a request that archives the specified history as a single
     * compressed archive literal attached to the history resource.
     *
     * @param history source history
     * @param collectorState complete state of the history collector at the
     *        time of saving, or <code>null</code> to skip collector state saving
     * @param historyResource Instance of HIS.History to write data to
     * @param timeShift adjust time values by this value
     * @param from time
     * @param end time
     * @return WriteRequest
     */
    public static WriteRequest exportArchive(
            final HistoryManager history,
            final Bean collectorState,
            final Resource historyResource,
            final Double timeShift,
            final double from,
            final double end) {
        return new WriteRequest() {
            @Override
            public void perform(WriteGraph graph) throws DatabaseException {
                HistoryResource HIS = HistoryResource.getInstance(graph);
                Layer0 L0 = Layer0.getInstance(graph);

                long s = System.nanoTime();
                if (PROFILE)
                    profile(null, "archiving history items to database");

                // Remove all possibly existing old items in the history.
                for (Resource entity : graph.getObjects(historyResource, L0.ConsistsOf))
                    if (graph.isInstanceOf(entity, HIS.History_Item))
                        graph.deny(historyResource, L0.ConsistsOf, L0.PartOf, entity);

                // Create new literal for history archival
                //graph.deny(historyResource, HIS.History_archive);
                graph.denyValue(historyResource, HIS.History_archive);
                Resource archiveLiteral = graph.newResource();
                graph.claim(archiveLiteral, L0.InstanceOf, null, L0.ByteArray);
                graph.claim(historyResource, HIS.History_archive, archiveLiteral);

                OutputStream closeable = null;
                try {
                    RandomAccessBinary rab = graph.getRandomAccessBinary(archiveLiteral);
                    // Reserve room for the L0.ByteArray length prefix, which is
                    // written below once the final archive length is known.
                    rab.position(0);
                    rab.skipBytes(4);
                    ArchivedHistory archive = exportCompressedArchive(history, collectorState, timeShift, from, end, rab);
                    rab.position(0L);
                    rab.writeInt((int) (rab.length() - 4L));

                    graph.claimLiteral(historyResource, HIS.History_size, L0.Long, archive.totalSampleSize, Bindings.LONG);

                    if (PROFILE)
                        profile(s, "archived history items of size " + archive.totalSampleSize + " to database");
                } catch (HistoryException e) {
                    throw new DatabaseException(e);
                } catch (IOException e) {
                    throw new DatabaseException(e);
                } finally {
                    FileUtils.uncheckedClose(closeable);
                }
            }
        };
    }

    public static ArchivedHistory exportCompressedArchive(
            HistoryManager history,
            Bean collectorState,
            Double timeShift,
            double from,
            double end,
            RandomAccessBinary rab)
            throws IOException, HistoryException {
        return exportCompressedArchive(history, collectorState, timeShift, from, end, USED_COMPRESSION, rab);
    }

    public static ArchivedHistory exportCompressedArchive(
            HistoryManager history,
            Bean collectorState,
            Double timeShift,
            double from,
            double end,
            Compression compression,
            RandomAccessBinary rab)
            throws IOException, HistoryException {
        switch (compression) {
        case FLZ:
            DataContainers.writeHeader(rab, new DataContainer(APPLICATION_X_FLZ, ARCHIVE_VERSION_1, null));
            return exportArchive(history, collectorState, timeShift, from, end,
                    FastLZ.write(new RABOutputStream(rab)));
        case LZ4:
            DataContainers.writeHeader(rab, new DataContainer(APPLICATION_X_LZ4, ARCHIVE_VERSION_1, null));
            return exportArchive(history, collectorState, timeShift, from, end,
                    new LZ4BlockOutputStream(new RABOutputStream(rab)));
        case NONE:
            DataContainers.writeHeader(rab, new DataContainer(APPLICATION_X_TAR, ARCHIVE_VERSION_1, null));
            return exportArchive(history, collectorState, timeShift, from, end,
                    new RABOutputStream(rab));
        default:
            throw new UnsupportedOperationException("Unsupported compression: " + compression);
        }
    }
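    /*
     * Usage sketch for exportCompressedArchive(...): archiving a full history
     * into a file-backed RandomAccessBinary with an explicitly chosen
     * compression ("history.dat" is a hypothetical path):
     *
     *   try (RandomAccessBinary rab = new BinaryFile(new File("history.dat"))) {
     *       HistoryUtil.exportCompressedArchive(
     *               history,
     *               null,                   // no collector state entry
     *               null,                   // no time shift
     *               0.0, Double.MAX_VALUE,  // full time range
     *               Compression.LZ4, rab);
     *   }
     */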
    private static ArchivedHistory exportArchive(
            HistoryManager history,
            Bean collectorState,
            Double timeShift,
            double from,
            double end,
            OutputStream out)
            throws IOException, HistoryException {
        try (OutputStream os = out) {
            ArchivedHistory archive = exportArchive(os, history, collectorState, timeShift, from, end);
            os.flush();
            return archive;
        }
    }
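    /*
     * TAR entry layout written by the exportArchive worker below and read back
     * by extractHistoryArchiveTar (sketch):
     *
     *   state           serialized collector state bean (optional, written first)
     *   <item id>.txt   serialized history item metadata bean
     *   <item id>.data  raw constant-size sample records of the item's stream
     */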
System.err.println("Skipping item " + item + " in history archival due to variable length samples not being supported at the moment."); continue; } else { ValueBand vb = new ValueBand(sampleBinding); Stream stream = new Stream(sa, sampleBinding); if (DEBUG) System.out.println("WRITING stream: sample size=" + constantSampleSize); readArray: { // Start index int startIndex = stream.binarySearch(Bindings.DOUBLE, from); if ( startIndex < -stream.count() ) break readArray; if ( startIndex<0 ) startIndex = -2-startIndex; if ( startIndex == -1 ) startIndex = 0; // End index int endIndex = stream.binarySearch(Bindings.DOUBLE, end); if ( endIndex == -1 ) break readArray; if ( endIndex<0 ) endIndex = -1-endIndex; if ( endIndex == sa.size() ) endIndex = sa.size()-1; if ( endIndextrue if successful */ private static boolean importHistoryItems(ReadGraph graph, final Resource historyResource, final HistoryManager history) throws DatabaseException { HistoryResource HIS = HistoryResource.getInstance(graph); Layer0 L0 = Layer0.getInstance(graph); try { for (Resource oldItem : graph.getObjects(historyResource, L0.ConsistsOf)) { if (!graph.isInstanceOf(oldItem, HIS.History_Item)) continue; Bean bean = graph.getRelatedValue(oldItem, HIS.History_Item_Info, Bindings.BEAN); String id = (String) bean.getFieldUnchecked("id"); Object array = graph.getRelatedValue(oldItem, HIS.History_Item_Series, Bindings.BEAN); Binding arrayBinding = Bindings.getBinding( array.getClass() ); history.modify(bean); StreamAccessor sa = history.openStream(id, "rw"); try { sa.setValue(arrayBinding, array); } finally { try { sa.close(); } catch (AccessorException e) { } } } return true; } catch (AccessorException e) { throw new DatabaseException(e); } catch (BindingConstructionException e) { throw new DatabaseException(e); } catch (HistoryException e) { throw new DatabaseException(e); } } /** * @param history * @param path * @return * @throws HistoryException * @throws IOException */ public static HistoryImportResult importHistoryArchive(HistoryManager history, Path path) throws IOException, HistoryException { HistoryImportResult result = new HistoryImportResult(); try (RandomAccessBinary rab = new BinaryFile(path.toFile())) { importHistoryArchive(history, rab, result); return result; } catch (IOException e) { LOGGER.error("Failed to import history from archive {}", path); throw e; } } /** * Import all history items from database archive into a history manager. * * @param graph * @param historyResource * @param archive * @param history * @return true if successful * @throws DatabaseException */ private static boolean importHistoryArchive(ReadGraph graph, Resource archive, HistoryManager history, HistoryImportResult result) throws DatabaseException, IOException, HistoryException { RandomAccessBinary rab = graph.getRandomAccessBinary(archive); // This is here because the data is encoded as an L0.ByteArray // in the database which means the first integer describes the // length of the byte array. We skip that length data here. rab.position(4); return importHistoryArchive(history, rab, result); } /** * Import all history items from database archive into a history manager. 
    /**
     * Import all history items from a history archive into a history manager.
     *
     * @param history destination history
     * @param rab the archive contents
     * @param result receives the collector state read from the archive
     * @return <code>true</code> if successful
     */
    private static boolean importHistoryArchive(HistoryManager history, RandomAccessBinary rab, HistoryImportResult result)
            throws IOException, HistoryException {
        DataContainer dc = DataContainers.readHeader(rab);
        if (DEBUG)
            System.out.println("COMPRESSED HISTORY DATA SIZE: " + (rab.length() - rab.position()));
        if (dc.version == ARCHIVE_VERSION_1) {
            if (APPLICATION_X_LZ4.equals(dc.format)) {
                return extractHistoryArchiveInputStream(history, new LZ4BlockInputStream(new RABInputStream(rab)), result);
            } else if (APPLICATION_X_FLZ.equals(dc.format)) {
                return extractHistoryArchiveInputStream(history, FastLZ.read(new RABInputStream(rab)), result);
            } else if (APPLICATION_X_TAR.equals(dc.format)) {
                return extractHistoryArchiveInputStream(history, new RABInputStream(rab), result);
            }
            throw new UnsupportedOperationException("Unsupported format " + dc.format + " and version " + dc.version);
        } else {
            throw new UnsupportedOperationException("Unsupported history archive version " + dc.version + " with format " + dc.format);
        }
    }

    private static boolean extractHistoryArchiveInputStream(HistoryManager history, InputStream in, HistoryImportResult result)
            throws IOException, HistoryException {
        try (InputStream _in = in) {
            return extractHistoryArchiveTar(history, in, result);
        }
    }
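    /*
     * Sketch: inspecting an archive's container header without extracting it.
     * DataContainers.readHeader leaves the RandomAccessBinary positioned at
     * the start of the (possibly compressed) payload:
     *
     *   DataContainer dc = DataContainers.readHeader(rab);
     *   System.out.println("format=" + dc.format + ", version=" + dc.version);
     */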
    /**
     * Extract all history items from a TAR archive stream into a history
     * manager.
     *
     * @param history destination history
     * @param in the TAR archive stream to extract from
     * @param result receives the collector state read from the archive
     * @return <code>true</code> if successful
     */
    private static boolean extractHistoryArchiveTar(HistoryManager history, InputStream in, HistoryImportResult result)
            throws IOException, HistoryException {
        // tar is not closed on purpose because we do not want to close RandomAccessBinary.
        TarArchiveInputStream tar = new TarArchiveInputStream(in);

        Serializer beanSerializer = Bindings.getSerializerUnchecked(Bindings.BEAN);

        Bean lastItem = null;
        String lastItemId = null;
        while (true) {
            TarArchiveEntry entry = tar.getNextTarEntry();
            if (entry == null)
                break;

            String name = entry.getName();
            boolean state = name.equals("state");
            boolean metadata = name.endsWith(".txt");
            boolean data = name.endsWith(".data");

            if (state) {
                if (result != null) {
                    byte[] st = new byte[(int) entry.getSize()];
                    IOUtils.readFully(tar, st);
                    result.collectorState = (Bean) beanSerializer.deserialize(st);
                    if (DEBUG)
                        System.out.println("READ collector state: " + result.collectorState);
                } else {
                    tar.skip(entry.getSize());
                }
            } else if (metadata) {
                byte[] md = new byte[(int) entry.getSize()];
                IOUtils.readFully(tar, md);
                if (DEBUG) {
                    System.out.println("READING Item metadata: " + name);
                    System.out.println("SERIALIZED ITEM " + name.replace(".txt", "") + ": " + Arrays.toString(md));
                }
                try {
                    Bean bean = (Bean) beanSerializer.deserialize(md);
                    if (DEBUG)
                        System.out.println("READ Item metadata: " + bean);
                    history.modify(bean);
                    lastItem = bean;
                    lastItemId = (String) bean.getFieldUnchecked("id");
                } catch (ClassFormatError e) {
                    // This is here because LZ4BlockInput/OutputStream seemed to
                    // be causing weird deserialization errors to occur for
                    // single data items when trying to deserialize beans. The
                    // reason being that the serialized data that was read only
                    // contains the first 512 bytes of the data. After that all
                    // data is zeros, which causes things like this to happen in
                    // deserialization:
                    //signature: rsstzzzzzzzze_b7b532e9
                    //component(0): id = String
                    //component(1): variableId = String
                    //component(2): format = Temp1
                    //annotation: @org.simantics.databoard.annotations.Union(value=[class org.simantics.databoard.type.BooleanType, class org.simantics.databoard.type.ByteType, class org.simantics.databoard.type.IntegerType, class org.simantics.databoard.type.LongType, class org.simantics.databoard.type.FloatType, class org.simantics.databoard.type.DoubleType, class org.simantics.databoard.type.StringType, class org.simantics.databoard.type.RecordType, class org.simantics.databoard.type.ArrayType, class org.simantics.databoard.type.MapType, class org.simantics.databoard.type.OptionalType, class org.simantics.databoard.type.UnionType, class org.simantics.databoard.type.VariantType])
                    //component(3): = Boolean
                    //component(4): = Boolean
                    //component(5): = Boolean
                    //component(6): = Boolean
                    //component(7): = Boolean
                    //component(8): = Boolean
                    //component(9): = Boolean
                    //component(10): = Boolean
                    //
                    // For this reason we've switched the default compression to FastLZ
                    // for now. This is reflected in the input format also.
                    Activator.getDefault().getLog().log(
                            new Status(IStatus.ERROR, Activator.PLUGIN_ID,
                                    "History item " + name + " metadata deserialization failed.", e));
                }
            } else if (data && lastItem != null) {
                StreamAccessor sa = history.openStream(lastItemId, "rw");
                try {
                    if (sa instanceof BinaryObject) {
                        BinaryObject bo = (BinaryObject) sa;
                        RandomAccessBinary output = bo.getBinary();
                        output.position(0L);
                        output.setLength(0L);
                        long copiedBytes = FileUtils.copy(tar, new RABOutputStream(output), entry.getSize());
                        if (copiedBytes != entry.getSize()) {
                            System.err.println("WARNING: item " + lastItemId + ", extracted " + copiedBytes
                                    + " bytes while TAR entry said the size to be " + entry.getSize() + " bytes");
                        }
                    } else {
                        System.err.println("Skipping item " + lastItemId
                                + " in history import due to StreamAccessor not being an instanceof BinaryObject. StreamAccessor=" + sa);
                    }
                } finally {
                    try {
                        sa.close();
                    } catch (AccessorException e) {
                    }
                    lastItem = null;
                    lastItemId = null;
                }
            }
        }
        return true;
    }

    /**
     * Get a request that clears the history.
     *
     * @param history the history resource to clear
     * @return clearing request
     */
    public static WriteRequest clear(final Resource history) {
        return new WriteRequest() {
            @Override
            public void perform(WriteGraph graph) throws DatabaseException {
                HistoryResource HIS = HistoryResource.getInstance(graph);
                Layer0 L0 = Layer0.getInstance(graph);

                // Separate items format
                for (Resource oldItem : graph.getObjects(history, L0.ConsistsOf)) {
                    if (!graph.isInstanceOf(oldItem, HIS.History_Item))
                        continue;
                    Resource info = graph.getPossibleObject(oldItem, HIS.History_Item_Info);
                    if (info != null) {
                        graph.deny(oldItem, HIS.History_Item_Info);
                        graph.deny(info);
                    }
                    Resource data = graph.getPossibleObject(oldItem, HIS.History_Item_Series);
                    if (data != null) {
                        graph.deny(oldItem, HIS.History_Item_Series);
                        graph.deny(data);
                    }
                    graph.deny(history, L0.ConsistsOf, oldItem);
                    graph.deny(oldItem);
                }

                // Archived format
                graph.denyValue(history, HIS.History_archive);
            }
        };
    }

    private static void putNextEntry(TarArchiveOutputStream out, String entryName, long size) throws IOException {
        TarArchiveEntry entry = new TarArchiveEntry(entryName);
        entry.setSize(size);
        out.putArchiveEntry(entry);
    }
    /**
     * An OutputStream adapter that shall not close the underlying
     * RandomAccessBinary.
     */
    private static class RABOutputStream extends OutputStream {

        private final RandomAccessBinary output;

        public RABOutputStream(RandomAccessBinary output) {
            this.output = output;
        }

        @Override
        public void write(byte[] b, int off, int len) throws IOException {
            output.write(b, off, len);
        }

        @Override
        public void write(int b) throws IOException {
            output.write(b);
        }

    }

    /**
     * An InputStream adapter that shall not close the underlying
     * RandomAccessBinary.
     */
    private static class RABInputStream extends InputStream {

        private final RandomAccessBinary input;

        public RABInputStream(RandomAccessBinary input) {
            this.input = input;
        }

        @Override
        public int available() throws IOException {
            long avail = input.length() - input.position();
            if ((avail & 0xffffffff00000000L) != 0)
                throw new IOException("Number of available bytes truncated in long->int conversion: " + avail);
            return (int) avail;
        }

        @Override
        public int read() throws IOException {
            try {
                return input.readUnsignedByte();
            } catch (EOFException e) {
                return -1;
            }
        }

        @Override
        public int read(byte[] b, int off, int len) throws IOException {
            long available = input.length() - input.position();
            if (available == 0)
                return -1;
            int l = (int) Math.min(available, (long) len);
            input.readFully(b, off, l);
            return l;
        }

        @Override
        public long skip(long n) throws IOException {
            long count = 0;
            while (count < n) {
                // Clamp each pass to the amount still remaining and stop at
                // end-of-stream to avoid overshooting or looping forever.
                int l = (int) Math.min(n - count, (long) Integer.MAX_VALUE);
                int skipped = input.skipBytes(l);
                if (skipped == 0)
                    break;
                count += skipped;
            }
            return count;
        }

    }

    private static long profile(Long t1, String string) {
        if (PROFILE) {
            long t2 = System.nanoTime();
            long delta = t1 == null ? 0 : t2 - t1;
            System.out.println("[" + HistoryUtil.class.getSimpleName() + "] PROFILE: " + string
                    + (t1 == null ? "" : " in " + (((double) delta) * 1e-6) + " ms"));
            return t2;
        }
        return 0L;
    }
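    /*
     * Usage sketch for truncateHistory(...) below (illustrative): drop all
     * samples recorded after t = 10.0 from every stream and rewind the
     * collector state to match:
     *
     *   CollectorState rewound = HistoryUtil.truncateHistory(10.0, history, state);
     */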
    public static CollectorState truncateHistory(double toBeforeTime, HistoryManager history, CollectorState state)
            throws AccessorException, BindingException, HistoryException {
        Double t = toBeforeTime;
        Binding timeBinding = null;
        Bean[] items = history.getItems();
        //System.out.println("truncating all samples after t=" + toBeforeTime + " for " + items.length + " history items");
        for (Bean item : items) {
            String id = (String) item.getField("id");
            StreamAccessor sa = history.openStream(id, "rw");
            try {
                Stream s = new Stream(sa);
                timeBinding = s.timeBinding;
                int currentSize = sa.size();
                int index = s.binarySearch(timeBinding, toBeforeTime);
                int newSize = truncationSize(index);
                if (newSize < currentSize) {
                    //System.out.println("truncating item: " + item + " from size " + currentSize + " to " + newSize);
                    sa.setSize(newSize);

                    if (state != null) {
                        Object prevTime = newSize > 0 ? s.getItemTime(s.timeBinding, newSize - 1) : null;
                        Bean prevSample = newSize > 0 ? (Bean) sa.get(newSize - 1, s.sampleBinding) : null;
                        Object prevValue = prevSample != null ? prevSample.getField(s.valueIndex) : null;
                        boolean isNan = isNaN(prevValue);

                        VariableState vs = state.values.get(id);
                        if (vs != null && vs.value != null && prevValue != null) {
                            vs.value.setValue(vs.value.getBinding(), prevValue);
                            vs.isValid = true;
                            vs.isNan = isNan;
                        }

                        CollectorState.Item is = state.itemStates.get(id);
                        if (is != null) {
                            is.firstTime = toTime(prevTime);
                            is.firstValue = toValue(s.valueBinding, prevValue);
                            is.currentTime = is.firstTime;
                            is.currentValue = toValue(s.valueBinding, prevValue);
                            is.isNaN = isNan;
                            is.isValid = prevValue != null;
                            is.sum = Double.NaN;
                            is.count = 0;
                            is.ooDeadband = false;
                            is.firstDisabledTime = Double.NaN;
                            is.lastDisabledTime = Double.NaN;
                            is.median = new WeightedMedian(CollectorImpl.MEDIAN_LIMIT);
                        }
                    }
                }
            } finally {
                sa.close();
            }
        }

        if (timeBinding != null && state != null) {
            state.time.setValue(timeBinding, t);
            state.dT = 1.0;
        }

        return state;
    }

    private static double toTime(Object time) {
        return time instanceof Number ? ((Number) time).doubleValue() : Double.NaN;
    }

    private static MutableVariant toValue(Binding valueBinding, Object value) {
        return new MutableVariant(valueBinding, value != null ? value : valueBinding.createDefaultUnchecked());
    }

    private static boolean isNaN(Object value) {
        return value instanceof Number ? Double.isNaN(((Number) value).doubleValue()) : false;
    }

    private static int truncationSize(int binarySearchResult) {
        return binarySearchResult >= 0 ? binarySearchResult + 1 : (-binarySearchResult - 1);
    }

}