-/*******************************************************************************\r
- * Copyright (c) 2007, 2014 Association for Decentralized Information Management in\r
- * Industry THTH ry.\r
- * All rights reserved. This program and the accompanying materials\r
- * are made available under the terms of the Eclipse Public License v1.0\r
- * which accompanies this distribution, and is available at\r
- * http://www.eclipse.org/legal/epl-v10.html\r
- *\r
- * Contributors:\r
- * VTT Technical Research Centre of Finland - initial API and implementation\r
- * Semantum Oy - archive implementation\r
- *******************************************************************************/\r
-package org.simantics.simulation.history;\r
-\r
-import gnu.trove.map.TObjectLongMap;\r
-import gnu.trove.map.hash.TObjectLongHashMap;\r
-\r
-import java.io.EOFException;\r
-import java.io.IOException;\r
-import java.io.InputStream;\r
-import java.io.OutputStream;\r
-import java.util.Arrays;\r
-import java.util.HashMap;\r
-import java.util.Map;\r
-\r
-import net.jpountz.lz4.LZ4BlockInputStream;\r
-import net.jpountz.lz4.LZ4BlockOutputStream;\r
-\r
-import org.apache.commons.compress.archivers.tar.TarArchiveEntry;\r
-import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;\r
-import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;\r
-import org.eclipse.core.runtime.IStatus;\r
-import org.eclipse.core.runtime.Status;\r
-import org.simantics.databoard.Bindings;\r
-import org.simantics.databoard.accessor.StreamAccessor;\r
-import org.simantics.databoard.accessor.binary.BinaryObject;\r
-import org.simantics.databoard.accessor.error.AccessorException;\r
-import org.simantics.databoard.binding.ArrayBinding;\r
-import org.simantics.databoard.binding.Binding;\r
-import org.simantics.databoard.binding.RecordBinding;\r
-import org.simantics.databoard.binding.error.BindingConstructionException;\r
-import org.simantics.databoard.binding.error.BindingException;\r
-import org.simantics.databoard.binding.impl.ObjectArrayBinding;\r
-import org.simantics.databoard.binding.mutable.Variant;\r
-import org.simantics.databoard.container.DataContainer;\r
-import org.simantics.databoard.container.DataContainers;\r
-import org.simantics.databoard.serialization.Serializer;\r
-import org.simantics.databoard.util.Bean;\r
-import org.simantics.databoard.util.binary.OutputStreamWriteable;\r
-import org.simantics.databoard.util.binary.RandomAccessBinary;\r
-import org.simantics.db.ReadGraph;\r
-import org.simantics.db.Resource;\r
-import org.simantics.db.WriteGraph;\r
-import org.simantics.db.common.request.UniqueRead;\r
-import org.simantics.db.common.request.WriteRequest;\r
-import org.simantics.db.exception.DatabaseException;\r
-import org.simantics.db.exception.ServiceNotFoundException;\r
-import org.simantics.db.request.Read;\r
-import org.simantics.fastlz.FastLZ;\r
-import org.simantics.history.HistoryException;\r
-import org.simantics.history.HistoryManager;\r
-import org.simantics.history.ItemManager;\r
-import org.simantics.history.util.Stream;\r
-import org.simantics.history.util.ValueBand;\r
-import org.simantics.layer0.Layer0;\r
-import org.simantics.simulation.Activator;\r
-import org.simantics.simulation.ontology.HistoryResource;\r
-import org.simantics.simulation.ontology.SimulationResource;\r
-import org.simantics.utils.FileUtils;\r
-\r
-/**\r
- * @author Toni Kalajainen\r
- * @author Tuukka Lehtonen\r
- */\r
-public class HistoryUtil {\r
-\r
- private static final boolean DEBUG = false;\r
- private static final boolean PROFILE = true;\r
-\r
- private enum Compression {\r
- FLZ,\r
- LZ4,\r
- }\r
-\r
- private static final Compression USED_COMPRESSION = Compression.FLZ;\r
-\r
- private static final String APPLICATION_X_LZ4 = "application/x-lz4";\r
- private static final String APPLICATION_X_FLZ = "application/x-flz";\r
- private static final int ARCHIVE_VERSION_1 = 1; // TarArchiveInput/OutputStream\r
-\r
- private static class ArchivedHistory {\r
- public long totalSampleSize = 0L;\r
- public TObjectLongMap<Bean> itemSizes = new TObjectLongHashMap<Bean>();\r
- }\r
-\r
- /**\r
- * Create request that exports history to graph.\r
- * \r
- * If removeOldUnused is true, items that do not exist in source history\r
- * are deleted from graph.\r
- * \r
- * If item already exists in the graph, it is overwritten.\r
- * \r
- * @param history source history\r
- * @param g write graph\r
- * @param historyResource Instance of HIS.History to write data to\r
- * @param removeOldUnused\r
- * @param timeShift adjust time values by this value \r
- * @param from time\r
- * @param end time\r
- * @return WriteRequest\r
- */\r
- public static WriteRequest export(final HistoryManager history, \r
- final Resource historyResource,\r
- final boolean removeOldUnused,\r
- final Double timeShift, \r
- final double from, final double end) \r
- {\r
- return new WriteRequest() {\r
- public void perform(WriteGraph graph) throws DatabaseException {\r
- HistoryResource HIS = HistoryResource.getInstance(graph);\r
- Layer0 L0 = Layer0.getInstance(graph);\r
- long totalSampleSize = 0L;\r
- long s = System.nanoTime();\r
-\r
- try {\r
- Bean[] items = history.getItems();\r
- if (PROFILE)\r
- profile(null, "exporting " + items.length + " history items to database");\r
-\r
- ItemManager im = new ItemManager( items );\r
- Map<String, Resource> oldResourceMap = new HashMap<String, Resource>();\r
- for (Resource oldItem : graph.getObjects(historyResource, L0.ConsistsOf))\r
- {\r
- if (!graph.isInstanceOf(oldItem, HIS.History_Item)) continue;\r
- Bean bean = graph.getPossibleRelatedValue(oldItem, HIS.History_Item_Info, Bindings.BEAN);\r
- if ( bean == null ) continue;\r
- String id = (String) bean.getField("id");\r
- \r
- if (!im.exists(id)) {\r
- // There is item in graph that does not exist in source history\r
- if ( removeOldUnused ) {\r
- graph.deny(oldItem);\r
- }\r
- } else {\r
- // Item already exists, to-be-overwritten\r
- oldResourceMap.put(id, oldItem);\r
- }\r
- }\r
- \r
- for (Bean newItem : im.values())\r
- {\r
- //System.out.println("WRITING ITEM: " + newItem);\r
- String id = (String) newItem.getField("id");\r
- Resource historyItemResource = oldResourceMap.get(id);\r
- Resource data = null;\r
-\r
- if ( historyItemResource==null ) {\r
- historyItemResource = graph.newResource();\r
- graph.claim(historyResource, L0.ConsistsOf, historyItemResource);\r
- graph.claim(historyItemResource, L0.InstanceOf, HIS.History_Item);\r
- graph.claimLiteral(historyItemResource, L0.HasName, id);\r
- } else {\r
- data = graph.getPossibleObject(historyItemResource, HIS.History_Item_Series);\r
- }\r
- \r
- Variant v = new Variant(newItem.getBinding(), newItem);\r
- graph.claimLiteral(historyItemResource, HIS.History_Item_Info, v, Bindings.VARIANT);\r
- \r
- if (data == null) {\r
- data = graph.newResource();\r
- graph.claim(data, L0.InstanceOf, L0.Variant);\r
- graph.claim(historyItemResource, HIS.History_Item_Series, data);\r
- } else {\r
- graph.denyValue(data);\r
- }\r
- \r
- \r
- // Write stream\r
- StreamAccessor sa = history.openStream(id, "r");\r
- try {\r
- //System.out.println("WRITING stream of size=" + sa.size());\r
- RecordBinding sampleBinding = (RecordBinding) Bindings.getBeanBinding( sa.type().componentType );\r
- Serializer serializer = Bindings.getSerializerUnchecked(sampleBinding);\r
- Integer constantSampleSize = serializer.getConstantSize();\r
- ValueBand vb = new ValueBand(sampleBinding);\r
- Stream stream = new Stream(sa, sampleBinding);\r
- ArrayBinding arrayBinding = new ObjectArrayBinding(sa.type(), sampleBinding);\r
- Object array = null;\r
- //System.out.println("WRITING stream: sample size=" + constantSampleSize);\r
-\r
- int count = 0;\r
- readArray: {\r
- // Start index\r
- int startIndex = stream.binarySearch(Bindings.DOUBLE, from); \r
- if ( startIndex < -stream.count() ) break readArray;\r
- if ( startIndex<0 ) startIndex = -2-startIndex;\r
- if ( startIndex == -1 ) startIndex = 0;\r
- \r
- // End index\r
- int endIndex = stream.binarySearch(Bindings.DOUBLE, end);\r
- if ( endIndex == -1 ) break readArray;\r
- if ( endIndex<0 ) endIndex = -1-endIndex;\r
- if ( endIndex == sa.size() ) endIndex = sa.size()-1;\r
- if ( endIndex<startIndex ) break readArray;\r
- \r
- // Write sample count\r
- count = endIndex - startIndex + 1;\r
- array = arrayBinding.create(count);\r
- boolean hasTimeShift = timeShift!=null && timeShift != 0.0;\r
- //System.out.println("WRITING stream: count=" + count + ", datatype=" + arrayBinding.type());\r
- for (int i=0; i<count; i++) {\r
- Object sample = sa.get(i+startIndex, sampleBinding);\r
- \r
- // Adjust time\r
- if ( hasTimeShift ) {\r
- vb.setSample(sample);\r
- if ( vb.hasTime() ) {\r
- double newTime = vb.getTimeDouble() + timeShift;\r
- vb.setTime(Bindings.DOUBLE, newTime);\r
- }\r
- if ( vb.hasEndTime() ) {\r
- double newTime = vb.getEndTimeDouble() + timeShift;\r
- vb.setEndTime(Bindings.DOUBLE, newTime);\r
- }\r
- }\r
- //System.out.println("\t#" + i + ": sample=" + sample);\r
- arrayBinding.set(array, i, sample);\r
- }\r
- }\r
- if (array==null) array = arrayBinding.create();\r
- Variant v2 = new Variant(arrayBinding, array);\r
- graph.claimValue(data, v2, Bindings.VARIANT);\r
-\r
- if (constantSampleSize != null) {\r
- long itemSampleSize = ((long) count) * ((long) constantSampleSize);\r
- //System.out.println("item sample size: " + itemSampleSize);\r
- totalSampleSize += itemSampleSize;\r
- graph.claimLiteral(historyItemResource, HIS.History_Item_size, L0.Long, itemSampleSize, Bindings.LONG);\r
- }\r
- } catch (AccessorException e) {\r
- throw new DatabaseException(e);\r
- } finally {\r
- try {\r
- sa.close();\r
- } catch (AccessorException e) {\r
- }\r
- }\r
- }\r
-\r
- graph.claimLiteral(historyResource, HIS.History_size, L0.Long, totalSampleSize, Bindings.LONG);\r
-\r
- if (PROFILE)\r
- profile(s, "exported " + items.length + " history items to database");\r
-\r
- } catch (HistoryException e) {\r
- throw new DatabaseException( e );\r
- } catch (BindingException e) {\r
- throw new DatabaseException( e );\r
- } catch (ServiceNotFoundException e) {\r
- throw new DatabaseException( e );\r
- }\r
- }\r
- };\r
- }\r
-\r
- /**\r
- * Create request that exports history to graph.\r
- * \r
- * If item already exists in the graph, it is overwritten.\r
- * \r
- * @param history\r
- * source history\r
- * @param collectorState\r
- * complete dump of the source history collector state or\r
- * <code>null</code> to skip collector state saving\r
- * @param historyResource\r
- * Instance of HIS.History to write data to\r
- * @param timeShift\r
- * adjust time values by this value\r
- * @param from\r
- * time\r
- * @param end\r
- * time\r
- * @return WriteRequest\r
- */\r
- public static WriteRequest exportArchive(\r
- final HistoryManager history,\r
- final Bean collectorState,\r
- final Resource historyResource,\r
- final Double timeShift, \r
- final double from,\r
- final double end) \r
- {\r
- return new WriteRequest() {\r
- public void perform(WriteGraph graph) throws DatabaseException {\r
- HistoryResource HIS = HistoryResource.getInstance(graph);\r
- Layer0 L0 = Layer0.getInstance(graph);\r
-\r
- long s = System.nanoTime();\r
- if (PROFILE)\r
- profile(null, "archiving history items to database");\r
-\r
- // Remove all possibly existing old items in the history.\r
- for (Resource entity : graph.getObjects(historyResource, L0.ConsistsOf))\r
- if (graph.isInstanceOf(entity, HIS.History_Item))\r
- graph.deny(historyResource, L0.ConsistsOf, L0.PartOf, entity);\r
-\r
- // Create new literal for history archival\r
- //graph.deny(historyResource, HIS.History_archive);\r
- graph.denyValue(historyResource, HIS.History_archive);\r
- Resource archiveLiteral = graph.newResource();\r
- graph.claim(archiveLiteral, L0.InstanceOf, null, L0.ByteArray);\r
- graph.claim(historyResource, HIS.History_archive, archiveLiteral);\r
-\r
- OutputStream closeable = null;\r
- try {\r
- RandomAccessBinary rab = graph.getRandomAccessBinary(archiveLiteral);\r
- rab.position(0);\r
- rab.skipBytes(4);\r
-\r
- ArchivedHistory archive = exportCompressedArchive(history, collectorState, historyResource, timeShift, from, end, rab);\r
-\r
- rab.position(0L);\r
- rab.writeInt((int)(rab.length() - 4L));\r
-\r
- graph.claimLiteral(historyResource, HIS.History_size, L0.Long, archive.totalSampleSize, Bindings.LONG);\r
-\r
- if (PROFILE)\r
- profile(s, "archived history items of size " + archive.totalSampleSize + " to database");\r
-\r
- } catch (HistoryException e) {\r
- throw new DatabaseException( e );\r
- } catch (IOException e) {\r
- throw new DatabaseException( e );\r
- } finally {\r
- FileUtils.uncheckedClose(closeable);\r
- }\r
- }\r
- };\r
- }\r
-\r
- private static ArchivedHistory exportCompressedArchive(\r
- final HistoryManager history,\r
- final Bean collectorState,\r
- final Resource historyResource,\r
- final Double timeShift, \r
- final double from,\r
- final double end,\r
- RandomAccessBinary rab)\r
- throws IOException, HistoryException\r
- {\r
- switch (USED_COMPRESSION) {\r
- case FLZ:\r
- return exportArchiveFLZ(history, collectorState, historyResource, timeShift, from, end, rab);\r
- case LZ4:\r
- return exportArchiveLZ4(history, collectorState, historyResource, timeShift, from, end, rab);\r
- default:\r
- throw new UnsupportedOperationException("Unsupported compression: " + USED_COMPRESSION);\r
- }\r
- }\r
-\r
- private static ArchivedHistory exportArchiveLZ4(\r
- final HistoryManager history,\r
- final Bean collectorState,\r
- final Resource historyResource,\r
- final Double timeShift, \r
- final double from,\r
- final double end,\r
- RandomAccessBinary rab)\r
- throws IOException, HistoryException\r
- {\r
- OutputStream closeable = null;\r
- try {\r
- DataContainers.writeHeader(rab, new DataContainer(APPLICATION_X_LZ4, ARCHIVE_VERSION_1, null));\r
- closeable = new RABOutputStream( rab );\r
- closeable = new LZ4BlockOutputStream( closeable );\r
-\r
- ArchivedHistory archive = exportArchive( closeable, history, collectorState, timeShift, from, end );\r
-\r
- closeable.flush();\r
- closeable.close();\r
- closeable = null;\r
-\r
- return archive;\r
- } finally {\r
- FileUtils.uncheckedClose(closeable);\r
- }\r
- }\r
-\r
- @SuppressWarnings("resource")\r
- private static ArchivedHistory exportArchiveFLZ(\r
- final HistoryManager history,\r
- final Bean collectorState,\r
- final Resource historyResource,\r
- final Double timeShift, \r
- final double from,\r
- final double end,\r
- RandomAccessBinary rab)\r
- throws IOException, HistoryException\r
- {\r
- OutputStream closeable = null;\r
- try {\r
- DataContainers.writeHeader(rab, new DataContainer(APPLICATION_X_FLZ, ARCHIVE_VERSION_1, null));\r
- closeable = new RABOutputStream( rab );\r
- closeable = FastLZ.write( closeable );\r
-\r
- ArchivedHistory archive = exportArchive( closeable, history, collectorState, timeShift, from, end );\r
-\r
- closeable.flush();\r
- closeable.close();\r
- closeable = null;\r
-\r
- return archive;\r
- } finally {\r
- FileUtils.uncheckedClose(closeable);\r
- }\r
- }\r
-\r
- /**\r
- * Store the specified history manager's contents into a single archive\r
- * file.\r
- * \r
- * @param outputStream\r
- * the output stream where the history archive is written\r
- * @param history\r
- * source history\r
- * @param collectorState\r
- * complete state of the history collector at the time of saving\r
- * or <code>null</code> to not save any state\r
- * @param timeShift\r
- * adjust time values by this value\r
- * @param from\r
- * time\r
- * @param end\r
- * time\r
- * @return ArchivedHistory instance describing the created archive\r
- * @throws HistoryException\r
- * when there's a problem with reading the history data from the\r
- * history work area\r
- * @throws IOException\r
- * when there's a problem writing the resulting history archive\r
- * file\r
- */\r
- private static ArchivedHistory exportArchive(\r
- OutputStream outputStream,\r
- HistoryManager history,\r
- Bean collectorState,\r
- Double timeShift,\r
- double from,\r
- double end)\r
- throws IOException, HistoryException\r
- {\r
- ArchivedHistory result = new ArchivedHistory();\r
-\r
- Serializer beanSerializer = Bindings.getSerializerUnchecked(Bindings.BEAN);\r
- boolean hasTimeShift = timeShift != null && timeShift != 0.0;\r
-\r
- TarArchiveOutputStream out = new TarArchiveOutputStream(outputStream);\r
- OutputStreamWriteable dataOutput = new OutputStreamWriteable(out);\r
-\r
- try {\r
- if (collectorState != null) {\r
- // Write complete collector state into the archive\r
- if (DEBUG)\r
- System.out.println("WRITING collector state: " + collectorState);\r
- byte[] serializedCollectorState = beanSerializer.serialize( collectorState );\r
- putNextEntry(out, "state", serializedCollectorState.length);\r
- out.write(serializedCollectorState);\r
- out.closeArchiveEntry();\r
- }\r
-\r
- ItemManager im = new ItemManager( history.getItems() );\r
-\r
- for (Bean item : im.values()) {\r
- if (DEBUG)\r
- System.out.println("STORING ITEM: " + item);\r
- String id = (String) item.getField("id");\r
-\r
- // Write data stream metadata\r
- byte[] metadata = beanSerializer.serialize( item );\r
- putNextEntry(out, id + ".txt", metadata.length);\r
- out.write(metadata);\r
- out.closeArchiveEntry();\r
- if (DEBUG)\r
- System.out.println("SERIALIZED ITEM " + id + ": " + Arrays.toString(metadata));\r
-\r
-\r
- // Write data stream as a separate entry\r
- StreamAccessor sa = history.openStream(id, "r");\r
- try {\r
- if (DEBUG)\r
- System.out.println("STREAM SIZE=" + sa.size());\r
- RecordBinding sampleBinding = (RecordBinding) Bindings.getBeanBinding( sa.type().componentType );\r
- Serializer sampleSerializer = Bindings.getSerializerUnchecked(sampleBinding);\r
- Integer constantSampleSize = sampleSerializer.getConstantSize();\r
-\r
- if (constantSampleSize == null) {\r
- // Variable length samples - no support for now.\r
- System.err.println("Skipping item " + item + " in history archival due to variable length samples not being supported at the moment.");\r
- continue;\r
- } else {\r
- ValueBand vb = new ValueBand(sampleBinding);\r
- Stream stream = new Stream(sa, sampleBinding);\r
- if (DEBUG)\r
- System.out.println("WRITING stream: sample size=" + constantSampleSize);\r
-\r
- readArray: {\r
- // Start index\r
- int startIndex = stream.binarySearch(Bindings.DOUBLE, from);\r
- if ( startIndex < -stream.count() ) break readArray;\r
- if ( startIndex<0 ) startIndex = -2-startIndex;\r
- if ( startIndex == -1 ) startIndex = 0;\r
-\r
- // End index\r
- int endIndex = stream.binarySearch(Bindings.DOUBLE, end);\r
- if ( endIndex == -1 ) break readArray;\r
- if ( endIndex<0 ) endIndex = -1-endIndex;\r
- if ( endIndex == sa.size() ) endIndex = sa.size()-1;\r
- if ( endIndex<startIndex ) break readArray;\r
-\r
- // Written sample count\r
- int count = endIndex - startIndex + 1;\r
- if (DEBUG)\r
- System.out.println("WRITTEN sample count=" + count);\r
-\r
- long savedSamplesSize = ((long) count) * ((long) constantSampleSize);\r
- if (DEBUG)\r
- System.out.println("saved samples size in bytes: " + savedSamplesSize);\r
- putNextEntry(out, id + ".data", savedSamplesSize);\r
-\r
- for (int i=0; i<count; i++) {\r
- Object sample = sa.get(i+startIndex, sampleBinding);\r
-\r
- // Adjust time\r
- if ( hasTimeShift ) {\r
- vb.setSample(sample);\r
- if ( vb.hasTime() ) {\r
- double newTime = vb.getTimeDouble() + timeShift;\r
- vb.setTime(Bindings.DOUBLE, newTime);\r
- }\r
- if ( vb.hasEndTime() ) {\r
- double newTime = vb.getEndTimeDouble() + timeShift;\r
- vb.setEndTime(Bindings.DOUBLE, newTime);\r
- }\r
- }\r
- //System.out.println("\t#" + i + ": sample=" + sample);\r
- sampleSerializer.serialize(dataOutput, sample);\r
- }\r
- out.closeArchiveEntry();\r
-\r
- result.itemSizes.put(item, savedSamplesSize);\r
- result.totalSampleSize += savedSamplesSize;\r
- }\r
- }\r
- } catch (AccessorException e) {\r
- throw new IOException(e);\r
- } finally {\r
- try {\r
- sa.close();\r
- } catch (AccessorException e) {\r
- }\r
- }\r
- \r
- }\r
-\r
- return result;\r
- } catch (BindingException e) {\r
- throw new HistoryException(e);\r
- } finally {\r
- out.finish();\r
- }\r
- }\r
-\r
- /**\r
- * Import all history items from graph into a history manager.\r
- * \r
- * @param r history resource or initial condition resource source HIS.History resource\r
- * @param history destination history\r
- * @return read request that always returns a HistoryImportResult instance\r
- */\r
- public static Read<HistoryImportResult> importHistory(final Resource r, final HistoryManager history)\r
- {\r
- return new UniqueRead<HistoryImportResult>() {\r
- @Override\r
- public HistoryImportResult perform(ReadGraph graph) throws DatabaseException {\r
- HistoryImportResult result = new HistoryImportResult();\r
-\r
- HistoryResource HIS = HistoryResource.getInstance(graph);\r
- SimulationResource SIM = SimulationResource.getInstance(graph);\r
- Resource historyResource = r;\r
-\r
- if (!graph.isInstanceOf(historyResource, HIS.History))\r
- historyResource = graph.getPossibleObject(r, SIM.State_History);\r
- if (historyResource == null)\r
- return result;\r
- if (!graph.isInstanceOf(historyResource, HIS.History))\r
- return result;\r
-\r
- long s = System.nanoTime();\r
-\r
- Resource archive = graph.getPossibleObject(historyResource, HIS.History_archive);\r
- if (archive == null) {\r
- if (PROFILE)\r
- profile(null, "importing history items from old database format to disk workarea");\r
- importHistoryItems(graph, historyResource, history);\r
- } else {\r
- try {\r
- if (PROFILE)\r
- profile(null, "importing history items from archived format to disk workarea");\r
- importHistoryArchive(graph, historyResource, archive, history, result);\r
- } catch (IOException e) {\r
- throw new DatabaseException(e);\r
- } catch (HistoryException e) {\r
- throw new DatabaseException(e);\r
- }\r
- }\r
-\r
- if (PROFILE)\r
- profile(s, "imported history items from database to disk workarea");\r
-\r
- return result;\r
- }\r
- };\r
- }\r
-\r
- /**\r
- * Import all history items from graph into a history manager.\r
- * \r
- * @param r history resource or initial condition resource source HIS.History resource\r
- * @param history destination history\r
- * @return <code>true</code> if successful\r
- */\r
- private static boolean importHistoryItems(ReadGraph graph, final Resource historyResource, final HistoryManager history) throws DatabaseException\r
- {\r
- HistoryResource HIS = HistoryResource.getInstance(graph);\r
- Layer0 L0 = Layer0.getInstance(graph);\r
-\r
- try {\r
- for (Resource oldItem : graph.getObjects(historyResource, L0.ConsistsOf)) {\r
- if (!graph.isInstanceOf(oldItem, HIS.History_Item))\r
- continue;\r
-\r
- Bean bean = graph.getRelatedValue(oldItem, HIS.History_Item_Info, Bindings.BEAN);\r
- String id = (String) bean.getFieldUnchecked("id");\r
- Object array = graph.getRelatedValue(oldItem, HIS.History_Item_Series, Bindings.BEAN);\r
- Binding arrayBinding = Bindings.getBinding( array.getClass() );\r
- history.modify(bean);\r
- StreamAccessor sa = history.openStream(id, "rw");\r
- try {\r
- sa.setValue(arrayBinding, array);\r
- } finally {\r
- try {\r
- sa.close();\r
- } catch (AccessorException e) {\r
- }\r
- }\r
- }\r
-\r
- return true;\r
- } catch (AccessorException e) {\r
- throw new DatabaseException(e);\r
- } catch (BindingConstructionException e) {\r
- throw new DatabaseException(e);\r
- } catch (HistoryException e) {\r
- throw new DatabaseException(e);\r
- }\r
- }\r
-\r
- /**\r
- * Import all history items from database archive into a history manager.\r
- * \r
- * @param graph\r
- * @param historyResource\r
- * @param archive \r
- * @param history\r
- * @return <code>true</code> if successful\r
- * @throws DatabaseException \r
- */\r
- @SuppressWarnings("resource")\r
- private static boolean importHistoryArchive(ReadGraph graph, Resource historyResource, Resource archive, HistoryManager history, HistoryImportResult result)\r
- throws DatabaseException, IOException, HistoryException\r
- {\r
- RandomAccessBinary rab = graph.getRandomAccessBinary(archive);\r
- rab.position(4);\r
- DataContainer dc = DataContainers.readHeader(rab);\r
-\r
- if (DEBUG)\r
- System.out.println("COMPRESSED HISTORY DATA SIZE: " + (rab.length() - rab.position()));\r
-\r
- if (APPLICATION_X_LZ4.equals(dc.format)) {\r
- if (dc.version == ARCHIVE_VERSION_1) {\r
- InputStream in = new RABInputStream(rab);\r
- //in = new BufferedInputStream(in);\r
- in = new LZ4BlockInputStream(in);\r
- return extractHistoryArchiveTar(graph, history, in, result);\r
- }\r
- } else if (APPLICATION_X_FLZ.equals(dc.format)) {\r
- if (dc.version == ARCHIVE_VERSION_1) {\r
- InputStream in = null;\r
- try {\r
- in = new RABInputStream(rab);\r
- in = FastLZ.read(in);\r
- return extractHistoryArchiveTar(graph, history, in, result);\r
- } finally {\r
- FileUtils.uncheckedClose(in);\r
- }\r
- }\r
- }\r
- throw new UnsupportedOperationException("Unsupported format " + dc.format + " and version " + dc.version);\r
- }\r
-\r
- /**\r
- * Import all history items from graph into a history manager\r
- * \r
- * @param graph database access\r
- * @param history destination history\r
- * @param rab the archive to extract from\r
- * @return <code>true</code> if successful\r
- * @throws DatabaseException \r
- */\r
- private static boolean extractHistoryArchiveTar(ReadGraph graph, HistoryManager history, InputStream in, HistoryImportResult result)\r
- throws DatabaseException, IOException, HistoryException\r
- {\r
- // tar is not closed on purpose because we do not want to close RandomAccessBinary rab.\r
- TarArchiveInputStream tar = new TarArchiveInputStream(in);\r
-\r
- Serializer beanSerializer = Bindings.getSerializerUnchecked(Bindings.BEAN);\r
-\r
- Bean lastItem = null;\r
- String lastItemId = null;\r
- while (true) {\r
- TarArchiveEntry entry = tar.getNextTarEntry();\r
- if (entry == null)\r
- break;\r
-\r
- String name = entry.getName();\r
- boolean state = name.equals("state");\r
- boolean metadata = name.endsWith(".txt");\r
- boolean data = name.endsWith(".data");\r
-\r
- if (state) {\r
- if (result != null) {\r
- byte[] st = new byte[(int) entry.getSize()];\r
- tar.read(st);\r
- result.collectorState = (Bean) beanSerializer.deserialize(st);\r
- if (DEBUG)\r
- System.out.println("READ collector state: " + result.collectorState);\r
- } else {\r
- tar.skip(entry.getSize());\r
- }\r
- } else if (metadata) {\r
- byte[] md = new byte[(int) entry.getSize()];\r
- tar.read(md);\r
- if (DEBUG) {\r
- System.out.println("READING Item metadata: " + name);\r
- System.out.println("SERIALIZED ITEM " + name.replace(".txt", "") + ": " + Arrays.toString(md));\r
- }\r
-\r
- try {\r
- Bean bean = (Bean) beanSerializer.deserialize(md);\r
- if (DEBUG)\r
- System.out.println("READ Item metadata: " + bean);\r
- history.modify(bean);\r
- lastItem = bean;\r
- lastItemId = (String) bean.getFieldUnchecked("id");\r
- } catch (ClassFormatError e) {\r
- // This is here because LZ4BlockInput/OutputStream seemed to\r
- // be causing weird deserialization errors to occur for\r
- // single data items when trying to deserialize beans. The\r
- // reason being that the serialized data that was read only\r
- // contains the first 512 bytes of the data. After that all\r
- // data is zeros, which causes things like this to happen in\r
- // deserialization:\r
- //signature: rsstzzzzzzzze_b7b532e9\r
- //component(o): id = String\r
- //component(1): variableId = String\r
- //component(2): format = Temp1\r
- //annotation: @org.simantics.databoard.annotations.Union(value=[class org.simantics.databoard.type.BooleanType, class org.simantics.databoard.type.ByteType, class org.simantics.databoard.type.IntegerType, class org.simantics.databoard.type.LongType, class org.simantics.databoard.type.FloatType, class org.simantics.databoard.type.DoubleType, class org.simantics.databoard.type.StringType, class org.simantics.databoard.type.RecordType, class org.simantics.databoard.type.ArrayType, class org.simantics.databoard.type.MapType, class org.simantics.databoard.type.OptionalType, class org.simantics.databoard.type.UnionType, class org.simantics.databoard.type.VariantType])\r
- //component(3): = Boolean\r
- //component(4): = Boolean\r
- //component(5): = Boolean\r
- //component(devil): = Boolean\r
- //component(7): = Boolean\r
- //component(music): = Boolean\r
- //component(9): = Boolean\r
- //component(10): = Boolean\r
- //\r
- // For this reason we've switched the default compression to FastLZ\r
- // for now. This is reflected in the input format also.\r
-\r
- Activator.getDefault().getLog().log(\r
- new Status(IStatus.ERROR, Activator.PLUGIN_ID, "History item " + name + " metadata deserialization failed.", e));;\r
- }\r
- } else if (data && lastItem != null) {\r
- StreamAccessor sa = history.openStream(lastItemId, "rw");\r
- try {\r
- if (sa instanceof BinaryObject) {\r
- BinaryObject bo = (BinaryObject) sa;\r
- RandomAccessBinary output = bo.getBinary();\r
- output.position(0L);\r
- output.setLength(0L);\r
- long copiedBytes = FileUtils.copy(tar, new RABOutputStream(output), entry.getSize());\r
- if (copiedBytes != entry.getSize()) {\r
- System.err.println("WARNING: item " + lastItemId + ", extracted " + copiedBytes + " bytes while TAR entry said the size to be " + entry.getSize() + " bytes");\r
- }\r
- } else {\r
- System.err.println("Skipping item " + lastItemId + " in history import due to StreamAccessor not being an instanceof BinaryObject. StreamAccessor=" + sa);\r
- }\r
- } finally {\r
- try {\r
- sa.close();\r
- lastItem = null;\r
- lastItemId = null;\r
- } catch (AccessorException e) {\r
- }\r
- }\r
- }\r
- }\r
-\r
- return true;\r
- }\r
-\r
- /**\r
- * Get request that clears history\r
- * @return cleaning request\r
- */\r
- public static WriteRequest clear(final Resource history) {\r
- return new WriteRequest() {\r
- @Override\r
- public void perform(WriteGraph graph) throws DatabaseException {\r
- HistoryResource HIS = HistoryResource.getInstance(graph);\r
- Layer0 L0 = Layer0.getInstance(graph);\r
-\r
- // Separate items format\r
- for (Resource oldItem : graph.getObjects(history, L0.ConsistsOf))\r
- {\r
- if (!graph.isInstanceOf(oldItem, HIS.History_Item)) continue;\r
- Resource info = graph.getPossibleObject(oldItem, HIS.History_Item);\r
- if (info!=null) {\r
- graph.deny(oldItem, HIS.History_Item_Info);\r
- graph.deny(info);\r
- }\r
- Resource data = graph.getPossibleObject(oldItem, HIS.History_Item_Series);\r
- if (data!=null) {\r
- graph.deny(oldItem, HIS.History_Item_Series);\r
- graph.deny(data);\r
- }\r
- graph.deny(history, L0.ConsistsOf, oldItem);\r
- graph.deny(oldItem);\r
- }\r
-\r
- // Archived format\r
- graph.denyValue(history, HIS.History_archive);\r
- }\r
- };\r
- }\r
-\r
-\r
- private static void putNextEntry(TarArchiveOutputStream out, String entryName, long size) throws IOException {\r
- TarArchiveEntry entry = new TarArchiveEntry(entryName);\r
- entry.setSize(size);\r
- out.putArchiveEntry(entry);\r
- }\r
-\r
- /**\r
- * Shall not close the underlying RandomAccessBinary.\r
- */\r
- private static class RABOutputStream extends OutputStream {\r
-\r
- private final RandomAccessBinary output;\r
-\r
- public RABOutputStream(RandomAccessBinary output) {\r
- this.output = output;\r
- }\r
-\r
- @Override\r
- public void write(byte[] b, int off, int len) throws IOException {\r
- output.write(b, off, len);\r
- }\r
-\r
- @Override\r
- public void write(int b) throws IOException {\r
- output.write(b);\r
- }\r
-\r
- }\r
-\r
- /**\r
- * Shall not close the underlying RandomAccessBinary.\r
- */\r
- private static class RABInputStream extends InputStream {\r
-\r
- private final RandomAccessBinary input;\r
-\r
- public RABInputStream(RandomAccessBinary input) {\r
- this.input = input;\r
- }\r
-\r
- @Override\r
- public int available() throws IOException {\r
- long avail = input.length() - input.position();\r
- if ((avail & 0xffffffff00000000L) != 0)\r
- throw new IOException("Number of available bytes truncated in long->int conversion: " + avail);\r
- return (int) avail;\r
- }\r
-\r
- @Override\r
- public int read() throws IOException {\r
- try {\r
- return input.readUnsignedByte();\r
- } catch (EOFException e) {\r
- return -1;\r
- }\r
- }\r
-\r
- @Override\r
- public int read(byte[] b, int off, int len) throws IOException {\r
- long available = input.length() - input.position();\r
- if (available == 0)\r
- return -1;\r
- int l = (int) Math.min(available, (long) len);\r
- input.readFully(b, off, l);\r
- return l;\r
- }\r
-\r
- @Override\r
- public long skip(long n) throws IOException {\r
- long count = 0;\r
- while (count < n) {\r
- int l = (int) Math.min(n, (long) Integer.MAX_VALUE);\r
- count += input.skipBytes(l);\r
- }\r
- return count;\r
- }\r
-\r
- }\r
-\r
- private static long profile(Long t1, String string) {\r
- if (PROFILE) {\r
- long t2 = System.nanoTime();\r
- long delta = t1 == null ? 0 : t2-t1;\r
- System.out.println("[" + HistoryUtil.class.getSimpleName() + "] PROFILE: " + string + (t1 == null ? "" : " in " + (((double)delta)*1e-6) + " ms"));\r
- return t2;\r
- }\r
- return 0L;\r
- }\r
-\r
-}\r
+/*******************************************************************************
+ * Copyright (c) 2007, 2018 Association for Decentralized Information Management in
+ * Industry THTH ry.
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * which accompanies this distribution, and is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Contributors:
+ * VTT Technical Research Centre of Finland - initial API and implementation
+ * Semantum Oy - archive implementation, gitlab simantics/platform#227
+ *******************************************************************************/
+package org.simantics.simulation.history;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
+import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
+import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
+import org.apache.commons.compress.utils.IOUtils;
+import org.eclipse.core.runtime.IStatus;
+import org.eclipse.core.runtime.Status;
+import org.simantics.databoard.Bindings;
+import org.simantics.databoard.accessor.StreamAccessor;
+import org.simantics.databoard.accessor.binary.BinaryObject;
+import org.simantics.databoard.accessor.error.AccessorException;
+import org.simantics.databoard.binding.ArrayBinding;
+import org.simantics.databoard.binding.Binding;
+import org.simantics.databoard.binding.RecordBinding;
+import org.simantics.databoard.binding.error.BindingConstructionException;
+import org.simantics.databoard.binding.error.BindingException;
+import org.simantics.databoard.binding.impl.ObjectArrayBinding;
+import org.simantics.databoard.binding.mutable.MutableVariant;
+import org.simantics.databoard.binding.mutable.Variant;
+import org.simantics.databoard.container.DataContainer;
+import org.simantics.databoard.container.DataContainers;
+import org.simantics.databoard.serialization.Serializer;
+import org.simantics.databoard.util.Bean;
+import org.simantics.databoard.util.binary.BinaryFile;
+import org.simantics.databoard.util.binary.OutputStreamWriteable;
+import org.simantics.databoard.util.binary.RandomAccessBinary;
+import org.simantics.db.ReadGraph;
+import org.simantics.db.Resource;
+import org.simantics.db.WriteGraph;
+import org.simantics.db.common.request.UniqueRead;
+import org.simantics.db.common.request.WriteRequest;
+import org.simantics.db.exception.DatabaseException;
+import org.simantics.db.exception.ServiceNotFoundException;
+import org.simantics.db.request.Read;
+import org.simantics.fastlz.FastLZ;
+import org.simantics.history.HistoryException;
+import org.simantics.history.HistoryManager;
+import org.simantics.history.ItemManager;
+import org.simantics.history.impl.CollectorImpl;
+import org.simantics.history.impl.CollectorState;
+import org.simantics.history.impl.CollectorState.VariableState;
+import org.simantics.history.util.Stream;
+import org.simantics.history.util.ValueBand;
+import org.simantics.history.util.WeightedMedian;
+import org.simantics.layer0.Layer0;
+import org.simantics.simulation.Activator;
+import org.simantics.simulation.ontology.HistoryResource;
+import org.simantics.simulation.ontology.SimulationResource;
+import org.simantics.utils.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import gnu.trove.map.TObjectLongMap;
+import gnu.trove.map.hash.TObjectLongHashMap;
+import net.jpountz.lz4.LZ4BlockInputStream;
+import net.jpountz.lz4.LZ4BlockOutputStream;
+
+/**
+ * @author Toni Kalajainen
+ * @author Tuukka Lehtonen
+ */
+public class HistoryUtil {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(HistoryUtil.class);
+
+ private static final boolean DEBUG = false;
+ private static final boolean PROFILE = true;
+
+ public enum Compression {
+ FLZ,
+ LZ4,
+ NONE,
+ }
+
+ private static final Compression USED_COMPRESSION = Compression.FLZ;
+
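+ // DataContainer format identifiers; they record which Compression mode was
+ // used when archiving so that import can choose the matching decompressor.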
+ private static final String APPLICATION_X_TAR = "application/x-tar";
+ private static final String APPLICATION_X_LZ4 = "application/x-lz4";
+ private static final String APPLICATION_X_FLZ = "application/x-flz";
+ private static final int ARCHIVE_VERSION_1 = 1; // TarArchiveInput/OutputStream
+
+ public static class ArchivedHistory {
+ public long totalSampleSize = 0L;
+ public TObjectLongMap<Bean> itemSizes = new TObjectLongHashMap<Bean>();
+ }
+
+ /**
+ * Create request that exports history to graph.
+ *
+ * If removeOldUnused is true, items that do not exist in the source history
+ * are deleted from the graph.
+ *
+ * If an item already exists in the graph, it is overwritten.
+ *
+ * @param history source history
+ * @param historyResource Instance of HIS.History to write data to
+ * @param removeOldUnused if true, items missing from the source history are removed from the graph
+ * @param timeShift adjust time values by this value
+ * @param from start time
+ * @param end end time
+ * @return WriteRequest
+ */
+ public static WriteRequest export(final HistoryManager history,
+ final Resource historyResource,
+ final boolean removeOldUnused,
+ final Double timeShift,
+ final double from, final double end)
+ {
+ return new WriteRequest() {
+ public void perform(WriteGraph graph) throws DatabaseException {
+ HistoryResource HIS = HistoryResource.getInstance(graph);
+ Layer0 L0 = Layer0.getInstance(graph);
+ long totalSampleSize = 0L;
+ long s = System.nanoTime();
+
+ try {
+ Bean[] items = history.getItems();
+ if (PROFILE)
+ profile(null, "exporting " + items.length + " history items to database");
+
+ ItemManager im = new ItemManager( items );
+ Map<String, Resource> oldResourceMap = new HashMap<String, Resource>();
+ for (Resource oldItem : graph.getObjects(historyResource, L0.ConsistsOf))
+ {
+ if (!graph.isInstanceOf(oldItem, HIS.History_Item)) continue;
+ Bean bean = graph.getPossibleRelatedValue(oldItem, HIS.History_Item_Info, Bindings.BEAN);
+ if ( bean == null ) continue;
+ String id = (String) bean.getField("id");
+
+ if (!im.exists(id)) {
+ // There is an item in the graph that does not exist in the source history
+ if ( removeOldUnused ) {
+ graph.deny(oldItem);
+ }
+ } else {
+ // Item already exists and will be overwritten
+ oldResourceMap.put(id, oldItem);
+ }
+ }
+
+ for (Bean newItem : im.values())
+ {
+ //System.out.println("WRITING ITEM: " + newItem);
+ String id = (String) newItem.getField("id");
+ Resource historyItemResource = oldResourceMap.get(id);
+ Resource data = null;
+
+ if ( historyItemResource==null ) {
+ historyItemResource = graph.newResource();
+ graph.claim(historyResource, L0.ConsistsOf, historyItemResource);
+ graph.claim(historyItemResource, L0.InstanceOf, HIS.History_Item);
+ graph.claimLiteral(historyItemResource, L0.HasName, id);
+ } else {
+ data = graph.getPossibleObject(historyItemResource, HIS.History_Item_Series);
+ }
+
+ Variant v = new Variant(newItem.getBinding(), newItem);
+ graph.claimLiteral(historyItemResource, HIS.History_Item_Info, v, Bindings.VARIANT);
+
+ if (data == null) {
+ data = graph.newResource();
+ graph.claim(data, L0.InstanceOf, L0.Variant);
+ graph.claim(historyItemResource, HIS.History_Item_Series, data);
+ } else {
+ graph.denyValue(data);
+ }
+
+
+ // Write stream
+ StreamAccessor sa = history.openStream(id, "r");
+ try {
+ //System.out.println("WRITING stream of size=" + sa.size());
+ RecordBinding sampleBinding = (RecordBinding) Bindings.getBeanBinding( sa.type().componentType );
+ Serializer serializer = Bindings.getSerializerUnchecked(sampleBinding);
+ Integer constantSampleSize = serializer.getConstantSize();
+ ValueBand vb = new ValueBand(sampleBinding);
+ Stream stream = new Stream(sa, sampleBinding);
+ ArrayBinding arrayBinding = new ObjectArrayBinding(sa.type(), sampleBinding);
+ Object array = null;
+ //System.out.println("WRITING stream: sample size=" + constantSampleSize);
+
+ int count = 0;
+ readArray: {
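+ // stream.binarySearch is assumed to follow the Arrays.binarySearch
+ // convention of returning (-(insertionPoint) - 1) for missing keys; the
+ // checks below clamp such results to the nearest samples within [from, end].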
+ // Start index
+ int startIndex = stream.binarySearch(Bindings.DOUBLE, from);
+ if ( startIndex < -stream.count() ) break readArray;
+ if ( startIndex<0 ) startIndex = -2-startIndex;
+ if ( startIndex == -1 ) startIndex = 0;
+
+ // End index
+ int endIndex = stream.binarySearch(Bindings.DOUBLE, end);
+ if ( endIndex == -1 ) break readArray;
+ if ( endIndex<0 ) endIndex = -1-endIndex;
+ if ( endIndex == sa.size() ) endIndex = sa.size()-1;
+ if ( endIndex<startIndex ) break readArray;
+
+ // Number of samples in the selected range
+ count = endIndex - startIndex + 1;
+ array = arrayBinding.create(count);
+ boolean hasTimeShift = timeShift!=null && timeShift != 0.0;
+ //System.out.println("WRITING stream: count=" + count + ", datatype=" + arrayBinding.type());
+ for (int i=0; i<count; i++) {
+ Object sample = sa.get(i+startIndex, sampleBinding);
+
+ // Adjust time
+ if ( hasTimeShift ) {
+ vb.setSample(sample);
+ if ( vb.hasTime() ) {
+ double newTime = vb.getTimeDouble() + timeShift;
+ vb.setTime(Bindings.DOUBLE, newTime);
+ }
+ if ( vb.hasEndTime() ) {
+ double newTime = vb.getEndTimeDouble() + timeShift;
+ vb.setEndTime(Bindings.DOUBLE, newTime);
+ }
+ }
+ //System.out.println("\t#" + i + ": sample=" + sample);
+ arrayBinding.set(array, i, sample);
+ }
+ }
+ if (array==null) array = arrayBinding.create();
+ Variant v2 = new Variant(arrayBinding, array);
+ graph.claimValue(data, v2, Bindings.VARIANT);
+
+ if (constantSampleSize != null) {
+ long itemSampleSize = ((long) count) * ((long) constantSampleSize);
+ //System.out.println("item sample size: " + itemSampleSize);
+ totalSampleSize += itemSampleSize;
+ graph.claimLiteral(historyItemResource, HIS.History_Item_size, L0.Long, itemSampleSize, Bindings.LONG);
+ }
+ } catch (AccessorException e) {
+ throw new DatabaseException(e);
+ } finally {
+ try {
+ sa.close();
+ } catch (AccessorException e) {
+ }
+ }
+ }
+
+ graph.claimLiteral(historyResource, HIS.History_size, L0.Long, totalSampleSize, Bindings.LONG);
+
+ if (PROFILE)
+ profile(s, "exported " + items.length + " history items to database");
+
+ } catch (HistoryException e) {
+ throw new DatabaseException( e );
+ } catch (BindingException e) {
+ throw new DatabaseException( e );
+ } catch (ServiceNotFoundException e) {
+ throw new DatabaseException( e );
+ }
+ }
+ };
+ }
+
+ /**
+ * Create request that exports history to graph as a single compressed archive.
+ *
+ * Any previously exported history items and archive contents are replaced.
+ *
+ * @param history
+ * source history
+ * @param collectorState
+ * complete dump of the source history collector state or
+ * <code>null</code> to skip collector state saving
+ * @param historyResource
+ * Instance of HIS.History to write data to
+ * @param timeShift
+ * adjust time values by this value
+ * @param from
+ * start time
+ * @param end
+ * end time
+ * @return WriteRequest
+ */
+ public static WriteRequest exportArchive(
+ final HistoryManager history,
+ final Bean collectorState,
+ final Resource historyResource,
+ final Double timeShift,
+ final double from,
+ final double end)
+ {
+ return new WriteRequest() {
+ public void perform(WriteGraph graph) throws DatabaseException {
+ HistoryResource HIS = HistoryResource.getInstance(graph);
+ Layer0 L0 = Layer0.getInstance(graph);
+
+ long s = System.nanoTime();
+ if (PROFILE)
+ profile(null, "archiving history items to database");
+
+ // Remove all possibly existing old items in the history.
+ for (Resource entity : graph.getObjects(historyResource, L0.ConsistsOf))
+ if (graph.isInstanceOf(entity, HIS.History_Item))
+ graph.deny(historyResource, L0.ConsistsOf, L0.PartOf, entity);
+
+ // Create new literal for history archival
+ //graph.deny(historyResource, HIS.History_archive);
+ graph.denyValue(historyResource, HIS.History_archive);
+ Resource archiveLiteral = graph.newResource();
+ graph.claim(archiveLiteral, L0.InstanceOf, null, L0.ByteArray);
+ graph.claim(historyResource, HIS.History_archive, archiveLiteral);
+
+ try {
+ RandomAccessBinary rab = graph.getRandomAccessBinary(archiveLiteral);
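+ // The archive literal is an L0.ByteArray, so the first 4 bytes hold the
+ // byte array length; reserve them here and patch the final length in below
+ // once the whole archive has been written.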
+ rab.position(0);
+ rab.skipBytes(4);
+
+ ArchivedHistory archive = exportCompressedArchive(history, collectorState, timeShift, from, end, rab);
+
+ rab.position(0L);
+ rab.writeInt((int)(rab.length() - 4L));
+
+ graph.claimLiteral(historyResource, HIS.History_size, L0.Long, archive.totalSampleSize, Bindings.LONG);
+
+ if (PROFILE)
+ profile(s, "archived history items of size " + archive.totalSampleSize + " to database");
+
+ } catch (HistoryException e) {
+ throw new DatabaseException( e );
+ } catch (IOException e) {
+ throw new DatabaseException( e );
+ }
+ }
+ };
+ }
+
+ public static ArchivedHistory exportCompressedArchive(
+ HistoryManager history,
+ Bean collectorState,
+ Double timeShift,
+ double from,
+ double end,
+ RandomAccessBinary rab)
+ throws IOException, HistoryException
+ {
+ return exportCompressedArchive(history, collectorState, timeShift, from, end, USED_COMPRESSION, rab);
+ }
+
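+ /**
+ * Write the contents of <code>history</code> into <code>rab</code> as a tar
+ * archive compressed with the requested {@link Compression}, prefixed with a
+ * matching {@link DataContainer} header.
+ */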
+ public static ArchivedHistory exportCompressedArchive(
+ HistoryManager history,
+ Bean collectorState,
+ Double timeShift,
+ double from,
+ double end,
+ Compression compression,
+ RandomAccessBinary rab)
+ throws IOException, HistoryException
+ {
+ switch (compression) {
+ case FLZ:
+ DataContainers.writeHeader(rab, new DataContainer(APPLICATION_X_FLZ, ARCHIVE_VERSION_1, null));
+ return exportArchive(history, collectorState, timeShift, from, end, FastLZ.write(new RABOutputStream(rab)));
+ case LZ4:
+ DataContainers.writeHeader(rab, new DataContainer(APPLICATION_X_LZ4, ARCHIVE_VERSION_1, null));
+ return exportArchive(history, collectorState, timeShift, from, end, new LZ4BlockOutputStream(new RABOutputStream(rab)));
+ case NONE:
+ DataContainers.writeHeader(rab, new DataContainer(APPLICATION_X_TAR, ARCHIVE_VERSION_1, null));
+ return exportArchive(history, collectorState, timeShift, from, end, new RABOutputStream(rab));
+ default:
+ throw new UnsupportedOperationException("Unsupported compression: " + compression);
+ }
+ }
+
+ private static ArchivedHistory exportArchive(
+ HistoryManager history,
+ Bean collectorState,
+ Double timeShift,
+ double from,
+ double end,
+ OutputStream out)
+ throws IOException, HistoryException
+ {
+ try (OutputStream os = out) {
+ ArchivedHistory archive = exportArchive( os, history, collectorState, timeShift, from, end );
+ os.flush();
+ return archive;
+ }
+ }
+
+ /**
+ * Store the specified history manager's contents into a single archive
+ * file.
+ *
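+ * The archive is a tar stream containing an optional <code>state</code> entry
+ * (the serialized collector state bean) followed by an <code>&lt;id&gt;.txt</code>
+ * metadata entry and an <code>&lt;id&gt;.data</code> sample entry for each
+ * history item.
+ *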
+ * @param outputStream
+ * the output stream where the history archive is written
+ * @param history
+ * source history
+ * @param collectorState
+ * complete state of the history collector at the time of saving
+ * or <code>null</code> to not save any state
+ * @param timeShift
+ * adjust time values by this value
+ * @param from
+ * start time
+ * @param end
+ * end time
+ * @return ArchivedHistory instance describing the created archive
+ * @throws HistoryException
+ * when there's a problem with reading the history data from the
+ * history work area
+ * @throws IOException
+ * when there's a problem writing the resulting history archive
+ * file
+ */
+ private static ArchivedHistory exportArchive(
+ OutputStream outputStream,
+ HistoryManager history,
+ Bean collectorState,
+ Double timeShift,
+ double from,
+ double end)
+ throws IOException, HistoryException
+ {
+ ArchivedHistory result = new ArchivedHistory();
+
+ Serializer beanSerializer = Bindings.getSerializerUnchecked(Bindings.BEAN);
+ boolean hasTimeShift = timeShift != null && timeShift != 0.0;
+
+ TarArchiveOutputStream out = new TarArchiveOutputStream(outputStream);
+ OutputStreamWriteable dataOutput = new OutputStreamWriteable(out);
+
+ try {
+ if (collectorState != null) {
+ // Write complete collector state into the archive
+ if (DEBUG)
+ System.out.println("WRITING collector state: " + collectorState);
+ byte[] serializedCollectorState = beanSerializer.serialize( collectorState );
+ putNextEntry(out, "state", serializedCollectorState.length);
+ out.write(serializedCollectorState);
+ out.closeArchiveEntry();
+ }
+
+ ItemManager im = new ItemManager( history.getItems() );
+
+ for (Bean item : im.values()) {
+ if (DEBUG)
+ System.out.println("STORING ITEM: " + item);
+ String id = (String) item.getField("id");
+
+ // Write data stream metadata
+ byte[] metadata = beanSerializer.serialize( item );
+ putNextEntry(out, id + ".txt", metadata.length);
+ out.write(metadata);
+ out.closeArchiveEntry();
+ if (DEBUG)
+ System.out.println("SERIALIZED ITEM " + id + ": " + Arrays.toString(metadata));
+
+
+ // Write data stream as a separate entry
+ StreamAccessor sa = history.openStream(id, "r");
+ try {
+ if (DEBUG)
+ System.out.println("STREAM SIZE=" + sa.size());
+ RecordBinding sampleBinding = (RecordBinding) Bindings.getBeanBinding( sa.type().componentType );
+ Serializer sampleSerializer = Bindings.getSerializerUnchecked(sampleBinding);
+ Integer constantSampleSize = sampleSerializer.getConstantSize();
+
+ if (constantSampleSize == null) {
+ // Variable length samples - no support for now.
+ System.err.println("Skipping item " + item + " in history archival due to variable length samples not being supported at the moment.");
+ continue;
+ } else {
+ ValueBand vb = new ValueBand(sampleBinding);
+ Stream stream = new Stream(sa, sampleBinding);
+ if (DEBUG)
+ System.out.println("WRITING stream: sample size=" + constantSampleSize);
+
+ readArray: {
+ // Start index
+ int startIndex = stream.binarySearch(Bindings.DOUBLE, from);
+ if ( startIndex < -stream.count() ) break readArray;
+ if ( startIndex<0 ) startIndex = -2-startIndex;
+ if ( startIndex == -1 ) startIndex = 0;
+
+ // End index
+ int endIndex = stream.binarySearch(Bindings.DOUBLE, end);
+ if ( endIndex == -1 ) break readArray;
+ if ( endIndex<0 ) endIndex = -1-endIndex;
+ if ( endIndex == sa.size() ) endIndex = sa.size()-1;
+ if ( endIndex<startIndex ) break readArray;
+
+ // Written sample count
+ int count = endIndex - startIndex + 1;
+ if (DEBUG)
+ System.out.println("WRITTEN sample count=" + count);
+
+ long savedSamplesSize = ((long) count) * ((long) constantSampleSize);
+ if (DEBUG)
+ System.out.println("saved samples size in bytes: " + savedSamplesSize);
+ putNextEntry(out, id + ".data", savedSamplesSize);
+
+ for (int i=0; i<count; i++) {
+ Object sample = sa.get(i+startIndex, sampleBinding);
+
+ // Adjust time
+ if ( hasTimeShift ) {
+ vb.setSample(sample);
+ if ( vb.hasTime() ) {
+ double newTime = vb.getTimeDouble() + timeShift;
+ vb.setTime(Bindings.DOUBLE, newTime);
+ }
+ if ( vb.hasEndTime() ) {
+ double newTime = vb.getEndTimeDouble() + timeShift;
+ vb.setEndTime(Bindings.DOUBLE, newTime);
+ }
+ }
+ //System.out.println("\t#" + i + ": sample=" + sample);
+ sampleSerializer.serialize(dataOutput, sample);
+ }
+ out.closeArchiveEntry();
+
+ result.itemSizes.put(item, savedSamplesSize);
+ result.totalSampleSize += savedSamplesSize;
+ }
+ }
+ } catch (AccessorException e) {
+ throw new IOException(e);
+ } finally {
+ try {
+ sa.close();
+ } catch (AccessorException e) {
+ }
+ }
+
+ }
+
+ return result;
+ } catch (BindingException e) {
+ throw new HistoryException(e);
+ } finally {
+ out.finish();
+ }
+ }
+
+ /**
+ * Import all history items from the graph into a history manager.
+ *
+ * @param r a HIS.History resource, or an initial condition resource referring to the source HIS.History resource
+ * @param history destination history
+ * @return read request that always returns a HistoryImportResult instance
+ */
+ public static Read<HistoryImportResult> importHistory(final Resource r, final HistoryManager history)
+ {
+ return new UniqueRead<HistoryImportResult>() {
+ @Override
+ public HistoryImportResult perform(ReadGraph graph) throws DatabaseException {
+ HistoryImportResult result = new HistoryImportResult();
+
+ HistoryResource HIS = HistoryResource.getInstance(graph);
+ SimulationResource SIM = SimulationResource.getInstance(graph);
+ Resource historyResource = r;
+
+ if (!graph.isInstanceOf(historyResource, HIS.History))
+ historyResource = graph.getPossibleObject(r, SIM.State_History);
+ if (historyResource == null)
+ return result;
+ if (!graph.isInstanceOf(historyResource, HIS.History))
+ return result;
+
+ long s = System.nanoTime();
+
+ Resource archive = graph.getPossibleObject(historyResource, HIS.History_archive);
+ if (archive == null) {
+ if (PROFILE)
+ profile(null, "importing history items from old database format to disk workarea");
+ importHistoryItems(graph, historyResource, history);
+ } else {
+ try {
+ if (PROFILE)
+ profile(null, "importing history items from archived format to disk workarea");
+ importHistoryArchive(graph, archive, history, result);
+ } catch (IOException e) {
+ throw new DatabaseException(e);
+ } catch (HistoryException e) {
+ throw new DatabaseException(e);
+ }
+ }
+
+ if (PROFILE)
+ profile(s, "imported history items from database to disk workarea");
+
+ return result;
+ }
+ };
+ }
+
+ /**
+ * Import all history items from the graph into a history manager.
+ *
+ * @param graph database access
+ * @param historyResource source HIS.History resource
+ * @param history destination history
+ * @return <code>true</code> if successful
+ */
+ private static boolean importHistoryItems(ReadGraph graph, final Resource historyResource, final HistoryManager history) throws DatabaseException
+ {
+ HistoryResource HIS = HistoryResource.getInstance(graph);
+ Layer0 L0 = Layer0.getInstance(graph);
+
+ try {
+ for (Resource oldItem : graph.getObjects(historyResource, L0.ConsistsOf)) {
+ if (!graph.isInstanceOf(oldItem, HIS.History_Item))
+ continue;
+
+ Bean bean = graph.getRelatedValue(oldItem, HIS.History_Item_Info, Bindings.BEAN);
+ String id = (String) bean.getFieldUnchecked("id");
+ Object array = graph.getRelatedValue(oldItem, HIS.History_Item_Series, Bindings.BEAN);
+ Binding arrayBinding = Bindings.getBinding( array.getClass() );
+ history.modify(bean);
+ StreamAccessor sa = history.openStream(id, "rw");
+ try {
+ sa.setValue(arrayBinding, array);
+ } finally {
+ try {
+ sa.close();
+ } catch (AccessorException e) {
+ }
+ }
+ }
+
+ return true;
+ } catch (AccessorException e) {
+ throw new DatabaseException(e);
+ } catch (BindingConstructionException e) {
+ throw new DatabaseException(e);
+ } catch (HistoryException e) {
+ throw new DatabaseException(e);
+ }
+ }
+
+ /**
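+ * Convenience entry point that imports a history archive directly from a file
+ * on disk into the given history manager. For example (hypothetical path):
+ * <pre>
+ * HistoryImportResult result = HistoryUtil.importHistoryArchive(history, Paths.get("simulation.history"));
+ * </pre>
+ *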
+ * @param history destination history
+ * @param path path of the history archive file to read
+ * @return import result, including the restored collector state if one was archived
+ * @throws HistoryException when the archive contents cannot be read into the history work area
+ * @throws IOException when reading or decompressing the archive file fails
+ */
+ public static HistoryImportResult importHistoryArchive(HistoryManager history, Path path) throws IOException, HistoryException {
+ HistoryImportResult result = new HistoryImportResult();
+ try (RandomAccessBinary rab = new BinaryFile(path.toFile())) {
+ importHistoryArchive(history, rab, result);
+ return result;
+ } catch (IOException e) {
+ LOGGER.error("Failed to import history from archive {}", path);
+ throw e;
+ }
+ }
+
+ /**
+ * Import all history items from a database history archive into a history manager.
+ *
+ * @param graph database access
+ * @param archive the HIS.History_archive literal to read the archive from
+ * @param history destination history
+ * @param result result structure to fill in, may be <code>null</code>
+ * @return <code>true</code> if successful
+ * @throws DatabaseException
+ */
+ private static boolean importHistoryArchive(ReadGraph graph, Resource archive, HistoryManager history, HistoryImportResult result) throws DatabaseException, IOException, HistoryException {
+ RandomAccessBinary rab = graph.getRandomAccessBinary(archive);
+ // This is here because the data is encoded as an L0.ByteArray
+ // in the database which means the first integer describes the
+ // length of the byte array. We skip that length data here.
+ rab.position(4);
+ return importHistoryArchive(history, rab, result);
+ }
+
+ /**
+ * Import all history items from a history archive into a history manager.
+ *
+ * @param history destination history
+ * @param rab archive contents, positioned at the DataContainer header
+ * @param result result structure to fill in, may be <code>null</code>
+ * @return <code>true</code> if successful
+ * @throws IOException
+ * @throws HistoryException
+ */
+ private static boolean importHistoryArchive(HistoryManager history, RandomAccessBinary rab, HistoryImportResult result)
+ throws IOException, HistoryException
+ {
+ DataContainer dc = DataContainers.readHeader(rab);
+
+ if (DEBUG)
+ System.out.println("COMPRESSED HISTORY DATA SIZE: " + (rab.length() - rab.position()));
+
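+ // Version 1 archives are TAR streams, stored either uncompressed or compressed with the LZ4 block format or FastLZ.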
+ if (dc.version == ARCHIVE_VERSION_1) {
+ if (APPLICATION_X_LZ4.equals(dc.format)) {
+ return extractHistoryArchiveInputStream(history, new LZ4BlockInputStream(new RABInputStream(rab)), result);
+ } else if (APPLICATION_X_FLZ.equals(dc.format)) {
+ return extractHistoryArchiveInputStream(history, FastLZ.read(new RABInputStream(rab)), result);
+ } else if (APPLICATION_X_TAR.equals(dc.format)) {
+ return extractHistoryArchiveInputStream(history, new RABInputStream(rab), result);
+ }
+ throw new UnsupportedOperationException("Unsupported format " + dc.format + " and version " + dc.version);
+ } else {
+ throw new UnsupportedOperationException("Unsupported history archive version " + dc.version + " with format " + dc.format);
+ }
+ }
+
+ private static boolean extractHistoryArchiveInputStream(HistoryManager history, InputStream in, HistoryImportResult result) throws IOException, HistoryException {
+ try (InputStream _in = in) {
+ return extractHistoryArchiveTar(history, in, result);
+ }
+ }
+
+ /**
+ * Extract all history items from a TAR archive stream into a history manager.
+ *
+ * @param history destination history
+ * @param in the TAR archive stream to extract from
+ * @param result import result to fill, may be <code>null</code>
+ * @return <code>true</code> if successful
+ * @throws IOException
+ * @throws HistoryException
+ */
+ private static boolean extractHistoryArchiveTar(HistoryManager history, InputStream in, HistoryImportResult result)
+ throws IOException, HistoryException
+ {
+ // The tar stream is intentionally not closed here, because closing it would also close the underlying RandomAccessBinary.
+ TarArchiveInputStream tar = new TarArchiveInputStream(in);
+
+ Serializer beanSerializer = Bindings.getSerializerUnchecked(Bindings.BEAN);
+
+ Bean lastItem = null;
+ String lastItemId = null;
+ while (true) {
+ TarArchiveEntry entry = tar.getNextTarEntry();
+ if (entry == null)
+ break;
+
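+ // Expected archive layout: a "state" entry holds the serialized collector state,
+ // "<id>.txt" entries hold history item metadata beans, and each metadata entry
+ // is followed by an "<id>.data" entry containing that item's sample data.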
+ String name = entry.getName();
+ boolean state = name.equals("state");
+ boolean metadata = name.endsWith(".txt");
+ boolean data = name.endsWith(".data");
+
+ if (state) {
+ if (result != null) {
+ byte[] st = new byte[(int) entry.getSize()];
+ tar.read(st);
+ result.collectorState = (Bean) beanSerializer.deserialize(st);
+ if (DEBUG)
+ System.out.println("READ collector state: " + result.collectorState);
+ } else {
+ tar.skip(entry.getSize());
+ }
+ } else if (metadata) {
+ byte[] md = new byte[(int) entry.getSize()];
+ tar.read(md);
+ if (DEBUG) {
+ System.out.println("READING Item metadata: " + name);
+ System.out.println("SERIALIZED ITEM " + name.replace(".txt", "") + ": " + Arrays.toString(md));
+ }
+
+ try {
+ Bean bean = (Bean) beanSerializer.deserialize(md);
+ if (DEBUG)
+ System.out.println("READ Item metadata: " + bean);
+ history.modify(bean);
+ lastItem = bean;
+ lastItemId = (String) bean.getFieldUnchecked("id");
+ } catch (ClassFormatError e) {
+ // This is here because LZ4BlockInput/OutputStream seemed to
+ // be causing weird deserialization errors to occur for
+ // single data items when trying to deserialize beans. The
+ // reason being that the serialized data that was read only
+ // contains the first 512 bytes of the data. After that all
+ // data is zeros, which causes things like this to happen in
+ // deserialization:
+ //signature: rsstzzzzzzzze_b7b532e9
+ //component(0): id = String
+ //component(1): variableId = String
+ //component(2): format = Temp1
+ //annotation: @org.simantics.databoard.annotations.Union(value=[class org.simantics.databoard.type.BooleanType, class org.simantics.databoard.type.ByteType, class org.simantics.databoard.type.IntegerType, class org.simantics.databoard.type.LongType, class org.simantics.databoard.type.FloatType, class org.simantics.databoard.type.DoubleType, class org.simantics.databoard.type.StringType, class org.simantics.databoard.type.RecordType, class org.simantics.databoard.type.ArrayType, class org.simantics.databoard.type.MapType, class org.simantics.databoard.type.OptionalType, class org.simantics.databoard.type.UnionType, class org.simantics.databoard.type.VariantType])
+ //component(3): = Boolean
+ //component(4): = Boolean
+ //component(5): = Boolean
+ //component(6): = Boolean
+ //component(7): = Boolean
+ //component(8): = Boolean
+ //component(9): = Boolean
+ //component(10): = Boolean
+ //
+ // For this reason we've switched the default compression to FastLZ
+ // for now. This is reflected in the input format also.
+
+ Activator.getDefault().getLog().log(
+ new Status(IStatus.ERROR, Activator.PLUGIN_ID, "History item " + name + " metadata deserialization failed.", e));
+ }
+ } else if (data && lastItem != null) {
+ StreamAccessor sa = history.openStream(lastItemId, "rw");
+ try {
+ if (sa instanceof BinaryObject) {
+ BinaryObject bo = (BinaryObject) sa;
+ RandomAccessBinary output = bo.getBinary();
+ output.position(0L);
+ output.setLength(0L);
+ long copiedBytes = FileUtils.copy(tar, new RABOutputStream(output), entry.getSize());
+ if (copiedBytes != entry.getSize()) {
+ System.err.println("WARNING: item " + lastItemId + ", extracted " + copiedBytes + " bytes while the TAR entry declared a size of " + entry.getSize() + " bytes");
+ }
+ } else {
+ System.err.println("Skipping item " + lastItemId + " in history import due to StreamAccessor not being an instanceof BinaryObject. StreamAccessor=" + sa);
+ }
+ } finally {
+ try {
+ sa.close();
+ lastItem = null;
+ lastItemId = null;
+ } catch (AccessorException e) {
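+ // Failures when closing the stream accessor are ignored; the extracted data has already been copied.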
+ }
+ }
+ }
+ }
+
+ return true;
+ }
+
+ /**
+ * Get a write request that clears all persisted history data of the specified
+ * history resource, covering both the old separate-items format and the
+ * archived format.
+ *
+ * @param history the HIS.History resource to clear
+ * @return the clearing request
+ */
+ public static WriteRequest clear(final Resource history) {
+ return new WriteRequest() {
+ @Override
+ public void perform(WriteGraph graph) throws DatabaseException {
+ HistoryResource HIS = HistoryResource.getInstance(graph);
+ Layer0 L0 = Layer0.getInstance(graph);
+
+ // Separate items format
+ for (Resource oldItem : graph.getObjects(history, L0.ConsistsOf))
+ {
+ if (!graph.isInstanceOf(oldItem, HIS.History_Item)) continue;
+ Resource info = graph.getPossibleObject(oldItem, HIS.History_Item_Info);
+ if (info!=null) {
+ graph.deny(oldItem, HIS.History_Item_Info);
+ graph.deny(info);
+ }
+ Resource data = graph.getPossibleObject(oldItem, HIS.History_Item_Series);
+ if (data!=null) {
+ graph.deny(oldItem, HIS.History_Item_Series);
+ graph.deny(data);
+ }
+ graph.deny(history, L0.ConsistsOf, oldItem);
+ graph.deny(oldItem);
+ }
+
+ // Archived format
+ graph.denyValue(history, HIS.History_archive);
+ }
+ };
+ }
+
+
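+ /**
+ * Begin a new TAR archive entry with the specified name and size in the given output stream.
+ */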
+ private static void putNextEntry(TarArchiveOutputStream out, String entryName, long size) throws IOException {
+ TarArchiveEntry entry = new TarArchiveEntry(entryName);
+ entry.setSize(size);
+ out.putArchiveEntry(entry);
+ }
+
+ /**
+ * Shall not close the underlying RandomAccessBinary.
+ */
+ private static class RABOutputStream extends OutputStream {
+
+ private final RandomAccessBinary output;
+
+ public RABOutputStream(RandomAccessBinary output) {
+ this.output = output;
+ }
+
+ @Override
+ public void write(byte[] b, int off, int len) throws IOException {
+ output.write(b, off, len);
+ }
+
+ @Override
+ public void write(int b) throws IOException {
+ output.write(b);
+ }
+
+ }
+
+ /**
+ * Shall not close the underlying RandomAccessBinary.
+ */
+ private static class RABInputStream extends InputStream {
+
+ private final RandomAccessBinary input;
+
+ public RABInputStream(RandomAccessBinary input) {
+ this.input = input;
+ }
+
+ @Override
+ public int available() throws IOException {
+ long avail = input.length() - input.position();
+ if ((avail & 0xffffffff00000000L) != 0)
+ throw new IOException("Number of available bytes truncated in long->int conversion: " + avail);
+ return (int) avail;
+ }
+
+ @Override
+ public int read() throws IOException {
+ try {
+ return input.readUnsignedByte();
+ } catch (EOFException e) {
+ return -1;
+ }
+ }
+
+ @Override
+ public int read(byte[] b, int off, int len) throws IOException {
+ long available = input.length() - input.position();
+ if (available == 0)
+ return -1;
+ int l = (int) Math.min(available, (long) len);
+ input.readFully(b, off, l);
+ return l;
+ }
+
+ @Override
+ public long skip(long n) throws IOException {
+ long count = 0;
+ while (count < n) {
+ int l = (int) Math.min(n - count, (long) Integer.MAX_VALUE);
+ count += input.skipBytes(l);
+ }
+ return count;
+ }
+
+ }
+
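+ /**
+ * Print a profiling message to stdout when PROFILE is enabled.
+ *
+ * @param t1 start time in nanoseconds for reporting a duration, or <code>null</code> to print the message only
+ * @param string the message to print
+ * @return the current {@link System#nanoTime()} if PROFILE is enabled, <code>0L</code> otherwise
+ */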
+ private static long profile(Long t1, String string) {
+ if (PROFILE) {
+ long t2 = System.nanoTime();
+ long delta = t1 == null ? 0 : t2-t1;
+ System.out.println("[" + HistoryUtil.class.getSimpleName() + "] PROFILE: " + string + (t1 == null ? "" : " in " + (((double)delta)*1e-6) + " ms"));
+ return t2;
+ }
+ return 0L;
+ }
+
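+ /**
+ * Truncate every item in the specified history so that samples with time
+ * stamps greater than <code>toBeforeTime</code> are removed, and update the
+ * given collector state to reflect the new end of the data.
+ *
+ * <p>Illustrative usage, assuming the caller already has a history workarea
+ * and a collector state at hand:
+ * <pre>
+ * CollectorState truncated = HistoryUtil.truncateHistory(5.0, history, collectorState);
+ * </pre>
+ *
+ * @param toBeforeTime the time after which samples are discarded
+ * @param history the history whose items are truncated
+ * @param state the collector state to update, may be <code>null</code>
+ * @return the updated collector state
+ */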
+ public static CollectorState truncateHistory(double toBeforeTime, HistoryManager history, CollectorState state) throws AccessorException, BindingException, HistoryException {
+ Double t = toBeforeTime;
+ Binding timeBinding = null;
+
+ Bean[] items = history.getItems();
+ //System.out.println("truncating all samples after t=" + toBeforeTime + " for " + items.length + " history items");
+
+ for (Bean item : items) {
+ String id = (String) item.getField("id");
+ StreamAccessor sa = history.openStream(id, "rw");
+ try {
+ Stream s = new Stream(sa);
+ timeBinding = s.timeBinding;
+ int currentSize = sa.size();
+ int index = s.binarySearch(timeBinding, toBeforeTime);
+ int newSize = truncationSize(index);
+ if (newSize < currentSize) {
+ //System.out.println("truncating item: " + item + " from size " + currentSize + " to " + newSize);
+ sa.setSize(newSize);
+
+ if (state != null) {
+ Object prevTime = newSize > 0 ? s.getItemTime(s.timeBinding, newSize - 1) : null;
+ Bean prevSample = newSize > 0 ? (Bean) sa.get(newSize - 1, s.sampleBinding) : null;
+ Object prevValue = prevSample != null ? prevSample.getField(s.valueIndex) : null;
+ boolean isNan = isNaN(prevValue);
+
+ VariableState vs = state.values.get(id);
+ if (vs != null && vs.value != null && prevValue != null) {
+ vs.value.setValue(vs.value.getBinding(), prevValue);
+ vs.isValid = true;
+ vs.isNan = isNan;
+ }
+
+ CollectorState.Item is = state.itemStates.get(id);
+ if (is != null) {
+ is.firstTime = toTime(prevTime);
+ is.firstValue = toValue(s.valueBinding, prevValue);
+ is.currentTime = is.firstTime;
+ is.currentValue = toValue(s.valueBinding, prevValue);
+ is.isNaN = isNan;
+ is.isValid = prevValue != null;
+ is.sum = Double.NaN;
+ is.count = 0;
+ is.ooDeadband = false;
+ is.firstDisabledTime = Double.NaN;
+ is.lastDisabledTime = Double.NaN;
+ is.median = new WeightedMedian( CollectorImpl.MEDIAN_LIMIT );
+ }
+ }
+ }
+ } finally {
+ sa.close();
+ }
+ }
+
+ if (timeBinding != null && state != null) {
+ state.time.setValue(timeBinding, t);
+ state.dT = 1.0;
+ }
+
+ return state;
+ }
+
+ private static double toTime(Object time) {
+ return time instanceof Number ? ((Number) time).doubleValue() : Double.NaN;
+ }
+
+ private static MutableVariant toValue(Binding valueBinding, Object value) {
+ return new MutableVariant(valueBinding, value != null ? value : valueBinding.createDefaultUnchecked());
+ }
+
+ private static boolean isNaN(Object value) {
+ return value instanceof Number ? Double.isNaN(((Number) value).doubleValue()) : false;
+ }
+
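+ /**
+ * Convert a binary search result into the number of samples to keep:
+ * on an exact match the matched sample is kept, otherwise everything
+ * before the insertion point is kept.
+ */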
+ private static int truncationSize(int binarySearchResult) {
+ return binarySearchResult >= 0 ? binarySearchResult + 1 : (-binarySearchResult - 1);
+ }
+
+}