]> gerrit.simantics Code Review - simantics/platform.git/blobdiff - bundles/org.simantics.simulation/src/org/simantics/simulation/history/HistoryUtil.java
Merge changes I78c3a258,I7bf72f04
[simantics/platform.git] / bundles / org.simantics.simulation / src / org / simantics / simulation / history / HistoryUtil.java
index 8b78fe18e50f203c13170986ce9f31045c844379..343adefc8686fc2053b6c4b41e9167d0e937372d 100644 (file)
@@ -1,5 +1,5 @@
 /*******************************************************************************
- * Copyright (c) 2007, 2014 Association for Decentralized Information Management in
+ * Copyright (c) 2007, 2018 Association for Decentralized Information Management in
  * Industry THTH ry.
  * All rights reserved. This program and the accompanying materials
  * are made available under the terms of the Eclipse Public License v1.0
@@ -8,24 +8,19 @@
  *
  * Contributors:
  *     VTT Technical Research Centre of Finland - initial API and implementation
- *     Semantum Oy - archive implementation
+ *     Semantum Oy - archive implementation, gitlab simantics/platform#227
  *******************************************************************************/
 package org.simantics.simulation.history;
 
-import gnu.trove.map.TObjectLongMap;
-import gnu.trove.map.hash.TObjectLongHashMap;
-
 import java.io.EOFException;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.nio.file.Path;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Map;
 
-import net.jpountz.lz4.LZ4BlockInputStream;
-import net.jpountz.lz4.LZ4BlockOutputStream;
-
 import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
 import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
 import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
@@ -41,11 +36,13 @@ import org.simantics.databoard.binding.RecordBinding;
 import org.simantics.databoard.binding.error.BindingConstructionException;
 import org.simantics.databoard.binding.error.BindingException;
 import org.simantics.databoard.binding.impl.ObjectArrayBinding;
+import org.simantics.databoard.binding.mutable.MutableVariant;
 import org.simantics.databoard.binding.mutable.Variant;
 import org.simantics.databoard.container.DataContainer;
 import org.simantics.databoard.container.DataContainers;
 import org.simantics.databoard.serialization.Serializer;
 import org.simantics.databoard.util.Bean;
+import org.simantics.databoard.util.binary.BinaryFile;
 import org.simantics.databoard.util.binary.OutputStreamWriteable;
 import org.simantics.databoard.util.binary.RandomAccessBinary;
 import org.simantics.db.ReadGraph;
@@ -60,13 +57,24 @@ import org.simantics.fastlz.FastLZ;
 import org.simantics.history.HistoryException;
 import org.simantics.history.HistoryManager;
 import org.simantics.history.ItemManager;
+import org.simantics.history.impl.CollectorImpl;
+import org.simantics.history.impl.CollectorState;
+import org.simantics.history.impl.CollectorState.VariableState;
 import org.simantics.history.util.Stream;
 import org.simantics.history.util.ValueBand;
+import org.simantics.history.util.WeightedMedian;
 import org.simantics.layer0.Layer0;
 import org.simantics.simulation.Activator;
 import org.simantics.simulation.ontology.HistoryResource;
 import org.simantics.simulation.ontology.SimulationResource;
 import org.simantics.utils.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import gnu.trove.map.TObjectLongMap;
