/*******************************************************************************
- * Copyright (c) 2007, 2014 Association for Decentralized Information Management in
+ * Copyright (c) 2007, 2018 Association for Decentralized Information Management in
* Industry THTH ry.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
*
* Contributors:
* VTT Technical Research Centre of Finland - initial API and implementation
- * Semantum Oy - archive implementation
+ * Semantum Oy - archive implementation, gitlab simantics/platform#227
*******************************************************************************/
package org.simantics.simulation.history;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
+import java.nio.file.Path;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import org.simantics.databoard.container.DataContainers;
import org.simantics.databoard.serialization.Serializer;
import org.simantics.databoard.util.Bean;
+import org.simantics.databoard.util.binary.BinaryFile;
import org.simantics.databoard.util.binary.OutputStreamWriteable;
import org.simantics.databoard.util.binary.RandomAccessBinary;
import org.simantics.db.ReadGraph;
import org.simantics.db.exception.ServiceNotFoundException;
import org.simantics.db.request.Read;
import org.simantics.fastlz.FastLZ;
-import org.simantics.history.Collector;
import org.simantics.history.HistoryException;
import org.simantics.history.HistoryManager;
import org.simantics.history.ItemManager;
+import org.simantics.history.impl.CollectorImpl;
import org.simantics.history.impl.CollectorState;
import org.simantics.history.impl.CollectorState.VariableState;
import org.simantics.history.util.Stream;
import org.simantics.history.util.ValueBand;
+import org.simantics.history.util.WeightedMedian;
import org.simantics.layer0.Layer0;
import org.simantics.simulation.Activator;
import org.simantics.simulation.ontology.HistoryResource;
private static final boolean DEBUG = false;
private static final boolean PROFILE = true;
- private enum Compression {
+ public enum Compression {
FLZ,
LZ4,
+ NONE,
}
private static final Compression USED_COMPRESSION = Compression.FLZ;
+ private static final String APPLICATION_X_TAR = "application/x-tar";
private static final String APPLICATION_X_LZ4 = "application/x-lz4";
private static final String APPLICATION_X_FLZ = "application/x-flz";
private static final int ARCHIVE_VERSION_1 = 1; // TarArchiveInput/OutputStream
- private static class ArchivedHistory {
+ public static class ArchivedHistory {
public long totalSampleSize = 0L;
public TObjectLongMap<Bean> itemSizes = new TObjectLongHashMap<Bean>();
}
rab.position(0);
rab.skipBytes(4);
- ArchivedHistory archive = exportCompressedArchive(history, collectorState, historyResource, timeShift, from, end, rab);
+ ArchivedHistory archive = exportCompressedArchive(history, collectorState, timeShift, from, end, rab);
rab.position(0L);
rab.writeInt((int)(rab.length() - 4L));
};
}
- private static ArchivedHistory exportCompressedArchive(
- final HistoryManager history,
- final Bean collectorState,
- final Resource historyResource,
- final Double timeShift,
- final double from,
- final double end,
+ public static ArchivedHistory exportCompressedArchive(
+ HistoryManager history,
+ Bean collectorState,
+ Double timeShift,
+ double from,
+ double end,
RandomAccessBinary rab)
throws IOException, HistoryException
{
- switch (USED_COMPRESSION) {
- case FLZ:
- return exportArchiveFLZ(history, collectorState, historyResource, timeShift, from, end, rab);
- case LZ4:
- return exportArchiveLZ4(history, collectorState, historyResource, timeShift, from, end, rab);
- default:
- throw new UnsupportedOperationException("Unsupported compression: " + USED_COMPRESSION);
- }
+ return exportCompressedArchive(history, collectorState, timeShift, from, end, USED_COMPRESSION, rab);
}
- private static ArchivedHistory exportArchiveLZ4(
- final HistoryManager history,
- final Bean collectorState,
- final Resource historyResource,
- final Double timeShift,
- final double from,
- final double end,
+ public static ArchivedHistory exportCompressedArchive(
+ HistoryManager history,
+ Bean collectorState,
+ Double timeShift,
+ double from,
+ double end,
+ Compression compression,
RandomAccessBinary rab)
throws IOException, HistoryException
{
- OutputStream closeable = null;
- try {
- DataContainers.writeHeader(rab, new DataContainer(APPLICATION_X_LZ4, ARCHIVE_VERSION_1, null));
- closeable = new RABOutputStream( rab );
- closeable = new LZ4BlockOutputStream( closeable );
-
- ArchivedHistory archive = exportArchive( closeable, history, collectorState, timeShift, from, end );
-
- closeable.flush();
- closeable.close();
- closeable = null;
-
- return archive;
- } finally {
- FileUtils.uncheckedClose(closeable);
+ switch (compression) {
+ case FLZ:
+ DataContainers.writeHeader(rab, new DataContainer(APPLICATION_X_FLZ, ARCHIVE_VERSION_1, null));
+ return exportArchive(history, collectorState, timeShift, from, end, FastLZ.write(new RABOutputStream(rab)));
+ case LZ4:
+ DataContainers.writeHeader(rab, new DataContainer(APPLICATION_X_LZ4, ARCHIVE_VERSION_1, null));
+ return exportArchive(history, collectorState, timeShift, from, end, new LZ4BlockOutputStream(new RABOutputStream(rab)));
+ case NONE:
+ DataContainers.writeHeader(rab, new DataContainer(APPLICATION_X_TAR, ARCHIVE_VERSION_1, null));
+ return exportArchive(history, collectorState, timeShift, from, end, new RABOutputStream(rab));
+ default:
+ throw new UnsupportedOperationException("Unsupported compression: " + compression);
}
}
- @SuppressWarnings("resource")
- private static ArchivedHistory exportArchiveFLZ(
- final HistoryManager history,
- final Bean collectorState,
- final Resource historyResource,
- final Double timeShift,
- final double from,
- final double end,
- RandomAccessBinary rab)
+ private static ArchivedHistory exportArchive(
+ HistoryManager history,
+ Bean collectorState,
+ Double timeShift,
+ double from,
+ double end,
+ OutputStream out)
throws IOException, HistoryException
{
- OutputStream closeable = null;
- try {
- DataContainers.writeHeader(rab, new DataContainer(APPLICATION_X_FLZ, ARCHIVE_VERSION_1, null));
- closeable = new RABOutputStream( rab );
- closeable = FastLZ.write( closeable );
-
- ArchivedHistory archive = exportArchive( closeable, history, collectorState, timeShift, from, end );
-
- closeable.flush();
- closeable.close();
- closeable = null;
-
+ try (OutputStream os = out) {
+ ArchivedHistory archive = exportArchive( os, history, collectorState, timeShift, from, end );
+ os.flush();
return archive;
- } finally {
- FileUtils.uncheckedClose(closeable);
}
}
try {
if (PROFILE)
profile(null, "importing history items from archived format to disk workarea");
- importHistoryArchive(graph, historyResource, archive, history, result);
+ importHistoryArchive(graph, archive, history, result);
} catch (IOException e) {
throw new DatabaseException(e);
} catch (HistoryException e) {
}
}
+ /**
+ * @param history
+ * @param path
+ * @return
+ * @throws HistoryException
+ * @throws IOException
+ */
+ public static HistoryImportResult importHistoryArchive(HistoryManager history, Path path) throws IOException, HistoryException {
+ HistoryImportResult result = new HistoryImportResult();
+ try (RandomAccessBinary rab = new BinaryFile(path.toFile())) {
+ importHistoryArchive(history, rab, result);
+ return result;
+ }
+ }
+
/**
* Import all history items from database archive into a history manager.
*
* @return <code>true</code> if successful
* @throws DatabaseException
*/
- @SuppressWarnings("resource")
- private static boolean importHistoryArchive(ReadGraph graph, Resource historyResource, Resource archive, HistoryManager history, HistoryImportResult result)
- throws DatabaseException, IOException, HistoryException
- {
+ private static boolean importHistoryArchive(ReadGraph graph, Resource archive, HistoryManager history, HistoryImportResult result) throws DatabaseException, IOException, HistoryException {
RandomAccessBinary rab = graph.getRandomAccessBinary(archive);
+ // This is here because the data is encoded as an L0.ByteArray
+ // in the database which means the first integer describes the
+ // length of the byte array. We skip that length data here.
rab.position(4);
+ return importHistoryArchive(history, rab, result);
+ }
+
+ /**
+ * Import all history items from database archive into a history manager.
+ *
+ * @param graph
+ * @param historyResource
+ * @param archive
+ * @param history
+ * @return <code>true</code> if successful
+ * @throws DatabaseException
+ */
+ private static boolean importHistoryArchive(HistoryManager history, RandomAccessBinary rab, HistoryImportResult result)
+ throws IOException, HistoryException
+ {
DataContainer dc = DataContainers.readHeader(rab);
if (DEBUG)
System.out.println("COMPRESSED HISTORY DATA SIZE: " + (rab.length() - rab.position()));
- if (APPLICATION_X_LZ4.equals(dc.format)) {
- if (dc.version == ARCHIVE_VERSION_1) {
- InputStream in = new RABInputStream(rab);
- //in = new BufferedInputStream(in);
- in = new LZ4BlockInputStream(in);
- return extractHistoryArchiveTar(graph, history, in, result);
- }
- } else if (APPLICATION_X_FLZ.equals(dc.format)) {
- if (dc.version == ARCHIVE_VERSION_1) {
- InputStream in = null;
- try {
- in = new RABInputStream(rab);
- in = FastLZ.read(in);
- return extractHistoryArchiveTar(graph, history, in, result);
- } finally {
- FileUtils.uncheckedClose(in);
- }
+ if (dc.version == ARCHIVE_VERSION_1) {
+ if (APPLICATION_X_LZ4.equals(dc.format)) {
+ return extractHistoryArchiveInputStream(history, new LZ4BlockInputStream(new RABInputStream(rab)), result);
+ } else if (APPLICATION_X_FLZ.equals(dc.format)) {
+ return extractHistoryArchiveInputStream(history, FastLZ.read(new RABInputStream(rab)), result);
+ } else if (APPLICATION_X_TAR.equals(dc.format)) {
+ return extractHistoryArchiveInputStream(history, new RABInputStream(rab), result);
}
+ throw new UnsupportedOperationException("Unsupported format " + dc.format + " and version " + dc.version);
+ } else {
+ throw new UnsupportedOperationException("Unsupported history archive version " + dc.version + " with format " + dc.format);
+ }
+ }
+
+ private static boolean extractHistoryArchiveInputStream(HistoryManager history, InputStream in, HistoryImportResult result) throws IOException, HistoryException {
+ try (InputStream _in = in) {
+ return extractHistoryArchiveTar(history, in, result);
}
- throw new UnsupportedOperationException("Unsupported format " + dc.format + " and version " + dc.version);
}
/**
* @return <code>true</code> if successful
* @throws DatabaseException
*/
- private static boolean extractHistoryArchiveTar(ReadGraph graph, HistoryManager history, InputStream in, HistoryImportResult result)
- throws DatabaseException, IOException, HistoryException
+ private static boolean extractHistoryArchiveTar(HistoryManager history, InputStream in, HistoryImportResult result)
+ throws IOException, HistoryException
{
// tar is not closed on purpose because we do not want to close RandomAccessBinary rab.
TarArchiveInputStream tar = new TarArchiveInputStream(in);
//component(3): = Boolean
//component(4): = Boolean
//component(5): = Boolean
- //component(devil): = Boolean
+ //component(6): = Boolean
//component(7): = Boolean
- //component(music): = Boolean
+ //component(8): = Boolean
//component(9): = Boolean
//component(10): = Boolean
//
return 0L;
}
- public static void truncateHistory(double toBeforeTime, HistoryManager history, Collector collector) throws AccessorException, BindingException, HistoryException {
+ public static CollectorState truncateHistory(double toBeforeTime, HistoryManager history, CollectorState state) throws AccessorException, BindingException, HistoryException {
Double t = toBeforeTime;
Binding timeBinding = null;
- CollectorState state = collector != null ? (CollectorState) collector.getState() : null;
Bean[] items = history.getItems();
//System.out.println("truncating all samples after t=" + toBeforeTime + " for " + items.length + " history items");
for (Bean item : items) {
String id = (String) item.getField("id");
- StreamAccessor sa = history.openStream(id, "w");
+ StreamAccessor sa = history.openStream(id, "rw");
try {
Stream s = new Stream(sa);
timeBinding = s.timeBinding;
if (state != null) {
Object prevTime = newSize > 0 ? s.getItemTime(s.timeBinding, newSize - 1) : null;
- Object prevValue = newSize > 0 ? sa.get(newSize - 1, s.valueBinding) : null;
+ Bean prevSample = newSize > 0 ? (Bean) sa.get(newSize - 1, s.sampleBinding) : null;
+ Object prevValue = prevSample != null ? prevSample.getField(s.valueIndex) : null;
boolean isNan = isNaN(prevValue);
VariableState vs = state.values.get(id);
CollectorState.Item is = state.itemStates.get(id);
if (is != null) {
- is.firstTime = Double.NaN;
- is.firstValue = null;
- is.currentTime = toTime(prevTime);
- is.currentValue = prevValue != null ? new MutableVariant(s.valueBinding, prevValue) : null;
- is.isNaN = isNaN(is.currentValue);
- is.isValid = is.currentValue != null;
+ is.firstTime = toTime(prevTime);
+ is.firstValue = toValue(s.valueBinding, prevValue);
+ is.currentTime = is.firstTime;
+ is.currentValue = toValue(s.valueBinding, prevValue);
+ is.isNaN = isNan;
+ is.isValid = prevValue != null;
is.sum = Double.NaN;
is.count = 0;
is.ooDeadband = false;
is.firstDisabledTime = Double.NaN;
is.lastDisabledTime = Double.NaN;
- is.median = null;
+ is.median = new WeightedMedian( CollectorImpl.MEDIAN_LIMIT );
}
}
}
if (timeBinding != null && state != null) {
state.time.setValue(timeBinding, t);
state.dT = 1.0;
- collector.setState(state);
}
+
+ return state;
}
private static double toTime(Object time) {
return time instanceof Number ? ((Number) time).doubleValue() : Double.NaN;
}
+ private static MutableVariant toValue(Binding valueBinding, Object value) {
+ return new MutableVariant(valueBinding, value != null ? value : valueBinding.createDefaultUnchecked());
+ }
+
private static boolean isNaN(Object value) {
return value instanceof Number ? Double.isNaN(((Number) value).doubleValue()) : false;
}