1 /*******************************************************************************
2 * Copyright (c) 2007, 2018 Association for Decentralized Information Management in
4 * All rights reserved. This program and the accompanying materials
5 * are made available under the terms of the Eclipse Public License v1.0
6 * which accompanies this distribution, and is available at
7 * http://www.eclipse.org/legal/epl-v10.html
10 * VTT Technical Research Centre of Finland - initial API and implementation
11 * Semantum Oy - archive implementation, gitlab simantics/platform#227
12 *******************************************************************************/
13 package org.simantics.simulation.history;
15 import java.io.EOFException;
16 import java.io.IOException;
17 import java.io.InputStream;
18 import java.io.OutputStream;
19 import java.nio.file.Path;
20 import java.util.Arrays;
21 import java.util.HashMap;
24 import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
25 import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
26 import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
27 import org.eclipse.core.runtime.IStatus;
28 import org.eclipse.core.runtime.Status;
29 import org.simantics.databoard.Bindings;
30 import org.simantics.databoard.accessor.StreamAccessor;
31 import org.simantics.databoard.accessor.binary.BinaryObject;
32 import org.simantics.databoard.accessor.error.AccessorException;
33 import org.simantics.databoard.binding.ArrayBinding;
34 import org.simantics.databoard.binding.Binding;
35 import org.simantics.databoard.binding.RecordBinding;
36 import org.simantics.databoard.binding.error.BindingConstructionException;
37 import org.simantics.databoard.binding.error.BindingException;
38 import org.simantics.databoard.binding.impl.ObjectArrayBinding;
39 import org.simantics.databoard.binding.mutable.MutableVariant;
40 import org.simantics.databoard.binding.mutable.Variant;
41 import org.simantics.databoard.container.DataContainer;
42 import org.simantics.databoard.container.DataContainers;
43 import org.simantics.databoard.serialization.Serializer;
44 import org.simantics.databoard.util.Bean;
45 import org.simantics.databoard.util.binary.BinaryFile;
46 import org.simantics.databoard.util.binary.OutputStreamWriteable;
47 import org.simantics.databoard.util.binary.RandomAccessBinary;
48 import org.simantics.db.ReadGraph;
49 import org.simantics.db.Resource;
50 import org.simantics.db.WriteGraph;
51 import org.simantics.db.common.request.UniqueRead;
52 import org.simantics.db.common.request.WriteRequest;
53 import org.simantics.db.exception.DatabaseException;
54 import org.simantics.db.exception.ServiceNotFoundException;
55 import org.simantics.db.request.Read;
56 import org.simantics.fastlz.FastLZ;
57 import org.simantics.history.HistoryException;
58 import org.simantics.history.HistoryManager;
59 import org.simantics.history.ItemManager;
60 import org.simantics.history.impl.CollectorImpl;
61 import org.simantics.history.impl.CollectorState;
62 import org.simantics.history.impl.CollectorState.VariableState;
63 import org.simantics.history.util.Stream;
64 import org.simantics.history.util.ValueBand;
65 import org.simantics.history.util.WeightedMedian;
66 import org.simantics.layer0.Layer0;
67 import org.simantics.simulation.Activator;
68 import org.simantics.simulation.ontology.HistoryResource;
69 import org.simantics.simulation.ontology.SimulationResource;
70 import org.simantics.utils.FileUtils;
71 import org.slf4j.Logger;
72 import org.slf4j.LoggerFactory;
74 import gnu.trove.map.TObjectLongMap;
75 import gnu.trove.map.hash.TObjectLongHashMap;
76 import net.jpountz.lz4.LZ4BlockInputStream;
77 import net.jpountz.lz4.LZ4BlockOutputStream;
80 * @author Toni Kalajainen
81 * @author Tuukka Lehtonen
83 public class HistoryUtil {
85 private static final Logger LOGGER = LoggerFactory.getLogger(HistoryUtil.class);
// Compile-time console-diagnostics toggle used throughout this class.
87 private static final boolean DEBUG = false;
// When true, profile() prints timing information to stdout.
88 private static final boolean PROFILE = true;
// Archive compression schemes. NOTE(review): the enum constants are among the
// lines missing from this dump — FLZ, LZ4 and plain TAR are referenced below; TODO confirm.
90 public enum Compression {
// FastLZ is the default; see the ClassFormatError note in extractHistoryArchiveTar
// for why LZ4 is no longer used by default.
96 private static final Compression USED_COMPRESSION = Compression.FLZ;
// DataContainer format identifiers written into the archive header.
98 private static final String APPLICATION_X_TAR = "application/x-tar";
99 private static final String APPLICATION_X_LZ4 = "application/x-lz4";
100 private static final String APPLICATION_X_FLZ = "application/x-flz";
101 private static final int ARCHIVE_VERSION_1 = 1; // TarArchiveInput/OutputStream
// Summary of an archive produced by exportArchive: total sample payload size
// and the per-item payload sizes in bytes.
103 public static class ArchivedHistory {
104 public long totalSampleSize = 0L;
105 public TObjectLongMap<Bean> itemSizes = new TObjectLongHashMap<Bean>();
109 * Create request that exports history to graph.
111 * If removeOldUnused is true, items that do not exist in source history
112 * are deleted from graph.
114 * If item already exists in the graph, it is overwritten.
116 * @param history source history
117 * @param g write graph
118 * @param historyResource Instance of HIS.History to write data to
119 * @param removeOldUnused if true, items that exist in the graph but not in the source history are removed
120 * @param timeShift adjust time values by this value
123 * @return WriteRequest
// Builds a WriteRequest that copies the source history's items into the
// database as separate HIS.History_Item resources (the old, non-archive format).
// Only samples whose time is within [from, end] are written; timeShift, when
// non-null and non-zero, is added to each sample's time and end-time fields.
// NOTE(review): interior lines are missing from this dump (embedded numbering
// jumps), so some declarations, braces and removal statements are not visible.
125 public static WriteRequest export(final HistoryManager history,
126 final Resource historyResource,
127 final boolean removeOldUnused,
128 final Double timeShift,
129 final double from, final double end)
131 return new WriteRequest() {
132 public void perform(WriteGraph graph) throws DatabaseException {
133 HistoryResource HIS = HistoryResource.getInstance(graph);
134 Layer0 L0 = Layer0.getInstance(graph);
135 long totalSampleSize = 0L;
136 long s = System.nanoTime();
139 Bean[] items = history.getItems();
141 profile(null, "exporting " + items.length + " history items to database");
// Index the source items by id and collect existing graph items so they can
// be overwritten in place (or removed when removeOldUnused is set).
143 ItemManager im = new ItemManager( items );
144 Map<String, Resource> oldResourceMap = new HashMap<String, Resource>();
145 for (Resource oldItem : graph.getObjects(historyResource, L0.ConsistsOf))
147 if (!graph.isInstanceOf(oldItem, HIS.History_Item)) continue;
148 Bean bean = graph.getPossibleRelatedValue(oldItem, HIS.History_Item_Info, Bindings.BEAN);
149 if ( bean == null ) continue;
150 String id = (String) bean.getField("id");
152 if (!im.exists(id)) {
153 // There is item in graph that does not exist in source history
// Removal statements are not visible in this dump — TODO confirm they deny the item here.
154 if ( removeOldUnused ) {
158 // Item already exists, to-be-overwritten
159 oldResourceMap.put(id, oldItem);
// Write every source item into the graph, creating resources when missing.
163 for (Bean newItem : im.values())
165 //System.out.println("WRITING ITEM: " + newItem);
166 String id = (String) newItem.getField("id");
167 Resource historyItemResource = oldResourceMap.get(id);
168 Resource data = null;
170 if ( historyItemResource==null ) {
171 historyItemResource = graph.newResource();
172 graph.claim(historyResource, L0.ConsistsOf, historyItemResource);
173 graph.claim(historyItemResource, L0.InstanceOf, HIS.History_Item);
174 graph.claimLiteral(historyItemResource, L0.HasName, id);
176 data = graph.getPossibleObject(historyItemResource, HIS.History_Item_Series);
// The item metadata bean is stored as a variant literal on the item resource.
179 Variant v = new Variant(newItem.getBinding(), newItem);
180 graph.claimLiteral(historyItemResource, HIS.History_Item_Info, v, Bindings.VARIANT);
183 data = graph.newResource();
184 graph.claim(data, L0.InstanceOf, L0.Variant);
185 graph.claim(historyItemResource, HIS.History_Item_Series, data);
// Clear any previous sample payload before writing the new one.
187 graph.denyValue(data);
192 StreamAccessor sa = history.openStream(id, "r");
194 //System.out.println("WRITING stream of size=" + sa.size());
195 RecordBinding sampleBinding = (RecordBinding) Bindings.getBeanBinding( sa.type().componentType );
196 Serializer serializer = Bindings.getSerializerUnchecked(sampleBinding);
197 Integer constantSampleSize = serializer.getConstantSize();
198 ValueBand vb = new ValueBand(sampleBinding);
199 Stream stream = new Stream(sa, sampleBinding);
200 ArrayBinding arrayBinding = new ObjectArrayBinding(sa.type(), sampleBinding);
202 //System.out.println("WRITING stream: sample size=" + constantSampleSize);
// Resolve [from, end] to a sample index range. binarySearch returns
// -insertionPoint-1 for keys not found, hence the -2-x / -1-x adjustments.
// "readArray" is a labeled block whose label line is not visible in this dump.
207 int startIndex = stream.binarySearch(Bindings.DOUBLE, from);
208 if ( startIndex < -stream.count() ) break readArray;
209 if ( startIndex<0 ) startIndex = -2-startIndex;
210 if ( startIndex == -1 ) startIndex = 0;
213 int endIndex = stream.binarySearch(Bindings.DOUBLE, end);
214 if ( endIndex == -1 ) break readArray;
215 if ( endIndex<0 ) endIndex = -1-endIndex;
216 if ( endIndex == sa.size() ) endIndex = sa.size()-1;
217 if ( endIndex<startIndex ) break readArray;
219 // Write sample count
// NOTE(review): declarations of "count" and "array" are among the missing lines.
220 count = endIndex - startIndex + 1;
221 array = arrayBinding.create(count);
222 boolean hasTimeShift = timeShift!=null && timeShift != 0.0;
223 //System.out.println("WRITING stream: count=" + count + ", datatype=" + arrayBinding.type());
224 for (int i=0; i<count; i++) {
225 Object sample = sa.get(i+startIndex, sampleBinding);
// Optionally shift both the time and end-time fields of each sample record.
228 if ( hasTimeShift ) {
229 vb.setSample(sample);
230 if ( vb.hasTime() ) {
231 double newTime = vb.getTimeDouble() + timeShift;
232 vb.setTime(Bindings.DOUBLE, newTime);
234 if ( vb.hasEndTime() ) {
235 double newTime = vb.getEndTimeDouble() + timeShift;
236 vb.setEndTime(Bindings.DOUBLE, newTime);
239 //System.out.println("\t#" + i + ": sample=" + sample);
240 arrayBinding.set(array, i, sample);
// Fall back to an empty array when the range selection produced nothing.
243 if (array==null) array = arrayBinding.create();
244 Variant v2 = new Variant(arrayBinding, array);
245 graph.claimValue(data, v2, Bindings.VARIANT);
// Size bookkeeping is only possible for fixed-size sample records.
247 if (constantSampleSize != null) {
248 long itemSampleSize = ((long) count) * ((long) constantSampleSize);
249 //System.out.println("item sample size: " + itemSampleSize);
250 totalSampleSize += itemSampleSize;
251 graph.claimLiteral(historyItemResource, HIS.History_Item_size, L0.Long, itemSampleSize, Bindings.LONG);
253 } catch (AccessorException e) {
254 throw new DatabaseException(e);
// NOTE(review): the body of this second AccessorException handler is not
// visible here — presumably it closes the stream accessor; TODO confirm.
258 } catch (AccessorException e) {
263 graph.claimLiteral(historyResource, HIS.History_size, L0.Long, totalSampleSize, Bindings.LONG);
266 profile(s, "exported " + items.length + " history items to database");
// All checked failures are wrapped for the write-request framework.
268 } catch (HistoryException e) {
269 throw new DatabaseException( e );
270 } catch (BindingException e) {
271 throw new DatabaseException( e );
272 } catch (ServiceNotFoundException e) {
273 throw new DatabaseException( e );
280 * Create request that exports history to graph.
282 * If item already exists in the graph, it is overwritten.
286 * @param collectorState
287 * complete dump of the source history collector state or
288 * <code>null</code> to skip collector state saving
289 * @param historyResource
290 * Instance of HIS.History to write data to
292 * adjust time values by this value
297 * @return WriteRequest
299 public static WriteRequest exportArchive(
300 final HistoryManager history,
301 final Bean collectorState,
302 final Resource historyResource,
303 final Double timeShift,
307 return new WriteRequest() {
308 public void perform(WriteGraph graph) throws DatabaseException {
309 HistoryResource HIS = HistoryResource.getInstance(graph);
310 Layer0 L0 = Layer0.getInstance(graph);
312 long s = System.nanoTime();
314 profile(null, "archiving history items to database");
316 // Remove all possibly existing old items in the history.
317 for (Resource entity : graph.getObjects(historyResource, L0.ConsistsOf))
318 if (graph.isInstanceOf(entity, HIS.History_Item))
319 graph.deny(historyResource, L0.ConsistsOf, L0.PartOf, entity);
321 // Create new literal for history archival
322 //graph.deny(historyResource, HIS.History_archive);
323 graph.denyValue(historyResource, HIS.History_archive);
324 Resource archiveLiteral = graph.newResource();
325 graph.claim(archiveLiteral, L0.InstanceOf, null, L0.ByteArray);
326 graph.claim(historyResource, HIS.History_archive, archiveLiteral);
328 OutputStream closeable = null;
330 RandomAccessBinary rab = graph.getRandomAccessBinary(archiveLiteral);
334 ArchivedHistory archive = exportCompressedArchive(history, collectorState, timeShift, from, end, rab);
337 rab.writeInt((int)(rab.length() - 4L));
339 graph.claimLiteral(historyResource, HIS.History_size, L0.Long, archive.totalSampleSize, Bindings.LONG);
342 profile(s, "archived history items of size " + archive.totalSampleSize + " to database");
344 } catch (HistoryException e) {
345 throw new DatabaseException( e );
346 } catch (IOException e) {
347 throw new DatabaseException( e );
349 FileUtils.uncheckedClose(closeable);
// Convenience overload: writes a compressed history archive into rab using
// the class-wide default compression (USED_COMPRESSION, currently FastLZ).
// NOTE(review): the collectorState/timeShift/from/end parameter declarations
// are among the lines missing from this dump.
355 public static ArchivedHistory exportCompressedArchive(
356 HistoryManager history,
361 RandomAccessBinary rab)
362 throws IOException, HistoryException
364 return exportCompressedArchive(history, collectorState, timeShift, from, end, USED_COMPRESSION, rab);
// Writes a DataContainer header identifying the chosen compression format,
// then streams the TAR archive through the matching compressor into rab.
// NOTE(review): several parameter lines and the switch case labels are
// missing from this dump.
367 public static ArchivedHistory exportCompressedArchive(
368 HistoryManager history,
373 Compression compression,
374 RandomAccessBinary rab)
375 throws IOException, HistoryException
377 switch (compression) {
// FastLZ-compressed TAR (current default).
379 DataContainers.writeHeader(rab, new DataContainer(APPLICATION_X_FLZ, ARCHIVE_VERSION_1, null));
380 return exportArchive(history, collectorState, timeShift, from, end, FastLZ.write(new RABOutputStream(rab)));
// LZ4-compressed TAR; see extractHistoryArchiveTar for why this was demoted.
382 DataContainers.writeHeader(rab, new DataContainer(APPLICATION_X_LZ4, ARCHIVE_VERSION_1, null));
383 return exportArchive(history, collectorState, timeShift, from, end, new LZ4BlockOutputStream(new RABOutputStream(rab)));
// Plain uncompressed TAR.
385 DataContainers.writeHeader(rab, new DataContainer(APPLICATION_X_TAR, ARCHIVE_VERSION_1, null));
386 return exportArchive(history, collectorState, timeShift, from, end, new RABOutputStream(rab));
388 throw new UnsupportedOperationException("Unsupported compression: " + compression);
// Wrapper that guarantees the (possibly compressing) output stream is closed
// after the archive body has been written (try-with-resources).
// NOTE(review): parameter lines and the return statement are missing from
// this dump — presumably it returns the ArchivedHistory; TODO confirm.
392 private static ArchivedHistory exportArchive(
393 HistoryManager history,
399 throws IOException, HistoryException
401 try (OutputStream os = out) {
402 ArchivedHistory archive = exportArchive( os, history, collectorState, timeShift, from, end );
409 * Store the specified history manager's contents into a single archive
412 * @param outputStream
413 * the output stream where the history archive is written
416 * @param collectorState
417 * complete state of the history collector at the time of saving
418 * or <code>null</code> to not save any state
420 * adjust time values by this value
425 * @return ArchivedHistory instance describing the created archive
426 * @throws HistoryException
427 * when there's a problem with reading the history data from the
429 * @throws IOException
430 * when there's a problem writing the resulting history archive
// Stores the history manager's contents into a single TAR archive written to
// outputStream: an optional "state" entry (serialized collector state), then
// per item a "<id>.txt" metadata entry and a "<id>.data" entry with the raw
// fixed-size samples in [from, end], optionally time-shifted.
// NOTE(review): interior lines are missing from this dump (parameter
// declarations, braces, the readArray label, stream close/finish).
433 private static ArchivedHistory exportArchive(
434 OutputStream outputStream,
435 HistoryManager history,
440 throws IOException, HistoryException
442 ArchivedHistory result = new ArchivedHistory();
444 Serializer beanSerializer = Bindings.getSerializerUnchecked(Bindings.BEAN);
445 boolean hasTimeShift = timeShift != null && timeShift != 0.0;
// The TAR stream wraps the caller's stream; dataOutput adapts it for databoard serializers.
447 TarArchiveOutputStream out = new TarArchiveOutputStream(outputStream);
448 OutputStreamWriteable dataOutput = new OutputStreamWriteable(out);
451 if (collectorState != null) {
452 // Write complete collector state into the archive
454 System.out.println("WRITING collector state: " + collectorState);
455 byte[] serializedCollectorState = beanSerializer.serialize( collectorState );
456 putNextEntry(out, "state", serializedCollectorState.length);
457 out.write(serializedCollectorState);
458 out.closeArchiveEntry();
461 ItemManager im = new ItemManager( history.getItems() );
463 for (Bean item : im.values()) {
465 System.out.println("STORING ITEM: " + item);
466 String id = (String) item.getField("id");
468 // Write data stream metadata
469 byte[] metadata = beanSerializer.serialize( item );
470 putNextEntry(out, id + ".txt", metadata.length);
// NOTE(review): the out.write(metadata) call is among the missing lines — TODO confirm.
472 out.closeArchiveEntry();
474 System.out.println("SERIALIZED ITEM " + id + ": " + Arrays.toString(metadata));
477 // Write data stream as a separate entry
478 StreamAccessor sa = history.openStream(id, "r");
481 System.out.println("STREAM SIZE=" + sa.size());
482 RecordBinding sampleBinding = (RecordBinding) Bindings.getBeanBinding( sa.type().componentType );
483 Serializer sampleSerializer = Bindings.getSerializerUnchecked(sampleBinding);
484 Integer constantSampleSize = sampleSerializer.getConstantSize();
486 if (constantSampleSize == null) {
487 // Variable length samples - no support for now.
488 System.err.println("Skipping item " + item + " in history archival due to variable length samples not being supported at the moment.");
491 ValueBand vb = new ValueBand(sampleBinding);
492 Stream stream = new Stream(sa, sampleBinding);
494 System.out.println("WRITING stream: sample size=" + constantSampleSize);
// Same index-range resolution as in export(): binarySearch yields
// -insertionPoint-1 for missing keys. "readArray" label line not visible here.
498 int startIndex = stream.binarySearch(Bindings.DOUBLE, from);
499 if ( startIndex < -stream.count() ) break readArray;
500 if ( startIndex<0 ) startIndex = -2-startIndex;
501 if ( startIndex == -1 ) startIndex = 0;
504 int endIndex = stream.binarySearch(Bindings.DOUBLE, end);
505 if ( endIndex == -1 ) break readArray;
506 if ( endIndex<0 ) endIndex = -1-endIndex;
507 if ( endIndex == sa.size() ) endIndex = sa.size()-1;
508 if ( endIndex<startIndex ) break readArray;
510 // Written sample count
511 int count = endIndex - startIndex + 1;
513 System.out.println("WRITTEN sample count=" + count);
// The entry size must be known up front — TAR requires it before the data.
515 long savedSamplesSize = ((long) count) * ((long) constantSampleSize);
517 System.out.println("saved samples size in bytes: " + savedSamplesSize);
518 putNextEntry(out, id + ".data", savedSamplesSize);
520 for (int i=0; i<count; i++) {
521 Object sample = sa.get(i+startIndex, sampleBinding);
// Optionally shift both the time and end-time fields of each sample record.
524 if ( hasTimeShift ) {
525 vb.setSample(sample);
526 if ( vb.hasTime() ) {
527 double newTime = vb.getTimeDouble() + timeShift;
528 vb.setTime(Bindings.DOUBLE, newTime);
530 if ( vb.hasEndTime() ) {
531 double newTime = vb.getEndTimeDouble() + timeShift;
532 vb.setEndTime(Bindings.DOUBLE, newTime);
535 //System.out.println("\t#" + i + ": sample=" + sample);
536 sampleSerializer.serialize(dataOutput, sample);
538 out.closeArchiveEntry();
// Record per-item and total payload sizes for the caller.
540 result.itemSizes.put(item, savedSamplesSize);
541 result.totalSampleSize += savedSamplesSize;
544 } catch (AccessorException e) {
545 throw new IOException(e);
// NOTE(review): this handler's body is not visible — presumably closes sa; TODO confirm.
549 } catch (AccessorException e) {
556 } catch (BindingException e) {
557 throw new HistoryException(e);
564 * Import all history items from graph into a history manager.
566 * @param r history resource or initial condition resource source HIS.History resource
567 * @param history destination history
568 * @return read request that always returns a HistoryImportResult instance
// Builds a read request that imports all history items of resource r (a
// HIS.History, or a state resource pointing to one via SIM.State_History)
// into the given destination HistoryManager. Dispatches on storage format:
// per-item resources (old format) vs. single compressed archive literal.
// NOTE(review): interior lines are missing from this dump (early returns,
// braces, the final return of result).
570 public static Read<HistoryImportResult> importHistory(final Resource r, final HistoryManager history)
572 return new UniqueRead<HistoryImportResult>() {
574 public HistoryImportResult perform(ReadGraph graph) throws DatabaseException {
575 HistoryImportResult result = new HistoryImportResult();
577 HistoryResource HIS = HistoryResource.getInstance(graph);
578 SimulationResource SIM = SimulationResource.getInstance(graph);
579 Resource historyResource = r;
// Allow r to be an initial-condition/state resource that references a history.
581 if (!graph.isInstanceOf(historyResource, HIS.History))
582 historyResource = graph.getPossibleObject(r, SIM.State_History);
583 if (historyResource == null)
585 if (!graph.isInstanceOf(historyResource, HIS.History))
588 long s = System.nanoTime();
590 Resource archive = graph.getPossibleObject(historyResource, HIS.History_archive);
591 if (archive == null) {
// Old format: one graph resource per history item.
593 profile(null, "importing history items from old database format to disk workarea");
594 importHistoryItems(graph, historyResource, history);
// Archive format: single compressed TAR literal.
598 profile(null, "importing history items from archived format to disk workarea");
599 importHistoryArchive(graph, archive, history, result);
600 } catch (IOException e) {
601 throw new DatabaseException(e);
602 } catch (HistoryException e) {
603 throw new DatabaseException(e);
608 profile(s, "imported history items from database to disk workarea");
616 * Import all history items from graph into a history manager.
618 * @param r history resource or initial condition resource source HIS.History resource
619 * @param history destination history
620 * @return <code>true</code> if successful
// Imports the old per-resource history format: for each HIS.History_Item under
// historyResource, registers its metadata bean with the history manager and
// copies the stored sample array into the item's stream.
// NOTE(review): interior lines are missing (continue statement, stream close,
// braces, the return value).
622 private static boolean importHistoryItems(ReadGraph graph, final Resource historyResource, final HistoryManager history) throws DatabaseException
624 HistoryResource HIS = HistoryResource.getInstance(graph);
625 Layer0 L0 = Layer0.getInstance(graph);
628 for (Resource oldItem : graph.getObjects(historyResource, L0.ConsistsOf)) {
629 if (!graph.isInstanceOf(oldItem, HIS.History_Item))
632 Bean bean = graph.getRelatedValue(oldItem, HIS.History_Item_Info, Bindings.BEAN);
633 String id = (String) bean.getFieldUnchecked("id");
634 Object array = graph.getRelatedValue(oldItem, HIS.History_Item_Series, Bindings.BEAN);
635 Binding arrayBinding = Bindings.getBinding( array.getClass() );
// Register/refresh the item in the destination history, then overwrite its samples.
636 history.modify(bean);
637 StreamAccessor sa = history.openStream(id, "rw");
639 sa.setValue(arrayBinding, array);
// NOTE(review): this handler's body is not visible — presumably best-effort close of sa.
643 } catch (AccessorException e) {
// All checked failures are wrapped for the request framework.
649 } catch (AccessorException e) {
650 throw new DatabaseException(e);
651 } catch (BindingConstructionException e) {
652 throw new DatabaseException(e);
653 } catch (HistoryException e) {
654 throw new DatabaseException(e);
662 * @throws HistoryException
663 * @throws IOException
665 public static HistoryImportResult importHistoryArchive(HistoryManager history, Path path) throws IOException, HistoryException {
666 HistoryImportResult result = new HistoryImportResult();
667 try (RandomAccessBinary rab = new BinaryFile(path.toFile())) {
668 importHistoryArchive(history, rab, result);
670 } catch (IOException e) {
671 LOGGER.error("Failed to import history from archive {}", path);
677 * Import all history items from database archive into a history manager.
680 * @param historyResource
683 * @return <code>true</code> if successful
684 * @throws DatabaseException
// Imports a history archive stored as an L0.ByteArray literal in the database.
// NOTE(review): the statement that actually skips the 4-byte length prefix is
// among the missing lines (presumably rab.position(4) or similar — TODO confirm),
// as is the closing brace.
686 private static boolean importHistoryArchive(ReadGraph graph, Resource archive, HistoryManager history, HistoryImportResult result) throws DatabaseException, IOException, HistoryException {
687 RandomAccessBinary rab = graph.getRandomAccessBinary(archive);
688 // This is here because the data is encoded as an L0.ByteArray
690 length of the byte array. We skip that length data here.
692 return importHistoryArchive(history, rab, result);
696 * Import all history items from database archive into a history manager.
699 * @param historyResource
702 * @return <code>true</code> if successful
703 * @throws DatabaseException
// Reads the DataContainer header from rab and dispatches to the matching
// decompressing extractor (LZ4, FastLZ or plain TAR) for version 1 archives.
// Throws UnsupportedOperationException for unknown formats or versions.
705 private static boolean importHistoryArchive(HistoryManager history, RandomAccessBinary rab, HistoryImportResult result)
706 throws IOException, HistoryException
708 DataContainer dc = DataContainers.readHeader(rab);
// DEBUG diagnostics; guard line not visible in this dump.
711 System.out.println("COMPRESSED HISTORY DATA SIZE: " + (rab.length() - rab.position()));
713 if (dc.version == ARCHIVE_VERSION_1) {
714 if (APPLICATION_X_LZ4.equals(dc.format)) {
715 return extractHistoryArchiveInputStream(history, new LZ4BlockInputStream(new RABInputStream(rab)), result);
716 } else if (APPLICATION_X_FLZ.equals(dc.format)) {
717 return extractHistoryArchiveInputStream(history, FastLZ.read(new RABInputStream(rab)), result);
718 } else if (APPLICATION_X_TAR.equals(dc.format)) {
719 return extractHistoryArchiveInputStream(history, new RABInputStream(rab), result);
721 throw new UnsupportedOperationException("Unsupported format " + dc.format + " and version " + dc.version);
723 throw new UnsupportedOperationException("Unsupported history archive version " + dc.version + " with format " + dc.format);
727 private static boolean extractHistoryArchiveInputStream(HistoryManager history, InputStream in, HistoryImportResult result) throws IOException, HistoryException {
728 try (InputStream _in = in) {
729 return extractHistoryArchiveTar(history, in, result);
734 * Import all history items from graph into a history manager
736 * @param graph database access
737 * @param history destination history
738 * @param rab the archive to extract from
739 * @return <code>true</code> if successful
740 * @throws DatabaseException
// Walks the TAR entries of a history archive: a "state" entry restores the
// collector state into result, "<id>.txt" entries register item metadata with
// the history manager, and "<id>.data" entries copy raw samples into the
// matching item stream. NOTE(review): interior lines are missing from this
// dump (entry-read loop header, readFully calls, braces, return statement).
742 private static boolean extractHistoryArchiveTar(HistoryManager history, InputStream in, HistoryImportResult result)
743 throws IOException, HistoryException
745 // tar is not closed on purpose because we do not want to close RandomAccessBinary rab.
746 TarArchiveInputStream tar = new TarArchiveInputStream(in);
748 Serializer beanSerializer = Bindings.getSerializerUnchecked(Bindings.BEAN);
// Metadata entries precede their data entry; these track the current item.
750 Bean lastItem = null;
751 String lastItemId = null;
753 TarArchiveEntry entry = tar.getNextTarEntry();
// Classify the entry by name.
757 String name = entry.getName();
758 boolean state = name.equals("state");
759 boolean metadata = name.endsWith(".txt");
760 boolean data = name.endsWith(".data");
763 if (result != null) {
// Collector-state entry: deserialize it for the caller; the read into st is not visible here.
764 byte[] st = new byte[(int) entry.getSize()];
766 result.collectorState = (Bean) beanSerializer.deserialize(st);
768 System.out.println("READ collector state: " + result.collectorState);
// No receiver for the state — skip over the entry payload.
770 tar.skip(entry.getSize());
772 } else if (metadata) {
773 byte[] md = new byte[(int) entry.getSize()];
776 System.out.println("READING Item metadata: " + name);
777 System.out.println("SERIALIZED ITEM " + name.replace(".txt", "") + ": " + Arrays.toString(md));
781 Bean bean = (Bean) beanSerializer.deserialize(md);
783 System.out.println("READ Item metadata: " + bean);
784 history.modify(bean);
// NOTE(review): the assignment of lastItem is among the missing lines — TODO confirm.
786 lastItemId = (String) bean.getFieldUnchecked("id");
787 } catch (ClassFormatError e) {
788 // This is here because LZ4BlockInput/OutputStream seemed to
789 // be causing weird deserialization errors to occur for
790 // single data items when trying to deserialize beans. The
791 // reason being that the serialized data that was read only
792 // contains the first 512 bytes of the data. After that all
793 // data is zeros, which causes things like this to happen in
795 //signature: rsstzzzzzzzze_b7b532e9
796 //component(o): id = String
797 //component(1): variableId = String
798 //component(2): format = Temp1
799 //annotation: @org.simantics.databoard.annotations.Union(value=[class org.simantics.databoard.type.BooleanType, class org.simantics.databoard.type.ByteType, class org.simantics.databoard.type.IntegerType, class org.simantics.databoard.type.LongType, class org.simantics.databoard.type.FloatType, class org.simantics.databoard.type.DoubleType, class org.simantics.databoard.type.StringType, class org.simantics.databoard.type.RecordType, class org.simantics.databoard.type.ArrayType, class org.simantics.databoard.type.MapType, class org.simantics.databoard.type.OptionalType, class org.simantics.databoard.type.UnionType, class org.simantics.databoard.type.VariantType])
800 //component(3): = Boolean
801 //component(4): = Boolean
802 //component(5): = Boolean
803 //component(6): = Boolean
804 //component(7): = Boolean
805 //component(8): = Boolean
806 //component(9): = Boolean
807 //component(10): = Boolean
809 // For this reason we've switched the default compression to FastLZ
810 // for now. This is reflected in the input format also.
// NOTE(review): stray double semicolon ";;" at the end of the next statement — harmless but should be cleaned up.
812 Activator.getDefault().getLog().log(
813 new Status(IStatus.ERROR, Activator.PLUGIN_ID, "History item " + name + " metadata deserialization failed.", e));;
815 } else if (data && lastItem != null) {
// Data entry: stream the raw sample bytes straight into the item's backing binary.
816 StreamAccessor sa = history.openStream(lastItemId, "rw");
818 if (sa instanceof BinaryObject) {
819 BinaryObject bo = (BinaryObject) sa;
820 RandomAccessBinary output = bo.getBinary();
// Truncate before copying so stale samples never survive an import.
822 output.setLength(0L);
823 long copiedBytes = FileUtils.copy(tar, new RABOutputStream(output), entry.getSize());
824 if (copiedBytes != entry.getSize()) {
825 System.err.println("WARNING: item " + lastItemId + ", extracted " + copiedBytes + " bytes while TAR entry said the size to be " + entry.getSize() + " bytes");
828 System.err.println("Skipping item " + lastItemId + " in history import due to StreamAccessor not being an instanceof BinaryObject. StreamAccessor=" + sa);
// NOTE(review): this handler's body is not visible in this dump.
835 } catch (AccessorException e) {
845 * Get request that clears history
846 * @return cleaning request
// Builds a WriteRequest that removes all stored history from the given
// history resource, covering both storage formats: per-item resources and
// the single archive literal. NOTE(review): interior lines are missing
// (null checks, denials of info/data resources, braces).
848 public static WriteRequest clear(final Resource history) {
849 return new WriteRequest() {
851 public void perform(WriteGraph graph) throws DatabaseException {
852 HistoryResource HIS = HistoryResource.getInstance(graph);
853 Layer0 L0 = Layer0.getInstance(graph);
855 // Separate items format
856 for (Resource oldItem : graph.getObjects(history, L0.ConsistsOf))
858 if (!graph.isInstanceOf(oldItem, HIS.History_Item)) continue;
// NOTE(review): looks like this should query HIS.History_Item_Info (the info
// literal), not HIS.History_Item — verify against the ontology before changing.
859 Resource info = graph.getPossibleObject(oldItem, HIS.History_Item);
861 graph.deny(oldItem, HIS.History_Item_Info);
864 Resource data = graph.getPossibleObject(oldItem, HIS.History_Item_Series);
866 graph.deny(oldItem, HIS.History_Item_Series);
869 graph.deny(history, L0.ConsistsOf, oldItem);
// Archive format: drop the archive literal value as well.
874 graph.denyValue(history, HIS.History_archive);
880 private static void putNextEntry(TarArchiveOutputStream out, String entryName, long size) throws IOException {
881 TarArchiveEntry entry = new TarArchiveEntry(entryName);
883 out.putArchiveEntry(entry);
887 * Shall not close the underlying RandomAccessBinary.
// OutputStream adapter over RandomAccessBinary. Intentionally does not
// override close(): the underlying RandomAccessBinary must stay open (see
// the class comment above). NOTE(review): the single-byte write body and
// closing braces are missing from this dump.
889 private static class RABOutputStream extends OutputStream {
// Destination binary; all writes are delegated to it at its current position.
891 private final RandomAccessBinary output;
893 public RABOutputStream(RandomAccessBinary output) {
894 this.output = output;
// Bulk write delegates directly — no intermediate buffering.
898 public void write(byte[] b, int off, int len) throws IOException {
899 output.write(b, off, len);
903 public void write(int b) throws IOException {
910 * Shall not close the underlying RandomAccessBinary.
// InputStream adapter over RandomAccessBinary. Intentionally does not
// override close(): the underlying RandomAccessBinary must stay open.
// NOTE(review): interior lines are missing from this dump (constructor body,
// EOF handling, return statements, braces).
912 private static class RABInputStream extends InputStream {
914 private final RandomAccessBinary input;
916 public RABInputStream(RandomAccessBinary input) {
// Bytes remaining between the current position and the end of the binary.
921 public int available() throws IOException {
922 long avail = input.length() - input.position();
// Refuse to silently truncate when the remainder exceeds Integer.MAX_VALUE.
923 if ((avail & 0xffffffff00000000L) != 0)
924 throw new IOException("Number of available bytes truncated in long->int conversion: " + avail);
// Single byte read; EOFException is translated to the -1 stream convention
// (the return -1 line is among the missing lines — TODO confirm).
929 public int read() throws IOException {
931 return input.readUnsignedByte();
932 } catch (EOFException e) {
// Bulk read: clamp to the remaining bytes, then fill via readFully.
938 public int read(byte[] b, int off, int len) throws IOException {
939 long available = input.length() - input.position();
942 int l = (int) Math.min(available, (long) len);
943 input.readFully(b, off, l);
// Skip in Integer.MAX_VALUE-sized chunks; loop/accumulator lines not visible here.
948 public long skip(long n) throws IOException {
951 int l = (int) Math.min(n, (long) Integer.MAX_VALUE);
952 count += input.skipBytes(l);
// Prints a profiling line to stdout and returns a timestamp for chaining.
// t1 == null marks the start of a phase (no duration printed); otherwise the
// elapsed time since t1 (nanoTime) is reported in milliseconds.
// NOTE(review): the PROFILE guard and the return statement are among the
// lines missing from this dump.
959 private static long profile(Long t1, String string) {
961 long t2 = System.nanoTime();
962 long delta = t1 == null ? 0 : t2-t1;
963 System.out.println("[" + HistoryUtil.class.getSimpleName() + "] PROFILE: " + string + (t1 == null ? "" : " in " + (((double)delta)*1e-6) + " ms"));
// Truncates every history item so that no sample strictly after toBeforeTime
// remains, and rewinds the given collector state to match (last kept sample
// becomes the collector's first/current value). Returns the updated state.
// NOTE(review): interior lines are missing from this dump (the sa.setSize
// truncation call, null checks, stream close, braces, return statement).
969 public static CollectorState truncateHistory(double toBeforeTime, HistoryManager history, CollectorState state) throws AccessorException, BindingException, HistoryException {
970 Double t = toBeforeTime;
971 Binding timeBinding = null;
973 Bean[] items = history.getItems();
974 //System.out.println("truncating all samples after t=" + toBeforeTime + " for " + items.length + " history items");
976 for (Bean item : items) {
977 String id = (String) item.getField("id");
978 StreamAccessor sa = history.openStream(id, "rw");
980 Stream s = new Stream(sa);
981 timeBinding = s.timeBinding;
982 int currentSize = sa.size();
// Find the cut point: keep everything at or before toBeforeTime.
983 int index = s.binarySearch(timeBinding, toBeforeTime);
984 int newSize = truncationSize(index);
985 if (newSize < currentSize) {
986 //System.out.println("truncating item: " + item + " from size " + currentSize + " to " + newSize);
// Last surviving sample (if any) seeds the rewound collector state.
990 Object prevTime = newSize > 0 ? s.getItemTime(s.timeBinding, newSize - 1) : null;
991 Bean prevSample = newSize > 0 ? (Bean) sa.get(newSize - 1, s.sampleBinding) : null;
992 Object prevValue = prevSample != null ? prevSample.getField(s.valueIndex) : null;
993 boolean isNan = isNaN(prevValue);
// Rewind the per-variable live value.
995 VariableState vs = state.values.get(id);
996 if (vs != null && vs.value != null && prevValue != null) {
997 vs.value.setValue(vs.value.getBinding(), prevValue);
// Rewind the per-item collector bookkeeping to the kept sample.
1002 CollectorState.Item is = state.itemStates.get(id);
1004 is.firstTime = toTime(prevTime);
1005 is.firstValue = toValue(s.valueBinding, prevValue);
1006 is.currentTime = is.firstTime;
1007 is.currentValue = toValue(s.valueBinding, prevValue);
1009 is.isValid = prevValue != null;
// Reset derived statistics; they cannot be reconstructed after truncation.
1010 is.sum = Double.NaN;
1012 is.ooDeadband = false;
1013 is.firstDisabledTime = Double.NaN;
1014 is.lastDisabledTime = Double.NaN;
1015 is.median = new WeightedMedian( CollectorImpl.MEDIAN_LIMIT );
// Finally move the collector clock back to the truncation time.
1024 if (timeBinding != null && state != null) {
1025 state.time.setValue(timeBinding, t);
1032 private static double toTime(Object time) {
1033 return time instanceof Number ? ((Number) time).doubleValue() : Double.NaN;
1036 private static MutableVariant toValue(Binding valueBinding, Object value) {
1037 return new MutableVariant(valueBinding, value != null ? value : valueBinding.createDefaultUnchecked());
1040 private static boolean isNaN(Object value) {
1041 return value instanceof Number ? Double.isNaN(((Number) value).doubleValue()) : false;
// Maps a Stream.binarySearch result to the new stream size that keeps every
// sample at or before the searched time: an exact hit keeps indices 0..result
// (result+1 samples); a miss (-insertionPoint-1) keeps the insertionPoint
// samples that precede it.
1044 private static int truncationSize(int binarySearchResult) {
1045 return binarySearchResult >= 0 ? binarySearchResult + 1 : (-binarySearchResult - 1);