X-Git-Url: https://gerrit.simantics.org/r/gitweb?p=simantics%2Fplatform.git;a=blobdiff_plain;f=bundles%2Forg.simantics.simulation%2Fsrc%2Forg%2Fsimantics%2Fsimulation%2Fhistory%2FHistoryUtil.java;h=343adefc8686fc2053b6c4b41e9167d0e937372d;hp=53a76d1360d783011320f451eb86838aa170d1bb;hb=b000e272429e157638c0384878b07b8dcd758472;hpb=b93b08597911347520cbd9e900be026645f7b743 diff --git a/bundles/org.simantics.simulation/src/org/simantics/simulation/history/HistoryUtil.java b/bundles/org.simantics.simulation/src/org/simantics/simulation/history/HistoryUtil.java index 53a76d136..343adefc8 100644 --- a/bundles/org.simantics.simulation/src/org/simantics/simulation/history/HistoryUtil.java +++ b/bundles/org.simantics.simulation/src/org/simantics/simulation/history/HistoryUtil.java @@ -1,5 +1,5 @@ /******************************************************************************* - * Copyright (c) 2007, 2014 Association for Decentralized Information Management in + * Copyright (c) 2007, 2018 Association for Decentralized Information Management in * Industry THTH ry. * All rights reserved. This program and the accompanying materials * are made available under the terms of the Eclipse Public License v1.0 @@ -8,7 +8,7 @@ * * Contributors: * VTT Technical Research Centre of Finland - initial API and implementation - * Semantum Oy - archive implementation + * Semantum Oy - archive implementation, gitlab simantics/platform#227 *******************************************************************************/ package org.simantics.simulation.history; @@ -16,6 +16,7 @@ import java.io.EOFException; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.nio.file.Path; import java.util.Arrays; import java.util.HashMap; import java.util.Map; @@ -41,6 +42,7 @@ import org.simantics.databoard.container.DataContainer; import org.simantics.databoard.container.DataContainers; import org.simantics.databoard.serialization.Serializer; import org.simantics.databoard.util.Bean; +import org.simantics.databoard.util.binary.BinaryFile; import org.simantics.databoard.util.binary.OutputStreamWriteable; import org.simantics.databoard.util.binary.RandomAccessBinary; import org.simantics.db.ReadGraph; @@ -52,19 +54,22 @@ import org.simantics.db.exception.DatabaseException; import org.simantics.db.exception.ServiceNotFoundException; import org.simantics.db.request.Read; import org.simantics.fastlz.FastLZ; -import org.simantics.history.Collector; import org.simantics.history.HistoryException; import org.simantics.history.HistoryManager; import org.simantics.history.ItemManager; +import org.simantics.history.impl.CollectorImpl; import org.simantics.history.impl.CollectorState; import org.simantics.history.impl.CollectorState.VariableState; import org.simantics.history.util.Stream; import org.simantics.history.util.ValueBand; +import org.simantics.history.util.WeightedMedian; import org.simantics.layer0.Layer0; import org.simantics.simulation.Activator; import org.simantics.simulation.ontology.HistoryResource; import org.simantics.simulation.ontology.SimulationResource; import org.simantics.utils.FileUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import gnu.trove.map.TObjectLongMap; import gnu.trove.map.hash.TObjectLongHashMap; @@ -77,21 +82,25 @@ import net.jpountz.lz4.LZ4BlockOutputStream; */ public class HistoryUtil { + private static final Logger LOGGER = LoggerFactory.getLogger(HistoryUtil.class); + private static final boolean DEBUG = false; private static final boolean PROFILE = true; - private enum Compression { + public enum Compression { FLZ, LZ4, + NONE, } private static final Compression USED_COMPRESSION = Compression.FLZ; + private static final String APPLICATION_X_TAR = "application/x-tar"; private static final String APPLICATION_X_LZ4 = "application/x-lz4"; private static final String APPLICATION_X_FLZ = "application/x-flz"; private static final int ARCHIVE_VERSION_1 = 1; // TarArchiveInput/OutputStream - private static class ArchivedHistory { + public static class ArchivedHistory { public long totalSampleSize = 0L; public TObjectLongMap itemSizes = new TObjectLongHashMap(); } @@ -322,7 +331,7 @@ public class HistoryUtil { rab.position(0); rab.skipBytes(4); - ArchivedHistory archive = exportCompressedArchive(history, collectorState, historyResource, timeShift, from, end, rab); + ArchivedHistory archive = exportCompressedArchive(history, collectorState, timeShift, from, end, rab); rab.position(0L); rab.writeInt((int)(rab.length() - 4L)); @@ -343,80 +352,56 @@ public class HistoryUtil { }; } - private static ArchivedHistory exportCompressedArchive( - final HistoryManager history, - final Bean collectorState, - final Resource historyResource, - final Double timeShift, - final double from, - final double end, + public static ArchivedHistory exportCompressedArchive( + HistoryManager history, + Bean collectorState, + Double timeShift, + double from, + double end, RandomAccessBinary rab) throws IOException, HistoryException { - switch (USED_COMPRESSION) { - case FLZ: - return exportArchiveFLZ(history, collectorState, historyResource, timeShift, from, end, rab); - case LZ4: - return exportArchiveLZ4(history, collectorState, historyResource, timeShift, from, end, rab); - default: - throw new UnsupportedOperationException("Unsupported compression: " + USED_COMPRESSION); - } + return exportCompressedArchive(history, collectorState, timeShift, from, end, USED_COMPRESSION, rab); } - private static ArchivedHistory exportArchiveLZ4( - final HistoryManager history, - final Bean collectorState, - final Resource historyResource, - final Double timeShift, - final double from, - final double end, + public static ArchivedHistory exportCompressedArchive( + HistoryManager history, + Bean collectorState, + Double timeShift, + double from, + double end, + Compression compression, RandomAccessBinary rab) throws IOException, HistoryException { - OutputStream closeable = null; - try { - DataContainers.writeHeader(rab, new DataContainer(APPLICATION_X_LZ4, ARCHIVE_VERSION_1, null)); - closeable = new RABOutputStream( rab ); - closeable = new LZ4BlockOutputStream( closeable ); - - ArchivedHistory archive = exportArchive( closeable, history, collectorState, timeShift, from, end ); - - closeable.flush(); - closeable.close(); - closeable = null; - - return archive; - } finally { - FileUtils.uncheckedClose(closeable); + switch (compression) { + case FLZ: + DataContainers.writeHeader(rab, new DataContainer(APPLICATION_X_FLZ, ARCHIVE_VERSION_1, null)); + return exportArchive(history, collectorState, timeShift, from, end, FastLZ.write(new RABOutputStream(rab))); + case LZ4: + DataContainers.writeHeader(rab, new DataContainer(APPLICATION_X_LZ4, ARCHIVE_VERSION_1, null)); + return exportArchive(history, collectorState, timeShift, from, end, new LZ4BlockOutputStream(new RABOutputStream(rab))); + case NONE: + DataContainers.writeHeader(rab, new DataContainer(APPLICATION_X_TAR, ARCHIVE_VERSION_1, null)); + return exportArchive(history, collectorState, timeShift, from, end, new RABOutputStream(rab)); + default: + throw new UnsupportedOperationException("Unsupported compression: " + compression); } } - @SuppressWarnings("resource") - private static ArchivedHistory exportArchiveFLZ( - final HistoryManager history, - final Bean collectorState, - final Resource historyResource, - final Double timeShift, - final double from, - final double end, - RandomAccessBinary rab) + private static ArchivedHistory exportArchive( + HistoryManager history, + Bean collectorState, + Double timeShift, + double from, + double end, + OutputStream out) throws IOException, HistoryException { - OutputStream closeable = null; - try { - DataContainers.writeHeader(rab, new DataContainer(APPLICATION_X_FLZ, ARCHIVE_VERSION_1, null)); - closeable = new RABOutputStream( rab ); - closeable = FastLZ.write( closeable ); - - ArchivedHistory archive = exportArchive( closeable, history, collectorState, timeShift, from, end ); - - closeable.flush(); - closeable.close(); - closeable = null; - + try (OutputStream os = out) { + ArchivedHistory archive = exportArchive( os, history, collectorState, timeShift, from, end ); + os.flush(); return archive; - } finally { - FileUtils.uncheckedClose(closeable); } } @@ -611,7 +596,7 @@ public class HistoryUtil { try { if (PROFILE) profile(null, "importing history items from archived format to disk workarea"); - importHistoryArchive(graph, historyResource, archive, history, result); + importHistoryArchive(graph, archive, history, result); } catch (IOException e) { throw new DatabaseException(e); } catch (HistoryException e) { @@ -670,6 +655,24 @@ public class HistoryUtil { } } + /** + * @param history + * @param path + * @return + * @throws HistoryException + * @throws IOException + */ + public static HistoryImportResult importHistoryArchive(HistoryManager history, Path path) throws IOException, HistoryException { + HistoryImportResult result = new HistoryImportResult(); + try (RandomAccessBinary rab = new BinaryFile(path.toFile())) { + importHistoryArchive(history, rab, result); + return result; + } catch (IOException e) { + LOGGER.error("Failed to import history from archive {}", path); + throw e; + } + } + /** * Import all history items from database archive into a history manager. * @@ -680,37 +683,51 @@ public class HistoryUtil { * @return true if successful * @throws DatabaseException */ - @SuppressWarnings("resource") - private static boolean importHistoryArchive(ReadGraph graph, Resource historyResource, Resource archive, HistoryManager history, HistoryImportResult result) - throws DatabaseException, IOException, HistoryException - { + private static boolean importHistoryArchive(ReadGraph graph, Resource archive, HistoryManager history, HistoryImportResult result) throws DatabaseException, IOException, HistoryException { RandomAccessBinary rab = graph.getRandomAccessBinary(archive); + // This is here because the data is encoded as an L0.ByteArray + // in the database which means the first integer describes the + // length of the byte array. We skip that length data here. rab.position(4); + return importHistoryArchive(history, rab, result); + } + + /** + * Import all history items from database archive into a history manager. + * + * @param graph + * @param historyResource + * @param archive + * @param history + * @return true if successful + * @throws DatabaseException + */ + private static boolean importHistoryArchive(HistoryManager history, RandomAccessBinary rab, HistoryImportResult result) + throws IOException, HistoryException + { DataContainer dc = DataContainers.readHeader(rab); if (DEBUG) System.out.println("COMPRESSED HISTORY DATA SIZE: " + (rab.length() - rab.position())); - if (APPLICATION_X_LZ4.equals(dc.format)) { - if (dc.version == ARCHIVE_VERSION_1) { - InputStream in = new RABInputStream(rab); - //in = new BufferedInputStream(in); - in = new LZ4BlockInputStream(in); - return extractHistoryArchiveTar(graph, history, in, result); - } - } else if (APPLICATION_X_FLZ.equals(dc.format)) { - if (dc.version == ARCHIVE_VERSION_1) { - InputStream in = null; - try { - in = new RABInputStream(rab); - in = FastLZ.read(in); - return extractHistoryArchiveTar(graph, history, in, result); - } finally { - FileUtils.uncheckedClose(in); - } + if (dc.version == ARCHIVE_VERSION_1) { + if (APPLICATION_X_LZ4.equals(dc.format)) { + return extractHistoryArchiveInputStream(history, new LZ4BlockInputStream(new RABInputStream(rab)), result); + } else if (APPLICATION_X_FLZ.equals(dc.format)) { + return extractHistoryArchiveInputStream(history, FastLZ.read(new RABInputStream(rab)), result); + } else if (APPLICATION_X_TAR.equals(dc.format)) { + return extractHistoryArchiveInputStream(history, new RABInputStream(rab), result); } + throw new UnsupportedOperationException("Unsupported format " + dc.format + " and version " + dc.version); + } else { + throw new UnsupportedOperationException("Unsupported history archive version " + dc.version + " with format " + dc.format); + } + } + + private static boolean extractHistoryArchiveInputStream(HistoryManager history, InputStream in, HistoryImportResult result) throws IOException, HistoryException { + try (InputStream _in = in) { + return extractHistoryArchiveTar(history, in, result); } - throw new UnsupportedOperationException("Unsupported format " + dc.format + " and version " + dc.version); } /** @@ -722,8 +739,8 @@ public class HistoryUtil { * @return true if successful * @throws DatabaseException */ - private static boolean extractHistoryArchiveTar(ReadGraph graph, HistoryManager history, InputStream in, HistoryImportResult result) - throws DatabaseException, IOException, HistoryException + private static boolean extractHistoryArchiveTar(HistoryManager history, InputStream in, HistoryImportResult result) + throws IOException, HistoryException { // tar is not closed on purpose because we do not want to close RandomAccessBinary rab. TarArchiveInputStream tar = new TarArchiveInputStream(in); @@ -783,9 +800,9 @@ public class HistoryUtil { //component(3): = Boolean //component(4): = Boolean //component(5): = Boolean - //component(devil): = Boolean + //component(6): = Boolean //component(7): = Boolean - //component(music): = Boolean + //component(8): = Boolean //component(9): = Boolean //component(10): = Boolean // @@ -949,17 +966,16 @@ public class HistoryUtil { return 0L; } - public static void truncateHistory(double toBeforeTime, HistoryManager history, Collector collector) throws AccessorException, BindingException, HistoryException { + public static CollectorState truncateHistory(double toBeforeTime, HistoryManager history, CollectorState state) throws AccessorException, BindingException, HistoryException { Double t = toBeforeTime; Binding timeBinding = null; - CollectorState state = collector != null ? (CollectorState) collector.getState() : null; Bean[] items = history.getItems(); //System.out.println("truncating all samples after t=" + toBeforeTime + " for " + items.length + " history items"); for (Bean item : items) { String id = (String) item.getField("id"); - StreamAccessor sa = history.openStream(id, "w"); + StreamAccessor sa = history.openStream(id, "rw"); try { Stream s = new Stream(sa); timeBinding = s.timeBinding; @@ -972,7 +988,8 @@ public class HistoryUtil { if (state != null) { Object prevTime = newSize > 0 ? s.getItemTime(s.timeBinding, newSize - 1) : null; - Object prevValue = newSize > 0 ? sa.get(newSize - 1, s.valueBinding) : null; + Bean prevSample = newSize > 0 ? (Bean) sa.get(newSize - 1, s.sampleBinding) : null; + Object prevValue = prevSample != null ? prevSample.getField(s.valueIndex) : null; boolean isNan = isNaN(prevValue); VariableState vs = state.values.get(id); @@ -984,18 +1001,18 @@ public class HistoryUtil { CollectorState.Item is = state.itemStates.get(id); if (is != null) { - is.firstTime = Double.NaN; - is.firstValue = null; - is.currentTime = toTime(prevTime); - is.currentValue = prevValue != null ? new MutableVariant(s.valueBinding, prevValue) : null; - is.isNaN = isNaN(is.currentValue); - is.isValid = is.currentValue != null; + is.firstTime = toTime(prevTime); + is.firstValue = toValue(s.valueBinding, prevValue); + is.currentTime = is.firstTime; + is.currentValue = toValue(s.valueBinding, prevValue); + is.isNaN = isNan; + is.isValid = prevValue != null; is.sum = Double.NaN; is.count = 0; is.ooDeadband = false; is.firstDisabledTime = Double.NaN; is.lastDisabledTime = Double.NaN; - is.median = null; + is.median = new WeightedMedian( CollectorImpl.MEDIAN_LIMIT ); } } } @@ -1007,14 +1024,19 @@ public class HistoryUtil { if (timeBinding != null && state != null) { state.time.setValue(timeBinding, t); state.dT = 1.0; - collector.setState(state); } + + return state; } private static double toTime(Object time) { return time instanceof Number ? ((Number) time).doubleValue() : Double.NaN; } + private static MutableVariant toValue(Binding valueBinding, Object value) { + return new MutableVariant(valueBinding, value != null ? value : valueBinding.createDefaultUnchecked()); + } + private static boolean isNaN(Object value) { return value instanceof Number ? Double.isNaN(((Number) value).doubleValue()) : false; }