gerrit.simantics Code Review - simantics/platform.git/blob - bundles/org.simantics.simulation/src/org/simantics/simulation/history/HistoryUtil.java
HistoryUtil API for exporting subscription history data archive file
[simantics/platform.git] / bundles / org.simantics.simulation / src / org / simantics / simulation / history / HistoryUtil.java
1 /*******************************************************************************
2  * Copyright (c) 2007, 2018 Association for Decentralized Information Management in
3  * Industry THTH ry.
4  * All rights reserved. This program and the accompanying materials
5  * are made available under the terms of the Eclipse Public License v1.0
6  * which accompanies this distribution, and is available at
7  * http://www.eclipse.org/legal/epl-v10.html
8  *
9  * Contributors:
10  *     VTT Technical Research Centre of Finland - initial API and implementation
11  *     Semantum Oy - archive implementation, gitlab simantics/platform#227
12  *******************************************************************************/
13 package org.simantics.simulation.history;
14
15 import java.io.EOFException;
16 import java.io.IOException;
17 import java.io.InputStream;
18 import java.io.OutputStream;
19 import java.nio.file.Path;
20 import java.util.Arrays;
21 import java.util.HashMap;
22 import java.util.Map;
23
24 import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
25 import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
26 import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
27 import org.eclipse.core.runtime.IStatus;
28 import org.eclipse.core.runtime.Status;
29 import org.simantics.databoard.Bindings;
30 import org.simantics.databoard.accessor.StreamAccessor;
31 import org.simantics.databoard.accessor.binary.BinaryObject;
32 import org.simantics.databoard.accessor.error.AccessorException;
33 import org.simantics.databoard.binding.ArrayBinding;
34 import org.simantics.databoard.binding.Binding;
35 import org.simantics.databoard.binding.RecordBinding;
36 import org.simantics.databoard.binding.error.BindingConstructionException;
37 import org.simantics.databoard.binding.error.BindingException;
38 import org.simantics.databoard.binding.impl.ObjectArrayBinding;
39 import org.simantics.databoard.binding.mutable.MutableVariant;
40 import org.simantics.databoard.binding.mutable.Variant;
41 import org.simantics.databoard.container.DataContainer;
42 import org.simantics.databoard.container.DataContainers;
43 import org.simantics.databoard.serialization.Serializer;
44 import org.simantics.databoard.util.Bean;
45 import org.simantics.databoard.util.binary.BinaryFile;
46 import org.simantics.databoard.util.binary.OutputStreamWriteable;
47 import org.simantics.databoard.util.binary.RandomAccessBinary;
48 import org.simantics.db.ReadGraph;
49 import org.simantics.db.Resource;
50 import org.simantics.db.WriteGraph;
51 import org.simantics.db.common.request.UniqueRead;
52 import org.simantics.db.common.request.WriteRequest;
53 import org.simantics.db.exception.DatabaseException;
54 import org.simantics.db.exception.ServiceNotFoundException;
55 import org.simantics.db.request.Read;
56 import org.simantics.fastlz.FastLZ;
57 import org.simantics.history.HistoryException;
58 import org.simantics.history.HistoryManager;
59 import org.simantics.history.ItemManager;
60 import org.simantics.history.impl.CollectorImpl;
61 import org.simantics.history.impl.CollectorState;
62 import org.simantics.history.impl.CollectorState.VariableState;
63 import org.simantics.history.util.Stream;
64 import org.simantics.history.util.ValueBand;
65 import org.simantics.history.util.WeightedMedian;
66 import org.simantics.layer0.Layer0;
67 import org.simantics.simulation.Activator;
68 import org.simantics.simulation.ontology.HistoryResource;
69 import org.simantics.simulation.ontology.SimulationResource;
70 import org.simantics.utils.FileUtils;
71
72 import gnu.trove.map.TObjectLongMap;
73 import gnu.trove.map.hash.TObjectLongHashMap;
74 import net.jpountz.lz4.LZ4BlockInputStream;
75 import net.jpountz.lz4.LZ4BlockOutputStream;
76
77 /**
78  * @author Toni Kalajainen
79  * @author Tuukka Lehtonen
80  */
81 public class HistoryUtil {
82
/** When true, the archive export prints step-by-step trace output to {@code System.out}. */
private static final boolean DEBUG = false;
/** When true, the export requests report progress and timing through {@code profile(...)}. */
private static final boolean PROFILE = true;
85
/**
 * Compression schemes selectable when exporting a history archive.
 */
public enum Compression {
    /** Archive stream is passed through {@code FastLZ.write}. */
    FLZ,
    /** Archive stream is wrapped in an {@code LZ4BlockOutputStream}. */
    LZ4,
    /** Archive stream is written as-is, without compression. */
    NONE
}
91
/** Compression used by the {@code exportCompressedArchive} overload that takes no explicit {@link Compression}. */
private static final Compression USED_COMPRESSION = Compression.FLZ;

// Content type strings written into the archive's DataContainer header;
// exportCompressedArchive picks one according to the compression in use.
private static final String APPLICATION_X_TAR = "application/x-tar";
private static final String APPLICATION_X_LZ4 = "application/x-lz4";
private static final String APPLICATION_X_FLZ = "application/x-flz";
// Archive format version written into the DataContainer header next to the content type.
private static final int ARCHIVE_VERSION_1 = 1; // TarArchiveInput/OutputStream
98
99         public static class ArchivedHistory {
100                 public long totalSampleSize = 0L;
101                 public TObjectLongMap<Bean> itemSizes = new TObjectLongHashMap<Bean>();
102         }
103
104         /**
105          * Create request that exports history to graph.
106          * 
107          * If removeOldUnused is true, items that do not exist in source history
108          * are deleted from graph.
109          * 
110          * If item already exists in the graph, it is overwritten.
111          * 
112          * @param history source history
113          * @param g write graph
114          * @param historyResource Instance of HIS.History to write data to
115          * @param removeOldUnused
116          * @param timeShift adjust time values by this value 
117          * @param from time
118          * @param end time
119          * @return WriteRequest
120          */
121         public static WriteRequest export(final HistoryManager history, 
122                         final Resource historyResource,
123                         final boolean removeOldUnused,
124                         final Double timeShift, 
125                         final double from, final double end) 
126         {
127                 return new WriteRequest() {
128                         public void perform(WriteGraph graph) throws DatabaseException {
129                                 HistoryResource HIS = HistoryResource.getInstance(graph);
130                                 Layer0 L0 = Layer0.getInstance(graph);
131                                 long totalSampleSize = 0L;
132                                 long s = System.nanoTime();
133
134                                 try {
135                                         Bean[] items = history.getItems();
136                                         if (PROFILE)
137                                                 profile(null, "exporting " + items.length + " history items to database");
138
139                                         ItemManager im = new ItemManager( items );
140                                         Map<String, Resource> oldResourceMap = new HashMap<String, Resource>();
141                                         for (Resource oldItem : graph.getObjects(historyResource, L0.ConsistsOf))
142                                         {
143                                                 if (!graph.isInstanceOf(oldItem, HIS.History_Item)) continue;
144                                                 Bean bean = graph.getPossibleRelatedValue(oldItem, HIS.History_Item_Info, Bindings.BEAN);
145                                                 if ( bean == null ) continue;
146                                                 String id = (String) bean.getField("id");
147                                                 
148                                                 if (!im.exists(id)) {
149                                                         // There is item in graph that does not exist in source history
150                                                         if ( removeOldUnused ) {
151                                                                 graph.deny(oldItem);
152                                                         }
153                                                 } else {
154                                                         // Item already exists, to-be-overwritten
155                                                         oldResourceMap.put(id, oldItem);
156                                                 }
157                                         }
158                                         
159                                         for (Bean newItem : im.values())
160                                         {
161                                                 //System.out.println("WRITING ITEM: " + newItem);
162                                                 String id = (String) newItem.getField("id");
163                                                 Resource historyItemResource = oldResourceMap.get(id);
164                                                 Resource data = null;
165
166                                                 if ( historyItemResource==null ) {
167                                                         historyItemResource = graph.newResource();
168                                                         graph.claim(historyResource, L0.ConsistsOf, historyItemResource);
169                                                         graph.claim(historyItemResource, L0.InstanceOf, HIS.History_Item);
170                                                         graph.claimLiteral(historyItemResource, L0.HasName, id);
171                                                 } else {
172                                                         data = graph.getPossibleObject(historyItemResource, HIS.History_Item_Series);
173                                                 }
174                                                 
175                                                 Variant v = new Variant(newItem.getBinding(), newItem);
176                                                 graph.claimLiteral(historyItemResource, HIS.History_Item_Info, v, Bindings.VARIANT);
177                                                 
178                                                 if (data == null) {
179                                                         data = graph.newResource();
180                                                         graph.claim(data, L0.InstanceOf, L0.Variant);
181                                                         graph.claim(historyItemResource, HIS.History_Item_Series, data);
182                                                 } else {
183                                                         graph.denyValue(data);
184                                                 }
185                                                 
186                                                 
187                                                 // Write stream
188                                                 StreamAccessor sa = history.openStream(id, "r");
189                                                 try {
190                                                         //System.out.println("WRITING stream of size=" + sa.size());
191                                                         RecordBinding sampleBinding = (RecordBinding) Bindings.getBeanBinding( sa.type().componentType );
192                                                         Serializer serializer = Bindings.getSerializerUnchecked(sampleBinding);
193                                                         Integer constantSampleSize = serializer.getConstantSize();
194                                                         ValueBand vb = new ValueBand(sampleBinding);
195                                                         Stream stream = new Stream(sa, sampleBinding);
196                                                         ArrayBinding arrayBinding = new ObjectArrayBinding(sa.type(), sampleBinding);
197                                                         Object array = null;
198                                                         //System.out.println("WRITING stream: sample size=" + constantSampleSize);
199
200                                                         int count = 0;
201                                                         readArray: {
202                                                                 // Start index
203                                                                 int startIndex = stream.binarySearch(Bindings.DOUBLE, from);                                            
204                                                                 if ( startIndex < -stream.count() ) break readArray;
205                                                                 if ( startIndex<0 ) startIndex = -2-startIndex;
206                                                                 if ( startIndex == -1 ) startIndex = 0;
207                                                                 
208                                                                 // End index
209                                                                 int endIndex = stream.binarySearch(Bindings.DOUBLE, end);
210                                                                 if ( endIndex == -1 ) break readArray;
211                                                                 if ( endIndex<0 ) endIndex = -1-endIndex;
212                                                                 if ( endIndex == sa.size() ) endIndex = sa.size()-1;
213                                                                 if ( endIndex<startIndex ) break readArray;
214                                                         
215                                                                 // Write sample count
216                                                                 count = endIndex - startIndex + 1;
217                                                                 array = arrayBinding.create(count);
218                                                                 boolean hasTimeShift = timeShift!=null && timeShift != 0.0;
219                                                                 //System.out.println("WRITING stream: count=" + count + ", datatype=" + arrayBinding.type());
220                                                                 for (int i=0; i<count; i++) {
221                                                                         Object sample = sa.get(i+startIndex, sampleBinding);
222                                                                         
223                                                                         // Adjust time
224                                                                         if ( hasTimeShift ) {
225                                                                                 vb.setSample(sample);
226                                                                                 if ( vb.hasTime() ) {
227                                                                                         double newTime = vb.getTimeDouble() + timeShift;
228                                                                                         vb.setTime(Bindings.DOUBLE, newTime);
229                                                                                 }
230                                                                                 if ( vb.hasEndTime() ) {
231                                                                                         double newTime = vb.getEndTimeDouble() + timeShift;
232                                                                                         vb.setEndTime(Bindings.DOUBLE, newTime);
233                                                                                 }
234                                                                         }
235                                                                         //System.out.println("\t#" + i + ": sample=" + sample);
236                                                                         arrayBinding.set(array, i, sample);
237                                                                 }
238                                                         }
239                                                         if (array==null) array = arrayBinding.create();
240                                                         Variant v2 = new Variant(arrayBinding, array);
241                                                         graph.claimValue(data, v2, Bindings.VARIANT);
242
243                                                         if (constantSampleSize != null) {
244                                                                 long itemSampleSize = ((long) count) * ((long) constantSampleSize);
245                                                                 //System.out.println("item sample size: " + itemSampleSize);
246                                                                 totalSampleSize += itemSampleSize;
247                                                                 graph.claimLiteral(historyItemResource, HIS.History_Item_size, L0.Long, itemSampleSize, Bindings.LONG);
248                                                         }
249                                                 } catch (AccessorException e) {
250                                                         throw new DatabaseException(e);
251                                                 } finally {
252                                                         try {
253                                                                 sa.close();
254                                                         } catch (AccessorException e) {
255                                                         }
256                                                 }
257                                         }
258
259                                         graph.claimLiteral(historyResource, HIS.History_size, L0.Long, totalSampleSize, Bindings.LONG);
260
261                                         if (PROFILE)
262                                                 profile(s, "exported " + items.length + " history items to database");
263
264                                 } catch (HistoryException e) {
265                                         throw new DatabaseException( e );
266                                 } catch (BindingException e) {
267                                         throw new DatabaseException( e );
268                                 } catch (ServiceNotFoundException e) {
269                                         throw new DatabaseException( e );
270                                 }
271                         }
272                 };
273         }
274
275         /**
276          * Create request that exports history to graph.
277          * 
278          * If item already exists in the graph, it is overwritten.
279          * 
280          * @param history
281          *            source history
282          * @param collectorState
283          *            complete dump of the source history collector state or
284          *            <code>null</code> to skip collector state saving
285          * @param historyResource
286          *            Instance of HIS.History to write data to
287          * @param timeShift
288          *            adjust time values by this value
289          * @param from
290          *            time
291          * @param end
292          *            time
293          * @return WriteRequest
294          */
295         public static WriteRequest exportArchive(
296                         final HistoryManager history,
297                         final Bean collectorState,
298                         final Resource historyResource,
299                         final Double timeShift, 
300                         final double from,
301                         final double end) 
302         {
303                 return new WriteRequest() {
304                         public void perform(WriteGraph graph) throws DatabaseException {
305                                 HistoryResource HIS = HistoryResource.getInstance(graph);
306                                 Layer0 L0 = Layer0.getInstance(graph);
307
308                                 long s = System.nanoTime();
309                                 if (PROFILE)
310                                         profile(null, "archiving history items to database");
311
312                                 // Remove all possibly existing old items in the history.
313                                 for (Resource entity : graph.getObjects(historyResource, L0.ConsistsOf))
314                                         if (graph.isInstanceOf(entity, HIS.History_Item))
315                                                 graph.deny(historyResource, L0.ConsistsOf, L0.PartOf, entity);
316
317                                 // Create new literal for history archival
318                                 //graph.deny(historyResource, HIS.History_archive);
319                                 graph.denyValue(historyResource, HIS.History_archive);
320                                 Resource archiveLiteral = graph.newResource();
321                                 graph.claim(archiveLiteral, L0.InstanceOf, null, L0.ByteArray);
322                                 graph.claim(historyResource, HIS.History_archive, archiveLiteral);
323
324                                 OutputStream closeable = null;
325                                 try {
326                                         RandomAccessBinary rab = graph.getRandomAccessBinary(archiveLiteral);
327                                         rab.position(0);
328                                         rab.skipBytes(4);
329
330                                         ArchivedHistory archive = exportCompressedArchive(history, collectorState, timeShift, from, end, rab);
331
332                                         rab.position(0L);
333                                         rab.writeInt((int)(rab.length() - 4L));
334
335                                         graph.claimLiteral(historyResource, HIS.History_size, L0.Long, archive.totalSampleSize, Bindings.LONG);
336
337                                         if (PROFILE)
338                                                 profile(s, "archived history items of size " + archive.totalSampleSize + " to database");
339
340                                 } catch (HistoryException e) {
341                                         throw new DatabaseException( e );
342                                 } catch (IOException e) {
343                                         throw new DatabaseException( e );
344                                 } finally {
345                                         FileUtils.uncheckedClose(closeable);
346                                 }
347                         }
348                 };
349         }
350
351         public static ArchivedHistory exportCompressedArchive(
352                         HistoryManager history,
353                         Bean collectorState,
354                         Double timeShift, 
355                         double from,
356                         double end,
357                         RandomAccessBinary rab)
358                                         throws IOException, HistoryException
359         {
360                 return exportCompressedArchive(history, collectorState, timeShift, from, end, USED_COMPRESSION, rab);
361         }
362
363         public static ArchivedHistory exportCompressedArchive(
364                         HistoryManager history,
365                         Bean collectorState,
366                         Double timeShift, 
367                         double from,
368                         double end,
369                         Compression compression,
370                         RandomAccessBinary rab)
371                                         throws IOException, HistoryException
372         {
373                 switch (compression) {
374                         case FLZ:
375                                 DataContainers.writeHeader(rab, new DataContainer(APPLICATION_X_FLZ, ARCHIVE_VERSION_1, null));
376                                 return exportArchive(history, collectorState, timeShift, from, end, FastLZ.write(new RABOutputStream(rab)));
377                         case LZ4:
378                                 DataContainers.writeHeader(rab, new DataContainer(APPLICATION_X_LZ4, ARCHIVE_VERSION_1, null));
379                                 return exportArchive(history, collectorState, timeShift, from, end, new LZ4BlockOutputStream(new RABOutputStream(rab)));
380                         case NONE:
381                                 DataContainers.writeHeader(rab, new DataContainer(APPLICATION_X_TAR, ARCHIVE_VERSION_1, null));
382                                 return exportArchive(history, collectorState, timeShift, from, end, new RABOutputStream(rab));
383                         default:
384                                 throw new UnsupportedOperationException("Unsupported compression: " + compression);
385                 }
386         }
387
388         private static ArchivedHistory exportArchive(
389                         HistoryManager history,
390                         Bean collectorState,
391                         Double timeShift, 
392                         double from,
393                         double end,
394                         OutputStream out)
395                                         throws IOException, HistoryException
396         {
397                 try (OutputStream os = out) {
398                         ArchivedHistory archive = exportArchive( os, history, collectorState, timeShift, from, end );
399                         os.flush();
400                         return archive;
401                 }
402         }
403
404         /**
405          * Store the specified history manager's contents into a single archive
406          * file.
407          * 
408          * @param outputStream
409          *            the output stream where the history archive is written
410          * @param history
411          *            source history
412          * @param collectorState
413          *            complete state of the history collector at the time of saving
414          *            or <code>null</code> to not save any state
415          * @param timeShift
416          *            adjust time values by this value
417          * @param from
418          *            time
419          * @param end
420          *            time
421          * @return ArchivedHistory instance describing the created archive
422          * @throws HistoryException
423          *             when there's a problem with reading the history data from the
424          *             history work area
425          * @throws IOException
426          *             when there's a problem writing the resulting history archive
427          *             file
428          */
429         private static ArchivedHistory exportArchive(
430                         OutputStream outputStream,
431                         HistoryManager history,
432                         Bean collectorState,
433                         Double timeShift,
434                         double from,
435                         double end)
436                                         throws IOException, HistoryException
437         {
438                 ArchivedHistory result = new ArchivedHistory();
439
440                 Serializer beanSerializer = Bindings.getSerializerUnchecked(Bindings.BEAN);
441                 boolean hasTimeShift = timeShift != null && timeShift != 0.0;
442
443                 TarArchiveOutputStream out = new TarArchiveOutputStream(outputStream);
444                 OutputStreamWriteable dataOutput = new OutputStreamWriteable(out);
445
446                 try {
447                         if (collectorState != null) {
448                                 // Write complete collector state into the archive
449                                 if (DEBUG)
450                                         System.out.println("WRITING collector state: " + collectorState);
451                                 byte[] serializedCollectorState = beanSerializer.serialize( collectorState );
452                                 putNextEntry(out, "state", serializedCollectorState.length);
453                                 out.write(serializedCollectorState);
454                                 out.closeArchiveEntry();
455                         }
456
457                         ItemManager im = new ItemManager( history.getItems() );
458
459                         for (Bean item : im.values()) {
460                                 if (DEBUG)
461                                         System.out.println("STORING ITEM: " + item);
462                                 String id = (String) item.getField("id");
463
464                                 // Write data stream metadata
465                                 byte[] metadata = beanSerializer.serialize( item );
466                                 putNextEntry(out, id + ".txt", metadata.length);
467                                 out.write(metadata);
468                                 out.closeArchiveEntry();
469                                 if (DEBUG)
470                                         System.out.println("SERIALIZED ITEM " + id + ": " + Arrays.toString(metadata));
471
472
473                                 // Write data stream as a separate entry
474                                 StreamAccessor sa = history.openStream(id, "r");
475                                 try {
476                                         if (DEBUG)
477                                                 System.out.println("STREAM SIZE=" + sa.size());
478                                         RecordBinding sampleBinding = (RecordBinding) Bindings.getBeanBinding( sa.type().componentType );
479                                         Serializer sampleSerializer = Bindings.getSerializerUnchecked(sampleBinding);
480                                         Integer constantSampleSize = sampleSerializer.getConstantSize();
481
482                                         if (constantSampleSize == null) {
483                                                 // Variable length samples - no support for now.
484                                                 System.err.println("Skipping item " + item + " in history archival due to variable length samples not being supported at the moment.");
485                                                 continue;
486                                         } else {
487                                                 ValueBand vb = new ValueBand(sampleBinding);
488                                                 Stream stream = new Stream(sa, sampleBinding);
489                                                 if (DEBUG)
490                                                         System.out.println("WRITING stream: sample size=" + constantSampleSize);
491
492                                                 readArray: {
493                                                         // Start index
494                                                         int startIndex = stream.binarySearch(Bindings.DOUBLE, from);
495                                                         if ( startIndex < -stream.count() ) break readArray;
496                                                         if ( startIndex<0 ) startIndex = -2-startIndex;
497                                                         if ( startIndex == -1 ) startIndex = 0;
498
499                                                         // End index
500                                                         int endIndex = stream.binarySearch(Bindings.DOUBLE, end);
501                                                         if ( endIndex == -1 ) break readArray;
502                                                         if ( endIndex<0 ) endIndex = -1-endIndex;
503                                                         if ( endIndex == sa.size() ) endIndex = sa.size()-1;
504                                                         if ( endIndex<startIndex ) break readArray;
505
506                                                         // Written sample count
507                                                         int count = endIndex - startIndex + 1;
508                                                         if (DEBUG)
509                                                                 System.out.println("WRITTEN sample count=" + count);
510
511                                                         long savedSamplesSize = ((long) count) * ((long) constantSampleSize);
512                                                         if (DEBUG)
513                                                                 System.out.println("saved samples size in bytes: " + savedSamplesSize);
514                                                         putNextEntry(out, id + ".data", savedSamplesSize);
515
516                                                         for (int i=0; i<count; i++) {
517                                                                 Object sample = sa.get(i+startIndex, sampleBinding);
518
519                                                                 // Adjust time
520                                                                 if ( hasTimeShift ) {
521                                                                         vb.setSample(sample);
522                                                                         if ( vb.hasTime() ) {
523                                                                                 double newTime = vb.getTimeDouble() + timeShift;
524                                                                                 vb.setTime(Bindings.DOUBLE, newTime);
525                                                                         }
526                                                                         if ( vb.hasEndTime() ) {
527                                                                                 double newTime = vb.getEndTimeDouble() + timeShift;
528                                                                                 vb.setEndTime(Bindings.DOUBLE, newTime);
529                                                                         }
530                                                                 }
531                                                                 //System.out.println("\t#" + i + ": sample=" + sample);
532                                                                 sampleSerializer.serialize(dataOutput, sample);
533                                                         }
534                                                         out.closeArchiveEntry();
535
536                                                         result.itemSizes.put(item, savedSamplesSize);
537                                                         result.totalSampleSize += savedSamplesSize;
538                                                 }
539                                         }
540                                 } catch (AccessorException e) {
541                                         throw new IOException(e);
542                                 } finally {
543                                         try {
544                                                 sa.close();
545                                         } catch (AccessorException e) {
546                                         }
547                                 }
548                                 
549                         }
550
551                         return result;
552                 } catch (BindingException e) {
553                         throw new HistoryException(e);
554                 } finally {
555                         out.finish();
556                 }
557         }
558
559         /**
560          * Import all history items from graph into a history manager.
561          * 
562          * @param r history resource or initial condition resource source HIS.History resource
563          * @param history destination history
564          * @return read request that always returns a HistoryImportResult instance
565          */
566         public static Read<HistoryImportResult> importHistory(final Resource r, final HistoryManager history)
567         {
568                 return new UniqueRead<HistoryImportResult>() {
569                         @Override
570                         public HistoryImportResult perform(ReadGraph graph) throws DatabaseException {
571                                 HistoryImportResult result = new HistoryImportResult();
572
573                                 HistoryResource HIS = HistoryResource.getInstance(graph);
574                                 SimulationResource SIM = SimulationResource.getInstance(graph);
575                                 Resource historyResource = r;
576
577                                 if (!graph.isInstanceOf(historyResource, HIS.History))
578                                         historyResource = graph.getPossibleObject(r, SIM.State_History);
579                                 if (historyResource == null)
580                                         return result;
581                                 if (!graph.isInstanceOf(historyResource, HIS.History))
582                                         return result;
583
584                                 long s = System.nanoTime();
585
586                                 Resource archive = graph.getPossibleObject(historyResource, HIS.History_archive);
587                                 if (archive == null) {
588                                         if (PROFILE)
589                                                 profile(null, "importing history items from old database format to disk workarea");
590                                         importHistoryItems(graph, historyResource, history);
591                                 } else {
592                                         try {
593                                                 if (PROFILE)
594                                                         profile(null, "importing history items from archived format to disk workarea");
595                                                 importHistoryArchive(graph, archive, history, result);
596                                         } catch (IOException e) {
597                                                 throw new DatabaseException(e);
598                                         } catch (HistoryException e) {
599                                                 throw new DatabaseException(e);
600                                         }
601                                 }
602
603                                 if (PROFILE)
604                                         profile(s, "imported history items from database to disk workarea");
605
606                                 return result;
607                         }
608                 };
609         }
610
611         /**
612          * Import all history items from graph into a history manager.
613          * 
614          * @param r history resource or initial condition resource source HIS.History resource
615          * @param history destination history
616          * @return <code>true</code> if successful
617          */
618         private static boolean importHistoryItems(ReadGraph graph, final Resource historyResource, final HistoryManager history) throws DatabaseException
619         {
620                 HistoryResource HIS = HistoryResource.getInstance(graph);
621                 Layer0 L0 = Layer0.getInstance(graph);
622
623                 try {
624                         for (Resource oldItem : graph.getObjects(historyResource, L0.ConsistsOf)) {
625                                 if (!graph.isInstanceOf(oldItem, HIS.History_Item))
626                                         continue;
627
628                                 Bean bean = graph.getRelatedValue(oldItem, HIS.History_Item_Info, Bindings.BEAN);
629                                 String id = (String) bean.getFieldUnchecked("id");
630                                 Object array = graph.getRelatedValue(oldItem, HIS.History_Item_Series, Bindings.BEAN);
631                                 Binding arrayBinding = Bindings.getBinding( array.getClass() );
632                                 history.modify(bean);
633                                 StreamAccessor sa = history.openStream(id, "rw");
634                                 try {
635                                         sa.setValue(arrayBinding, array);
636                                 } finally {
637                                         try {
638                                                 sa.close();
639                                         } catch (AccessorException e) {
640                                         }
641                                 }
642                         }
643
644                         return true;
645                 } catch (AccessorException e) {
646                         throw new DatabaseException(e);
647                 } catch (BindingConstructionException e) {
648                         throw new DatabaseException(e);
649                 } catch (HistoryException e) {
650                         throw new DatabaseException(e);
651                 }
652         }
653
654         /**
655          * @param history
656          * @param path
657          * @return
658          * @throws HistoryException 
659          * @throws IOException 
660          */
661         public static HistoryImportResult importHistoryArchive(HistoryManager history, Path path) throws IOException, HistoryException {
662                 HistoryImportResult result = new HistoryImportResult();
663                 try (RandomAccessBinary rab = new BinaryFile(path.toFile())) {
664                         importHistoryArchive(history, rab, result);
665                         return result;
666                 }
667         }
668
669         /**
670          * Import all history items from database archive into a history manager.
671          * 
672          * @param graph
673          * @param historyResource
674          * @param archive 
675          * @param history
676          * @return <code>true</code> if successful
677          * @throws DatabaseException 
678          */
679         private static boolean importHistoryArchive(ReadGraph graph, Resource archive, HistoryManager history, HistoryImportResult result) throws DatabaseException, IOException, HistoryException {
680                 RandomAccessBinary rab = graph.getRandomAccessBinary(archive);
681                 // This is here because the data is encoded as an L0.ByteArray
682                 // in the database which means the first integer describes the
683                 // length of the byte array. We skip that length data here.
684                 rab.position(4);
685                 return importHistoryArchive(history, rab, result);
686         }
687
688         /**
689          * Import all history items from database archive into a history manager.
690          * 
691          * @param graph
692          * @param historyResource
693          * @param archive 
694          * @param history
695          * @return <code>true</code> if successful
696          * @throws DatabaseException 
697          */
698         private static boolean importHistoryArchive(HistoryManager history, RandomAccessBinary rab, HistoryImportResult result)
699                         throws IOException, HistoryException
700         {
701                 DataContainer dc = DataContainers.readHeader(rab);
702
703                 if (DEBUG)
704                         System.out.println("COMPRESSED HISTORY DATA SIZE: " + (rab.length() - rab.position()));
705
706                 if (dc.version == ARCHIVE_VERSION_1) {
707                         if (APPLICATION_X_LZ4.equals(dc.format)) {
708                                 return extractHistoryArchiveInputStream(history, new LZ4BlockInputStream(new RABInputStream(rab)), result);
709                         } else if (APPLICATION_X_FLZ.equals(dc.format)) {
710                                 return extractHistoryArchiveInputStream(history, FastLZ.read(new RABInputStream(rab)), result);
711                         } else if (APPLICATION_X_TAR.equals(dc.format)) {
712                                 return extractHistoryArchiveInputStream(history, new RABInputStream(rab), result);
713                         }
714                         throw new UnsupportedOperationException("Unsupported format " + dc.format + " and version " + dc.version);
715                 } else {
716                         throw new UnsupportedOperationException("Unsupported history archive version " + dc.version + " with format " + dc.format);
717                 }
718         }
719
720         private static boolean extractHistoryArchiveInputStream(HistoryManager history, InputStream in, HistoryImportResult result) throws IOException, HistoryException {
721                 try (InputStream _in = in) {
722                         return extractHistoryArchiveTar(history, in, result);
723                 }
724         }
725
726         /**
727          * Import all history items from graph into a history manager
728          * 
729          * @param graph database access
730          * @param history destination history
731          * @param rab the archive to extract from
732          * @return <code>true</code> if successful
733          * @throws DatabaseException 
734          */
735         private static boolean extractHistoryArchiveTar(HistoryManager history, InputStream in, HistoryImportResult result)
736                         throws IOException, HistoryException
737         {
738                 // tar is not closed on purpose because we do not want to close RandomAccessBinary rab.
739                 TarArchiveInputStream tar = new TarArchiveInputStream(in);
740
741                 Serializer beanSerializer = Bindings.getSerializerUnchecked(Bindings.BEAN);
742
743                 Bean lastItem = null;
744                 String lastItemId = null;
745                 while (true) {
746                         TarArchiveEntry entry = tar.getNextTarEntry();
747                         if (entry == null)
748                                 break;
749
750                         String name = entry.getName();
751                         boolean state = name.equals("state");
752                         boolean metadata = name.endsWith(".txt");
753                         boolean data = name.endsWith(".data");
754
755                         if (state) {
756                                 if (result != null) {
757                                         byte[] st = new byte[(int) entry.getSize()];
758                                         tar.read(st);
759                                         result.collectorState = (Bean) beanSerializer.deserialize(st);
760                                         if (DEBUG)
761                                                 System.out.println("READ collector state: " + result.collectorState);
762                                 } else {
763                                         tar.skip(entry.getSize());
764                                 }
765                         } else if (metadata) {
766                                 byte[] md = new byte[(int) entry.getSize()];
767                                 tar.read(md);
768                                 if (DEBUG) {
769                                         System.out.println("READING Item metadata: " + name);
770                                         System.out.println("SERIALIZED ITEM " + name.replace(".txt", "") + ": " + Arrays.toString(md));
771                                 }
772
773                                 try {
774                                         Bean bean = (Bean) beanSerializer.deserialize(md);
775                                         if (DEBUG)
776                                                 System.out.println("READ Item metadata: " + bean);
777                                         history.modify(bean);
778                                         lastItem = bean;
779                                         lastItemId = (String) bean.getFieldUnchecked("id");
780                                 } catch (ClassFormatError e) {
781                                         // This is here because LZ4BlockInput/OutputStream seemed to
782                                         // be causing weird deserialization errors to occur for
783                                         // single data items when trying to deserialize beans. The
784                                         // reason being that the serialized data that was read only
785                                         // contains the first 512 bytes of the data. After that all
786                                         // data is zeros, which causes things like this to happen in
787                                         // deserialization:
788                                         //signature: rsstzzzzzzzze_b7b532e9
789                                         //component(o): id = String
790                                         //component(1): variableId = String
791                                         //component(2): format = Temp1
792                                         //annotation: @org.simantics.databoard.annotations.Union(value=[class org.simantics.databoard.type.BooleanType, class org.simantics.databoard.type.ByteType, class org.simantics.databoard.type.IntegerType, class org.simantics.databoard.type.LongType, class org.simantics.databoard.type.FloatType, class org.simantics.databoard.type.DoubleType, class org.simantics.databoard.type.StringType, class org.simantics.databoard.type.RecordType, class org.simantics.databoard.type.ArrayType, class org.simantics.databoard.type.MapType, class org.simantics.databoard.type.OptionalType, class org.simantics.databoard.type.UnionType, class org.simantics.databoard.type.VariantType])
793                                         //component(3):  = Boolean
794                                         //component(4):  = Boolean
795                                         //component(5):  = Boolean
796                                         //component(6):  = Boolean
797                                         //component(7):  = Boolean
798                                         //component(8):  = Boolean
799                                         //component(9):  = Boolean
800                                         //component(10):  = Boolean
801                                         //
802                                         // For this reason we've switched the default compression to FastLZ
803                                         // for now. This is reflected in the input format also.
804
805                                         Activator.getDefault().getLog().log(
806                                                         new Status(IStatus.ERROR, Activator.PLUGIN_ID, "History item " + name + " metadata deserialization failed.", e));;
807                                 }
808                         } else if (data && lastItem != null) {
809                                 StreamAccessor sa = history.openStream(lastItemId, "rw");
810                                 try {
811                                         if (sa instanceof BinaryObject) {
812                                                 BinaryObject bo = (BinaryObject) sa;
813                                                 RandomAccessBinary output = bo.getBinary();
814                                                 output.position(0L);
815                                                 output.setLength(0L);
816                                                 long copiedBytes = FileUtils.copy(tar, new RABOutputStream(output), entry.getSize());
817                                                 if (copiedBytes != entry.getSize()) {
818                                                         System.err.println("WARNING: item " + lastItemId + ", extracted " + copiedBytes + " bytes while TAR entry said the size to be " + entry.getSize() + " bytes");
819                                                 }
820                                         } else {
821                                                 System.err.println("Skipping item " + lastItemId + " in history import due to StreamAccessor not being an instanceof BinaryObject. StreamAccessor=" + sa);
822                                         }
823                                 } finally {
824                                         try {
825                                                 sa.close();
826                                                 lastItem = null;
827                                                 lastItemId = null;
828                                         } catch (AccessorException e) {
829                                         }
830                                 }
831                         }
832                 }
833
834                 return true;
835         }
836
837         /**
838          * Get request that clears history
839          * @return cleaning request
840          */
841         public static WriteRequest clear(final Resource history) {
842                 return new WriteRequest() {
843                         @Override
844                         public void perform(WriteGraph graph) throws DatabaseException {
845                                 HistoryResource HIS = HistoryResource.getInstance(graph);
846                                 Layer0 L0 = Layer0.getInstance(graph);
847
848                                 // Separate items format
849                                 for (Resource oldItem : graph.getObjects(history, L0.ConsistsOf))
850                                 {
851                                         if (!graph.isInstanceOf(oldItem, HIS.History_Item)) continue;
852                                         Resource info = graph.getPossibleObject(oldItem, HIS.History_Item);
853                                         if (info!=null) {
854                                                 graph.deny(oldItem, HIS.History_Item_Info);
855                                                 graph.deny(info);
856                                         }
857                                         Resource data = graph.getPossibleObject(oldItem, HIS.History_Item_Series);
858                                         if (data!=null) {
859                                                 graph.deny(oldItem, HIS.History_Item_Series);
860                                                 graph.deny(data);
861                                         }
862                                         graph.deny(history, L0.ConsistsOf, oldItem);
863                                         graph.deny(oldItem);
864                                 }
865
866                                 // Archived format
867                                 graph.denyValue(history, HIS.History_archive);
868                         }
869                 };
870         }
871
872
873         private static void putNextEntry(TarArchiveOutputStream out, String entryName, long size) throws IOException {
874                 TarArchiveEntry entry = new TarArchiveEntry(entryName);
875                 entry.setSize(size);
876                 out.putArchiveEntry(entry);
877         }
878
879         /**
880          * Shall not close the underlying RandomAccessBinary.
881          */
882         private static class RABOutputStream extends OutputStream {
883
884                 private final RandomAccessBinary output;
885
886                 public RABOutputStream(RandomAccessBinary output) {
887                         this.output = output;
888                 }
889
890                 @Override
891                 public void write(byte[] b, int off, int len) throws IOException {
892                         output.write(b, off, len);
893                 }
894
895                 @Override
896                 public void write(int b) throws IOException {
897                         output.write(b);
898                 }
899
900         }
901
902         /**
903          * Shall not close the underlying RandomAccessBinary.
904          */
905         private static class RABInputStream extends InputStream {
906
907                 private final RandomAccessBinary input;
908
909                 public RABInputStream(RandomAccessBinary input) {
910                         this.input = input;
911                 }
912
913                 @Override
914                 public int available() throws IOException {
915                         long avail = input.length() - input.position();
916                         if ((avail & 0xffffffff00000000L) != 0)
917                                 throw new IOException("Number of available bytes truncated in long->int conversion: " + avail);
918                         return (int) avail;
919                 }
920
921                 @Override
922                 public int read() throws IOException {
923                         try {
924                                 return input.readUnsignedByte();
925                         } catch (EOFException e) {
926                                 return -1;
927                         }
928                 }
929
930                 @Override
931                 public int read(byte[] b, int off, int len) throws IOException {
932                         long available = input.length() - input.position();
933                         if (available == 0)
934                                 return -1;
935                         int l = (int) Math.min(available, (long) len);
936                         input.readFully(b, off, l);
937                         return l;
938                 }
939
940                 @Override
941                 public long skip(long n) throws IOException {
942                         long count = 0;
943                         while (count < n) {
944                                 int l = (int) Math.min(n, (long) Integer.MAX_VALUE);
945                                 count += input.skipBytes(l);
946                         }
947                         return count;
948                 }
949
950         }
951
952         private static long profile(Long t1, String string) {
953                 if (PROFILE) {
954                         long t2 = System.nanoTime();
955                         long delta = t1 == null ? 0 : t2-t1;
956                         System.out.println("[" + HistoryUtil.class.getSimpleName() + "] PROFILE: " + string + (t1 == null ? "" : " in " + (((double)delta)*1e-6) + " ms"));
957                         return t2;
958                 }
959                 return 0L;
960         }
961
962         public static CollectorState truncateHistory(double toBeforeTime, HistoryManager history, CollectorState state) throws AccessorException, BindingException, HistoryException {
963                 Double t = toBeforeTime;
964                 Binding timeBinding = null;
965
966                 Bean[] items = history.getItems();
967                 //System.out.println("truncating all samples after t=" + toBeforeTime + " for " + items.length + " history items");
968
969                 for (Bean item : items) {
970                         String id = (String) item.getField("id");
971                         StreamAccessor sa = history.openStream(id, "rw");
972                         try {
973                                 Stream s = new Stream(sa);
974                                 timeBinding = s.timeBinding;
975                                 int currentSize = sa.size();
976                                 int index = s.binarySearch(timeBinding, toBeforeTime);
977                                 int newSize = truncationSize(index);
978                                 if (newSize < currentSize) {
979                                         //System.out.println("truncating item: " + item + " from size " + currentSize + " to " + newSize);
980                                         sa.setSize(newSize);
981
982                                         if (state != null) {
983                                                 Object prevTime  = newSize > 0 ? s.getItemTime(s.timeBinding, newSize - 1) : null;
984                                                 Bean prevSample = newSize > 0 ? (Bean) sa.get(newSize - 1, s.sampleBinding) : null;
985                                                 Object prevValue = prevSample != null ? prevSample.getField(s.valueIndex) : null;
986                                                 boolean isNan = isNaN(prevValue);
987
988                                                 VariableState vs = state.values.get(id);
989                                                 if (vs != null && vs.value != null && prevValue != null) {
990                                                         vs.value.setValue(vs.value.getBinding(), prevValue);
991                                                         vs.isValid = true;
992                                                         vs.isNan = isNan;
993                                                 }
994
995                                                 CollectorState.Item is = state.itemStates.get(id);
996                                                 if (is != null) {
997                                                         is.firstTime = toTime(prevTime);
998                                                         is.firstValue = toValue(s.valueBinding, prevValue);
999                                                         is.currentTime = is.firstTime;
1000                                                         is.currentValue = toValue(s.valueBinding, prevValue);
1001                                                         is.isNaN = isNan;
1002                                                         is.isValid = prevValue != null;
1003                                                         is.sum = Double.NaN;
1004                                                         is.count = 0;
1005                                                         is.ooDeadband = false;
1006                                                         is.firstDisabledTime = Double.NaN;
1007                                                         is.lastDisabledTime = Double.NaN;
1008                                                         is.median = new WeightedMedian( CollectorImpl.MEDIAN_LIMIT );
1009                                                 }
1010                                         }
1011                                 }
1012                         } finally {
1013                                 sa.close();
1014                         }
1015                 }
1016
1017                 if (timeBinding != null && state != null) {
1018                         state.time.setValue(timeBinding, t);
1019                         state.dT = 1.0;
1020                 }
1021
1022                 return state;
1023         }
1024
1025         private static double toTime(Object time) {
1026                 return time instanceof Number ? ((Number) time).doubleValue() : Double.NaN;
1027         }
1028
1029         private static MutableVariant toValue(Binding valueBinding, Object value) {
1030                 return new MutableVariant(valueBinding, value != null ? value : valueBinding.createDefaultUnchecked());
1031         }
1032
1033         private static boolean isNaN(Object value) {
1034                 return value instanceof Number ? Double.isNaN(((Number) value).doubleValue()) : false;
1035         }
1036
1037         private static int truncationSize(int binarySearchResult) {
1038                 return binarySearchResult >= 0 ? binarySearchResult + 1 : (-binarySearchResult - 1);
1039         }
1040
1041 }