X-Git-Url: https://gerrit.simantics.org/r/gitweb?p=simantics%2Fplatform.git;a=blobdiff_plain;f=bundles%2Forg.simantics.simulation%2Fsrc%2Forg%2Fsimantics%2Fsimulation%2Fhistory%2FHistoryUtil.java;h=39c084cad4dd5f89c711c480cfeaeef983d542da;hp=6e779b374f802e11b17c1c658fa021252b3d7350;hb=662ee6d0e68c386fdbc22300bdadb58d61de5b17;hpb=969bd23cab98a79ca9101af33334000879fb60c5 diff --git a/bundles/org.simantics.simulation/src/org/simantics/simulation/history/HistoryUtil.java b/bundles/org.simantics.simulation/src/org/simantics/simulation/history/HistoryUtil.java index 6e779b374..39c084cad 100644 --- a/bundles/org.simantics.simulation/src/org/simantics/simulation/history/HistoryUtil.java +++ b/bundles/org.simantics.simulation/src/org/simantics/simulation/history/HistoryUtil.java @@ -1,949 +1,1041 @@ -/******************************************************************************* - * Copyright (c) 2007, 2014 Association for Decentralized Information Management in - * Industry THTH ry. - * All rights reserved. This program and the accompanying materials - * are made available under the terms of the Eclipse Public License v1.0 - * which accompanies this distribution, and is available at - * http://www.eclipse.org/legal/epl-v10.html - * - * Contributors: - * VTT Technical Research Centre of Finland - initial API and implementation - * Semantum Oy - archive implementation - *******************************************************************************/ -package org.simantics.simulation.history; - -import gnu.trove.map.TObjectLongMap; -import gnu.trove.map.hash.TObjectLongHashMap; - -import java.io.EOFException; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.util.Arrays; -import java.util.HashMap; -import java.util.Map; - -import net.jpountz.lz4.LZ4BlockInputStream; -import net.jpountz.lz4.LZ4BlockOutputStream; - -import org.apache.commons.compress.archivers.tar.TarArchiveEntry; -import org.apache.commons.compress.archivers.tar.TarArchiveInputStream; -import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream; -import org.eclipse.core.runtime.IStatus; -import org.eclipse.core.runtime.Status; -import org.simantics.databoard.Bindings; -import org.simantics.databoard.accessor.StreamAccessor; -import org.simantics.databoard.accessor.binary.BinaryObject; -import org.simantics.databoard.accessor.error.AccessorException; -import org.simantics.databoard.binding.ArrayBinding; -import org.simantics.databoard.binding.Binding; -import org.simantics.databoard.binding.RecordBinding; -import org.simantics.databoard.binding.error.BindingConstructionException; -import org.simantics.databoard.binding.error.BindingException; -import org.simantics.databoard.binding.impl.ObjectArrayBinding; -import org.simantics.databoard.binding.mutable.Variant; -import org.simantics.databoard.container.DataContainer; -import org.simantics.databoard.container.DataContainers; -import org.simantics.databoard.serialization.Serializer; -import org.simantics.databoard.util.Bean; -import org.simantics.databoard.util.binary.OutputStreamWriteable; -import org.simantics.databoard.util.binary.RandomAccessBinary; -import org.simantics.db.ReadGraph; -import org.simantics.db.Resource; -import org.simantics.db.WriteGraph; -import org.simantics.db.common.request.UniqueRead; -import org.simantics.db.common.request.WriteRequest; -import org.simantics.db.exception.DatabaseException; -import org.simantics.db.exception.ServiceNotFoundException; -import org.simantics.db.request.Read; -import org.simantics.fastlz.FastLZ; -import org.simantics.history.HistoryException; -import org.simantics.history.HistoryManager; -import org.simantics.history.ItemManager; -import org.simantics.history.util.Stream; -import org.simantics.history.util.ValueBand; -import org.simantics.layer0.Layer0; -import org.simantics.simulation.Activator; -import org.simantics.simulation.ontology.HistoryResource; -import org.simantics.simulation.ontology.SimulationResource; -import org.simantics.utils.FileUtils; - -/** - * @author Toni Kalajainen - * @author Tuukka Lehtonen - */ -public class HistoryUtil { - - private static final boolean DEBUG = false; - private static final boolean PROFILE = true; - - private enum Compression { - FLZ, - LZ4, - } - - private static final Compression USED_COMPRESSION = Compression.FLZ; - - private static final String APPLICATION_X_LZ4 = "application/x-lz4"; - private static final String APPLICATION_X_FLZ = "application/x-flz"; - private static final int ARCHIVE_VERSION_1 = 1; // TarArchiveInput/OutputStream - - private static class ArchivedHistory { - public long totalSampleSize = 0L; - public TObjectLongMap itemSizes = new TObjectLongHashMap(); - } - - /** - * Create request that exports history to graph. - * - * If removeOldUnused is true, items that do not exist in source history - * are deleted from graph. - * - * If item already exists in the graph, it is overwritten. - * - * @param history source history - * @param g write graph - * @param historyResource Instance of HIS.History to write data to - * @param removeOldUnused - * @param timeShift adjust time values by this value - * @param from time - * @param end time - * @return WriteRequest - */ - public static WriteRequest export(final HistoryManager history, - final Resource historyResource, - final boolean removeOldUnused, - final Double timeShift, - final double from, final double end) - { - return new WriteRequest() { - public void perform(WriteGraph graph) throws DatabaseException { - HistoryResource HIS = HistoryResource.getInstance(graph); - Layer0 L0 = Layer0.getInstance(graph); - long totalSampleSize = 0L; - long s = System.nanoTime(); - - try { - Bean[] items = history.getItems(); - if (PROFILE) - profile(null, "exporting " + items.length + " history items to database"); - - ItemManager im = new ItemManager( items ); - Map oldResourceMap = new HashMap(); - for (Resource oldItem : graph.getObjects(historyResource, L0.ConsistsOf)) - { - if (!graph.isInstanceOf(oldItem, HIS.History_Item)) continue; - Bean bean = graph.getPossibleRelatedValue(oldItem, HIS.History_Item_Info, Bindings.BEAN); - if ( bean == null ) continue; - String id = (String) bean.getField("id"); - - if (!im.exists(id)) { - // There is item in graph that does not exist in source history - if ( removeOldUnused ) { - graph.deny(oldItem); - } - } else { - // Item already exists, to-be-overwritten - oldResourceMap.put(id, oldItem); - } - } - - for (Bean newItem : im.values()) - { - //System.out.println("WRITING ITEM: " + newItem); - String id = (String) newItem.getField("id"); - Resource historyItemResource = oldResourceMap.get(id); - Resource data = null; - - if ( historyItemResource==null ) { - historyItemResource = graph.newResource(); - graph.claim(historyResource, L0.ConsistsOf, historyItemResource); - graph.claim(historyItemResource, L0.InstanceOf, HIS.History_Item); - graph.claimLiteral(historyItemResource, L0.HasName, id); - } else { - data = graph.getPossibleObject(historyItemResource, HIS.History_Item_Series); - } - - Variant v = new Variant(newItem.getBinding(), newItem); - graph.claimLiteral(historyItemResource, HIS.History_Item_Info, v, Bindings.VARIANT); - - if (data == null) { - data = graph.newResource(); - graph.claim(data, L0.InstanceOf, L0.Variant); - graph.claim(historyItemResource, HIS.History_Item_Series, data); - } else { - graph.denyValue(data); - } - - - // Write stream - StreamAccessor sa = history.openStream(id, "r"); - try { - //System.out.println("WRITING stream of size=" + sa.size()); - RecordBinding sampleBinding = (RecordBinding) Bindings.getBeanBinding( sa.type().componentType ); - Serializer serializer = Bindings.getSerializerUnchecked(sampleBinding); - Integer constantSampleSize = serializer.getConstantSize(); - ValueBand vb = new ValueBand(sampleBinding); - Stream stream = new Stream(sa, sampleBinding); - ArrayBinding arrayBinding = new ObjectArrayBinding(sa.type(), sampleBinding); - Object array = null; - //System.out.println("WRITING stream: sample size=" + constantSampleSize); - - int count = 0; - readArray: { - // Start index - int startIndex = stream.binarySearch(Bindings.DOUBLE, from); - if ( startIndex < -stream.count() ) break readArray; - if ( startIndex<0 ) startIndex = -2-startIndex; - if ( startIndex == -1 ) startIndex = 0; - - // End index - int endIndex = stream.binarySearch(Bindings.DOUBLE, end); - if ( endIndex == -1 ) break readArray; - if ( endIndex<0 ) endIndex = -1-endIndex; - if ( endIndex == sa.size() ) endIndex = sa.size()-1; - if ( endIndexnull to skip collector state saving - * @param historyResource - * Instance of HIS.History to write data to - * @param timeShift - * adjust time values by this value - * @param from - * time - * @param end - * time - * @return WriteRequest - */ - public static WriteRequest exportArchive( - final HistoryManager history, - final Bean collectorState, - final Resource historyResource, - final Double timeShift, - final double from, - final double end) - { - return new WriteRequest() { - public void perform(WriteGraph graph) throws DatabaseException { - HistoryResource HIS = HistoryResource.getInstance(graph); - Layer0 L0 = Layer0.getInstance(graph); - - long s = System.nanoTime(); - if (PROFILE) - profile(null, "archiving history items to database"); - - // Remove all possibly existing old items in the history. - for (Resource entity : graph.getObjects(historyResource, L0.ConsistsOf)) - if (graph.isInstanceOf(entity, HIS.History_Item)) - graph.deny(historyResource, L0.ConsistsOf, L0.PartOf, entity); - - // Create new literal for history archival - //graph.deny(historyResource, HIS.History_archive); - graph.denyValue(historyResource, HIS.History_archive); - Resource archiveLiteral = graph.newResource(); - graph.claim(archiveLiteral, L0.InstanceOf, null, L0.ByteArray); - graph.claim(historyResource, HIS.History_archive, archiveLiteral); - - OutputStream closeable = null; - try { - RandomAccessBinary rab = graph.getRandomAccessBinary(archiveLiteral); - rab.position(0); - rab.skipBytes(4); - - ArchivedHistory archive = exportCompressedArchive(history, collectorState, historyResource, timeShift, from, end, rab); - - rab.position(0L); - rab.writeInt((int)(rab.length() - 4L)); - - graph.claimLiteral(historyResource, HIS.History_size, L0.Long, archive.totalSampleSize, Bindings.LONG); - - if (PROFILE) - profile(s, "archived history items of size " + archive.totalSampleSize + " to database"); - - } catch (HistoryException e) { - throw new DatabaseException( e ); - } catch (IOException e) { - throw new DatabaseException( e ); - } finally { - FileUtils.uncheckedClose(closeable); - } - } - }; - } - - private static ArchivedHistory exportCompressedArchive( - final HistoryManager history, - final Bean collectorState, - final Resource historyResource, - final Double timeShift, - final double from, - final double end, - RandomAccessBinary rab) - throws IOException, HistoryException - { - switch (USED_COMPRESSION) { - case FLZ: - return exportArchiveFLZ(history, collectorState, historyResource, timeShift, from, end, rab); - case LZ4: - return exportArchiveLZ4(history, collectorState, historyResource, timeShift, from, end, rab); - default: - throw new UnsupportedOperationException("Unsupported compression: " + USED_COMPRESSION); - } - } - - private static ArchivedHistory exportArchiveLZ4( - final HistoryManager history, - final Bean collectorState, - final Resource historyResource, - final Double timeShift, - final double from, - final double end, - RandomAccessBinary rab) - throws IOException, HistoryException - { - OutputStream closeable = null; - try { - DataContainers.writeHeader(rab, new DataContainer(APPLICATION_X_LZ4, ARCHIVE_VERSION_1, null)); - closeable = new RABOutputStream( rab ); - closeable = new LZ4BlockOutputStream( closeable ); - - ArchivedHistory archive = exportArchive( closeable, history, collectorState, timeShift, from, end ); - - closeable.flush(); - closeable.close(); - closeable = null; - - return archive; - } finally { - FileUtils.uncheckedClose(closeable); - } - } - - @SuppressWarnings("resource") - private static ArchivedHistory exportArchiveFLZ( - final HistoryManager history, - final Bean collectorState, - final Resource historyResource, - final Double timeShift, - final double from, - final double end, - RandomAccessBinary rab) - throws IOException, HistoryException - { - OutputStream closeable = null; - try { - DataContainers.writeHeader(rab, new DataContainer(APPLICATION_X_FLZ, ARCHIVE_VERSION_1, null)); - closeable = new RABOutputStream( rab ); - closeable = FastLZ.write( closeable ); - - ArchivedHistory archive = exportArchive( closeable, history, collectorState, timeShift, from, end ); - - closeable.flush(); - closeable.close(); - closeable = null; - - return archive; - } finally { - FileUtils.uncheckedClose(closeable); - } - } - - /** - * Store the specified history manager's contents into a single archive - * file. - * - * @param outputStream - * the output stream where the history archive is written - * @param history - * source history - * @param collectorState - * complete state of the history collector at the time of saving - * or null to not save any state - * @param timeShift - * adjust time values by this value - * @param from - * time - * @param end - * time - * @return ArchivedHistory instance describing the created archive - * @throws HistoryException - * when there's a problem with reading the history data from the - * history work area - * @throws IOException - * when there's a problem writing the resulting history archive - * file - */ - private static ArchivedHistory exportArchive( - OutputStream outputStream, - HistoryManager history, - Bean collectorState, - Double timeShift, - double from, - double end) - throws IOException, HistoryException - { - ArchivedHistory result = new ArchivedHistory(); - - Serializer beanSerializer = Bindings.getSerializerUnchecked(Bindings.BEAN); - boolean hasTimeShift = timeShift != null && timeShift != 0.0; - - TarArchiveOutputStream out = new TarArchiveOutputStream(outputStream); - OutputStreamWriteable dataOutput = new OutputStreamWriteable(out); - - try { - if (collectorState != null) { - // Write complete collector state into the archive - if (DEBUG) - System.out.println("WRITING collector state: " + collectorState); - byte[] serializedCollectorState = beanSerializer.serialize( collectorState ); - putNextEntry(out, "state", serializedCollectorState.length); - out.write(serializedCollectorState); - out.closeArchiveEntry(); - } - - ItemManager im = new ItemManager( history.getItems() ); - - for (Bean item : im.values()) { - if (DEBUG) - System.out.println("STORING ITEM: " + item); - String id = (String) item.getField("id"); - - // Write data stream metadata - byte[] metadata = beanSerializer.serialize( item ); - putNextEntry(out, id + ".txt", metadata.length); - out.write(metadata); - out.closeArchiveEntry(); - if (DEBUG) - System.out.println("SERIALIZED ITEM " + id + ": " + Arrays.toString(metadata)); - - - // Write data stream as a separate entry - StreamAccessor sa = history.openStream(id, "r"); - try { - if (DEBUG) - System.out.println("STREAM SIZE=" + sa.size()); - RecordBinding sampleBinding = (RecordBinding) Bindings.getBeanBinding( sa.type().componentType ); - Serializer sampleSerializer = Bindings.getSerializerUnchecked(sampleBinding); - Integer constantSampleSize = sampleSerializer.getConstantSize(); - - if (constantSampleSize == null) { - // Variable length samples - no support for now. - System.err.println("Skipping item " + item + " in history archival due to variable length samples not being supported at the moment."); - continue; - } else { - ValueBand vb = new ValueBand(sampleBinding); - Stream stream = new Stream(sa, sampleBinding); - if (DEBUG) - System.out.println("WRITING stream: sample size=" + constantSampleSize); - - readArray: { - // Start index - int startIndex = stream.binarySearch(Bindings.DOUBLE, from); - if ( startIndex < -stream.count() ) break readArray; - if ( startIndex<0 ) startIndex = -2-startIndex; - if ( startIndex == -1 ) startIndex = 0; - - // End index - int endIndex = stream.binarySearch(Bindings.DOUBLE, end); - if ( endIndex == -1 ) break readArray; - if ( endIndex<0 ) endIndex = -1-endIndex; - if ( endIndex == sa.size() ) endIndex = sa.size()-1; - if ( endIndextrue if successful - */ - private static boolean importHistoryItems(ReadGraph graph, final Resource historyResource, final HistoryManager history) throws DatabaseException - { - HistoryResource HIS = HistoryResource.getInstance(graph); - Layer0 L0 = Layer0.getInstance(graph); - - try { - for (Resource oldItem : graph.getObjects(historyResource, L0.ConsistsOf)) { - if (!graph.isInstanceOf(oldItem, HIS.History_Item)) - continue; - - Bean bean = graph.getRelatedValue(oldItem, HIS.History_Item_Info, Bindings.BEAN); - String id = (String) bean.getFieldUnchecked("id"); - Object array = graph.getRelatedValue(oldItem, HIS.History_Item_Series, Bindings.BEAN); - Binding arrayBinding = Bindings.getBinding( array.getClass() ); - history.modify(bean); - StreamAccessor sa = history.openStream(id, "rw"); - try { - sa.setValue(arrayBinding, array); - } finally { - try { - sa.close(); - } catch (AccessorException e) { - } - } - } - - return true; - } catch (AccessorException e) { - throw new DatabaseException(e); - } catch (BindingConstructionException e) { - throw new DatabaseException(e); - } catch (HistoryException e) { - throw new DatabaseException(e); - } - } - - /** - * Import all history items from database archive into a history manager. - * - * @param graph - * @param historyResource - * @param archive - * @param history - * @return true if successful - * @throws DatabaseException - */ - @SuppressWarnings("resource") - private static boolean importHistoryArchive(ReadGraph graph, Resource historyResource, Resource archive, HistoryManager history, HistoryImportResult result) - throws DatabaseException, IOException, HistoryException - { - RandomAccessBinary rab = graph.getRandomAccessBinary(archive); - rab.position(4); - DataContainer dc = DataContainers.readHeader(rab); - - if (DEBUG) - System.out.println("COMPRESSED HISTORY DATA SIZE: " + (rab.length() - rab.position())); - - if (APPLICATION_X_LZ4.equals(dc.format)) { - if (dc.version == ARCHIVE_VERSION_1) { - InputStream in = new RABInputStream(rab); - //in = new BufferedInputStream(in); - in = new LZ4BlockInputStream(in); - return extractHistoryArchiveTar(graph, history, in, result); - } - } else if (APPLICATION_X_FLZ.equals(dc.format)) { - if (dc.version == ARCHIVE_VERSION_1) { - InputStream in = null; - try { - in = new RABInputStream(rab); - in = FastLZ.read(in); - return extractHistoryArchiveTar(graph, history, in, result); - } finally { - FileUtils.uncheckedClose(in); - } - } - } - throw new UnsupportedOperationException("Unsupported format " + dc.format + " and version " + dc.version); - } - - /** - * Import all history items from graph into a history manager - * - * @param graph database access - * @param history destination history - * @param rab the archive to extract from - * @return true if successful - * @throws DatabaseException - */ - private static boolean extractHistoryArchiveTar(ReadGraph graph, HistoryManager history, InputStream in, HistoryImportResult result) - throws DatabaseException, IOException, HistoryException - { - // tar is not closed on purpose because we do not want to close RandomAccessBinary rab. - TarArchiveInputStream tar = new TarArchiveInputStream(in); - - Serializer beanSerializer = Bindings.getSerializerUnchecked(Bindings.BEAN); - - Bean lastItem = null; - String lastItemId = null; - while (true) { - TarArchiveEntry entry = tar.getNextTarEntry(); - if (entry == null) - break; - - String name = entry.getName(); - boolean state = name.equals("state"); - boolean metadata = name.endsWith(".txt"); - boolean data = name.endsWith(".data"); - - if (state) { - if (result != null) { - byte[] st = new byte[(int) entry.getSize()]; - tar.read(st); - result.collectorState = (Bean) beanSerializer.deserialize(st); - if (DEBUG) - System.out.println("READ collector state: " + result.collectorState); - } else { - tar.skip(entry.getSize()); - } - } else if (metadata) { - byte[] md = new byte[(int) entry.getSize()]; - tar.read(md); - if (DEBUG) { - System.out.println("READING Item metadata: " + name); - System.out.println("SERIALIZED ITEM " + name.replace(".txt", "") + ": " + Arrays.toString(md)); - } - - try { - Bean bean = (Bean) beanSerializer.deserialize(md); - if (DEBUG) - System.out.println("READ Item metadata: " + bean); - history.modify(bean); - lastItem = bean; - lastItemId = (String) bean.getFieldUnchecked("id"); - } catch (ClassFormatError e) { - // This is here because LZ4BlockInput/OutputStream seemed to - // be causing weird deserialization errors to occur for - // single data items when trying to deserialize beans. The - // reason being that the serialized data that was read only - // contains the first 512 bytes of the data. After that all - // data is zeros, which causes things like this to happen in - // deserialization: - //signature: rsstzzzzzzzze_b7b532e9 - //component(o): id = String - //component(1): variableId = String - //component(2): format = Temp1 - //annotation: @org.simantics.databoard.annotations.Union(value=[class org.simantics.databoard.type.BooleanType, class org.simantics.databoard.type.ByteType, class org.simantics.databoard.type.IntegerType, class org.simantics.databoard.type.LongType, class org.simantics.databoard.type.FloatType, class org.simantics.databoard.type.DoubleType, class org.simantics.databoard.type.StringType, class org.simantics.databoard.type.RecordType, class org.simantics.databoard.type.ArrayType, class org.simantics.databoard.type.MapType, class org.simantics.databoard.type.OptionalType, class org.simantics.databoard.type.UnionType, class org.simantics.databoard.type.VariantType]) - //component(3): = Boolean - //component(4): = Boolean - //component(5): = Boolean - //component(devil): = Boolean - //component(7): = Boolean - //component(music): = Boolean - //component(9): = Boolean - //component(10): = Boolean - // - // For this reason we've switched the default compression to FastLZ - // for now. This is reflected in the input format also. - - Activator.getDefault().getLog().log( - new Status(IStatus.ERROR, Activator.PLUGIN_ID, "History item " + name + " metadata deserialization failed.", e));; - } - } else if (data && lastItem != null) { - StreamAccessor sa = history.openStream(lastItemId, "rw"); - try { - if (sa instanceof BinaryObject) { - BinaryObject bo = (BinaryObject) sa; - RandomAccessBinary output = bo.getBinary(); - output.position(0L); - output.setLength(0L); - long copiedBytes = FileUtils.copy(tar, new RABOutputStream(output), entry.getSize()); - if (copiedBytes != entry.getSize()) { - System.err.println("WARNING: item " + lastItemId + ", extracted " + copiedBytes + " bytes while TAR entry said the size to be " + entry.getSize() + " bytes"); - } - } else { - System.err.println("Skipping item " + lastItemId + " in history import due to StreamAccessor not being an instanceof BinaryObject. StreamAccessor=" + sa); - } - } finally { - try { - sa.close(); - lastItem = null; - lastItemId = null; - } catch (AccessorException e) { - } - } - } - } - - return true; - } - - /** - * Get request that clears history - * @return cleaning request - */ - public static WriteRequest clear(final Resource history) { - return new WriteRequest() { - @Override - public void perform(WriteGraph graph) throws DatabaseException { - HistoryResource HIS = HistoryResource.getInstance(graph); - Layer0 L0 = Layer0.getInstance(graph); - - // Separate items format - for (Resource oldItem : graph.getObjects(history, L0.ConsistsOf)) - { - if (!graph.isInstanceOf(oldItem, HIS.History_Item)) continue; - Resource info = graph.getPossibleObject(oldItem, HIS.History_Item); - if (info!=null) { - graph.deny(oldItem, HIS.History_Item_Info); - graph.deny(info); - } - Resource data = graph.getPossibleObject(oldItem, HIS.History_Item_Series); - if (data!=null) { - graph.deny(oldItem, HIS.History_Item_Series); - graph.deny(data); - } - graph.deny(history, L0.ConsistsOf, oldItem); - graph.deny(oldItem); - } - - // Archived format - graph.denyValue(history, HIS.History_archive); - } - }; - } - - - private static void putNextEntry(TarArchiveOutputStream out, String entryName, long size) throws IOException { - TarArchiveEntry entry = new TarArchiveEntry(entryName); - entry.setSize(size); - out.putArchiveEntry(entry); - } - - /** - * Shall not close the underlying RandomAccessBinary. - */ - private static class RABOutputStream extends OutputStream { - - private final RandomAccessBinary output; - - public RABOutputStream(RandomAccessBinary output) { - this.output = output; - } - - @Override - public void write(byte[] b, int off, int len) throws IOException { - output.write(b, off, len); - } - - @Override - public void write(int b) throws IOException { - output.write(b); - } - - } - - /** - * Shall not close the underlying RandomAccessBinary. - */ - private static class RABInputStream extends InputStream { - - private final RandomAccessBinary input; - - public RABInputStream(RandomAccessBinary input) { - this.input = input; - } - - @Override - public int available() throws IOException { - long avail = input.length() - input.position(); - if ((avail & 0xffffffff00000000L) != 0) - throw new IOException("Number of available bytes truncated in long->int conversion: " + avail); - return (int) avail; - } - - @Override - public int read() throws IOException { - try { - return input.readUnsignedByte(); - } catch (EOFException e) { - return -1; - } - } - - @Override - public int read(byte[] b, int off, int len) throws IOException { - long available = input.length() - input.position(); - if (available == 0) - return -1; - int l = (int) Math.min(available, (long) len); - input.readFully(b, off, l); - return l; - } - - @Override - public long skip(long n) throws IOException { - long count = 0; - while (count < n) { - int l = (int) Math.min(n, (long) Integer.MAX_VALUE); - count += input.skipBytes(l); - } - return count; - } - - } - - private static long profile(Long t1, String string) { - if (PROFILE) { - long t2 = System.nanoTime(); - long delta = t1 == null ? 0 : t2-t1; - System.out.println("[" + HistoryUtil.class.getSimpleName() + "] PROFILE: " + string + (t1 == null ? "" : " in " + (((double)delta)*1e-6) + " ms")); - return t2; - } - return 0L; - } - -} +/******************************************************************************* + * Copyright (c) 2007, 2018 Association for Decentralized Information Management in + * Industry THTH ry. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * VTT Technical Research Centre of Finland - initial API and implementation + * Semantum Oy - archive implementation, gitlab simantics/platform#227 + *******************************************************************************/ +package org.simantics.simulation.history; + +import java.io.EOFException; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.file.Path; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; + +import org.apache.commons.compress.archivers.tar.TarArchiveEntry; +import org.apache.commons.compress.archivers.tar.TarArchiveInputStream; +import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream; +import org.eclipse.core.runtime.IStatus; +import org.eclipse.core.runtime.Status; +import org.simantics.databoard.Bindings; +import org.simantics.databoard.accessor.StreamAccessor; +import org.simantics.databoard.accessor.binary.BinaryObject; +import org.simantics.databoard.accessor.error.AccessorException; +import org.simantics.databoard.binding.ArrayBinding; +import org.simantics.databoard.binding.Binding; +import org.simantics.databoard.binding.RecordBinding; +import org.simantics.databoard.binding.error.BindingConstructionException; +import org.simantics.databoard.binding.error.BindingException; +import org.simantics.databoard.binding.impl.ObjectArrayBinding; +import org.simantics.databoard.binding.mutable.MutableVariant; +import org.simantics.databoard.binding.mutable.Variant; +import org.simantics.databoard.container.DataContainer; +import org.simantics.databoard.container.DataContainers; +import org.simantics.databoard.serialization.Serializer; +import org.simantics.databoard.util.Bean; +import org.simantics.databoard.util.binary.BinaryFile; +import org.simantics.databoard.util.binary.OutputStreamWriteable; +import org.simantics.databoard.util.binary.RandomAccessBinary; +import org.simantics.db.ReadGraph; +import org.simantics.db.Resource; +import org.simantics.db.WriteGraph; +import org.simantics.db.common.request.UniqueRead; +import org.simantics.db.common.request.WriteRequest; +import org.simantics.db.exception.DatabaseException; +import org.simantics.db.exception.ServiceNotFoundException; +import org.simantics.db.request.Read; +import org.simantics.fastlz.FastLZ; +import org.simantics.history.HistoryException; +import org.simantics.history.HistoryManager; +import org.simantics.history.ItemManager; +import org.simantics.history.impl.CollectorImpl; +import org.simantics.history.impl.CollectorState; +import org.simantics.history.impl.CollectorState.VariableState; +import org.simantics.history.util.Stream; +import org.simantics.history.util.ValueBand; +import org.simantics.history.util.WeightedMedian; +import org.simantics.layer0.Layer0; +import org.simantics.simulation.Activator; +import org.simantics.simulation.ontology.HistoryResource; +import org.simantics.simulation.ontology.SimulationResource; +import org.simantics.utils.FileUtils; + +import gnu.trove.map.TObjectLongMap; +import gnu.trove.map.hash.TObjectLongHashMap; +import net.jpountz.lz4.LZ4BlockInputStream; +import net.jpountz.lz4.LZ4BlockOutputStream; + +/** + * @author Toni Kalajainen + * @author Tuukka Lehtonen + */ +public class HistoryUtil { + + private static final boolean DEBUG = false; + private static final boolean PROFILE = true; + + public enum Compression { + FLZ, + LZ4, + NONE, + } + + private static final Compression USED_COMPRESSION = Compression.FLZ; + + private static final String APPLICATION_X_TAR = "application/x-tar"; + private static final String APPLICATION_X_LZ4 = "application/x-lz4"; + private static final String APPLICATION_X_FLZ = "application/x-flz"; + private static final int ARCHIVE_VERSION_1 = 1; // TarArchiveInput/OutputStream + + public static class ArchivedHistory { + public long totalSampleSize = 0L; + public TObjectLongMap itemSizes = new TObjectLongHashMap(); + } + + /** + * Create request that exports history to graph. + * + * If removeOldUnused is true, items that do not exist in source history + * are deleted from graph. + * + * If item already exists in the graph, it is overwritten. + * + * @param history source history + * @param g write graph + * @param historyResource Instance of HIS.History to write data to + * @param removeOldUnused + * @param timeShift adjust time values by this value + * @param from time + * @param end time + * @return WriteRequest + */ + public static WriteRequest export(final HistoryManager history, + final Resource historyResource, + final boolean removeOldUnused, + final Double timeShift, + final double from, final double end) + { + return new WriteRequest() { + public void perform(WriteGraph graph) throws DatabaseException { + HistoryResource HIS = HistoryResource.getInstance(graph); + Layer0 L0 = Layer0.getInstance(graph); + long totalSampleSize = 0L; + long s = System.nanoTime(); + + try { + Bean[] items = history.getItems(); + if (PROFILE) + profile(null, "exporting " + items.length + " history items to database"); + + ItemManager im = new ItemManager( items ); + Map oldResourceMap = new HashMap(); + for (Resource oldItem : graph.getObjects(historyResource, L0.ConsistsOf)) + { + if (!graph.isInstanceOf(oldItem, HIS.History_Item)) continue; + Bean bean = graph.getPossibleRelatedValue(oldItem, HIS.History_Item_Info, Bindings.BEAN); + if ( bean == null ) continue; + String id = (String) bean.getField("id"); + + if (!im.exists(id)) { + // There is item in graph that does not exist in source history + if ( removeOldUnused ) { + graph.deny(oldItem); + } + } else { + // Item already exists, to-be-overwritten + oldResourceMap.put(id, oldItem); + } + } + + for (Bean newItem : im.values()) + { + //System.out.println("WRITING ITEM: " + newItem); + String id = (String) newItem.getField("id"); + Resource historyItemResource = oldResourceMap.get(id); + Resource data = null; + + if ( historyItemResource==null ) { + historyItemResource = graph.newResource(); + graph.claim(historyResource, L0.ConsistsOf, historyItemResource); + graph.claim(historyItemResource, L0.InstanceOf, HIS.History_Item); + graph.claimLiteral(historyItemResource, L0.HasName, id); + } else { + data = graph.getPossibleObject(historyItemResource, HIS.History_Item_Series); + } + + Variant v = new Variant(newItem.getBinding(), newItem); + graph.claimLiteral(historyItemResource, HIS.History_Item_Info, v, Bindings.VARIANT); + + if (data == null) { + data = graph.newResource(); + graph.claim(data, L0.InstanceOf, L0.Variant); + graph.claim(historyItemResource, HIS.History_Item_Series, data); + } else { + graph.denyValue(data); + } + + + // Write stream + StreamAccessor sa = history.openStream(id, "r"); + try { + //System.out.println("WRITING stream of size=" + sa.size()); + RecordBinding sampleBinding = (RecordBinding) Bindings.getBeanBinding( sa.type().componentType ); + Serializer serializer = Bindings.getSerializerUnchecked(sampleBinding); + Integer constantSampleSize = serializer.getConstantSize(); + ValueBand vb = new ValueBand(sampleBinding); + Stream stream = new Stream(sa, sampleBinding); + ArrayBinding arrayBinding = new ObjectArrayBinding(sa.type(), sampleBinding); + Object array = null; + //System.out.println("WRITING stream: sample size=" + constantSampleSize); + + int count = 0; + readArray: { + // Start index + int startIndex = stream.binarySearch(Bindings.DOUBLE, from); + if ( startIndex < -stream.count() ) break readArray; + if ( startIndex<0 ) startIndex = -2-startIndex; + if ( startIndex == -1 ) startIndex = 0; + + // End index + int endIndex = stream.binarySearch(Bindings.DOUBLE, end); + if ( endIndex == -1 ) break readArray; + if ( endIndex<0 ) endIndex = -1-endIndex; + if ( endIndex == sa.size() ) endIndex = sa.size()-1; + if ( endIndexnull to skip collector state saving + * @param historyResource + * Instance of HIS.History to write data to + * @param timeShift + * adjust time values by this value + * @param from + * time + * @param end + * time + * @return WriteRequest + */ + public static WriteRequest exportArchive( + final HistoryManager history, + final Bean collectorState, + final Resource historyResource, + final Double timeShift, + final double from, + final double end) + { + return new WriteRequest() { + public void perform(WriteGraph graph) throws DatabaseException { + HistoryResource HIS = HistoryResource.getInstance(graph); + Layer0 L0 = Layer0.getInstance(graph); + + long s = System.nanoTime(); + if (PROFILE) + profile(null, "archiving history items to database"); + + // Remove all possibly existing old items in the history. + for (Resource entity : graph.getObjects(historyResource, L0.ConsistsOf)) + if (graph.isInstanceOf(entity, HIS.History_Item)) + graph.deny(historyResource, L0.ConsistsOf, L0.PartOf, entity); + + // Create new literal for history archival + //graph.deny(historyResource, HIS.History_archive); + graph.denyValue(historyResource, HIS.History_archive); + Resource archiveLiteral = graph.newResource(); + graph.claim(archiveLiteral, L0.InstanceOf, null, L0.ByteArray); + graph.claim(historyResource, HIS.History_archive, archiveLiteral); + + OutputStream closeable = null; + try { + RandomAccessBinary rab = graph.getRandomAccessBinary(archiveLiteral); + rab.position(0); + rab.skipBytes(4); + + ArchivedHistory archive = exportCompressedArchive(history, collectorState, timeShift, from, end, rab); + + rab.position(0L); + rab.writeInt((int)(rab.length() - 4L)); + + graph.claimLiteral(historyResource, HIS.History_size, L0.Long, archive.totalSampleSize, Bindings.LONG); + + if (PROFILE) + profile(s, "archived history items of size " + archive.totalSampleSize + " to database"); + + } catch (HistoryException e) { + throw new DatabaseException( e ); + } catch (IOException e) { + throw new DatabaseException( e ); + } finally { + FileUtils.uncheckedClose(closeable); + } + } + }; + } + + public static ArchivedHistory exportCompressedArchive( + HistoryManager history, + Bean collectorState, + Double timeShift, + double from, + double end, + RandomAccessBinary rab) + throws IOException, HistoryException + { + return exportCompressedArchive(history, collectorState, timeShift, from, end, USED_COMPRESSION, rab); + } + + public static ArchivedHistory exportCompressedArchive( + HistoryManager history, + Bean collectorState, + Double timeShift, + double from, + double end, + Compression compression, + RandomAccessBinary rab) + throws IOException, HistoryException + { + switch (compression) { + case FLZ: + DataContainers.writeHeader(rab, new DataContainer(APPLICATION_X_FLZ, ARCHIVE_VERSION_1, null)); + return exportArchive(history, collectorState, timeShift, from, end, FastLZ.write(new RABOutputStream(rab))); + case LZ4: + DataContainers.writeHeader(rab, new DataContainer(APPLICATION_X_LZ4, ARCHIVE_VERSION_1, null)); + return exportArchive(history, collectorState, timeShift, from, end, new LZ4BlockOutputStream(new RABOutputStream(rab))); + case NONE: + DataContainers.writeHeader(rab, new DataContainer(APPLICATION_X_TAR, ARCHIVE_VERSION_1, null)); + return exportArchive(history, collectorState, timeShift, from, end, new RABOutputStream(rab)); + default: + throw new UnsupportedOperationException("Unsupported compression: " + compression); + } + } + + private static ArchivedHistory exportArchive( + HistoryManager history, + Bean collectorState, + Double timeShift, + double from, + double end, + OutputStream out) + throws IOException, HistoryException + { + try (OutputStream os = out) { + ArchivedHistory archive = exportArchive( os, history, collectorState, timeShift, from, end ); + os.flush(); + return archive; + } + } + + /** + * Store the specified history manager's contents into a single archive + * file. + * + * @param outputStream + * the output stream where the history archive is written + * @param history + * source history + * @param collectorState + * complete state of the history collector at the time of saving + * or null to not save any state + * @param timeShift + * adjust time values by this value + * @param from + * time + * @param end + * time + * @return ArchivedHistory instance describing the created archive + * @throws HistoryException + * when there's a problem with reading the history data from the + * history work area + * @throws IOException + * when there's a problem writing the resulting history archive + * file + */ + private static ArchivedHistory exportArchive( + OutputStream outputStream, + HistoryManager history, + Bean collectorState, + Double timeShift, + double from, + double end) + throws IOException, HistoryException + { + ArchivedHistory result = new ArchivedHistory(); + + Serializer beanSerializer = Bindings.getSerializerUnchecked(Bindings.BEAN); + boolean hasTimeShift = timeShift != null && timeShift != 0.0; + + TarArchiveOutputStream out = new TarArchiveOutputStream(outputStream); + OutputStreamWriteable dataOutput = new OutputStreamWriteable(out); + + try { + if (collectorState != null) { + // Write complete collector state into the archive + if (DEBUG) + System.out.println("WRITING collector state: " + collectorState); + byte[] serializedCollectorState = beanSerializer.serialize( collectorState ); + putNextEntry(out, "state", serializedCollectorState.length); + out.write(serializedCollectorState); + out.closeArchiveEntry(); + } + + ItemManager im = new ItemManager( history.getItems() ); + + for (Bean item : im.values()) { + if (DEBUG) + System.out.println("STORING ITEM: " + item); + String id = (String) item.getField("id"); + + // Write data stream metadata + byte[] metadata = beanSerializer.serialize( item ); + putNextEntry(out, id + ".txt", metadata.length); + out.write(metadata); + out.closeArchiveEntry(); + if (DEBUG) + System.out.println("SERIALIZED ITEM " + id + ": " + Arrays.toString(metadata)); + + + // Write data stream as a separate entry + StreamAccessor sa = history.openStream(id, "r"); + try { + if (DEBUG) + System.out.println("STREAM SIZE=" + sa.size()); + RecordBinding sampleBinding = (RecordBinding) Bindings.getBeanBinding( sa.type().componentType ); + Serializer sampleSerializer = Bindings.getSerializerUnchecked(sampleBinding); + Integer constantSampleSize = sampleSerializer.getConstantSize(); + + if (constantSampleSize == null) { + // Variable length samples - no support for now. + System.err.println("Skipping item " + item + " in history archival due to variable length samples not being supported at the moment."); + continue; + } else { + ValueBand vb = new ValueBand(sampleBinding); + Stream stream = new Stream(sa, sampleBinding); + if (DEBUG) + System.out.println("WRITING stream: sample size=" + constantSampleSize); + + readArray: { + // Start index + int startIndex = stream.binarySearch(Bindings.DOUBLE, from); + if ( startIndex < -stream.count() ) break readArray; + if ( startIndex<0 ) startIndex = -2-startIndex; + if ( startIndex == -1 ) startIndex = 0; + + // End index + int endIndex = stream.binarySearch(Bindings.DOUBLE, end); + if ( endIndex == -1 ) break readArray; + if ( endIndex<0 ) endIndex = -1-endIndex; + if ( endIndex == sa.size() ) endIndex = sa.size()-1; + if ( endIndextrue if successful + */ + private static boolean importHistoryItems(ReadGraph graph, final Resource historyResource, final HistoryManager history) throws DatabaseException + { + HistoryResource HIS = HistoryResource.getInstance(graph); + Layer0 L0 = Layer0.getInstance(graph); + + try { + for (Resource oldItem : graph.getObjects(historyResource, L0.ConsistsOf)) { + if (!graph.isInstanceOf(oldItem, HIS.History_Item)) + continue; + + Bean bean = graph.getRelatedValue(oldItem, HIS.History_Item_Info, Bindings.BEAN); + String id = (String) bean.getFieldUnchecked("id"); + Object array = graph.getRelatedValue(oldItem, HIS.History_Item_Series, Bindings.BEAN); + Binding arrayBinding = Bindings.getBinding( array.getClass() ); + history.modify(bean); + StreamAccessor sa = history.openStream(id, "rw"); + try { + sa.setValue(arrayBinding, array); + } finally { + try { + sa.close(); + } catch (AccessorException e) { + } + } + } + + return true; + } catch (AccessorException e) { + throw new DatabaseException(e); + } catch (BindingConstructionException e) { + throw new DatabaseException(e); + } catch (HistoryException e) { + throw new DatabaseException(e); + } + } + + /** + * @param history + * @param path + * @return + * @throws HistoryException + * @throws IOException + */ + public static HistoryImportResult importHistoryArchive(HistoryManager history, Path path) throws IOException, HistoryException { + HistoryImportResult result = new HistoryImportResult(); + try (RandomAccessBinary rab = new BinaryFile(path.toFile())) { + importHistoryArchive(history, rab, result); + return result; + } + } + + /** + * Import all history items from database archive into a history manager. + * + * @param graph + * @param historyResource + * @param archive + * @param history + * @return true if successful + * @throws DatabaseException + */ + private static boolean importHistoryArchive(ReadGraph graph, Resource archive, HistoryManager history, HistoryImportResult result) throws DatabaseException, IOException, HistoryException { + RandomAccessBinary rab = graph.getRandomAccessBinary(archive); + // This is here because the data is encoded as an L0.ByteArray + // in the database which means the first integer describes the + // length of the byte array. We skip that length data here. + rab.position(4); + return importHistoryArchive(history, rab, result); + } + + /** + * Import all history items from database archive into a history manager. + * + * @param graph + * @param historyResource + * @param archive + * @param history + * @return true if successful + * @throws DatabaseException + */ + private static boolean importHistoryArchive(HistoryManager history, RandomAccessBinary rab, HistoryImportResult result) + throws IOException, HistoryException + { + DataContainer dc = DataContainers.readHeader(rab); + + if (DEBUG) + System.out.println("COMPRESSED HISTORY DATA SIZE: " + (rab.length() - rab.position())); + + if (dc.version == ARCHIVE_VERSION_1) { + if (APPLICATION_X_LZ4.equals(dc.format)) { + return extractHistoryArchiveInputStream(history, new LZ4BlockInputStream(new RABInputStream(rab)), result); + } else if (APPLICATION_X_FLZ.equals(dc.format)) { + return extractHistoryArchiveInputStream(history, FastLZ.read(new RABInputStream(rab)), result); + } else if (APPLICATION_X_TAR.equals(dc.format)) { + return extractHistoryArchiveInputStream(history, new RABInputStream(rab), result); + } + throw new UnsupportedOperationException("Unsupported format " + dc.format + " and version " + dc.version); + } else { + throw new UnsupportedOperationException("Unsupported history archive version " + dc.version + " with format " + dc.format); + } + } + + private static boolean extractHistoryArchiveInputStream(HistoryManager history, InputStream in, HistoryImportResult result) throws IOException, HistoryException { + try (InputStream _in = in) { + return extractHistoryArchiveTar(history, in, result); + } + } + + /** + * Import all history items from graph into a history manager + * + * @param graph database access + * @param history destination history + * @param rab the archive to extract from + * @return true if successful + * @throws DatabaseException + */ + private static boolean extractHistoryArchiveTar(HistoryManager history, InputStream in, HistoryImportResult result) + throws IOException, HistoryException + { + // tar is not closed on purpose because we do not want to close RandomAccessBinary rab. + TarArchiveInputStream tar = new TarArchiveInputStream(in); + + Serializer beanSerializer = Bindings.getSerializerUnchecked(Bindings.BEAN); + + Bean lastItem = null; + String lastItemId = null; + while (true) { + TarArchiveEntry entry = tar.getNextTarEntry(); + if (entry == null) + break; + + String name = entry.getName(); + boolean state = name.equals("state"); + boolean metadata = name.endsWith(".txt"); + boolean data = name.endsWith(".data"); + + if (state) { + if (result != null) { + byte[] st = new byte[(int) entry.getSize()]; + tar.read(st); + result.collectorState = (Bean) beanSerializer.deserialize(st); + if (DEBUG) + System.out.println("READ collector state: " + result.collectorState); + } else { + tar.skip(entry.getSize()); + } + } else if (metadata) { + byte[] md = new byte[(int) entry.getSize()]; + tar.read(md); + if (DEBUG) { + System.out.println("READING Item metadata: " + name); + System.out.println("SERIALIZED ITEM " + name.replace(".txt", "") + ": " + Arrays.toString(md)); + } + + try { + Bean bean = (Bean) beanSerializer.deserialize(md); + if (DEBUG) + System.out.println("READ Item metadata: " + bean); + history.modify(bean); + lastItem = bean; + lastItemId = (String) bean.getFieldUnchecked("id"); + } catch (ClassFormatError e) { + // This is here because LZ4BlockInput/OutputStream seemed to + // be causing weird deserialization errors to occur for + // single data items when trying to deserialize beans. The + // reason being that the serialized data that was read only + // contains the first 512 bytes of the data. After that all + // data is zeros, which causes things like this to happen in + // deserialization: + //signature: rsstzzzzzzzze_b7b532e9 + //component(o): id = String + //component(1): variableId = String + //component(2): format = Temp1 + //annotation: @org.simantics.databoard.annotations.Union(value=[class org.simantics.databoard.type.BooleanType, class org.simantics.databoard.type.ByteType, class org.simantics.databoard.type.IntegerType, class org.simantics.databoard.type.LongType, class org.simantics.databoard.type.FloatType, class org.simantics.databoard.type.DoubleType, class org.simantics.databoard.type.StringType, class org.simantics.databoard.type.RecordType, class org.simantics.databoard.type.ArrayType, class org.simantics.databoard.type.MapType, class org.simantics.databoard.type.OptionalType, class org.simantics.databoard.type.UnionType, class org.simantics.databoard.type.VariantType]) + //component(3): = Boolean + //component(4): = Boolean + //component(5): = Boolean + //component(6): = Boolean + //component(7): = Boolean + //component(8): = Boolean + //component(9): = Boolean + //component(10): = Boolean + // + // For this reason we've switched the default compression to FastLZ + // for now. This is reflected in the input format also. + + Activator.getDefault().getLog().log( + new Status(IStatus.ERROR, Activator.PLUGIN_ID, "History item " + name + " metadata deserialization failed.", e));; + } + } else if (data && lastItem != null) { + StreamAccessor sa = history.openStream(lastItemId, "rw"); + try { + if (sa instanceof BinaryObject) { + BinaryObject bo = (BinaryObject) sa; + RandomAccessBinary output = bo.getBinary(); + output.position(0L); + output.setLength(0L); + long copiedBytes = FileUtils.copy(tar, new RABOutputStream(output), entry.getSize()); + if (copiedBytes != entry.getSize()) { + System.err.println("WARNING: item " + lastItemId + ", extracted " + copiedBytes + " bytes while TAR entry said the size to be " + entry.getSize() + " bytes"); + } + } else { + System.err.println("Skipping item " + lastItemId + " in history import due to StreamAccessor not being an instanceof BinaryObject. StreamAccessor=" + sa); + } + } finally { + try { + sa.close(); + lastItem = null; + lastItemId = null; + } catch (AccessorException e) { + } + } + } + } + + return true; + } + + /** + * Get request that clears history + * @return cleaning request + */ + public static WriteRequest clear(final Resource history) { + return new WriteRequest() { + @Override + public void perform(WriteGraph graph) throws DatabaseException { + HistoryResource HIS = HistoryResource.getInstance(graph); + Layer0 L0 = Layer0.getInstance(graph); + + // Separate items format + for (Resource oldItem : graph.getObjects(history, L0.ConsistsOf)) + { + if (!graph.isInstanceOf(oldItem, HIS.History_Item)) continue; + Resource info = graph.getPossibleObject(oldItem, HIS.History_Item); + if (info!=null) { + graph.deny(oldItem, HIS.History_Item_Info); + graph.deny(info); + } + Resource data = graph.getPossibleObject(oldItem, HIS.History_Item_Series); + if (data!=null) { + graph.deny(oldItem, HIS.History_Item_Series); + graph.deny(data); + } + graph.deny(history, L0.ConsistsOf, oldItem); + graph.deny(oldItem); + } + + // Archived format + graph.denyValue(history, HIS.History_archive); + } + }; + } + + + private static void putNextEntry(TarArchiveOutputStream out, String entryName, long size) throws IOException { + TarArchiveEntry entry = new TarArchiveEntry(entryName); + entry.setSize(size); + out.putArchiveEntry(entry); + } + + /** + * Shall not close the underlying RandomAccessBinary. + */ + private static class RABOutputStream extends OutputStream { + + private final RandomAccessBinary output; + + public RABOutputStream(RandomAccessBinary output) { + this.output = output; + } + + @Override + public void write(byte[] b, int off, int len) throws IOException { + output.write(b, off, len); + } + + @Override + public void write(int b) throws IOException { + output.write(b); + } + + } + + /** + * Shall not close the underlying RandomAccessBinary. + */ + private static class RABInputStream extends InputStream { + + private final RandomAccessBinary input; + + public RABInputStream(RandomAccessBinary input) { + this.input = input; + } + + @Override + public int available() throws IOException { + long avail = input.length() - input.position(); + if ((avail & 0xffffffff00000000L) != 0) + throw new IOException("Number of available bytes truncated in long->int conversion: " + avail); + return (int) avail; + } + + @Override + public int read() throws IOException { + try { + return input.readUnsignedByte(); + } catch (EOFException e) { + return -1; + } + } + + @Override + public int read(byte[] b, int off, int len) throws IOException { + long available = input.length() - input.position(); + if (available == 0) + return -1; + int l = (int) Math.min(available, (long) len); + input.readFully(b, off, l); + return l; + } + + @Override + public long skip(long n) throws IOException { + long count = 0; + while (count < n) { + int l = (int) Math.min(n, (long) Integer.MAX_VALUE); + count += input.skipBytes(l); + } + return count; + } + + } + + private static long profile(Long t1, String string) { + if (PROFILE) { + long t2 = System.nanoTime(); + long delta = t1 == null ? 0 : t2-t1; + System.out.println("[" + HistoryUtil.class.getSimpleName() + "] PROFILE: " + string + (t1 == null ? "" : " in " + (((double)delta)*1e-6) + " ms")); + return t2; + } + return 0L; + } + + public static CollectorState truncateHistory(double toBeforeTime, HistoryManager history, CollectorState state) throws AccessorException, BindingException, HistoryException { + Double t = toBeforeTime; + Binding timeBinding = null; + + Bean[] items = history.getItems(); + //System.out.println("truncating all samples after t=" + toBeforeTime + " for " + items.length + " history items"); + + for (Bean item : items) { + String id = (String) item.getField("id"); + StreamAccessor sa = history.openStream(id, "rw"); + try { + Stream s = new Stream(sa); + timeBinding = s.timeBinding; + int currentSize = sa.size(); + int index = s.binarySearch(timeBinding, toBeforeTime); + int newSize = truncationSize(index); + if (newSize < currentSize) { + //System.out.println("truncating item: " + item + " from size " + currentSize + " to " + newSize); + sa.setSize(newSize); + + if (state != null) { + Object prevTime = newSize > 0 ? s.getItemTime(s.timeBinding, newSize - 1) : null; + Bean prevSample = newSize > 0 ? (Bean) sa.get(newSize - 1, s.sampleBinding) : null; + Object prevValue = prevSample != null ? prevSample.getField(s.valueIndex) : null; + boolean isNan = isNaN(prevValue); + + VariableState vs = state.values.get(id); + if (vs != null && vs.value != null && prevValue != null) { + vs.value.setValue(vs.value.getBinding(), prevValue); + vs.isValid = true; + vs.isNan = isNan; + } + + CollectorState.Item is = state.itemStates.get(id); + if (is != null) { + is.firstTime = toTime(prevTime); + is.firstValue = toValue(s.valueBinding, prevValue); + is.currentTime = is.firstTime; + is.currentValue = toValue(s.valueBinding, prevValue); + is.isNaN = isNan; + is.isValid = prevValue != null; + is.sum = Double.NaN; + is.count = 0; + is.ooDeadband = false; + is.firstDisabledTime = Double.NaN; + is.lastDisabledTime = Double.NaN; + is.median = new WeightedMedian( CollectorImpl.MEDIAN_LIMIT ); + } + } + } + } finally { + sa.close(); + } + } + + if (timeBinding != null && state != null) { + state.time.setValue(timeBinding, t); + state.dT = 1.0; + } + + return state; + } + + private static double toTime(Object time) { + return time instanceof Number ? ((Number) time).doubleValue() : Double.NaN; + } + + private static MutableVariant toValue(Binding valueBinding, Object value) { + return new MutableVariant(valueBinding, value != null ? value : valueBinding.createDefaultUnchecked()); + } + + private static boolean isNaN(Object value) { + return value instanceof Number ? Double.isNaN(((Number) value).doubleValue()) : false; + } + + private static int truncationSize(int binarySearchResult) { + return binarySearchResult >= 0 ? binarySearchResult + 1 : (-binarySearchResult - 1); + } + +}