]> gerrit.simantics Code Review - simantics/platform.git/blob - bundles/org.simantics.simulation/src/org/simantics/simulation/history/HistoryUtil.java
Migrated source code from Simantics SVN
[simantics/platform.git] / bundles / org.simantics.simulation / src / org / simantics / simulation / history / HistoryUtil.java
1 /*******************************************************************************\r
2  * Copyright (c) 2007, 2014 Association for Decentralized Information Management in\r
3  * Industry THTH ry.\r
4  * All rights reserved. This program and the accompanying materials\r
5  * are made available under the terms of the Eclipse Public License v1.0\r
6  * which accompanies this distribution, and is available at\r
7  * http://www.eclipse.org/legal/epl-v10.html\r
8  *\r
9  * Contributors:\r
10  *     VTT Technical Research Centre of Finland - initial API and implementation\r
11  *     Semantum Oy - archive implementation\r
12  *******************************************************************************/\r
13 package org.simantics.simulation.history;\r
14 \r
15 import gnu.trove.map.TObjectLongMap;\r
16 import gnu.trove.map.hash.TObjectLongHashMap;\r
17 \r
18 import java.io.EOFException;\r
19 import java.io.IOException;\r
20 import java.io.InputStream;\r
21 import java.io.OutputStream;\r
22 import java.util.Arrays;\r
23 import java.util.HashMap;\r
24 import java.util.Map;\r
25 \r
26 import net.jpountz.lz4.LZ4BlockInputStream;\r
27 import net.jpountz.lz4.LZ4BlockOutputStream;\r
28 \r
29 import org.apache.commons.compress.archivers.tar.TarArchiveEntry;\r
30 import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;\r
31 import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;\r
32 import org.eclipse.core.runtime.IStatus;\r
33 import org.eclipse.core.runtime.Status;\r
34 import org.simantics.databoard.Bindings;\r
35 import org.simantics.databoard.accessor.StreamAccessor;\r
36 import org.simantics.databoard.accessor.binary.BinaryObject;\r
37 import org.simantics.databoard.accessor.error.AccessorException;\r
38 import org.simantics.databoard.binding.ArrayBinding;\r
39 import org.simantics.databoard.binding.Binding;\r
40 import org.simantics.databoard.binding.RecordBinding;\r
41 import org.simantics.databoard.binding.error.BindingConstructionException;\r
42 import org.simantics.databoard.binding.error.BindingException;\r
43 import org.simantics.databoard.binding.impl.ObjectArrayBinding;\r
44 import org.simantics.databoard.binding.mutable.Variant;\r
45 import org.simantics.databoard.container.DataContainer;\r
46 import org.simantics.databoard.container.DataContainers;\r
47 import org.simantics.databoard.serialization.Serializer;\r
48 import org.simantics.databoard.util.Bean;\r
49 import org.simantics.databoard.util.binary.OutputStreamWriteable;\r
50 import org.simantics.databoard.util.binary.RandomAccessBinary;\r
51 import org.simantics.db.ReadGraph;\r
52 import org.simantics.db.Resource;\r
53 import org.simantics.db.WriteGraph;\r
54 import org.simantics.db.common.request.UniqueRead;\r
55 import org.simantics.db.common.request.WriteRequest;\r
56 import org.simantics.db.exception.DatabaseException;\r
57 import org.simantics.db.exception.ServiceNotFoundException;\r
58 import org.simantics.db.request.Read;\r
59 import org.simantics.fastlz.FastLZ;\r
60 import org.simantics.history.HistoryException;\r
61 import org.simantics.history.HistoryManager;\r
62 import org.simantics.history.ItemManager;\r
63 import org.simantics.history.util.Stream;\r
64 import org.simantics.history.util.ValueBand;\r
65 import org.simantics.layer0.Layer0;\r
66 import org.simantics.simulation.Activator;\r
67 import org.simantics.simulation.ontology.HistoryResource;\r
68 import org.simantics.simulation.ontology.SimulationResource;\r
69 import org.simantics.utils.FileUtils;\r
70 \r
71 /**\r
72  * @author Toni Kalajainen\r
73  * @author Tuukka Lehtonen\r
74  */\r
75 public class HistoryUtil {\r
76 \r
77         private static final boolean DEBUG = false;\r
78         private static final boolean PROFILE = true;\r
79 \r
80         private enum Compression {\r
81                 FLZ,\r
82                 LZ4,\r
83         }\r
84 \r
85         private static final Compression USED_COMPRESSION = Compression.FLZ;\r
86 \r
87         private static final String APPLICATION_X_LZ4 = "application/x-lz4";\r
88         private static final String APPLICATION_X_FLZ = "application/x-flz";\r
89         private static final int ARCHIVE_VERSION_1 = 1; // TarArchiveInput/OutputStream\r
90 \r
91         private static class ArchivedHistory {\r
92                 public long totalSampleSize = 0L;\r
93                 public TObjectLongMap<Bean> itemSizes = new TObjectLongHashMap<Bean>();\r
94         }\r
95 \r
96         /**\r
97          * Create request that exports history to graph.\r
98          * \r
99          * If removeOldUnused is true, items that do not exist in source history\r
100          * are deleted from graph.\r
101          * \r
102          * If item already exists in the graph, it is overwritten.\r
103          * \r
104          * @param history source history\r
105          * @param g write graph\r
106          * @param historyResource Instance of HIS.History to write data to\r
107          * @param removeOldUnused\r
108          * @param timeShift adjust time values by this value \r
109          * @param from time\r
110          * @param end time\r
111          * @return WriteRequest\r
112          */\r
113         public static WriteRequest export(final HistoryManager history, \r
114                         final Resource historyResource,\r
115                         final boolean removeOldUnused,\r
116                         final Double timeShift, \r
117                         final double from, final double end) \r
118         {\r
119                 return new WriteRequest() {\r
120                         public void perform(WriteGraph graph) throws DatabaseException {\r
121                                 HistoryResource HIS = HistoryResource.getInstance(graph);\r
122                                 Layer0 L0 = Layer0.getInstance(graph);\r
123                                 long totalSampleSize = 0L;\r
124                                 long s = System.nanoTime();\r
125 \r
126                                 try {\r
127                                         Bean[] items = history.getItems();\r
128                                         if (PROFILE)\r
129                                                 profile(null, "exporting " + items.length + " history items to database");\r
130 \r
131                                         ItemManager im = new ItemManager( items );\r
132                                         Map<String, Resource> oldResourceMap = new HashMap<String, Resource>();\r
133                                         for (Resource oldItem : graph.getObjects(historyResource, L0.ConsistsOf))\r
134                                         {\r
135                                                 if (!graph.isInstanceOf(oldItem, HIS.History_Item)) continue;\r
136                                                 Bean bean = graph.getPossibleRelatedValue(oldItem, HIS.History_Item_Info, Bindings.BEAN);\r
137                                                 if ( bean == null ) continue;\r
138                                                 String id = (String) bean.getField("id");\r
139                                                 \r
140                                                 if (!im.exists(id)) {\r
141                                                         // There is item in graph that does not exist in source history\r
142                                                         if ( removeOldUnused ) {\r
143                                                                 graph.deny(oldItem);\r
144                                                         }\r
145                                                 } else {\r
146                                                         // Item already exists, to-be-overwritten\r
147                                                         oldResourceMap.put(id, oldItem);\r
148                                                 }\r
149                                         }\r
150                                         \r
151                                         for (Bean newItem : im.values())\r
152                                         {\r
153                                                 //System.out.println("WRITING ITEM: " + newItem);\r
154                                                 String id = (String) newItem.getField("id");\r
155                                                 Resource historyItemResource = oldResourceMap.get(id);\r
156                                                 Resource data = null;\r
157 \r
158                                                 if ( historyItemResource==null ) {\r
159                                                         historyItemResource = graph.newResource();\r
160                                                         graph.claim(historyResource, L0.ConsistsOf, historyItemResource);\r
161                                                         graph.claim(historyItemResource, L0.InstanceOf, HIS.History_Item);\r
162                                                         graph.claimLiteral(historyItemResource, L0.HasName, id);\r
163                                                 } else {\r
164                                                         data = graph.getPossibleObject(historyItemResource, HIS.History_Item_Series);\r
165                                                 }\r
166                                                 \r
167                                                 Variant v = new Variant(newItem.getBinding(), newItem);\r
168                                                 graph.claimLiteral(historyItemResource, HIS.History_Item_Info, v, Bindings.VARIANT);\r
169                                                 \r
170                                                 if (data == null) {\r
171                                                         data = graph.newResource();\r
172                                                         graph.claim(data, L0.InstanceOf, L0.Variant);\r
173                                                         graph.claim(historyItemResource, HIS.History_Item_Series, data);\r
174                                                 } else {\r
175                                                         graph.denyValue(data);\r
176                                                 }\r
177                                                 \r
178                                                 \r
179                                                 // Write stream\r
180                                                 StreamAccessor sa = history.openStream(id, "r");\r
181                                                 try {\r
182                                                         //System.out.println("WRITING stream of size=" + sa.size());\r
183                                                         RecordBinding sampleBinding = (RecordBinding) Bindings.getBeanBinding( sa.type().componentType );\r
184                                                         Serializer serializer = Bindings.getSerializerUnchecked(sampleBinding);\r
185                                                         Integer constantSampleSize = serializer.getConstantSize();\r
186                                                         ValueBand vb = new ValueBand(sampleBinding);\r
187                                                         Stream stream = new Stream(sa, sampleBinding);\r
188                                                         ArrayBinding arrayBinding = new ObjectArrayBinding(sa.type(), sampleBinding);\r
189                                                         Object array = null;\r
190                                                         //System.out.println("WRITING stream: sample size=" + constantSampleSize);\r
191 \r
192                                                         int count = 0;\r
193                                                         readArray: {\r
194                                                                 // Start index\r
195                                                                 int startIndex = stream.binarySearch(Bindings.DOUBLE, from);                                            \r
196                                                                 if ( startIndex < -stream.count() ) break readArray;\r
197                                                                 if ( startIndex<0 ) startIndex = -2-startIndex;\r
198                                                                 if ( startIndex == -1 ) startIndex = 0;\r
199                                                                 \r
200                                                                 // End index\r
201                                                                 int endIndex = stream.binarySearch(Bindings.DOUBLE, end);\r
202                                                                 if ( endIndex == -1 ) break readArray;\r
203                                                                 if ( endIndex<0 ) endIndex = -1-endIndex;\r
204                                                                 if ( endIndex == sa.size() ) endIndex = sa.size()-1;\r
205                                                                 if ( endIndex<startIndex ) break readArray;\r
206                                                         \r
207                                                                 // Write sample count\r
208                                                                 count = endIndex - startIndex + 1;\r
209                                                                 array = arrayBinding.create(count);\r
210                                                                 boolean hasTimeShift = timeShift!=null && timeShift != 0.0;\r
211                                                                 //System.out.println("WRITING stream: count=" + count + ", datatype=" + arrayBinding.type());\r
212                                                                 for (int i=0; i<count; i++) {\r
213                                                                         Object sample = sa.get(i+startIndex, sampleBinding);\r
214                                                                         \r
215                                                                         // Adjust time\r
216                                                                         if ( hasTimeShift ) {\r
217                                                                                 vb.setSample(sample);\r
218                                                                                 if ( vb.hasTime() ) {\r
219                                                                                         double newTime = vb.getTimeDouble() + timeShift;\r
220                                                                                         vb.setTime(Bindings.DOUBLE, newTime);\r
221                                                                                 }\r
222                                                                                 if ( vb.hasEndTime() ) {\r
223                                                                                         double newTime = vb.getEndTimeDouble() + timeShift;\r
224                                                                                         vb.setEndTime(Bindings.DOUBLE, newTime);\r
225                                                                                 }\r
226                                                                         }\r
227                                                                         //System.out.println("\t#" + i + ": sample=" + sample);\r
228                                                                         arrayBinding.set(array, i, sample);\r
229                                                                 }\r
230                                                         }\r
231                                                         if (array==null) array = arrayBinding.create();\r
232                                                         Variant v2 = new Variant(arrayBinding, array);\r
233                                                         graph.claimValue(data, v2, Bindings.VARIANT);\r
234 \r
235                                                         if (constantSampleSize != null) {\r
236                                                                 long itemSampleSize = ((long) count) * ((long) constantSampleSize);\r
237                                                                 //System.out.println("item sample size: " + itemSampleSize);\r
238                                                                 totalSampleSize += itemSampleSize;\r
239                                                                 graph.claimLiteral(historyItemResource, HIS.History_Item_size, L0.Long, itemSampleSize, Bindings.LONG);\r
240                                                         }\r
241                                                 } catch (AccessorException e) {\r
242                                                         throw new DatabaseException(e);\r
243                                                 } finally {\r
244                                                         try {\r
245                                                                 sa.close();\r
246                                                         } catch (AccessorException e) {\r
247                                                         }\r
248                                                 }\r
249                                         }\r
250 \r
251                                         graph.claimLiteral(historyResource, HIS.History_size, L0.Long, totalSampleSize, Bindings.LONG);\r
252 \r
253                                         if (PROFILE)\r
254                                                 profile(s, "exported " + items.length + " history items to database");\r
255 \r
256                                 } catch (HistoryException e) {\r
257                                         throw new DatabaseException( e );\r
258                                 } catch (BindingException e) {\r
259                                         throw new DatabaseException( e );\r
260                                 } catch (ServiceNotFoundException e) {\r
261                                         throw new DatabaseException( e );\r
262                                 }\r
263                         }\r
264                 };\r
265         }\r
266 \r
267         /**\r
268          * Create request that exports history to graph.\r
269          * \r
270          * If item already exists in the graph, it is overwritten.\r
271          * \r
272          * @param history\r
273          *            source history\r
274          * @param collectorState\r
275          *            complete dump of the source history collector state or\r
276          *            <code>null</code> to skip collector state saving\r
277          * @param historyResource\r
278          *            Instance of HIS.History to write data to\r
279          * @param timeShift\r
280          *            adjust time values by this value\r
281          * @param from\r
282          *            time\r
283          * @param end\r
284          *            time\r
285          * @return WriteRequest\r
286          */\r
287         public static WriteRequest exportArchive(\r
288                         final HistoryManager history,\r
289                         final Bean collectorState,\r
290                         final Resource historyResource,\r
291                         final Double timeShift, \r
292                         final double from,\r
293                         final double end) \r
294         {\r
295                 return new WriteRequest() {\r
296                         public void perform(WriteGraph graph) throws DatabaseException {\r
297                                 HistoryResource HIS = HistoryResource.getInstance(graph);\r
298                                 Layer0 L0 = Layer0.getInstance(graph);\r
299 \r
300                                 long s = System.nanoTime();\r
301                                 if (PROFILE)\r
302                                         profile(null, "archiving history items to database");\r
303 \r
304                                 // Remove all possibly existing old items in the history.\r
305                                 for (Resource entity : graph.getObjects(historyResource, L0.ConsistsOf))\r
306                                         if (graph.isInstanceOf(entity, HIS.History_Item))\r
307                                                 graph.deny(historyResource, L0.ConsistsOf, L0.PartOf, entity);\r
308 \r
309                                 // Create new literal for history archival\r
310                                 //graph.deny(historyResource, HIS.History_archive);\r
311                                 graph.denyValue(historyResource, HIS.History_archive);\r
312                                 Resource archiveLiteral = graph.newResource();\r
313                                 graph.claim(archiveLiteral, L0.InstanceOf, null, L0.ByteArray);\r
314                                 graph.claim(historyResource, HIS.History_archive, archiveLiteral);\r
315 \r
316                                 OutputStream closeable = null;\r
317                                 try {\r
318                                         RandomAccessBinary rab = graph.getRandomAccessBinary(archiveLiteral);\r
319                                         rab.position(0);\r
320                                         rab.skipBytes(4);\r
321 \r
322                                         ArchivedHistory archive = exportCompressedArchive(history, collectorState, historyResource, timeShift, from, end, rab);\r
323 \r
324                                         rab.position(0L);\r
325                                         rab.writeInt((int)(rab.length() - 4L));\r
326 \r
327                                         graph.claimLiteral(historyResource, HIS.History_size, L0.Long, archive.totalSampleSize, Bindings.LONG);\r
328 \r
329                                         if (PROFILE)\r
330                                                 profile(s, "archived history items of size " + archive.totalSampleSize + " to database");\r
331 \r
332                                 } catch (HistoryException e) {\r
333                                         throw new DatabaseException( e );\r
334                                 } catch (IOException e) {\r
335                                         throw new DatabaseException( e );\r
336                                 } finally {\r
337                                         FileUtils.uncheckedClose(closeable);\r
338                                 }\r
339                         }\r
340                 };\r
341         }\r
342 \r
343         private static ArchivedHistory exportCompressedArchive(\r
344                         final HistoryManager history,\r
345                         final Bean collectorState,\r
346                         final Resource historyResource,\r
347                         final Double timeShift, \r
348                         final double from,\r
349                         final double end,\r
350                         RandomAccessBinary rab)\r
351                                         throws IOException, HistoryException\r
352         {\r
353                 switch (USED_COMPRESSION) {\r
354                 case FLZ:\r
355                         return exportArchiveFLZ(history, collectorState, historyResource, timeShift, from, end, rab);\r
356                 case LZ4:\r
357                         return exportArchiveLZ4(history, collectorState, historyResource, timeShift, from, end, rab);\r
358                 default:\r
359                         throw new UnsupportedOperationException("Unsupported compression: " + USED_COMPRESSION);\r
360                 }\r
361         }\r
362 \r
363         private static ArchivedHistory exportArchiveLZ4(\r
364                         final HistoryManager history,\r
365                         final Bean collectorState,\r
366                         final Resource historyResource,\r
367                         final Double timeShift, \r
368                         final double from,\r
369                         final double end,\r
370                         RandomAccessBinary rab)\r
371                                         throws IOException, HistoryException\r
372         {\r
373                 OutputStream closeable = null;\r
374                 try {\r
375                         DataContainers.writeHeader(rab, new DataContainer(APPLICATION_X_LZ4, ARCHIVE_VERSION_1, null));\r
376                         closeable = new RABOutputStream( rab );\r
377                         closeable = new LZ4BlockOutputStream( closeable );\r
378 \r
379                         ArchivedHistory archive = exportArchive( closeable, history, collectorState, timeShift, from, end );\r
380 \r
381                         closeable.flush();\r
382                         closeable.close();\r
383                         closeable = null;\r
384 \r
385                         return archive;\r
386                 } finally {\r
387                         FileUtils.uncheckedClose(closeable);\r
388                 }\r
389         }\r
390 \r
391         @SuppressWarnings("resource")\r
392         private static ArchivedHistory exportArchiveFLZ(\r
393                         final HistoryManager history,\r
394                         final Bean collectorState,\r
395                         final Resource historyResource,\r
396                         final Double timeShift, \r
397                         final double from,\r
398                         final double end,\r
399                         RandomAccessBinary rab)\r
400                                         throws IOException, HistoryException\r
401         {\r
402                 OutputStream closeable = null;\r
403                 try {\r
404                         DataContainers.writeHeader(rab, new DataContainer(APPLICATION_X_FLZ, ARCHIVE_VERSION_1, null));\r
405                         closeable = new RABOutputStream( rab );\r
406                         closeable = FastLZ.write( closeable );\r
407 \r
408                         ArchivedHistory archive = exportArchive( closeable, history, collectorState, timeShift, from, end );\r
409 \r
410                         closeable.flush();\r
411                         closeable.close();\r
412                         closeable = null;\r
413 \r
414                         return archive;\r
415                 } finally {\r
416                         FileUtils.uncheckedClose(closeable);\r
417                 }\r
418         }\r
419 \r
420         /**\r
421          * Store the specified history manager's contents into a single archive\r
422          * file.\r
423          * \r
424          * @param outputStream\r
425          *            the output stream where the history archive is written\r
426          * @param history\r
427          *            source history\r
428          * @param collectorState\r
429          *            complete state of the history collector at the time of saving\r
430          *            or <code>null</code> to not save any state\r
431          * @param timeShift\r
432          *            adjust time values by this value\r
433          * @param from\r
434          *            time\r
435          * @param end\r
436          *            time\r
437          * @return ArchivedHistory instance describing the created archive\r
438          * @throws HistoryException\r
439          *             when there's a problem with reading the history data from the\r
440          *             history work area\r
441          * @throws IOException\r
442          *             when there's a problem writing the resulting history archive\r
443          *             file\r
444          */\r
445         private static ArchivedHistory exportArchive(\r
446                         OutputStream outputStream,\r
447                         HistoryManager history,\r
448                         Bean collectorState,\r
449                         Double timeShift,\r
450                         double from,\r
451                         double end)\r
452                                         throws IOException, HistoryException\r
453         {\r
454                 ArchivedHistory result = new ArchivedHistory();\r
455 \r
456                 Serializer beanSerializer = Bindings.getSerializerUnchecked(Bindings.BEAN);\r
457                 boolean hasTimeShift = timeShift != null && timeShift != 0.0;\r
458 \r
459                 TarArchiveOutputStream out = new TarArchiveOutputStream(outputStream);\r
460                 OutputStreamWriteable dataOutput = new OutputStreamWriteable(out);\r
461 \r
462                 try {\r
463                         if (collectorState != null) {\r
464                                 // Write complete collector state into the archive\r
465                                 if (DEBUG)\r
466                                         System.out.println("WRITING collector state: " + collectorState);\r
467                                 byte[] serializedCollectorState = beanSerializer.serialize( collectorState );\r
468                                 putNextEntry(out, "state", serializedCollectorState.length);\r
469                                 out.write(serializedCollectorState);\r
470                                 out.closeArchiveEntry();\r
471                         }\r
472 \r
473                         ItemManager im = new ItemManager( history.getItems() );\r
474 \r
475                         for (Bean item : im.values()) {\r
476                                 if (DEBUG)\r
477                                         System.out.println("STORING ITEM: " + item);\r
478                                 String id = (String) item.getField("id");\r
479 \r
480                                 // Write data stream metadata\r
481                                 byte[] metadata = beanSerializer.serialize( item );\r
482                                 putNextEntry(out, id + ".txt", metadata.length);\r
483                                 out.write(metadata);\r
484                                 out.closeArchiveEntry();\r
485                                 if (DEBUG)\r
486                                         System.out.println("SERIALIZED ITEM " + id + ": " + Arrays.toString(metadata));\r
487 \r
488 \r
489                                 // Write data stream as a separate entry\r
490                                 StreamAccessor sa = history.openStream(id, "r");\r
491                                 try {\r
492                                         if (DEBUG)\r
493                                                 System.out.println("STREAM SIZE=" + sa.size());\r
494                                         RecordBinding sampleBinding = (RecordBinding) Bindings.getBeanBinding( sa.type().componentType );\r
495                                         Serializer sampleSerializer = Bindings.getSerializerUnchecked(sampleBinding);\r
496                                         Integer constantSampleSize = sampleSerializer.getConstantSize();\r
497 \r
498                                         if (constantSampleSize == null) {\r
499                                                 // Variable length samples - no support for now.\r
500                                                 System.err.println("Skipping item " + item + " in history archival due to variable length samples not being supported at the moment.");\r
501                                                 continue;\r
502                                         } else {\r
503                                                 ValueBand vb = new ValueBand(sampleBinding);\r
504                                                 Stream stream = new Stream(sa, sampleBinding);\r
505                                                 if (DEBUG)\r
506                                                         System.out.println("WRITING stream: sample size=" + constantSampleSize);\r
507 \r
508                                                 readArray: {\r
509                                                         // Start index\r
510                                                         int startIndex = stream.binarySearch(Bindings.DOUBLE, from);\r
511                                                         if ( startIndex < -stream.count() ) break readArray;\r
512                                                         if ( startIndex<0 ) startIndex = -2-startIndex;\r
513                                                         if ( startIndex == -1 ) startIndex = 0;\r
514 \r
515                                                         // End index\r
516                                                         int endIndex = stream.binarySearch(Bindings.DOUBLE, end);\r
517                                                         if ( endIndex == -1 ) break readArray;\r
518                                                         if ( endIndex<0 ) endIndex = -1-endIndex;\r
519                                                         if ( endIndex == sa.size() ) endIndex = sa.size()-1;\r
520                                                         if ( endIndex<startIndex ) break readArray;\r
521 \r
522                                                         // Written sample count\r
523                                                         int count = endIndex - startIndex + 1;\r
524                                                         if (DEBUG)\r
525                                                                 System.out.println("WRITTEN sample count=" + count);\r
526 \r
527                                                         long savedSamplesSize = ((long) count) * ((long) constantSampleSize);\r
528                                                         if (DEBUG)\r
529                                                                 System.out.println("saved samples size in bytes: " + savedSamplesSize);\r
530                                                         putNextEntry(out, id + ".data", savedSamplesSize);\r
531 \r
532                                                         for (int i=0; i<count; i++) {\r
533                                                                 Object sample = sa.get(i+startIndex, sampleBinding);\r
534 \r
535                                                                 // Adjust time\r
536                                                                 if ( hasTimeShift ) {\r
537                                                                         vb.setSample(sample);\r
538                                                                         if ( vb.hasTime() ) {\r
539                                                                                 double newTime = vb.getTimeDouble() + timeShift;\r
540                                                                                 vb.setTime(Bindings.DOUBLE, newTime);\r
541                                                                         }\r
542                                                                         if ( vb.hasEndTime() ) {\r
543                                                                                 double newTime = vb.getEndTimeDouble() + timeShift;\r
544                                                                                 vb.setEndTime(Bindings.DOUBLE, newTime);\r
545                                                                         }\r
546                                                                 }\r
547                                                                 //System.out.println("\t#" + i + ": sample=" + sample);\r
548                                                                 sampleSerializer.serialize(dataOutput, sample);\r
549                                                         }\r
550                                                         out.closeArchiveEntry();\r
551 \r
552                                                         result.itemSizes.put(item, savedSamplesSize);\r
553                                                         result.totalSampleSize += savedSamplesSize;\r
554                                                 }\r
555                                         }\r
556                                 } catch (AccessorException e) {\r
557                                         throw new IOException(e);\r
558                                 } finally {\r
559                                         try {\r
560                                                 sa.close();\r
561                                         } catch (AccessorException e) {\r
562                                         }\r
563                                 }\r
564                                 \r
565                         }\r
566 \r
567                         return result;\r
568                 } catch (BindingException e) {\r
569                         throw new HistoryException(e);\r
570                 } finally {\r
571                         out.finish();\r
572                 }\r
573         }\r
574 \r
575         /**\r
576          * Import all history items from graph into a history manager.\r
577          * \r
578          * @param r history resource or initial condition resource source HIS.History resource\r
579          * @param history destination history\r
580          * @return read request that always returns a HistoryImportResult instance\r
581          */\r
582         public static Read<HistoryImportResult> importHistory(final Resource r, final HistoryManager history)\r
583         {\r
584                 return new UniqueRead<HistoryImportResult>() {\r
585                         @Override\r
586                         public HistoryImportResult perform(ReadGraph graph) throws DatabaseException {\r
587                                 HistoryImportResult result = new HistoryImportResult();\r
588 \r
589                                 HistoryResource HIS = HistoryResource.getInstance(graph);\r
590                                 SimulationResource SIM = SimulationResource.getInstance(graph);\r
591                                 Resource historyResource = r;\r
592 \r
593                                 if (!graph.isInstanceOf(historyResource, HIS.History))\r
594                                         historyResource = graph.getPossibleObject(r, SIM.State_History);\r
595                                 if (historyResource == null)\r
596                                         return result;\r
597                                 if (!graph.isInstanceOf(historyResource, HIS.History))\r
598                                         return result;\r
599 \r
600                                 long s = System.nanoTime();\r
601 \r
602                                 Resource archive = graph.getPossibleObject(historyResource, HIS.History_archive);\r
603                                 if (archive == null) {\r
604                                         if (PROFILE)\r
605                                                 profile(null, "importing history items from old database format to disk workarea");\r
606                                         importHistoryItems(graph, historyResource, history);\r
607                                 } else {\r
608                                         try {\r
609                                                 if (PROFILE)\r
610                                                         profile(null, "importing history items from archived format to disk workarea");\r
611                                                 importHistoryArchive(graph, historyResource, archive, history, result);\r
612                                         } catch (IOException e) {\r
613                                                 throw new DatabaseException(e);\r
614                                         } catch (HistoryException e) {\r
615                                                 throw new DatabaseException(e);\r
616                                         }\r
617                                 }\r
618 \r
619                                 if (PROFILE)\r
620                                         profile(s, "imported history items from database to disk workarea");\r
621 \r
622                                 return result;\r
623                         }\r
624                 };\r
625         }\r
626 \r
627         /**\r
628          * Import all history items from graph into a history manager.\r
629          * \r
630          * @param r history resource or initial condition resource source HIS.History resource\r
631          * @param history destination history\r
632          * @return <code>true</code> if successful\r
633          */\r
634         private static boolean importHistoryItems(ReadGraph graph, final Resource historyResource, final HistoryManager history) throws DatabaseException\r
635         {\r
636                 HistoryResource HIS = HistoryResource.getInstance(graph);\r
637                 Layer0 L0 = Layer0.getInstance(graph);\r
638 \r
639                 try {\r
640                         for (Resource oldItem : graph.getObjects(historyResource, L0.ConsistsOf)) {\r
641                                 if (!graph.isInstanceOf(oldItem, HIS.History_Item))\r
642                                         continue;\r
643 \r
644                                 Bean bean = graph.getRelatedValue(oldItem, HIS.History_Item_Info, Bindings.BEAN);\r
645                                 String id = (String) bean.getFieldUnchecked("id");\r
646                                 Object array = graph.getRelatedValue(oldItem, HIS.History_Item_Series, Bindings.BEAN);\r
647                                 Binding arrayBinding = Bindings.getBinding( array.getClass() );\r
648                                 history.modify(bean);\r
649                                 StreamAccessor sa = history.openStream(id, "rw");\r
650                                 try {\r
651                                         sa.setValue(arrayBinding, array);\r
652                                 } finally {\r
653                                         try {\r
654                                                 sa.close();\r
655                                         } catch (AccessorException e) {\r
656                                         }\r
657                                 }\r
658                         }\r
659 \r
660                         return true;\r
661                 } catch (AccessorException e) {\r
662                         throw new DatabaseException(e);\r
663                 } catch (BindingConstructionException e) {\r
664                         throw new DatabaseException(e);\r
665                 } catch (HistoryException e) {\r
666                         throw new DatabaseException(e);\r
667                 }\r
668         }\r
669 \r
670         /**\r
671          * Import all history items from database archive into a history manager.\r
672          * \r
673          * @param graph\r
674          * @param historyResource\r
675          * @param archive \r
676          * @param history\r
677          * @return <code>true</code> if successful\r
678          * @throws DatabaseException \r
679          */\r
680         @SuppressWarnings("resource")\r
681         private static boolean importHistoryArchive(ReadGraph graph, Resource historyResource, Resource archive, HistoryManager history, HistoryImportResult result)\r
682                         throws DatabaseException, IOException, HistoryException\r
683         {\r
684                 RandomAccessBinary rab = graph.getRandomAccessBinary(archive);\r
685                 rab.position(4);\r
686                 DataContainer dc = DataContainers.readHeader(rab);\r
687 \r
688                 if (DEBUG)\r
689                         System.out.println("COMPRESSED HISTORY DATA SIZE: " + (rab.length() - rab.position()));\r
690 \r
691                 if (APPLICATION_X_LZ4.equals(dc.format)) {\r
692                         if (dc.version == ARCHIVE_VERSION_1) {\r
693                                 InputStream in = new RABInputStream(rab);\r
694                                 //in = new BufferedInputStream(in);\r
695                                 in = new LZ4BlockInputStream(in);\r
696                                 return extractHistoryArchiveTar(graph, history, in, result);\r
697                         }\r
698                 } else if (APPLICATION_X_FLZ.equals(dc.format)) {\r
699                         if (dc.version == ARCHIVE_VERSION_1) {\r
700                                 InputStream in = null;\r
701                                 try {\r
702                                         in = new RABInputStream(rab);\r
703                                         in = FastLZ.read(in);\r
704                                         return extractHistoryArchiveTar(graph, history, in, result);\r
705                                 } finally {\r
706                                         FileUtils.uncheckedClose(in);\r
707                                 }\r
708                         }\r
709                 }\r
710                 throw new UnsupportedOperationException("Unsupported format " + dc.format + " and version " + dc.version);\r
711         }\r
712 \r
713         /**\r
714          * Import all history items from graph into a history manager\r
715          * \r
716          * @param graph database access\r
717          * @param history destination history\r
718          * @param rab the archive to extract from\r
719          * @return <code>true</code> if successful\r
720          * @throws DatabaseException \r
721          */\r
722         private static boolean extractHistoryArchiveTar(ReadGraph graph, HistoryManager history, InputStream in, HistoryImportResult result)\r
723                         throws DatabaseException, IOException, HistoryException\r
724         {\r
725                 // tar is not closed on purpose because we do not want to close RandomAccessBinary rab.\r
726                 TarArchiveInputStream tar = new TarArchiveInputStream(in);\r
727 \r
728                 Serializer beanSerializer = Bindings.getSerializerUnchecked(Bindings.BEAN);\r
729 \r
730                 Bean lastItem = null;\r
731                 String lastItemId = null;\r
732                 while (true) {\r
733                         TarArchiveEntry entry = tar.getNextTarEntry();\r
734                         if (entry == null)\r
735                                 break;\r
736 \r
737                         String name = entry.getName();\r
738                         boolean state = name.equals("state");\r
739                         boolean metadata = name.endsWith(".txt");\r
740                         boolean data = name.endsWith(".data");\r
741 \r
742                         if (state) {\r
743                                 if (result != null) {\r
744                                         byte[] st = new byte[(int) entry.getSize()];\r
745                                         tar.read(st);\r
746                                         result.collectorState = (Bean) beanSerializer.deserialize(st);\r
747                                         if (DEBUG)\r
748                                                 System.out.println("READ collector state: " + result.collectorState);\r
749                                 } else {\r
750                                         tar.skip(entry.getSize());\r
751                                 }\r
752                         } else if (metadata) {\r
753                                 byte[] md = new byte[(int) entry.getSize()];\r
754                                 tar.read(md);\r
755                                 if (DEBUG) {\r
756                                         System.out.println("READING Item metadata: " + name);\r
757                                         System.out.println("SERIALIZED ITEM " + name.replace(".txt", "") + ": " + Arrays.toString(md));\r
758                                 }\r
759 \r
760                                 try {\r
761                                         Bean bean = (Bean) beanSerializer.deserialize(md);\r
762                                         if (DEBUG)\r
763                                                 System.out.println("READ Item metadata: " + bean);\r
764                                         history.modify(bean);\r
765                                         lastItem = bean;\r
766                                         lastItemId = (String) bean.getFieldUnchecked("id");\r
767                                 } catch (ClassFormatError e) {\r
768                                         // This is here because LZ4BlockInput/OutputStream seemed to\r
769                                         // be causing weird deserialization errors to occur for\r
770                                         // single data items when trying to deserialize beans. The\r
771                                         // reason being that the serialized data that was read only\r
772                                         // contains the first 512 bytes of the data. After that all\r
773                                         // data is zeros, which causes things like this to happen in\r
774                                         // deserialization:\r
775                                         //signature: rsstzzzzzzzze_b7b532e9\r
776                                         //component(o): id = String\r
777                                         //component(1): variableId = String\r
778                                         //component(2): format = Temp1\r
779                                         //annotation: @org.simantics.databoard.annotations.Union(value=[class org.simantics.databoard.type.BooleanType, class org.simantics.databoard.type.ByteType, class org.simantics.databoard.type.IntegerType, class org.simantics.databoard.type.LongType, class org.simantics.databoard.type.FloatType, class org.simantics.databoard.type.DoubleType, class org.simantics.databoard.type.StringType, class org.simantics.databoard.type.RecordType, class org.simantics.databoard.type.ArrayType, class org.simantics.databoard.type.MapType, class org.simantics.databoard.type.OptionalType, class org.simantics.databoard.type.UnionType, class org.simantics.databoard.type.VariantType])\r
780                                         //component(3):  = Boolean\r
781                                         //component(4):  = Boolean\r
782                                         //component(5):  = Boolean\r
783                                         //component(devil):  = Boolean\r
784                                         //component(7):  = Boolean\r
785                                         //component(music):  = Boolean\r
786                                         //component(9):  = Boolean\r
787                                         //component(10):  = Boolean\r
788                                         //\r
789                                         // For this reason we've switched the default compression to FastLZ\r
790                                         // for now. This is reflected in the input format also.\r
791 \r
792                                         Activator.getDefault().getLog().log(\r
793                                                         new Status(IStatus.ERROR, Activator.PLUGIN_ID, "History item " + name + " metadata deserialization failed.", e));;\r
794                                 }\r
795                         } else if (data && lastItem != null) {\r
796                                 StreamAccessor sa = history.openStream(lastItemId, "rw");\r
797                                 try {\r
798                                         if (sa instanceof BinaryObject) {\r
799                                                 BinaryObject bo = (BinaryObject) sa;\r
800                                                 RandomAccessBinary output = bo.getBinary();\r
801                                                 output.position(0L);\r
802                                                 output.setLength(0L);\r
803                                                 long copiedBytes = FileUtils.copy(tar, new RABOutputStream(output), entry.getSize());\r
804                                                 if (copiedBytes != entry.getSize()) {\r
805                                                         System.err.println("WARNING: item " + lastItemId + ", extracted " + copiedBytes + " bytes while TAR entry said the size to be " + entry.getSize() + " bytes");\r
806                                                 }\r
807                                         } else {\r
808                                                 System.err.println("Skipping item " + lastItemId + " in history import due to StreamAccessor not being an instanceof BinaryObject. StreamAccessor=" + sa);\r
809                                         }\r
810                                 } finally {\r
811                                         try {\r
812                                                 sa.close();\r
813                                                 lastItem = null;\r
814                                                 lastItemId = null;\r
815                                         } catch (AccessorException e) {\r
816                                         }\r
817                                 }\r
818                         }\r
819                 }\r
820 \r
821                 return true;\r
822         }\r
823 \r
824         /**\r
825          * Get request that clears history\r
826          * @return cleaning request\r
827          */\r
828         public static WriteRequest clear(final Resource history) {\r
829                 return new WriteRequest() {\r
830                         @Override\r
831                         public void perform(WriteGraph graph) throws DatabaseException {\r
832                                 HistoryResource HIS = HistoryResource.getInstance(graph);\r
833                                 Layer0 L0 = Layer0.getInstance(graph);\r
834 \r
835                                 // Separate items format\r
836                                 for (Resource oldItem : graph.getObjects(history, L0.ConsistsOf))\r
837                                 {\r
838                                         if (!graph.isInstanceOf(oldItem, HIS.History_Item)) continue;\r
839                                         Resource info = graph.getPossibleObject(oldItem, HIS.History_Item);\r
840                                         if (info!=null) {\r
841                                                 graph.deny(oldItem, HIS.History_Item_Info);\r
842                                                 graph.deny(info);\r
843                                         }\r
844                                         Resource data = graph.getPossibleObject(oldItem, HIS.History_Item_Series);\r
845                                         if (data!=null) {\r
846                                                 graph.deny(oldItem, HIS.History_Item_Series);\r
847                                                 graph.deny(data);\r
848                                         }\r
849                                         graph.deny(history, L0.ConsistsOf, oldItem);\r
850                                         graph.deny(oldItem);\r
851                                 }\r
852 \r
853                                 // Archived format\r
854                                 graph.denyValue(history, HIS.History_archive);\r
855                         }\r
856                 };\r
857         }\r
858 \r
859 \r
860         private static void putNextEntry(TarArchiveOutputStream out, String entryName, long size) throws IOException {\r
861                 TarArchiveEntry entry = new TarArchiveEntry(entryName);\r
862                 entry.setSize(size);\r
863                 out.putArchiveEntry(entry);\r
864         }\r
865 \r
866         /**\r
867          * Shall not close the underlying RandomAccessBinary.\r
868          */\r
869         private static class RABOutputStream extends OutputStream {\r
870 \r
871                 private final RandomAccessBinary output;\r
872 \r
873                 public RABOutputStream(RandomAccessBinary output) {\r
874                         this.output = output;\r
875                 }\r
876 \r
877                 @Override\r
878                 public void write(byte[] b, int off, int len) throws IOException {\r
879                         output.write(b, off, len);\r
880                 }\r
881 \r
882                 @Override\r
883                 public void write(int b) throws IOException {\r
884                         output.write(b);\r
885                 }\r
886 \r
887         }\r
888 \r
889         /**\r
890          * Shall not close the underlying RandomAccessBinary.\r
891          */\r
892         private static class RABInputStream extends InputStream {\r
893 \r
894                 private final RandomAccessBinary input;\r
895 \r
896                 public RABInputStream(RandomAccessBinary input) {\r
897                         this.input = input;\r
898                 }\r
899 \r
900                 @Override\r
901                 public int available() throws IOException {\r
902                         long avail = input.length() - input.position();\r
903                         if ((avail & 0xffffffff00000000L) != 0)\r
904                                 throw new IOException("Number of available bytes truncated in long->int conversion: " + avail);\r
905                         return (int) avail;\r
906                 }\r
907 \r
908                 @Override\r
909                 public int read() throws IOException {\r
910                         try {\r
911                                 return input.readUnsignedByte();\r
912                         } catch (EOFException e) {\r
913                                 return -1;\r
914                         }\r
915                 }\r
916 \r
917                 @Override\r
918                 public int read(byte[] b, int off, int len) throws IOException {\r
919                         long available = input.length() - input.position();\r
920                         if (available == 0)\r
921                                 return -1;\r
922                         int l = (int) Math.min(available, (long) len);\r
923                         input.readFully(b, off, l);\r
924                         return l;\r
925                 }\r
926 \r
927                 @Override\r
928                 public long skip(long n) throws IOException {\r
929                         long count = 0;\r
930                         while (count < n) {\r
931                                 int l = (int) Math.min(n, (long) Integer.MAX_VALUE);\r
932                                 count += input.skipBytes(l);\r
933                         }\r
934                         return count;\r
935                 }\r
936 \r
937         }\r
938 \r
939         private static long profile(Long t1, String string) {\r
940                 if (PROFILE) {\r
941                         long t2 = System.nanoTime();\r
942                         long delta = t1 == null ? 0 : t2-t1;\r
943                         System.out.println("[" + HistoryUtil.class.getSimpleName() + "] PROFILE: " + string + (t1 == null ? "" : " in " + (((double)delta)*1e-6) + " ms"));\r
944                         return t2;\r
945                 }\r
946                 return 0L;\r
947         }\r
948 \r
949 }\r