/*******************************************************************************
 * Copyright (c) 2007, 2018 Association for Decentralized Information Management in
* Industry THTH ry.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
*
* Contributors:
* VTT Technical Research Centre of Finland - initial API and implementation
 * Semantum Oy - archive implementation, gitlab simantics/platform#227
*******************************************************************************/
package org.simantics.simulation.history;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
import org.simantics.databoard.accessor.StreamAccessor;
import org.simantics.databoard.accessor.error.AccessorException;
import org.simantics.databoard.binding.Binding;
import org.simantics.databoard.binding.error.BindingConstructionException;
import org.simantics.databoard.binding.error.BindingException;
import org.simantics.databoard.binding.impl.ObjectArrayBinding;
import org.simantics.databoard.binding.mutable.MutableVariant;
import org.simantics.databoard.binding.mutable.Variant;
import org.simantics.databoard.container.DataContainer;
import org.simantics.databoard.container.DataContainers;
import org.simantics.databoard.serialization.Serializer;
import org.simantics.databoard.util.Bean;
import org.simantics.databoard.util.binary.BinaryFile;
import org.simantics.databoard.util.binary.OutputStreamWriteable;
import org.simantics.databoard.util.binary.RandomAccessBinary;
import org.simantics.db.ReadGraph;
import org.simantics.db.Resource;
import org.simantics.db.exception.DatabaseException;
import org.simantics.history.HistoryException;
import org.simantics.history.HistoryManager;
import org.simantics.history.ItemManager;
import org.simantics.history.impl.CollectorImpl;
import org.simantics.history.impl.CollectorState;
import org.simantics.history.impl.CollectorState.VariableState;
import org.simantics.history.util.Stream;
import org.simantics.history.util.ValueBand;
import org.simantics.history.util.WeightedMedian;
import org.simantics.layer0.Layer0;
import org.simantics.simulation.Activator;
import org.simantics.simulation.ontology.HistoryResource;
import org.simantics.simulation.ontology.SimulationResource;
import org.simantics.utils.FileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import gnu.trove.map.TObjectLongMap;
import gnu.trove.map.hash.TObjectLongHashMap;
import net.jpountz.lz4.LZ4BlockInputStream;
import net.jpountz.lz4.LZ4BlockOutputStream;
/**
* @author Toni Kalajainen
*/
public class HistoryUtil {
+ private static final Logger LOGGER = LoggerFactory.getLogger(HistoryUtil.class);
+
private static final boolean DEBUG = false;
private static final boolean PROFILE = true;
- private enum Compression {
+ public enum Compression {
FLZ,
LZ4,
+ NONE,
}
private static final Compression USED_COMPRESSION = Compression.FLZ;
+ private static final String APPLICATION_X_TAR = "application/x-tar";
private static final String APPLICATION_X_LZ4 = "application/x-lz4";
private static final String APPLICATION_X_FLZ = "application/x-flz";
private static final int ARCHIVE_VERSION_1 = 1; // TarArchiveInput/OutputStream
- private static class ArchivedHistory {
+ public static class ArchivedHistory {
public long totalSampleSize = 0L;
public TObjectLongMap<Bean> itemSizes = new TObjectLongHashMap<Bean>();
}
rab.position(0);
rab.skipBytes(4);
- ArchivedHistory archive = exportCompressedArchive(history, collectorState, historyResource, timeShift, from, end, rab);
+ ArchivedHistory archive = exportCompressedArchive(history, collectorState, timeShift, from, end, rab);
rab.position(0L);
rab.writeInt((int)(rab.length() - 4L));
};
}
- private static ArchivedHistory exportCompressedArchive(
- final HistoryManager history,
- final Bean collectorState,
- final Resource historyResource,
- final Double timeShift,
- final double from,
- final double end,
+ public static ArchivedHistory exportCompressedArchive(
+ HistoryManager history,
+ Bean collectorState,
+ Double timeShift,
+ double from,
+ double end,
RandomAccessBinary rab)
throws IOException, HistoryException
{
- switch (USED_COMPRESSION) {
- case FLZ:
- return exportArchiveFLZ(history, collectorState, historyResource, timeShift, from, end, rab);
- case LZ4:
- return exportArchiveLZ4(history, collectorState, historyResource, timeShift, from, end, rab);
- default:
- throw new UnsupportedOperationException("Unsupported compression: " + USED_COMPRESSION);
- }
+ return exportCompressedArchive(history, collectorState, timeShift, from, end, USED_COMPRESSION, rab);
}
- private static ArchivedHistory exportArchiveLZ4(
- final HistoryManager history,
- final Bean collectorState,
- final Resource historyResource,
- final Double timeShift,
- final double from,
- final double end,
+ public static ArchivedHistory exportCompressedArchive(
+ HistoryManager history,
+ Bean collectorState,
+ Double timeShift,
+ double from,
+ double end,
+ Compression compression,
RandomAccessBinary rab)
throws IOException, HistoryException
{
- OutputStream closeable = null;
- try {
- DataContainers.writeHeader(rab, new DataContainer(APPLICATION_X_LZ4, ARCHIVE_VERSION_1, null));
- closeable = new RABOutputStream( rab );
- closeable = new LZ4BlockOutputStream( closeable );
-
- ArchivedHistory archive = exportArchive( closeable, history, collectorState, timeShift, from, end );
-
- closeable.flush();
- closeable.close();
- closeable = null;
-
- return archive;
- } finally {
- FileUtils.uncheckedClose(closeable);
+ switch (compression) {
+ case FLZ:
+ DataContainers.writeHeader(rab, new DataContainer(APPLICATION_X_FLZ, ARCHIVE_VERSION_1, null));
+ return exportArchive(history, collectorState, timeShift, from, end, FastLZ.write(new RABOutputStream(rab)));
+ case LZ4:
+ DataContainers.writeHeader(rab, new DataContainer(APPLICATION_X_LZ4, ARCHIVE_VERSION_1, null));
+ return exportArchive(history, collectorState, timeShift, from, end, new LZ4BlockOutputStream(new RABOutputStream(rab)));
+ case NONE:
+ DataContainers.writeHeader(rab, new DataContainer(APPLICATION_X_TAR, ARCHIVE_VERSION_1, null));
+ return exportArchive(history, collectorState, timeShift, from, end, new RABOutputStream(rab));
+ default:
+ throw new UnsupportedOperationException("Unsupported compression: " + compression);
}
}
- @SuppressWarnings("resource")
- private static ArchivedHistory exportArchiveFLZ(
- final HistoryManager history,
- final Bean collectorState,
- final Resource historyResource,
- final Double timeShift,
- final double from,
- final double end,
- RandomAccessBinary rab)
+ private static ArchivedHistory exportArchive(
+ HistoryManager history,
+ Bean collectorState,
+ Double timeShift,
+ double from,
+ double end,
+ OutputStream out)
throws IOException, HistoryException
{
- OutputStream closeable = null;
- try {
- DataContainers.writeHeader(rab, new DataContainer(APPLICATION_X_FLZ, ARCHIVE_VERSION_1, null));
- closeable = new RABOutputStream( rab );
- closeable = FastLZ.write( closeable );
-
- ArchivedHistory archive = exportArchive( closeable, history, collectorState, timeShift, from, end );
-
- closeable.flush();
- closeable.close();
- closeable = null;
-
+ try (OutputStream os = out) {
+ ArchivedHistory archive = exportArchive( os, history, collectorState, timeShift, from, end );
+ os.flush();
return archive;
- } finally {
- FileUtils.uncheckedClose(closeable);
}
}
try {
if (PROFILE)
profile(null, "importing history items from archived format to disk workarea");
- importHistoryArchive(graph, historyResource, archive, history, result);
+ importHistoryArchive(graph, archive, history, result);
} catch (IOException e) {
throw new DatabaseException(e);
} catch (HistoryException e) {
}
}
+ /**
+ * @param history
+ * @param path
+ * @return
+ * @throws HistoryException
+ * @throws IOException
+ */
+ public static HistoryImportResult importHistoryArchive(HistoryManager history, Path path) throws IOException, HistoryException {
+ HistoryImportResult result = new HistoryImportResult();
+ try (RandomAccessBinary rab = new BinaryFile(path.toFile())) {
+ importHistoryArchive(history, rab, result);
+ return result;
+ } catch (IOException e) {
+ LOGGER.error("Failed to import history from archive {}", path);
+ throw e;
+ }
+ }
+
/**
* Import all history items from database archive into a history manager.
*
* @return <code>true</code> if successful
* @throws DatabaseException
*/
- @SuppressWarnings("resource")
- private static boolean importHistoryArchive(ReadGraph graph, Resource historyResource, Resource archive, HistoryManager history, HistoryImportResult result)
- throws DatabaseException, IOException, HistoryException
- {
+ private static boolean importHistoryArchive(ReadGraph graph, Resource archive, HistoryManager history, HistoryImportResult result) throws DatabaseException, IOException, HistoryException {
RandomAccessBinary rab = graph.getRandomAccessBinary(archive);
+ // This is here because the data is encoded as an L0.ByteArray
+ // in the database which means the first integer describes the
+ // length of the byte array. We skip that length data here.
rab.position(4);
+ return importHistoryArchive(history, rab, result);
+ }
+
+ /**
+ * Import all history items from database archive into a history manager.
+ *
+ * @param graph
+ * @param historyResource
+ * @param archive
+ * @param history
+ * @return <code>true</code> if successful
+ * @throws DatabaseException
+ */
+ private static boolean importHistoryArchive(HistoryManager history, RandomAccessBinary rab, HistoryImportResult result)
+ throws IOException, HistoryException
+ {
DataContainer dc = DataContainers.readHeader(rab);
if (DEBUG)
System.out.println("COMPRESSED HISTORY DATA SIZE: " + (rab.length() - rab.position()));
- if (APPLICATION_X_LZ4.equals(dc.format)) {
- if (dc.version == ARCHIVE_VERSION_1) {
- InputStream in = new RABInputStream(rab);
- //in = new BufferedInputStream(in);
- in = new LZ4BlockInputStream(in);
- return extractHistoryArchiveTar(graph, history, in, result);
- }
- } else if (APPLICATION_X_FLZ.equals(dc.format)) {
- if (dc.version == ARCHIVE_VERSION_1) {
- InputStream in = null;
- try {
- in = new RABInputStream(rab);
- in = FastLZ.read(in);
- return extractHistoryArchiveTar(graph, history, in, result);
- } finally {
- FileUtils.uncheckedClose(in);
- }
+ if (dc.version == ARCHIVE_VERSION_1) {
+ if (APPLICATION_X_LZ4.equals(dc.format)) {
+ return extractHistoryArchiveInputStream(history, new LZ4BlockInputStream(new RABInputStream(rab)), result);
+ } else if (APPLICATION_X_FLZ.equals(dc.format)) {
+ return extractHistoryArchiveInputStream(history, FastLZ.read(new RABInputStream(rab)), result);
+ } else if (APPLICATION_X_TAR.equals(dc.format)) {
+ return extractHistoryArchiveInputStream(history, new RABInputStream(rab), result);
}
+ throw new UnsupportedOperationException("Unsupported format " + dc.format + " and version " + dc.version);
+ } else {
+ throw new UnsupportedOperationException("Unsupported history archive version " + dc.version + " with format " + dc.format);
+ }
+ }
+
+ private static boolean extractHistoryArchiveInputStream(HistoryManager history, InputStream in, HistoryImportResult result) throws IOException, HistoryException {
+ try (InputStream _in = in) {
+ return extractHistoryArchiveTar(history, in, result);
}
- throw new UnsupportedOperationException("Unsupported format " + dc.format + " and version " + dc.version);
}
/**
* @return <code>true</code> if successful
* @throws DatabaseException
*/
- private static boolean extractHistoryArchiveTar(ReadGraph graph, HistoryManager history, InputStream in, HistoryImportResult result)
- throws DatabaseException, IOException, HistoryException
+ private static boolean extractHistoryArchiveTar(HistoryManager history, InputStream in, HistoryImportResult result)
+ throws IOException, HistoryException
{
// tar is not closed on purpose because we do not want to close RandomAccessBinary rab.
TarArchiveInputStream tar = new TarArchiveInputStream(in);
//component(3): = Boolean
//component(4): = Boolean
//component(5): = Boolean
//component(6): = Boolean
//component(7): = Boolean
//component(8): = Boolean
//component(9): = Boolean
//component(10): = Boolean
//
return 0L;
}
    /**
     * Truncate every history stream so that no samples remain after
     * <code>toBeforeTime</code>, and rewind the collector state to the new end
     * of history so that collection can resume from the truncation point.
     *
     * @param toBeforeTime time after which all samples are discarded
     * @param history history manager whose item streams are truncated in place
     * @param state collector state to rewind; may be <code>null</code>, in
     *            which case only the streams are truncated
     * @return the same <code>state</code> instance, updated in place
     * @throws AccessorException if stream access fails
     * @throws BindingException if value binding operations fail
     * @throws HistoryException if opening a history stream fails
     */
    public static CollectorState truncateHistory(double toBeforeTime, HistoryManager history, CollectorState state) throws AccessorException, BindingException, HistoryException {
        Double t = toBeforeTime;
        Binding timeBinding = null;

        Bean[] items = history.getItems();
        //System.out.println("truncating all samples after t=" + toBeforeTime + " for " + items.length + " history items");

        for (Bean item : items) {
            String id = (String) item.getField("id");
            StreamAccessor sa = history.openStream(id, "rw");
            try {
                Stream s = new Stream(sa);
                timeBinding = s.timeBinding;
                int currentSize = sa.size();
                // binarySearch encodes either an exact-match index or an
                // insertion point; truncationSize turns it into a sample count.
                int index = s.binarySearch(timeBinding, toBeforeTime);
                int newSize = truncationSize(index);
                if (newSize < currentSize) {
                    //System.out.println("truncating item: " + item + " from size " + currentSize + " to " + newSize);
                    sa.setSize(newSize);

                    if (state != null) {
                        // Rewind the collector's notion of the latest sample to
                        // the last one remaining (null when the stream is empty).
                        Object prevTime = newSize > 0 ? s.getItemTime(s.timeBinding, newSize - 1) : null;
                        Bean prevSample = newSize > 0 ? (Bean) sa.get(newSize - 1, s.sampleBinding) : null;
                        Object prevValue = prevSample != null ? prevSample.getField(s.valueIndex) : null;
                        boolean isNan = isNaN(prevValue);

                        VariableState vs = state.values.get(id);
                        if (vs != null && vs.value != null && prevValue != null) {
                            vs.value.setValue(vs.value.getBinding(), prevValue);
                            vs.isValid = true;
                            vs.isNan = isNan;
                        }

                        CollectorState.Item is = state.itemStates.get(id);
                        if (is != null) {
                            is.firstTime = toTime(prevTime);
                            is.firstValue = toValue(s.valueBinding, prevValue);
                            is.currentTime = is.firstTime;
                            is.currentValue = toValue(s.valueBinding, prevValue);
                            is.isNaN = isNan;
                            is.isValid = prevValue != null;
                            // Reset aggregation state; these accumulate again
                            // as new samples arrive after the truncation point.
                            is.sum = Double.NaN;
                            is.count = 0;
                            is.ooDeadband = false;
                            is.firstDisabledTime = Double.NaN;
                            is.lastDisabledTime = Double.NaN;
                            is.median = new WeightedMedian( CollectorImpl.MEDIAN_LIMIT );
                        }
                    }
                }
            } finally {
                sa.close();
            }
        }

        if (timeBinding != null && state != null) {
            state.time.setValue(timeBinding, t);
            // NOTE(review): dT is reset to a fixed 1.0 here — confirm this is
            // the intended collection interval after truncation.
            state.dT = 1.0;
        }

        return state;
    }
+
+ private static double toTime(Object time) {
+ return time instanceof Number ? ((Number) time).doubleValue() : Double.NaN;
+ }
+
+ private static MutableVariant toValue(Binding valueBinding, Object value) {
+ return new MutableVariant(valueBinding, value != null ? value : valueBinding.createDefaultUnchecked());
+ }
+
+ private static boolean isNaN(Object value) {
+ return value instanceof Number ? Double.isNaN(((Number) value).doubleValue()) : false;
+ }
+
+ private static int truncationSize(int binarySearchResult) {
+ return binarySearchResult >= 0 ? binarySearchResult + 1 : (-binarySearchResult - 1);
+ }
+
}