]> gerrit.simantics Code Review - simantics/platform.git/blob - bundles/org.simantics.simulation/src/org/simantics/simulation/history/HistoryUtil.java
Added logging for history archive import failure cases
[simantics/platform.git] / bundles / org.simantics.simulation / src / org / simantics / simulation / history / HistoryUtil.java
1 /*******************************************************************************
2  * Copyright (c) 2007, 2018 Association for Decentralized Information Management in
3  * Industry THTH ry.
4  * All rights reserved. This program and the accompanying materials
5  * are made available under the terms of the Eclipse Public License v1.0
6  * which accompanies this distribution, and is available at
7  * http://www.eclipse.org/legal/epl-v10.html
8  *
9  * Contributors:
10  *     VTT Technical Research Centre of Finland - initial API and implementation
11  *     Semantum Oy - archive implementation, gitlab simantics/platform#227
12  *******************************************************************************/
13 package org.simantics.simulation.history;
14
15 import java.io.EOFException;
16 import java.io.IOException;
17 import java.io.InputStream;
18 import java.io.OutputStream;
19 import java.nio.file.Path;
20 import java.util.Arrays;
21 import java.util.HashMap;
22 import java.util.Map;
23
24 import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
25 import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
26 import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
27 import org.eclipse.core.runtime.IStatus;
28 import org.eclipse.core.runtime.Status;
29 import org.simantics.databoard.Bindings;
30 import org.simantics.databoard.accessor.StreamAccessor;
31 import org.simantics.databoard.accessor.binary.BinaryObject;
32 import org.simantics.databoard.accessor.error.AccessorException;
33 import org.simantics.databoard.binding.ArrayBinding;
34 import org.simantics.databoard.binding.Binding;
35 import org.simantics.databoard.binding.RecordBinding;
36 import org.simantics.databoard.binding.error.BindingConstructionException;
37 import org.simantics.databoard.binding.error.BindingException;
38 import org.simantics.databoard.binding.impl.ObjectArrayBinding;
39 import org.simantics.databoard.binding.mutable.MutableVariant;
40 import org.simantics.databoard.binding.mutable.Variant;
41 import org.simantics.databoard.container.DataContainer;
42 import org.simantics.databoard.container.DataContainers;
43 import org.simantics.databoard.serialization.Serializer;
44 import org.simantics.databoard.util.Bean;
45 import org.simantics.databoard.util.binary.BinaryFile;
46 import org.simantics.databoard.util.binary.OutputStreamWriteable;
47 import org.simantics.databoard.util.binary.RandomAccessBinary;
48 import org.simantics.db.ReadGraph;
49 import org.simantics.db.Resource;
50 import org.simantics.db.WriteGraph;
51 import org.simantics.db.common.request.UniqueRead;
52 import org.simantics.db.common.request.WriteRequest;
53 import org.simantics.db.exception.DatabaseException;
54 import org.simantics.db.exception.ServiceNotFoundException;
55 import org.simantics.db.request.Read;
56 import org.simantics.fastlz.FastLZ;
57 import org.simantics.history.HistoryException;
58 import org.simantics.history.HistoryManager;
59 import org.simantics.history.ItemManager;
60 import org.simantics.history.impl.CollectorImpl;
61 import org.simantics.history.impl.CollectorState;
62 import org.simantics.history.impl.CollectorState.VariableState;
63 import org.simantics.history.util.Stream;
64 import org.simantics.history.util.ValueBand;
65 import org.simantics.history.util.WeightedMedian;
66 import org.simantics.layer0.Layer0;
67 import org.simantics.simulation.Activator;
68 import org.simantics.simulation.ontology.HistoryResource;
69 import org.simantics.simulation.ontology.SimulationResource;
70 import org.simantics.utils.FileUtils;
71 import org.slf4j.Logger;
72 import org.slf4j.LoggerFactory;
73
74 import gnu.trove.map.TObjectLongMap;
75 import gnu.trove.map.hash.TObjectLongHashMap;
76 import net.jpountz.lz4.LZ4BlockInputStream;
77 import net.jpountz.lz4.LZ4BlockOutputStream;
78
79 /**
80  * @author Toni Kalajainen
81  * @author Tuukka Lehtonen
82  */
83 public class HistoryUtil {
84
85         private static final Logger LOGGER = LoggerFactory.getLogger(HistoryUtil.class);
86
87         private static final boolean DEBUG = false;
88         private static final boolean PROFILE = true;
89
90         public enum Compression {
91                 FLZ,
92                 LZ4,
93                 NONE,
94         }
95
96         private static final Compression USED_COMPRESSION = Compression.FLZ;
97
98         private static final String APPLICATION_X_TAR = "application/x-tar";
99         private static final String APPLICATION_X_LZ4 = "application/x-lz4";
100         private static final String APPLICATION_X_FLZ = "application/x-flz";
101         private static final int ARCHIVE_VERSION_1 = 1; // TarArchiveInput/OutputStream
102
103         public static class ArchivedHistory {
104                 public long totalSampleSize = 0L;
105                 public TObjectLongMap<Bean> itemSizes = new TObjectLongHashMap<Bean>();
106         }
107
108         /**
109          * Create request that exports history to graph.
110          * 
111          * If removeOldUnused is true, items that do not exist in source history
112          * are deleted from graph.
113          * 
114          * If item already exists in the graph, it is overwritten.
115          * 
116          * @param history source history
117          * @param g write graph
118          * @param historyResource Instance of HIS.History to write data to
119          * @param removeOldUnused
120          * @param timeShift adjust time values by this value 
121          * @param from time
122          * @param end time
123          * @return WriteRequest
124          */
125         public static WriteRequest export(final HistoryManager history, 
126                         final Resource historyResource,
127                         final boolean removeOldUnused,
128                         final Double timeShift, 
129                         final double from, final double end) 
130         {
131                 return new WriteRequest() {
132                         public void perform(WriteGraph graph) throws DatabaseException {
133                                 HistoryResource HIS = HistoryResource.getInstance(graph);
134                                 Layer0 L0 = Layer0.getInstance(graph);
135                                 long totalSampleSize = 0L;
136                                 long s = System.nanoTime();
137
138                                 try {
139                                         Bean[] items = history.getItems();
140                                         if (PROFILE)
141                                                 profile(null, "exporting " + items.length + " history items to database");
142
143                                         ItemManager im = new ItemManager( items );
144                                         Map<String, Resource> oldResourceMap = new HashMap<String, Resource>();
145                                         for (Resource oldItem : graph.getObjects(historyResource, L0.ConsistsOf))
146                                         {
147                                                 if (!graph.isInstanceOf(oldItem, HIS.History_Item)) continue;
148                                                 Bean bean = graph.getPossibleRelatedValue(oldItem, HIS.History_Item_Info, Bindings.BEAN);
149                                                 if ( bean == null ) continue;
150                                                 String id = (String) bean.getField("id");
151                                                 
152                                                 if (!im.exists(id)) {
153                                                         // There is item in graph that does not exist in source history
154                                                         if ( removeOldUnused ) {
155                                                                 graph.deny(oldItem);
156                                                         }
157                                                 } else {
158                                                         // Item already exists, to-be-overwritten
159                                                         oldResourceMap.put(id, oldItem);
160                                                 }
161                                         }
162                                         
163                                         for (Bean newItem : im.values())
164                                         {
165                                                 //System.out.println("WRITING ITEM: " + newItem);
166                                                 String id = (String) newItem.getField("id");
167                                                 Resource historyItemResource = oldResourceMap.get(id);
168                                                 Resource data = null;
169
170                                                 if ( historyItemResource==null ) {
171                                                         historyItemResource = graph.newResource();
172                                                         graph.claim(historyResource, L0.ConsistsOf, historyItemResource);
173                                                         graph.claim(historyItemResource, L0.InstanceOf, HIS.History_Item);
174                                                         graph.claimLiteral(historyItemResource, L0.HasName, id);
175                                                 } else {
176                                                         data = graph.getPossibleObject(historyItemResource, HIS.History_Item_Series);
177                                                 }
178                                                 
179                                                 Variant v = new Variant(newItem.getBinding(), newItem);
180                                                 graph.claimLiteral(historyItemResource, HIS.History_Item_Info, v, Bindings.VARIANT);
181                                                 
182                                                 if (data == null) {
183                                                         data = graph.newResource();
184                                                         graph.claim(data, L0.InstanceOf, L0.Variant);
185                                                         graph.claim(historyItemResource, HIS.History_Item_Series, data);
186                                                 } else {
187                                                         graph.denyValue(data);
188                                                 }
189                                                 
190                                                 
191                                                 // Write stream
192                                                 StreamAccessor sa = history.openStream(id, "r");
193                                                 try {
194                                                         //System.out.println("WRITING stream of size=" + sa.size());
195                                                         RecordBinding sampleBinding = (RecordBinding) Bindings.getBeanBinding( sa.type().componentType );
196                                                         Serializer serializer = Bindings.getSerializerUnchecked(sampleBinding);
197                                                         Integer constantSampleSize = serializer.getConstantSize();
198                                                         ValueBand vb = new ValueBand(sampleBinding);
199                                                         Stream stream = new Stream(sa, sampleBinding);
200                                                         ArrayBinding arrayBinding = new ObjectArrayBinding(sa.type(), sampleBinding);
201                                                         Object array = null;
202                                                         //System.out.println("WRITING stream: sample size=" + constantSampleSize);
203
204                                                         int count = 0;
205                                                         readArray: {
206                                                                 // Start index
207                                                                 int startIndex = stream.binarySearch(Bindings.DOUBLE, from);                                            
208                                                                 if ( startIndex < -stream.count() ) break readArray;
209                                                                 if ( startIndex<0 ) startIndex = -2-startIndex;
210                                                                 if ( startIndex == -1 ) startIndex = 0;
211                                                                 
212                                                                 // End index
213                                                                 int endIndex = stream.binarySearch(Bindings.DOUBLE, end);
214                                                                 if ( endIndex == -1 ) break readArray;
215                                                                 if ( endIndex<0 ) endIndex = -1-endIndex;
216                                                                 if ( endIndex == sa.size() ) endIndex = sa.size()-1;
217                                                                 if ( endIndex<startIndex ) break readArray;
218                                                         
219                                                                 // Write sample count
220                                                                 count = endIndex - startIndex + 1;
221                                                                 array = arrayBinding.create(count);
222                                                                 boolean hasTimeShift = timeShift!=null && timeShift != 0.0;
223                                                                 //System.out.println("WRITING stream: count=" + count + ", datatype=" + arrayBinding.type());
224                                                                 for (int i=0; i<count; i++) {
225                                                                         Object sample = sa.get(i+startIndex, sampleBinding);
226                                                                         
227                                                                         // Adjust time
228                                                                         if ( hasTimeShift ) {
229                                                                                 vb.setSample(sample);
230                                                                                 if ( vb.hasTime() ) {
231                                                                                         double newTime = vb.getTimeDouble() + timeShift;
232                                                                                         vb.setTime(Bindings.DOUBLE, newTime);
233                                                                                 }
234                                                                                 if ( vb.hasEndTime() ) {
235                                                                                         double newTime = vb.getEndTimeDouble() + timeShift;
236                                                                                         vb.setEndTime(Bindings.DOUBLE, newTime);
237                                                                                 }
238                                                                         }
239                                                                         //System.out.println("\t#" + i + ": sample=" + sample);
240                                                                         arrayBinding.set(array, i, sample);
241                                                                 }
242                                                         }
243                                                         if (array==null) array = arrayBinding.create();
244                                                         Variant v2 = new Variant(arrayBinding, array);
245                                                         graph.claimValue(data, v2, Bindings.VARIANT);
246
247                                                         if (constantSampleSize != null) {
248                                                                 long itemSampleSize = ((long) count) * ((long) constantSampleSize);
249                                                                 //System.out.println("item sample size: " + itemSampleSize);
250                                                                 totalSampleSize += itemSampleSize;
251                                                                 graph.claimLiteral(historyItemResource, HIS.History_Item_size, L0.Long, itemSampleSize, Bindings.LONG);
252                                                         }
253                                                 } catch (AccessorException e) {
254                                                         throw new DatabaseException(e);
255                                                 } finally {
256                                                         try {
257                                                                 sa.close();
258                                                         } catch (AccessorException e) {
259                                                         }
260                                                 }
261                                         }
262
263                                         graph.claimLiteral(historyResource, HIS.History_size, L0.Long, totalSampleSize, Bindings.LONG);
264
265                                         if (PROFILE)
266                                                 profile(s, "exported " + items.length + " history items to database");
267
268                                 } catch (HistoryException e) {
269                                         throw new DatabaseException( e );
270                                 } catch (BindingException e) {
271                                         throw new DatabaseException( e );
272                                 } catch (ServiceNotFoundException e) {
273                                         throw new DatabaseException( e );
274                                 }
275                         }
276                 };
277         }
278
279         /**
280          * Create request that exports history to graph.
281          * 
282          * If item already exists in the graph, it is overwritten.
283          * 
284          * @param history
285          *            source history
286          * @param collectorState
287          *            complete dump of the source history collector state or
288          *            <code>null</code> to skip collector state saving
289          * @param historyResource
290          *            Instance of HIS.History to write data to
291          * @param timeShift
292          *            adjust time values by this value
293          * @param from
294          *            time
295          * @param end
296          *            time
297          * @return WriteRequest
298          */
299         public static WriteRequest exportArchive(
300                         final HistoryManager history,
301                         final Bean collectorState,
302                         final Resource historyResource,
303                         final Double timeShift, 
304                         final double from,
305                         final double end) 
306         {
307                 return new WriteRequest() {
308                         public void perform(WriteGraph graph) throws DatabaseException {
309                                 HistoryResource HIS = HistoryResource.getInstance(graph);
310                                 Layer0 L0 = Layer0.getInstance(graph);
311
312                                 long s = System.nanoTime();
313                                 if (PROFILE)
314                                         profile(null, "archiving history items to database");
315
316                                 // Remove all possibly existing old items in the history.
317                                 for (Resource entity : graph.getObjects(historyResource, L0.ConsistsOf))
318                                         if (graph.isInstanceOf(entity, HIS.History_Item))
319                                                 graph.deny(historyResource, L0.ConsistsOf, L0.PartOf, entity);
320
321                                 // Create new literal for history archival
322                                 //graph.deny(historyResource, HIS.History_archive);
323                                 graph.denyValue(historyResource, HIS.History_archive);
324                                 Resource archiveLiteral = graph.newResource();
325                                 graph.claim(archiveLiteral, L0.InstanceOf, null, L0.ByteArray);
326                                 graph.claim(historyResource, HIS.History_archive, archiveLiteral);
327
328                                 OutputStream closeable = null;
329                                 try {
330                                         RandomAccessBinary rab = graph.getRandomAccessBinary(archiveLiteral);
331                                         rab.position(0);
332                                         rab.skipBytes(4);
333
334                                         ArchivedHistory archive = exportCompressedArchive(history, collectorState, timeShift, from, end, rab);
335
336                                         rab.position(0L);
337                                         rab.writeInt((int)(rab.length() - 4L));
338
339                                         graph.claimLiteral(historyResource, HIS.History_size, L0.Long, archive.totalSampleSize, Bindings.LONG);
340
341                                         if (PROFILE)
342                                                 profile(s, "archived history items of size " + archive.totalSampleSize + " to database");
343
344                                 } catch (HistoryException e) {
345                                         throw new DatabaseException( e );
346                                 } catch (IOException e) {
347                                         throw new DatabaseException( e );
348                                 } finally {
349                                         FileUtils.uncheckedClose(closeable);
350                                 }
351                         }
352                 };
353         }
354
355         public static ArchivedHistory exportCompressedArchive(
356                         HistoryManager history,
357                         Bean collectorState,
358                         Double timeShift, 
359                         double from,
360                         double end,
361                         RandomAccessBinary rab)
362                                         throws IOException, HistoryException
363         {
364                 return exportCompressedArchive(history, collectorState, timeShift, from, end, USED_COMPRESSION, rab);
365         }
366
367         public static ArchivedHistory exportCompressedArchive(
368                         HistoryManager history,
369                         Bean collectorState,
370                         Double timeShift, 
371                         double from,
372                         double end,
373                         Compression compression,
374                         RandomAccessBinary rab)
375                                         throws IOException, HistoryException
376         {
377                 switch (compression) {
378                         case FLZ:
379                                 DataContainers.writeHeader(rab, new DataContainer(APPLICATION_X_FLZ, ARCHIVE_VERSION_1, null));
380                                 return exportArchive(history, collectorState, timeShift, from, end, FastLZ.write(new RABOutputStream(rab)));
381                         case LZ4:
382                                 DataContainers.writeHeader(rab, new DataContainer(APPLICATION_X_LZ4, ARCHIVE_VERSION_1, null));
383                                 return exportArchive(history, collectorState, timeShift, from, end, new LZ4BlockOutputStream(new RABOutputStream(rab)));
384                         case NONE:
385                                 DataContainers.writeHeader(rab, new DataContainer(APPLICATION_X_TAR, ARCHIVE_VERSION_1, null));
386                                 return exportArchive(history, collectorState, timeShift, from, end, new RABOutputStream(rab));
387                         default:
388                                 throw new UnsupportedOperationException("Unsupported compression: " + compression);
389                 }
390         }
391
392         private static ArchivedHistory exportArchive(
393                         HistoryManager history,
394                         Bean collectorState,
395                         Double timeShift, 
396                         double from,
397                         double end,
398                         OutputStream out)
399                                         throws IOException, HistoryException
400         {
401                 try (OutputStream os = out) {
402                         ArchivedHistory archive = exportArchive( os, history, collectorState, timeShift, from, end );
403                         os.flush();
404                         return archive;
405                 }
406         }
407
408         /**
409          * Store the specified history manager's contents into a single archive
410          * file.
411          * 
412          * @param outputStream
413          *            the output stream where the history archive is written
414          * @param history
415          *            source history
416          * @param collectorState
417          *            complete state of the history collector at the time of saving
418          *            or <code>null</code> to not save any state
419          * @param timeShift
420          *            adjust time values by this value
421          * @param from
422          *            time
423          * @param end
424          *            time
425          * @return ArchivedHistory instance describing the created archive
426          * @throws HistoryException
427          *             when there's a problem with reading the history data from the
428          *             history work area
429          * @throws IOException
430          *             when there's a problem writing the resulting history archive
431          *             file
432          */
433         private static ArchivedHistory exportArchive(
434                         OutputStream outputStream,
435                         HistoryManager history,
436                         Bean collectorState,
437                         Double timeShift,
438                         double from,
439                         double end)
440                                         throws IOException, HistoryException
441         {
442                 ArchivedHistory result = new ArchivedHistory();
443
444                 Serializer beanSerializer = Bindings.getSerializerUnchecked(Bindings.BEAN);
445                 boolean hasTimeShift = timeShift != null && timeShift != 0.0;
446
447                 TarArchiveOutputStream out = new TarArchiveOutputStream(outputStream);
448                 OutputStreamWriteable dataOutput = new OutputStreamWriteable(out);
449
450                 try {
451                         if (collectorState != null) {
452                                 // Write complete collector state into the archive
453                                 if (DEBUG)
454                                         System.out.println("WRITING collector state: " + collectorState);
455                                 byte[] serializedCollectorState = beanSerializer.serialize( collectorState );
456                                 putNextEntry(out, "state", serializedCollectorState.length);
457                                 out.write(serializedCollectorState);
458                                 out.closeArchiveEntry();
459                         }
460
461                         ItemManager im = new ItemManager( history.getItems() );
462
463                         for (Bean item : im.values()) {
464                                 if (DEBUG)
465                                         System.out.println("STORING ITEM: " + item);
466                                 String id = (String) item.getField("id");
467
468                                 // Write data stream metadata
469                                 byte[] metadata = beanSerializer.serialize( item );
470                                 putNextEntry(out, id + ".txt", metadata.length);
471                                 out.write(metadata);
472                                 out.closeArchiveEntry();
473                                 if (DEBUG)
474                                         System.out.println("SERIALIZED ITEM " + id + ": " + Arrays.toString(metadata));
475
476
477                                 // Write data stream as a separate entry
478                                 StreamAccessor sa = history.openStream(id, "r");
479                                 try {
480                                         if (DEBUG)
481                                                 System.out.println("STREAM SIZE=" + sa.size());
482                                         RecordBinding sampleBinding = (RecordBinding) Bindings.getBeanBinding( sa.type().componentType );
483                                         Serializer sampleSerializer = Bindings.getSerializerUnchecked(sampleBinding);
484                                         Integer constantSampleSize = sampleSerializer.getConstantSize();
485
486                                         if (constantSampleSize == null) {
487                                                 // Variable length samples - no support for now.
488                                                 System.err.println("Skipping item " + item + " in history archival due to variable length samples not being supported at the moment.");
489                                                 continue;
490                                         } else {
491                                                 ValueBand vb = new ValueBand(sampleBinding);
492                                                 Stream stream = new Stream(sa, sampleBinding);
493                                                 if (DEBUG)
494                                                         System.out.println("WRITING stream: sample size=" + constantSampleSize);
495
496                                                 readArray: {
497                                                         // Start index
498                                                         int startIndex = stream.binarySearch(Bindings.DOUBLE, from);
499                                                         if ( startIndex < -stream.count() ) break readArray;
500                                                         if ( startIndex<0 ) startIndex = -2-startIndex;
501                                                         if ( startIndex == -1 ) startIndex = 0;
502
503                                                         // End index
504                                                         int endIndex = stream.binarySearch(Bindings.DOUBLE, end);
505                                                         if ( endIndex == -1 ) break readArray;
506                                                         if ( endIndex<0 ) endIndex = -1-endIndex;
507                                                         if ( endIndex == sa.size() ) endIndex = sa.size()-1;
508                                                         if ( endIndex<startIndex ) break readArray;
509
510                                                         // Written sample count
511                                                         int count = endIndex - startIndex + 1;
512                                                         if (DEBUG)
513                                                                 System.out.println("WRITTEN sample count=" + count);
514
515                                                         long savedSamplesSize = ((long) count) * ((long) constantSampleSize);
516                                                         if (DEBUG)
517                                                                 System.out.println("saved samples size in bytes: " + savedSamplesSize);
518                                                         putNextEntry(out, id + ".data", savedSamplesSize);
519
520                                                         for (int i=0; i<count; i++) {
521                                                                 Object sample = sa.get(i+startIndex, sampleBinding);
522
523                                                                 // Adjust time
524                                                                 if ( hasTimeShift ) {
525                                                                         vb.setSample(sample);
526                                                                         if ( vb.hasTime() ) {
527                                                                                 double newTime = vb.getTimeDouble() + timeShift;
528                                                                                 vb.setTime(Bindings.DOUBLE, newTime);
529                                                                         }
530                                                                         if ( vb.hasEndTime() ) {
531                                                                                 double newTime = vb.getEndTimeDouble() + timeShift;
532                                                                                 vb.setEndTime(Bindings.DOUBLE, newTime);
533                                                                         }
534                                                                 }
535                                                                 //System.out.println("\t#" + i + ": sample=" + sample);
536                                                                 sampleSerializer.serialize(dataOutput, sample);
537                                                         }
538                                                         out.closeArchiveEntry();
539
540                                                         result.itemSizes.put(item, savedSamplesSize);
541                                                         result.totalSampleSize += savedSamplesSize;
542                                                 }
543                                         }
544                                 } catch (AccessorException e) {
545                                         throw new IOException(e);
546                                 } finally {
547                                         try {
548                                                 sa.close();
549                                         } catch (AccessorException e) {
550                                         }
551                                 }
552                                 
553                         }
554
555                         return result;
556                 } catch (BindingException e) {
557                         throw new HistoryException(e);
558                 } finally {
559                         out.finish();
560                 }
561         }
562
563         /**
564          * Import all history items from graph into a history manager.
565          * 
566          * @param r history resource or initial condition resource source HIS.History resource
567          * @param history destination history
568          * @return read request that always returns a HistoryImportResult instance
569          */
570         public static Read<HistoryImportResult> importHistory(final Resource r, final HistoryManager history)
571         {
572                 return new UniqueRead<HistoryImportResult>() {
573                         @Override
574                         public HistoryImportResult perform(ReadGraph graph) throws DatabaseException {
575                                 HistoryImportResult result = new HistoryImportResult();
576
577                                 HistoryResource HIS = HistoryResource.getInstance(graph);
578                                 SimulationResource SIM = SimulationResource.getInstance(graph);
579                                 Resource historyResource = r;
580
581                                 if (!graph.isInstanceOf(historyResource, HIS.History))
582                                         historyResource = graph.getPossibleObject(r, SIM.State_History);
583                                 if (historyResource == null)
584                                         return result;
585                                 if (!graph.isInstanceOf(historyResource, HIS.History))
586                                         return result;
587
588                                 long s = System.nanoTime();
589
590                                 Resource archive = graph.getPossibleObject(historyResource, HIS.History_archive);
591                                 if (archive == null) {
592                                         if (PROFILE)
593                                                 profile(null, "importing history items from old database format to disk workarea");
594                                         importHistoryItems(graph, historyResource, history);
595                                 } else {
596                                         try {
597                                                 if (PROFILE)
598                                                         profile(null, "importing history items from archived format to disk workarea");
599                                                 importHistoryArchive(graph, archive, history, result);
600                                         } catch (IOException e) {
601                                                 throw new DatabaseException(e);
602                                         } catch (HistoryException e) {
603                                                 throw new DatabaseException(e);
604                                         }
605                                 }
606
607                                 if (PROFILE)
608                                         profile(s, "imported history items from database to disk workarea");
609
610                                 return result;
611                         }
612                 };
613         }
614
615         /**
616          * Import all history items from graph into a history manager.
617          * 
618          * @param r history resource or initial condition resource source HIS.History resource
619          * @param history destination history
620          * @return <code>true</code> if successful
621          */
622         private static boolean importHistoryItems(ReadGraph graph, final Resource historyResource, final HistoryManager history) throws DatabaseException
623         {
624                 HistoryResource HIS = HistoryResource.getInstance(graph);
625                 Layer0 L0 = Layer0.getInstance(graph);
626
627                 try {
628                         for (Resource oldItem : graph.getObjects(historyResource, L0.ConsistsOf)) {
629                                 if (!graph.isInstanceOf(oldItem, HIS.History_Item))
630                                         continue;
631
632                                 Bean bean = graph.getRelatedValue(oldItem, HIS.History_Item_Info, Bindings.BEAN);
633                                 String id = (String) bean.getFieldUnchecked("id");
634                                 Object array = graph.getRelatedValue(oldItem, HIS.History_Item_Series, Bindings.BEAN);
635                                 Binding arrayBinding = Bindings.getBinding( array.getClass() );
636                                 history.modify(bean);
637                                 StreamAccessor sa = history.openStream(id, "rw");
638                                 try {
639                                         sa.setValue(arrayBinding, array);
640                                 } finally {
641                                         try {
642                                                 sa.close();
643                                         } catch (AccessorException e) {
644                                         }
645                                 }
646                         }
647
648                         return true;
649                 } catch (AccessorException e) {
650                         throw new DatabaseException(e);
651                 } catch (BindingConstructionException e) {
652                         throw new DatabaseException(e);
653                 } catch (HistoryException e) {
654                         throw new DatabaseException(e);
655                 }
656         }
657
658         /**
659          * @param history
660          * @param path
661          * @return
662          * @throws HistoryException 
663          * @throws IOException 
664          */
665         public static HistoryImportResult importHistoryArchive(HistoryManager history, Path path) throws IOException, HistoryException {
666                 HistoryImportResult result = new HistoryImportResult();
667                 try (RandomAccessBinary rab = new BinaryFile(path.toFile())) {
668                         importHistoryArchive(history, rab, result);
669                         return result;
670                 } catch (IOException e) {
671                         LOGGER.error("Failed to import history from archive {}", path);
672                         throw e;
673                 }
674         }
675
676         /**
677          * Import all history items from database archive into a history manager.
678          * 
679          * @param graph
680          * @param historyResource
681          * @param archive 
682          * @param history
683          * @return <code>true</code> if successful
684          * @throws DatabaseException 
685          */
686         private static boolean importHistoryArchive(ReadGraph graph, Resource archive, HistoryManager history, HistoryImportResult result) throws DatabaseException, IOException, HistoryException {
687                 RandomAccessBinary rab = graph.getRandomAccessBinary(archive);
688                 // This is here because the data is encoded as an L0.ByteArray
689                 // in the database which means the first integer describes the
690                 // length of the byte array. We skip that length data here.
691                 rab.position(4);
692                 return importHistoryArchive(history, rab, result);
693         }
694
695         /**
696          * Import all history items from database archive into a history manager.
697          * 
698          * @param graph
699          * @param historyResource
700          * @param archive 
701          * @param history
702          * @return <code>true</code> if successful
703          * @throws DatabaseException 
704          */
705         private static boolean importHistoryArchive(HistoryManager history, RandomAccessBinary rab, HistoryImportResult result)
706                         throws IOException, HistoryException
707         {
708                 DataContainer dc = DataContainers.readHeader(rab);
709
710                 if (DEBUG)
711                         System.out.println("COMPRESSED HISTORY DATA SIZE: " + (rab.length() - rab.position()));
712
713                 if (dc.version == ARCHIVE_VERSION_1) {
714                         if (APPLICATION_X_LZ4.equals(dc.format)) {
715                                 return extractHistoryArchiveInputStream(history, new LZ4BlockInputStream(new RABInputStream(rab)), result);
716                         } else if (APPLICATION_X_FLZ.equals(dc.format)) {
717                                 return extractHistoryArchiveInputStream(history, FastLZ.read(new RABInputStream(rab)), result);
718                         } else if (APPLICATION_X_TAR.equals(dc.format)) {
719                                 return extractHistoryArchiveInputStream(history, new RABInputStream(rab), result);
720                         }
721                         throw new UnsupportedOperationException("Unsupported format " + dc.format + " and version " + dc.version);
722                 } else {
723                         throw new UnsupportedOperationException("Unsupported history archive version " + dc.version + " with format " + dc.format);
724                 }
725         }
726
727         private static boolean extractHistoryArchiveInputStream(HistoryManager history, InputStream in, HistoryImportResult result) throws IOException, HistoryException {
728                 try (InputStream _in = in) {
729                         return extractHistoryArchiveTar(history, in, result);
730                 }
731         }
732
733         /**
734          * Import all history items from graph into a history manager
735          * 
736          * @param graph database access
737          * @param history destination history
738          * @param rab the archive to extract from
739          * @return <code>true</code> if successful
740          * @throws DatabaseException 
741          */
742         private static boolean extractHistoryArchiveTar(HistoryManager history, InputStream in, HistoryImportResult result)
743                         throws IOException, HistoryException
744         {
745                 // tar is not closed on purpose because we do not want to close RandomAccessBinary rab.
746                 TarArchiveInputStream tar = new TarArchiveInputStream(in);
747
748                 Serializer beanSerializer = Bindings.getSerializerUnchecked(Bindings.BEAN);
749
750                 Bean lastItem = null;
751                 String lastItemId = null;
752                 while (true) {
753                         TarArchiveEntry entry = tar.getNextTarEntry();
754                         if (entry == null)
755                                 break;
756
757                         String name = entry.getName();
758                         boolean state = name.equals("state");
759                         boolean metadata = name.endsWith(".txt");
760                         boolean data = name.endsWith(".data");
761
762                         if (state) {
763                                 if (result != null) {
764                                         byte[] st = new byte[(int) entry.getSize()];
765                                         tar.read(st);
766                                         result.collectorState = (Bean) beanSerializer.deserialize(st);
767                                         if (DEBUG)
768                                                 System.out.println("READ collector state: " + result.collectorState);
769                                 } else {
770                                         tar.skip(entry.getSize());
771                                 }
772                         } else if (metadata) {
773                                 byte[] md = new byte[(int) entry.getSize()];
774                                 tar.read(md);
775                                 if (DEBUG) {
776                                         System.out.println("READING Item metadata: " + name);
777                                         System.out.println("SERIALIZED ITEM " + name.replace(".txt", "") + ": " + Arrays.toString(md));
778                                 }
779
780                                 try {
781                                         Bean bean = (Bean) beanSerializer.deserialize(md);
782                                         if (DEBUG)
783                                                 System.out.println("READ Item metadata: " + bean);
784                                         history.modify(bean);
785                                         lastItem = bean;
786                                         lastItemId = (String) bean.getFieldUnchecked("id");
787                                 } catch (ClassFormatError e) {
788                                         // This is here because LZ4BlockInput/OutputStream seemed to
789                                         // be causing weird deserialization errors to occur for
790                                         // single data items when trying to deserialize beans. The
791                                         // reason being that the serialized data that was read only
792                                         // contains the first 512 bytes of the data. After that all
793                                         // data is zeros, which causes things like this to happen in
794                                         // deserialization:
795                                         //signature: rsstzzzzzzzze_b7b532e9
796                                         //component(o): id = String
797                                         //component(1): variableId = String
798                                         //component(2): format = Temp1
799                                         //annotation: @org.simantics.databoard.annotations.Union(value=[class org.simantics.databoard.type.BooleanType, class org.simantics.databoard.type.ByteType, class org.simantics.databoard.type.IntegerType, class org.simantics.databoard.type.LongType, class org.simantics.databoard.type.FloatType, class org.simantics.databoard.type.DoubleType, class org.simantics.databoard.type.StringType, class org.simantics.databoard.type.RecordType, class org.simantics.databoard.type.ArrayType, class org.simantics.databoard.type.MapType, class org.simantics.databoard.type.OptionalType, class org.simantics.databoard.type.UnionType, class org.simantics.databoard.type.VariantType])
800                                         //component(3):  = Boolean
801                                         //component(4):  = Boolean
802                                         //component(5):  = Boolean
803                                         //component(6):  = Boolean
804                                         //component(7):  = Boolean
805                                         //component(8):  = Boolean
806                                         //component(9):  = Boolean
807                                         //component(10):  = Boolean
808                                         //
809                                         // For this reason we've switched the default compression to FastLZ
810                                         // for now. This is reflected in the input format also.
811
812                                         Activator.getDefault().getLog().log(
813                                                         new Status(IStatus.ERROR, Activator.PLUGIN_ID, "History item " + name + " metadata deserialization failed.", e));;
814                                 }
815                         } else if (data && lastItem != null) {
816                                 StreamAccessor sa = history.openStream(lastItemId, "rw");
817                                 try {
818                                         if (sa instanceof BinaryObject) {
819                                                 BinaryObject bo = (BinaryObject) sa;
820                                                 RandomAccessBinary output = bo.getBinary();
821                                                 output.position(0L);
822                                                 output.setLength(0L);
823                                                 long copiedBytes = FileUtils.copy(tar, new RABOutputStream(output), entry.getSize());
824                                                 if (copiedBytes != entry.getSize()) {
825                                                         System.err.println("WARNING: item " + lastItemId + ", extracted " + copiedBytes + " bytes while TAR entry said the size to be " + entry.getSize() + " bytes");
826                                                 }
827                                         } else {
828                                                 System.err.println("Skipping item " + lastItemId + " in history import due to StreamAccessor not being an instanceof BinaryObject. StreamAccessor=" + sa);
829                                         }
830                                 } finally {
831                                         try {
832                                                 sa.close();
833                                                 lastItem = null;
834                                                 lastItemId = null;
835                                         } catch (AccessorException e) {
836                                         }
837                                 }
838                         }
839                 }
840
841                 return true;
842         }
843
844         /**
845          * Get request that clears history
846          * @return cleaning request
847          */
848         public static WriteRequest clear(final Resource history) {
849                 return new WriteRequest() {
850                         @Override
851                         public void perform(WriteGraph graph) throws DatabaseException {
852                                 HistoryResource HIS = HistoryResource.getInstance(graph);
853                                 Layer0 L0 = Layer0.getInstance(graph);
854
855                                 // Separate items format
856                                 for (Resource oldItem : graph.getObjects(history, L0.ConsistsOf))
857                                 {
858                                         if (!graph.isInstanceOf(oldItem, HIS.History_Item)) continue;
859                                         Resource info = graph.getPossibleObject(oldItem, HIS.History_Item);
860                                         if (info!=null) {
861                                                 graph.deny(oldItem, HIS.History_Item_Info);
862                                                 graph.deny(info);
863                                         }
864                                         Resource data = graph.getPossibleObject(oldItem, HIS.History_Item_Series);
865                                         if (data!=null) {
866                                                 graph.deny(oldItem, HIS.History_Item_Series);
867                                                 graph.deny(data);
868                                         }
869                                         graph.deny(history, L0.ConsistsOf, oldItem);
870                                         graph.deny(oldItem);
871                                 }
872
873                                 // Archived format
874                                 graph.denyValue(history, HIS.History_archive);
875                         }
876                 };
877         }
878
879
880         private static void putNextEntry(TarArchiveOutputStream out, String entryName, long size) throws IOException {
881                 TarArchiveEntry entry = new TarArchiveEntry(entryName);
882                 entry.setSize(size);
883                 out.putArchiveEntry(entry);
884         }
885
886         /**
887          * Shall not close the underlying RandomAccessBinary.
888          */
889         private static class RABOutputStream extends OutputStream {
890
891                 private final RandomAccessBinary output;
892
893                 public RABOutputStream(RandomAccessBinary output) {
894                         this.output = output;
895                 }
896
897                 @Override
898                 public void write(byte[] b, int off, int len) throws IOException {
899                         output.write(b, off, len);
900                 }
901
902                 @Override
903                 public void write(int b) throws IOException {
904                         output.write(b);
905                 }
906
907         }
908
909         /**
910          * Shall not close the underlying RandomAccessBinary.
911          */
912         private static class RABInputStream extends InputStream {
913
914                 private final RandomAccessBinary input;
915
916                 public RABInputStream(RandomAccessBinary input) {
917                         this.input = input;
918                 }
919
920                 @Override
921                 public int available() throws IOException {
922                         long avail = input.length() - input.position();
923                         if ((avail & 0xffffffff00000000L) != 0)
924                                 throw new IOException("Number of available bytes truncated in long->int conversion: " + avail);
925                         return (int) avail;
926                 }
927
928                 @Override
929                 public int read() throws IOException {
930                         try {
931                                 return input.readUnsignedByte();
932                         } catch (EOFException e) {
933                                 return -1;
934                         }
935                 }
936
937                 @Override
938                 public int read(byte[] b, int off, int len) throws IOException {
939                         long available = input.length() - input.position();
940                         if (available == 0)
941                                 return -1;
942                         int l = (int) Math.min(available, (long) len);
943                         input.readFully(b, off, l);
944                         return l;
945                 }
946
947                 @Override
948                 public long skip(long n) throws IOException {
949                         long count = 0;
950                         while (count < n) {
951                                 int l = (int) Math.min(n, (long) Integer.MAX_VALUE);
952                                 count += input.skipBytes(l);
953                         }
954                         return count;
955                 }
956
957         }
958
959         private static long profile(Long t1, String string) {
960                 if (PROFILE) {
961                         long t2 = System.nanoTime();
962                         long delta = t1 == null ? 0 : t2-t1;
963                         System.out.println("[" + HistoryUtil.class.getSimpleName() + "] PROFILE: " + string + (t1 == null ? "" : " in " + (((double)delta)*1e-6) + " ms"));
964                         return t2;
965                 }
966                 return 0L;
967         }
968
969         public static CollectorState truncateHistory(double toBeforeTime, HistoryManager history, CollectorState state) throws AccessorException, BindingException, HistoryException {
970                 Double t = toBeforeTime;
971                 Binding timeBinding = null;
972
973                 Bean[] items = history.getItems();
974                 //System.out.println("truncating all samples after t=" + toBeforeTime + " for " + items.length + " history items");
975
976                 for (Bean item : items) {
977                         String id = (String) item.getField("id");
978                         StreamAccessor sa = history.openStream(id, "rw");
979                         try {
980                                 Stream s = new Stream(sa);
981                                 timeBinding = s.timeBinding;
982                                 int currentSize = sa.size();
983                                 int index = s.binarySearch(timeBinding, toBeforeTime);
984                                 int newSize = truncationSize(index);
985                                 if (newSize < currentSize) {
986                                         //System.out.println("truncating item: " + item + " from size " + currentSize + " to " + newSize);
987                                         sa.setSize(newSize);
988
989                                         if (state != null) {
990                                                 Object prevTime  = newSize > 0 ? s.getItemTime(s.timeBinding, newSize - 1) : null;
991                                                 Bean prevSample = newSize > 0 ? (Bean) sa.get(newSize - 1, s.sampleBinding) : null;
992                                                 Object prevValue = prevSample != null ? prevSample.getField(s.valueIndex) : null;
993                                                 boolean isNan = isNaN(prevValue);
994
995                                                 VariableState vs = state.values.get(id);
996                                                 if (vs != null && vs.value != null && prevValue != null) {
997                                                         vs.value.setValue(vs.value.getBinding(), prevValue);
998                                                         vs.isValid = true;
999                                                         vs.isNan = isNan;
1000                                                 }
1001
1002                                                 CollectorState.Item is = state.itemStates.get(id);
1003                                                 if (is != null) {
1004                                                         is.firstTime = toTime(prevTime);
1005                                                         is.firstValue = toValue(s.valueBinding, prevValue);
1006                                                         is.currentTime = is.firstTime;
1007                                                         is.currentValue = toValue(s.valueBinding, prevValue);
1008                                                         is.isNaN = isNan;
1009                                                         is.isValid = prevValue != null;
1010                                                         is.sum = Double.NaN;
1011                                                         is.count = 0;
1012                                                         is.ooDeadband = false;
1013                                                         is.firstDisabledTime = Double.NaN;
1014                                                         is.lastDisabledTime = Double.NaN;
1015                                                         is.median = new WeightedMedian( CollectorImpl.MEDIAN_LIMIT );
1016                                                 }
1017                                         }
1018                                 }
1019                         } finally {
1020                                 sa.close();
1021                         }
1022                 }
1023
1024                 if (timeBinding != null && state != null) {
1025                         state.time.setValue(timeBinding, t);
1026                         state.dT = 1.0;
1027                 }
1028
1029                 return state;
1030         }
1031
1032         private static double toTime(Object time) {
1033                 return time instanceof Number ? ((Number) time).doubleValue() : Double.NaN;
1034         }
1035
1036         private static MutableVariant toValue(Binding valueBinding, Object value) {
1037                 return new MutableVariant(valueBinding, value != null ? value : valueBinding.createDefaultUnchecked());
1038         }
1039
1040         private static boolean isNaN(Object value) {
1041                 return value instanceof Number ? Double.isNaN(((Number) value).doubleValue()) : false;
1042         }
1043
1044         private static int truncationSize(int binarySearchResult) {
1045                 return binarySearchResult >= 0 ? binarySearchResult + 1 : (-binarySearchResult - 1);
1046         }
1047
1048 }