--- /dev/null
+/*******************************************************************************\r
+ * Copyright (c) 2007, 2014 Association for Decentralized Information Management in\r
+ * Industry THTH ry.\r
+ * All rights reserved. This program and the accompanying materials\r
+ * are made available under the terms of the Eclipse Public License v1.0\r
+ * which accompanies this distribution, and is available at\r
+ * http://www.eclipse.org/legal/epl-v10.html\r
+ *\r
+ * Contributors:\r
+ * VTT Technical Research Centre of Finland - initial API and implementation\r
+ * Semantum Oy - archive implementation\r
+ *******************************************************************************/\r
+package org.simantics.simulation.history;\r
+\r
+import gnu.trove.map.TObjectLongMap;\r
+import gnu.trove.map.hash.TObjectLongHashMap;\r
+\r
+import java.io.EOFException;\r
+import java.io.IOException;\r
+import java.io.InputStream;\r
+import java.io.OutputStream;\r
+import java.util.Arrays;\r
+import java.util.HashMap;\r
+import java.util.Map;\r
+\r
+import net.jpountz.lz4.LZ4BlockInputStream;\r
+import net.jpountz.lz4.LZ4BlockOutputStream;\r
+\r
+import org.apache.commons.compress.archivers.tar.TarArchiveEntry;\r
+import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;\r
+import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;\r
+import org.eclipse.core.runtime.IStatus;\r
+import org.eclipse.core.runtime.Status;\r
+import org.simantics.databoard.Bindings;\r
+import org.simantics.databoard.accessor.StreamAccessor;\r
+import org.simantics.databoard.accessor.binary.BinaryObject;\r
+import org.simantics.databoard.accessor.error.AccessorException;\r
+import org.simantics.databoard.binding.ArrayBinding;\r
+import org.simantics.databoard.binding.Binding;\r
+import org.simantics.databoard.binding.RecordBinding;\r
+import org.simantics.databoard.binding.error.BindingConstructionException;\r
+import org.simantics.databoard.binding.error.BindingException;\r
+import org.simantics.databoard.binding.impl.ObjectArrayBinding;\r
+import org.simantics.databoard.binding.mutable.Variant;\r
+import org.simantics.databoard.container.DataContainer;\r
+import org.simantics.databoard.container.DataContainers;\r
+import org.simantics.databoard.serialization.Serializer;\r
+import org.simantics.databoard.util.Bean;\r
+import org.simantics.databoard.util.binary.OutputStreamWriteable;\r
+import org.simantics.databoard.util.binary.RandomAccessBinary;\r
+import org.simantics.db.ReadGraph;\r
+import org.simantics.db.Resource;\r
+import org.simantics.db.WriteGraph;\r
+import org.simantics.db.common.request.UniqueRead;\r
+import org.simantics.db.common.request.WriteRequest;\r
+import org.simantics.db.exception.DatabaseException;\r
+import org.simantics.db.exception.ServiceNotFoundException;\r
+import org.simantics.db.request.Read;\r
+import org.simantics.fastlz.FastLZ;\r
+import org.simantics.history.HistoryException;\r
+import org.simantics.history.HistoryManager;\r
+import org.simantics.history.ItemManager;\r
+import org.simantics.history.util.Stream;\r
+import org.simantics.history.util.ValueBand;\r
+import org.simantics.layer0.Layer0;\r
+import org.simantics.simulation.Activator;\r
+import org.simantics.simulation.ontology.HistoryResource;\r
+import org.simantics.simulation.ontology.SimulationResource;\r
+import org.simantics.utils.FileUtils;\r
+\r
+/**\r
+ * @author Toni Kalajainen\r
+ * @author Tuukka Lehtonen\r
+ */\r
+public class HistoryUtil {\r
+\r
+ // Diagnostic switches: DEBUG enables verbose stdout tracing, PROFILE enables timing prints.
+ private static final boolean DEBUG = false;
+ private static final boolean PROFILE = true;
+
+ // Block-compression codecs supported for the single-literal archive format.
+ private enum Compression {
+ FLZ,
+ LZ4,
+ }
+
+ // FLZ is the active codec; see the LZ4 truncation note in extractHistoryArchiveTar.
+ private static final Compression USED_COMPRESSION = Compression.FLZ;
+
+ // MIME-style format identifiers written into the DataContainer header of an archive.
+ private static final String APPLICATION_X_LZ4 = "application/x-lz4";
+ private static final String APPLICATION_X_FLZ = "application/x-flz";
+ private static final int ARCHIVE_VERSION_1 = 1; // TarArchiveInput/OutputStream
+
+ /**
+ * Size bookkeeping for one produced archive: the total byte size of all
+ * saved samples, plus per-item sample sizes keyed by the item metadata bean.
+ */
+ private static class ArchivedHistory {
+ public long totalSampleSize = 0L;
+ public TObjectLongMap<Bean> itemSizes = new TObjectLongHashMap<Bean>();
+ }
+\r
+ /**\r
+ * Create request that exports history to graph.\r
+ * \r
+ * If removeOldUnused is true, items that do not exist in source history\r
+ * are deleted from graph.\r
+ * \r
+ * If item already exists in the graph, it is overwritten.\r
+ * \r
+ * @param history source history\r
+ * @param g write graph\r
+ * @param historyResource Instance of HIS.History to write data to\r
+ * @param removeOldUnused\r
+ * @param timeShift adjust time values by this value \r
+ * @param from time\r
+ * @param end time\r
+ * @return WriteRequest\r
+ */\r
+ public static WriteRequest export(final HistoryManager history, 
+ final Resource historyResource,
+ final boolean removeOldUnused,
+ final Double timeShift, 
+ final double from, final double end) 
+ {
+ return new WriteRequest() {
+ public void perform(WriteGraph graph) throws DatabaseException {
+ HistoryResource HIS = HistoryResource.getInstance(graph);
+ Layer0 L0 = Layer0.getInstance(graph);
+ long totalSampleSize = 0L;
+ long s = System.nanoTime();
+
+ try {
+ Bean[] items = history.getItems();
+ if (PROFILE)
+ profile(null, "exporting " + items.length + " history items to database");
+
+ // Index existing HIS.History_Item children of the history by their "id"
+ // field so source items can overwrite them in place; optionally prune
+ // graph items that no longer exist in the source history.
+ ItemManager im = new ItemManager( items );
+ Map<String, Resource> oldResourceMap = new HashMap<String, Resource>();
+ for (Resource oldItem : graph.getObjects(historyResource, L0.ConsistsOf))
+ {
+ if (!graph.isInstanceOf(oldItem, HIS.History_Item)) continue;
+ Bean bean = graph.getPossibleRelatedValue(oldItem, HIS.History_Item_Info, Bindings.BEAN);
+ if ( bean == null ) continue;
+ String id = (String) bean.getField("id");
+
+ if (!im.exists(id)) {
+ // There is item in graph that does not exist in source history
+ if ( removeOldUnused ) {
+ graph.deny(oldItem);
+ }
+ } else {
+ // Item already exists, to-be-overwritten
+ oldResourceMap.put(id, oldItem);
+ }
+ }
+
+ for (Bean newItem : im.values())
+ {
+ //System.out.println("WRITING ITEM: " + newItem);
+ String id = (String) newItem.getField("id");
+ Resource historyItemResource = oldResourceMap.get(id);
+ Resource data = null;
+
+ // Create the item resource on first export, otherwise reuse it and
+ // its existing series literal.
+ if ( historyItemResource==null ) {
+ historyItemResource = graph.newResource();
+ graph.claim(historyResource, L0.ConsistsOf, historyItemResource);
+ graph.claim(historyItemResource, L0.InstanceOf, HIS.History_Item);
+ graph.claimLiteral(historyItemResource, L0.HasName, id);
+ } else {
+ data = graph.getPossibleObject(historyItemResource, HIS.History_Item_Series);
+ }
+
+ // Item metadata bean is stored as a variant literal.
+ Variant v = new Variant(newItem.getBinding(), newItem);
+ graph.claimLiteral(historyItemResource, HIS.History_Item_Info, v, Bindings.VARIANT);
+
+ if (data == null) {
+ data = graph.newResource();
+ graph.claim(data, L0.InstanceOf, L0.Variant);
+ graph.claim(historyItemResource, HIS.History_Item_Series, data);
+ } else {
+ graph.denyValue(data);
+ }
+
+
+ // Write stream
+ StreamAccessor sa = history.openStream(id, "r");
+ try {
+ //System.out.println("WRITING stream of size=" + sa.size());
+ RecordBinding sampleBinding = (RecordBinding) Bindings.getBeanBinding( sa.type().componentType );
+ Serializer serializer = Bindings.getSerializerUnchecked(sampleBinding);
+ Integer constantSampleSize = serializer.getConstantSize();
+ ValueBand vb = new ValueBand(sampleBinding);
+ Stream stream = new Stream(sa, sampleBinding);
+ ArrayBinding arrayBinding = new ObjectArrayBinding(sa.type(), sampleBinding);
+ Object array = null;
+ //System.out.println("WRITING stream: sample size=" + constantSampleSize);
+
+ int count = 0;
+ // Copy the inclusive sample range [from, end] into an in-memory array.
+ // binarySearch returns a negative insertion-point encoding when the
+ // exact time value is not present; the arithmetic below decodes it.
+ readArray: {
+ // Start index
+ int startIndex = stream.binarySearch(Bindings.DOUBLE, from); 
+ if ( startIndex < -stream.count() ) break readArray;
+ if ( startIndex<0 ) startIndex = -2-startIndex;
+ if ( startIndex == -1 ) startIndex = 0;
+
+ // End index
+ int endIndex = stream.binarySearch(Bindings.DOUBLE, end);
+ if ( endIndex == -1 ) break readArray;
+ if ( endIndex<0 ) endIndex = -1-endIndex;
+ if ( endIndex == sa.size() ) endIndex = sa.size()-1;
+ if ( endIndex<startIndex ) break readArray;
+
+ // Write sample count
+ count = endIndex - startIndex + 1;
+ array = arrayBinding.create(count);
+ boolean hasTimeShift = timeShift!=null && timeShift != 0.0;
+ //System.out.println("WRITING stream: count=" + count + ", datatype=" + arrayBinding.type());
+ for (int i=0; i<count; i++) {
+ Object sample = sa.get(i+startIndex, sampleBinding);
+
+ // Adjust time
+ if ( hasTimeShift ) {
+ vb.setSample(sample);
+ if ( vb.hasTime() ) {
+ double newTime = vb.getTimeDouble() + timeShift;
+ vb.setTime(Bindings.DOUBLE, newTime);
+ }
+ if ( vb.hasEndTime() ) {
+ double newTime = vb.getEndTimeDouble() + timeShift;
+ vb.setEndTime(Bindings.DOUBLE, newTime);
+ }
+ }
+ //System.out.println("\t#" + i + ": sample=" + sample);
+ arrayBinding.set(array, i, sample);
+ }
+ }
+ // An empty range still writes an empty array literal.
+ if (array==null) array = arrayBinding.create();
+ Variant v2 = new Variant(arrayBinding, array);
+ graph.claimValue(data, v2, Bindings.VARIANT);
+
+ // Size accounting is only possible for fixed-size sample records.
+ if (constantSampleSize != null) {
+ long itemSampleSize = ((long) count) * ((long) constantSampleSize);
+ //System.out.println("item sample size: " + itemSampleSize);
+ totalSampleSize += itemSampleSize;
+ graph.claimLiteral(historyItemResource, HIS.History_Item_size, L0.Long, itemSampleSize, Bindings.LONG);
+ }
+ } catch (AccessorException e) {
+ throw new DatabaseException(e);
+ } finally {
+ try {
+ sa.close();
+ } catch (AccessorException e) {
+ // best-effort close; original intentionally ignores this
+ }
+ }
+ }
+
+ graph.claimLiteral(historyResource, HIS.History_size, L0.Long, totalSampleSize, Bindings.LONG);
+
+ if (PROFILE)
+ profile(s, "exported " + items.length + " history items to database");
+
+ } catch (HistoryException e) {
+ throw new DatabaseException( e );
+ } catch (BindingException e) {
+ throw new DatabaseException( e );
+ } catch (ServiceNotFoundException e) {
+ throw new DatabaseException( e );
+ }
+ }
+ };
+ }
+\r
+ /**\r
+ * Create request that exports history to graph.\r
+ * \r
+ * If item already exists in the graph, it is overwritten.\r
+ * \r
+ * @param history\r
+ * source history\r
+ * @param collectorState\r
+ * complete dump of the source history collector state or\r
+ * <code>null</code> to skip collector state saving\r
+ * @param historyResource\r
+ * Instance of HIS.History to write data to\r
+ * @param timeShift\r
+ * adjust time values by this value\r
+ * @param from\r
+ * time\r
+ * @param end\r
+ * time\r
+ * @return WriteRequest\r
+ */\r
+ public static WriteRequest exportArchive(
+ final HistoryManager history,
+ final Bean collectorState,
+ final Resource historyResource,
+ final Double timeShift, 
+ final double from,
+ final double end) 
+ {
+ return new WriteRequest() {
+ public void perform(WriteGraph graph) throws DatabaseException {
+ HistoryResource HIS = HistoryResource.getInstance(graph);
+ Layer0 L0 = Layer0.getInstance(graph);
+
+ long s = System.nanoTime();
+ if (PROFILE)
+ profile(null, "archiving history items to database");
+
+ // Remove all possibly existing old items in the history.
+ for (Resource entity : graph.getObjects(historyResource, L0.ConsistsOf))
+ if (graph.isInstanceOf(entity, HIS.History_Item))
+ graph.deny(historyResource, L0.ConsistsOf, L0.PartOf, entity);
+
+ // Create new literal for history archival
+ //graph.deny(historyResource, HIS.History_archive);
+ graph.denyValue(historyResource, HIS.History_archive);
+ Resource archiveLiteral = graph.newResource();
+ graph.claim(archiveLiteral, L0.InstanceOf, null, L0.ByteArray);
+ graph.claim(historyResource, HIS.History_archive, archiveLiteral);
+
+ // NOTE(review): closeable is never reassigned below, so the finally-close
+ // is currently a no-op; the compressed streams are closed inside
+ // exportCompressedArchive. Confirm this is intentional.
+ OutputStream closeable = null;
+ try {
+ RandomAccessBinary rab = graph.getRandomAccessBinary(archiveLiteral);
+ // Reserve a 4-byte length prefix; it is patched below once the
+ // total payload length is known.
+ rab.position(0);
+ rab.skipBytes(4);
+
+ ArchivedHistory archive = exportCompressedArchive(history, collectorState, historyResource, timeShift, from, end, rab);
+
+ rab.position(0L);
+ rab.writeInt((int)(rab.length() - 4L));
+
+ graph.claimLiteral(historyResource, HIS.History_size, L0.Long, archive.totalSampleSize, Bindings.LONG);
+
+ if (PROFILE)
+ profile(s, "archived history items of size " + archive.totalSampleSize + " to database");
+
+ } catch (HistoryException e) {
+ throw new DatabaseException( e );
+ } catch (IOException e) {
+ throw new DatabaseException( e );
+ } finally {
+ FileUtils.uncheckedClose(closeable);
+ }
+ }
+ };
+ }
+\r
+ /**
+ * Write the history as a compressed archive into the given binary, using
+ * the codec selected by {@link #USED_COMPRESSION}.
+ */
+ private static ArchivedHistory exportCompressedArchive(
+ final HistoryManager history,
+ final Bean collectorState,
+ final Resource historyResource,
+ final Double timeShift, 
+ final double from,
+ final double end,
+ RandomAccessBinary rab)
+ throws IOException, HistoryException
+ {
+ // Guard-style dispatch on the statically configured codec.
+ if (USED_COMPRESSION == Compression.FLZ)
+ return exportArchiveFLZ(history, collectorState, historyResource, timeShift, from, end, rab);
+ if (USED_COMPRESSION == Compression.LZ4)
+ return exportArchiveLZ4(history, collectorState, historyResource, timeShift, from, end, rab);
+ throw new UnsupportedOperationException("Unsupported compression: " + USED_COMPRESSION);
+ }
+\r
+ private static ArchivedHistory exportArchiveLZ4(
+ final HistoryManager history,
+ final Bean collectorState,
+ final Resource historyResource,
+ final Double timeShift, 
+ final double from,
+ final double end,
+ RandomAccessBinary rab)
+ throws IOException, HistoryException
+ {
+ OutputStream closeable = null;
+ try {
+ // Container header first, then the LZ4-compressed TAR payload written
+ // through the non-closing RAB adapter.
+ DataContainers.writeHeader(rab, new DataContainer(APPLICATION_X_LZ4, ARCHIVE_VERSION_1, null));
+ closeable = new RABOutputStream( rab );
+ closeable = new LZ4BlockOutputStream( closeable );
+
+ ArchivedHistory archive = exportArchive( closeable, history, collectorState, timeShift, from, end );
+
+ // Null out after a successful close so the finally block does not double-close.
+ closeable.flush();
+ closeable.close();
+ closeable = null;
+
+ return archive;
+ } finally {
+ FileUtils.uncheckedClose(closeable);
+ }
+ }
+\r
+ @SuppressWarnings("resource")\r
+ private static ArchivedHistory exportArchiveFLZ(
+ final HistoryManager history,
+ final Bean collectorState,
+ final Resource historyResource,
+ final Double timeShift, 
+ final double from,
+ final double end,
+ RandomAccessBinary rab)
+ throws IOException, HistoryException
+ {
+ OutputStream closeable = null;
+ try {
+ // Container header first, then the FastLZ-compressed TAR payload written
+ // through the non-closing RAB adapter.
+ DataContainers.writeHeader(rab, new DataContainer(APPLICATION_X_FLZ, ARCHIVE_VERSION_1, null));
+ closeable = new RABOutputStream( rab );
+ closeable = FastLZ.write( closeable );
+
+ ArchivedHistory archive = exportArchive( closeable, history, collectorState, timeShift, from, end );
+
+ // Null out after a successful close so the finally block does not double-close.
+ closeable.flush();
+ closeable.close();
+ closeable = null;
+
+ return archive;
+ } finally {
+ FileUtils.uncheckedClose(closeable);
+ }
+ }
+\r
+ /**\r
+ * Store the specified history manager's contents into a single archive\r
+ * file.\r
+ * \r
+ * @param outputStream\r
+ * the output stream where the history archive is written\r
+ * @param history\r
+ * source history\r
+ * @param collectorState\r
+ * complete state of the history collector at the time of saving\r
+ * or <code>null</code> to not save any state\r
+ * @param timeShift\r
+ * adjust time values by this value\r
+ * @param from\r
+ * time\r
+ * @param end\r
+ * time\r
+ * @return ArchivedHistory instance describing the created archive\r
+ * @throws HistoryException\r
+ * when there's a problem with reading the history data from the\r
+ * history work area\r
+ * @throws IOException\r
+ * when there's a problem writing the resulting history archive\r
+ * file\r
+ */\r
+ private static ArchivedHistory exportArchive(
+ OutputStream outputStream,
+ HistoryManager history,
+ Bean collectorState,
+ Double timeShift,
+ double from,
+ double end)
+ throws IOException, HistoryException
+ {
+ ArchivedHistory result = new ArchivedHistory();
+
+ Serializer beanSerializer = Bindings.getSerializerUnchecked(Bindings.BEAN);
+ boolean hasTimeShift = timeShift != null && timeShift != 0.0;
+
+ // Archive layout: optional "state" entry (collector state bean), then per
+ // item a "<id>.txt" metadata entry followed by a "<id>.data" sample entry.
+ TarArchiveOutputStream out = new TarArchiveOutputStream(outputStream);
+ OutputStreamWriteable dataOutput = new OutputStreamWriteable(out);
+
+ try {
+ if (collectorState != null) {
+ // Write complete collector state into the archive
+ if (DEBUG)
+ System.out.println("WRITING collector state: " + collectorState);
+ byte[] serializedCollectorState = beanSerializer.serialize( collectorState );
+ putNextEntry(out, "state", serializedCollectorState.length);
+ out.write(serializedCollectorState);
+ out.closeArchiveEntry();
+ }
+
+ ItemManager im = new ItemManager( history.getItems() );
+
+ for (Bean item : im.values()) {
+ if (DEBUG)
+ System.out.println("STORING ITEM: " + item);
+ String id = (String) item.getField("id");
+
+ // Write data stream metadata
+ byte[] metadata = beanSerializer.serialize( item );
+ putNextEntry(out, id + ".txt", metadata.length);
+ out.write(metadata);
+ out.closeArchiveEntry();
+ if (DEBUG)
+ System.out.println("SERIALIZED ITEM " + id + ": " + Arrays.toString(metadata));
+
+
+ // Write data stream as a separate entry
+ StreamAccessor sa = history.openStream(id, "r");
+ try {
+ if (DEBUG)
+ System.out.println("STREAM SIZE=" + sa.size());
+ RecordBinding sampleBinding = (RecordBinding) Bindings.getBeanBinding( sa.type().componentType );
+ Serializer sampleSerializer = Bindings.getSerializerUnchecked(sampleBinding);
+ Integer constantSampleSize = sampleSerializer.getConstantSize();
+
+ // The TAR entry size must be declared up front, which requires a
+ // constant per-sample size to compute count * size in advance.
+ if (constantSampleSize == null) {
+ // Variable length samples - no support for now.
+ System.err.println("Skipping item " + item + " in history archival due to variable length samples not being supported at the moment.");
+ continue;
+ } else {
+ ValueBand vb = new ValueBand(sampleBinding);
+ Stream stream = new Stream(sa, sampleBinding);
+ if (DEBUG)
+ System.out.println("WRITING stream: sample size=" + constantSampleSize);
+
+ // Select the inclusive sample range [from, end]; binarySearch
+ // returns a negative insertion-point encoding when the exact
+ // time value is not present, decoded by the arithmetic below.
+ readArray: {
+ // Start index
+ int startIndex = stream.binarySearch(Bindings.DOUBLE, from);
+ if ( startIndex < -stream.count() ) break readArray;
+ if ( startIndex<0 ) startIndex = -2-startIndex;
+ if ( startIndex == -1 ) startIndex = 0;
+
+ // End index
+ int endIndex = stream.binarySearch(Bindings.DOUBLE, end);
+ if ( endIndex == -1 ) break readArray;
+ if ( endIndex<0 ) endIndex = -1-endIndex;
+ if ( endIndex == sa.size() ) endIndex = sa.size()-1;
+ if ( endIndex<startIndex ) break readArray;
+
+ // Written sample count
+ int count = endIndex - startIndex + 1;
+ if (DEBUG)
+ System.out.println("WRITTEN sample count=" + count);
+
+ long savedSamplesSize = ((long) count) * ((long) constantSampleSize);
+ if (DEBUG)
+ System.out.println("saved samples size in bytes: " + savedSamplesSize);
+ putNextEntry(out, id + ".data", savedSamplesSize);
+
+ for (int i=0; i<count; i++) {
+ Object sample = sa.get(i+startIndex, sampleBinding);
+
+ // Adjust time
+ if ( hasTimeShift ) {
+ vb.setSample(sample);
+ if ( vb.hasTime() ) {
+ double newTime = vb.getTimeDouble() + timeShift;
+ vb.setTime(Bindings.DOUBLE, newTime);
+ }
+ if ( vb.hasEndTime() ) {
+ double newTime = vb.getEndTimeDouble() + timeShift;
+ vb.setEndTime(Bindings.DOUBLE, newTime);
+ }
+ }
+ //System.out.println("\t#" + i + ": sample=" + sample);
+ sampleSerializer.serialize(dataOutput, sample);
+ }
+ out.closeArchiveEntry();
+
+ result.itemSizes.put(item, savedSamplesSize);
+ result.totalSampleSize += savedSamplesSize;
+ }
+ }
+ } catch (AccessorException e) {
+ throw new IOException(e);
+ } finally {
+ try {
+ sa.close();
+ } catch (AccessorException e) {
+ // best-effort close; original intentionally ignores this
+ }
+ }
+
+ }
+
+ return result;
+ } catch (BindingException e) {
+ throw new HistoryException(e);
+ } finally {
+ // finish() writes the TAR trailer without closing the underlying stream;
+ // the caller owns and closes outputStream.
+ out.finish();
+ }
+ }
+\r
+ /**\r
+ * Import all history items from graph into a history manager.\r
+ * \r
+ * @param r history resource or initial condition resource source HIS.History resource\r
+ * @param history destination history\r
+ * @return read request that always returns a HistoryImportResult instance\r
+ */\r
+ public static Read<HistoryImportResult> importHistory(final Resource r, final HistoryManager history)
+ {
+ return new UniqueRead<HistoryImportResult>() {
+ @Override
+ public HistoryImportResult perform(ReadGraph graph) throws DatabaseException {
+ HistoryImportResult result = new HistoryImportResult();
+
+ HistoryResource HIS = HistoryResource.getInstance(graph);
+ SimulationResource SIM = SimulationResource.getInstance(graph);
+ Resource historyResource = r;
+
+ // Accept either a HIS.History directly or a state resource that
+ // references one through SIM.State_History; bail out quietly otherwise.
+ if (!graph.isInstanceOf(historyResource, HIS.History))
+ historyResource = graph.getPossibleObject(r, SIM.State_History);
+ if (historyResource == null)
+ return result;
+ if (!graph.isInstanceOf(historyResource, HIS.History))
+ return result;
+
+ long s = System.nanoTime();
+
+ // Prefer the archived single-literal format; fall back to the legacy
+ // per-item format when no archive literal is present.
+ Resource archive = graph.getPossibleObject(historyResource, HIS.History_archive);
+ if (archive == null) {
+ if (PROFILE)
+ profile(null, "importing history items from old database format to disk workarea");
+ importHistoryItems(graph, historyResource, history);
+ } else {
+ try {
+ if (PROFILE)
+ profile(null, "importing history items from archived format to disk workarea");
+ importHistoryArchive(graph, historyResource, archive, history, result);
+ } catch (IOException e) {
+ throw new DatabaseException(e);
+ } catch (HistoryException e) {
+ throw new DatabaseException(e);
+ }
+ }
+
+ if (PROFILE)
+ profile(s, "imported history items from database to disk workarea");
+
+ return result;
+ }
+ };
+ }
+\r
+ /**\r
+ * Import all history items from graph into a history manager.\r
+ * \r
+ * @param r history resource or initial condition resource source HIS.History resource\r
+ * @param history destination history\r
+ * @return <code>true</code> if successful\r
+ */\r
+ private static boolean importHistoryItems(ReadGraph graph, final Resource historyResource, final HistoryManager history) throws DatabaseException\
+ {
+ HistoryResource HIS = HistoryResource.getInstance(graph);
+ Layer0 L0 = Layer0.getInstance(graph);
+
+ try {
+ // Legacy format: each HIS.History_Item child carries a metadata bean
+ // (History_Item_Info) and a sample array literal (History_Item_Series).
+ for (Resource oldItem : graph.getObjects(historyResource, L0.ConsistsOf)) {
+ if (!graph.isInstanceOf(oldItem, HIS.History_Item))
+ continue;
+
+ Bean bean = graph.getRelatedValue(oldItem, HIS.History_Item_Info, Bindings.BEAN);
+ String id = (String) bean.getFieldUnchecked("id");
+ Object array = graph.getRelatedValue(oldItem, HIS.History_Item_Series, Bindings.BEAN);
+ Binding arrayBinding = Bindings.getBinding( array.getClass() );
+ // Register/refresh the item in the destination, then overwrite its
+ // stream contents with the sample array read from the graph.
+ history.modify(bean);
+ StreamAccessor sa = history.openStream(id, "rw");
+ try {
+ sa.setValue(arrayBinding, array);
+ } finally {
+ try {
+ sa.close();
+ } catch (AccessorException e) {
+ // best-effort close; original intentionally ignores this
+ }
+ }
+ }
+
+ return true;
+ } catch (AccessorException e) {
+ throw new DatabaseException(e);
+ } catch (BindingConstructionException e) {
+ throw new DatabaseException(e);
+ } catch (HistoryException e) {
+ throw new DatabaseException(e);
+ }
+ }
+\r
+ /**\r
+ * Import all history items from database archive into a history manager.\r
+ * \r
+ * @param graph\r
+ * @param historyResource\r
+ * @param archive \r
+ * @param history\r
+ * @return <code>true</code> if successful\r
+ * @throws DatabaseException \r
+ */\r
+ @SuppressWarnings("resource")\r
+ private static boolean importHistoryArchive(ReadGraph graph, Resource historyResource, Resource archive, HistoryManager history, HistoryImportResult result)
+ throws DatabaseException, IOException, HistoryException
+ {
+ RandomAccessBinary rab = graph.getRandomAccessBinary(archive);
+ // Skip the 4-byte length prefix written by exportArchive, then read the
+ // DataContainer header to learn the codec and version.
+ rab.position(4);
+ DataContainer dc = DataContainers.readHeader(rab);
+
+ if (DEBUG)
+ System.out.println("COMPRESSED HISTORY DATA SIZE: " + (rab.length() - rab.position()));
+
+ if (APPLICATION_X_LZ4.equals(dc.format)) {
+ if (dc.version == ARCHIVE_VERSION_1) {
+ // NOTE(review): unlike the FLZ branch, this stream is not closed in a
+ // finally block — confirm whether that is intentional.
+ InputStream in = new RABInputStream(rab);
+ //in = new BufferedInputStream(in);
+ in = new LZ4BlockInputStream(in);
+ return extractHistoryArchiveTar(graph, history, in, result);
+ }
+ } else if (APPLICATION_X_FLZ.equals(dc.format)) {
+ if (dc.version == ARCHIVE_VERSION_1) {
+ InputStream in = null;
+ try {
+ in = new RABInputStream(rab);
+ in = FastLZ.read(in);
+ return extractHistoryArchiveTar(graph, history, in, result);
+ } finally {
+ FileUtils.uncheckedClose(in);
+ }
+ }
+ }
+ throw new UnsupportedOperationException("Unsupported format " + dc.format + " and version " + dc.version);
+ }
+\r
+ /**\r
+ * Import all history items from graph into a history manager\r
+ * \r
+ * @param graph database access\r
+ * @param history destination history\r
+ * @param rab the archive to extract from\r
+ * @return <code>true</code> if successful\r
+ * @throws DatabaseException \r
+ */\r
+ private static boolean extractHistoryArchiveTar(ReadGraph graph, HistoryManager history, InputStream in, HistoryImportResult result)
+ throws DatabaseException, IOException, HistoryException
+ {
+ // tar is not closed on purpose because we do not want to close RandomAccessBinary rab.
+ TarArchiveInputStream tar = new TarArchiveInputStream(in);
+
+ Serializer beanSerializer = Bindings.getSerializerUnchecked(Bindings.BEAN);
+
+ // Entries come in pairs: "<id>.txt" metadata followed by "<id>.data" samples;
+ // an optional "state" entry carries the serialized collector state.
+ Bean lastItem = null;
+ String lastItemId = null;
+ while (true) {
+ TarArchiveEntry entry = tar.getNextTarEntry();
+ if (entry == null)
+ break;
+
+ String name = entry.getName();
+ boolean state = name.equals("state");
+ boolean metadata = name.endsWith(".txt");
+ boolean data = name.endsWith(".data");
+
+ if (state) {
+ if (result != null) {
+ byte[] st = new byte[(int) entry.getSize()];
+ // Fix: a single read() may return fewer bytes than requested;
+ // read in a loop until the whole entry is consumed.
+ readFully(tar, st);
+ result.collectorState = (Bean) beanSerializer.deserialize(st);
+ if (DEBUG)
+ System.out.println("READ collector state: " + result.collectorState);
+ } else {
+ // Fix: skip() may skip fewer bytes than requested; loop until done
+ // so subsequent entries stay aligned.
+ skipFully(tar, entry.getSize());
+ }
+ } else if (metadata) {
+ byte[] md = new byte[(int) entry.getSize()];
+ // Fix: loop until the metadata entry is fully read (see readFully).
+ readFully(tar, md);
+ if (DEBUG) {
+ System.out.println("READING Item metadata: " + name);
+ System.out.println("SERIALIZED ITEM " + name.replace(".txt", "") + ": " + Arrays.toString(md));
+ }
+
+ try {
+ Bean bean = (Bean) beanSerializer.deserialize(md);
+ if (DEBUG)
+ System.out.println("READ Item metadata: " + bean);
+ history.modify(bean);
+ lastItem = bean;
+ lastItemId = (String) bean.getFieldUnchecked("id");
+ } catch (ClassFormatError e) {
+ // This is here because LZ4BlockInput/OutputStream seemed to
+ // be causing weird deserialization errors to occur for
+ // single data items when trying to deserialize beans. The
+ // reason being that the serialized data that was read only
+ // contains the first 512 bytes of the data. After that all
+ // data is zeros, which causes things like this to happen in
+ // deserialization:
+ //signature: rsstzzzzzzzze_b7b532e9
+ //component(o): id = String
+ //component(1): variableId = String
+ //component(2): format = Temp1
+ //annotation: @org.simantics.databoard.annotations.Union(value=[class org.simantics.databoard.type.BooleanType, class org.simantics.databoard.type.ByteType, class org.simantics.databoard.type.IntegerType, class org.simantics.databoard.type.LongType, class org.simantics.databoard.type.FloatType, class org.simantics.databoard.type.DoubleType, class org.simantics.databoard.type.StringType, class org.simantics.databoard.type.RecordType, class org.simantics.databoard.type.ArrayType, class org.simantics.databoard.type.MapType, class org.simantics.databoard.type.OptionalType, class org.simantics.databoard.type.UnionType, class org.simantics.databoard.type.VariantType])
+ //component(3): = Boolean
+ //component(4): = Boolean
+ //component(5): = Boolean
+ //component(devil): = Boolean
+ //component(7): = Boolean
+ //component(music): = Boolean
+ //component(9): = Boolean
+ //component(10): = Boolean
+ //
+ // For this reason we've switched the default compression to FastLZ
+ // for now. This is reflected in the input format also.
+
+ Activator.getDefault().getLog().log(
+ new Status(IStatus.ERROR, Activator.PLUGIN_ID, "History item " + name + " metadata deserialization failed.", e));
+ }
+ } else if (data && lastItem != null) {
+ StreamAccessor sa = history.openStream(lastItemId, "rw");
+ try {
+ if (sa instanceof BinaryObject) {
+ BinaryObject bo = (BinaryObject) sa;
+ RandomAccessBinary output = bo.getBinary();
+ // Truncate the destination before copying the sample payload in.
+ output.position(0L);
+ output.setLength(0L);
+ long copiedBytes = FileUtils.copy(tar, new RABOutputStream(output), entry.getSize());
+ if (copiedBytes != entry.getSize()) {
+ System.err.println("WARNING: item " + lastItemId + ", extracted " + copiedBytes + " bytes while TAR entry said the size to be " + entry.getSize() + " bytes");
+ }
+ } else {
+ System.err.println("Skipping item " + lastItemId + " in history import due to StreamAccessor not being an instanceof BinaryObject. StreamAccessor=" + sa);
+ }
+ } finally {
+ try {
+ sa.close();
+ lastItem = null;
+ lastItemId = null;
+ } catch (AccessorException e) {
+ // best-effort close; original intentionally ignores this
+ }
+ }
+ }
+ }
+
+ return true;
+ }
+
+ /** Read exactly buffer.length bytes from the current TAR entry or fail with EOFException. */
+ private static void readFully(TarArchiveInputStream tar, byte[] buffer) throws IOException {
+ int off = 0;
+ while (off < buffer.length) {
+ int got = tar.read(buffer, off, buffer.length - off);
+ if (got < 0)
+ throw new EOFException("Unexpected end of TAR entry after " + off + "/" + buffer.length + " bytes");
+ off += got;
+ }
+ }
+
+ /** Skip up to n bytes of the current TAR entry, tolerating short skips; stops at EOF. */
+ private static void skipFully(TarArchiveInputStream tar, long n) throws IOException {
+ long left = n;
+ while (left > 0) {
+ long skipped = tar.skip(left);
+ if (skipped <= 0)
+ break; // end of entry/stream; nothing more to skip
+ left -= skipped;
+ }
+ }
+\r
+ /**\r
+ * Get request that clears history\r
+ * @return cleaning request\r
+ */\r
+ public static WriteRequest clear(final Resource history) {
+ return new WriteRequest() {
+ @Override
+ public void perform(WriteGraph graph) throws DatabaseException {
+ HistoryResource HIS = HistoryResource.getInstance(graph);
+ Layer0 L0 = Layer0.getInstance(graph);
+
+ // Separate items format
+ for (Resource oldItem : graph.getObjects(history, L0.ConsistsOf))
+ {
+ if (!graph.isInstanceOf(oldItem, HIS.History_Item)) continue;
+ // NOTE(review): this reads via HIS.History_Item, but the denial below
+ // targets HIS.History_Item_Info — looks like History_Item_Info was
+ // intended here; confirm against the ontology.
+ Resource info = graph.getPossibleObject(oldItem, HIS.History_Item);
+ if (info!=null) {
+ graph.deny(oldItem, HIS.History_Item_Info);
+ graph.deny(info);
+ }
+ Resource data = graph.getPossibleObject(oldItem, HIS.History_Item_Series);
+ if (data!=null) {
+ graph.deny(oldItem, HIS.History_Item_Series);
+ graph.deny(data);
+ }
+ graph.deny(history, L0.ConsistsOf, oldItem);
+ graph.deny(oldItem);
+ }
+
+ // Archived format
+ graph.denyValue(history, HIS.History_archive);
+ }
+ };
+ }
+\r
+\r
+ /**
+ * Open a new TAR entry with the given name and declared byte size; the
+ * caller must write exactly that many bytes and then close the entry.
+ */
+ private static void putNextEntry(TarArchiveOutputStream out, String entryName, long size) throws IOException {
+ TarArchiveEntry e = new TarArchiveEntry(entryName);
+ e.setSize(size);
+ out.putArchiveEntry(e);
+ }
+\r
+ /**\r
+ * Shall not close the underlying RandomAccessBinary.\r
+ */\r
+ /**
+ * Adapts a RandomAccessBinary into an OutputStream. close() is deliberately
+ * not overridden so the underlying RandomAccessBinary stays open.
+ */
+ private static class RABOutputStream extends OutputStream {
+
+ private final RandomAccessBinary sink;
+
+ public RABOutputStream(RandomAccessBinary output) {
+ this.sink = output;
+ }
+
+ @Override
+ public void write(int b) throws IOException {
+ sink.write(b);
+ }
+
+ @Override
+ public void write(byte[] b, int off, int len) throws IOException {
+ sink.write(b, off, len);
+ }
+
+ }
+\r
+ /**\r
+ * Shall not close the underlying RandomAccessBinary.\r
+ */\r
+ /**
+ * Adapts a RandomAccessBinary into an InputStream. close() is deliberately
+ * not overridden so the underlying RandomAccessBinary stays open.
+ */
+ private static class RABInputStream extends InputStream {
+
+ private final RandomAccessBinary input;
+
+ public RABInputStream(RandomAccessBinary input) {
+ this.input = input;
+ }
+
+ @Override
+ public int available() throws IOException {
+ long avail = input.length() - input.position();
+ if ((avail & 0xffffffff00000000L) != 0)
+ throw new IOException("Number of available bytes truncated in long->int conversion: " + avail);
+ return (int) avail;
+ }
+
+ @Override
+ public int read() throws IOException {
+ try {
+ return input.readUnsignedByte();
+ } catch (EOFException e) {
+ // InputStream contract: -1 signals end of stream.
+ return -1;
+ }
+ }
+
+ @Override
+ public int read(byte[] b, int off, int len) throws IOException {
+ // Fix: the InputStream contract requires a zero-length request to
+ // return 0, not -1, even at end of stream.
+ if (len == 0)
+ return 0;
+ long available = input.length() - input.position();
+ if (available == 0)
+ return -1;
+ int l = (int) Math.min(available, (long) len);
+ input.readFully(b, off, l);
+ return l;
+ }
+
+ @Override
+ public long skip(long n) throws IOException {
+ long count = 0;
+ while (count < n) {
+ // Fix: clamp to the REMAINING amount (n - count) instead of n —
+ // the previous code over-skipped when n > Integer.MAX_VALUE — and
+ // stop when no progress is made to avoid looping forever at EOF.
+ int l = (int) Math.min(n - count, (long) Integer.MAX_VALUE);
+ long skipped = input.skipBytes(l);
+ if (skipped <= 0)
+ break;
+ count += skipped;
+ }
+ return count;
+ }
+
+ }
+\r
+ /**
+ * Print a PROFILE trace line and return the current nanotime for chaining.
+ * A null t1 marks the start of a measurement (no duration is printed);
+ * otherwise the elapsed time since t1 is appended in milliseconds.
+ */
+ private static long profile(Long t1, String string) {
+ if (!PROFILE)
+ return 0L;
+ long now = System.nanoTime();
+ StringBuilder sb = new StringBuilder();
+ sb.append("[").append(HistoryUtil.class.getSimpleName()).append("] PROFILE: ").append(string);
+ if (t1 != null) {
+ long delta = now - t1;
+ sb.append(" in ").append(((double) delta) * 1e-6).append(" ms");
+ }
+ System.out.println(sb.toString());
+ return now;
+ }
+\r
+}\r