gerrit.simantics Code Review - simantics/platform.git/blob - bundles/org.simantics.simulation/src/org/simantics/simulation/history/HistoryUtil.java
Fixed all line endings of the repository
[simantics/platform.git] / bundles / org.simantics.simulation / src / org / simantics / simulation / history / HistoryUtil.java
1 /*******************************************************************************
2  * Copyright (c) 2007, 2014 Association for Decentralized Information Management in
3  * Industry THTH ry.
4  * All rights reserved. This program and the accompanying materials
5  * are made available under the terms of the Eclipse Public License v1.0
6  * which accompanies this distribution, and is available at
7  * http://www.eclipse.org/legal/epl-v10.html
8  *
9  * Contributors:
10  *     VTT Technical Research Centre of Finland - initial API and implementation
11  *     Semantum Oy - archive implementation
12  *******************************************************************************/
13 package org.simantics.simulation.history;
14
15 import gnu.trove.map.TObjectLongMap;
16 import gnu.trove.map.hash.TObjectLongHashMap;
17
18 import java.io.EOFException;
19 import java.io.IOException;
20 import java.io.InputStream;
21 import java.io.OutputStream;
22 import java.util.Arrays;
23 import java.util.HashMap;
24 import java.util.Map;
25
26 import net.jpountz.lz4.LZ4BlockInputStream;
27 import net.jpountz.lz4.LZ4BlockOutputStream;
28
29 import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
30 import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
31 import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
32 import org.eclipse.core.runtime.IStatus;
33 import org.eclipse.core.runtime.Status;
34 import org.simantics.databoard.Bindings;
35 import org.simantics.databoard.accessor.StreamAccessor;
36 import org.simantics.databoard.accessor.binary.BinaryObject;
37 import org.simantics.databoard.accessor.error.AccessorException;
38 import org.simantics.databoard.binding.ArrayBinding;
39 import org.simantics.databoard.binding.Binding;
40 import org.simantics.databoard.binding.RecordBinding;
41 import org.simantics.databoard.binding.error.BindingConstructionException;
42 import org.simantics.databoard.binding.error.BindingException;
43 import org.simantics.databoard.binding.impl.ObjectArrayBinding;
44 import org.simantics.databoard.binding.mutable.Variant;
45 import org.simantics.databoard.container.DataContainer;
46 import org.simantics.databoard.container.DataContainers;
47 import org.simantics.databoard.serialization.Serializer;
48 import org.simantics.databoard.util.Bean;
49 import org.simantics.databoard.util.binary.OutputStreamWriteable;
50 import org.simantics.databoard.util.binary.RandomAccessBinary;
51 import org.simantics.db.ReadGraph;
52 import org.simantics.db.Resource;
53 import org.simantics.db.WriteGraph;
54 import org.simantics.db.common.request.UniqueRead;
55 import org.simantics.db.common.request.WriteRequest;
56 import org.simantics.db.exception.DatabaseException;
57 import org.simantics.db.exception.ServiceNotFoundException;
58 import org.simantics.db.request.Read;
59 import org.simantics.fastlz.FastLZ;
60 import org.simantics.history.HistoryException;
61 import org.simantics.history.HistoryManager;
62 import org.simantics.history.ItemManager;
63 import org.simantics.history.util.Stream;
64 import org.simantics.history.util.ValueBand;
65 import org.simantics.layer0.Layer0;
66 import org.simantics.simulation.Activator;
67 import org.simantics.simulation.ontology.HistoryResource;
68 import org.simantics.simulation.ontology.SimulationResource;
69 import org.simantics.utils.FileUtils;
70
71 /**
72  * @author Toni Kalajainen
73  * @author Tuukka Lehtonen
74  */
75 public class HistoryUtil {
76
77         private static final boolean DEBUG = false;
78         private static final boolean PROFILE = true;
79
80         private enum Compression {
81                 FLZ,
82                 LZ4,
83         }
84
85         private static final Compression USED_COMPRESSION = Compression.FLZ;
86
87         private static final String APPLICATION_X_LZ4 = "application/x-lz4";
88         private static final String APPLICATION_X_FLZ = "application/x-flz";
89         private static final int ARCHIVE_VERSION_1 = 1; // TarArchiveInput/OutputStream
90
91         private static class ArchivedHistory {
92                 public long totalSampleSize = 0L;
93                 public TObjectLongMap<Bean> itemSizes = new TObjectLongHashMap<Bean>();
94         }
95
96         /**
97          * Create request that exports history to graph.
98          * 
99          * If removeOldUnused is true, items that do not exist in source history
100          * are deleted from graph.
101          * 
102          * If item already exists in the graph, it is overwritten.
103          * 
104          * @param history source history
105          * @param g write graph
106          * @param historyResource Instance of HIS.History to write data to
107          * @param removeOldUnused
108          * @param timeShift adjust time values by this value 
109          * @param from time
110          * @param end time
111          * @return WriteRequest
112          */
113         public static WriteRequest export(final HistoryManager history, 
114                         final Resource historyResource,
115                         final boolean removeOldUnused,
116                         final Double timeShift, 
117                         final double from, final double end) 
118         {
119                 return new WriteRequest() {
120                         public void perform(WriteGraph graph) throws DatabaseException {
121                                 HistoryResource HIS = HistoryResource.getInstance(graph);
122                                 Layer0 L0 = Layer0.getInstance(graph);
123                                 long totalSampleSize = 0L;
124                                 long s = System.nanoTime();
125
126                                 try {
127                                         Bean[] items = history.getItems();
128                                         if (PROFILE)
129                                                 profile(null, "exporting " + items.length + " history items to database");
130
131                                         ItemManager im = new ItemManager( items );
132                                         Map<String, Resource> oldResourceMap = new HashMap<String, Resource>();
133                                         for (Resource oldItem : graph.getObjects(historyResource, L0.ConsistsOf))
134                                         {
135                                                 if (!graph.isInstanceOf(oldItem, HIS.History_Item)) continue;
136                                                 Bean bean = graph.getPossibleRelatedValue(oldItem, HIS.History_Item_Info, Bindings.BEAN);
137                                                 if ( bean == null ) continue;
138                                                 String id = (String) bean.getField("id");
139                                                 
140                                                 if (!im.exists(id)) {
141                                                         // There is item in graph that does not exist in source history
142                                                         if ( removeOldUnused ) {
143                                                                 graph.deny(oldItem);
144                                                         }
145                                                 } else {
146                                                         // Item already exists, to-be-overwritten
147                                                         oldResourceMap.put(id, oldItem);
148                                                 }
149                                         }
150                                         
151                                         for (Bean newItem : im.values())
152                                         {
153                                                 //System.out.println("WRITING ITEM: " + newItem);
154                                                 String id = (String) newItem.getField("id");
155                                                 Resource historyItemResource = oldResourceMap.get(id);
156                                                 Resource data = null;
157
158                                                 if ( historyItemResource==null ) {
159                                                         historyItemResource = graph.newResource();
160                                                         graph.claim(historyResource, L0.ConsistsOf, historyItemResource);
161                                                         graph.claim(historyItemResource, L0.InstanceOf, HIS.History_Item);
162                                                         graph.claimLiteral(historyItemResource, L0.HasName, id);
163                                                 } else {
164                                                         data = graph.getPossibleObject(historyItemResource, HIS.History_Item_Series);
165                                                 }
166                                                 
167                                                 Variant v = new Variant(newItem.getBinding(), newItem);
168                                                 graph.claimLiteral(historyItemResource, HIS.History_Item_Info, v, Bindings.VARIANT);
169                                                 
170                                                 if (data == null) {
171                                                         data = graph.newResource();
172                                                         graph.claim(data, L0.InstanceOf, L0.Variant);
173                                                         graph.claim(historyItemResource, HIS.History_Item_Series, data);
174                                                 } else {
175                                                         graph.denyValue(data);
176                                                 }
177                                                 
178                                                 
179                                                 // Write stream
180                                                 StreamAccessor sa = history.openStream(id, "r");
181                                                 try {
182                                                         //System.out.println("WRITING stream of size=" + sa.size());
183                                                         RecordBinding sampleBinding = (RecordBinding) Bindings.getBeanBinding( sa.type().componentType );
184                                                         Serializer serializer = Bindings.getSerializerUnchecked(sampleBinding);
185                                                         Integer constantSampleSize = serializer.getConstantSize();
186                                                         ValueBand vb = new ValueBand(sampleBinding);
187                                                         Stream stream = new Stream(sa, sampleBinding);
188                                                         ArrayBinding arrayBinding = new ObjectArrayBinding(sa.type(), sampleBinding);
189                                                         Object array = null;
190                                                         //System.out.println("WRITING stream: sample size=" + constantSampleSize);
191
192                                                         int count = 0;
193                                                         readArray: {
194                                                                 // Start index
195                                                                 int startIndex = stream.binarySearch(Bindings.DOUBLE, from);                                            
196                                                                 if ( startIndex < -stream.count() ) break readArray;
197                                                                 if ( startIndex<0 ) startIndex = -2-startIndex;
198                                                                 if ( startIndex == -1 ) startIndex = 0;
199                                                                 
200                                                                 // End index
201                                                                 int endIndex = stream.binarySearch(Bindings.DOUBLE, end);
202                                                                 if ( endIndex == -1 ) break readArray;
203                                                                 if ( endIndex<0 ) endIndex = -1-endIndex;
204                                                                 if ( endIndex == sa.size() ) endIndex = sa.size()-1;
205                                                                 if ( endIndex<startIndex ) break readArray;
206                                                         
207                                                                 // Write sample count
208                                                                 count = endIndex - startIndex + 1;
209                                                                 array = arrayBinding.create(count);
210                                                                 boolean hasTimeShift = timeShift!=null && timeShift != 0.0;
211                                                                 //System.out.println("WRITING stream: count=" + count + ", datatype=" + arrayBinding.type());
212                                                                 for (int i=0; i<count; i++) {
213                                                                         Object sample = sa.get(i+startIndex, sampleBinding);
214                                                                         
215                                                                         // Adjust time
216                                                                         if ( hasTimeShift ) {
217                                                                                 vb.setSample(sample);
218                                                                                 if ( vb.hasTime() ) {
219                                                                                         double newTime = vb.getTimeDouble() + timeShift;
220                                                                                         vb.setTime(Bindings.DOUBLE, newTime);
221                                                                                 }
222                                                                                 if ( vb.hasEndTime() ) {
223                                                                                         double newTime = vb.getEndTimeDouble() + timeShift;
224                                                                                         vb.setEndTime(Bindings.DOUBLE, newTime);
225                                                                                 }
226                                                                         }
227                                                                         //System.out.println("\t#" + i + ": sample=" + sample);
228                                                                         arrayBinding.set(array, i, sample);
229                                                                 }
230                                                         }
231                                                         if (array==null) array = arrayBinding.create();
232                                                         Variant v2 = new Variant(arrayBinding, array);
233                                                         graph.claimValue(data, v2, Bindings.VARIANT);
234
235                                                         if (constantSampleSize != null) {
236                                                                 long itemSampleSize = ((long) count) * ((long) constantSampleSize);
237                                                                 //System.out.println("item sample size: " + itemSampleSize);
238                                                                 totalSampleSize += itemSampleSize;
239                                                                 graph.claimLiteral(historyItemResource, HIS.History_Item_size, L0.Long, itemSampleSize, Bindings.LONG);
240                                                         }
241                                                 } catch (AccessorException e) {
242                                                         throw new DatabaseException(e);
243                                                 } finally {
244                                                         try {
245                                                                 sa.close();
246                                                         } catch (AccessorException e) {
247                                                         }
248                                                 }
249                                         }
250
251                                         graph.claimLiteral(historyResource, HIS.History_size, L0.Long, totalSampleSize, Bindings.LONG);
252
253                                         if (PROFILE)
254                                                 profile(s, "exported " + items.length + " history items to database");
255
256                                 } catch (HistoryException e) {
257                                         throw new DatabaseException( e );
258                                 } catch (BindingException e) {
259                                         throw new DatabaseException( e );
260                                 } catch (ServiceNotFoundException e) {
261                                         throw new DatabaseException( e );
262                                 }
263                         }
264                 };
265         }
266
267         /**
268          * Create request that exports history to graph.
269          * 
270          * If item already exists in the graph, it is overwritten.
271          * 
272          * @param history
273          *            source history
274          * @param collectorState
275          *            complete dump of the source history collector state or
276          *            <code>null</code> to skip collector state saving
277          * @param historyResource
278          *            Instance of HIS.History to write data to
279          * @param timeShift
280          *            adjust time values by this value
281          * @param from
282          *            time
283          * @param end
284          *            time
285          * @return WriteRequest
286          */
287         public static WriteRequest exportArchive(
288                         final HistoryManager history,
289                         final Bean collectorState,
290                         final Resource historyResource,
291                         final Double timeShift, 
292                         final double from,
293                         final double end) 
294         {
295                 return new WriteRequest() {
296                         public void perform(WriteGraph graph) throws DatabaseException {
297                                 HistoryResource HIS = HistoryResource.getInstance(graph);
298                                 Layer0 L0 = Layer0.getInstance(graph);
299
300                                 long s = System.nanoTime();
301                                 if (PROFILE)
302                                         profile(null, "archiving history items to database");
303
304                                 // Remove all possibly existing old items in the history.
305                                 for (Resource entity : graph.getObjects(historyResource, L0.ConsistsOf))
306                                         if (graph.isInstanceOf(entity, HIS.History_Item))
307                                                 graph.deny(historyResource, L0.ConsistsOf, L0.PartOf, entity);
308
309                                 // Create new literal for history archival
310                                 //graph.deny(historyResource, HIS.History_archive);
311                                 graph.denyValue(historyResource, HIS.History_archive);
312                                 Resource archiveLiteral = graph.newResource();
313                                 graph.claim(archiveLiteral, L0.InstanceOf, null, L0.ByteArray);
314                                 graph.claim(historyResource, HIS.History_archive, archiveLiteral);
315
316                                 OutputStream closeable = null;
317                                 try {
318                                         RandomAccessBinary rab = graph.getRandomAccessBinary(archiveLiteral);
319                                         rab.position(0);
320                                         rab.skipBytes(4);
321
322                                         ArchivedHistory archive = exportCompressedArchive(history, collectorState, historyResource, timeShift, from, end, rab);
323
324                                         rab.position(0L);
325                                         rab.writeInt((int)(rab.length() - 4L));
326
327                                         graph.claimLiteral(historyResource, HIS.History_size, L0.Long, archive.totalSampleSize, Bindings.LONG);
328
329                                         if (PROFILE)
330                                                 profile(s, "archived history items of size " + archive.totalSampleSize + " to database");
331
332                                 } catch (HistoryException e) {
333                                         throw new DatabaseException( e );
334                                 } catch (IOException e) {
335                                         throw new DatabaseException( e );
336                                 } finally {
337                                         FileUtils.uncheckedClose(closeable);
338                                 }
339                         }
340                 };
341         }
342
343         private static ArchivedHistory exportCompressedArchive(
344                         final HistoryManager history,
345                         final Bean collectorState,
346                         final Resource historyResource,
347                         final Double timeShift, 
348                         final double from,
349                         final double end,
350                         RandomAccessBinary rab)
351                                         throws IOException, HistoryException
352         {
353                 switch (USED_COMPRESSION) {
354                 case FLZ:
355                         return exportArchiveFLZ(history, collectorState, historyResource, timeShift, from, end, rab);
356                 case LZ4:
357                         return exportArchiveLZ4(history, collectorState, historyResource, timeShift, from, end, rab);
358                 default:
359                         throw new UnsupportedOperationException("Unsupported compression: " + USED_COMPRESSION);
360                 }
361         }
362
363         private static ArchivedHistory exportArchiveLZ4(
364                         final HistoryManager history,
365                         final Bean collectorState,
366                         final Resource historyResource,
367                         final Double timeShift, 
368                         final double from,
369                         final double end,
370                         RandomAccessBinary rab)
371                                         throws IOException, HistoryException
372         {
373                 OutputStream closeable = null;
374                 try {
375                         DataContainers.writeHeader(rab, new DataContainer(APPLICATION_X_LZ4, ARCHIVE_VERSION_1, null));
376                         closeable = new RABOutputStream( rab );
377                         closeable = new LZ4BlockOutputStream( closeable );
378
379                         ArchivedHistory archive = exportArchive( closeable, history, collectorState, timeShift, from, end );
380
381                         closeable.flush();
382                         closeable.close();
383                         closeable = null;
384
385                         return archive;
386                 } finally {
387                         FileUtils.uncheckedClose(closeable);
388                 }
389         }
390
391         @SuppressWarnings("resource")
392         private static ArchivedHistory exportArchiveFLZ(
393                         final HistoryManager history,
394                         final Bean collectorState,
395                         final Resource historyResource,
396                         final Double timeShift, 
397                         final double from,
398                         final double end,
399                         RandomAccessBinary rab)
400                                         throws IOException, HistoryException
401         {
402                 OutputStream closeable = null;
403                 try {
404                         DataContainers.writeHeader(rab, new DataContainer(APPLICATION_X_FLZ, ARCHIVE_VERSION_1, null));
405                         closeable = new RABOutputStream( rab );
406                         closeable = FastLZ.write( closeable );
407
408                         ArchivedHistory archive = exportArchive( closeable, history, collectorState, timeShift, from, end );
409
410                         closeable.flush();
411                         closeable.close();
412                         closeable = null;
413
414                         return archive;
415                 } finally {
416                         FileUtils.uncheckedClose(closeable);
417                 }
418         }
419
420         /**
421          * Store the specified history manager's contents into a single archive
422          * file.
423          * 
424          * @param outputStream
425          *            the output stream where the history archive is written
426          * @param history
427          *            source history
428          * @param collectorState
429          *            complete state of the history collector at the time of saving
430          *            or <code>null</code> to not save any state
431          * @param timeShift
432          *            adjust time values by this value
433          * @param from
434          *            time
435          * @param end
436          *            time
437          * @return ArchivedHistory instance describing the created archive
438          * @throws HistoryException
439          *             when there's a problem with reading the history data from the
440          *             history work area
441          * @throws IOException
442          *             when there's a problem writing the resulting history archive
443          *             file
444          */
445         private static ArchivedHistory exportArchive(
446                         OutputStream outputStream,
447                         HistoryManager history,
448                         Bean collectorState,
449                         Double timeShift,
450                         double from,
451                         double end)
452                                         throws IOException, HistoryException
453         {
454                 ArchivedHistory result = new ArchivedHistory();
455
456                 Serializer beanSerializer = Bindings.getSerializerUnchecked(Bindings.BEAN);
457                 boolean hasTimeShift = timeShift != null && timeShift != 0.0;
458
459                 TarArchiveOutputStream out = new TarArchiveOutputStream(outputStream);
460                 OutputStreamWriteable dataOutput = new OutputStreamWriteable(out);
461
462                 try {
463                         if (collectorState != null) {
464                                 // Write complete collector state into the archive
465                                 if (DEBUG)
466                                         System.out.println("WRITING collector state: " + collectorState);
467                                 byte[] serializedCollectorState = beanSerializer.serialize( collectorState );
468                                 putNextEntry(out, "state", serializedCollectorState.length);
469                                 out.write(serializedCollectorState);
470                                 out.closeArchiveEntry();
471                         }
472
473                         ItemManager im = new ItemManager( history.getItems() );
474
475                         for (Bean item : im.values()) {
476                                 if (DEBUG)
477                                         System.out.println("STORING ITEM: " + item);
478                                 String id = (String) item.getField("id");
479
480                                 // Write data stream metadata
481                                 byte[] metadata = beanSerializer.serialize( item );
482                                 putNextEntry(out, id + ".txt", metadata.length);
483                                 out.write(metadata);
484                                 out.closeArchiveEntry();
485                                 if (DEBUG)
486                                         System.out.println("SERIALIZED ITEM " + id + ": " + Arrays.toString(metadata));
487
488
489                                 // Write data stream as a separate entry
490                                 StreamAccessor sa = history.openStream(id, "r");
491                                 try {
492                                         if (DEBUG)
493                                                 System.out.println("STREAM SIZE=" + sa.size());
494                                         RecordBinding sampleBinding = (RecordBinding) Bindings.getBeanBinding( sa.type().componentType );
495                                         Serializer sampleSerializer = Bindings.getSerializerUnchecked(sampleBinding);
496                                         Integer constantSampleSize = sampleSerializer.getConstantSize();
497
498                                         if (constantSampleSize == null) {
499                                                 // Variable length samples - no support for now.
500                                                 System.err.println("Skipping item " + item + " in history archival due to variable length samples not being supported at the moment.");
501                                                 continue;
502                                         } else {
503                                                 ValueBand vb = new ValueBand(sampleBinding);
504                                                 Stream stream = new Stream(sa, sampleBinding);
505                                                 if (DEBUG)
506                                                         System.out.println("WRITING stream: sample size=" + constantSampleSize);
507
508                                                 readArray: {
509                                                         // Start index
510                                                         int startIndex = stream.binarySearch(Bindings.DOUBLE, from);
511                                                         if ( startIndex < -stream.count() ) break readArray;
512                                                         if ( startIndex<0 ) startIndex = -2-startIndex;
513                                                         if ( startIndex == -1 ) startIndex = 0;
514
515                                                         // End index
516                                                         int endIndex = stream.binarySearch(Bindings.DOUBLE, end);
517                                                         if ( endIndex == -1 ) break readArray;
518                                                         if ( endIndex<0 ) endIndex = -1-endIndex;
519                                                         if ( endIndex == sa.size() ) endIndex = sa.size()-1;
520                                                         if ( endIndex<startIndex ) break readArray;
521
522                                                         // Written sample count
523                                                         int count = endIndex - startIndex + 1;
524                                                         if (DEBUG)
525                                                                 System.out.println("WRITTEN sample count=" + count);
526
527                                                         long savedSamplesSize = ((long) count) * ((long) constantSampleSize);
528                                                         if (DEBUG)
529                                                                 System.out.println("saved samples size in bytes: " + savedSamplesSize);
530                                                         putNextEntry(out, id + ".data", savedSamplesSize);
531
532                                                         for (int i=0; i<count; i++) {
533                                                                 Object sample = sa.get(i+startIndex, sampleBinding);
534
535                                                                 // Adjust time
536                                                                 if ( hasTimeShift ) {
537                                                                         vb.setSample(sample);
538                                                                         if ( vb.hasTime() ) {
539                                                                                 double newTime = vb.getTimeDouble() + timeShift;
540                                                                                 vb.setTime(Bindings.DOUBLE, newTime);
541                                                                         }
542                                                                         if ( vb.hasEndTime() ) {
543                                                                                 double newTime = vb.getEndTimeDouble() + timeShift;
544                                                                                 vb.setEndTime(Bindings.DOUBLE, newTime);
545                                                                         }
546                                                                 }
547                                                                 //System.out.println("\t#" + i + ": sample=" + sample);
548                                                                 sampleSerializer.serialize(dataOutput, sample);
549                                                         }
550                                                         out.closeArchiveEntry();
551
552                                                         result.itemSizes.put(item, savedSamplesSize);
553                                                         result.totalSampleSize += savedSamplesSize;
554                                                 }
555                                         }
556                                 } catch (AccessorException e) {
557                                         throw new IOException(e);
558                                 } finally {
559                                         try {
560                                                 sa.close();
561                                         } catch (AccessorException e) {
562                                         }
563                                 }
564                                 
565                         }
566
567                         return result;
568                 } catch (BindingException e) {
569                         throw new HistoryException(e);
570                 } finally {
571                         out.finish();
572                 }
573         }
574
575         /**
576          * Import all history items from graph into a history manager.
577          * 
578          * @param r history resource or initial condition resource source HIS.History resource
579          * @param history destination history
580          * @return read request that always returns a HistoryImportResult instance
581          */
582         public static Read<HistoryImportResult> importHistory(final Resource r, final HistoryManager history)
583         {
584                 return new UniqueRead<HistoryImportResult>() {
585                         @Override
586                         public HistoryImportResult perform(ReadGraph graph) throws DatabaseException {
587                                 HistoryImportResult result = new HistoryImportResult();
588
589                                 HistoryResource HIS = HistoryResource.getInstance(graph);
590                                 SimulationResource SIM = SimulationResource.getInstance(graph);
591                                 Resource historyResource = r;
592
593                                 if (!graph.isInstanceOf(historyResource, HIS.History))
594                                         historyResource = graph.getPossibleObject(r, SIM.State_History);
595                                 if (historyResource == null)
596                                         return result;
597                                 if (!graph.isInstanceOf(historyResource, HIS.History))
598                                         return result;
599
600                                 long s = System.nanoTime();
601
602                                 Resource archive = graph.getPossibleObject(historyResource, HIS.History_archive);
603                                 if (archive == null) {
604                                         if (PROFILE)
605                                                 profile(null, "importing history items from old database format to disk workarea");
606                                         importHistoryItems(graph, historyResource, history);
607                                 } else {
608                                         try {
609                                                 if (PROFILE)
610                                                         profile(null, "importing history items from archived format to disk workarea");
611                                                 importHistoryArchive(graph, historyResource, archive, history, result);
612                                         } catch (IOException e) {
613                                                 throw new DatabaseException(e);
614                                         } catch (HistoryException e) {
615                                                 throw new DatabaseException(e);
616                                         }
617                                 }
618
619                                 if (PROFILE)
620                                         profile(s, "imported history items from database to disk workarea");
621
622                                 return result;
623                         }
624                 };
625         }
626
627         /**
628          * Import all history items from graph into a history manager.
629          * 
630          * @param r history resource or initial condition resource source HIS.History resource
631          * @param history destination history
632          * @return <code>true</code> if successful
633          */
634         private static boolean importHistoryItems(ReadGraph graph, final Resource historyResource, final HistoryManager history) throws DatabaseException
635         {
636                 HistoryResource HIS = HistoryResource.getInstance(graph);
637                 Layer0 L0 = Layer0.getInstance(graph);
638
639                 try {
640                         for (Resource oldItem : graph.getObjects(historyResource, L0.ConsistsOf)) {
641                                 if (!graph.isInstanceOf(oldItem, HIS.History_Item))
642                                         continue;
643
644                                 Bean bean = graph.getRelatedValue(oldItem, HIS.History_Item_Info, Bindings.BEAN);
645                                 String id = (String) bean.getFieldUnchecked("id");
646                                 Object array = graph.getRelatedValue(oldItem, HIS.History_Item_Series, Bindings.BEAN);
647                                 Binding arrayBinding = Bindings.getBinding( array.getClass() );
648                                 history.modify(bean);
649                                 StreamAccessor sa = history.openStream(id, "rw");
650                                 try {
651                                         sa.setValue(arrayBinding, array);
652                                 } finally {
653                                         try {
654                                                 sa.close();
655                                         } catch (AccessorException e) {
656                                         }
657                                 }
658                         }
659
660                         return true;
661                 } catch (AccessorException e) {
662                         throw new DatabaseException(e);
663                 } catch (BindingConstructionException e) {
664                         throw new DatabaseException(e);
665                 } catch (HistoryException e) {
666                         throw new DatabaseException(e);
667                 }
668         }
669
670         /**
671          * Import all history items from database archive into a history manager.
672          * 
673          * @param graph
674          * @param historyResource
675          * @param archive 
676          * @param history
677          * @return <code>true</code> if successful
678          * @throws DatabaseException 
679          */
680         @SuppressWarnings("resource")
681         private static boolean importHistoryArchive(ReadGraph graph, Resource historyResource, Resource archive, HistoryManager history, HistoryImportResult result)
682                         throws DatabaseException, IOException, HistoryException
683         {
684                 RandomAccessBinary rab = graph.getRandomAccessBinary(archive);
685                 rab.position(4);
686                 DataContainer dc = DataContainers.readHeader(rab);
687
688                 if (DEBUG)
689                         System.out.println("COMPRESSED HISTORY DATA SIZE: " + (rab.length() - rab.position()));
690
691                 if (APPLICATION_X_LZ4.equals(dc.format)) {
692                         if (dc.version == ARCHIVE_VERSION_1) {
693                                 InputStream in = new RABInputStream(rab);
694                                 //in = new BufferedInputStream(in);
695                                 in = new LZ4BlockInputStream(in);
696                                 return extractHistoryArchiveTar(graph, history, in, result);
697                         }
698                 } else if (APPLICATION_X_FLZ.equals(dc.format)) {
699                         if (dc.version == ARCHIVE_VERSION_1) {
700                                 InputStream in = null;
701                                 try {
702                                         in = new RABInputStream(rab);
703                                         in = FastLZ.read(in);
704                                         return extractHistoryArchiveTar(graph, history, in, result);
705                                 } finally {
706                                         FileUtils.uncheckedClose(in);
707                                 }
708                         }
709                 }
710                 throw new UnsupportedOperationException("Unsupported format " + dc.format + " and version " + dc.version);
711         }
712
713         /**
714          * Import all history items from graph into a history manager
715          * 
716          * @param graph database access
717          * @param history destination history
718          * @param rab the archive to extract from
719          * @return <code>true</code> if successful
720          * @throws DatabaseException 
721          */
722         private static boolean extractHistoryArchiveTar(ReadGraph graph, HistoryManager history, InputStream in, HistoryImportResult result)
723                         throws DatabaseException, IOException, HistoryException
724         {
725                 // tar is not closed on purpose because we do not want to close RandomAccessBinary rab.
726                 TarArchiveInputStream tar = new TarArchiveInputStream(in);
727
728                 Serializer beanSerializer = Bindings.getSerializerUnchecked(Bindings.BEAN);
729
730                 Bean lastItem = null;
731                 String lastItemId = null;
732                 while (true) {
733                         TarArchiveEntry entry = tar.getNextTarEntry();
734                         if (entry == null)
735                                 break;
736
737                         String name = entry.getName();
738                         boolean state = name.equals("state");
739                         boolean metadata = name.endsWith(".txt");
740                         boolean data = name.endsWith(".data");
741
742                         if (state) {
743                                 if (result != null) {
744                                         byte[] st = new byte[(int) entry.getSize()];
745                                         tar.read(st);
746                                         result.collectorState = (Bean) beanSerializer.deserialize(st);
747                                         if (DEBUG)
748                                                 System.out.println("READ collector state: " + result.collectorState);
749                                 } else {
750                                         tar.skip(entry.getSize());
751                                 }
752                         } else if (metadata) {
753                                 byte[] md = new byte[(int) entry.getSize()];
754                                 tar.read(md);
755                                 if (DEBUG) {
756                                         System.out.println("READING Item metadata: " + name);
757                                         System.out.println("SERIALIZED ITEM " + name.replace(".txt", "") + ": " + Arrays.toString(md));
758                                 }
759
760                                 try {
761                                         Bean bean = (Bean) beanSerializer.deserialize(md);
762                                         if (DEBUG)
763                                                 System.out.println("READ Item metadata: " + bean);
764                                         history.modify(bean);
765                                         lastItem = bean;
766                                         lastItemId = (String) bean.getFieldUnchecked("id");
767                                 } catch (ClassFormatError e) {
768                                         // This is here because LZ4BlockInput/OutputStream seemed to
769                                         // be causing weird deserialization errors to occur for
770                                         // single data items when trying to deserialize beans. The
771                                         // reason being that the serialized data that was read only
772                                         // contains the first 512 bytes of the data. After that all
773                                         // data is zeros, which causes things like this to happen in
774                                         // deserialization:
775                                         //signature: rsstzzzzzzzze_b7b532e9
776                                         //component(o): id = String
777                                         //component(1): variableId = String
778                                         //component(2): format = Temp1
779                                         //annotation: @org.simantics.databoard.annotations.Union(value=[class org.simantics.databoard.type.BooleanType, class org.simantics.databoard.type.ByteType, class org.simantics.databoard.type.IntegerType, class org.simantics.databoard.type.LongType, class org.simantics.databoard.type.FloatType, class org.simantics.databoard.type.DoubleType, class org.simantics.databoard.type.StringType, class org.simantics.databoard.type.RecordType, class org.simantics.databoard.type.ArrayType, class org.simantics.databoard.type.MapType, class org.simantics.databoard.type.OptionalType, class org.simantics.databoard.type.UnionType, class org.simantics.databoard.type.VariantType])
780                                         //component(3):  = Boolean
781                                         //component(4):  = Boolean
782                                         //component(5):  = Boolean
783                                         //component(devil):  = Boolean
784                                         //component(7):  = Boolean
785                                         //component(music):  = Boolean
786                                         //component(9):  = Boolean
787                                         //component(10):  = Boolean
788                                         //
789                                         // For this reason we've switched the default compression to FastLZ
790                                         // for now. This is reflected in the input format also.
791
792                                         Activator.getDefault().getLog().log(
793                                                         new Status(IStatus.ERROR, Activator.PLUGIN_ID, "History item " + name + " metadata deserialization failed.", e));;
794                                 }
795                         } else if (data && lastItem != null) {
796                                 StreamAccessor sa = history.openStream(lastItemId, "rw");
797                                 try {
798                                         if (sa instanceof BinaryObject) {
799                                                 BinaryObject bo = (BinaryObject) sa;
800                                                 RandomAccessBinary output = bo.getBinary();
801                                                 output.position(0L);
802                                                 output.setLength(0L);
803                                                 long copiedBytes = FileUtils.copy(tar, new RABOutputStream(output), entry.getSize());
804                                                 if (copiedBytes != entry.getSize()) {
805                                                         System.err.println("WARNING: item " + lastItemId + ", extracted " + copiedBytes + " bytes while TAR entry said the size to be " + entry.getSize() + " bytes");
806                                                 }
807                                         } else {
808                                                 System.err.println("Skipping item " + lastItemId + " in history import due to StreamAccessor not being an instanceof BinaryObject. StreamAccessor=" + sa);
809                                         }
810                                 } finally {
811                                         try {
812                                                 sa.close();
813                                                 lastItem = null;
814                                                 lastItemId = null;
815                                         } catch (AccessorException e) {
816                                         }
817                                 }
818                         }
819                 }
820
821                 return true;
822         }
823
824         /**
825          * Get request that clears history
826          * @return cleaning request
827          */
828         public static WriteRequest clear(final Resource history) {
829                 return new WriteRequest() {
830                         @Override
831                         public void perform(WriteGraph graph) throws DatabaseException {
832                                 HistoryResource HIS = HistoryResource.getInstance(graph);
833                                 Layer0 L0 = Layer0.getInstance(graph);
834
835                                 // Separate items format
836                                 for (Resource oldItem : graph.getObjects(history, L0.ConsistsOf))
837                                 {
838                                         if (!graph.isInstanceOf(oldItem, HIS.History_Item)) continue;
839                                         Resource info = graph.getPossibleObject(oldItem, HIS.History_Item);
840                                         if (info!=null) {
841                                                 graph.deny(oldItem, HIS.History_Item_Info);
842                                                 graph.deny(info);
843                                         }
844                                         Resource data = graph.getPossibleObject(oldItem, HIS.History_Item_Series);
845                                         if (data!=null) {
846                                                 graph.deny(oldItem, HIS.History_Item_Series);
847                                                 graph.deny(data);
848                                         }
849                                         graph.deny(history, L0.ConsistsOf, oldItem);
850                                         graph.deny(oldItem);
851                                 }
852
853                                 // Archived format
854                                 graph.denyValue(history, HIS.History_archive);
855                         }
856                 };
857         }
858
859
860         private static void putNextEntry(TarArchiveOutputStream out, String entryName, long size) throws IOException {
861                 TarArchiveEntry entry = new TarArchiveEntry(entryName);
862                 entry.setSize(size);
863                 out.putArchiveEntry(entry);
864         }
865
866         /**
867          * Shall not close the underlying RandomAccessBinary.
868          */
869         private static class RABOutputStream extends OutputStream {
870
871                 private final RandomAccessBinary output;
872
873                 public RABOutputStream(RandomAccessBinary output) {
874                         this.output = output;
875                 }
876
877                 @Override
878                 public void write(byte[] b, int off, int len) throws IOException {
879                         output.write(b, off, len);
880                 }
881
882                 @Override
883                 public void write(int b) throws IOException {
884                         output.write(b);
885                 }
886
887         }
888
889         /**
890          * Shall not close the underlying RandomAccessBinary.
891          */
892         private static class RABInputStream extends InputStream {
893
894                 private final RandomAccessBinary input;
895
896                 public RABInputStream(RandomAccessBinary input) {
897                         this.input = input;
898                 }
899
900                 @Override
901                 public int available() throws IOException {
902                         long avail = input.length() - input.position();
903                         if ((avail & 0xffffffff00000000L) != 0)
904                                 throw new IOException("Number of available bytes truncated in long->int conversion: " + avail);
905                         return (int) avail;
906                 }
907
908                 @Override
909                 public int read() throws IOException {
910                         try {
911                                 return input.readUnsignedByte();
912                         } catch (EOFException e) {
913                                 return -1;
914                         }
915                 }
916
917                 @Override
918                 public int read(byte[] b, int off, int len) throws IOException {
919                         long available = input.length() - input.position();
920                         if (available == 0)
921                                 return -1;
922                         int l = (int) Math.min(available, (long) len);
923                         input.readFully(b, off, l);
924                         return l;
925                 }
926
927                 @Override
928                 public long skip(long n) throws IOException {
929                         long count = 0;
930                         while (count < n) {
931                                 int l = (int) Math.min(n, (long) Integer.MAX_VALUE);
932                                 count += input.skipBytes(l);
933                         }
934                         return count;
935                 }
936
937         }
938
939         private static long profile(Long t1, String string) {
940                 if (PROFILE) {
941                         long t2 = System.nanoTime();
942                         long delta = t1 == null ? 0 : t2-t1;
943                         System.out.println("[" + HistoryUtil.class.getSimpleName() + "] PROFILE: " + string + (t1 == null ? "" : " in " + (((double)delta)*1e-6) + " ms"));
944                         return t2;
945                 }
946                 return 0L;
947         }
948
949 }