1 /*******************************************************************************
2 * Copyright (c) 2007, 2010 Association for Decentralized Information Management
4 * All rights reserved. This program and the accompanying materials
5 * are made available under the terms of the Eclipse Public License v1.0
6 * which accompanies this distribution, and is available at
7 * http://www.eclipse.org/legal/epl-v10.html
10 * VTT Technical Research Centre of Finland - initial API and implementation
11 *******************************************************************************/
12 package org.simantics.graph.db;
14 import java.io.DataInput;
15 import java.io.DataOutput;
17 import java.io.IOException;
18 import java.io.InputStream;
19 import java.util.ArrayList;
20 import java.util.Collection;
21 import java.util.HashSet;
22 import java.util.TreeMap;
23 import java.util.UUID;
24 import java.util.function.BiFunction;
26 import org.simantics.databoard.Accessors;
27 import org.simantics.databoard.Bindings;
28 import org.simantics.databoard.Datatypes;
29 import org.simantics.databoard.accessor.error.AccessorConstructionException;
30 import org.simantics.databoard.accessor.error.AccessorException;
31 import org.simantics.databoard.accessor.file.FileVariantAccessor;
32 import org.simantics.databoard.binding.Binding;
33 import org.simantics.databoard.binding.error.BindingConstructionException;
34 import org.simantics.databoard.binding.error.DatatypeConstructionException;
35 import org.simantics.databoard.binding.mutable.Variant;
36 import org.simantics.databoard.container.DataContainer;
37 import org.simantics.databoard.container.DataContainers;
38 import org.simantics.databoard.serialization.SerializationException;
39 import org.simantics.databoard.serialization.Serializer;
40 import org.simantics.databoard.type.Datatype;
41 import org.simantics.databoard.util.binary.BinaryFile;
42 import org.simantics.databoard.util.binary.RandomAccessBinary;
43 import org.simantics.db.ReadGraph;
44 import org.simantics.db.RequestProcessor;
45 import org.simantics.db.Resource;
46 import org.simantics.db.Session;
47 import org.simantics.db.VirtualGraph;
48 import org.simantics.db.WriteGraph;
49 import org.simantics.db.WriteOnlyGraph;
50 import org.simantics.db.common.CommentMetadata;
51 import org.simantics.db.common.request.ReadRequest;
52 import org.simantics.db.common.request.UniqueRead;
53 import org.simantics.db.common.request.WriteOnlyRequest;
54 import org.simantics.db.common.request.WriteRequest;
55 import org.simantics.db.exception.CancelTransactionException;
56 import org.simantics.db.exception.DatabaseException;
57 import org.simantics.db.service.SerialisationSupport;
58 import org.simantics.db.service.VirtualGraphSupport;
59 import org.simantics.graph.db.TransferableGraphSource.TransferableGraphSourceValueProcedure;
60 import org.simantics.graph.diff.TransferableGraphDelta1;
61 import org.simantics.graph.representation.Extensions;
62 import org.simantics.graph.representation.External;
63 import org.simantics.graph.representation.Identity;
64 import org.simantics.graph.representation.TransferableGraph1;
65 import org.simantics.graph.representation.Value;
66 import org.slf4j.Logger;
67 import org.slf4j.LoggerFactory;
69 import gnu.trove.list.array.TIntArrayList;
70 import gnu.trove.map.hash.TObjectIntHashMap;
72 public class TransferableGraphs {
73 final static Logger LOGGER = LoggerFactory.getLogger(TransferableGraphs.class);
75 public static long[] importGraph(Session session, Object tg, IImportAdvisor advisor) throws DatabaseException, TransferableGraphException {
76 if (tg instanceof TransferableGraph1)
78 return importGraph1(session, (TransferableGraph1) tg, advisor);
80 throw new TransferableGraphException("Cannot import "+tg.getClass().getName());
83 public static long[] importGraph(WriteGraph g, Object tg, IImportAdvisor advisor) throws DatabaseException, TransferableGraphException {
84 if (tg instanceof TransferableGraph1)
86 return importGraph1(g, (TransferableGraph1) tg, advisor);
88 throw new TransferableGraphException("Cannot import "+tg.getClass().getName());
91 public static long[] importGraph(Session session, Object tg) throws DatabaseException, TransferableGraphException {
92 if (tg instanceof TransferableGraph1)
94 return importGraph1(session, (TransferableGraph1) tg);
96 throw new TransferableGraphException("Cannot import "+tg.getClass().getName());
99 public static long[] importGraph(WriteGraph g, Object tg) throws DatabaseException, TransferableGraphException {
100 if (tg instanceof TransferableGraph1)
102 return importGraph1(g, (TransferableGraph1) tg);
104 throw new TransferableGraphException("Cannot import "+tg.getClass().getName());
107 public static Collection<Resource> collectExternals(RequestProcessor processor, final TransferableGraph1 tg) throws DatabaseException, TransferableGraphException {
108 final TransferableGraphImportProcess process = new TransferableGraphImportProcess(tg, new ImportAdvisor());
109 processor.syncRequest(new ReadRequest() {
111 public void run(ReadGraph graph) throws DatabaseException {
112 process.prepare(graph);
115 HashSet<Resource> result = new HashSet<Resource>();
116 for(Identity id : tg.identities) {
117 if(id.definition instanceof External) {
118 result.add(process.resources[id.resource]);
125 * Imports transferable graph version 1 to the database. Root advisor is used
126 * to give identities to roots of the transferable graphs. It may be null,
127 * in which case new resources are created for all roots but the root library.
131 * @param advisor root advisor or <code>null</code>
132 * @throws DatabaseException
134 public static long[] importGraph1(Session session, final TransferableGraph1 tg, final IImportAdvisor advisor_) throws DatabaseException, TransferableGraphException {
136 final IImportAdvisor2 advisor = (advisor_ instanceof IImportAdvisor2) ? ((IImportAdvisor2)advisor_) : new WrapperAdvisor(advisor_);
138 final TransferableGraphImportProcess process = new TransferableGraphImportProcess(tg,
139 advisor == null ? new ImportAdvisor() : advisor);
140 session.syncRequest(new ReadRequest() {
142 public void run(ReadGraph graph) throws DatabaseException {
143 process.prepare(graph);
146 session.syncRequest(new WriteOnlyRequest() {
148 public void perform(WriteOnlyGraph graph) throws DatabaseException {
149 advisor.beforeWrite(graph, process);
150 process.write(graph);
151 advisor.afterWrite(graph, process);
154 return process.getResourceIds(
155 session.getService(SerialisationSupport.class));
158 public static void importGraph1(Session session, final TransferableGraph1 tg, IImportAdvisor advisor, final BiFunction<WriteOnlyGraph, TransferableGraphImportProcess, Boolean> callback) throws DatabaseException, TransferableGraphException {
159 final TransferableGraphImportProcess process = new TransferableGraphImportProcess(tg,
160 advisor == null ? new ImportAdvisor() : advisor);
161 session.syncRequest(new ReadRequest() {
163 public void run(ReadGraph graph) throws DatabaseException {
164 process.prepare(graph);
167 session.syncRequest(new WriteOnlyRequest() {
169 public void perform(WriteOnlyGraph graph) throws DatabaseException {
170 process.write(graph);
172 callback.apply(graph, process);
177 public static ImportResult importGraph1(Session session, final TransferableGraphSource tg, IImportAdvisor advisor) throws Exception {
178 return importGraph1(session, tg, advisor, null);
181 public static ImportResult importGraph1(Session session, final TransferableGraphSource tg, IImportAdvisor advisor, TGStatusMonitor monitor) throws DatabaseException {
182 return importGraph1(session, null, tg, advisor, monitor);
185 public static ImportResult importGraph1(Session session, VirtualGraph vg, final TransferableGraphSource tg, IImportAdvisor advisor_, TGStatusMonitor monitor) throws DatabaseException {
187 final IImportAdvisor2 advisor = (advisor_ instanceof IImportAdvisor2) ? ((IImportAdvisor2)advisor_) : new WrapperAdvisor(advisor_);
189 final StreamingTransferableGraphImportProcess process = new StreamingTransferableGraphImportProcess(session, vg, tg, advisor, monitor);
190 session.syncRequest(new ReadRequest() {
192 public void run(ReadGraph graph) throws DatabaseException {
194 process.prepare(graph);
195 } catch (DatabaseException e) {
197 } catch (Exception e) {
198 throw new DatabaseException(e);
202 session.syncRequest(new WriteOnlyRequest(vg) {
204 public void perform(WriteOnlyGraph graph) throws DatabaseException {
206 advisor.beforeWrite(graph, process);
207 process.write(graph);
208 advisor.afterWrite(graph, process);
209 } catch (Exception e) {
210 throw new DatabaseException(e);
215 return new ImportResult(process.missingExternals);
218 public static void importGraph1WithMonitor(Session session, final TransferableGraph1 tg, IImportAdvisor advisor_, TGStatusMonitor monitor) throws DatabaseException {
219 final IImportAdvisor2 advisor = (advisor_ instanceof IImportAdvisor2) ? ((IImportAdvisor2)advisor_) : new WrapperAdvisor(advisor_);
220 final TransferableGraphImportProcess process = new TransferableGraphImportProcess(tg,
221 advisor == null ? new ImportAdvisor() : advisor, monitor);
222 session.syncRequest(new ReadRequest() {
224 public void run(ReadGraph graph) throws DatabaseException {
225 process.prepare(graph);
228 session.syncRequest(new WriteOnlyRequest() {
230 public void perform(WriteOnlyGraph graph) throws DatabaseException {
231 advisor.beforeWrite(graph, process);
232 process.write2(graph);
233 advisor.afterWrite(graph, process);
234 CommentMetadata comments = graph.getMetadata(CommentMetadata.class);
235 comments.add("Imported transferable graph with " + tg.resourceCount + " resources");
236 graph.addMetadata(comments);
241 public static void importGraph1WithChanges(Session session, final TransferableGraph1 tg, IImportAdvisor advisor, final BiFunction<WriteGraph, TransferableGraphImportProcess, Boolean> callback) throws DatabaseException, TransferableGraphException {
242 final TransferableGraphImportProcess process = new TransferableGraphImportProcess(tg,
243 advisor == null ? new ImportAdvisor() : advisor);
244 session.syncRequest(new ReadRequest() {
246 public void run(ReadGraph graph) throws DatabaseException {
247 process.prepare(graph);
250 session.syncRequest(new WriteRequest() {
252 public void perform(WriteGraph graph) throws DatabaseException {
253 process.write2(graph);
254 CommentMetadata comments = graph.getMetadata(CommentMetadata.class);
255 comments.add("Imported transferable graph with " + tg.resourceCount + " resources");
256 graph.addMetadata(comments);
258 callback.apply(graph, process);
263 public static long[] importGraph1(Session session, final TransferableGraph1 tg) throws DatabaseException, TransferableGraphException {
264 final TransferableGraphImportProcess process =
265 new TransferableGraphImportProcess(tg,
267 session.syncRequest(new ReadRequest() {
269 public void run(ReadGraph graph) throws DatabaseException {
270 process.prepare(graph);
273 session.syncRequest(new WriteOnlyRequest() {
275 public void perform(WriteOnlyGraph graph) throws DatabaseException {
276 process.write(graph);
279 return process.getResourceIds(
280 session.getService(SerialisationSupport.class));
283 public static long[] importGraph1(WriteGraph graph, final TransferableGraph1 tg) throws DatabaseException, TransferableGraphException {
284 final TransferableGraphImportProcess process =
285 new TransferableGraphImportProcess(tg,
287 process.prepare(graph);
288 process.write2(graph);
289 return process.getResourceIds(
290 graph.getSession().getService(SerialisationSupport.class));
294 * Import transferable graph version 1 to the database. Root advisor is used
295 * to give identities to roots of the transferable graphs. It may be null,
296 * in which case new resources are created for all roots but the root library.
300 * @param advisor root advisor or <code>null</code>
301 * @throws DatabaseException
303 public static long[] importGraph1(WriteGraph graph, TransferableGraph1 tg, IImportAdvisor advisor) throws DatabaseException {
304 return importGraph1(graph, tg, advisor, null);
307 public static long[] importGraph1(WriteGraph graph, TransferableGraph1 tg, IImportAdvisor advisor, TGStatusMonitor monitor) throws DatabaseException {
308 TransferableGraphImportProcess process = new TransferableGraphImportProcess(tg,
309 advisor == null ? new ImportAdvisor() : advisor, monitor);
310 process.prepare(graph);
311 if(advisor instanceof IImportAdvisor2) ((IImportAdvisor2)advisor).beforeWrite(graph, process);
312 process.write2(graph);
313 if(advisor instanceof IImportAdvisor2) ((IImportAdvisor2)advisor).afterWrite(graph, process);
314 return process.getResourceIds(
315 graph.getSession().getService(SerialisationSupport.class));
318 public static long[] applyDelta(WriteGraph graph, long[] oldResources, TransferableGraphDelta1 delta) throws DatabaseException {
319 SerialisationSupport serializer =
320 graph.getSession().getService(SerialisationSupport.class);
322 TGToGraphMap aMap = new TGToGraphMap(delta.a);
323 aMap.addOldResources(serializer, oldResources);
326 TGToGraphMap bMap = new TGToGraphMap(delta.b);
327 bMap.addMappedOldResources(serializer, delta.aToB, aMap.getResources());
331 return bMap.getResources(serializer);
334 public static boolean hasChanges(ReadGraph graph, long[] oldResources, TransferableGraphDelta1 delta) throws DatabaseException {
336 SerialisationSupport serializer =
337 graph.getSession().getService(SerialisationSupport.class);
339 TGToGraphMap aMap = new TGToGraphMap(delta.a);
340 aMap.addOldResources(serializer, oldResources);
341 if(aMap.checkDeny(graph)) return true;
343 TGToGraphMap bMap = new TGToGraphMap(delta.b);
344 bMap.addMappedOldResources(serializer, delta.aToB, aMap.getResources());
346 return bMap.checkClaim(graph);
350 public static void uninstallGraph(WriteGraph writeGraph, TransferableGraph1 graph,
351 ImportAdvisor advisor) throws TransferableGraphException {
352 // TODO HANNU IMPLEMENTS
353 throw new UnsupportedOperationException();
356 public static long[] importVirtualGraph(Session session, final VirtualGraph vg, final TransferableGraph1 tg, IImportAdvisor advisor) throws DatabaseException {
357 final TransferableGraphImportProcess process = new TransferableGraphImportProcess(tg,
358 advisor == null ? new ImportAdvisor() : advisor);
359 session.syncRequest(new ReadRequest() {
361 public void run(ReadGraph graph) throws DatabaseException {
362 process.prepare(graph);
365 session.syncRequest(new WriteOnlyRequest(vg) {
367 public void perform(WriteOnlyGraph graph) throws DatabaseException {
368 // Needed because process#write does not support virtual WriteOnlyGraph
370 process.write2(graph);
372 process.write(graph);
376 return process.getResourceIds(session.getService(SerialisationSupport.class));
379 public static long[] importVirtualGraph(WriteGraph graph, final TransferableGraph1 tg, IImportAdvisor advisor) throws DatabaseException {
380 final TransferableGraphImportProcess process = new TransferableGraphImportProcess(tg,
381 advisor == null ? new ImportAdvisor() : advisor);
382 process.prepare(graph);
383 process.write2(graph);
384 return process.getResourceIds(graph.getService(SerialisationSupport.class));
387 public static TransferableGraph1 readGraph(File file) throws TransferableGraphException {
388 FileVariantAccessor va = null;
390 va = Accessors.openAccessor(file);
391 Datatype type = va.getContentType();
392 if(type.equals(Datatypes.getDatatype(TransferableGraph1.class)))
393 return (TransferableGraph1)va.getContentValue(Bindings.getBinding(TransferableGraph1.class));
395 throw new SerializationException("Unknown transferable graph data type.");
396 } catch (AccessorException e) {
397 throw new TransferableGraphException(e);
398 } catch (BindingConstructionException e) {
399 throw new TransferableGraphException(e);
400 } catch (SerializationException e) {
401 throw new TransferableGraphException(e);
402 } catch (AccessorConstructionException e) {
403 throw new TransferableGraphException(e);
404 } catch (DatatypeConstructionException e) {
405 throw new TransferableGraphException(e);
410 } catch (AccessorException e) {
416 public static void importVirtualGraph(Session session, VirtualGraph vg, File file) throws TransferableGraphException {
418 importVirtualGraph(session, vg, readGraph(file), new ImportAdvisor());
419 } catch (DatabaseException e) {
420 throw new TransferableGraphException(e);
424 public static VirtualGraph importVirtualGraph(Session session, File file) throws TransferableGraphException {
425 VirtualGraphSupport support = session.getService(VirtualGraphSupport.class);
426 VirtualGraph vg = support.getMemoryPersistent(UUID.randomUUID().toString());
427 importVirtualGraph(session, vg, file);
431 public static void writeTransferableGraph(RequestProcessor processor, final String format, final TransferableGraphSource source, File target) throws Exception {
432 writeTransferableGraph(processor, format, 1, source, target);
435 public static void writeTransferableGraph(RequestProcessor processor, final String format, final int version, final TransferableGraphSource source, File target) throws Exception {
436 writeTransferableGraph(processor, format, version, new TreeMap<String,Variant>(), source, target);
439 public static void writeTransferableGraph(RequestProcessor processor, String format, int version, TreeMap<String, Variant> metadata, TransferableGraphSource source, File target) throws Exception {
440 writeTransferableGraph(processor, format, version, metadata, source, target, TGStatusMonitor.NULL_MONITOR);
443 public static void writeTransferableGraph(RequestProcessor processor, String format, int version, TreeMap<String, Variant> metadata, TransferableGraphSource source, File target, TGStatusMonitor monitor) throws Exception {
444 try (RandomAccessBinary out = new BinaryFile(target, 128*1024)) {
445 DataContainers.writeHeader(out, new DataContainer(format, version, metadata, null));
446 writeTransferableGraphVariant(processor, source, out, monitor);
450 public static void writeTransferableGraph(RequestProcessor processor, TransferableGraphSource source, File target, TGStatusMonitor monitor) throws Exception {
451 try (RandomAccessBinary out = new BinaryFile(target, 128*1024)) {
452 writeTransferableGraphVariant(processor, source, out, monitor);
456 public static void writeTransferableGraphVariant(RequestProcessor processor, TransferableGraphSource source, RandomAccessBinary out, TGStatusMonitor monitor) throws Exception {
457 Bindings.getSerializerUnchecked(Datatype.class).serialize(out, Datatypes.getDatatypeUnchecked(TransferableGraph1.class));
458 writeTransferableGraph(processor, source, out, monitor);
461 private static TGStatusMonitor safeMonitor(TGStatusMonitor mon) {
462 return mon == null ? TGStatusMonitor.NULL_MONITOR : mon;
465 private static class CopyingInputStream extends InputStream {
467 public DataOutput out;
470 public int read() throws IOException {
471 int value = in.readUnsignedByte();
477 private static long copy(byte[] buffer, DataInput in, DataOutput out, long bytesToCopy) throws IOException {
479 long bufferLength = buffer.length;
480 while (read < bytesToCopy) {
481 int l = (int) Math.min(bufferLength, bytesToCopy-read);
482 in.readFully(buffer, 0, l);
483 out.write(buffer, 0, l);
489 private static final int LITERAL_VALUE_IO_BUFFER_SIZE = 128 * 1024;
491 private static void writeTransferableGraph(RequestProcessor processor, final TransferableGraphSource source, final RandomAccessBinary out, TGStatusMonitor monitor) throws Exception {
492 long start = System.nanoTime();
494 final Serializer datatypeSerializer = Bindings.getSerializerUnchecked(Datatype.class);
495 final Serializer identitySerializer = Bindings.getSerializerUnchecked(Identity.class);
496 final Serializer extensionSerializer = Bindings.getSerializerUnchecked(Extensions.class);
498 int resourceCount = source.getResourceCount();
499 //System.err.println("resourceCount: " + resourceCount);
500 out.writeInt(resourceCount);
501 extensionSerializer.serialize(out, new Extensions(source.getExtensions()));
503 // System.err.println("resource count: " + source.getResourceCount());
504 // System.err.println("identity count: " + source.getIdentityCount());
506 byte[] buffer = new byte[LITERAL_VALUE_IO_BUFFER_SIZE];
508 processor.syncRequest(new ReadRequest() {
510 public void run(ReadGraph graph) throws DatabaseException {
512 if (monitor.isCanceled())
513 throw new CancelTransactionException();
515 int identityCount = source.getIdentityCount();
516 TGStatusMonitor.Updater identityProgress = new TGStatusMonitor.Updater(safeMonitor(monitor), 0, 33, identityCount);
517 out.writeInt(identityCount);
518 //System.err.println("identities: " + identityCount);
519 source.forIdentities(graph, value -> {
520 //System.err.println("id: " + value);
521 identitySerializer.serialize(out, value);
522 identityProgress.worked(1);
525 if (monitor.isCanceled())
526 throw new CancelTransactionException();
528 long statementCountPos = out.position();
529 int originalStatementCount = source.getStatementCount();
530 TGStatusMonitor.Updater statementProgress = new TGStatusMonitor.Updater(safeMonitor(monitor), 34, 66, originalStatementCount);
531 out.writeInt(originalStatementCount);
532 //System.err.println("original statementCount: " + originalStatementCount);
533 int[] statementCounter = { 0 };
534 source.forStatements(graph, r -> {
535 for (int i = 0; i < 4; ++i)
537 statementCounter[0]++;
538 //System.err.println("stm " + (statementCounter[0]) + ": " + r[0] + " " + r[1] + " " + r[2] + " " + r[3]);
539 statementProgress.worked(1);
541 //System.err.println("wrote " + statementCounter[0] + " statements, " + (statementCounter[0]*4)+ " integers");
543 // Rewrite statement count after knowing exactly how many
544 // statements were written. It is possible that some
545 // statements get filtered out at this stage and the
546 // original statement count does not reflect that.
547 long afterStatementsPos = out.position();
548 out.position(statementCountPos);
549 out.writeInt(statementCounter[0]*4);
550 out.position(afterStatementsPos);
552 if (monitor.isCanceled())
553 throw new CancelTransactionException();
555 int valueCount = source.getValueCount();
556 TGStatusMonitor.Updater valueProgress = new TGStatusMonitor.Updater(safeMonitor(monitor), 67, 100, valueCount);
557 out.writeInt(valueCount);
558 // System.err.println("valueCount: " + valueCount);
559 CopyingInputStream cis = new CopyingInputStream();
561 source.forValues2(graph, new TransferableGraphSourceValueProcedure() {
562 TObjectIntHashMap<Object> identities = new TObjectIntHashMap<>();
565 public void rawCopy(int resource, int length, DataInput input) throws Exception {
566 out.writeInt(resource);
567 long copied = copy(buffer, input, out, length);
568 assert copied == length;
569 //System.err.println("value " + (num++) + ": raw variant, " + length + " bytes, copied " + copied + " bytes");
570 valueProgress.worked(1);
574 public void execute(int resource, Datatype type, DataInput input) throws Exception {
575 out.writeInt(resource);
577 datatypeSerializer.serialize(out, identities, type);
578 Binding binding = Bindings.getBinding(type);
579 Serializer serializer = Bindings.getSerializer(binding);
581 serializer.skip(cis);
583 valueProgress.worked(1);
587 } catch (DatabaseException e) {
589 } catch (Exception e) {
590 throw new DatabaseException(e);
595 long end = System.nanoTime();
596 LOGGER.info("Wrote transferable graph in {} seconds.", 1e-9*(end-start));
599 public static TransferableGraph1 create(RequestProcessor processor, TransferableGraphSource source) throws DatabaseException {
601 return processor.syncRequest(new UniqueRead<TransferableGraph1>() {
604 public TransferableGraph1 perform(ReadGraph graph) throws DatabaseException {
605 return create(graph, source);
612 public static TransferableGraph1 create(ReadGraph graph, TransferableGraphSource source) throws DatabaseException {
616 ArrayList<Identity> identities = new ArrayList<>(source.getIdentityCount());
617 source.forIdentities(graph, i -> identities.add(i));
618 TIntArrayList statements = new TIntArrayList(source.getStatementCount());
619 source.forStatements(graph, r -> statements.addAll(r));
620 ArrayList<Value> values = new ArrayList<>(source.getValueCount());
621 source.forValues(graph, v -> values.add(v));
623 return new TransferableGraph1(source.getResourceCount(),
624 identities.toArray(new Identity[identities.size()]),
625 statements.toArray(),
626 values.toArray(new Value[values.size()]),
627 source.getExtensions());
629 } catch (Exception e) {
631 throw new DatabaseException(e);