1 /*******************************************************************************
2 * Copyright (c) 2007, 2010 Association for Decentralized Information Management
4 * All rights reserved. This program and the accompanying materials
5 * are made available under the terms of the Eclipse Public License v1.0
6 * which accompanies this distribution, and is available at
7 * http://www.eclipse.org/legal/epl-v10.html
10 * VTT Technical Research Centre of Finland - initial API and implementation
11 *******************************************************************************/
12 package org.simantics.graph.db;
14 import java.io.DataInput;
15 import java.io.DataOutput;
17 import java.io.IOException;
18 import java.io.InputStream;
19 import java.util.ArrayList;
20 import java.util.Collection;
21 import java.util.HashSet;
22 import java.util.TreeMap;
23 import java.util.UUID;
24 import java.util.function.BiFunction;
26 import org.simantics.databoard.Accessors;
27 import org.simantics.databoard.Bindings;
28 import org.simantics.databoard.Datatypes;
29 import org.simantics.databoard.accessor.error.AccessorConstructionException;
30 import org.simantics.databoard.accessor.error.AccessorException;
31 import org.simantics.databoard.accessor.file.FileVariantAccessor;
32 import org.simantics.databoard.binding.Binding;
33 import org.simantics.databoard.binding.error.BindingConstructionException;
34 import org.simantics.databoard.binding.error.DatatypeConstructionException;
35 import org.simantics.databoard.binding.mutable.Variant;
36 import org.simantics.databoard.container.DataContainer;
37 import org.simantics.databoard.container.DataContainers;
38 import org.simantics.databoard.serialization.SerializationException;
39 import org.simantics.databoard.serialization.Serializer;
40 import org.simantics.databoard.type.Datatype;
41 import org.simantics.databoard.util.binary.BinaryFile;
42 import org.simantics.databoard.util.binary.RandomAccessBinary;
43 import org.simantics.db.ReadGraph;
44 import org.simantics.db.RequestProcessor;
45 import org.simantics.db.Resource;
46 import org.simantics.db.Session;
47 import org.simantics.db.VirtualGraph;
48 import org.simantics.db.WriteGraph;
49 import org.simantics.db.WriteOnlyGraph;
50 import org.simantics.db.common.CommentMetadata;
51 import org.simantics.db.common.request.ReadRequest;
52 import org.simantics.db.common.request.WriteOnlyRequest;
53 import org.simantics.db.common.request.WriteRequest;
54 import org.simantics.db.exception.CancelTransactionException;
55 import org.simantics.db.exception.DatabaseException;
56 import org.simantics.db.service.SerialisationSupport;
57 import org.simantics.db.service.VirtualGraphSupport;
58 import org.simantics.graph.db.TransferableGraphSource.TransferableGraphSourceValueProcedure;
59 import org.simantics.graph.diff.TransferableGraphDelta1;
60 import org.simantics.graph.representation.Extensions;
61 import org.simantics.graph.representation.External;
62 import org.simantics.graph.representation.Identity;
63 import org.simantics.graph.representation.TransferableGraph1;
64 import org.simantics.graph.representation.Value;
65 import org.slf4j.Logger;
66 import org.slf4j.LoggerFactory;
68 import gnu.trove.list.array.TIntArrayList;
69 import gnu.trove.map.hash.TObjectIntHashMap;
71 public class TransferableGraphs {
// SLF4J logger for this class; declared without an access modifier, so it is package-visible.
72 final static Logger LOGGER = LoggerFactory.getLogger(TransferableGraphs.class);
74 public static long[] importGraph(Session session, Object tg, IImportAdvisor advisor) throws DatabaseException, TransferableGraphException {
75 if (tg instanceof TransferableGraph1)
77 return importGraph1(session, (TransferableGraph1) tg, advisor);
79 throw new TransferableGraphException("Cannot import "+tg.getClass().getName());
82 public static long[] importGraph(WriteGraph g, Object tg, IImportAdvisor advisor) throws DatabaseException, TransferableGraphException {
83 if (tg instanceof TransferableGraph1)
85 return importGraph1(g, (TransferableGraph1) tg, advisor);
87 throw new TransferableGraphException("Cannot import "+tg.getClass().getName());
90 public static long[] importGraph(Session session, Object tg) throws DatabaseException, TransferableGraphException {
91 if (tg instanceof TransferableGraph1)
93 return importGraph1(session, (TransferableGraph1) tg);
95 throw new TransferableGraphException("Cannot import "+tg.getClass().getName());
98 public static long[] importGraph(WriteGraph g, Object tg) throws DatabaseException, TransferableGraphException {
99 if (tg instanceof TransferableGraph1)
101 return importGraph1(g, (TransferableGraph1) tg);
103 throw new TransferableGraphException("Cannot import "+tg.getClass().getName());
106 public static Collection<Resource> collectExternals(RequestProcessor processor, final TransferableGraph1 tg) throws DatabaseException, TransferableGraphException {
107 final TransferableGraphImportProcess process = new TransferableGraphImportProcess(tg, new ImportAdvisor());
108 processor.syncRequest(new ReadRequest() {
110 public void run(ReadGraph graph) throws DatabaseException {
111 process.prepare(graph);
114 HashSet<Resource> result = new HashSet<Resource>();
115 for(Identity id : tg.identities) {
116 if(id.definition instanceof External) {
117 result.add(process.resources[id.resource]);
124 * Imports transferable graph version 1 to the database. Root advisor is used
125 * to give identities to roots of the transferable graphs. It may be null,
126 * in which case new resources are created for all roots but the root library.
130 * @param advisor root advisor or <code>null</code>
131 * @throws DatabaseException
133 public static long[] importGraph1(Session session, final TransferableGraph1 tg, final IImportAdvisor advisor_) throws DatabaseException, TransferableGraphException {
135 final IImportAdvisor2 advisor = (advisor_ instanceof IImportAdvisor2) ? ((IImportAdvisor2)advisor_) : new WrapperAdvisor(advisor_);
137 final TransferableGraphImportProcess process = new TransferableGraphImportProcess(tg,
138 advisor == null ? new ImportAdvisor() : advisor);
139 session.syncRequest(new ReadRequest() {
141 public void run(ReadGraph graph) throws DatabaseException {
142 process.prepare(graph);
145 session.syncRequest(new WriteOnlyRequest() {
147 public void perform(WriteOnlyGraph graph) throws DatabaseException {
148 advisor.beforeWrite(graph, process);
149 process.write(graph);
150 advisor.afterWrite(graph, process);
153 return process.getResourceIds(
154 session.getService(SerialisationSupport.class));
157 public static void importGraph1(Session session, final TransferableGraph1 tg, IImportAdvisor advisor, final BiFunction<WriteOnlyGraph, TransferableGraphImportProcess, Boolean> callback) throws DatabaseException, TransferableGraphException {
158 final TransferableGraphImportProcess process = new TransferableGraphImportProcess(tg,
159 advisor == null ? new ImportAdvisor() : advisor);
160 session.syncRequest(new ReadRequest() {
162 public void run(ReadGraph graph) throws DatabaseException {
163 process.prepare(graph);
166 session.syncRequest(new WriteOnlyRequest() {
168 public void perform(WriteOnlyGraph graph) throws DatabaseException {
169 process.write(graph);
171 callback.apply(graph, process);
176 public static ImportResult importGraph1(Session session, final TransferableGraphSource tg, IImportAdvisor advisor) throws Exception {
177 return importGraph1(session, tg, advisor, null);
180 public static ImportResult importGraph1(Session session, final TransferableGraphSource tg, IImportAdvisor advisor, TGStatusMonitor monitor) throws DatabaseException {
181 return importGraph1(session, null, tg, advisor, monitor);
184 public static ImportResult importGraph1(Session session, VirtualGraph vg, final TransferableGraphSource tg, IImportAdvisor advisor_, TGStatusMonitor monitor) throws DatabaseException {
185 return importGraph1(session, vg, tg, advisor_, monitor, null);
188 public static ImportResult importGraph1(Session session, VirtualGraph vg, final TransferableGraphSource tg, IImportAdvisor advisor_, TGStatusMonitor monitor, Boolean failOnMissing) throws DatabaseException {
190 final IImportAdvisor2 advisor = (advisor_ instanceof IImportAdvisor2) ? ((IImportAdvisor2)advisor_) : new WrapperAdvisor(advisor_);
192 final StreamingTransferableGraphImportProcess process = failOnMissing == null ? new StreamingTransferableGraphImportProcess(session, vg, tg, advisor, monitor)
193 : new StreamingTransferableGraphImportProcess(session, vg, tg, advisor, monitor, failOnMissing);
194 session.syncRequest(new ReadRequest() {
196 public void run(ReadGraph graph) throws DatabaseException {
198 process.prepare(graph);
199 } catch (DatabaseException e) {
201 } catch (Exception e) {
202 throw new DatabaseException(e);
206 session.syncRequest(new WriteOnlyRequest(vg) {
208 public void perform(WriteOnlyGraph graph) throws DatabaseException {
210 advisor.beforeWrite(graph, process);
211 process.write(graph);
212 advisor.afterWrite(graph, process);
213 } catch (Exception e) {
214 throw new DatabaseException(e);
219 return new ImportResult(process.missingExternals);
222 public static void importGraph1WithMonitor(Session session, final TransferableGraph1 tg, IImportAdvisor advisor_, TGStatusMonitor monitor) throws DatabaseException {
223 final IImportAdvisor2 advisor = (advisor_ instanceof IImportAdvisor2) ? ((IImportAdvisor2)advisor_) : new WrapperAdvisor(advisor_);
224 final TransferableGraphImportProcess process = new TransferableGraphImportProcess(tg,
225 advisor == null ? new ImportAdvisor() : advisor, monitor);
226 session.syncRequest(new ReadRequest() {
228 public void run(ReadGraph graph) throws DatabaseException {
229 process.prepare(graph);
232 session.syncRequest(new WriteOnlyRequest() {
234 public void perform(WriteOnlyGraph graph) throws DatabaseException {
235 advisor.beforeWrite(graph, process);
236 process.write2(graph);
237 advisor.afterWrite(graph, process);
238 CommentMetadata comments = graph.getMetadata(CommentMetadata.class);
239 comments.add("Imported transferable graph with " + tg.resourceCount + " resources");
240 graph.addMetadata(comments);
245 public static void importGraph1WithChanges(Session session, final TransferableGraph1 tg, IImportAdvisor advisor, final BiFunction<WriteGraph, TransferableGraphImportProcess, Boolean> callback) throws DatabaseException, TransferableGraphException {
246 final TransferableGraphImportProcess process = new TransferableGraphImportProcess(tg,
247 advisor == null ? new ImportAdvisor() : advisor);
248 session.syncRequest(new ReadRequest() {
250 public void run(ReadGraph graph) throws DatabaseException {
251 process.prepare(graph);
254 session.syncRequest(new WriteRequest() {
256 public void perform(WriteGraph graph) throws DatabaseException {
257 process.write2(graph);
258 CommentMetadata comments = graph.getMetadata(CommentMetadata.class);
259 comments.add("Imported transferable graph with " + tg.resourceCount + " resources");
260 graph.addMetadata(comments);
262 callback.apply(graph, process);
// Imports tg through the session without a caller-supplied advisor: the process is
// prepared in a read request, written in a write-only request, and its resource ids
// are returned.
267 public static long[] importGraph1(Session session, final TransferableGraph1 tg) throws DatabaseException, TransferableGraphException {
268 final TransferableGraphImportProcess process =
269 new TransferableGraphImportProcess(tg,
// NOTE(review): the second constructor argument is not visible in this view — confirm against the full file.
271 session.syncRequest(new ReadRequest() {
273 public void run(ReadGraph graph) throws DatabaseException {
274 process.prepare(graph);
277 session.syncRequest(new WriteOnlyRequest() {
279 public void perform(WriteOnlyGraph graph) throws DatabaseException {
280 process.write(graph);
// The ids are resolved through the session's SerialisationSupport service.
283 return process.getResourceIds(
284 session.getService(SerialisationSupport.class));
// Imports tg inside the caller's write transaction without a caller-supplied advisor:
// prepare and write2 both run directly on the given graph.
287 public static long[] importGraph1(WriteGraph graph, final TransferableGraph1 tg) throws DatabaseException, TransferableGraphException {
288 final TransferableGraphImportProcess process =
289 new TransferableGraphImportProcess(tg,
// NOTE(review): the second constructor argument is not visible in this view — confirm against the full file.
291 process.prepare(graph);
292 process.write2(graph);
// The ids are resolved through the SerialisationSupport service of the graph's session.
293 return process.getResourceIds(
294 graph.getSession().getService(SerialisationSupport.class));
298 * Import transferable graph version 1 to the database. Root advisor is used
299 * to give identities to roots of the transferable graphs. It may be null,
300 * in which case new resources are created for all roots but the root library.
304 * @param advisor root advisor or <code>null</code>
305 * @throws DatabaseException
307 public static long[] importGraph1(WriteGraph graph, TransferableGraph1 tg, IImportAdvisor advisor) throws DatabaseException {
308 return importGraph1(graph, tg, advisor, null);
311 public static long[] importGraph1(WriteGraph graph, TransferableGraph1 tg, IImportAdvisor advisor, TGStatusMonitor monitor) throws DatabaseException {
312 TransferableGraphImportProcess process = new TransferableGraphImportProcess(tg,
313 advisor == null ? new ImportAdvisor() : advisor, monitor);
314 process.prepare(graph);
315 if(advisor instanceof IImportAdvisor2) ((IImportAdvisor2)advisor).beforeWrite(graph, process);
316 process.write2(graph);
317 if(advisor instanceof IImportAdvisor2) ((IImportAdvisor2)advisor).afterWrite(graph, process);
318 return process.getResourceIds(
319 graph.getSession().getService(SerialisationSupport.class));
// Builds a map for each side of the delta (delta.a seeded with oldResources, delta.b
// seeded through delta.aToB from the first map) and returns the second map's resources.
// NOTE(review): no statement in this view writes to `graph`; lines appear to be missing
// between the visible ones — confirm against the full file.
322 public static long[] applyDelta(WriteGraph graph, long[] oldResources, TransferableGraphDelta1 delta) throws DatabaseException {
323 SerialisationSupport serializer =
324 graph.getSession().getService(SerialisationSupport.class);
// Map for the "a" side of the delta, seeded with the caller's existing resources.
326 TGToGraphMap aMap = new TGToGraphMap(delta.a);
327 aMap.addOldResources(serializer, oldResources);
// Map for the "b" side, seeded from the "a" side's resources via delta.aToB.
330 TGToGraphMap bMap = new TGToGraphMap(delta.b);
331 bMap.addMappedOldResources(serializer, delta.aToB, aMap.getResources());
335 return bMap.getResources(serializer);
338 public static boolean hasChanges(ReadGraph graph, long[] oldResources, TransferableGraphDelta1 delta) throws DatabaseException {
340 SerialisationSupport serializer =
341 graph.getSession().getService(SerialisationSupport.class);
343 TGToGraphMap aMap = new TGToGraphMap(delta.a);
344 aMap.addOldResources(serializer, oldResources);
345 if(aMap.checkDeny(graph)) return true;
347 TGToGraphMap bMap = new TGToGraphMap(delta.b);
348 bMap.addMappedOldResources(serializer, delta.aToB, aMap.getResources());
350 return bMap.checkClaim(graph);
354 public static void uninstallGraph(WriteGraph writeGraph, TransferableGraph1 graph,
355 ImportAdvisor advisor) throws TransferableGraphException {
356 // TODO HANNU IMPLEMENTS
357 throw new UnsupportedOperationException();
// Imports tg into the virtual graph vg: the process is prepared in a read request and
// written in a write-only request created with vg; a null advisor is replaced by a new
// ImportAdvisor.
360 public static long[] importVirtualGraph(Session session, final VirtualGraph vg, final TransferableGraph1 tg, IImportAdvisor advisor) throws DatabaseException {
361 final TransferableGraphImportProcess process = new TransferableGraphImportProcess(tg,
362 advisor == null ? new ImportAdvisor() : advisor);
363 session.syncRequest(new ReadRequest() {
365 public void run(ReadGraph graph) throws DatabaseException {
366 process.prepare(graph);
369 session.syncRequest(new WriteOnlyRequest(vg) {
371 public void perform(WriteOnlyGraph graph) throws DatabaseException {
372 // Needed because process#write does not support virtual WriteOnlyGraph
// NOTE(review): the condition choosing between write2 and write is not visible in this view — confirm against the full file.
374 process.write2(graph);
376 process.write(graph);
380 return process.getResourceIds(session.getService(SerialisationSupport.class));
383 public static long[] importVirtualGraph(WriteGraph graph, final TransferableGraph1 tg, IImportAdvisor advisor) throws DatabaseException {
384 final TransferableGraphImportProcess process = new TransferableGraphImportProcess(tg,
385 advisor == null ? new ImportAdvisor() : advisor);
386 process.prepare(graph);
387 process.write2(graph);
388 return process.getResourceIds(graph.getService(SerialisationSupport.class));
// Reads a TransferableGraph1 from the given file through a file variant accessor.
// Every accessor/binding/serialization/datatype failure listed below is rethrown as a
// TransferableGraphException with the original exception as its cause.
391 public static TransferableGraph1 readGraph(File file) throws TransferableGraphException {
392 FileVariantAccessor va = null;
394 va = Accessors.openAccessor(file);
// Only content whose datatype equals that of TransferableGraph1 is accepted.
395 Datatype type = va.getContentType();
396 if(type.equals(Datatypes.getDatatype(TransferableGraph1.class)))
397 return (TransferableGraph1)va.getContentValue(Bindings.getBinding(TransferableGraph1.class));
399 throw new SerializationException("Unknown transferable graph data type.");
400 } catch (AccessorException e) {
401 throw new TransferableGraphException(e);
402 } catch (BindingConstructionException e) {
403 throw new TransferableGraphException(e);
404 } catch (SerializationException e) {
405 throw new TransferableGraphException(e);
406 } catch (AccessorConstructionException e) {
407 throw new TransferableGraphException(e);
408 } catch (DatatypeConstructionException e) {
409 throw new TransferableGraphException(e);
// NOTE(review): the code guarded by the following catch (presumably accessor cleanup) is not visible in this view — confirm against the full file.
414 } catch (AccessorException e) {
420 public static void importVirtualGraph(Session session, VirtualGraph vg, File file) throws TransferableGraphException {
422 importVirtualGraph(session, vg, readGraph(file), new ImportAdvisor());
423 } catch (DatabaseException e) {
424 throw new TransferableGraphException(e);
428 public static VirtualGraph importVirtualGraph(Session session, File file) throws TransferableGraphException {
429 VirtualGraphSupport support = session.getService(VirtualGraphSupport.class);
430 VirtualGraph vg = support.getMemoryPersistent(UUID.randomUUID().toString());
431 importVirtualGraph(session, vg, file);
435 public static void writeTransferableGraph(RequestProcessor processor, final String format, final TransferableGraphSource source, File target) throws Exception {
436 writeTransferableGraph(processor, format, 1, source, target);
439 public static void writeTransferableGraph(RequestProcessor processor, final String format, final int version, final TransferableGraphSource source, File target) throws Exception {
440 writeTransferableGraph(processor, format, version, new TreeMap<String,Variant>(), source, target);
443 public static void writeTransferableGraph(RequestProcessor processor, String format, int version, TreeMap<String, Variant> metadata, TransferableGraphSource source, File target) throws Exception {
444 writeTransferableGraph(processor, format, version, metadata, source, target, TGStatusMonitor.NULL_MONITOR);
447 public static void writeTransferableGraph(RequestProcessor processor, String format, int version, TreeMap<String, Variant> metadata, TransferableGraphSource source, File target, TGStatusMonitor monitor) throws Exception {
448 try (RandomAccessBinary out = new BinaryFile(target, 128*1024)) {
449 DataContainers.writeHeader(out, new DataContainer(format, version, metadata, null));
450 writeTransferableGraphVariant(processor, source, out, monitor);
454 public static void writeTransferableGraph(RequestProcessor processor, TransferableGraphSource source, File target, TGStatusMonitor monitor) throws Exception {
455 try (RandomAccessBinary out = new BinaryFile(target, 128*1024)) {
456 writeTransferableGraphVariant(processor, source, out, monitor);
460 public static void writeTransferableGraphVariant(RequestProcessor processor, TransferableGraphSource source, RandomAccessBinary out, TGStatusMonitor monitor) throws Exception {
461 Bindings.getSerializerUnchecked(Datatype.class).serialize(out, Datatypes.getDatatypeUnchecked(TransferableGraph1.class));
462 writeTransferableGraph(processor, source, out, monitor);
465 private static TGStatusMonitor safeMonitor(TGStatusMonitor mon) {
466 return mon == null ? TGStatusMonitor.NULL_MONITOR : mon;
// InputStream whose read() pulls one unsigned byte from a DataInput named `in`.
// NOTE(review): the declaration of `in` and the rest of read() (presumably forwarding the
// byte to `out` and returning it) are not visible in this view — confirm against the full file.
469 private static class CopyingInputStream extends InputStream {
// Sink field; public so that users of the stream can assign it.
471 public DataOutput out;
474 public int read() throws IOException {
475 int value = in.readUnsignedByte();
481 private static long copy(byte[] buffer, DataInput in, DataOutput out, long bytesToCopy) throws IOException {
483 long bufferLength = buffer.length;
484 while (read < bytesToCopy) {
485 int l = (int) Math.min(bufferLength, bytesToCopy-read);
486 in.readFully(buffer, 0, l);
487 out.write(buffer, 0, l);
// Size in bytes (128 KiB) of the scratch buffer that writeTransferableGraph allocates and
// hands to copy() when raw-copying values.
493 private static final int LITERAL_VALUE_IO_BUFFER_SIZE = 128 * 1024;
// Streams the contents of `source` to `out` in this order: resource count, extensions,
// then (inside one read request) identities, statements and values. The monitor is polled
// for cancellation before each of the three sections, and the elapsed time is logged at the end.
495 private static void writeTransferableGraph(RequestProcessor processor, final TransferableGraphSource source, final RandomAccessBinary out, TGStatusMonitor monitor) throws Exception {
496 long start = System.nanoTime();
498 final Serializer datatypeSerializer = Bindings.getSerializerUnchecked(Datatype.class);
499 final Serializer identitySerializer = Bindings.getSerializerUnchecked(Identity.class);
500 final Serializer extensionSerializer = Bindings.getSerializerUnchecked(Extensions.class);
502 int resourceCount = source.getResourceCount();
503 //System.err.println("resourceCount: " + resourceCount);
504 out.writeInt(resourceCount);
505 extensionSerializer.serialize(out, new Extensions(source.getExtensions()));
507 // System.err.println("resource count: " + source.getResourceCount());
508 // System.err.println("identity count: " + source.getIdentityCount());
// Scratch buffer shared by all rawCopy calls below.
510 byte[] buffer = new byte[LITERAL_VALUE_IO_BUFFER_SIZE];
512 processor.syncRequest(new ReadRequest() {
514 public void run(ReadGraph graph) throws DatabaseException {
// NOTE(review): monitor is dereferenced directly here (and before the statement and value
// sections), while the progress updaters receive safeMonitor(monitor); a null monitor
// would fail at this call — confirm callers never pass null.
516 if (monitor.isCanceled())
517 throw new CancelTransactionException();
// --- Identities: progress range 0..33 ---
519 int identityCount = source.getIdentityCount();
520 TGStatusMonitor.Updater identityProgress = new TGStatusMonitor.Updater(safeMonitor(monitor), 0, 33, identityCount);
521 out.writeInt(identityCount);
522 //System.err.println("identities: " + identityCount);
523 source.forIdentities(graph, value -> {
524 //System.err.println("id: " + value);
525 identitySerializer.serialize(out, value);
526 identityProgress.worked(1);
529 if (monitor.isCanceled())
530 throw new CancelTransactionException();
// --- Statements: progress range 34..66 ---
// Remember where the count is written so it can be overwritten once the real count is known.
532 long statementCountPos = out.position();
533 int originalStatementCount = source.getStatementCount();
534 TGStatusMonitor.Updater statementProgress = new TGStatusMonitor.Updater(safeMonitor(monitor), 34, 66, originalStatementCount);
535 out.writeInt(originalStatementCount);
536 //System.err.println("original statementCount: " + originalStatementCount);
// Single-element array so the lambda below can increment the counter.
537 int[] statementCounter = { 0 };
538 source.forStatements(graph, r -> {
539 for (int i = 0; i < 4; ++i)
541 statementCounter[0]++;
542 //System.err.println("stm " + (statementCounter[0]) + ": " + r[0] + " " + r[1] + " " + r[2] + " " + r[3]);
543 statementProgress.worked(1);
545 //System.err.println("wrote " + statementCounter[0] + " statements, " + (statementCounter[0]*4)+ " integers");
547 // Rewrite statement count after knowing exactly how many
548 // statements were written. It is possible that some
549 // statements get filtered out at this stage and the
550 // original statement count does not reflect that.
551 long afterStatementsPos = out.position();
552 out.position(statementCountPos);
// The value written back is the counter multiplied by 4.
553 out.writeInt(statementCounter[0]*4);
554 out.position(afterStatementsPos);
556 if (monitor.isCanceled())
557 throw new CancelTransactionException();
// --- Values: progress range 67..100 ---
559 int valueCount = source.getValueCount();
560 TGStatusMonitor.Updater valueProgress = new TGStatusMonitor.Updater(safeMonitor(monitor), 67, 100, valueCount);
561 out.writeInt(valueCount);
562 // System.err.println("valueCount: " + valueCount);
563 CopyingInputStream cis = new CopyingInputStream();
565 source.forValues2(graph, new TransferableGraphSourceValueProcedure() {
566 TObjectIntHashMap<Object> identities = new TObjectIntHashMap<>();
// Raw path: write the resource index, then copy `length` bytes verbatim from the input.
569 public void rawCopy(int resource, int length, DataInput input) throws Exception {
570 out.writeInt(resource);
571 long copied = copy(buffer, input, out, length);
572 assert copied == length;
573 //System.err.println("value " + (num++) + ": raw variant, " + length + " bytes, copied " + copied + " bytes");
574 valueProgress.worked(1);
// Typed path: write the resource index and the datatype, then run the value's
// serializer in skip mode over the CopyingInputStream.
578 public void execute(int resource, Datatype type, DataInput input) throws Exception {
579 out.writeInt(resource);
581 datatypeSerializer.serialize(out, identities, type);
582 Binding binding = Bindings.getBinding(type);
583 Serializer serializer = Bindings.getSerializer(binding);
585 serializer.skip(cis);
587 valueProgress.worked(1);
591 } catch (DatabaseException e) {
593 } catch (Exception e) {
594 throw new DatabaseException(e);
599 long end = System.nanoTime();
// nanoTime difference scaled by 1e-9 to seconds.
600 LOGGER.info("Wrote transferable graph in {} seconds.", 1e-9*(end-start));
// Materializes a TransferableGraph1 in memory from a streaming source: identities,
// statements and values are collected into lists (pre-sized from the source's counts)
// and passed to the TransferableGraph1 constructor together with the resource count
// and extensions. Any exception is rethrown wrapped in a DatabaseException.
603 public static TransferableGraph1 create(ReadGraph graph, TransferableGraphSource source) throws DatabaseException {
607 ArrayList<Identity> identities = new ArrayList<>(source.getIdentityCount());
608 source.forIdentities(graph, i -> identities.add(i));
// Statements arrive as int arrays and are appended to one flat int list.
609 TIntArrayList statements = new TIntArrayList(source.getStatementCount());
610 source.forStatements(graph, r -> statements.addAll(r));
611 ArrayList<Value> values = new ArrayList<>(source.getValueCount());
612 source.forValues(graph, v -> values.add(v));
614 return new TransferableGraph1(source.getResourceCount(),
615 identities.toArray(new Identity[identities.size()]),
616 statements.toArray(),
617 values.toArray(new Value[values.size()]),
618 source.getExtensions());
620 } catch (Exception e) {
622 throw new DatabaseException(e);