+import gnu.trove.map.hash.TObjectLongHashMap;
+import net.jpountz.lz4.LZ4BlockInputStream;
+import net.jpountz.lz4.LZ4BlockOutputStream;
 
 /**
  * @author Toni Kalajainen
@@ -74,21 +82,25 @@ import org.simantics.utils.FileUtils;
  */
 public class HistoryUtil {
 
+       private static final Logger LOGGER = LoggerFactory.getLogger(HistoryUtil.class);
+
        private static final boolean DEBUG = false;
        private static final boolean PROFILE = true;
 
-       private enum Compression {
+       public enum Compression {
                FLZ,
                LZ4,
+               NONE,
        }
 
        private static final Compression USED_COMPRESSION = Compression.FLZ;
 
+       private static final String APPLICATION_X_TAR = "application/x-tar";
        private static final String APPLICATION_X_LZ4 = "application/x-lz4";
        private static final String APPLICATION_X_FLZ = "application/x-flz";
        private static final int ARCHIVE_VERSION_1 = 1; // TarArchiveInput/OutputStream
 
-       private static class ArchivedHistory {
+       public static class ArchivedHistory {
                public long totalSampleSize = 0L;
                public TObjectLongMap<Bean> itemSizes = new TObjectLongHashMap<Bean>();
        }
@@ -319,7 +331,7 @@ public class HistoryUtil {
                                        rab.position(0);
                                        rab.skipBytes(4);
 
-                                       ArchivedHistory archive = exportCompressedArchive(history, collectorState, historyResource, timeShift, from, end, rab);
+                                       ArchivedHistory archive = exportCompressedArchive(history, collectorState, timeShift, from, end, rab);
 
                                        rab.position(0L);
                                        rab.writeInt((int)(rab.length() - 4L));
@@ -340,80 +352,56 @@ public class HistoryUtil {
                };
        }
 
-       private static ArchivedHistory exportCompressedArchive(
-                       final HistoryManager history,
-                       final Bean collectorState,
-                       final Resource historyResource,
-                       final Double timeShift, 
-                       final double from,
-                       final double end,
+       public static ArchivedHistory exportCompressedArchive(
+                       HistoryManager history,
+                       Bean collectorState,
+                       Double timeShift, 
+                       double from,
+                       double end,
                        RandomAccessBinary rab)
                                        throws IOException, HistoryException
        {
-               switch (USED_COMPRESSION) {
-               case FLZ:
-                       return exportArchiveFLZ(history, collectorState, historyResource, timeShift, from, end, rab);
-               case LZ4:
-                       return exportArchiveLZ4(history, collectorState, historyResource, timeShift, from, end, rab);
-               default:
-                       throw new UnsupportedOperationException("Unsupported compression: " + USED_COMPRESSION);
-               }
+               return exportCompressedArchive(history, collectorState, timeShift, from, end, USED_COMPRESSION, rab);
        }
 
-       private static ArchivedHistory exportArchiveLZ4(
-                       final HistoryManager history,
-                       final Bean collectorState,
-                       final Resource historyResource,
-                       final Double timeShift, 
-                       final double from,
-                       final double end,
+       public static ArchivedHistory exportCompressedArchive(
+                       HistoryManager history,
+                       Bean collectorState,
+                       Double timeShift, 
+                       double from,
+                       double end,
+                       Compression compression,
                        RandomAccessBinary rab)
                                        throws IOException, HistoryException
        {
-               OutputStream closeable = null;
-               try {
-                       DataContainers.writeHeader(rab, new DataContainer(APPLICATION_X_LZ4, ARCHIVE_VERSION_1, null));
-                       closeable = new RABOutputStream( rab );
-                       closeable = new LZ4BlockOutputStream( closeable );
-
-                       ArchivedHistory archive = exportArchive( closeable, history, collectorState, timeShift, from, end );
-
-                       closeable.flush();
-                       closeable.close();
-                       closeable = null;
-
-                       return archive;
-               } finally {
-                       FileUtils.uncheckedClose(closeable);
+               switch (compression) {
+                       case FLZ:
+                               DataContainers.writeHeader(rab, new DataContainer(APPLICATION_X_FLZ, ARCHIVE_VERSION_1, null));
+                               return exportArchive(history, collectorState, timeShift, from, end, FastLZ.write(new RABOutputStream(rab)));
+                       case LZ4:
+                               DataContainers.writeHeader(rab, new DataContainer(APPLICATION_X_LZ4, ARCHIVE_VERSION_1, null));
+                               return exportArchive(history, collectorState, timeShift, from, end, new LZ4BlockOutputStream(new RABOutputStream(rab)));
+                       case NONE:
+                               DataContainers.writeHeader(rab, new DataContainer(APPLICATION_X_TAR, ARCHIVE_VERSION_1, null));
+                               return exportArchive(history, collectorState, timeShift, from, end, new RABOutputStream(rab));
+                       default:
+                               throw new UnsupportedOperationException("Unsupported compression: " + compression);
                }
        }
 
-       @SuppressWarnings("resource")
-       private static ArchivedHistory exportArchiveFLZ(
-                       final HistoryManager history,
-                       final Bean collectorState,
-                       final Resource historyResource,
-                       final Double timeShift, 
-                       final double from,
-                       final double end,
-                       RandomAccessBinary rab)
+       private static ArchivedHistory exportArchive(
+                       HistoryManager history,
+                       Bean collectorState,
+                       Double timeShift, 
+                       double from,
+                       double end,
+                       OutputStream out)
                                        throws IOException, HistoryException
        {
-               OutputStream closeable = null;
-               try {
-                       DataContainers.writeHeader(rab, new DataContainer(APPLICATION_X_FLZ, ARCHIVE_VERSION_1, null));
-                       closeable = new RABOutputStream( rab );
-                       closeable = FastLZ.write( closeable );
-
-                       ArchivedHistory archive = exportArchive( closeable, history, collectorState, timeShift, from, end );
-
-                       closeable.flush();
-                       closeable.close();
-                       closeable = null;
-
+               try (OutputStream os = out) {
+                       ArchivedHistory archive = exportArchive( os, history, collectorState, timeShift, from, end );
+                       os.flush();
                        return archive;
-               } finally {
-                       FileUtils.uncheckedClose(closeable);
                }
        }
 
@@ -608,7 +596,7 @@ public class HistoryUtil {
                                        try {
                                                if (PROFILE)
                                                        profile(null, "importing history items from archived format to disk workarea");
-                                               importHistoryArchive(graph, historyResource, archive, history, result);
+                                               importHistoryArchive(graph, archive, history, result);
                                        } catch (IOException e) {
                                                throw new DatabaseException(e);
                                        } catch (HistoryException e) {
@@ -667,6 +655,24 @@ public class HistoryUtil {
                }
        }
 
+       /**
+        * @param history
+        * @param path
+        * @return
+        * @throws HistoryException 
+        * @throws IOException 
+        */
+       public static HistoryImportResult importHistoryArchive(HistoryManager history, Path path) throws IOException, HistoryException {
+               HistoryImportResult result = new HistoryImportResult();
+               try (RandomAccessBinary rab = new BinaryFile(path.toFile())) {
+                       importHistoryArchive(history, rab, result);
+                       return result;
+               } catch (IOException e) {
+                       LOGGER.error("Failed to import history from archive {}", path);
+                       throw e;
+               }
+       }
+
        /**
         * Import all history items from database archive into a history manager.
         * 
@@ -677,37 +683,51 @@ public class HistoryUtil {
         * @return <code>true</code> if successful
         * @throws DatabaseException 
         */
-       @SuppressWarnings("resource")
-       private static boolean importHistoryArchive(ReadGraph graph, Resource historyResource, Resource archive, HistoryManager history, HistoryImportResult result)
-                       throws DatabaseException, IOException, HistoryException
-       {
+       private static boolean importHistoryArchive(ReadGraph graph, Resource archive, HistoryManager history, HistoryImportResult result) throws DatabaseException, IOException, HistoryException {
                RandomAccessBinary rab = graph.getRandomAccessBinary(archive);
+               // This is here because the data is encoded as an L0.ByteArray
+               // in the database which means the first integer describes the
+               // length of the byte array. We skip that length data here.
                rab.position(4);
+               return importHistoryArchive(history, rab, result);
+       }
+
+       /**
+        * Import all history items from database archive into a history manager.
+        * 
+        * @param graph
+        * @param historyResource
+        * @param archive 
+        * @param history
+        * @return <code>true</code> if successful
+        * @throws DatabaseException 
+        */
+       private static boolean importHistoryArchive(HistoryManager history, RandomAccessBinary rab, HistoryImportResult result)
+                       throws IOException, HistoryException
+       {
                DataContainer dc = DataContainers.readHeader(rab);
 
                if (DEBUG)
                        System.out.println("COMPRESSED HISTORY DATA SIZE: " + (rab.length() - rab.position()));
 
-               if (APPLICATION_X_LZ4.equals(dc.format)) {
-                       if (dc.version == ARCHIVE_VERSION_1) {
-                               InputStream in = new RABInputStream(rab);
-                               //in = new BufferedInputStream(in);
-                               in = new LZ4BlockInputStream(in);
-                               return extractHistoryArchiveTar(graph, history, in, result);
-                       }
-               } else if (APPLICATION_X_FLZ.equals(dc.format)) {
-                       if (dc.version == ARCHIVE_VERSION_1) {
-                               InputStream in = null;
-                               try {
-                                       in = new RABInputStream(rab);
-                                       in = FastLZ.read(in);
-                                       return extractHistoryArchiveTar(graph, history, in, result);
-                               } finally {
-                                       FileUtils.uncheckedClose(in);
-                               }
+               if (dc.version == ARCHIVE_VERSION_1) {
+                       if (APPLICATION_X_LZ4.equals(dc.format)) {
+                               return extractHistoryArchiveInputStream(history, new LZ4BlockInputStream(new RABInputStream(rab)), result);
+                       } else if (APPLICATION_X_FLZ.equals(dc.format)) {
+                               return extractHistoryArchiveInputStream(history, FastLZ.read(new RABInputStream(rab)), result);
+                       } else if (APPLICATION_X_TAR.equals(dc.format)) {
+                               return extractHistoryArchiveInputStream(history, new RABInputStream(rab), result);
                        }
+                       throw new UnsupportedOperationException("Unsupported format " + dc.format + " and version " + dc.version);
+               } else {
+                       throw new UnsupportedOperationException("Unsupported history archive version " + dc.version + " with format " + dc.format);
+               }
+       }
+
+       private static boolean extractHistoryArchiveInputStream(HistoryManager history, InputStream in, HistoryImportResult result) throws IOException, HistoryException {
+               try (InputStream _in = in) {
+                       return extractHistoryArchiveTar(history, in, result);
                }
-               throw new UnsupportedOperationException("Unsupported format " + dc.format + " and version " + dc.version);
        }
 
        /**
@@ -719,8 +739,8 @@ public class HistoryUtil {
         * @return <code>true</code> if successful
         * @throws DatabaseException 
         */
-       private static boolean extractHistoryArchiveTar(ReadGraph graph, HistoryManager history, InputStream in, HistoryImportResult result)
-                       throws DatabaseException, IOException, HistoryException
+       private static boolean extractHistoryArchiveTar(HistoryManager history, InputStream in, HistoryImportResult result)
+                       throws IOException, HistoryException
        {
                // tar is not closed on purpose because we do not want to close RandomAccessBinary rab.
                TarArchiveInputStream tar = new TarArchiveInputStream(in);
@@ -780,9 +800,9 @@ public class HistoryUtil {
                                        //component(3):  = Boolean
                                        //component(4):  = Boolean
                                        //component(5):  = Boolean
-                                       //component(devil):  = Boolean
+                                       //component(6):  = Boolean
                                        //component(7):  = Boolean
-                                       //component(music):  = Boolean
+                                       //component(8):  = Boolean
                                        //component(9):  = Boolean
                                        //component(10):  = Boolean
                                        //
@@ -946,4 +966,83 @@ public class HistoryUtil {
                return 0L;
        }
 
+       public static CollectorState truncateHistory(double toBeforeTime, HistoryManager history, CollectorState state) throws AccessorException, BindingException, HistoryException {
+               Double t = toBeforeTime;
+               Binding timeBinding = null;
+
+               Bean[] items = history.getItems();
+               //System.out.println("truncating all samples after t=" + toBeforeTime + " for " + items.length + " history items");
+
+               for (Bean item : items) {
+                       String id = (String) item.getField("id");
+                       StreamAccessor sa = history.openStream(id, "rw");
+                       try {
+                               Stream s = new Stream(sa);
+                               timeBinding = s.timeBinding;
+                               int currentSize = sa.size();
+                               int index = s.binarySearch(timeBinding, toBeforeTime);
+                               int newSize = truncationSize(index);
+                               if (newSize < currentSize) {
+                                       //System.out.println("truncating item: " + item + " from size " + currentSize + " to " + newSize);
+                                       sa.setSize(newSize);
+
+                                       if (state != null) {
+                                               Object prevTime  = newSize > 0 ? s.getItemTime(s.timeBinding, newSize - 1) : null;
+                                               Bean prevSample = newSize > 0 ? (Bean) sa.get(newSize - 1, s.sampleBinding) : null;
+                                               Object prevValue = prevSample != null ? prevSample.getField(s.valueIndex) : null;
+                                               boolean isNan = isNaN(prevValue);
+
+                                               VariableState vs = state.values.get(id);
+                                               if (vs != null && vs.value != null && prevValue != null) {
+                                                       vs.value.setValue(vs.value.getBinding(), prevValue);
+                                                       vs.isValid = true;
+                                                       vs.isNan = isNan;
+                                               }
+
+                                               CollectorState.Item is = state.itemStates.get(id);
+                                               if (is != null) {
+                                                       is.firstTime = toTime(prevTime);
+                                                       is.firstValue = toValue(s.valueBinding, prevValue);
+                                                       is.currentTime = is.firstTime;
+                                                       is.currentValue = toValue(s.valueBinding, prevValue);
+                                                       is.isNaN = isNan;
+                                                       is.isValid = prevValue != null;
+                                                       is.sum = Double.NaN;
+                                                       is.count = 0;
+                                                       is.ooDeadband = false;
+                                                       is.firstDisabledTime = Double.NaN;
+                                                       is.lastDisabledTime = Double.NaN;
+                                                       is.median = new WeightedMedian( CollectorImpl.MEDIAN_LIMIT );
+                                               }
+                                       }
+                               }
+                       } finally {
+                               sa.close();
+                       }
+               }
+
+               if (timeBinding != null && state != null) {
+                       state.time.setValue(timeBinding, t);
+                       state.dT = 1.0;
+               }
+
+               return state;
+       }
+
+       private static double toTime(Object time) {
+               return time instanceof Number ? ((Number) time).doubleValue() : Double.NaN;
+       }
+
+       private static MutableVariant toValue(Binding valueBinding, Object value) {
+               return new MutableVariant(valueBinding, value != null ? value : valueBinding.createDefaultUnchecked());
+       }
+
+       private static boolean isNaN(Object value) {
+               return value instanceof Number ? Double.isNaN(((Number) value).doubleValue()) : false;
+       }
+
+       private static int truncationSize(int binarySearchResult) {
+               return binarySearchResult >= 0 ? binarySearchResult + 1 : (-binarySearchResult - 1);
+       }
+
 }