27f2bf895bff382b0ae05cf251cd160425249739
[simantics/platform.git] / bundles / org.simantics.simulation / src / org / simantics / simulation / history / HistoryUtil.java
1 /*******************************************************************************
2  * Copyright (c) 2007, 2014 Association for Decentralized Information Management in
3  * Industry THTH ry.
4  * All rights reserved. This program and the accompanying materials
5  * are made available under the terms of the Eclipse Public License v1.0
6  * which accompanies this distribution, and is available at
7  * http://www.eclipse.org/legal/epl-v10.html
8  *
9  * Contributors:
10  *     VTT Technical Research Centre of Finland - initial API and implementation
11  *     Semantum Oy - archive implementation
12  *******************************************************************************/
13 package org.simantics.simulation.history;
14
15 import java.io.EOFException;
16 import java.io.IOException;
17 import java.io.InputStream;
18 import java.io.OutputStream;
19 import java.util.Arrays;
20 import java.util.HashMap;
21 import java.util.Map;
22
23 import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
24 import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
25 import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
26 import org.eclipse.core.runtime.IStatus;
27 import org.eclipse.core.runtime.Status;
28 import org.simantics.databoard.Bindings;
29 import org.simantics.databoard.accessor.StreamAccessor;
30 import org.simantics.databoard.accessor.binary.BinaryObject;
31 import org.simantics.databoard.accessor.error.AccessorException;
32 import org.simantics.databoard.binding.ArrayBinding;
33 import org.simantics.databoard.binding.Binding;
34 import org.simantics.databoard.binding.RecordBinding;
35 import org.simantics.databoard.binding.error.BindingConstructionException;
36 import org.simantics.databoard.binding.error.BindingException;
37 import org.simantics.databoard.binding.impl.ObjectArrayBinding;
38 import org.simantics.databoard.binding.mutable.MutableVariant;
39 import org.simantics.databoard.binding.mutable.Variant;
40 import org.simantics.databoard.container.DataContainer;
41 import org.simantics.databoard.container.DataContainers;
42 import org.simantics.databoard.serialization.Serializer;
43 import org.simantics.databoard.util.Bean;
44 import org.simantics.databoard.util.binary.OutputStreamWriteable;
45 import org.simantics.databoard.util.binary.RandomAccessBinary;
46 import org.simantics.db.ReadGraph;
47 import org.simantics.db.Resource;
48 import org.simantics.db.WriteGraph;
49 import org.simantics.db.common.request.UniqueRead;
50 import org.simantics.db.common.request.WriteRequest;
51 import org.simantics.db.exception.DatabaseException;
52 import org.simantics.db.exception.ServiceNotFoundException;
53 import org.simantics.db.request.Read;
54 import org.simantics.fastlz.FastLZ;
55 import org.simantics.history.HistoryException;
56 import org.simantics.history.HistoryManager;
57 import org.simantics.history.ItemManager;
58 import org.simantics.history.impl.CollectorImpl;
59 import org.simantics.history.impl.CollectorState;
60 import org.simantics.history.impl.CollectorState.VariableState;
61 import org.simantics.history.util.Stream;
62 import org.simantics.history.util.ValueBand;
63 import org.simantics.history.util.WeightedMedian;
64 import org.simantics.layer0.Layer0;
65 import org.simantics.simulation.Activator;
66 import org.simantics.simulation.ontology.HistoryResource;
67 import org.simantics.simulation.ontology.SimulationResource;
68 import org.simantics.utils.FileUtils;
69
70 import gnu.trove.map.TObjectLongMap;
71 import gnu.trove.map.hash.TObjectLongHashMap;
72 import net.jpountz.lz4.LZ4BlockInputStream;
73 import net.jpountz.lz4.LZ4BlockOutputStream;
74
75 /**
76  * @author Toni Kalajainen
77  * @author Tuukka Lehtonen
78  */
79 public class HistoryUtil {
80
81         private static final boolean DEBUG = false;
82         private static final boolean PROFILE = true;
83
84         private enum Compression {
85                 FLZ,
86                 LZ4,
87         }
88
89         private static final Compression USED_COMPRESSION = Compression.FLZ;
90
91         private static final String APPLICATION_X_LZ4 = "application/x-lz4";
92         private static final String APPLICATION_X_FLZ = "application/x-flz";
93         private static final int ARCHIVE_VERSION_1 = 1; // TarArchiveInput/OutputStream
94
95         private static class ArchivedHistory {
96                 public long totalSampleSize = 0L;
97                 public TObjectLongMap<Bean> itemSizes = new TObjectLongHashMap<Bean>();
98         }
99
100         /**
101          * Create request that exports history to graph.
102          * 
103          * If removeOldUnused is true, items that do not exist in source history
104          * are deleted from graph.
105          * 
106          * If item already exists in the graph, it is overwritten.
107          * 
108          * @param history source history
109          * @param g write graph
110          * @param historyResource Instance of HIS.History to write data to
111          * @param removeOldUnused
112          * @param timeShift adjust time values by this value 
113          * @param from time
114          * @param end time
115          * @return WriteRequest
116          */
117         public static WriteRequest export(final HistoryManager history, 
118                         final Resource historyResource,
119                         final boolean removeOldUnused,
120                         final Double timeShift, 
121                         final double from, final double end) 
122         {
123                 return new WriteRequest() {
124                         public void perform(WriteGraph graph) throws DatabaseException {
125                                 HistoryResource HIS = HistoryResource.getInstance(graph);
126                                 Layer0 L0 = Layer0.getInstance(graph);
127                                 long totalSampleSize = 0L;
128                                 long s = System.nanoTime();
129
130                                 try {
131                                         Bean[] items = history.getItems();
132                                         if (PROFILE)
133                                                 profile(null, "exporting " + items.length + " history items to database");
134
135                                         ItemManager im = new ItemManager( items );
136                                         Map<String, Resource> oldResourceMap = new HashMap<String, Resource>();
137                                         for (Resource oldItem : graph.getObjects(historyResource, L0.ConsistsOf))
138                                         {
139                                                 if (!graph.isInstanceOf(oldItem, HIS.History_Item)) continue;
140                                                 Bean bean = graph.getPossibleRelatedValue(oldItem, HIS.History_Item_Info, Bindings.BEAN);
141                                                 if ( bean == null ) continue;
142                                                 String id = (String) bean.getField("id");
143                                                 
144                                                 if (!im.exists(id)) {
145                                                         // There is item in graph that does not exist in source history
146                                                         if ( removeOldUnused ) {
147                                                                 graph.deny(oldItem);
148                                                         }
149                                                 } else {
150                                                         // Item already exists, to-be-overwritten
151                                                         oldResourceMap.put(id, oldItem);
152                                                 }
153                                         }
154                                         
155                                         for (Bean newItem : im.values())
156                                         {
157                                                 //System.out.println("WRITING ITEM: " + newItem);
158                                                 String id = (String) newItem.getField("id");
159                                                 Resource historyItemResource = oldResourceMap.get(id);
160                                                 Resource data = null;
161
162                                                 if ( historyItemResource==null ) {
163                                                         historyItemResource = graph.newResource();
164                                                         graph.claim(historyResource, L0.ConsistsOf, historyItemResource);
165                                                         graph.claim(historyItemResource, L0.InstanceOf, HIS.History_Item);
166                                                         graph.claimLiteral(historyItemResource, L0.HasName, id);
167                                                 } else {
168                                                         data = graph.getPossibleObject(historyItemResource, HIS.History_Item_Series);
169                                                 }
170                                                 
171                                                 Variant v = new Variant(newItem.getBinding(), newItem);
172                                                 graph.claimLiteral(historyItemResource, HIS.History_Item_Info, v, Bindings.VARIANT);
173                                                 
174                                                 if (data == null) {
175                                                         data = graph.newResource();
176                                                         graph.claim(data, L0.InstanceOf, L0.Variant);
177                                                         graph.claim(historyItemResource, HIS.History_Item_Series, data);
178                                                 } else {
179                                                         graph.denyValue(data);
180                                                 }
181                                                 
182                                                 
183                                                 // Write stream
184                                                 StreamAccessor sa = history.openStream(id, "r");
185                                                 try {
186                                                         //System.out.println("WRITING stream of size=" + sa.size());
187                                                         RecordBinding sampleBinding = (RecordBinding) Bindings.getBeanBinding( sa.type().componentType );
188                                                         Serializer serializer = Bindings.getSerializerUnchecked(sampleBinding);
189                                                         Integer constantSampleSize = serializer.getConstantSize();
190                                                         ValueBand vb = new ValueBand(sampleBinding);
191                                                         Stream stream = new Stream(sa, sampleBinding);
192                                                         ArrayBinding arrayBinding = new ObjectArrayBinding(sa.type(), sampleBinding);
193                                                         Object array = null;
194                                                         //System.out.println("WRITING stream: sample size=" + constantSampleSize);
195
196                                                         int count = 0;
197                                                         readArray: {
198                                                                 // Start index
199                                                                 int startIndex = stream.binarySearch(Bindings.DOUBLE, from);                                            
200                                                                 if ( startIndex < -stream.count() ) break readArray;
201                                                                 if ( startIndex<0 ) startIndex = -2-startIndex;
202                                                                 if ( startIndex == -1 ) startIndex = 0;
203                                                                 
204                                                                 // End index
205                                                                 int endIndex = stream.binarySearch(Bindings.DOUBLE, end);
206                                                                 if ( endIndex == -1 ) break readArray;
207                                                                 if ( endIndex<0 ) endIndex = -1-endIndex;
208                                                                 if ( endIndex == sa.size() ) endIndex = sa.size()-1;
209                                                                 if ( endIndex<startIndex ) break readArray;
210                                                         
211                                                                 // Write sample count
212                                                                 count = endIndex - startIndex + 1;
213                                                                 array = arrayBinding.create(count);
214                                                                 boolean hasTimeShift = timeShift!=null && timeShift != 0.0;
215                                                                 //System.out.println("WRITING stream: count=" + count + ", datatype=" + arrayBinding.type());
216                                                                 for (int i=0; i<count; i++) {
217                                                                         Object sample = sa.get(i+startIndex, sampleBinding);
218                                                                         
219                                                                         // Adjust time
220                                                                         if ( hasTimeShift ) {
221                                                                                 vb.setSample(sample);
222                                                                                 if ( vb.hasTime() ) {
223                                                                                         double newTime = vb.getTimeDouble() + timeShift;
224                                                                                         vb.setTime(Bindings.DOUBLE, newTime);
225                                                                                 }
226                                                                                 if ( vb.hasEndTime() ) {
227                                                                                         double newTime = vb.getEndTimeDouble() + timeShift;
228                                                                                         vb.setEndTime(Bindings.DOUBLE, newTime);
229                                                                                 }
230                                                                         }
231                                                                         //System.out.println("\t#" + i + ": sample=" + sample);
232                                                                         arrayBinding.set(array, i, sample);
233                                                                 }
234                                                         }
235                                                         if (array==null) array = arrayBinding.create();
236                                                         Variant v2 = new Variant(arrayBinding, array);
237                                                         graph.claimValue(data, v2, Bindings.VARIANT);
238
239                                                         if (constantSampleSize != null) {
240                                                                 long itemSampleSize = ((long) count) * ((long) constantSampleSize);
241                                                                 //System.out.println("item sample size: " + itemSampleSize);
242                                                                 totalSampleSize += itemSampleSize;
243                                                                 graph.claimLiteral(historyItemResource, HIS.History_Item_size, L0.Long, itemSampleSize, Bindings.LONG);
244                                                         }
245                                                 } catch (AccessorException e) {
246                                                         throw new DatabaseException(e);
247                                                 } finally {
248                                                         try {
249                                                                 sa.close();
250                                                         } catch (AccessorException e) {
251                                                         }
252                                                 }
253                                         }
254
255                                         graph.claimLiteral(historyResource, HIS.History_size, L0.Long, totalSampleSize, Bindings.LONG);
256
257                                         if (PROFILE)
258                                                 profile(s, "exported " + items.length + " history items to database");
259
260                                 } catch (HistoryException e) {
261                                         throw new DatabaseException( e );
262                                 } catch (BindingException e) {
263                                         throw new DatabaseException( e );
264                                 } catch (ServiceNotFoundException e) {
265                                         throw new DatabaseException( e );
266                                 }
267                         }
268                 };
269         }
270
271         /**
272          * Create request that exports history to graph.
273          * 
274          * If item already exists in the graph, it is overwritten.
275          * 
276          * @param history
277          *            source history
278          * @param collectorState
279          *            complete dump of the source history collector state or
280          *            <code>null</code> to skip collector state saving
281          * @param historyResource
282          *            Instance of HIS.History to write data to
283          * @param timeShift
284          *            adjust time values by this value
285          * @param from
286          *            time
287          * @param end
288          *            time
289          * @return WriteRequest
290          */
291         public static WriteRequest exportArchive(
292                         final HistoryManager history,
293                         final Bean collectorState,
294                         final Resource historyResource,
295                         final Double timeShift, 
296                         final double from,
297                         final double end) 
298         {
299                 return new WriteRequest() {
300                         public void perform(WriteGraph graph) throws DatabaseException {
301                                 HistoryResource HIS = HistoryResource.getInstance(graph);
302                                 Layer0 L0 = Layer0.getInstance(graph);
303
304                                 long s = System.nanoTime();
305                                 if (PROFILE)
306                                         profile(null, "archiving history items to database");
307
308                                 // Remove all possibly existing old items in the history.
309                                 for (Resource entity : graph.getObjects(historyResource, L0.ConsistsOf))
310                                         if (graph.isInstanceOf(entity, HIS.History_Item))
311                                                 graph.deny(historyResource, L0.ConsistsOf, L0.PartOf, entity);
312
313                                 // Create new literal for history archival
314                                 //graph.deny(historyResource, HIS.History_archive);
315                                 graph.denyValue(historyResource, HIS.History_archive);
316                                 Resource archiveLiteral = graph.newResource();
317                                 graph.claim(archiveLiteral, L0.InstanceOf, null, L0.ByteArray);
318                                 graph.claim(historyResource, HIS.History_archive, archiveLiteral);
319
320                                 OutputStream closeable = null;
321                                 try {
322                                         RandomAccessBinary rab = graph.getRandomAccessBinary(archiveLiteral);
323                                         rab.position(0);
324                                         rab.skipBytes(4);
325
326                                         ArchivedHistory archive = exportCompressedArchive(history, collectorState, historyResource, timeShift, from, end, rab);
327
328                                         rab.position(0L);
329                                         rab.writeInt((int)(rab.length() - 4L));
330
331                                         graph.claimLiteral(historyResource, HIS.History_size, L0.Long, archive.totalSampleSize, Bindings.LONG);
332
333                                         if (PROFILE)
334                                                 profile(s, "archived history items of size " + archive.totalSampleSize + " to database");
335
336                                 } catch (HistoryException e) {
337                                         throw new DatabaseException( e );
338                                 } catch (IOException e) {
339                                         throw new DatabaseException( e );
340                                 } finally {
341                                         FileUtils.uncheckedClose(closeable);
342                                 }
343                         }
344                 };
345         }
346
347         private static ArchivedHistory exportCompressedArchive(
348                         final HistoryManager history,
349                         final Bean collectorState,
350                         final Resource historyResource,
351                         final Double timeShift, 
352                         final double from,
353                         final double end,
354                         RandomAccessBinary rab)
355                                         throws IOException, HistoryException
356         {
357                 switch (USED_COMPRESSION) {
358                 case FLZ:
359                         return exportArchiveFLZ(history, collectorState, historyResource, timeShift, from, end, rab);
360                 case LZ4:
361                         return exportArchiveLZ4(history, collectorState, historyResource, timeShift, from, end, rab);
362                 default:
363                         throw new UnsupportedOperationException("Unsupported compression: " + USED_COMPRESSION);
364                 }
365         }
366
367         private static ArchivedHistory exportArchiveLZ4(
368                         final HistoryManager history,
369                         final Bean collectorState,
370                         final Resource historyResource,
371                         final Double timeShift, 
372                         final double from,
373                         final double end,
374                         RandomAccessBinary rab)
375                                         throws IOException, HistoryException
376         {
377                 OutputStream closeable = null;
378                 try {
379                         DataContainers.writeHeader(rab, new DataContainer(APPLICATION_X_LZ4, ARCHIVE_VERSION_1, null));
380                         closeable = new RABOutputStream( rab );
381                         closeable = new LZ4BlockOutputStream( closeable );
382
383                         ArchivedHistory archive = exportArchive( closeable, history, collectorState, timeShift, from, end );
384
385                         closeable.flush();
386                         closeable.close();
387                         closeable = null;
388
389                         return archive;
390                 } finally {
391                         FileUtils.uncheckedClose(closeable);
392                 }
393         }
394
395         @SuppressWarnings("resource")
396         private static ArchivedHistory exportArchiveFLZ(
397                         final HistoryManager history,
398                         final Bean collectorState,
399                         final Resource historyResource,
400                         final Double timeShift, 
401                         final double from,
402                         final double end,
403                         RandomAccessBinary rab)
404                                         throws IOException, HistoryException
405         {
406                 OutputStream closeable = null;
407                 try {
408                         DataContainers.writeHeader(rab, new DataContainer(APPLICATION_X_FLZ, ARCHIVE_VERSION_1, null));
409                         closeable = new RABOutputStream( rab );
410                         closeable = FastLZ.write( closeable );
411
412                         ArchivedHistory archive = exportArchive( closeable, history, collectorState, timeShift, from, end );
413
414                         closeable.flush();
415                         closeable.close();
416                         closeable = null;
417
418                         return archive;
419                 } finally {
420                         FileUtils.uncheckedClose(closeable);
421                 }
422         }
423
424         /**
425          * Store the specified history manager's contents into a single archive
426          * file.
427          * 
428          * @param outputStream
429          *            the output stream where the history archive is written
430          * @param history
431          *            source history
432          * @param collectorState
433          *            complete state of the history collector at the time of saving
434          *            or <code>null</code> to not save any state
435          * @param timeShift
436          *            adjust time values by this value
437          * @param from
438          *            time
439          * @param end
440          *            time
441          * @return ArchivedHistory instance describing the created archive
442          * @throws HistoryException
443          *             when there's a problem with reading the history data from the
444          *             history work area
445          * @throws IOException
446          *             when there's a problem writing the resulting history archive
447          *             file
448          */
449         private static ArchivedHistory exportArchive(
450                         OutputStream outputStream,
451                         HistoryManager history,
452                         Bean collectorState,
453                         Double timeShift,
454                         double from,
455                         double end)
456                                         throws IOException, HistoryException
457         {
458                 ArchivedHistory result = new ArchivedHistory();
459
460                 Serializer beanSerializer = Bindings.getSerializerUnchecked(Bindings.BEAN);
461                 boolean hasTimeShift = timeShift != null && timeShift != 0.0;
462
463                 TarArchiveOutputStream out = new TarArchiveOutputStream(outputStream);
464                 OutputStreamWriteable dataOutput = new OutputStreamWriteable(out);
465
466                 try {
467                         if (collectorState != null) {
468                                 // Write complete collector state into the archive
469                                 if (DEBUG)
470                                         System.out.println("WRITING collector state: " + collectorState);
471                                 byte[] serializedCollectorState = beanSerializer.serialize( collectorState );
472                                 putNextEntry(out, "state", serializedCollectorState.length);
473                                 out.write(serializedCollectorState);
474                                 out.closeArchiveEntry();
475                         }
476
477                         ItemManager im = new ItemManager( history.getItems() );
478
479                         for (Bean item : im.values()) {
480                                 if (DEBUG)
481                                         System.out.println("STORING ITEM: " + item);
482                                 String id = (String) item.getField("id");
483
484                                 // Write data stream metadata
485                                 byte[] metadata = beanSerializer.serialize( item );
486                                 putNextEntry(out, id + ".txt", metadata.length);
487                                 out.write(metadata);
488                                 out.closeArchiveEntry();
489                                 if (DEBUG)
490                                         System.out.println("SERIALIZED ITEM " + id + ": " + Arrays.toString(metadata));
491
492
493                                 // Write data stream as a separate entry
494                                 StreamAccessor sa = history.openStream(id, "r");
495                                 try {
496                                         if (DEBUG)
497                                                 System.out.println("STREAM SIZE=" + sa.size());
498                                         RecordBinding sampleBinding = (RecordBinding) Bindings.getBeanBinding( sa.type().componentType );
499                                         Serializer sampleSerializer = Bindings.getSerializerUnchecked(sampleBinding);
500                                         Integer constantSampleSize = sampleSerializer.getConstantSize();
501
502                                         if (constantSampleSize == null) {
503                                                 // Variable length samples - no support for now.
504                                                 System.err.println("Skipping item " + item + " in history archival due to variable length samples not being supported at the moment.");
505                                                 continue;
506                                         } else {
507                                                 ValueBand vb = new ValueBand(sampleBinding);
508                                                 Stream stream = new Stream(sa, sampleBinding);
509                                                 if (DEBUG)
510                                                         System.out.println("WRITING stream: sample size=" + constantSampleSize);
511
512                                                 readArray: {
513                                                         // Start index
514                                                         int startIndex = stream.binarySearch(Bindings.DOUBLE, from);
515                                                         if ( startIndex < -stream.count() ) break readArray;
516                                                         if ( startIndex<0 ) startIndex = -2-startIndex;
517                                                         if ( startIndex == -1 ) startIndex = 0;
518
519                                                         // End index
520                                                         int endIndex = stream.binarySearch(Bindings.DOUBLE, end);
521                                                         if ( endIndex == -1 ) break readArray;
522                                                         if ( endIndex<0 ) endIndex = -1-endIndex;
523                                                         if ( endIndex == sa.size() ) endIndex = sa.size()-1;
524                                                         if ( endIndex<startIndex ) break readArray;
525
526                                                         // Written sample count
527                                                         int count = endIndex - startIndex + 1;
528                                                         if (DEBUG)
529                                                                 System.out.println("WRITTEN sample count=" + count);
530
531                                                         long savedSamplesSize = ((long) count) * ((long) constantSampleSize);
532                                                         if (DEBUG)
533                                                                 System.out.println("saved samples size in bytes: " + savedSamplesSize);
534                                                         putNextEntry(out, id + ".data", savedSamplesSize);
535
536                                                         for (int i=0; i<count; i++) {
537                                                                 Object sample = sa.get(i+startIndex, sampleBinding);
538
539                                                                 // Adjust time
540                                                                 if ( hasTimeShift ) {
541                                                                         vb.setSample(sample);
542                                                                         if ( vb.hasTime() ) {
543                                                                                 double newTime = vb.getTimeDouble() + timeShift;
544                                                                                 vb.setTime(Bindings.DOUBLE, newTime);
545                                                                         }
546                                                                         if ( vb.hasEndTime() ) {
547                                                                                 double newTime = vb.getEndTimeDouble() + timeShift;
548                                                                                 vb.setEndTime(Bindings.DOUBLE, newTime);
549                                                                         }
550                                                                 }
551                                                                 //System.out.println("\t#" + i + ": sample=" + sample);
552                                                                 sampleSerializer.serialize(dataOutput, sample);
553                                                         }
554                                                         out.closeArchiveEntry();
555
556                                                         result.itemSizes.put(item, savedSamplesSize);
557                                                         result.totalSampleSize += savedSamplesSize;
558                                                 }
559                                         }
560                                 } catch (AccessorException e) {
561                                         throw new IOException(e);
562                                 } finally {
563                                         try {
564                                                 sa.close();
565                                         } catch (AccessorException e) {
566                                         }
567                                 }
568                                 
569                         }
570
571                         return result;
572                 } catch (BindingException e) {
573                         throw new HistoryException(e);
574                 } finally {
575                         out.finish();
576                 }
577         }
578
579         /**
580          * Import all history items from graph into a history manager.
581          * 
582          * @param r history resource or initial condition resource source HIS.History resource
583          * @param history destination history
584          * @return read request that always returns a HistoryImportResult instance
585          */
586         public static Read<HistoryImportResult> importHistory(final Resource r, final HistoryManager history)
587         {
588                 return new UniqueRead<HistoryImportResult>() {
589                         @Override
590                         public HistoryImportResult perform(ReadGraph graph) throws DatabaseException {
591                                 HistoryImportResult result = new HistoryImportResult();
592
593                                 HistoryResource HIS = HistoryResource.getInstance(graph);
594                                 SimulationResource SIM = SimulationResource.getInstance(graph);
595                                 Resource historyResource = r;
596
597                                 if (!graph.isInstanceOf(historyResource, HIS.History))
598                                         historyResource = graph.getPossibleObject(r, SIM.State_History);
599                                 if (historyResource == null)
600                                         return result;
601                                 if (!graph.isInstanceOf(historyResource, HIS.History))
602                                         return result;
603
604                                 long s = System.nanoTime();
605
606                                 Resource archive = graph.getPossibleObject(historyResource, HIS.History_archive);
607                                 if (archive == null) {
608                                         if (PROFILE)
609                                                 profile(null, "importing history items from old database format to disk workarea");
610                                         importHistoryItems(graph, historyResource, history);
611                                 } else {
612                                         try {
613                                                 if (PROFILE)
614                                                         profile(null, "importing history items from archived format to disk workarea");
615                                                 importHistoryArchive(graph, historyResource, archive, history, result);
616                                         } catch (IOException e) {
617                                                 throw new DatabaseException(e);
618                                         } catch (HistoryException e) {
619                                                 throw new DatabaseException(e);
620                                         }
621                                 }
622
623                                 if (PROFILE)
624                                         profile(s, "imported history items from database to disk workarea");
625
626                                 return result;
627                         }
628                 };
629         }
630
631         /**
632          * Import all history items from graph into a history manager.
633          * 
634          * @param r history resource or initial condition resource source HIS.History resource
635          * @param history destination history
636          * @return <code>true</code> if successful
637          */
638         private static boolean importHistoryItems(ReadGraph graph, final Resource historyResource, final HistoryManager history) throws DatabaseException
639         {
640                 HistoryResource HIS = HistoryResource.getInstance(graph);
641                 Layer0 L0 = Layer0.getInstance(graph);
642
643                 try {
644                         for (Resource oldItem : graph.getObjects(historyResource, L0.ConsistsOf)) {
645                                 if (!graph.isInstanceOf(oldItem, HIS.History_Item))
646                                         continue;
647
648                                 Bean bean = graph.getRelatedValue(oldItem, HIS.History_Item_Info, Bindings.BEAN);
649                                 String id = (String) bean.getFieldUnchecked("id");
650                                 Object array = graph.getRelatedValue(oldItem, HIS.History_Item_Series, Bindings.BEAN);
651                                 Binding arrayBinding = Bindings.getBinding( array.getClass() );
652                                 history.modify(bean);
653                                 StreamAccessor sa = history.openStream(id, "rw");
654                                 try {
655                                         sa.setValue(arrayBinding, array);
656                                 } finally {
657                                         try {
658                                                 sa.close();
659                                         } catch (AccessorException e) {
660                                         }
661                                 }
662                         }
663
664                         return true;
665                 } catch (AccessorException e) {
666                         throw new DatabaseException(e);
667                 } catch (BindingConstructionException e) {
668                         throw new DatabaseException(e);
669                 } catch (HistoryException e) {
670                         throw new DatabaseException(e);
671                 }
672         }
673
674         /**
675          * Import all history items from database archive into a history manager.
676          * 
677          * @param graph
678          * @param historyResource
679          * @param archive 
680          * @param history
681          * @return <code>true</code> if successful
682          * @throws DatabaseException 
683          */
684         @SuppressWarnings("resource")
685         private static boolean importHistoryArchive(ReadGraph graph, Resource historyResource, Resource archive, HistoryManager history, HistoryImportResult result)
686                         throws DatabaseException, IOException, HistoryException
687         {
688                 RandomAccessBinary rab = graph.getRandomAccessBinary(archive);
689                 rab.position(4);
690                 DataContainer dc = DataContainers.readHeader(rab);
691
692                 if (DEBUG)
693                         System.out.println("COMPRESSED HISTORY DATA SIZE: " + (rab.length() - rab.position()));
694
695                 if (APPLICATION_X_LZ4.equals(dc.format)) {
696                         if (dc.version == ARCHIVE_VERSION_1) {
697                                 InputStream in = new RABInputStream(rab);
698                                 //in = new BufferedInputStream(in);
699                                 in = new LZ4BlockInputStream(in);
700                                 return extractHistoryArchiveTar(graph, history, in, result);
701                         }
702                 } else if (APPLICATION_X_FLZ.equals(dc.format)) {
703                         if (dc.version == ARCHIVE_VERSION_1) {
704                                 InputStream in = null;
705                                 try {
706                                         in = new RABInputStream(rab);
707                                         in = FastLZ.read(in);
708                                         return extractHistoryArchiveTar(graph, history, in, result);
709                                 } finally {
710                                         FileUtils.uncheckedClose(in);
711                                 }
712                         }
713                 }
714                 throw new UnsupportedOperationException("Unsupported format " + dc.format + " and version " + dc.version);
715         }
716
717         /**
718          * Import all history items from graph into a history manager
719          * 
720          * @param graph database access
721          * @param history destination history
722          * @param rab the archive to extract from
723          * @return <code>true</code> if successful
724          * @throws DatabaseException 
725          */
726         private static boolean extractHistoryArchiveTar(ReadGraph graph, HistoryManager history, InputStream in, HistoryImportResult result)
727                         throws DatabaseException, IOException, HistoryException
728         {
729                 // tar is not closed on purpose because we do not want to close RandomAccessBinary rab.
730                 TarArchiveInputStream tar = new TarArchiveInputStream(in);
731
732                 Serializer beanSerializer = Bindings.getSerializerUnchecked(Bindings.BEAN);
733
734                 Bean lastItem = null;
735                 String lastItemId = null;
736                 while (true) {
737                         TarArchiveEntry entry = tar.getNextTarEntry();
738                         if (entry == null)
739                                 break;
740
741                         String name = entry.getName();
742                         boolean state = name.equals("state");
743                         boolean metadata = name.endsWith(".txt");
744                         boolean data = name.endsWith(".data");
745
746                         if (state) {
747                                 if (result != null) {
748                                         byte[] st = new byte[(int) entry.getSize()];
749                                         tar.read(st);
750                                         result.collectorState = (Bean) beanSerializer.deserialize(st);
751                                         if (DEBUG)
752                                                 System.out.println("READ collector state: " + result.collectorState);
753                                 } else {
754                                         tar.skip(entry.getSize());
755                                 }
756                         } else if (metadata) {
757                                 byte[] md = new byte[(int) entry.getSize()];
758                                 tar.read(md);
759                                 if (DEBUG) {
760                                         System.out.println("READING Item metadata: " + name);
761                                         System.out.println("SERIALIZED ITEM " + name.replace(".txt", "") + ": " + Arrays.toString(md));
762                                 }
763
764                                 try {
765                                         Bean bean = (Bean) beanSerializer.deserialize(md);
766                                         if (DEBUG)
767                                                 System.out.println("READ Item metadata: " + bean);
768                                         history.modify(bean);
769                                         lastItem = bean;
770                                         lastItemId = (String) bean.getFieldUnchecked("id");
771                                 } catch (ClassFormatError e) {
772                                         // This is here because LZ4BlockInput/OutputStream seemed to
773                                         // be causing weird deserialization errors to occur for
774                                         // single data items when trying to deserialize beans. The
775                                         // reason being that the serialized data that was read only
776                                         // contains the first 512 bytes of the data. After that all
777                                         // data is zeros, which causes things like this to happen in
778                                         // deserialization:
779                                         //signature: rsstzzzzzzzze_b7b532e9
780                                         //component(o): id = String
781                                         //component(1): variableId = String
782                                         //component(2): format = Temp1
783                                         //annotation: @org.simantics.databoard.annotations.Union(value=[class org.simantics.databoard.type.BooleanType, class org.simantics.databoard.type.ByteType, class org.simantics.databoard.type.IntegerType, class org.simantics.databoard.type.LongType, class org.simantics.databoard.type.FloatType, class org.simantics.databoard.type.DoubleType, class org.simantics.databoard.type.StringType, class org.simantics.databoard.type.RecordType, class org.simantics.databoard.type.ArrayType, class org.simantics.databoard.type.MapType, class org.simantics.databoard.type.OptionalType, class org.simantics.databoard.type.UnionType, class org.simantics.databoard.type.VariantType])
784                                         //component(3):  = Boolean
785                                         //component(4):  = Boolean
786                                         //component(5):  = Boolean
787                                         //component(devil):  = Boolean
788                                         //component(7):  = Boolean
789                                         //component(music):  = Boolean
790                                         //component(9):  = Boolean
791                                         //component(10):  = Boolean
792                                         //
793                                         // For this reason we've switched the default compression to FastLZ
794                                         // for now. This is reflected in the input format also.
795
796                                         Activator.getDefault().getLog().log(
797                                                         new Status(IStatus.ERROR, Activator.PLUGIN_ID, "History item " + name + " metadata deserialization failed.", e));;
798                                 }
799                         } else if (data && lastItem != null) {
800                                 StreamAccessor sa = history.openStream(lastItemId, "rw");
801                                 try {
802                                         if (sa instanceof BinaryObject) {
803                                                 BinaryObject bo = (BinaryObject) sa;
804                                                 RandomAccessBinary output = bo.getBinary();
805                                                 output.position(0L);
806                                                 output.setLength(0L);
807                                                 long copiedBytes = FileUtils.copy(tar, new RABOutputStream(output), entry.getSize());
808                                                 if (copiedBytes != entry.getSize()) {
809                                                         System.err.println("WARNING: item " + lastItemId + ", extracted " + copiedBytes + " bytes while TAR entry said the size to be " + entry.getSize() + " bytes");
810                                                 }
811                                         } else {
812                                                 System.err.println("Skipping item " + lastItemId + " in history import due to StreamAccessor not being an instanceof BinaryObject. StreamAccessor=" + sa);
813                                         }
814                                 } finally {
815                                         try {
816                                                 sa.close();
817                                                 lastItem = null;
818                                                 lastItemId = null;
819                                         } catch (AccessorException e) {
820                                         }
821                                 }
822                         }
823                 }
824
825                 return true;
826         }
827
828         /**
829          * Get request that clears history
830          * @return cleaning request
831          */
832         public static WriteRequest clear(final Resource history) {
833                 return new WriteRequest() {
834                         @Override
835                         public void perform(WriteGraph graph) throws DatabaseException {
836                                 HistoryResource HIS = HistoryResource.getInstance(graph);
837                                 Layer0 L0 = Layer0.getInstance(graph);
838
839                                 // Separate items format
840                                 for (Resource oldItem : graph.getObjects(history, L0.ConsistsOf))
841                                 {
842                                         if (!graph.isInstanceOf(oldItem, HIS.History_Item)) continue;
843                                         Resource info = graph.getPossibleObject(oldItem, HIS.History_Item);
844                                         if (info!=null) {
845                                                 graph.deny(oldItem, HIS.History_Item_Info);
846                                                 graph.deny(info);
847                                         }
848                                         Resource data = graph.getPossibleObject(oldItem, HIS.History_Item_Series);
849                                         if (data!=null) {
850                                                 graph.deny(oldItem, HIS.History_Item_Series);
851                                                 graph.deny(data);
852                                         }
853                                         graph.deny(history, L0.ConsistsOf, oldItem);
854                                         graph.deny(oldItem);
855                                 }
856
857                                 // Archived format
858                                 graph.denyValue(history, HIS.History_archive);
859                         }
860                 };
861         }
862
863
864         private static void putNextEntry(TarArchiveOutputStream out, String entryName, long size) throws IOException {
865                 TarArchiveEntry entry = new TarArchiveEntry(entryName);
866                 entry.setSize(size);
867                 out.putArchiveEntry(entry);
868         }
869
870         /**
871          * Shall not close the underlying RandomAccessBinary.
872          */
873         private static class RABOutputStream extends OutputStream {
874
875                 private final RandomAccessBinary output;
876
877                 public RABOutputStream(RandomAccessBinary output) {
878                         this.output = output;
879                 }
880
881                 @Override
882                 public void write(byte[] b, int off, int len) throws IOException {
883                         output.write(b, off, len);
884                 }
885
886                 @Override
887                 public void write(int b) throws IOException {
888                         output.write(b);
889                 }
890
891         }
892
893         /**
894          * Shall not close the underlying RandomAccessBinary.
895          */
896         private static class RABInputStream extends InputStream {
897
898                 private final RandomAccessBinary input;
899
900                 public RABInputStream(RandomAccessBinary input) {
901                         this.input = input;
902                 }
903
904                 @Override
905                 public int available() throws IOException {
906                         long avail = input.length() - input.position();
907                         if ((avail & 0xffffffff00000000L) != 0)
908                                 throw new IOException("Number of available bytes truncated in long->int conversion: " + avail);
909                         return (int) avail;
910                 }
911
912                 @Override
913                 public int read() throws IOException {
914                         try {
915                                 return input.readUnsignedByte();
916                         } catch (EOFException e) {
917                                 return -1;
918                         }
919                 }
920
921                 @Override
922                 public int read(byte[] b, int off, int len) throws IOException {
923                         long available = input.length() - input.position();
924                         if (available == 0)
925                                 return -1;
926                         int l = (int) Math.min(available, (long) len);
927                         input.readFully(b, off, l);
928                         return l;
929                 }
930
931                 @Override
932                 public long skip(long n) throws IOException {
933                         long count = 0;
934                         while (count < n) {
935                                 int l = (int) Math.min(n, (long) Integer.MAX_VALUE);
936                                 count += input.skipBytes(l);
937                         }
938                         return count;
939                 }
940
941         }
942
943         private static long profile(Long t1, String string) {
944                 if (PROFILE) {
945                         long t2 = System.nanoTime();
946                         long delta = t1 == null ? 0 : t2-t1;
947                         System.out.println("[" + HistoryUtil.class.getSimpleName() + "] PROFILE: " + string + (t1 == null ? "" : " in " + (((double)delta)*1e-6) + " ms"));
948                         return t2;
949                 }
950                 return 0L;
951         }
952
953         public static CollectorState truncateHistory(double toBeforeTime, HistoryManager history, CollectorState state) throws AccessorException, BindingException, HistoryException {
954                 Double t = toBeforeTime;
955                 Binding timeBinding = null;
956
957                 Bean[] items = history.getItems();
958                 //System.out.println("truncating all samples after t=" + toBeforeTime + " for " + items.length + " history items");
959
960                 for (Bean item : items) {
961                         String id = (String) item.getField("id");
962                         StreamAccessor sa = history.openStream(id, "rw");
963                         try {
964                                 Stream s = new Stream(sa);
965                                 timeBinding = s.timeBinding;
966                                 int currentSize = sa.size();
967                                 int index = s.binarySearch(timeBinding, toBeforeTime);
968                                 int newSize = truncationSize(index);
969                                 if (newSize < currentSize) {
970                                         //System.out.println("truncating item: " + item + " from size " + currentSize + " to " + newSize);
971                                         sa.setSize(newSize);
972
973                                         if (state != null) {
974                                                 Object prevTime  = newSize > 0 ? s.getItemTime(s.timeBinding, newSize - 1) : null;
975                                                 Bean prevSample = newSize > 0 ? (Bean) sa.get(newSize - 1, s.sampleBinding) : null;
976                                                 Object prevValue = prevSample != null ? prevSample.getField(s.valueIndex) : null;
977                                                 boolean isNan = isNaN(prevValue);
978
979                                                 VariableState vs = state.values.get(id);
980                                                 if (vs != null && vs.value != null && prevValue != null) {
981                                                         vs.value.setValue(vs.value.getBinding(), prevValue);
982                                                         vs.isValid = true;
983                                                         vs.isNan = isNan;
984                                                 }
985
986                                                 CollectorState.Item is = state.itemStates.get(id);
987                                                 if (is != null) {
988                                                         is.firstTime = toTime(prevTime);
989                                                         is.firstValue = toValue(s.valueBinding, prevValue);
990                                                         is.currentTime = is.firstTime;
991                                                         is.currentValue = toValue(s.valueBinding, prevValue);
992                                                         is.isNaN = isNan;
993                                                         is.isValid = prevValue != null;
994                                                         is.sum = Double.NaN;
995                                                         is.count = 0;
996                                                         is.ooDeadband = false;
997                                                         is.firstDisabledTime = Double.NaN;
998                                                         is.lastDisabledTime = Double.NaN;
999                                                         is.median = new WeightedMedian( CollectorImpl.MEDIAN_LIMIT );
1000                                                 }
1001                                         }
1002                                 }
1003                         } finally {
1004                                 sa.close();
1005                         }
1006                 }
1007
1008                 if (timeBinding != null && state != null) {
1009                         state.time.setValue(timeBinding, t);
1010                         state.dT = 1.0;
1011                 }
1012
1013                 return state;
1014         }
1015
1016         private static double toTime(Object time) {
1017                 return time instanceof Number ? ((Number) time).doubleValue() : Double.NaN;
1018         }
1019
1020         private static MutableVariant toValue(Binding valueBinding, Object value) {
1021                 return new MutableVariant(valueBinding, value != null ? value : valueBinding.createDefaultUnchecked());
1022         }
1023
1024         private static boolean isNaN(Object value) {
1025                 return value instanceof Number ? Double.isNaN(((Number) value).doubleValue()) : false;
1026         }
1027
1028         private static int truncationSize(int binarySearchResult) {
1029                 return binarySearchResult >= 0 ? binarySearchResult + 1 : (-binarySearchResult - 1);
1030         }
1031
1032 }