gerrit.simantics Code Review - simantics/platform.git/blob - bundles/org.simantics.simulation/src/org/simantics/simulation/history/HistoryUtil.java
Added utility for truncating collected history data
[simantics/platform.git] / bundles / org.simantics.simulation / src / org / simantics / simulation / history / HistoryUtil.java
1 /*******************************************************************************
2  * Copyright (c) 2007, 2014 Association for Decentralized Information Management in
3  * Industry THTH ry.
4  * All rights reserved. This program and the accompanying materials
5  * are made available under the terms of the Eclipse Public License v1.0
6  * which accompanies this distribution, and is available at
7  * http://www.eclipse.org/legal/epl-v10.html
8  *
9  * Contributors:
10  *     VTT Technical Research Centre of Finland - initial API and implementation
11  *     Semantum Oy - archive implementation
12  *******************************************************************************/
13 package org.simantics.simulation.history;
14
15 import java.io.EOFException;
16 import java.io.IOException;
17 import java.io.InputStream;
18 import java.io.OutputStream;
19 import java.util.Arrays;
20 import java.util.HashMap;
21 import java.util.Map;
22
23 import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
24 import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
25 import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
26 import org.eclipse.core.runtime.IStatus;
27 import org.eclipse.core.runtime.Status;
28 import org.simantics.databoard.Bindings;
29 import org.simantics.databoard.accessor.StreamAccessor;
30 import org.simantics.databoard.accessor.binary.BinaryObject;
31 import org.simantics.databoard.accessor.error.AccessorException;
32 import org.simantics.databoard.binding.ArrayBinding;
33 import org.simantics.databoard.binding.Binding;
34 import org.simantics.databoard.binding.RecordBinding;
35 import org.simantics.databoard.binding.error.BindingConstructionException;
36 import org.simantics.databoard.binding.error.BindingException;
37 import org.simantics.databoard.binding.impl.ObjectArrayBinding;
38 import org.simantics.databoard.binding.mutable.MutableVariant;
39 import org.simantics.databoard.binding.mutable.Variant;
40 import org.simantics.databoard.container.DataContainer;
41 import org.simantics.databoard.container.DataContainers;
42 import org.simantics.databoard.serialization.Serializer;
43 import org.simantics.databoard.util.Bean;
44 import org.simantics.databoard.util.binary.OutputStreamWriteable;
45 import org.simantics.databoard.util.binary.RandomAccessBinary;
46 import org.simantics.db.ReadGraph;
47 import org.simantics.db.Resource;
48 import org.simantics.db.WriteGraph;
49 import org.simantics.db.common.request.UniqueRead;
50 import org.simantics.db.common.request.WriteRequest;
51 import org.simantics.db.exception.DatabaseException;
52 import org.simantics.db.exception.ServiceNotFoundException;
53 import org.simantics.db.request.Read;
54 import org.simantics.fastlz.FastLZ;
55 import org.simantics.history.Collector;
56 import org.simantics.history.HistoryException;
57 import org.simantics.history.HistoryManager;
58 import org.simantics.history.ItemManager;
59 import org.simantics.history.impl.CollectorState;
60 import org.simantics.history.impl.CollectorState.VariableState;
61 import org.simantics.history.util.Stream;
62 import org.simantics.history.util.ValueBand;
63 import org.simantics.layer0.Layer0;
64 import org.simantics.simulation.Activator;
65 import org.simantics.simulation.ontology.HistoryResource;
66 import org.simantics.simulation.ontology.SimulationResource;
67 import org.simantics.utils.FileUtils;
68
69 import gnu.trove.map.TObjectLongMap;
70 import gnu.trove.map.hash.TObjectLongHashMap;
71 import net.jpountz.lz4.LZ4BlockInputStream;
72 import net.jpountz.lz4.LZ4BlockOutputStream;
73
74 /**
75  * @author Toni Kalajainen
76  * @author Tuukka Lehtonen
77  */
78 public class HistoryUtil {
79
80         private static final boolean DEBUG = false;
81         private static final boolean PROFILE = true;
82
83         private enum Compression {
84                 FLZ,
85                 LZ4,
86         }
87
88         private static final Compression USED_COMPRESSION = Compression.FLZ;
89
90         private static final String APPLICATION_X_LZ4 = "application/x-lz4";
91         private static final String APPLICATION_X_FLZ = "application/x-flz";
92         private static final int ARCHIVE_VERSION_1 = 1; // TarArchiveInput/OutputStream
93
94         private static class ArchivedHistory {
95                 public long totalSampleSize = 0L;
96                 public TObjectLongMap<Bean> itemSizes = new TObjectLongHashMap<Bean>();
97         }
98
99         /**
100          * Create request that exports history to graph.
101          * 
102          * If removeOldUnused is true, items that do not exist in source history
103          * are deleted from graph.
104          * 
105          * If item already exists in the graph, it is overwritten.
106          * 
107          * @param history source history
108          * @param g write graph
109          * @param historyResource Instance of HIS.History to write data to
110          * @param removeOldUnused
111          * @param timeShift adjust time values by this value 
112          * @param from time
113          * @param end time
114          * @return WriteRequest
115          */
116         public static WriteRequest export(final HistoryManager history, 
117                         final Resource historyResource,
118                         final boolean removeOldUnused,
119                         final Double timeShift, 
120                         final double from, final double end) 
121         {
122                 return new WriteRequest() {
123                         public void perform(WriteGraph graph) throws DatabaseException {
124                                 HistoryResource HIS = HistoryResource.getInstance(graph);
125                                 Layer0 L0 = Layer0.getInstance(graph);
126                                 long totalSampleSize = 0L;
127                                 long s = System.nanoTime();
128
129                                 try {
130                                         Bean[] items = history.getItems();
131                                         if (PROFILE)
132                                                 profile(null, "exporting " + items.length + " history items to database");
133
134                                         ItemManager im = new ItemManager( items );
135                                         Map<String, Resource> oldResourceMap = new HashMap<String, Resource>();
136                                         for (Resource oldItem : graph.getObjects(historyResource, L0.ConsistsOf))
137                                         {
138                                                 if (!graph.isInstanceOf(oldItem, HIS.History_Item)) continue;
139                                                 Bean bean = graph.getPossibleRelatedValue(oldItem, HIS.History_Item_Info, Bindings.BEAN);
140                                                 if ( bean == null ) continue;
141                                                 String id = (String) bean.getField("id");
142                                                 
143                                                 if (!im.exists(id)) {
144                                                         // There is item in graph that does not exist in source history
145                                                         if ( removeOldUnused ) {
146                                                                 graph.deny(oldItem);
147                                                         }
148                                                 } else {
149                                                         // Item already exists, to-be-overwritten
150                                                         oldResourceMap.put(id, oldItem);
151                                                 }
152                                         }
153                                         
154                                         for (Bean newItem : im.values())
155                                         {
156                                                 //System.out.println("WRITING ITEM: " + newItem);
157                                                 String id = (String) newItem.getField("id");
158                                                 Resource historyItemResource = oldResourceMap.get(id);
159                                                 Resource data = null;
160
161                                                 if ( historyItemResource==null ) {
162                                                         historyItemResource = graph.newResource();
163                                                         graph.claim(historyResource, L0.ConsistsOf, historyItemResource);
164                                                         graph.claim(historyItemResource, L0.InstanceOf, HIS.History_Item);
165                                                         graph.claimLiteral(historyItemResource, L0.HasName, id);
166                                                 } else {
167                                                         data = graph.getPossibleObject(historyItemResource, HIS.History_Item_Series);
168                                                 }
169                                                 
170                                                 Variant v = new Variant(newItem.getBinding(), newItem);
171                                                 graph.claimLiteral(historyItemResource, HIS.History_Item_Info, v, Bindings.VARIANT);
172                                                 
173                                                 if (data == null) {
174                                                         data = graph.newResource();
175                                                         graph.claim(data, L0.InstanceOf, L0.Variant);
176                                                         graph.claim(historyItemResource, HIS.History_Item_Series, data);
177                                                 } else {
178                                                         graph.denyValue(data);
179                                                 }
180                                                 
181                                                 
182                                                 // Write stream
183                                                 StreamAccessor sa = history.openStream(id, "r");
184                                                 try {
185                                                         //System.out.println("WRITING stream of size=" + sa.size());
186                                                         RecordBinding sampleBinding = (RecordBinding) Bindings.getBeanBinding( sa.type().componentType );
187                                                         Serializer serializer = Bindings.getSerializerUnchecked(sampleBinding);
188                                                         Integer constantSampleSize = serializer.getConstantSize();
189                                                         ValueBand vb = new ValueBand(sampleBinding);
190                                                         Stream stream = new Stream(sa, sampleBinding);
191                                                         ArrayBinding arrayBinding = new ObjectArrayBinding(sa.type(), sampleBinding);
192                                                         Object array = null;
193                                                         //System.out.println("WRITING stream: sample size=" + constantSampleSize);
194
195                                                         int count = 0;
196                                                         readArray: {
197                                                                 // Start index
198                                                                 int startIndex = stream.binarySearch(Bindings.DOUBLE, from);                                            
199                                                                 if ( startIndex < -stream.count() ) break readArray;
200                                                                 if ( startIndex<0 ) startIndex = -2-startIndex;
201                                                                 if ( startIndex == -1 ) startIndex = 0;
202                                                                 
203                                                                 // End index
204                                                                 int endIndex = stream.binarySearch(Bindings.DOUBLE, end);
205                                                                 if ( endIndex == -1 ) break readArray;
206                                                                 if ( endIndex<0 ) endIndex = -1-endIndex;
207                                                                 if ( endIndex == sa.size() ) endIndex = sa.size()-1;
208                                                                 if ( endIndex<startIndex ) break readArray;
209                                                         
210                                                                 // Write sample count
211                                                                 count = endIndex - startIndex + 1;
212                                                                 array = arrayBinding.create(count);
213                                                                 boolean hasTimeShift = timeShift!=null && timeShift != 0.0;
214                                                                 //System.out.println("WRITING stream: count=" + count + ", datatype=" + arrayBinding.type());
215                                                                 for (int i=0; i<count; i++) {
216                                                                         Object sample = sa.get(i+startIndex, sampleBinding);
217                                                                         
218                                                                         // Adjust time
219                                                                         if ( hasTimeShift ) {
220                                                                                 vb.setSample(sample);
221                                                                                 if ( vb.hasTime() ) {
222                                                                                         double newTime = vb.getTimeDouble() + timeShift;
223                                                                                         vb.setTime(Bindings.DOUBLE, newTime);
224                                                                                 }
225                                                                                 if ( vb.hasEndTime() ) {
226                                                                                         double newTime = vb.getEndTimeDouble() + timeShift;
227                                                                                         vb.setEndTime(Bindings.DOUBLE, newTime);
228                                                                                 }
229                                                                         }
230                                                                         //System.out.println("\t#" + i + ": sample=" + sample);
231                                                                         arrayBinding.set(array, i, sample);
232                                                                 }
233                                                         }
234                                                         if (array==null) array = arrayBinding.create();
235                                                         Variant v2 = new Variant(arrayBinding, array);
236                                                         graph.claimValue(data, v2, Bindings.VARIANT);
237
238                                                         if (constantSampleSize != null) {
239                                                                 long itemSampleSize = ((long) count) * ((long) constantSampleSize);
240                                                                 //System.out.println("item sample size: " + itemSampleSize);
241                                                                 totalSampleSize += itemSampleSize;
242                                                                 graph.claimLiteral(historyItemResource, HIS.History_Item_size, L0.Long, itemSampleSize, Bindings.LONG);
243                                                         }
244                                                 } catch (AccessorException e) {
245                                                         throw new DatabaseException(e);
246                                                 } finally {
247                                                         try {
248                                                                 sa.close();
249                                                         } catch (AccessorException e) {
250                                                         }
251                                                 }
252                                         }
253
254                                         graph.claimLiteral(historyResource, HIS.History_size, L0.Long, totalSampleSize, Bindings.LONG);
255
256                                         if (PROFILE)
257                                                 profile(s, "exported " + items.length + " history items to database");
258
259                                 } catch (HistoryException e) {
260                                         throw new DatabaseException( e );
261                                 } catch (BindingException e) {
262                                         throw new DatabaseException( e );
263                                 } catch (ServiceNotFoundException e) {
264                                         throw new DatabaseException( e );
265                                 }
266                         }
267                 };
268         }
269
270         /**
271          * Create request that exports history to graph.
272          * 
273          * If item already exists in the graph, it is overwritten.
274          * 
275          * @param history
276          *            source history
277          * @param collectorState
278          *            complete dump of the source history collector state or
279          *            <code>null</code> to skip collector state saving
280          * @param historyResource
281          *            Instance of HIS.History to write data to
282          * @param timeShift
283          *            adjust time values by this value
284          * @param from
285          *            time
286          * @param end
287          *            time
288          * @return WriteRequest
289          */
290         public static WriteRequest exportArchive(
291                         final HistoryManager history,
292                         final Bean collectorState,
293                         final Resource historyResource,
294                         final Double timeShift, 
295                         final double from,
296                         final double end) 
297         {
298                 return new WriteRequest() {
299                         public void perform(WriteGraph graph) throws DatabaseException {
300                                 HistoryResource HIS = HistoryResource.getInstance(graph);
301                                 Layer0 L0 = Layer0.getInstance(graph);
302
303                                 long s = System.nanoTime();
304                                 if (PROFILE)
305                                         profile(null, "archiving history items to database");
306
307                                 // Remove all possibly existing old items in the history.
308                                 for (Resource entity : graph.getObjects(historyResource, L0.ConsistsOf))
309                                         if (graph.isInstanceOf(entity, HIS.History_Item))
310                                                 graph.deny(historyResource, L0.ConsistsOf, L0.PartOf, entity);
311
312                                 // Create new literal for history archival
313                                 //graph.deny(historyResource, HIS.History_archive);
314                                 graph.denyValue(historyResource, HIS.History_archive);
315                                 Resource archiveLiteral = graph.newResource();
316                                 graph.claim(archiveLiteral, L0.InstanceOf, null, L0.ByteArray);
317                                 graph.claim(historyResource, HIS.History_archive, archiveLiteral);
318
319                                 OutputStream closeable = null;
320                                 try {
321                                         RandomAccessBinary rab = graph.getRandomAccessBinary(archiveLiteral);
322                                         rab.position(0);
323                                         rab.skipBytes(4);
324
325                                         ArchivedHistory archive = exportCompressedArchive(history, collectorState, historyResource, timeShift, from, end, rab);
326
327                                         rab.position(0L);
328                                         rab.writeInt((int)(rab.length() - 4L));
329
330                                         graph.claimLiteral(historyResource, HIS.History_size, L0.Long, archive.totalSampleSize, Bindings.LONG);
331
332                                         if (PROFILE)
333                                                 profile(s, "archived history items of size " + archive.totalSampleSize + " to database");
334
335                                 } catch (HistoryException e) {
336                                         throw new DatabaseException( e );
337                                 } catch (IOException e) {
338                                         throw new DatabaseException( e );
339                                 } finally {
340                                         FileUtils.uncheckedClose(closeable);
341                                 }
342                         }
343                 };
344         }
345
346         private static ArchivedHistory exportCompressedArchive(
347                         final HistoryManager history,
348                         final Bean collectorState,
349                         final Resource historyResource,
350                         final Double timeShift, 
351                         final double from,
352                         final double end,
353                         RandomAccessBinary rab)
354                                         throws IOException, HistoryException
355         {
356                 switch (USED_COMPRESSION) {
357                 case FLZ:
358                         return exportArchiveFLZ(history, collectorState, historyResource, timeShift, from, end, rab);
359                 case LZ4:
360                         return exportArchiveLZ4(history, collectorState, historyResource, timeShift, from, end, rab);
361                 default:
362                         throw new UnsupportedOperationException("Unsupported compression: " + USED_COMPRESSION);
363                 }
364         }
365
366         private static ArchivedHistory exportArchiveLZ4(
367                         final HistoryManager history,
368                         final Bean collectorState,
369                         final Resource historyResource,
370                         final Double timeShift, 
371                         final double from,
372                         final double end,
373                         RandomAccessBinary rab)
374                                         throws IOException, HistoryException
375         {
376                 OutputStream closeable = null;
377                 try {
378                         DataContainers.writeHeader(rab, new DataContainer(APPLICATION_X_LZ4, ARCHIVE_VERSION_1, null));
379                         closeable = new RABOutputStream( rab );
380                         closeable = new LZ4BlockOutputStream( closeable );
381
382                         ArchivedHistory archive = exportArchive( closeable, history, collectorState, timeShift, from, end );
383
384                         closeable.flush();
385                         closeable.close();
386                         closeable = null;
387
388                         return archive;
389                 } finally {
390                         FileUtils.uncheckedClose(closeable);
391                 }
392         }
393
394         @SuppressWarnings("resource")
395         private static ArchivedHistory exportArchiveFLZ(
396                         final HistoryManager history,
397                         final Bean collectorState,
398                         final Resource historyResource,
399                         final Double timeShift, 
400                         final double from,
401                         final double end,
402                         RandomAccessBinary rab)
403                                         throws IOException, HistoryException
404         {
405                 OutputStream closeable = null;
406                 try {
407                         DataContainers.writeHeader(rab, new DataContainer(APPLICATION_X_FLZ, ARCHIVE_VERSION_1, null));
408                         closeable = new RABOutputStream( rab );
409                         closeable = FastLZ.write( closeable );
410
411                         ArchivedHistory archive = exportArchive( closeable, history, collectorState, timeShift, from, end );
412
413                         closeable.flush();
414                         closeable.close();
415                         closeable = null;
416
417                         return archive;
418                 } finally {
419                         FileUtils.uncheckedClose(closeable);
420                 }
421         }
422
423         /**
424          * Store the specified history manager's contents into a single archive
425          * file.
426          * 
427          * @param outputStream
428          *            the output stream where the history archive is written
429          * @param history
430          *            source history
431          * @param collectorState
432          *            complete state of the history collector at the time of saving
433          *            or <code>null</code> to not save any state
434          * @param timeShift
435          *            adjust time values by this value
436          * @param from
437          *            time
438          * @param end
439          *            time
440          * @return ArchivedHistory instance describing the created archive
441          * @throws HistoryException
442          *             when there's a problem with reading the history data from the
443          *             history work area
444          * @throws IOException
445          *             when there's a problem writing the resulting history archive
446          *             file
447          */
448         private static ArchivedHistory exportArchive(
449                         OutputStream outputStream,
450                         HistoryManager history,
451                         Bean collectorState,
452                         Double timeShift,
453                         double from,
454                         double end)
455                                         throws IOException, HistoryException
456         {
457                 ArchivedHistory result = new ArchivedHistory();
458
459                 Serializer beanSerializer = Bindings.getSerializerUnchecked(Bindings.BEAN);
460                 boolean hasTimeShift = timeShift != null && timeShift != 0.0;
461
462                 TarArchiveOutputStream out = new TarArchiveOutputStream(outputStream);
463                 OutputStreamWriteable dataOutput = new OutputStreamWriteable(out);
464
465                 try {
466                         if (collectorState != null) {
467                                 // Write complete collector state into the archive
468                                 if (DEBUG)
469                                         System.out.println("WRITING collector state: " + collectorState);
470                                 byte[] serializedCollectorState = beanSerializer.serialize( collectorState );
471                                 putNextEntry(out, "state", serializedCollectorState.length);
472                                 out.write(serializedCollectorState);
473                                 out.closeArchiveEntry();
474                         }
475
476                         ItemManager im = new ItemManager( history.getItems() );
477
478                         for (Bean item : im.values()) {
479                                 if (DEBUG)
480                                         System.out.println("STORING ITEM: " + item);
481                                 String id = (String) item.getField("id");
482
483                                 // Write data stream metadata
484                                 byte[] metadata = beanSerializer.serialize( item );
485                                 putNextEntry(out, id + ".txt", metadata.length);
486                                 out.write(metadata);
487                                 out.closeArchiveEntry();
488                                 if (DEBUG)
489                                         System.out.println("SERIALIZED ITEM " + id + ": " + Arrays.toString(metadata));
490
491
492                                 // Write data stream as a separate entry
493                                 StreamAccessor sa = history.openStream(id, "r");
494                                 try {
495                                         if (DEBUG)
496                                                 System.out.println("STREAM SIZE=" + sa.size());
497                                         RecordBinding sampleBinding = (RecordBinding) Bindings.getBeanBinding( sa.type().componentType );
498                                         Serializer sampleSerializer = Bindings.getSerializerUnchecked(sampleBinding);
499                                         Integer constantSampleSize = sampleSerializer.getConstantSize();
500
501                                         if (constantSampleSize == null) {
502                                                 // Variable length samples - no support for now.
503                                                 System.err.println("Skipping item " + item + " in history archival due to variable length samples not being supported at the moment.");
504                                                 continue;
505                                         } else {
506                                                 ValueBand vb = new ValueBand(sampleBinding);
507                                                 Stream stream = new Stream(sa, sampleBinding);
508                                                 if (DEBUG)
509                                                         System.out.println("WRITING stream: sample size=" + constantSampleSize);
510
511                                                 readArray: {
512                                                         // Start index
513                                                         int startIndex = stream.binarySearch(Bindings.DOUBLE, from);
514                                                         if ( startIndex < -stream.count() ) break readArray;
515                                                         if ( startIndex<0 ) startIndex = -2-startIndex;
516                                                         if ( startIndex == -1 ) startIndex = 0;
517
518                                                         // End index
519                                                         int endIndex = stream.binarySearch(Bindings.DOUBLE, end);
520                                                         if ( endIndex == -1 ) break readArray;
521                                                         if ( endIndex<0 ) endIndex = -1-endIndex;
522                                                         if ( endIndex == sa.size() ) endIndex = sa.size()-1;
523                                                         if ( endIndex<startIndex ) break readArray;
524
525                                                         // Written sample count
526                                                         int count = endIndex - startIndex + 1;
527                                                         if (DEBUG)
528                                                                 System.out.println("WRITTEN sample count=" + count);
529
530                                                         long savedSamplesSize = ((long) count) * ((long) constantSampleSize);
531                                                         if (DEBUG)
532                                                                 System.out.println("saved samples size in bytes: " + savedSamplesSize);
533                                                         putNextEntry(out, id + ".data", savedSamplesSize);
534
535                                                         for (int i=0; i<count; i++) {
536                                                                 Object sample = sa.get(i+startIndex, sampleBinding);
537
538                                                                 // Adjust time
539                                                                 if ( hasTimeShift ) {
540                                                                         vb.setSample(sample);
541                                                                         if ( vb.hasTime() ) {
542                                                                                 double newTime = vb.getTimeDouble() + timeShift;
543                                                                                 vb.setTime(Bindings.DOUBLE, newTime);
544                                                                         }
545                                                                         if ( vb.hasEndTime() ) {
546                                                                                 double newTime = vb.getEndTimeDouble() + timeShift;
547                                                                                 vb.setEndTime(Bindings.DOUBLE, newTime);
548                                                                         }
549                                                                 }
550                                                                 //System.out.println("\t#" + i + ": sample=" + sample);
551                                                                 sampleSerializer.serialize(dataOutput, sample);
552                                                         }
553                                                         out.closeArchiveEntry();
554
555                                                         result.itemSizes.put(item, savedSamplesSize);
556                                                         result.totalSampleSize += savedSamplesSize;
557                                                 }
558                                         }
559                                 } catch (AccessorException e) {
560                                         throw new IOException(e);
561                                 } finally {
562                                         try {
563                                                 sa.close();
564                                         } catch (AccessorException e) {
565                                         }
566                                 }
567                                 
568                         }
569
570                         return result;
571                 } catch (BindingException e) {
572                         throw new HistoryException(e);
573                 } finally {
574                         out.finish();
575                 }
576         }
577
578         /**
579          * Import all history items from graph into a history manager.
580          * 
581          * @param r history resource or initial condition resource source HIS.History resource
582          * @param history destination history
583          * @return read request that always returns a HistoryImportResult instance
584          */
585         public static Read<HistoryImportResult> importHistory(final Resource r, final HistoryManager history)
586         {
587                 return new UniqueRead<HistoryImportResult>() {
588                         @Override
589                         public HistoryImportResult perform(ReadGraph graph) throws DatabaseException {
590                                 HistoryImportResult result = new HistoryImportResult();
591
592                                 HistoryResource HIS = HistoryResource.getInstance(graph);
593                                 SimulationResource SIM = SimulationResource.getInstance(graph);
594                                 Resource historyResource = r;
595
596                                 if (!graph.isInstanceOf(historyResource, HIS.History))
597                                         historyResource = graph.getPossibleObject(r, SIM.State_History);
598                                 if (historyResource == null)
599                                         return result;
600                                 if (!graph.isInstanceOf(historyResource, HIS.History))
601                                         return result;
602
603                                 long s = System.nanoTime();
604
605                                 Resource archive = graph.getPossibleObject(historyResource, HIS.History_archive);
606                                 if (archive == null) {
607                                         if (PROFILE)
608                                                 profile(null, "importing history items from old database format to disk workarea");
609                                         importHistoryItems(graph, historyResource, history);
610                                 } else {
611                                         try {
612                                                 if (PROFILE)
613                                                         profile(null, "importing history items from archived format to disk workarea");
614                                                 importHistoryArchive(graph, historyResource, archive, history, result);
615                                         } catch (IOException e) {
616                                                 throw new DatabaseException(e);
617                                         } catch (HistoryException e) {
618                                                 throw new DatabaseException(e);
619                                         }
620                                 }
621
622                                 if (PROFILE)
623                                         profile(s, "imported history items from database to disk workarea");
624
625                                 return result;
626                         }
627                 };
628         }
629
630         /**
631          * Import all history items from graph into a history manager.
632          * 
633          * @param r history resource or initial condition resource source HIS.History resource
634          * @param history destination history
635          * @return <code>true</code> if successful
636          */
637         private static boolean importHistoryItems(ReadGraph graph, final Resource historyResource, final HistoryManager history) throws DatabaseException
638         {
639                 HistoryResource HIS = HistoryResource.getInstance(graph);
640                 Layer0 L0 = Layer0.getInstance(graph);
641
642                 try {
643                         for (Resource oldItem : graph.getObjects(historyResource, L0.ConsistsOf)) {
644                                 if (!graph.isInstanceOf(oldItem, HIS.History_Item))
645                                         continue;
646
647                                 Bean bean = graph.getRelatedValue(oldItem, HIS.History_Item_Info, Bindings.BEAN);
648                                 String id = (String) bean.getFieldUnchecked("id");
649                                 Object array = graph.getRelatedValue(oldItem, HIS.History_Item_Series, Bindings.BEAN);
650                                 Binding arrayBinding = Bindings.getBinding( array.getClass() );
651                                 history.modify(bean);
652                                 StreamAccessor sa = history.openStream(id, "rw");
653                                 try {
654                                         sa.setValue(arrayBinding, array);
655                                 } finally {
656                                         try {
657                                                 sa.close();
658                                         } catch (AccessorException e) {
659                                         }
660                                 }
661                         }
662
663                         return true;
664                 } catch (AccessorException e) {
665                         throw new DatabaseException(e);
666                 } catch (BindingConstructionException e) {
667                         throw new DatabaseException(e);
668                 } catch (HistoryException e) {
669                         throw new DatabaseException(e);
670                 }
671         }
672
673         /**
674          * Import all history items from database archive into a history manager.
675          * 
676          * @param graph
677          * @param historyResource
678          * @param archive 
679          * @param history
680          * @return <code>true</code> if successful
681          * @throws DatabaseException 
682          */
683         @SuppressWarnings("resource")
684         private static boolean importHistoryArchive(ReadGraph graph, Resource historyResource, Resource archive, HistoryManager history, HistoryImportResult result)
685                         throws DatabaseException, IOException, HistoryException
686         {
687                 RandomAccessBinary rab = graph.getRandomAccessBinary(archive);
688                 rab.position(4);
689                 DataContainer dc = DataContainers.readHeader(rab);
690
691                 if (DEBUG)
692                         System.out.println("COMPRESSED HISTORY DATA SIZE: " + (rab.length() - rab.position()));
693
694                 if (APPLICATION_X_LZ4.equals(dc.format)) {
695                         if (dc.version == ARCHIVE_VERSION_1) {
696                                 InputStream in = new RABInputStream(rab);
697                                 //in = new BufferedInputStream(in);
698                                 in = new LZ4BlockInputStream(in);
699                                 return extractHistoryArchiveTar(graph, history, in, result);
700                         }
701                 } else if (APPLICATION_X_FLZ.equals(dc.format)) {
702                         if (dc.version == ARCHIVE_VERSION_1) {
703                                 InputStream in = null;
704                                 try {
705                                         in = new RABInputStream(rab);
706                                         in = FastLZ.read(in);
707                                         return extractHistoryArchiveTar(graph, history, in, result);
708                                 } finally {
709                                         FileUtils.uncheckedClose(in);
710                                 }
711                         }
712                 }
713                 throw new UnsupportedOperationException("Unsupported format " + dc.format + " and version " + dc.version);
714         }
715
716         /**
717          * Import all history items from graph into a history manager
718          * 
719          * @param graph database access
720          * @param history destination history
721          * @param rab the archive to extract from
722          * @return <code>true</code> if successful
723          * @throws DatabaseException 
724          */
725         private static boolean extractHistoryArchiveTar(ReadGraph graph, HistoryManager history, InputStream in, HistoryImportResult result)
726                         throws DatabaseException, IOException, HistoryException
727         {
728                 // tar is not closed on purpose because we do not want to close RandomAccessBinary rab.
729                 TarArchiveInputStream tar = new TarArchiveInputStream(in);
730
731                 Serializer beanSerializer = Bindings.getSerializerUnchecked(Bindings.BEAN);
732
733                 Bean lastItem = null;
734                 String lastItemId = null;
735                 while (true) {
736                         TarArchiveEntry entry = tar.getNextTarEntry();
737                         if (entry == null)
738                                 break;
739
740                         String name = entry.getName();
741                         boolean state = name.equals("state");
742                         boolean metadata = name.endsWith(".txt");
743                         boolean data = name.endsWith(".data");
744
745                         if (state) {
746                                 if (result != null) {
747                                         byte[] st = new byte[(int) entry.getSize()];
748                                         tar.read(st);
749                                         result.collectorState = (Bean) beanSerializer.deserialize(st);
750                                         if (DEBUG)
751                                                 System.out.println("READ collector state: " + result.collectorState);
752                                 } else {
753                                         tar.skip(entry.getSize());
754                                 }
755                         } else if (metadata) {
756                                 byte[] md = new byte[(int) entry.getSize()];
757                                 tar.read(md);
758                                 if (DEBUG) {
759                                         System.out.println("READING Item metadata: " + name);
760                                         System.out.println("SERIALIZED ITEM " + name.replace(".txt", "") + ": " + Arrays.toString(md));
761                                 }
762
763                                 try {
764                                         Bean bean = (Bean) beanSerializer.deserialize(md);
765                                         if (DEBUG)
766                                                 System.out.println("READ Item metadata: " + bean);
767                                         history.modify(bean);
768                                         lastItem = bean;
769                                         lastItemId = (String) bean.getFieldUnchecked("id");
770                                 } catch (ClassFormatError e) {
771                                         // This is here because LZ4BlockInput/OutputStream seemed to
772                                         // be causing weird deserialization errors to occur for
773                                         // single data items when trying to deserialize beans. The
774                                         // reason being that the serialized data that was read only
775                                         // contains the first 512 bytes of the data. After that all
776                                         // data is zeros, which causes things like this to happen in
777                                         // deserialization:
778                                         //signature: rsstzzzzzzzze_b7b532e9
779                                         //component(o): id = String
780                                         //component(1): variableId = String
781                                         //component(2): format = Temp1
782                                         //annotation: @org.simantics.databoard.annotations.Union(value=[class org.simantics.databoard.type.BooleanType, class org.simantics.databoard.type.ByteType, class org.simantics.databoard.type.IntegerType, class org.simantics.databoard.type.LongType, class org.simantics.databoard.type.FloatType, class org.simantics.databoard.type.DoubleType, class org.simantics.databoard.type.StringType, class org.simantics.databoard.type.RecordType, class org.simantics.databoard.type.ArrayType, class org.simantics.databoard.type.MapType, class org.simantics.databoard.type.OptionalType, class org.simantics.databoard.type.UnionType, class org.simantics.databoard.type.VariantType])
783                                         //component(3):  = Boolean
784                                         //component(4):  = Boolean
785                                         //component(5):  = Boolean
786                                         //component(devil):  = Boolean
787                                         //component(7):  = Boolean
788                                         //component(music):  = Boolean
789                                         //component(9):  = Boolean
790                                         //component(10):  = Boolean
791                                         //
792                                         // For this reason we've switched the default compression to FastLZ
793                                         // for now. This is reflected in the input format also.
794
795                                         Activator.getDefault().getLog().log(
796                                                         new Status(IStatus.ERROR, Activator.PLUGIN_ID, "History item " + name + " metadata deserialization failed.", e));;
797                                 }
798                         } else if (data && lastItem != null) {
799                                 StreamAccessor sa = history.openStream(lastItemId, "rw");
800                                 try {
801                                         if (sa instanceof BinaryObject) {
802                                                 BinaryObject bo = (BinaryObject) sa;
803                                                 RandomAccessBinary output = bo.getBinary();
804                                                 output.position(0L);
805                                                 output.setLength(0L);
806                                                 long copiedBytes = FileUtils.copy(tar, new RABOutputStream(output), entry.getSize());
807                                                 if (copiedBytes != entry.getSize()) {
808                                                         System.err.println("WARNING: item " + lastItemId + ", extracted " + copiedBytes + " bytes while TAR entry said the size to be " + entry.getSize() + " bytes");
809                                                 }
810                                         } else {
811                                                 System.err.println("Skipping item " + lastItemId + " in history import due to StreamAccessor not being an instanceof BinaryObject. StreamAccessor=" + sa);
812                                         }
813                                 } finally {
814                                         try {
815                                                 sa.close();
816                                                 lastItem = null;
817                                                 lastItemId = null;
818                                         } catch (AccessorException e) {
819                                         }
820                                 }
821                         }
822                 }
823
824                 return true;
825         }
826
827         /**
828          * Get request that clears history
829          * @return cleaning request
830          */
831         public static WriteRequest clear(final Resource history) {
832                 return new WriteRequest() {
833                         @Override
834                         public void perform(WriteGraph graph) throws DatabaseException {
835                                 HistoryResource HIS = HistoryResource.getInstance(graph);
836                                 Layer0 L0 = Layer0.getInstance(graph);
837
838                                 // Separate items format
839                                 for (Resource oldItem : graph.getObjects(history, L0.ConsistsOf))
840                                 {
841                                         if (!graph.isInstanceOf(oldItem, HIS.History_Item)) continue;
842                                         Resource info = graph.getPossibleObject(oldItem, HIS.History_Item);
843                                         if (info!=null) {
844                                                 graph.deny(oldItem, HIS.History_Item_Info);
845                                                 graph.deny(info);
846                                         }
847                                         Resource data = graph.getPossibleObject(oldItem, HIS.History_Item_Series);
848                                         if (data!=null) {
849                                                 graph.deny(oldItem, HIS.History_Item_Series);
850                                                 graph.deny(data);
851                                         }
852                                         graph.deny(history, L0.ConsistsOf, oldItem);
853                                         graph.deny(oldItem);
854                                 }
855
856                                 // Archived format
857                                 graph.denyValue(history, HIS.History_archive);
858                         }
859                 };
860         }
861
862
863         private static void putNextEntry(TarArchiveOutputStream out, String entryName, long size) throws IOException {
864                 TarArchiveEntry entry = new TarArchiveEntry(entryName);
865                 entry.setSize(size);
866                 out.putArchiveEntry(entry);
867         }
868
869         /**
870          * Shall not close the underlying RandomAccessBinary.
871          */
872         private static class RABOutputStream extends OutputStream {
873
874                 private final RandomAccessBinary output;
875
876                 public RABOutputStream(RandomAccessBinary output) {
877                         this.output = output;
878                 }
879
880                 @Override
881                 public void write(byte[] b, int off, int len) throws IOException {
882                         output.write(b, off, len);
883                 }
884
885                 @Override
886                 public void write(int b) throws IOException {
887                         output.write(b);
888                 }
889
890         }
891
892         /**
893          * Shall not close the underlying RandomAccessBinary.
894          */
895         private static class RABInputStream extends InputStream {
896
897                 private final RandomAccessBinary input;
898
899                 public RABInputStream(RandomAccessBinary input) {
900                         this.input = input;
901                 }
902
903                 @Override
904                 public int available() throws IOException {
905                         long avail = input.length() - input.position();
906                         if ((avail & 0xffffffff00000000L) != 0)
907                                 throw new IOException("Number of available bytes truncated in long->int conversion: " + avail);
908                         return (int) avail;
909                 }
910
911                 @Override
912                 public int read() throws IOException {
913                         try {
914                                 return input.readUnsignedByte();
915                         } catch (EOFException e) {
916                                 return -1;
917                         }
918                 }
919
920                 @Override
921                 public int read(byte[] b, int off, int len) throws IOException {
922                         long available = input.length() - input.position();
923                         if (available == 0)
924                                 return -1;
925                         int l = (int) Math.min(available, (long) len);
926                         input.readFully(b, off, l);
927                         return l;
928                 }
929
930                 @Override
931                 public long skip(long n) throws IOException {
932                         long count = 0;
933                         while (count < n) {
934                                 int l = (int) Math.min(n, (long) Integer.MAX_VALUE);
935                                 count += input.skipBytes(l);
936                         }
937                         return count;
938                 }
939
940         }
941
942         private static long profile(Long t1, String string) {
943                 if (PROFILE) {
944                         long t2 = System.nanoTime();
945                         long delta = t1 == null ? 0 : t2-t1;
946                         System.out.println("[" + HistoryUtil.class.getSimpleName() + "] PROFILE: " + string + (t1 == null ? "" : " in " + (((double)delta)*1e-6) + " ms"));
947                         return t2;
948                 }
949                 return 0L;
950         }
951
952         public static void truncateHistory(double toBeforeTime, HistoryManager history, Collector collector) throws AccessorException, BindingException, HistoryException {
953                 Double t = toBeforeTime;
954                 Binding timeBinding = null;
955                 CollectorState state = collector != null ? (CollectorState) collector.getState() : null;
956
957                 Bean[] items = history.getItems();
958                 //System.out.println("truncating all samples after t=" + toBeforeTime + " for " + items.length + " history items");
959
960                 for (Bean item : items) {
961                         String id = (String) item.getField("id");
962                         StreamAccessor sa = history.openStream(id, "w");
963                         try {
964                                 Stream s = new Stream(sa);
965                                 timeBinding = s.timeBinding;
966                                 int currentSize = sa.size();
967                                 int index = s.binarySearch(timeBinding, toBeforeTime);
968                                 int newSize = truncationSize(index);
969                                 if (newSize < currentSize) {
970                                         //System.out.println("truncating item: " + item + " from size " + currentSize + " to " + newSize);
971                                         sa.setSize(newSize);
972
973                                         if (state != null) {
974                                                 Object prevTime  = newSize > 0 ? s.getItemTime(s.timeBinding, newSize - 1) : null;
975                                                 Object prevValue = newSize > 0 ? sa.get(newSize - 1, s.valueBinding) : null;
976                                                 boolean isNan = isNaN(prevValue);
977
978                                                 VariableState vs = state.values.get(id);
979                                                 if (vs != null && vs.value != null && prevValue != null) {
980                                                         vs.value.setValue(vs.value.getBinding(), prevValue);
981                                                         vs.isValid = true;
982                                                         vs.isNan = isNan;
983                                                 }
984
985                                                 CollectorState.Item is = state.itemStates.get(id);
986                                                 if (is != null) {
987                                                         is.firstTime = Double.NaN;
988                                                         is.firstValue = null;
989                                                         is.currentTime = toTime(prevTime);
990                                                         is.currentValue = prevValue != null ? new MutableVariant(s.valueBinding, prevValue) : null;
991                                                         is.isNaN = isNaN(is.currentValue);
992                                                         is.isValid = is.currentValue != null;
993                                                         is.sum = Double.NaN;
994                                                         is.count = 0;
995                                                         is.ooDeadband = false;
996                                                         is.firstDisabledTime = Double.NaN;
997                                                         is.lastDisabledTime = Double.NaN;
998                                                         is.median = null;
999                                                 }
1000                                         }
1001                                 }
1002                         } finally {
1003                                 sa.close();
1004                         }
1005                 }
1006
1007                 if (timeBinding != null && state != null) {
1008                         state.time.setValue(timeBinding, t);
1009                         state.dT = 1.0;
1010                         collector.setState(state);
1011                 }
1012         }
1013
1014         private static double toTime(Object time) {
1015                 return time instanceof Number ? ((Number) time).doubleValue() : Double.NaN;
1016         }
1017
1018         private static boolean isNaN(Object value) {
1019                 return value instanceof Number ? Double.isNaN(((Number) value).doubleValue()) : false;
1020         }
1021
1022         private static int truncationSize(int binarySearchResult) {
1023                 return binarySearchResult >= 0 ? binarySearchResult + 1 : (-binarySearchResult - 1);
1024         }
1025
1026 }