1 /*******************************************************************************
2 * Copyright (c) 2007, 2014 Association for Decentralized Information Management in
* Industry THTH ry.
4 * All rights reserved. This program and the accompanying materials
5 * are made available under the terms of the Eclipse Public License v1.0
6 * which accompanies this distribution, and is available at
7 * http://www.eclipse.org/legal/epl-v10.html
10 * VTT Technical Research Centre of Finland - initial API and implementation
11 * Semantum Oy - archive implementation
12 *******************************************************************************/
13 package org.simantics.simulation.history;
15 import java.io.EOFException;
16 import java.io.IOException;
17 import java.io.InputStream;
18 import java.io.OutputStream;
19 import java.util.Arrays;
20 import java.util.HashMap;
23 import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
24 import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
25 import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
26 import org.eclipse.core.runtime.IStatus;
27 import org.eclipse.core.runtime.Status;
28 import org.simantics.databoard.Bindings;
29 import org.simantics.databoard.accessor.StreamAccessor;
30 import org.simantics.databoard.accessor.binary.BinaryObject;
31 import org.simantics.databoard.accessor.error.AccessorException;
32 import org.simantics.databoard.binding.ArrayBinding;
33 import org.simantics.databoard.binding.Binding;
34 import org.simantics.databoard.binding.RecordBinding;
35 import org.simantics.databoard.binding.error.BindingConstructionException;
36 import org.simantics.databoard.binding.error.BindingException;
37 import org.simantics.databoard.binding.impl.ObjectArrayBinding;
38 import org.simantics.databoard.binding.mutable.MutableVariant;
39 import org.simantics.databoard.binding.mutable.Variant;
40 import org.simantics.databoard.container.DataContainer;
41 import org.simantics.databoard.container.DataContainers;
42 import org.simantics.databoard.serialization.Serializer;
43 import org.simantics.databoard.util.Bean;
44 import org.simantics.databoard.util.binary.OutputStreamWriteable;
45 import org.simantics.databoard.util.binary.RandomAccessBinary;
46 import org.simantics.db.ReadGraph;
47 import org.simantics.db.Resource;
48 import org.simantics.db.WriteGraph;
49 import org.simantics.db.common.request.UniqueRead;
50 import org.simantics.db.common.request.WriteRequest;
51 import org.simantics.db.exception.DatabaseException;
52 import org.simantics.db.exception.ServiceNotFoundException;
53 import org.simantics.db.request.Read;
54 import org.simantics.fastlz.FastLZ;
55 import org.simantics.history.HistoryException;
56 import org.simantics.history.HistoryManager;
57 import org.simantics.history.ItemManager;
58 import org.simantics.history.impl.CollectorImpl;
59 import org.simantics.history.impl.CollectorState;
60 import org.simantics.history.impl.CollectorState.VariableState;
61 import org.simantics.history.util.Stream;
62 import org.simantics.history.util.ValueBand;
63 import org.simantics.history.util.WeightedMedian;
64 import org.simantics.layer0.Layer0;
65 import org.simantics.simulation.Activator;
66 import org.simantics.simulation.ontology.HistoryResource;
67 import org.simantics.simulation.ontology.SimulationResource;
68 import org.simantics.utils.FileUtils;
70 import gnu.trove.map.TObjectLongMap;
71 import gnu.trove.map.hash.TObjectLongHashMap;
72 import net.jpountz.lz4.LZ4BlockInputStream;
73 import net.jpountz.lz4.LZ4BlockOutputStream;
/**
76 * @author Toni Kalajainen
77 * @author Tuukka Lehtonen
*/
79 public class HistoryUtil {
// Debug/profiling switches for console diagnostics in this class.
81 private static final boolean DEBUG = false;
82 private static final boolean PROFILE = true;
// Compression codec used for the archived history blob.
// NOTE(review): enum constants are elided in this excerpt; FLZ and LZ4 are
// referenced elsewhere in the class (USED_COMPRESSION, exportCompressedArchive).
84 private enum Compression {
89 private static final Compression USED_COMPRESSION = Compression.FLZ;
// Format identifiers written into the DataContainer header of an archive literal.
91 private static final String APPLICATION_X_LZ4 = "application/x-lz4";
92 private static final String APPLICATION_X_FLZ = "application/x-flz";
93 private static final int ARCHIVE_VERSION_1 = 1; // TarArchiveInput/OutputStream
// Summary of one written archive: total sample payload bytes and per-item byte sizes.
95 private static class ArchivedHistory {
96 public long totalSampleSize = 0L;
97 public TObjectLongMap<Bean> itemSizes = new TObjectLongHashMap<Bean>();
101 * Create request that exports history to graph.
103 * If removeOldUnused is true, items that do not exist in source history
104 * are deleted from graph.
106 * If item already exists in the graph, it is overwritten.
108 * @param history source history
109 * @param g write graph
110 * @param historyResource Instance of HIS.History to write data to
111 * @param removeOldUnused
112 * @param timeShift adjust time values by this value
115 * @return WriteRequest
// Builds a WriteRequest that copies each history item's samples in the time
// range [from, end] into the database as per-item variant array literals,
// optionally shifting sample times by timeShift and optionally removing
// graph items that no longer exist in the source history.
117 public static WriteRequest export(final HistoryManager history,
118 final Resource historyResource,
119 final boolean removeOldUnused,
120 final Double timeShift,
121 final double from, final double end)
123 return new WriteRequest() {
124 public void perform(WriteGraph graph) throws DatabaseException {
125 HistoryResource HIS = HistoryResource.getInstance(graph);
126 Layer0 L0 = Layer0.getInstance(graph);
127 long totalSampleSize = 0L;
128 long s = System.nanoTime();
131 Bean[] items = history.getItems();
133 profile(null, "exporting " + items.length + " history items to database");
// Map existing graph items by their "id" field so they can be overwritten in place.
135 ItemManager im = new ItemManager( items );
136 Map<String, Resource> oldResourceMap = new HashMap<String, Resource>();
137 for (Resource oldItem : graph.getObjects(historyResource, L0.ConsistsOf))
139 if (!graph.isInstanceOf(oldItem, HIS.History_Item)) continue;
140 Bean bean = graph.getPossibleRelatedValue(oldItem, HIS.History_Item_Info, Bindings.BEAN);
141 if ( bean == null ) continue;
142 String id = (String) bean.getField("id");
144 if (!im.exists(id)) {
145 // There is item in graph that does not exist in source history
146 if ( removeOldUnused ) {
150 // Item already exists, to-be-overwritten
151 oldResourceMap.put(id, oldItem);
// Write (or overwrite) one graph item per source history item.
155 for (Bean newItem : im.values())
157 //System.out.println("WRITING ITEM: " + newItem);
158 String id = (String) newItem.getField("id");
159 Resource historyItemResource = oldResourceMap.get(id);
160 Resource data = null;
162 if ( historyItemResource==null ) {
163 historyItemResource = graph.newResource();
164 graph.claim(historyResource, L0.ConsistsOf, historyItemResource);
165 graph.claim(historyItemResource, L0.InstanceOf, HIS.History_Item);
166 graph.claimLiteral(historyItemResource, L0.HasName, id);
168 data = graph.getPossibleObject(historyItemResource, HIS.History_Item_Series);
// Item metadata bean is stored as a variant on History_Item_Info.
171 Variant v = new Variant(newItem.getBinding(), newItem);
172 graph.claimLiteral(historyItemResource, HIS.History_Item_Info, v, Bindings.VARIANT);
175 data = graph.newResource();
176 graph.claim(data, L0.InstanceOf, L0.Variant);
177 graph.claim(historyItemResource, HIS.History_Item_Series, data);
179 graph.denyValue(data);
// Read the item's sample stream and copy the requested index range.
184 StreamAccessor sa = history.openStream(id, "r");
186 //System.out.println("WRITING stream of size=" + sa.size());
187 RecordBinding sampleBinding = (RecordBinding) Bindings.getBeanBinding( sa.type().componentType );
188 Serializer serializer = Bindings.getSerializerUnchecked(sampleBinding);
189 Integer constantSampleSize = serializer.getConstantSize();
190 ValueBand vb = new ValueBand(sampleBinding);
191 Stream stream = new Stream(sa, sampleBinding);
192 ArrayBinding arrayBinding = new ObjectArrayBinding(sa.type(), sampleBinding);
194 //System.out.println("WRITING stream: sample size=" + constantSampleSize);
// Negative binarySearch results encode an insertion point
// (presumably the Arrays.binarySearch convention: -(insertion point) - 1 — confirm
// against Stream.binarySearch); convert them to valid start/end indices.
199 int startIndex = stream.binarySearch(Bindings.DOUBLE, from);
200 if ( startIndex < -stream.count() ) break readArray;
201 if ( startIndex<0 ) startIndex = -2-startIndex;
202 if ( startIndex == -1 ) startIndex = 0;
205 int endIndex = stream.binarySearch(Bindings.DOUBLE, end);
206 if ( endIndex == -1 ) break readArray;
207 if ( endIndex<0 ) endIndex = -1-endIndex;
208 if ( endIndex == sa.size() ) endIndex = sa.size()-1;
209 if ( endIndex<startIndex ) break readArray;
211 // Write sample count
212 count = endIndex - startIndex + 1;
213 array = arrayBinding.create(count);
214 boolean hasTimeShift = timeShift!=null && timeShift != 0.0;
215 //System.out.println("WRITING stream: count=" + count + ", datatype=" + arrayBinding.type());
216 for (int i=0; i<count; i++) {
217 Object sample = sa.get(i+startIndex, sampleBinding);
// Shift both time and endTime fields of the sample when a time shift is requested.
220 if ( hasTimeShift ) {
221 vb.setSample(sample);
222 if ( vb.hasTime() ) {
223 double newTime = vb.getTimeDouble() + timeShift;
224 vb.setTime(Bindings.DOUBLE, newTime);
226 if ( vb.hasEndTime() ) {
227 double newTime = vb.getEndTimeDouble() + timeShift;
228 vb.setEndTime(Bindings.DOUBLE, newTime);
231 //System.out.println("\t#" + i + ": sample=" + sample);
232 arrayBinding.set(array, i, sample);
// Fall back to an empty array so the series literal is always written.
235 if (array==null) array = arrayBinding.create();
236 Variant v2 = new Variant(arrayBinding, array);
237 graph.claimValue(data, v2, Bindings.VARIANT);
// Item byte size can only be accounted for fixed-size sample records.
239 if (constantSampleSize != null) {
240 long itemSampleSize = ((long) count) * ((long) constantSampleSize);
241 //System.out.println("item sample size: " + itemSampleSize);
242 totalSampleSize += itemSampleSize;
243 graph.claimLiteral(historyItemResource, HIS.History_Item_size, L0.Long, itemSampleSize, Bindings.LONG);
245 } catch (AccessorException e) {
246 throw new DatabaseException(e);
250 } catch (AccessorException e) {
// Total size of all exported samples is recorded on the history resource itself.
255 graph.claimLiteral(historyResource, HIS.History_size, L0.Long, totalSampleSize, Bindings.LONG);
258 profile(s, "exported " + items.length + " history items to database");
260 } catch (HistoryException e) {
261 throw new DatabaseException( e );
262 } catch (BindingException e) {
263 throw new DatabaseException( e );
264 } catch (ServiceNotFoundException e) {
265 throw new DatabaseException( e );
272 * Create request that exports history to graph.
274 * If item already exists in the graph, it is overwritten.
278 * @param collectorState
279 * complete dump of the source history collector state or
280 * <code>null</code> to skip collector state saving
281 * @param historyResource
282 * Instance of HIS.History to write data to
284 * adjust time values by this value
289 * @return WriteRequest
// Builds a WriteRequest that replaces any old per-item history representation
// with a single compressed tar archive stored in a byte-array literal
// (HIS.History_archive) on the history resource.
291 public static WriteRequest exportArchive(
292 final HistoryManager history,
293 final Bean collectorState,
294 final Resource historyResource,
295 final Double timeShift,
299 return new WriteRequest() {
300 public void perform(WriteGraph graph) throws DatabaseException {
301 HistoryResource HIS = HistoryResource.getInstance(graph);
302 Layer0 L0 = Layer0.getInstance(graph);
304 long s = System.nanoTime();
306 profile(null, "archiving history items to database");
308 // Remove all possibly existing old items in the history.
309 for (Resource entity : graph.getObjects(historyResource, L0.ConsistsOf))
310 if (graph.isInstanceOf(entity, HIS.History_Item))
311 graph.deny(historyResource, L0.ConsistsOf, L0.PartOf, entity);
313 // Create new literal for history archival
314 //graph.deny(historyResource, HIS.History_archive);
315 graph.denyValue(historyResource, HIS.History_archive);
316 Resource archiveLiteral = graph.newResource();
317 graph.claim(archiveLiteral, L0.InstanceOf, null, L0.ByteArray);
318 graph.claim(historyResource, HIS.History_archive, archiveLiteral);
320 OutputStream closeable = null;
// Stream the compressed archive directly into the literal's RandomAccessBinary.
322 RandomAccessBinary rab = graph.getRandomAccessBinary(archiveLiteral);
326 ArchivedHistory archive = exportCompressedArchive(history, collectorState, historyResource, timeShift, from, end, rab);
// Patch the byte-array length prefix: total length minus the 4-byte length field itself.
329 rab.writeInt((int)(rab.length() - 4L));
331 graph.claimLiteral(historyResource, HIS.History_size, L0.Long, archive.totalSampleSize, Bindings.LONG);
334 profile(s, "archived history items of size " + archive.totalSampleSize + " to database");
336 } catch (HistoryException e) {
337 throw new DatabaseException( e );
338 } catch (IOException e) {
339 throw new DatabaseException( e );
// Best-effort close; uncheckedClose tolerates null and swallows close errors.
341 FileUtils.uncheckedClose(closeable);
// Dispatches archive writing to the codec selected by USED_COMPRESSION (FLZ or LZ4).
347 private static ArchivedHistory exportCompressedArchive(
348 final HistoryManager history,
349 final Bean collectorState,
350 final Resource historyResource,
351 final Double timeShift,
354 RandomAccessBinary rab)
355 throws IOException, HistoryException
357 switch (USED_COMPRESSION) {
359 return exportArchiveFLZ(history, collectorState, historyResource, timeShift, from, end, rab);
361 return exportArchiveLZ4(history, collectorState, historyResource, timeShift, from, end, rab);
// Defensive default for any future Compression constant without a writer.
363 throw new UnsupportedOperationException("Unsupported compression: " + USED_COMPRESSION);
// Writes the history archive with LZ4 block compression: a DataContainer header
// (format application/x-lz4, version 1) followed by the LZ4-compressed tar stream.
367 private static ArchivedHistory exportArchiveLZ4(
368 final HistoryManager history,
369 final Bean collectorState,
370 final Resource historyResource,
371 final Double timeShift,
374 RandomAccessBinary rab)
375 throws IOException, HistoryException
377 OutputStream closeable = null;
379 DataContainers.writeHeader(rab, new DataContainer(APPLICATION_X_LZ4, ARCHIVE_VERSION_1, null));
// RABOutputStream purposely does not close the underlying RandomAccessBinary.
380 closeable = new RABOutputStream( rab );
381 closeable = new LZ4BlockOutputStream( closeable );
383 ArchivedHistory archive = exportArchive( closeable, history, collectorState, timeShift, from, end );
391 FileUtils.uncheckedClose(closeable);
// Writes the history archive with FastLZ compression: a DataContainer header
// (format application/x-flz, version 1) followed by the FastLZ-compressed tar stream.
// This is the default codec (see USED_COMPRESSION and the deserialization note
// in extractHistoryArchiveTar about problems observed with LZ4).
395 @SuppressWarnings("resource")
396 private static ArchivedHistory exportArchiveFLZ(
397 final HistoryManager history,
398 final Bean collectorState,
399 final Resource historyResource,
400 final Double timeShift,
403 RandomAccessBinary rab)
404 throws IOException, HistoryException
406 OutputStream closeable = null;
408 DataContainers.writeHeader(rab, new DataContainer(APPLICATION_X_FLZ, ARCHIVE_VERSION_1, null));
// RABOutputStream purposely does not close the underlying RandomAccessBinary.
409 closeable = new RABOutputStream( rab );
410 closeable = FastLZ.write( closeable );
412 ArchivedHistory archive = exportArchive( closeable, history, collectorState, timeShift, from, end );
420 FileUtils.uncheckedClose(closeable);
425 * Store the specified history manager's contents into a single archive
428 * @param outputStream
429 * the output stream where the history archive is written
432 * @param collectorState
433 * complete state of the history collector at the time of saving
434 * or <code>null</code> to not save any state
436 * adjust time values by this value
441 * @return ArchivedHistory instance describing the created archive
442 * @throws HistoryException
443 * when there's a problem with reading the history data from the
445 * @throws IOException
446 * when there's a problem writing the resulting history archive
// Writes all history items as a tar archive to outputStream:
//  - optional "state" entry: serialized collector state bean,
//  - per item "<id>.txt": serialized item metadata bean,
//  - per item "<id>.data": raw serialized samples in [from, end], time-shifted
//    when timeShift is non-null and non-zero.
// Items with variable-length samples are skipped. Returns an ArchivedHistory
// with total and per-item sample byte counts.
449 private static ArchivedHistory exportArchive(
450 OutputStream outputStream,
451 HistoryManager history,
456 throws IOException, HistoryException
458 ArchivedHistory result = new ArchivedHistory();
460 Serializer beanSerializer = Bindings.getSerializerUnchecked(Bindings.BEAN);
461 boolean hasTimeShift = timeShift != null && timeShift != 0.0;
463 TarArchiveOutputStream out = new TarArchiveOutputStream(outputStream);
464 OutputStreamWriteable dataOutput = new OutputStreamWriteable(out);
467 if (collectorState != null) {
468 // Write complete collector state into the archive
470 System.out.println("WRITING collector state: " + collectorState);
471 byte[] serializedCollectorState = beanSerializer.serialize( collectorState );
472 putNextEntry(out, "state", serializedCollectorState.length);
473 out.write(serializedCollectorState);
474 out.closeArchiveEntry();
477 ItemManager im = new ItemManager( history.getItems() );
479 for (Bean item : im.values()) {
481 System.out.println("STORING ITEM: " + item);
482 String id = (String) item.getField("id");
484 // Write data stream metadata
485 byte[] metadata = beanSerializer.serialize( item );
486 putNextEntry(out, id + ".txt", metadata.length);
488 out.closeArchiveEntry();
490 System.out.println("SERIALIZED ITEM " + id + ": " + Arrays.toString(metadata));
493 // Write data stream as a separate entry
494 StreamAccessor sa = history.openStream(id, "r");
497 System.out.println("STREAM SIZE=" + sa.size());
498 RecordBinding sampleBinding = (RecordBinding) Bindings.getBeanBinding( sa.type().componentType );
499 Serializer sampleSerializer = Bindings.getSerializerUnchecked(sampleBinding);
500 Integer constantSampleSize = sampleSerializer.getConstantSize();
// Tar entry sizes must be known up front, so only fixed-size samples are supported.
502 if (constantSampleSize == null) {
503 // Variable length samples - no support for now.
504 System.err.println("Skipping item " + item + " in history archival due to variable length samples not being supported at the moment.");
507 ValueBand vb = new ValueBand(sampleBinding);
508 Stream stream = new Stream(sa, sampleBinding);
510 System.out.println("WRITING stream: sample size=" + constantSampleSize);
// Negative binarySearch results encode an insertion point
// (presumably the Arrays.binarySearch convention — confirm against
// Stream.binarySearch); convert them to valid start/end indices.
514 int startIndex = stream.binarySearch(Bindings.DOUBLE, from);
515 if ( startIndex < -stream.count() ) break readArray;
516 if ( startIndex<0 ) startIndex = -2-startIndex;
517 if ( startIndex == -1 ) startIndex = 0;
520 int endIndex = stream.binarySearch(Bindings.DOUBLE, end);
521 if ( endIndex == -1 ) break readArray;
522 if ( endIndex<0 ) endIndex = -1-endIndex;
523 if ( endIndex == sa.size() ) endIndex = sa.size()-1;
524 if ( endIndex<startIndex ) break readArray;
526 // Written sample count
527 int count = endIndex - startIndex + 1;
529 System.out.println("WRITTEN sample count=" + count);
531 long savedSamplesSize = ((long) count) * ((long) constantSampleSize);
533 System.out.println("saved samples size in bytes: " + savedSamplesSize);
534 putNextEntry(out, id + ".data", savedSamplesSize);
536 for (int i=0; i<count; i++) {
537 Object sample = sa.get(i+startIndex, sampleBinding);
// Shift both time and endTime fields of the sample when a time shift is requested.
540 if ( hasTimeShift ) {
541 vb.setSample(sample);
542 if ( vb.hasTime() ) {
543 double newTime = vb.getTimeDouble() + timeShift;
544 vb.setTime(Bindings.DOUBLE, newTime);
546 if ( vb.hasEndTime() ) {
547 double newTime = vb.getEndTimeDouble() + timeShift;
548 vb.setEndTime(Bindings.DOUBLE, newTime);
551 //System.out.println("\t#" + i + ": sample=" + sample);
552 sampleSerializer.serialize(dataOutput, sample);
554 out.closeArchiveEntry();
556 result.itemSizes.put(item, savedSamplesSize);
557 result.totalSampleSize += savedSamplesSize;
560 } catch (AccessorException e) {
561 throw new IOException(e);
565 } catch (AccessorException e) {
572 } catch (BindingException e) {
573 throw new HistoryException(e);
580 * Import all history items from graph into a history manager.
582 * @param r history resource or initial condition resource source HIS.History resource
583 * @param history destination history
584 * @return read request that always returns a HistoryImportResult instance
// Builds a read request that imports history from the database into the given
// HistoryManager. Accepts either a HIS.History resource directly or a resource
// whose SIM.State_History object is the history. Chooses between the old
// per-item format and the newer single-archive format based on the presence
// of HIS.History_archive.
586 public static Read<HistoryImportResult> importHistory(final Resource r, final HistoryManager history)
588 return new UniqueRead<HistoryImportResult>() {
590 public HistoryImportResult perform(ReadGraph graph) throws DatabaseException {
591 HistoryImportResult result = new HistoryImportResult();
593 HistoryResource HIS = HistoryResource.getInstance(graph);
594 SimulationResource SIM = SimulationResource.getInstance(graph);
595 Resource historyResource = r;
// Resolve an indirect reference (e.g. an initial-condition/state resource)
// to its attached history resource.
597 if (!graph.isInstanceOf(historyResource, HIS.History))
598 historyResource = graph.getPossibleObject(r, SIM.State_History);
599 if (historyResource == null)
601 if (!graph.isInstanceOf(historyResource, HIS.History))
604 long s = System.nanoTime();
606 Resource archive = graph.getPossibleObject(historyResource, HIS.History_archive);
607 if (archive == null) {
// Old format: one graph item per history item.
609 profile(null, "importing history items from old database format to disk workarea");
610 importHistoryItems(graph, historyResource, history);
// New format: single compressed tar archive literal.
614 profile(null, "importing history items from archived format to disk workarea");
615 importHistoryArchive(graph, historyResource, archive, history, result);
616 } catch (IOException e) {
617 throw new DatabaseException(e);
618 } catch (HistoryException e) {
619 throw new DatabaseException(e);
624 profile(s, "imported history items from database to disk workarea");
632 * Import all history items from graph into a history manager.
634 * @param r history resource or initial condition resource source HIS.History resource
635 * @param history destination history
636 * @return <code>true</code> if successful
// Imports history items stored in the old per-item graph format: for each
// HIS.History_Item under the history resource, registers its metadata bean
// with the HistoryManager and writes its sample array into the item's stream.
638 private static boolean importHistoryItems(ReadGraph graph, final Resource historyResource, final HistoryManager history) throws DatabaseException
640 HistoryResource HIS = HistoryResource.getInstance(graph);
641 Layer0 L0 = Layer0.getInstance(graph);
644 for (Resource oldItem : graph.getObjects(historyResource, L0.ConsistsOf)) {
645 if (!graph.isInstanceOf(oldItem, HIS.History_Item))
648 Bean bean = graph.getRelatedValue(oldItem, HIS.History_Item_Info, Bindings.BEAN);
649 String id = (String) bean.getFieldUnchecked("id");
650 Object array = graph.getRelatedValue(oldItem, HIS.History_Item_Series, Bindings.BEAN);
// Binding is resolved from the runtime class of the read array value.
651 Binding arrayBinding = Bindings.getBinding( array.getClass() );
652 history.modify(bean);
653 StreamAccessor sa = history.openStream(id, "rw");
655 sa.setValue(arrayBinding, array);
659 } catch (AccessorException e) {
// All lower-level failures are surfaced as DatabaseException to the caller.
665 } catch (AccessorException e) {
666 throw new DatabaseException(e);
667 } catch (BindingConstructionException e) {
668 throw new DatabaseException(e);
669 } catch (HistoryException e) {
670 throw new DatabaseException(e);
675 * Import all history items from database archive into a history manager.
678 * @param historyResource
681 * @return <code>true</code> if successful
682 * @throws DatabaseException
// Imports history items from the single-archive format: reads the
// DataContainer header from the archive literal and decompresses the
// embedded tar stream with the codec indicated by the header format
// (application/x-lz4 or application/x-flz, version 1).
684 @SuppressWarnings("resource")
685 private static boolean importHistoryArchive(ReadGraph graph, Resource historyResource, Resource archive, HistoryManager history, HistoryImportResult result)
686 throws DatabaseException, IOException, HistoryException
688 RandomAccessBinary rab = graph.getRandomAccessBinary(archive);
690 DataContainer dc = DataContainers.readHeader(rab);
693 System.out.println("COMPRESSED HISTORY DATA SIZE: " + (rab.length() - rab.position()));
695 if (APPLICATION_X_LZ4.equals(dc.format)) {
696 if (dc.version == ARCHIVE_VERSION_1) {
// RABInputStream purposely does not close the underlying RandomAccessBinary.
697 InputStream in = new RABInputStream(rab);
698 //in = new BufferedInputStream(in);
699 in = new LZ4BlockInputStream(in);
700 return extractHistoryArchiveTar(graph, history, in, result);
702 } else if (APPLICATION_X_FLZ.equals(dc.format)) {
703 if (dc.version == ARCHIVE_VERSION_1) {
704 InputStream in = null;
706 in = new RABInputStream(rab);
707 in = FastLZ.read(in);
708 return extractHistoryArchiveTar(graph, history, in, result);
710 FileUtils.uncheckedClose(in);
// Unknown codec or version: fail loudly instead of importing garbage.
714 throw new UnsupportedOperationException("Unsupported format " + dc.format + " and version " + dc.version);
718 * Import all history items from graph into a history manager
720 * @param graph database access
721 * @param history destination history
722 * @param rab the archive to extract from
723 * @return <code>true</code> if successful
724 * @throws DatabaseException
// Extracts a history tar archive produced by exportArchive(OutputStream,...):
// the "state" entry restores the collector state into result (when given),
// "<id>.txt" entries register item metadata beans with the HistoryManager,
// and "<id>.data" entries copy the raw sample bytes into the matching item
// stream of the most recently read metadata entry.
726 private static boolean extractHistoryArchiveTar(ReadGraph graph, HistoryManager history, InputStream in, HistoryImportResult result)
727 throws DatabaseException, IOException, HistoryException
729 // tar is not closed on purpose because we do not want to close RandomAccessBinary rab.
730 TarArchiveInputStream tar = new TarArchiveInputStream(in);
732 Serializer beanSerializer = Bindings.getSerializerUnchecked(Bindings.BEAN);
// Data entries are associated with the item whose ".txt" entry preceded them.
734 Bean lastItem = null;
735 String lastItemId = null;
737 TarArchiveEntry entry = tar.getNextTarEntry();
741 String name = entry.getName();
742 boolean state = name.equals("state");
743 boolean metadata = name.endsWith(".txt");
744 boolean data = name.endsWith(".data");
747 if (result != null) {
748 byte[] st = new byte[(int) entry.getSize()];
750 result.collectorState = (Bean) beanSerializer.deserialize(st);
752 System.out.println("READ collector state: " + result.collectorState);
// Caller did not ask for collector state; skip over the entry payload.
754 tar.skip(entry.getSize());
756 } else if (metadata) {
757 byte[] md = new byte[(int) entry.getSize()];
760 System.out.println("READING Item metadata: " + name);
761 System.out.println("SERIALIZED ITEM " + name.replace(".txt", "") + ": " + Arrays.toString(md));
765 Bean bean = (Bean) beanSerializer.deserialize(md);
767 System.out.println("READ Item metadata: " + bean);
768 history.modify(bean);
770 lastItemId = (String) bean.getFieldUnchecked("id");
771 } catch (ClassFormatError e) {
772 // This is here because LZ4BlockInput/OutputStream seemed to
773 // be causing weird deserialization errors to occur for
774 // single data items when trying to deserialize beans. The
775 // reason being that the serialized data that was read only
776 // contains the first 512 bytes of the data. After that all
777 // data is zeros, which causes things like this to happen in
779 //signature: rsstzzzzzzzze_b7b532e9
780 //component(o): id = String
781 //component(1): variableId = String
782 //component(2): format = Temp1
783 //annotation: @org.simantics.databoard.annotations.Union(value=[class org.simantics.databoard.type.BooleanType, class org.simantics.databoard.type.ByteType, class org.simantics.databoard.type.IntegerType, class org.simantics.databoard.type.LongType, class org.simantics.databoard.type.FloatType, class org.simantics.databoard.type.DoubleType, class org.simantics.databoard.type.StringType, class org.simantics.databoard.type.RecordType, class org.simantics.databoard.type.ArrayType, class org.simantics.databoard.type.MapType, class org.simantics.databoard.type.OptionalType, class org.simantics.databoard.type.UnionType, class org.simantics.databoard.type.VariantType])
784 //component(3): = Boolean
785 //component(4): = Boolean
786 //component(5): = Boolean
787 //component(devil): = Boolean
788 //component(7): = Boolean
789 //component(music): = Boolean
790 //component(9): = Boolean
791 //component(10): = Boolean
793 // For this reason we've switched the default compression to FastLZ
794 // for now. This is reflected in the input format also.
796 Activator.getDefault().getLog().log(
797 new Status(IStatus.ERROR, Activator.PLUGIN_ID, "History item " + name + " metadata deserialization failed.", e));;
799 } else if (data && lastItem != null) {
800 StreamAccessor sa = history.openStream(lastItemId, "rw");
// Fast path: write the raw entry bytes straight into the stream's backing binary.
802 if (sa instanceof BinaryObject) {
803 BinaryObject bo = (BinaryObject) sa;
804 RandomAccessBinary output = bo.getBinary();
806 output.setLength(0L);
807 long copiedBytes = FileUtils.copy(tar, new RABOutputStream(output), entry.getSize());
// A short copy indicates a truncated or corrupted archive entry; warn but continue.
808 if (copiedBytes != entry.getSize()) {
809 System.err.println("WARNING: item " + lastItemId + ", extracted " + copiedBytes + " bytes while TAR entry said the size to be " + entry.getSize() + " bytes");
812 System.err.println("Skipping item " + lastItemId + " in history import due to StreamAccessor not being an instanceof BinaryObject. StreamAccessor=" + sa);
819 } catch (AccessorException e) {
829 * Get request that clears history
830 * @return cleaning request
// Builds a WriteRequest that removes all persisted history from the given
// history resource: both the old per-item format and the archive literal.
832 public static WriteRequest clear(final Resource history) {
833 return new WriteRequest() {
835 public void perform(WriteGraph graph) throws DatabaseException {
836 HistoryResource HIS = HistoryResource.getInstance(graph);
837 Layer0 L0 = Layer0.getInstance(graph);
839 // Separate items format
840 for (Resource oldItem : graph.getObjects(history, L0.ConsistsOf))
842 if (!graph.isInstanceOf(oldItem, HIS.History_Item)) continue;
843 Resource info = graph.getPossibleObject(oldItem, HIS.History_Item);
845 graph.deny(oldItem, HIS.History_Item_Info);
848 Resource data = graph.getPossibleObject(oldItem, HIS.History_Item_Series);
850 graph.deny(oldItem, HIS.History_Item_Series);
853 graph.deny(history, L0.ConsistsOf, oldItem);
// Single-archive format: drop the archive literal value as well.
858 graph.denyValue(history, HIS.History_archive);
// Opens a new tar entry with the given name and payload size on out.
// The caller must write exactly `size` bytes and then call closeArchiveEntry().
// NOTE(review): the entry.setSize(size) call is not visible in this excerpt —
// confirm the size parameter is applied to the entry before putArchiveEntry.
864 private static void putNextEntry(TarArchiveOutputStream out, String entryName, long size) throws IOException {
865 TarArchiveEntry entry = new TarArchiveEntry(entryName);
867 out.putArchiveEntry(entry);
871 * Shall not close the underlying RandomAccessBinary.
// OutputStream adapter over a RandomAccessBinary.
// Shall not close the underlying RandomAccessBinary (inherits the no-op
// OutputStream.close()), so callers can keep using the binary afterwards.
873 private static class RABOutputStream extends OutputStream {
875 private final RandomAccessBinary output;
877 public RABOutputStream(RandomAccessBinary output) {
878 this.output = output;
// Bulk writes delegate directly to the backing binary.
882 public void write(byte[] b, int off, int len) throws IOException {
883 output.write(b, off, len);
// Single-byte write; body elided in this excerpt (presumably output.write(b)).
887 public void write(int b) throws IOException {
894 * Shall not close the underlying RandomAccessBinary.
// InputStream adapter over a RandomAccessBinary.
// Shall not close the underlying RandomAccessBinary (inherits the no-op
// InputStream.close()); EOF is reported per InputStream conventions.
896 private static class RABInputStream extends InputStream {
898 private final RandomAccessBinary input;
900 public RABInputStream(RandomAccessBinary input) {
// Remaining bytes = length - position; guard against values that would not
// fit in the int return type of available().
905 public int available() throws IOException {
906 long avail = input.length() - input.position();
907 if ((avail & 0xffffffff00000000L) != 0)
908 throw new IOException("Number of available bytes truncated in long->int conversion: " + avail);
// Single-byte read; readUnsignedByte throws EOFException at end of data,
// which is translated to the InputStream convention (presumably return -1 —
// the catch body is elided in this excerpt).
913 public int read() throws IOException {
915 return input.readUnsignedByte();
916 } catch (EOFException e) {
// Bulk read: clamp to the remaining byte count, then fill via readFully.
922 public int read(byte[] b, int off, int len) throws IOException {
923 long available = input.length() - input.position();
926 int l = (int) Math.min(available, (long) len);
927 input.readFully(b, off, l);
// Skip in chunks of at most Integer.MAX_VALUE using skipBytes.
932 public long skip(long n) throws IOException {
935 int l = (int) Math.min(n, (long) Integer.MAX_VALUE);
936 count += input.skipBytes(l);
// Prints a PROFILE line to stdout. With t1 == null only the message is printed;
// otherwise the elapsed time since the t1 nano-timestamp is appended in ms.
// Returns a nano-timestamp for chaining (return statement elided in this excerpt).
943 private static long profile(Long t1, String string) {
945 long t2 = System.nanoTime();
946 long delta = t1 == null ? 0 : t2-t1;
947 System.out.println("[" + HistoryUtil.class.getSimpleName() + "] PROFILE: " + string + (t1 == null ? "" : " in " + (((double)delta)*1e-6) + " ms"));
// Truncates every history item stream so that no sample after toBeforeTime
// remains, and rewinds the collector state (per-item first/current values,
// deadband bookkeeping and median tracker) to the last surviving sample.
// Returns the updated CollectorState.
953 public static CollectorState truncateHistory(double toBeforeTime, HistoryManager history, CollectorState state) throws AccessorException, BindingException, HistoryException {
954 Double t = toBeforeTime;
955 Binding timeBinding = null;
957 Bean[] items = history.getItems();
958 //System.out.println("truncating all samples after t=" + toBeforeTime + " for " + items.length + " history items");
960 for (Bean item : items) {
961 String id = (String) item.getField("id");
962 StreamAccessor sa = history.openStream(id, "rw");
964 Stream s = new Stream(sa);
965 timeBinding = s.timeBinding;
966 int currentSize = sa.size();
// truncationSize maps the binarySearch result (match or insertion point)
// to the new stream length.
967 int index = s.binarySearch(timeBinding, toBeforeTime);
968 int newSize = truncationSize(index);
969 if (newSize < currentSize) {
970 //System.out.println("truncating item: " + item + " from size " + currentSize + " to " + newSize);
// Last surviving sample (if any) becomes the collector's new reference point.
974 Object prevTime = newSize > 0 ? s.getItemTime(s.timeBinding, newSize - 1) : null;
975 Bean prevSample = newSize > 0 ? (Bean) sa.get(newSize - 1, s.sampleBinding) : null;
976 Object prevValue = prevSample != null ? prevSample.getField(s.valueIndex) : null;
977 boolean isNan = isNaN(prevValue);
// Push the surviving value back into the live variable state, if tracked.
979 VariableState vs = state.values.get(id);
980 if (vs != null && vs.value != null && prevValue != null) {
981 vs.value.setValue(vs.value.getBinding(), prevValue);
986 CollectorState.Item is = state.itemStates.get(id);
988 is.firstTime = toTime(prevTime);
989 is.firstValue = toValue(s.valueBinding, prevValue);
990 is.currentTime = is.firstTime;
991 is.currentValue = toValue(s.valueBinding, prevValue);
993 is.isValid = prevValue != null;
// Reset deadband/disable bookkeeping and restart the weighted median window.
996 is.ooDeadband = false;
997 is.firstDisabledTime = Double.NaN;
998 is.lastDisabledTime = Double.NaN;
999 is.median = new WeightedMedian( CollectorImpl.MEDIAN_LIMIT );
// Record the truncation time as the collector's current time.
1008 if (timeBinding != null && state != null) {
1009 state.time.setValue(timeBinding, t);
// Converts a sample time object to a double; non-numeric or null maps to NaN.
1016 private static double toTime(Object time) {
1017 return time instanceof Number ? ((Number) time).doubleValue() : Double.NaN;
// Wraps a value into a MutableVariant; a null value is replaced by the
// binding's default instance so the variant is never empty.
1020 private static MutableVariant toValue(Binding valueBinding, Object value) {
1021 return new MutableVariant(valueBinding, value != null ? value : valueBinding.createDefaultUnchecked());
// True only for Number values that are NaN; non-numbers and null are false.
1024 private static boolean isNaN(Object value) {
1025 return value instanceof Number ? Double.isNaN(((Number) value).doubleValue()) : false;
// Maps a binarySearch result to a truncated stream size: an exact match keeps
// the matched sample (index + 1); a negative result keeps everything below
// the encoded insertion point.
1028 private static int truncationSize(int binarySearchResult) {
1029 return binarySearchResult >= 0 ? binarySearchResult + 1 : (-binarySearchResult - 1);