1 /*******************************************************************************
2 * Copyright (c) 2007, 2018 Association for Decentralized Information Management in
4 * All rights reserved. This program and the accompanying materials
5 * are made available under the terms of the Eclipse Public License v1.0
6 * which accompanies this distribution, and is available at
7 * http://www.eclipse.org/legal/epl-v10.html
10 * VTT Technical Research Centre of Finland - initial API and implementation
11 * Semantum Oy - archive implementation, gitlab simantics/platform#227
12 *******************************************************************************/
13 package org.simantics.simulation.history;
15 import java.io.EOFException;
16 import java.io.IOException;
17 import java.io.InputStream;
18 import java.io.OutputStream;
19 import java.nio.file.Path;
20 import java.util.Arrays;
21 import java.util.HashMap;
24 import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
25 import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
26 import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
27 import org.eclipse.core.runtime.IStatus;
28 import org.eclipse.core.runtime.Status;
29 import org.simantics.databoard.Bindings;
30 import org.simantics.databoard.accessor.StreamAccessor;
31 import org.simantics.databoard.accessor.binary.BinaryObject;
32 import org.simantics.databoard.accessor.error.AccessorException;
33 import org.simantics.databoard.binding.ArrayBinding;
34 import org.simantics.databoard.binding.Binding;
35 import org.simantics.databoard.binding.RecordBinding;
36 import org.simantics.databoard.binding.error.BindingConstructionException;
37 import org.simantics.databoard.binding.error.BindingException;
38 import org.simantics.databoard.binding.impl.ObjectArrayBinding;
39 import org.simantics.databoard.binding.mutable.MutableVariant;
40 import org.simantics.databoard.binding.mutable.Variant;
41 import org.simantics.databoard.container.DataContainer;
42 import org.simantics.databoard.container.DataContainers;
43 import org.simantics.databoard.serialization.Serializer;
44 import org.simantics.databoard.util.Bean;
45 import org.simantics.databoard.util.binary.BinaryFile;
46 import org.simantics.databoard.util.binary.OutputStreamWriteable;
47 import org.simantics.databoard.util.binary.RandomAccessBinary;
48 import org.simantics.db.ReadGraph;
49 import org.simantics.db.Resource;
50 import org.simantics.db.WriteGraph;
51 import org.simantics.db.common.request.UniqueRead;
52 import org.simantics.db.common.request.WriteRequest;
53 import org.simantics.db.exception.DatabaseException;
54 import org.simantics.db.exception.ServiceNotFoundException;
55 import org.simantics.db.request.Read;
56 import org.simantics.fastlz.FastLZ;
57 import org.simantics.history.HistoryException;
58 import org.simantics.history.HistoryManager;
59 import org.simantics.history.ItemManager;
60 import org.simantics.history.impl.CollectorImpl;
61 import org.simantics.history.impl.CollectorState;
62 import org.simantics.history.impl.CollectorState.VariableState;
63 import org.simantics.history.util.Stream;
64 import org.simantics.history.util.ValueBand;
65 import org.simantics.history.util.WeightedMedian;
66 import org.simantics.layer0.Layer0;
67 import org.simantics.simulation.Activator;
68 import org.simantics.simulation.ontology.HistoryResource;
69 import org.simantics.simulation.ontology.SimulationResource;
70 import org.simantics.utils.FileUtils;
72 import gnu.trove.map.TObjectLongMap;
73 import gnu.trove.map.hash.TObjectLongHashMap;
74 import net.jpountz.lz4.LZ4BlockInputStream;
75 import net.jpountz.lz4.LZ4BlockOutputStream;
78 * @author Toni Kalajainen
79 * @author Tuukka Lehtonen
81 public class HistoryUtil {
83 private static final boolean DEBUG = false;
84 private static final boolean PROFILE = true;
86 public enum Compression {
92 private static final Compression USED_COMPRESSION = Compression.FLZ;
94 private static final String APPLICATION_X_TAR = "application/x-tar";
95 private static final String APPLICATION_X_LZ4 = "application/x-lz4";
96 private static final String APPLICATION_X_FLZ = "application/x-flz";
97 private static final int ARCHIVE_VERSION_1 = 1; // TarArchiveInput/OutputStream
99 public static class ArchivedHistory {
100 public long totalSampleSize = 0L;
101 public TObjectLongMap<Bean> itemSizes = new TObjectLongHashMap<Bean>();
105 * Create request that exports history to graph.
107 * If removeOldUnused is true, items that do not exist in source history
108 * are deleted from graph.
110 * If item already exists in the graph, it is overwritten.
112 * @param history source history
113 * @param g write graph
114 * @param historyResource Instance of HIS.History to write data to
115 * @param removeOldUnused
116 * @param timeShift adjust time values by this value
119 * @return WriteRequest
121 public static WriteRequest export(final HistoryManager history,
122 final Resource historyResource,
123 final boolean removeOldUnused,
124 final Double timeShift,
125 final double from, final double end)
127 return new WriteRequest() {
128 public void perform(WriteGraph graph) throws DatabaseException {
129 HistoryResource HIS = HistoryResource.getInstance(graph);
130 Layer0 L0 = Layer0.getInstance(graph);
131 long totalSampleSize = 0L;
132 long s = System.nanoTime();
135 Bean[] items = history.getItems();
137 profile(null, "exporting " + items.length + " history items to database");
139 ItemManager im = new ItemManager( items );
140 Map<String, Resource> oldResourceMap = new HashMap<String, Resource>();
141 for (Resource oldItem : graph.getObjects(historyResource, L0.ConsistsOf))
143 if (!graph.isInstanceOf(oldItem, HIS.History_Item)) continue;
144 Bean bean = graph.getPossibleRelatedValue(oldItem, HIS.History_Item_Info, Bindings.BEAN);
145 if ( bean == null ) continue;
146 String id = (String) bean.getField("id");
148 if (!im.exists(id)) {
149 // There is item in graph that does not exist in source history
150 if ( removeOldUnused ) {
154 // Item already exists, to-be-overwritten
155 oldResourceMap.put(id, oldItem);
159 for (Bean newItem : im.values())
161 //System.out.println("WRITING ITEM: " + newItem);
162 String id = (String) newItem.getField("id");
163 Resource historyItemResource = oldResourceMap.get(id);
164 Resource data = null;
166 if ( historyItemResource==null ) {
167 historyItemResource = graph.newResource();
168 graph.claim(historyResource, L0.ConsistsOf, historyItemResource);
169 graph.claim(historyItemResource, L0.InstanceOf, HIS.History_Item);
170 graph.claimLiteral(historyItemResource, L0.HasName, id);
172 data = graph.getPossibleObject(historyItemResource, HIS.History_Item_Series);
175 Variant v = new Variant(newItem.getBinding(), newItem);
176 graph.claimLiteral(historyItemResource, HIS.History_Item_Info, v, Bindings.VARIANT);
179 data = graph.newResource();
180 graph.claim(data, L0.InstanceOf, L0.Variant);
181 graph.claim(historyItemResource, HIS.History_Item_Series, data);
183 graph.denyValue(data);
188 StreamAccessor sa = history.openStream(id, "r");
190 //System.out.println("WRITING stream of size=" + sa.size());
191 RecordBinding sampleBinding = (RecordBinding) Bindings.getBeanBinding( sa.type().componentType );
192 Serializer serializer = Bindings.getSerializerUnchecked(sampleBinding);
193 Integer constantSampleSize = serializer.getConstantSize();
194 ValueBand vb = new ValueBand(sampleBinding);
195 Stream stream = new Stream(sa, sampleBinding);
196 ArrayBinding arrayBinding = new ObjectArrayBinding(sa.type(), sampleBinding);
198 //System.out.println("WRITING stream: sample size=" + constantSampleSize);
203 int startIndex = stream.binarySearch(Bindings.DOUBLE, from);
204 if ( startIndex < -stream.count() ) break readArray;
205 if ( startIndex<0 ) startIndex = -2-startIndex;
206 if ( startIndex == -1 ) startIndex = 0;
209 int endIndex = stream.binarySearch(Bindings.DOUBLE, end);
210 if ( endIndex == -1 ) break readArray;
211 if ( endIndex<0 ) endIndex = -1-endIndex;
212 if ( endIndex == sa.size() ) endIndex = sa.size()-1;
213 if ( endIndex<startIndex ) break readArray;
215 // Write sample count
216 count = endIndex - startIndex + 1;
217 array = arrayBinding.create(count);
218 boolean hasTimeShift = timeShift!=null && timeShift != 0.0;
219 //System.out.println("WRITING stream: count=" + count + ", datatype=" + arrayBinding.type());
220 for (int i=0; i<count; i++) {
221 Object sample = sa.get(i+startIndex, sampleBinding);
224 if ( hasTimeShift ) {
225 vb.setSample(sample);
226 if ( vb.hasTime() ) {
227 double newTime = vb.getTimeDouble() + timeShift;
228 vb.setTime(Bindings.DOUBLE, newTime);
230 if ( vb.hasEndTime() ) {
231 double newTime = vb.getEndTimeDouble() + timeShift;
232 vb.setEndTime(Bindings.DOUBLE, newTime);
235 //System.out.println("\t#" + i + ": sample=" + sample);
236 arrayBinding.set(array, i, sample);
239 if (array==null) array = arrayBinding.create();
240 Variant v2 = new Variant(arrayBinding, array);
241 graph.claimValue(data, v2, Bindings.VARIANT);
243 if (constantSampleSize != null) {
244 long itemSampleSize = ((long) count) * ((long) constantSampleSize);
245 //System.out.println("item sample size: " + itemSampleSize);
246 totalSampleSize += itemSampleSize;
247 graph.claimLiteral(historyItemResource, HIS.History_Item_size, L0.Long, itemSampleSize, Bindings.LONG);
249 } catch (AccessorException e) {
250 throw new DatabaseException(e);
254 } catch (AccessorException e) {
259 graph.claimLiteral(historyResource, HIS.History_size, L0.Long, totalSampleSize, Bindings.LONG);
262 profile(s, "exported " + items.length + " history items to database");
264 } catch (HistoryException e) {
265 throw new DatabaseException( e );
266 } catch (BindingException e) {
267 throw new DatabaseException( e );
268 } catch (ServiceNotFoundException e) {
269 throw new DatabaseException( e );
276 * Create request that exports history to graph.
278 * If item already exists in the graph, it is overwritten.
282 * @param collectorState
283 * complete dump of the source history collector state or
284 * <code>null</code> to skip collector state saving
285 * @param historyResource
286 * Instance of HIS.History to write data to
288 * adjust time values by this value
293 * @return WriteRequest
295 public static WriteRequest exportArchive(
296 final HistoryManager history,
297 final Bean collectorState,
298 final Resource historyResource,
299 final Double timeShift,
303 return new WriteRequest() {
304 public void perform(WriteGraph graph) throws DatabaseException {
305 HistoryResource HIS = HistoryResource.getInstance(graph);
306 Layer0 L0 = Layer0.getInstance(graph);
308 long s = System.nanoTime();
310 profile(null, "archiving history items to database");
312 // Remove all possibly existing old items in the history.
313 for (Resource entity : graph.getObjects(historyResource, L0.ConsistsOf))
314 if (graph.isInstanceOf(entity, HIS.History_Item))
315 graph.deny(historyResource, L0.ConsistsOf, L0.PartOf, entity);
317 // Create new literal for history archival
318 //graph.deny(historyResource, HIS.History_archive);
319 graph.denyValue(historyResource, HIS.History_archive);
320 Resource archiveLiteral = graph.newResource();
321 graph.claim(archiveLiteral, L0.InstanceOf, null, L0.ByteArray);
322 graph.claim(historyResource, HIS.History_archive, archiveLiteral);
324 OutputStream closeable = null;
326 RandomAccessBinary rab = graph.getRandomAccessBinary(archiveLiteral);
330 ArchivedHistory archive = exportCompressedArchive(history, collectorState, timeShift, from, end, rab);
333 rab.writeInt((int)(rab.length() - 4L));
335 graph.claimLiteral(historyResource, HIS.History_size, L0.Long, archive.totalSampleSize, Bindings.LONG);
338 profile(s, "archived history items of size " + archive.totalSampleSize + " to database");
340 } catch (HistoryException e) {
341 throw new DatabaseException( e );
342 } catch (IOException e) {
343 throw new DatabaseException( e );
345 FileUtils.uncheckedClose(closeable);
351 public static ArchivedHistory exportCompressedArchive(
352 HistoryManager history,
357 RandomAccessBinary rab)
358 throws IOException, HistoryException
360 return exportCompressedArchive(history, collectorState, timeShift, from, end, USED_COMPRESSION, rab);
363 public static ArchivedHistory exportCompressedArchive(
364 HistoryManager history,
369 Compression compression,
370 RandomAccessBinary rab)
371 throws IOException, HistoryException
373 switch (compression) {
375 DataContainers.writeHeader(rab, new DataContainer(APPLICATION_X_FLZ, ARCHIVE_VERSION_1, null));
376 return exportArchive(history, collectorState, timeShift, from, end, FastLZ.write(new RABOutputStream(rab)));
378 DataContainers.writeHeader(rab, new DataContainer(APPLICATION_X_LZ4, ARCHIVE_VERSION_1, null));
379 return exportArchive(history, collectorState, timeShift, from, end, new LZ4BlockOutputStream(new RABOutputStream(rab)));
381 DataContainers.writeHeader(rab, new DataContainer(APPLICATION_X_TAR, ARCHIVE_VERSION_1, null));
382 return exportArchive(history, collectorState, timeShift, from, end, new RABOutputStream(rab));
384 throw new UnsupportedOperationException("Unsupported compression: " + compression);
388 private static ArchivedHistory exportArchive(
389 HistoryManager history,
395 throws IOException, HistoryException
397 try (OutputStream os = out) {
398 ArchivedHistory archive = exportArchive( os, history, collectorState, timeShift, from, end );
405 * Store the specified history manager's contents into a single archive
408 * @param outputStream
409 * the output stream where the history archive is written
412 * @param collectorState
413 * complete state of the history collector at the time of saving
414 * or <code>null</code> to not save any state
416 * adjust time values by this value
421 * @return ArchivedHistory instance describing the created archive
422 * @throws HistoryException
423 * when there's a problem with reading the history data from the
425 * @throws IOException
426 * when there's a problem writing the resulting history archive
429 private static ArchivedHistory exportArchive(
430 OutputStream outputStream,
431 HistoryManager history,
436 throws IOException, HistoryException
438 ArchivedHistory result = new ArchivedHistory();
440 Serializer beanSerializer = Bindings.getSerializerUnchecked(Bindings.BEAN);
441 boolean hasTimeShift = timeShift != null && timeShift != 0.0;
443 TarArchiveOutputStream out = new TarArchiveOutputStream(outputStream);
444 OutputStreamWriteable dataOutput = new OutputStreamWriteable(out);
447 if (collectorState != null) {
448 // Write complete collector state into the archive
450 System.out.println("WRITING collector state: " + collectorState);
451 byte[] serializedCollectorState = beanSerializer.serialize( collectorState );
452 putNextEntry(out, "state", serializedCollectorState.length);
453 out.write(serializedCollectorState);
454 out.closeArchiveEntry();
457 ItemManager im = new ItemManager( history.getItems() );
459 for (Bean item : im.values()) {
461 System.out.println("STORING ITEM: " + item);
462 String id = (String) item.getField("id");
464 // Write data stream metadata
465 byte[] metadata = beanSerializer.serialize( item );
466 putNextEntry(out, id + ".txt", metadata.length);
468 out.closeArchiveEntry();
470 System.out.println("SERIALIZED ITEM " + id + ": " + Arrays.toString(metadata));
473 // Write data stream as a separate entry
474 StreamAccessor sa = history.openStream(id, "r");
477 System.out.println("STREAM SIZE=" + sa.size());
478 RecordBinding sampleBinding = (RecordBinding) Bindings.getBeanBinding( sa.type().componentType );
479 Serializer sampleSerializer = Bindings.getSerializerUnchecked(sampleBinding);
480 Integer constantSampleSize = sampleSerializer.getConstantSize();
482 if (constantSampleSize == null) {
483 // Variable length samples - no support for now.
484 System.err.println("Skipping item " + item + " in history archival due to variable length samples not being supported at the moment.");
487 ValueBand vb = new ValueBand(sampleBinding);
488 Stream stream = new Stream(sa, sampleBinding);
490 System.out.println("WRITING stream: sample size=" + constantSampleSize);
494 int startIndex = stream.binarySearch(Bindings.DOUBLE, from);
495 if ( startIndex < -stream.count() ) break readArray;
496 if ( startIndex<0 ) startIndex = -2-startIndex;
497 if ( startIndex == -1 ) startIndex = 0;
500 int endIndex = stream.binarySearch(Bindings.DOUBLE, end);
501 if ( endIndex == -1 ) break readArray;
502 if ( endIndex<0 ) endIndex = -1-endIndex;
503 if ( endIndex == sa.size() ) endIndex = sa.size()-1;
504 if ( endIndex<startIndex ) break readArray;
506 // Written sample count
507 int count = endIndex - startIndex + 1;
509 System.out.println("WRITTEN sample count=" + count);
511 long savedSamplesSize = ((long) count) * ((long) constantSampleSize);
513 System.out.println("saved samples size in bytes: " + savedSamplesSize);
514 putNextEntry(out, id + ".data", savedSamplesSize);
516 for (int i=0; i<count; i++) {
517 Object sample = sa.get(i+startIndex, sampleBinding);
520 if ( hasTimeShift ) {
521 vb.setSample(sample);
522 if ( vb.hasTime() ) {
523 double newTime = vb.getTimeDouble() + timeShift;
524 vb.setTime(Bindings.DOUBLE, newTime);
526 if ( vb.hasEndTime() ) {
527 double newTime = vb.getEndTimeDouble() + timeShift;
528 vb.setEndTime(Bindings.DOUBLE, newTime);
531 //System.out.println("\t#" + i + ": sample=" + sample);
532 sampleSerializer.serialize(dataOutput, sample);
534 out.closeArchiveEntry();
536 result.itemSizes.put(item, savedSamplesSize);
537 result.totalSampleSize += savedSamplesSize;
540 } catch (AccessorException e) {
541 throw new IOException(e);
545 } catch (AccessorException e) {
552 } catch (BindingException e) {
553 throw new HistoryException(e);
560 * Import all history items from graph into a history manager.
562 * @param r the source history resource: either a HIS.History resource or an initial condition resource referencing one
563 * @param history destination history
564 * @return read request that always returns a HistoryImportResult instance
566 public static Read<HistoryImportResult> importHistory(final Resource r, final HistoryManager history)
568 return new UniqueRead<HistoryImportResult>() {
570 public HistoryImportResult perform(ReadGraph graph) throws DatabaseException {
571 HistoryImportResult result = new HistoryImportResult();
573 HistoryResource HIS = HistoryResource.getInstance(graph);
574 SimulationResource SIM = SimulationResource.getInstance(graph);
575 Resource historyResource = r;
577 if (!graph.isInstanceOf(historyResource, HIS.History))
578 historyResource = graph.getPossibleObject(r, SIM.State_History);
579 if (historyResource == null)
581 if (!graph.isInstanceOf(historyResource, HIS.History))
584 long s = System.nanoTime();
586 Resource archive = graph.getPossibleObject(historyResource, HIS.History_archive);
587 if (archive == null) {
589 profile(null, "importing history items from old database format to disk workarea");
590 importHistoryItems(graph, historyResource, history);
594 profile(null, "importing history items from archived format to disk workarea");
595 importHistoryArchive(graph, archive, history, result);
596 } catch (IOException e) {
597 throw new DatabaseException(e);
598 } catch (HistoryException e) {
599 throw new DatabaseException(e);
604 profile(s, "imported history items from database to disk workarea");
612 * Import all history items from graph into a history manager.
614 * @param r the source history resource: either a HIS.History resource or an initial condition resource referencing one
615 * @param history destination history
616 * @return <code>true</code> if successful
618 private static boolean importHistoryItems(ReadGraph graph, final Resource historyResource, final HistoryManager history) throws DatabaseException
620 HistoryResource HIS = HistoryResource.getInstance(graph);
621 Layer0 L0 = Layer0.getInstance(graph);
624 for (Resource oldItem : graph.getObjects(historyResource, L0.ConsistsOf)) {
625 if (!graph.isInstanceOf(oldItem, HIS.History_Item))
628 Bean bean = graph.getRelatedValue(oldItem, HIS.History_Item_Info, Bindings.BEAN);
629 String id = (String) bean.getFieldUnchecked("id");
630 Object array = graph.getRelatedValue(oldItem, HIS.History_Item_Series, Bindings.BEAN);
631 Binding arrayBinding = Bindings.getBinding( array.getClass() );
632 history.modify(bean);
633 StreamAccessor sa = history.openStream(id, "rw");
635 sa.setValue(arrayBinding, array);
639 } catch (AccessorException e) {
645 } catch (AccessorException e) {
646 throw new DatabaseException(e);
647 } catch (BindingConstructionException e) {
648 throw new DatabaseException(e);
649 } catch (HistoryException e) {
650 throw new DatabaseException(e);
658 * @throws HistoryException
659 * @throws IOException
661 public static HistoryImportResult importHistoryArchive(HistoryManager history, Path path) throws IOException, HistoryException {
662 HistoryImportResult result = new HistoryImportResult();
663 try (RandomAccessBinary rab = new BinaryFile(path.toFile())) {
664 importHistoryArchive(history, rab, result);
670 * Import all history items from database archive into a history manager.
673 * @param historyResource
676 * @return <code>true</code> if successful
677 * @throws DatabaseException
679 private static boolean importHistoryArchive(ReadGraph graph, Resource archive, HistoryManager history, HistoryImportResult result) throws DatabaseException, IOException, HistoryException {
680 RandomAccessBinary rab = graph.getRandomAccessBinary(archive);
681 // This is here because the data is encoded as an L0.ByteArray
682 // in the database which means the first integer describes the
683 // length of the byte array. We skip that length data here.
685 return importHistoryArchive(history, rab, result);
689 * Import all history items from database archive into a history manager.
692 * @param historyResource
695 * @return <code>true</code> if successful
696 * @throws DatabaseException
698 private static boolean importHistoryArchive(HistoryManager history, RandomAccessBinary rab, HistoryImportResult result)
699 throws IOException, HistoryException
701 DataContainer dc = DataContainers.readHeader(rab);
704 System.out.println("COMPRESSED HISTORY DATA SIZE: " + (rab.length() - rab.position()));
706 if (dc.version == ARCHIVE_VERSION_1) {
707 if (APPLICATION_X_LZ4.equals(dc.format)) {
708 return extractHistoryArchiveInputStream(history, new LZ4BlockInputStream(new RABInputStream(rab)), result);
709 } else if (APPLICATION_X_FLZ.equals(dc.format)) {
710 return extractHistoryArchiveInputStream(history, FastLZ.read(new RABInputStream(rab)), result);
711 } else if (APPLICATION_X_TAR.equals(dc.format)) {
712 return extractHistoryArchiveInputStream(history, new RABInputStream(rab), result);
714 throw new UnsupportedOperationException("Unsupported format " + dc.format + " and version " + dc.version);
716 throw new UnsupportedOperationException("Unsupported history archive version " + dc.version + " with format " + dc.format);
720 private static boolean extractHistoryArchiveInputStream(HistoryManager history, InputStream in, HistoryImportResult result) throws IOException, HistoryException {
721 try (InputStream _in = in) {
722 return extractHistoryArchiveTar(history, in, result);
727 * Import all history items from graph into a history manager
729 * @param graph database access
730 * @param history destination history
731 * @param rab the archive to extract from
732 * @return <code>true</code> if successful
733 * @throws DatabaseException
735 private static boolean extractHistoryArchiveTar(HistoryManager history, InputStream in, HistoryImportResult result)
736 throws IOException, HistoryException
738 // tar is not closed on purpose because we do not want to close RandomAccessBinary rab.
739 TarArchiveInputStream tar = new TarArchiveInputStream(in);
741 Serializer beanSerializer = Bindings.getSerializerUnchecked(Bindings.BEAN);
743 Bean lastItem = null;
744 String lastItemId = null;
746 TarArchiveEntry entry = tar.getNextTarEntry();
750 String name = entry.getName();
751 boolean state = name.equals("state");
752 boolean metadata = name.endsWith(".txt");
753 boolean data = name.endsWith(".data");
756 if (result != null) {
757 byte[] st = new byte[(int) entry.getSize()];
759 result.collectorState = (Bean) beanSerializer.deserialize(st);
761 System.out.println("READ collector state: " + result.collectorState);
763 tar.skip(entry.getSize());
765 } else if (metadata) {
766 byte[] md = new byte[(int) entry.getSize()];
769 System.out.println("READING Item metadata: " + name);
770 System.out.println("SERIALIZED ITEM " + name.replace(".txt", "") + ": " + Arrays.toString(md));
774 Bean bean = (Bean) beanSerializer.deserialize(md);
776 System.out.println("READ Item metadata: " + bean);
777 history.modify(bean);
779 lastItemId = (String) bean.getFieldUnchecked("id");
780 } catch (ClassFormatError e) {
781 // This is here because LZ4BlockInput/OutputStream seemed to
782 // be causing weird deserialization errors to occur for
783 // single data items when trying to deserialize beans. The
784 // reason being that the serialized data that was read only
785 // contains the first 512 bytes of the data. After that all
786 // data is zeros, which causes things like this to happen in
788 //signature: rsstzzzzzzzze_b7b532e9
789 //component(o): id = String
790 //component(1): variableId = String
791 //component(2): format = Temp1
792 //annotation: @org.simantics.databoard.annotations.Union(value=[class org.simantics.databoard.type.BooleanType, class org.simantics.databoard.type.ByteType, class org.simantics.databoard.type.IntegerType, class org.simantics.databoard.type.LongType, class org.simantics.databoard.type.FloatType, class org.simantics.databoard.type.DoubleType, class org.simantics.databoard.type.StringType, class org.simantics.databoard.type.RecordType, class org.simantics.databoard.type.ArrayType, class org.simantics.databoard.type.MapType, class org.simantics.databoard.type.OptionalType, class org.simantics.databoard.type.UnionType, class org.simantics.databoard.type.VariantType])
793 //component(3): = Boolean
794 //component(4): = Boolean
795 //component(5): = Boolean
796 //component(6): = Boolean
797 //component(7): = Boolean
798 //component(8): = Boolean
799 //component(9): = Boolean
800 //component(10): = Boolean
802 // For this reason we've switched the default compression to FastLZ
803 // for now. This is reflected in the input format also.
805 Activator.getDefault().getLog().log(
806 new Status(IStatus.ERROR, Activator.PLUGIN_ID, "History item " + name + " metadata deserialization failed.", e));;
808 } else if (data && lastItem != null) {
809 StreamAccessor sa = history.openStream(lastItemId, "rw");
811 if (sa instanceof BinaryObject) {
812 BinaryObject bo = (BinaryObject) sa;
813 RandomAccessBinary output = bo.getBinary();
815 output.setLength(0L);
816 long copiedBytes = FileUtils.copy(tar, new RABOutputStream(output), entry.getSize());
817 if (copiedBytes != entry.getSize()) {
818 System.err.println("WARNING: item " + lastItemId + ", extracted " + copiedBytes + " bytes while TAR entry said the size to be " + entry.getSize() + " bytes");
821 System.err.println("Skipping item " + lastItemId + " in history import due to StreamAccessor not being an instanceof BinaryObject. StreamAccessor=" + sa);
828 } catch (AccessorException e) {
838 * Get request that clears history
839 * @return cleaning request
841 public static WriteRequest clear(final Resource history) {
842 return new WriteRequest() {
844 public void perform(WriteGraph graph) throws DatabaseException {
845 HistoryResource HIS = HistoryResource.getInstance(graph);
846 Layer0 L0 = Layer0.getInstance(graph);
848 // Separate items format
849 for (Resource oldItem : graph.getObjects(history, L0.ConsistsOf))
851 if (!graph.isInstanceOf(oldItem, HIS.History_Item)) continue;
852 Resource info = graph.getPossibleObject(oldItem, HIS.History_Item);
854 graph.deny(oldItem, HIS.History_Item_Info);
857 Resource data = graph.getPossibleObject(oldItem, HIS.History_Item_Series);
859 graph.deny(oldItem, HIS.History_Item_Series);
862 graph.deny(history, L0.ConsistsOf, oldItem);
867 graph.denyValue(history, HIS.History_archive);
873 private static void putNextEntry(TarArchiveOutputStream out, String entryName, long size) throws IOException {
874 TarArchiveEntry entry = new TarArchiveEntry(entryName);
876 out.putArchiveEntry(entry);
880 * Shall not close the underlying RandomAccessBinary.
882 private static class RABOutputStream extends OutputStream {
884 private final RandomAccessBinary output;
886 public RABOutputStream(RandomAccessBinary output) {
887 this.output = output;
891 public void write(byte[] b, int off, int len) throws IOException {
892 output.write(b, off, len);
896 public void write(int b) throws IOException {
903 * Shall not close the underlying RandomAccessBinary.
905 private static class RABInputStream extends InputStream {
907 private final RandomAccessBinary input;
909 public RABInputStream(RandomAccessBinary input) {
914 public int available() throws IOException {
915 long avail = input.length() - input.position();
916 if ((avail & 0xffffffff00000000L) != 0)
917 throw new IOException("Number of available bytes truncated in long->int conversion: " + avail);
922 public int read() throws IOException {
924 return input.readUnsignedByte();
925 } catch (EOFException e) {
931 public int read(byte[] b, int off, int len) throws IOException {
932 long available = input.length() - input.position();
935 int l = (int) Math.min(available, (long) len);
936 input.readFully(b, off, l);
941 public long skip(long n) throws IOException {
944 int l = (int) Math.min(n, (long) Integer.MAX_VALUE);
945 count += input.skipBytes(l);
952 private static long profile(Long t1, String string) {
954 long t2 = System.nanoTime();
955 long delta = t1 == null ? 0 : t2-t1;
956 System.out.println("[" + HistoryUtil.class.getSimpleName() + "] PROFILE: " + string + (t1 == null ? "" : " in " + (((double)delta)*1e-6) + " ms"));
962 public static CollectorState truncateHistory(double toBeforeTime, HistoryManager history, CollectorState state) throws AccessorException, BindingException, HistoryException {
963 Double t = toBeforeTime;
964 Binding timeBinding = null;
966 Bean[] items = history.getItems();
967 //System.out.println("truncating all samples after t=" + toBeforeTime + " for " + items.length + " history items");
969 for (Bean item : items) {
970 String id = (String) item.getField("id");
971 StreamAccessor sa = history.openStream(id, "rw");
973 Stream s = new Stream(sa);
974 timeBinding = s.timeBinding;
975 int currentSize = sa.size();
976 int index = s.binarySearch(timeBinding, toBeforeTime);
977 int newSize = truncationSize(index);
978 if (newSize < currentSize) {
979 //System.out.println("truncating item: " + item + " from size " + currentSize + " to " + newSize);
983 Object prevTime = newSize > 0 ? s.getItemTime(s.timeBinding, newSize - 1) : null;
984 Bean prevSample = newSize > 0 ? (Bean) sa.get(newSize - 1, s.sampleBinding) : null;
985 Object prevValue = prevSample != null ? prevSample.getField(s.valueIndex) : null;
986 boolean isNan = isNaN(prevValue);
988 VariableState vs = state.values.get(id);
989 if (vs != null && vs.value != null && prevValue != null) {
990 vs.value.setValue(vs.value.getBinding(), prevValue);
995 CollectorState.Item is = state.itemStates.get(id);
997 is.firstTime = toTime(prevTime);
998 is.firstValue = toValue(s.valueBinding, prevValue);
999 is.currentTime = is.firstTime;
1000 is.currentValue = toValue(s.valueBinding, prevValue);
1002 is.isValid = prevValue != null;
1003 is.sum = Double.NaN;
1005 is.ooDeadband = false;
1006 is.firstDisabledTime = Double.NaN;
1007 is.lastDisabledTime = Double.NaN;
1008 is.median = new WeightedMedian( CollectorImpl.MEDIAN_LIMIT );
1017 if (timeBinding != null && state != null) {
1018 state.time.setValue(timeBinding, t);
1025 private static double toTime(Object time) {
1026 return time instanceof Number ? ((Number) time).doubleValue() : Double.NaN;
1029 private static MutableVariant toValue(Binding valueBinding, Object value) {
1030 return new MutableVariant(valueBinding, value != null ? value : valueBinding.createDefaultUnchecked());
1033 private static boolean isNaN(Object value) {
1034 return value instanceof Number ? Double.isNaN(((Number) value).doubleValue()) : false;
1037 private static int truncationSize(int binarySearchResult) {
1038 return binarySearchResult >= 0 ? binarySearchResult + 1 : (-binarySearchResult - 1);