1 /*******************************************************************************
2 * Copyright (c) 2007, 2010 Association for Decentralized Information Management
4 * All rights reserved. This program and the accompanying materials
5 * are made available under the terms of the Eclipse Public License v1.0
6 * which accompanies this distribution, and is available at
7 * http://www.eclipse.org/legal/epl-v10.html
10 * VTT Technical Research Centre of Finland - initial API and implementation
11 *******************************************************************************/
12 package org.simantics.graph.db;
14 import java.io.DataInput;
15 import java.io.DataOutput;
17 import java.io.IOException;
18 import java.io.InputStream;
19 import java.util.ArrayList;
20 import java.util.Collection;
21 import java.util.HashSet;
22 import java.util.TreeMap;
23 import java.util.UUID;
25 import org.simantics.databoard.Accessors;
26 import org.simantics.databoard.Bindings;
27 import org.simantics.databoard.Datatypes;
28 import org.simantics.databoard.accessor.error.AccessorConstructionException;
29 import org.simantics.databoard.accessor.error.AccessorException;
30 import org.simantics.databoard.accessor.file.FileVariantAccessor;
31 import org.simantics.databoard.binding.Binding;
32 import org.simantics.databoard.binding.error.BindingConstructionException;
33 import org.simantics.databoard.binding.error.DatatypeConstructionException;
34 import org.simantics.databoard.binding.mutable.Variant;
35 import org.simantics.databoard.container.DataContainer;
36 import org.simantics.databoard.container.DataContainers;
37 import org.simantics.databoard.serialization.SerializationException;
38 import org.simantics.databoard.serialization.Serializer;
39 import org.simantics.databoard.type.Datatype;
40 import org.simantics.databoard.util.binary.BinaryFile;
41 import org.simantics.databoard.util.binary.RandomAccessBinary;
42 import org.simantics.db.ReadGraph;
43 import org.simantics.db.RequestProcessor;
44 import org.simantics.db.Resource;
45 import org.simantics.db.Session;
46 import org.simantics.db.VirtualGraph;
47 import org.simantics.db.WriteGraph;
48 import org.simantics.db.WriteOnlyGraph;
49 import org.simantics.db.common.CommentMetadata;
50 import org.simantics.db.common.request.ReadRequest;
51 import org.simantics.db.common.request.WriteOnlyRequest;
52 import org.simantics.db.common.request.WriteRequest;
53 import org.simantics.db.exception.CancelTransactionException;
54 import org.simantics.db.exception.DatabaseException;
55 import org.simantics.db.service.SerialisationSupport;
56 import org.simantics.db.service.VirtualGraphSupport;
57 import org.simantics.graph.db.TransferableGraphSource.TransferableGraphSourceValueProcedure;
58 import org.simantics.graph.diff.TransferableGraphDelta1;
59 import org.simantics.graph.representation.Extensions;
60 import org.simantics.graph.representation.External;
61 import org.simantics.graph.representation.Identity;
62 import org.simantics.graph.representation.TransferableGraph1;
63 import org.simantics.graph.representation.Value;
64 import org.simantics.utils.datastructures.BinaryFunction;
66 import gnu.trove.list.array.TIntArrayList;
67 import gnu.trove.map.hash.TObjectIntHashMap;
69 public class TransferableGraphs {
/**
 * Imports a transferable graph object through a session, using the given advisor.
 * Only {@link TransferableGraph1} instances are accepted; they are forwarded to
 * {@code importGraph1(session, tg, advisor)}.
 *
 * @param session session handed on to importGraph1
 * @param tg object to import; must be a TransferableGraph1
 * @param advisor import advisor handed on to importGraph1
 * @return the array returned by importGraph1
 * @throws TransferableGraphException if tg is not a TransferableGraph1
 */
71 public static long[] importGraph(Session session, Object tg, IImportAdvisor advisor) throws DatabaseException, TransferableGraphException {
72 if (tg instanceof TransferableGraph1)
74 return importGraph1(session, (TransferableGraph1) tg, advisor);
// Any other type is rejected; the message names the runtime class of tg.
// NOTE(review): a null tg fails the instanceof test and then reaches tg.getClass(),
// which raises NullPointerException instead of TransferableGraphException.
76 throw new TransferableGraphException("Cannot import "+tg.getClass().getName());
/**
 * Imports a transferable graph object inside an existing write transaction,
 * using the given advisor. Only {@link TransferableGraph1} instances are
 * accepted; they are forwarded to {@code importGraph1(g, tg, advisor)}.
 *
 * @param g write graph handed on to importGraph1
 * @param tg object to import; must be a TransferableGraph1
 * @param advisor import advisor handed on to importGraph1
 * @return the array returned by importGraph1
 * @throws TransferableGraphException if tg is not a TransferableGraph1
 */
79 public static long[] importGraph(WriteGraph g, Object tg, IImportAdvisor advisor) throws DatabaseException, TransferableGraphException {
80 if (tg instanceof TransferableGraph1)
82 return importGraph1(g, (TransferableGraph1) tg, advisor);
// Unsupported type: report its runtime class name (a null tg would fail on getClass()).
84 throw new TransferableGraphException("Cannot import "+tg.getClass().getName());
/**
 * Imports a transferable graph object through a session without an explicit
 * advisor. Only {@link TransferableGraph1} instances are accepted; they are
 * forwarded to {@code importGraph1(session, tg)}.
 *
 * @param session session handed on to importGraph1
 * @param tg object to import; must be a TransferableGraph1
 * @return the array returned by importGraph1
 * @throws TransferableGraphException if tg is not a TransferableGraph1
 */
87 public static long[] importGraph(Session session, Object tg) throws DatabaseException, TransferableGraphException {
88 if (tg instanceof TransferableGraph1)
90 return importGraph1(session, (TransferableGraph1) tg);
// Unsupported type: report its runtime class name (a null tg would fail on getClass()).
92 throw new TransferableGraphException("Cannot import "+tg.getClass().getName());
/**
 * Imports a transferable graph object inside an existing write transaction
 * without an explicit advisor. Only {@link TransferableGraph1} instances are
 * accepted; they are forwarded to {@code importGraph1(g, tg)}.
 *
 * @param g write graph handed on to importGraph1
 * @param tg object to import; must be a TransferableGraph1
 * @return the array returned by importGraph1
 * @throws TransferableGraphException if tg is not a TransferableGraph1
 */
95 public static long[] importGraph(WriteGraph g, Object tg) throws DatabaseException, TransferableGraphException {
96 if (tg instanceof TransferableGraph1)
98 return importGraph1(g, (TransferableGraph1) tg);
// Unsupported type: report its runtime class name (a null tg would fail on getClass()).
100 throw new TransferableGraphException("Cannot import "+tg.getClass().getName());
/**
 * Gathers the database resources that correspond to the {@link External}
 * identities of a transferable graph. An import process is prepared (read
 * only) so that its {@code resources} table is populated, and the entries
 * indexed by each external identity's {@code resource} are collected.
 *
 * @param processor request processor used to run the preparing read request
 * @param tg transferable graph whose identities are inspected
 * @return presumably the collected set of resources (the return statement is
 *         not among the visible lines - confirm)
 */
103 public static Collection<Resource> collectExternals(RequestProcessor processor, final TransferableGraph1 tg) throws DatabaseException, TransferableGraphException {
// A default ImportAdvisor is used; nothing is written by this method's visible code.
104 final TransferableGraphImportProcess process = new TransferableGraphImportProcess(tg, new ImportAdvisor());
105 processor.syncRequest(new ReadRequest() {
107 public void run(ReadGraph graph) throws DatabaseException {
108 process.prepare(graph);
// A HashSet is used, so a resource referenced by several identities is kept once.
111 HashSet<Resource> result = new HashSet<Resource>();
112 for(Identity id : tg.identities) {
113 if(id.definition instanceof External) {
// id.resource is used as an index into the process's resource table.
114 result.add(process.resources[id.resource]);
121 * Imports transferable graph version 1 to the database. Root advisor is used
122 * to give identities to roots of the transferable graphs. It may be null,
123 * in which case new resources are created for all roots but the root library.
127 * @param advisor root advisor or <code>null</code>
128 * @throws DatabaseException
130 public static long[] importGraph1(Session session, final TransferableGraph1 tg, final IImportAdvisor advisor_) throws DatabaseException, TransferableGraphException {
// Advisors that do not implement IImportAdvisor2 are wrapped in a WrapperAdvisor so
// that the beforeWrite/afterWrite hooks below can be invoked on any advisor.
132 final IImportAdvisor2 advisor = (advisor_ instanceof IImportAdvisor2) ? ((IImportAdvisor2)advisor_) : new WrapperAdvisor(advisor_);
// NOTE(review): the conditional above always yields a non-null reference (either the
// cast advisor_ or a new WrapperAdvisor), so the "advisor == null" fallback to
// ImportAdvisor below cannot trigger. Confirm how WrapperAdvisor handles a null advisor_.
134 final TransferableGraphImportProcess process = new TransferableGraphImportProcess(tg,
135 advisor == null ? new ImportAdvisor() : advisor);
// Step 1: prepare the import inside a read request.
136 session.syncRequest(new ReadRequest() {
138 public void run(ReadGraph graph) throws DatabaseException {
139 process.prepare(graph);
// Step 2: write inside a write-only request, bracketed by the advisor hooks.
142 session.syncRequest(new WriteOnlyRequest() {
144 public void perform(WriteOnlyGraph graph) throws DatabaseException {
145 advisor.beforeWrite(graph, process);
146 process.write(graph);
147 advisor.afterWrite(graph, process);
// The returned ids are produced by the process using the session's SerialisationSupport.
150 return process.getResourceIds(
151 session.getService(SerialisationSupport.class));
/**
 * Imports a transferable graph through a session and hands the write-only
 * graph and the import process to a callback after the write.
 *
 * @param session session used for the read and write-only requests
 * @param tg transferable graph to import
 * @param advisor import advisor; when null a default ImportAdvisor is used
 * @param callback called with the write-only graph and the import process
 *        inside the write-only request
 */
154 public static void importGraph1(Session session, final TransferableGraph1 tg, IImportAdvisor advisor, final BinaryFunction<Boolean, WriteOnlyGraph, TransferableGraphImportProcess> callback) throws DatabaseException, TransferableGraphException {
155 final TransferableGraphImportProcess process = new TransferableGraphImportProcess(tg,
156 advisor == null ? new ImportAdvisor() : advisor);
// Step 1: prepare inside a read request.
157 session.syncRequest(new ReadRequest() {
159 public void run(ReadGraph graph) throws DatabaseException {
160 process.prepare(graph);
// Step 2: write inside a write-only request; no advisor hooks are called in this variant.
163 session.syncRequest(new WriteOnlyRequest() {
165 public void perform(WriteOnlyGraph graph) throws DatabaseException {
166 process.write(graph);
// NOTE(review): confirm whether this call is guarded against a null callback;
// the callback's Boolean result is not used on this line.
168 callback.call(graph, process);
/**
 * Imports from a streaming transferable graph source without a status
 * monitor; delegates to the four-argument overload with a null monitor.
 *
 * @param session session to import into
 * @param tg source of the transferable graph
 * @param advisor import advisor handed on unchanged
 */
173 public static void importGraph1(Session session, final TransferableGraphSource tg, IImportAdvisor advisor) throws Exception {
174 importGraph1(session, tg, advisor, null);
/**
 * Imports from a streaming transferable graph source; delegates to the
 * five-argument overload with a null virtual graph.
 *
 * @param session session to import into
 * @param tg source of the transferable graph
 * @param advisor import advisor handed on unchanged
 * @param monitor status monitor handed on unchanged
 */
177 public static void importGraph1(Session session, final TransferableGraphSource tg, IImportAdvisor advisor, TGStatusMonitor monitor) throws DatabaseException {
178 importGraph1(session, null, tg, advisor, monitor);
/**
 * Imports from a streaming transferable graph source using a
 * StreamingTransferableGraphImportProcess, with the write-only request
 * constructed for the given virtual graph.
 *
 * @param session session used for the read and write-only requests
 * @param vg virtual graph passed to the import process and to the write-only
 *        request; the sibling overload passes null here
 * @param tg source of the transferable graph
 * @param advisor_ import advisor; wrapped when it is not an IImportAdvisor2
 * @param monitor status monitor (NOTE(review): not referenced in the visible
 *        lines of this method - confirm whether it is intentionally unused)
 * @throws DatabaseException directly, or wrapping any other exception raised
 *         during prepare or write
 */
181 public static void importGraph1(Session session, VirtualGraph vg, final TransferableGraphSource tg, IImportAdvisor advisor_, TGStatusMonitor monitor) throws DatabaseException {
// Non-IImportAdvisor2 advisors are wrapped so the write hooks can be called uniformly.
183 final IImportAdvisor2 advisor = (advisor_ instanceof IImportAdvisor2) ? ((IImportAdvisor2)advisor_) : new WrapperAdvisor(advisor_);
185 final StreamingTransferableGraphImportProcess process = new StreamingTransferableGraphImportProcess(session, vg, tg, advisor);
// Step 1: prepare inside a read request.
186 session.syncRequest(new ReadRequest() {
188 public void run(ReadGraph graph) throws DatabaseException {
190 process.prepare(graph);
// DatabaseException has its own handler (presumably rethrown as-is - confirm);
// every other exception is wrapped in a DatabaseException.
191 } catch (DatabaseException e) {
193 } catch (Exception e) {
194 throw new DatabaseException(e);
// Step 2: write inside a write-only request created for vg, bracketed by advisor hooks.
198 session.syncRequest(new WriteOnlyRequest(vg) {
200 public void perform(WriteOnlyGraph graph) throws DatabaseException {
202 advisor.beforeWrite(graph, process);
203 process.write(graph);
204 advisor.afterWrite(graph, process);
// Any exception from the hooks or the write is wrapped in a DatabaseException.
205 } catch (Exception e) {
206 throw new DatabaseException(e);
/**
 * Imports a transferable graph through a session with a status monitor and
 * records a comment in the write transaction's CommentMetadata.
 *
 * @param session session used for the read and write-only requests
 * @param tg transferable graph to import
 * @param advisor_ import advisor; wrapped when it is not an IImportAdvisor2
 * @param monitor status monitor passed to the import process constructor
 */
212 public static void importGraph1WithMonitor(Session session, final TransferableGraph1 tg, IImportAdvisor advisor_, TGStatusMonitor monitor) throws DatabaseException {
213 final IImportAdvisor2 advisor = (advisor_ instanceof IImportAdvisor2) ? ((IImportAdvisor2)advisor_) : new WrapperAdvisor(advisor_);
// NOTE(review): advisor is never null after the line above, so the fallback to
// ImportAdvisor in the constructor call below cannot trigger.
214 final TransferableGraphImportProcess process = new TransferableGraphImportProcess(tg,
215 advisor == null ? new ImportAdvisor() : advisor, monitor);
// Step 1: prepare inside a read request.
216 session.syncRequest(new ReadRequest() {
218 public void run(ReadGraph graph) throws DatabaseException {
219 process.prepare(graph);
// Step 2: write (via write2) inside a write-only request, bracketed by advisor hooks.
222 session.syncRequest(new WriteOnlyRequest() {
224 public void perform(WriteOnlyGraph graph) throws DatabaseException {
225 advisor.beforeWrite(graph, process);
226 process.write2(graph);
227 advisor.afterWrite(graph, process);
// Append a comment stating tg.resourceCount to the transaction's comment metadata.
228 CommentMetadata comments = graph.getMetadata(CommentMetadata.class);
229 comments.add("Imported transferable graph with " + tg.resourceCount + " resources");
230 graph.addMetadata(comments);
/**
 * Imports a transferable graph through a session using a full WriteRequest
 * (rather than a write-only request), records a comment in the transaction's
 * CommentMetadata and hands the write graph and process to a callback.
 *
 * @param session session used for the read and write requests
 * @param tg transferable graph to import
 * @param advisor import advisor; when null a default ImportAdvisor is used
 * @param callback called with the write graph and the import process inside
 *        the write request
 */
235 public static void importGraph1WithChanges(Session session, final TransferableGraph1 tg, IImportAdvisor advisor, final BinaryFunction<Boolean, WriteGraph, TransferableGraphImportProcess> callback) throws DatabaseException, TransferableGraphException {
236 final TransferableGraphImportProcess process = new TransferableGraphImportProcess(tg,
237 advisor == null ? new ImportAdvisor() : advisor);
// Step 1: prepare inside a read request.
238 session.syncRequest(new ReadRequest() {
240 public void run(ReadGraph graph) throws DatabaseException {
241 process.prepare(graph);
// Step 2: write via write2 inside a WriteRequest; no advisor hooks in this variant.
244 session.syncRequest(new WriteRequest() {
246 public void perform(WriteGraph graph) throws DatabaseException {
247 process.write2(graph);
// Append a comment stating tg.resourceCount to the transaction's comment metadata.
248 CommentMetadata comments = graph.getMetadata(CommentMetadata.class);
249 comments.add("Imported transferable graph with " + tg.resourceCount + " resources");
250 graph.addMetadata(comments);
// NOTE(review): confirm whether this call is guarded against a null callback.
252 callback.call(graph, process);
/**
 * Imports a transferable graph through a session without a caller-supplied
 * advisor: prepare in a read request, write in a write-only request.
 *
 * @param session session used for both requests and for SerialisationSupport
 * @param tg transferable graph to import
 * @return the array returned by process.getResourceIds
 */
257 public static long[] importGraph1(Session session, final TransferableGraph1 tg) throws DatabaseException, TransferableGraphException {
// NOTE(review): the remaining constructor argument is not visible here;
// presumably a default ImportAdvisor - confirm.
258 final TransferableGraphImportProcess process =
259 new TransferableGraphImportProcess(tg,
261 session.syncRequest(new ReadRequest() {
263 public void run(ReadGraph graph) throws DatabaseException {
264 process.prepare(graph);
// No advisor hooks are invoked around the write in this variant.
267 session.syncRequest(new WriteOnlyRequest() {
269 public void perform(WriteOnlyGraph graph) throws DatabaseException {
270 process.write(graph);
273 return process.getResourceIds(
274 session.getService(SerialisationSupport.class));
/**
 * Imports a transferable graph inside an existing write transaction without
 * a caller-supplied advisor. Prepare and write (write2) both run on the
 * given graph.
 *
 * @param graph write graph used for prepare and write2
 * @param tg transferable graph to import
 * @return the array returned by process.getResourceIds
 */
277 public static long[] importGraph1(WriteGraph graph, final TransferableGraph1 tg) throws DatabaseException, TransferableGraphException {
// NOTE(review): the remaining constructor argument is not visible here;
// presumably a default ImportAdvisor - confirm.
278 final TransferableGraphImportProcess process =
279 new TransferableGraphImportProcess(tg,
281 process.prepare(graph);
282 process.write2(graph);
// SerialisationSupport is looked up from the graph's session.
283 return process.getResourceIds(
284 graph.getSession().getService(SerialisationSupport.class));
288 * Import transferable graph version 1 to the database. Root advisor is used
289 * to give identities to roots of the transferable graphs. It may be null,
290 * in which case new resources are created for all roots but the root library.
294 * @param advisor root advisor or <code>null</code>
295 * @throws DatabaseException
297 public static long[] importGraph1(WriteGraph graph, TransferableGraph1 tg, IImportAdvisor advisor) throws DatabaseException {
// Convenience overload: same as the four-argument variant with a null status monitor.
298 return importGraph1(graph, tg, advisor, null);
/**
 * Imports a transferable graph inside an existing write transaction with an
 * optional advisor and status monitor.
 *
 * @param graph write graph used for prepare and write2
 * @param tg transferable graph to import
 * @param advisor import advisor; when null a default ImportAdvisor is used
 * @param monitor status monitor passed to the import process constructor; the
 *        sibling three-argument overload passes null
 * @return the array returned by process.getResourceIds
 */
301 public static long[] importGraph1(WriteGraph graph, TransferableGraph1 tg, IImportAdvisor advisor, TGStatusMonitor monitor) throws DatabaseException {
302 TransferableGraphImportProcess process = new TransferableGraphImportProcess(tg,
303 advisor == null ? new ImportAdvisor() : advisor, monitor);
304 process.prepare(graph);
// The write hooks exist only on IImportAdvisor2; other advisors (including null,
// for which instanceof is false) are not wrapped here and get no hook calls.
305 if(advisor instanceof IImportAdvisor2) ((IImportAdvisor2)advisor).beforeWrite(graph, process);
306 process.write2(graph);
307 if(advisor instanceof IImportAdvisor2) ((IImportAdvisor2)advisor).afterWrite(graph, process);
// SerialisationSupport is looked up from the graph's session.
308 return process.getResourceIds(
309 graph.getSession().getService(SerialisationSupport.class));
/**
 * Applies a transferable graph delta relative to previously imported
 * resources. The visible code builds a map for the delta's "a" side from
 * oldResources, builds a map for the "b" side by carrying resources over
 * through delta.aToB, and returns the "b" side's resources.
 * NOTE(review): the statements that modify the graph are not among the
 * visible lines; document them once confirmed.
 *
 * @param graph write graph; also supplies the session for SerialisationSupport
 * @param oldResources resources handed to the "a" side map
 * @param delta delta providing a, b and aToB
 * @return the array returned by bMap.getResources(serializer)
 */
312 public static long[] applyDelta(WriteGraph graph, long[] oldResources, TransferableGraphDelta1 delta) throws DatabaseException {
313 SerialisationSupport serializer =
314 graph.getSession().getService(SerialisationSupport.class);
// "a" side: associate delta.a with the caller's old resources.
316 TGToGraphMap aMap = new TGToGraphMap(delta.a);
317 aMap.addOldResources(serializer, oldResources);
// "b" side: reuse the "a" side's resources through the aToB mapping.
320 TGToGraphMap bMap = new TGToGraphMap(delta.b);
321 bMap.addMappedOldResources(serializer, delta.aToB, aMap.getResources());
325 return bMap.getResources(serializer);
/**
 * Checks, without writing, whether a delta would change the graph. Returns
 * true as soon as the "a" side's checkDeny reports true; otherwise returns
 * the result of the "b" side's checkClaim.
 *
 * @param graph read graph; also supplies the session for SerialisationSupport
 * @param oldResources resources handed to the "a" side map
 * @param delta delta providing a, b and aToB
 * @return true if aMap.checkDeny(graph) or bMap.checkClaim(graph) is true
 */
328 public static boolean hasChanges(ReadGraph graph, long[] oldResources, TransferableGraphDelta1 delta) throws DatabaseException {
330 SerialisationSupport serializer =
331 graph.getSession().getService(SerialisationSupport.class);
333 TGToGraphMap aMap = new TGToGraphMap(delta.a);
334 aMap.addOldResources(serializer, oldResources);
// Short-circuit: the "b" side is not even built when the "a" side already reports a change.
335 if(aMap.checkDeny(graph)) return true;
337 TGToGraphMap bMap = new TGToGraphMap(delta.b);
338 bMap.addMappedOldResources(serializer, delta.aToB, aMap.getResources());
340 return bMap.checkClaim(graph);
/**
 * Not implemented: always throws {@link UnsupportedOperationException}.
 * None of the parameters are used.
 *
 * @param writeGraph unused
 * @param graph unused
 * @param advisor unused
 * @throws UnsupportedOperationException always
 */
344 public static void uninstallGraph(WriteGraph writeGraph, TransferableGraph1 graph,
345 ImportAdvisor advisor) throws TransferableGraphException {
346 // TODO HANNU IMPLEMENTS
347 throw new UnsupportedOperationException();
/**
 * Imports a transferable graph into a virtual graph through a session:
 * prepare in a read request, then write in a write-only request that is
 * constructed for the given virtual graph.
 *
 * @param session session used for both requests and for SerialisationSupport
 * @param vg virtual graph passed to the write-only request
 * @param tg transferable graph to import
 * @param advisor import advisor; when null a default ImportAdvisor is used
 * @return the array returned by process.getResourceIds
 */
350 public static long[] importVirtualGraph(Session session, final VirtualGraph vg, final TransferableGraph1 tg, IImportAdvisor advisor) throws DatabaseException {
351 final TransferableGraphImportProcess process = new TransferableGraphImportProcess(tg,
352 advisor == null ? new ImportAdvisor() : advisor);
353 session.syncRequest(new ReadRequest() {
355 public void run(ReadGraph graph) throws DatabaseException {
356 process.prepare(graph);
359 session.syncRequest(new WriteOnlyRequest(vg) {
361 public void perform(WriteOnlyGraph graph) throws DatabaseException {
362 // Needed because process#write does not support virtual WriteOnlyGraph
// NOTE(review): both write2 and write appear below; presumably a condition
// selects write2 for the virtual case and write otherwise - confirm.
364 process.write2(graph);
366 process.write(graph);
370 return process.getResourceIds(session.getService(SerialisationSupport.class));
/**
 * Imports a transferable graph using an existing write graph: prepare and
 * write2 both run on the given graph.
 * NOTE(review): nothing in this method selects a virtual graph; presumably
 * the caller supplies a write graph already bound to one - confirm.
 *
 * @param graph write graph used for prepare, write2 and the service lookup
 * @param tg transferable graph to import
 * @param advisor import advisor; when null a default ImportAdvisor is used
 * @return the array returned by process.getResourceIds
 */
373 public static long[] importVirtualGraph(WriteGraph graph, final TransferableGraph1 tg, IImportAdvisor advisor) throws DatabaseException {
374 final TransferableGraphImportProcess process = new TransferableGraphImportProcess(tg,
375 advisor == null ? new ImportAdvisor() : advisor);
376 process.prepare(graph);
377 process.write2(graph);
// Here SerialisationSupport is fetched from the graph itself rather than from its session.
378 return process.getResourceIds(graph.getService(SerialisationSupport.class));
/**
 * Reads a {@link TransferableGraph1} from a file through a
 * FileVariantAccessor. The file's content type must equal the datatype of
 * TransferableGraph1; otherwise a SerializationException is raised, which
 * the handlers below convert into a TransferableGraphException.
 *
 * @param file file to open
 * @return the content value read with the TransferableGraph1 binding
 * @throws TransferableGraphException wrapping any accessor, binding,
 *         serialization or datatype construction failure
 */
381 public static TransferableGraph1 readGraph(File file) throws TransferableGraphException {
382 FileVariantAccessor va = null;
384 va = Accessors.openAccessor(file);
// Verify the stored datatype before attempting to bind the content.
385 Datatype type = va.getContentType();
386 if(type.equals(Datatypes.getDatatype(TransferableGraph1.class)))
387 return (TransferableGraph1)va.getContentValue(Bindings.getBinding(TransferableGraph1.class));
389 throw new SerializationException("Unknown transferable graph data type.");
// Every checked failure type is wrapped, with the original exception as cause.
390 } catch (AccessorException e) {
391 throw new TransferableGraphException(e);
392 } catch (BindingConstructionException e) {
393 throw new TransferableGraphException(e);
394 } catch (SerializationException e) {
395 throw new TransferableGraphException(e);
396 } catch (AccessorConstructionException e) {
397 throw new TransferableGraphException(e);
398 } catch (DatatypeConstructionException e) {
399 throw new TransferableGraphException(e);
// NOTE(review): this second AccessorException handler presumably guards closing
// the accessor during cleanup - confirm what it does with the exception.
404 } catch (AccessorException e) {
/**
 * Reads a transferable graph from a file with readGraph and imports it into
 * the given virtual graph using a default ImportAdvisor.
 *
 * @param session session to import through
 * @param vg target virtual graph
 * @param file file containing the transferable graph
 * @throws TransferableGraphException from readGraph, or wrapping a
 *         DatabaseException raised by the import
 */
410 public static void importVirtualGraph(Session session, VirtualGraph vg, File file) throws TransferableGraphException {
// The long[] returned by the delegate is discarded.
412 importVirtualGraph(session, vg, readGraph(file), new ImportAdvisor());
413 } catch (DatabaseException e) {
414 throw new TransferableGraphException(e);
/**
 * Obtains a virtual graph from VirtualGraphSupport.getMemoryPersistent under
 * a freshly generated random UUID name and imports the file's transferable
 * graph into it.
 *
 * @param session session providing VirtualGraphSupport and used for the import
 * @param file file containing the transferable graph
 * @return presumably the virtual graph that was imported into (the return
 *         statement is not among the visible lines - confirm)
 * @throws TransferableGraphException if reading or importing fails
 */
418 public static VirtualGraph importVirtualGraph(Session session, File file) throws TransferableGraphException {
419 VirtualGraphSupport support = session.getService(VirtualGraphSupport.class);
// A random UUID string is used as the name, so each call asks for a distinct name.
420 VirtualGraph vg = support.getMemoryPersistent(UUID.randomUUID().toString());
421 importVirtualGraph(session, vg, file);
/**
 * Writes a transferable graph source to a file using container version 1;
 * delegates to the overload that takes an explicit version.
 *
 * @param processor request processor handed on unchanged
 * @param format container format name handed on unchanged
 * @param source source of the transferable graph
 * @param target file to write
 */
425 public static void writeTransferableGraph(RequestProcessor processor, final String format, final TransferableGraphSource source, File target) throws Exception {
426 writeTransferableGraph(processor, format, 1, source, target);
/**
 * Writes a transferable graph source to a file with an explicit container
 * version and empty metadata; delegates to the overload that takes metadata.
 *
 * @param processor request processor handed on unchanged
 * @param format container format name handed on unchanged
 * @param version container version handed on unchanged
 * @param source source of the transferable graph
 * @param target file to write
 */
429 public static void writeTransferableGraph(RequestProcessor processor, final String format, final int version, final TransferableGraphSource source, File target) throws Exception {
// A fresh, empty TreeMap stands in for the metadata.
430 writeTransferableGraph(processor, format, version, new TreeMap<String,Variant>(), source, target);
/**
 * Writes a transferable graph source to a file with explicit metadata and no
 * progress reporting; delegates to the monitor-taking overload with
 * TGStatusMonitor.NULL_MONITOR.
 *
 * @param processor request processor handed on unchanged
 * @param format container format name handed on unchanged
 * @param version container version handed on unchanged
 * @param metadata container metadata handed on unchanged
 * @param source source of the transferable graph
 * @param target file to write
 */
433 public static void writeTransferableGraph(RequestProcessor processor, String format, int version, TreeMap<String, Variant> metadata, TransferableGraphSource source, File target) throws Exception {
434 writeTransferableGraph(processor, format, version, metadata, source, target, TGStatusMonitor.NULL_MONITOR);
/**
 * Writes a transferable graph source to a file: a data container header,
 * then the serialized datatype of TransferableGraph1, then the graph content
 * produced by the private writeTransferableGraph overload.
 *
 * @param processor request processor used by the content writer
 * @param format container format name placed in the header
 * @param version container version placed in the header
 * @param metadata container metadata placed in the header
 * @param source source of the transferable graph
 * @param target file to write
 * @param monitor status monitor handed on to the content writer
 */
437 public static void writeTransferableGraph(RequestProcessor processor, String format, int version, TreeMap<String, Variant> metadata, TransferableGraphSource source, File target, TGStatusMonitor monitor) throws Exception {
438 final Serializer datatypeSerializer = Bindings.getSerializerUnchecked(Datatype.class);
// try-with-resources: the BinaryFile (opened with a 128*1024 argument) is closed on exit.
439 try (RandomAccessBinary out = new BinaryFile(target, 128*1024)) {
// The container is built with a null content; only its header is written here.
440 DataContainer container = new DataContainer(format, version, metadata, null);
441 DataContainers.writeHeader(out, container);
442 datatypeSerializer.serialize((DataOutput) out, Datatypes.getDatatypeUnchecked(TransferableGraph1.class));
443 writeTransferableGraph(processor, source, out, monitor);
/**
 * Substitutes TGStatusMonitor.NULL_MONITOR for a null monitor.
 *
 * @param mon monitor that may be null
 * @return mon itself when non-null, otherwise TGStatusMonitor.NULL_MONITOR
 */
447 private static TGStatusMonitor safeMonitor(TGStatusMonitor mon) {
448 return mon == null ? TGStatusMonitor.NULL_MONITOR : mon;
/**
 * InputStream adapter whose read() pulls one unsigned byte from the
 * {@code in} data input.
 * NOTE(review): going by the class name and the {@code out} field, each byte
 * read is presumably also written to {@code out} - confirm against the full
 * body of read().
 */
451 private static class CopyingInputStream extends InputStream {
// Public and non-final: the code using this class assigns the field directly.
453 public DataOutput out;
456 public int read() throws IOException {
457 int value = in.readUnsignedByte();
/**
 * Copies bytes from a DataInput to a DataOutput in chunks no larger than the
 * supplied buffer.
 *
 * @param buffer scratch buffer; its length bounds the chunk size
 * @param in source; readFully is used for each chunk
 * @param out destination of each chunk
 * @param bytesToCopy total number of bytes to transfer
 * @return presumably the number of bytes copied; the caller in this file
 *         asserts that it equals the requested length
 * @throws IOException from readFully or write
 */
463 private static long copy(byte[] buffer, DataInput in, DataOutput out, long bytesToCopy) throws IOException {
465 long bufferLength = buffer.length;
466 while (read < bytesToCopy) {
// Chunk size: the whole buffer, or whatever remains if that is smaller.
// The cast is safe because the minimum never exceeds buffer.length.
467 int l = (int) Math.min(bufferLength, bytesToCopy-read);
468 in.readFully(buffer, 0, l);
469 out.write(buffer, 0, l);
/** Length (128 * 1024) of the byte buffer allocated by the graph writer and passed to copy() for raw value copying. */
475 private static final int LITERAL_VALUE_IO_BUFFER_SIZE = 128 * 1024;
/**
 * Streams the content of a transferable graph source to a binary output in
 * this order: resource count, extensions, identities, statements, values.
 * Identities, statements and values are produced inside one read request;
 * cancellation is checked before each of those three sections.
 *
 * @param processor request processor that runs the read request
 * @param source source of counts, extensions, identities, statements, values
 * @param out random-access output; its position is moved back once to patch
 *        the statement count
 * @param monitor status monitor used for cancellation and progress
 * @throws Exception on write failures or failures of the read request
 */
477 private static void writeTransferableGraph(RequestProcessor processor, final TransferableGraphSource source, final RandomAccessBinary out, TGStatusMonitor monitor) throws Exception {
// Start time for the elapsed-time message printed at the end.
478 long start = System.nanoTime();
480 final Serializer datatypeSerializer = Bindings.getSerializerUnchecked(Datatype.class);
481 final Serializer identitySerializer = Bindings.getSerializerUnchecked(Identity.class);
482 final Serializer extensionSerializer = Bindings.getSerializerUnchecked(Extensions.class);
// Section 1: resource count followed by the serialized extensions.
484 int resourceCount = source.getResourceCount();
485 //System.err.println("resourceCount: " + resourceCount);
486 out.writeInt(resourceCount);
487 extensionSerializer.serialize(out, new Extensions(source.getExtensions()));
489 // System.err.println("resource count: " + source.getResourceCount());
490 // System.err.println("identity count: " + source.getIdentityCount());
// One scratch buffer shared by all raw value copies.
492 byte[] buffer = new byte[LITERAL_VALUE_IO_BUFFER_SIZE];
494 processor.syncRequest(new ReadRequest() {
496 public void run(ReadGraph graph) throws DatabaseException {
// NOTE(review): monitor is dereferenced directly in the three isCanceled() checks,
// while the Updater constructions go through safeMonitor(monitor); a null monitor
// would raise NullPointerException here.
498 if (monitor.isCanceled())
499 throw new CancelTransactionException();
// Section 2: identity count, then each identity as it is delivered by the source.
501 int identityCount = source.getIdentityCount();
502 TGStatusMonitor.Updater identityProgress = new TGStatusMonitor.Updater(safeMonitor(monitor), 0, 33, identityCount);
503 out.writeInt(identityCount);
504 //System.err.println("identities: " + identityCount);
505 source.forIdentities(graph, value -> {
506 //System.err.println("id: " + value);
507 identitySerializer.serialize(out, value);
508 identityProgress.worked(1);
511 if (monitor.isCanceled())
512 throw new CancelTransactionException();
// Section 3: statements. The count written first is provisional; its file position
// is remembered so it can be overwritten once the real number is known.
514 long statementCountPos = out.position();
515 int originalStatementCount = source.getStatementCount();
516 TGStatusMonitor.Updater statementProgress = new TGStatusMonitor.Updater(safeMonitor(monitor), 34, 66, originalStatementCount);
517 out.writeInt(originalStatementCount);
518 //System.err.println("original statementCount: " + originalStatementCount);
// Single-element array so the lambda below can update the counter.
519 int[] statementCounter = { 0 };
520 source.forStatements(graph, r -> {
// Four values are handled per statement callback.
521 for (int i = 0; i < 4; ++i)
523 statementCounter[0]++;
524 //System.err.println("stm " + (statementCounter[0]) + ": " + r[0] + " " + r[1] + " " + r[2] + " " + r[3]);
525 statementProgress.worked(1);
527 //System.err.println("wrote " + statementCounter[0] + " statements, " + (statementCounter[0]*4)+ " integers");
529 // Rewrite statement count after knowing exactly how many
530 // statements were written. It is possible that some
531 // statements get filtered out at this stage and the
532 // original statement count does not reflect that.
// The patched figure is the callback count times four; the output position is then
// restored so writing continues after the statements.
533 long afterStatementsPos = out.position();
534 out.position(statementCountPos);
535 out.writeInt(statementCounter[0]*4);
536 out.position(afterStatementsPos);
538 if (monitor.isCanceled())
539 throw new CancelTransactionException();
// Section 4: value count, then each value via one of the two procedure callbacks.
541 int valueCount = source.getValueCount();
542 TGStatusMonitor.Updater valueProgress = new TGStatusMonitor.Updater(safeMonitor(monitor), 67, 100, valueCount);
543 out.writeInt(valueCount);
544 // System.err.println("valueCount: " + valueCount);
545 CopyingInputStream cis = new CopyingInputStream();
547 source.forValues2(graph, new TransferableGraphSourceValueProcedure() {
// Identity map handed to the datatype serializer in execute().
548 TObjectIntHashMap<Object> identities = new TObjectIntHashMap<>();
// Raw path: resource index, then "length" bytes copied verbatim from input.
551 public void rawCopy(int resource, int length, DataInput input) throws Exception {
552 out.writeInt(resource);
553 long copied = copy(buffer, input, out, length);
554 assert copied == length;
555 //System.err.println("value " + (num++) + ": raw variant, " + length + " bytes, copied " + copied + " bytes");
556 valueProgress.worked(1);
// Typed path: resource index, then the serialized datatype, then the serializer
// for that type is asked to skip over the value using the CopyingInputStream.
560 public void execute(int resource, Datatype type, DataInput input) throws Exception {
561 out.writeInt(resource);
563 datatypeSerializer.serialize(out, identities, type);
564 Binding binding = Bindings.getBinding(type);
565 Serializer serializer = Bindings.getSerializer(binding);
567 serializer.skip(cis);
569 valueProgress.worked(1);
// DatabaseException has its own handler (presumably rethrown as-is - confirm);
// every other exception is wrapped in a DatabaseException.
573 } catch (DatabaseException e) {
575 } catch (Exception e) {
576 throw new DatabaseException(e);
// Elapsed wall-clock time in seconds (nanoseconds scaled by 1e-9), printed to stderr.
581 long end = System.nanoTime();
582 System.err.println("Wrote transferable graph in " + 1e-9*(end-start) + " seconds.");
/**
 * Builds an in-memory {@link TransferableGraph1} from a transferable graph
 * source by collecting all of its statements, values and identities.
 *
 * @param graph read graph handed to the source's iteration methods
 * @param source source of the resource count, statements, values, identities
 *        and extensions
 * @return a new TransferableGraph1 assembled from the collected data
 * @throws DatabaseException wrapping any exception raised while collecting
 */
585 public static TransferableGraph1 create(ReadGraph graph, TransferableGraphSource source) throws DatabaseException {
587 final TIntArrayList statements = new TIntArrayList();
588 final ArrayList<Value> values = new ArrayList<>();
589 final ArrayList<Identity> identities = new ArrayList<>();
// Each statement callback contributes all ints of its array to the flat statement list.
593 source.forStatements(graph, r -> statements.addAll(r));
594 source.forValues(graph, v -> values.add(v));
595 source.forIdentities(graph, i -> identities.add(i));
// Constructor argument order: resource count, identities, statements, values, extensions.
597 return new TransferableGraph1(source.getResourceCount(),
598 identities.toArray(new Identity[identities.size()]),
599 statements.toArray(),
600 values.toArray(new Value[values.size()]),
601 source.getExtensions());
// Any exception type from the source is wrapped, with the original as cause.
603 } catch (Exception e) {
605 throw new DatabaseException(e);