X-Git-Url: https://gerrit.simantics.org/r/gitweb?p=simantics%2Fplatform.git;a=blobdiff_plain;f=bundles%2Forg.simantics.graph.db%2Fsrc%2Forg%2Fsimantics%2Fgraph%2Fdb%2FTransferableGraphs.java;h=0841e1455569ed08e0f184c750c5125e9fe55aa3;hp=c2ca6ac7305d26e824a928ffa6b8ee9c6e53b372;hb=e209f1fed1443bfdfe6ba721e63df496da3c36fe;hpb=3efd7d44a55ae1fff284a1826e7c9afe27e0815e diff --git a/bundles/org.simantics.graph.db/src/org/simantics/graph/db/TransferableGraphs.java b/bundles/org.simantics.graph.db/src/org/simantics/graph/db/TransferableGraphs.java index c2ca6ac73..0841e1455 100644 --- a/bundles/org.simantics.graph.db/src/org/simantics/graph/db/TransferableGraphs.java +++ b/bundles/org.simantics.graph.db/src/org/simantics/graph/db/TransferableGraphs.java @@ -1,611 +1,624 @@ -/******************************************************************************* - * Copyright (c) 2007, 2010 Association for Decentralized Information Management - * in Industry THTH ry. - * All rights reserved. This program and the accompanying materials - * are made available under the terms of the Eclipse Public License v1.0 - * which accompanies this distribution, and is available at - * http://www.eclipse.org/legal/epl-v10.html - * - * Contributors: - * VTT Technical Research Centre of Finland - initial API and implementation - *******************************************************************************/ -package org.simantics.graph.db; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.File; -import java.io.IOException; -import java.io.InputStream; -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashSet; -import java.util.TreeMap; -import java.util.UUID; - -import org.simantics.databoard.Accessors; -import org.simantics.databoard.Bindings; -import org.simantics.databoard.Datatypes; -import org.simantics.databoard.accessor.error.AccessorConstructionException; -import org.simantics.databoard.accessor.error.AccessorException; -import org.simantics.databoard.accessor.file.FileVariantAccessor; -import org.simantics.databoard.binding.Binding; -import org.simantics.databoard.binding.error.BindingConstructionException; -import org.simantics.databoard.binding.error.DatatypeConstructionException; -import org.simantics.databoard.binding.mutable.Variant; -import org.simantics.databoard.container.DataContainer; -import org.simantics.databoard.container.DataContainers; -import org.simantics.databoard.serialization.SerializationException; -import org.simantics.databoard.serialization.Serializer; -import org.simantics.databoard.type.Datatype; -import org.simantics.databoard.util.binary.BinaryFile; -import org.simantics.databoard.util.binary.RandomAccessBinary; -import org.simantics.db.ReadGraph; -import org.simantics.db.RequestProcessor; -import org.simantics.db.Resource; -import org.simantics.db.Session; -import org.simantics.db.VirtualGraph; -import org.simantics.db.WriteGraph; -import org.simantics.db.WriteOnlyGraph; -import org.simantics.db.common.CommentMetadata; -import org.simantics.db.common.request.ReadRequest; -import org.simantics.db.common.request.WriteOnlyRequest; -import org.simantics.db.common.request.WriteRequest; -import org.simantics.db.exception.CancelTransactionException; -import org.simantics.db.exception.DatabaseException; -import org.simantics.db.service.SerialisationSupport; -import org.simantics.db.service.VirtualGraphSupport; -import org.simantics.graph.db.TransferableGraphSource.TransferableGraphSourceValueProcedure; -import org.simantics.graph.diff.TransferableGraphDelta1; -import org.simantics.graph.representation.Extensions; -import org.simantics.graph.representation.External; -import org.simantics.graph.representation.Identity; -import org.simantics.graph.representation.TransferableGraph1; -import org.simantics.graph.representation.Value; -import org.simantics.utils.datastructures.BinaryFunction; - -import gnu.trove.list.array.TIntArrayList; -import gnu.trove.map.hash.TObjectIntHashMap; - -public class TransferableGraphs { - - public static long[] importGraph(Session session, Object tg, IImportAdvisor advisor) throws DatabaseException, TransferableGraphException { - if (tg instanceof TransferableGraph1) - { - return importGraph1(session, (TransferableGraph1) tg, advisor); - } - throw new TransferableGraphException("Cannot import "+tg.getClass().getName()); - } - - public static long[] importGraph(WriteGraph g, Object tg, IImportAdvisor advisor) throws DatabaseException, TransferableGraphException { - if (tg instanceof TransferableGraph1) - { - return importGraph1(g, (TransferableGraph1) tg, advisor); - } - throw new TransferableGraphException("Cannot import "+tg.getClass().getName()); - } - - public static long[] importGraph(Session session, Object tg) throws DatabaseException, TransferableGraphException { - if (tg instanceof TransferableGraph1) - { - return importGraph1(session, (TransferableGraph1) tg); - } - throw new TransferableGraphException("Cannot import "+tg.getClass().getName()); - } - - public static long[] importGraph(WriteGraph g, Object tg) throws DatabaseException, TransferableGraphException { - if (tg instanceof TransferableGraph1) - { - return importGraph1(g, (TransferableGraph1) tg); - } - throw new TransferableGraphException("Cannot import "+tg.getClass().getName()); - } - - public static Collection collectExternals(RequestProcessor processor, final TransferableGraph1 tg) throws DatabaseException, TransferableGraphException { - final TransferableGraphImportProcess process = new TransferableGraphImportProcess(tg, new ImportAdvisor()); - processor.syncRequest(new ReadRequest() { - @Override - public void run(ReadGraph graph) throws DatabaseException { - process.prepare(graph); - } - }); - HashSet result = new HashSet(); - for(Identity id : tg.identities) { - if(id.definition instanceof External) { - result.add(process.resources[id.resource]); - } - } - return result; - } - - /** - * Imports transferable graph version 1 to the database. Root advisor is used - * to give identities to roots of the transferable graphs. It may be null, - * in which case new resources are created for all roots but the root library. - * - * @param session - * @param tg - * @param advisor root advisor or null - * @throws DatabaseException - */ - public static long[] importGraph1(Session session, final TransferableGraph1 tg, final IImportAdvisor advisor_) throws DatabaseException, TransferableGraphException { - - final IImportAdvisor2 advisor = (advisor_ instanceof IImportAdvisor2) ? ((IImportAdvisor2)advisor_) : new WrapperAdvisor(advisor_); - - final TransferableGraphImportProcess process = new TransferableGraphImportProcess(tg, - advisor == null ? new ImportAdvisor() : advisor); - session.syncRequest(new ReadRequest() { - @Override - public void run(ReadGraph graph) throws DatabaseException { - process.prepare(graph); - } - }); - session.syncRequest(new WriteOnlyRequest() { - @Override - public void perform(WriteOnlyGraph graph) throws DatabaseException { - advisor.beforeWrite(graph, process); - process.write(graph); - advisor.afterWrite(graph, process); - } - }); - return process.getResourceIds( - session.getService(SerialisationSupport.class)); - } - - public static void importGraph1(Session session, final TransferableGraph1 tg, IImportAdvisor advisor, final BinaryFunction callback) throws DatabaseException, TransferableGraphException { - final TransferableGraphImportProcess process = new TransferableGraphImportProcess(tg, - advisor == null ? new ImportAdvisor() : advisor); - session.syncRequest(new ReadRequest() { - @Override - public void run(ReadGraph graph) throws DatabaseException { - process.prepare(graph); - } - }); - session.syncRequest(new WriteOnlyRequest() { - @Override - public void perform(WriteOnlyGraph graph) throws DatabaseException { - process.write(graph); - if(callback != null) - callback.call(graph, process); - } - }); - } - - public static void importGraph1(Session session, final TransferableGraphSource tg, IImportAdvisor advisor) throws Exception { - importGraph1(session, tg, advisor, null); - } - - public static void importGraph1(Session session, final TransferableGraphSource tg, IImportAdvisor advisor, TGStatusMonitor monitor) throws DatabaseException { - importGraph1(session, null, tg, advisor, monitor); - } - - public static void importGraph1(Session session, VirtualGraph vg, final TransferableGraphSource tg, IImportAdvisor advisor_, TGStatusMonitor monitor) throws DatabaseException { - - final IImportAdvisor2 advisor = (advisor_ instanceof IImportAdvisor2) ? ((IImportAdvisor2)advisor_) : new WrapperAdvisor(advisor_); - - final StreamingTransferableGraphImportProcess process = new StreamingTransferableGraphImportProcess(session, vg, tg, advisor); - session.syncRequest(new ReadRequest() { - @Override - public void run(ReadGraph graph) throws DatabaseException { - try { - process.prepare(graph); - } catch (DatabaseException e) { - throw e; - } catch (Exception e) { - throw new DatabaseException(e); - } - } - }); - session.syncRequest(new WriteOnlyRequest(vg) { - @Override - public void perform(WriteOnlyGraph graph) throws DatabaseException { - try { - advisor.beforeWrite(graph, process); - process.write(graph); - advisor.afterWrite(graph, process); - } catch (Exception e) { - throw new DatabaseException(e); - } - } - }); - } - - public static void importGraph1WithMonitor(Session session, final TransferableGraph1 tg, IImportAdvisor advisor_, TGStatusMonitor monitor) throws DatabaseException { - final IImportAdvisor2 advisor = (advisor_ instanceof IImportAdvisor2) ? ((IImportAdvisor2)advisor_) : new WrapperAdvisor(advisor_); - final TransferableGraphImportProcess process = new TransferableGraphImportProcess(tg, - advisor == null ? new ImportAdvisor() : advisor, monitor); - session.syncRequest(new ReadRequest() { - @Override - public void run(ReadGraph graph) throws DatabaseException { - process.prepare(graph); - } - }); - session.syncRequest(new WriteOnlyRequest() { - @Override - public void perform(WriteOnlyGraph graph) throws DatabaseException { - advisor.beforeWrite(graph, process); - process.write2(graph); - advisor.afterWrite(graph, process); - CommentMetadata comments = graph.getMetadata(CommentMetadata.class); - comments.add("Imported transferable graph with " + tg.resourceCount + " resources"); - graph.addMetadata(comments); - } - }); - } - - public static void importGraph1WithChanges(Session session, final TransferableGraph1 tg, IImportAdvisor advisor, final BinaryFunction callback) throws DatabaseException, TransferableGraphException { - final TransferableGraphImportProcess process = new TransferableGraphImportProcess(tg, - advisor == null ? new ImportAdvisor() : advisor); - session.syncRequest(new ReadRequest() { - @Override - public void run(ReadGraph graph) throws DatabaseException { - process.prepare(graph); - } - }); - session.syncRequest(new WriteRequest() { - @Override - public void perform(WriteGraph graph) throws DatabaseException { - process.write2(graph); - CommentMetadata comments = graph.getMetadata(CommentMetadata.class); - comments.add("Imported transferable graph with " + tg.resourceCount + " resources"); - graph.addMetadata(comments); - if(callback != null) - callback.call(graph, process); - } - }); - } - - public static long[] importGraph1(Session session, final TransferableGraph1 tg) throws DatabaseException, TransferableGraphException { - final TransferableGraphImportProcess process = - new TransferableGraphImportProcess(tg, - null); - session.syncRequest(new ReadRequest() { - @Override - public void run(ReadGraph graph) throws DatabaseException { - process.prepare(graph); - } - }); - session.syncRequest(new WriteOnlyRequest() { - @Override - public void perform(WriteOnlyGraph graph) throws DatabaseException { - process.write(graph); - } - }); - return process.getResourceIds( - session.getService(SerialisationSupport.class)); - } - - public static long[] importGraph1(WriteGraph graph, final TransferableGraph1 tg) throws DatabaseException, TransferableGraphException { - final TransferableGraphImportProcess process = - new TransferableGraphImportProcess(tg, - null); - process.prepare(graph); - process.write2(graph); - return process.getResourceIds( - graph.getSession().getService(SerialisationSupport.class)); - } - - /** - * Import transferable graph version 1 to the database. Root advisor is used - * to give identities to roots of the transferable graphs. It may be null, - * in which case new resources are created for all roots but the root library. - * - * @param session - * @param tg - * @param advisor root advisor or null - * @throws DatabaseException - */ - public static long[] importGraph1(WriteGraph graph, TransferableGraph1 tg, IImportAdvisor advisor) throws DatabaseException { - return importGraph1(graph, tg, advisor, null); - } - - public static long[] importGraph1(WriteGraph graph, TransferableGraph1 tg, IImportAdvisor advisor, TGStatusMonitor monitor) throws DatabaseException { - TransferableGraphImportProcess process = new TransferableGraphImportProcess(tg, - advisor == null ? new ImportAdvisor() : advisor, monitor); - process.prepare(graph); - if(advisor instanceof IImportAdvisor2) ((IImportAdvisor2)advisor).beforeWrite(graph, process); - process.write2(graph); - if(advisor instanceof IImportAdvisor2) ((IImportAdvisor2)advisor).afterWrite(graph, process); - return process.getResourceIds( - graph.getSession().getService(SerialisationSupport.class)); - } - - public static long[] applyDelta(WriteGraph graph, long[] oldResources, TransferableGraphDelta1 delta) throws DatabaseException { - SerialisationSupport serializer = - graph.getSession().getService(SerialisationSupport.class); - - TGToGraphMap aMap = new TGToGraphMap(delta.a); - aMap.addOldResources(serializer, oldResources); - aMap.deny(graph); - - TGToGraphMap bMap = new TGToGraphMap(delta.b); - bMap.addMappedOldResources(serializer, delta.aToB, aMap.getResources()); - bMap.prepare(graph); - bMap.claim(graph); - - return bMap.getResources(serializer); - } - - public static boolean hasChanges(ReadGraph graph, long[] oldResources, TransferableGraphDelta1 delta) throws DatabaseException { - - SerialisationSupport serializer = - graph.getSession().getService(SerialisationSupport.class); - - TGToGraphMap aMap = new TGToGraphMap(delta.a); - aMap.addOldResources(serializer, oldResources); - if(aMap.checkDeny(graph)) return true; - - TGToGraphMap bMap = new TGToGraphMap(delta.b); - bMap.addMappedOldResources(serializer, delta.aToB, aMap.getResources()); - bMap.prepare(graph); - return bMap.checkClaim(graph); - - } - - public static void uninstallGraph(WriteGraph writeGraph, TransferableGraph1 graph, - ImportAdvisor advisor) throws TransferableGraphException { - // TODO HANNU IMPLEMENTS - throw new UnsupportedOperationException(); - } - - public static long[] importVirtualGraph(Session session, final VirtualGraph vg, final TransferableGraph1 tg, IImportAdvisor advisor) throws DatabaseException { - final TransferableGraphImportProcess process = new TransferableGraphImportProcess(tg, - advisor == null ? new ImportAdvisor() : advisor); - session.syncRequest(new ReadRequest() { - @Override - public void run(ReadGraph graph) throws DatabaseException { - process.prepare(graph); - } - }); - session.syncRequest(new WriteOnlyRequest(vg) { - @Override - public void perform(WriteOnlyGraph graph) throws DatabaseException { - // Needed because process#write does not support virtual WriteOnlyGraph - if (vg != null) - process.write2(graph); - else - process.write(graph); - } - - }); - return process.getResourceIds(session.getService(SerialisationSupport.class)); - } - - public static long[] importVirtualGraph(WriteGraph graph, final TransferableGraph1 tg, IImportAdvisor advisor) throws DatabaseException { - final TransferableGraphImportProcess process = new TransferableGraphImportProcess(tg, - advisor == null ? new ImportAdvisor() : advisor); - process.prepare(graph); - process.write2(graph); - return process.getResourceIds(graph.getService(SerialisationSupport.class)); - } - - public static TransferableGraph1 readGraph(File file) throws TransferableGraphException { - FileVariantAccessor va = null; - try { - va = Accessors.openAccessor(file); - Datatype type = va.getContentType(); - if(type.equals(Datatypes.getDatatype(TransferableGraph1.class))) - return (TransferableGraph1)va.getContentValue(Bindings.getBinding(TransferableGraph1.class)); - else - throw new SerializationException("Unknown transferable graph data type."); - } catch (AccessorException e) { - throw new TransferableGraphException(e); - } catch (BindingConstructionException e) { - throw new TransferableGraphException(e); - } catch (SerializationException e) { - throw new TransferableGraphException(e); - } catch (AccessorConstructionException e) { - throw new TransferableGraphException(e); - } catch (DatatypeConstructionException e) { - throw new TransferableGraphException(e); - } finally { - if(va != null) { - try { - va.close(); - } catch (AccessorException e) { - } - } - } - } - - public static void importVirtualGraph(Session session, VirtualGraph vg, File file) throws TransferableGraphException { - try { - importVirtualGraph(session, vg, readGraph(file), new ImportAdvisor()); - } catch (DatabaseException e) { - throw new TransferableGraphException(e); - } - } - - public static VirtualGraph importVirtualGraph(Session session, File file) throws TransferableGraphException { - VirtualGraphSupport support = session.getService(VirtualGraphSupport.class); - VirtualGraph vg = support.getMemoryPersistent(UUID.randomUUID().toString()); - importVirtualGraph(session, vg, file); - return vg; - } - - public static void writeTransferableGraph(RequestProcessor processor, final String format, final TransferableGraphSource source, File target) throws Exception { - writeTransferableGraph(processor, format, 1, source, target); - } - - public static void writeTransferableGraph(RequestProcessor processor, final String format, final int version, final TransferableGraphSource source, File target) throws Exception { - writeTransferableGraph(processor, format, version, new TreeMap(), source, target); - } - - public static void writeTransferableGraph(RequestProcessor processor, String format, int version, TreeMap metadata, TransferableGraphSource source, File target) throws Exception { - writeTransferableGraph(processor, format, version, metadata, source, target, TGStatusMonitor.NULL_MONITOR); - } - - public static void writeTransferableGraph(RequestProcessor processor, String format, int version, TreeMap metadata, TransferableGraphSource source, File target, TGStatusMonitor monitor) throws Exception { - final Serializer datatypeSerializer = Bindings.getSerializerUnchecked(Datatype.class); - try (RandomAccessBinary out = new BinaryFile(target, 128*1024)) { - DataContainer container = new DataContainer(format, version, metadata, null); - DataContainers.writeHeader(out, container); - datatypeSerializer.serialize((DataOutput) out, Datatypes.getDatatypeUnchecked(TransferableGraph1.class)); - writeTransferableGraph(processor, source, out, monitor); - } - } - - private static TGStatusMonitor safeMonitor(TGStatusMonitor mon) { - return mon == null ? TGStatusMonitor.NULL_MONITOR : mon; - } - - private static class CopyingInputStream extends InputStream { - public DataInput in; - public DataOutput out; - - @Override - public int read() throws IOException { - int value = in.readUnsignedByte(); - out.write(value); - return value; - } - } - - private static long copy(byte[] buffer, DataInput in, DataOutput out, long bytesToCopy) throws IOException { - int read = 0; - long bufferLength = buffer.length; - while (read < bytesToCopy) { - int l = (int) Math.min(bufferLength, bytesToCopy-read); - in.readFully(buffer, 0, l); - out.write(buffer, 0, l); - read += l; - } - return read; - } - - private static final int LITERAL_VALUE_IO_BUFFER_SIZE = 128 * 1024; - - private static void writeTransferableGraph(RequestProcessor processor, final TransferableGraphSource source, final RandomAccessBinary out, TGStatusMonitor monitor) throws Exception { - long start = System.nanoTime(); - - final Serializer datatypeSerializer = Bindings.getSerializerUnchecked(Datatype.class); - final Serializer identitySerializer = Bindings.getSerializerUnchecked(Identity.class); - final Serializer extensionSerializer = Bindings.getSerializerUnchecked(Extensions.class); - - int resourceCount = source.getResourceCount(); - //System.err.println("resourceCount: " + resourceCount); - out.writeInt(resourceCount); - extensionSerializer.serialize(out, new Extensions(source.getExtensions())); - -// System.err.println("resource count: " + source.getResourceCount()); -// System.err.println("identity count: " + source.getIdentityCount()); - - byte[] buffer = new byte[LITERAL_VALUE_IO_BUFFER_SIZE]; - - processor.syncRequest(new ReadRequest() { - @Override - public void run(ReadGraph graph) throws DatabaseException { - try { - if (monitor.isCanceled()) - throw new CancelTransactionException(); - - int identityCount = source.getIdentityCount(); - TGStatusMonitor.Updater identityProgress = new TGStatusMonitor.Updater(safeMonitor(monitor), 0, 33, identityCount); - out.writeInt(identityCount); - //System.err.println("identities: " + identityCount); - source.forIdentities(graph, value -> { - //System.err.println("id: " + value); - identitySerializer.serialize(out, value); - identityProgress.worked(1); - }); - - if (monitor.isCanceled()) - throw new CancelTransactionException(); - - long statementCountPos = out.position(); - int originalStatementCount = source.getStatementCount(); - TGStatusMonitor.Updater statementProgress = new TGStatusMonitor.Updater(safeMonitor(monitor), 34, 66, originalStatementCount); - out.writeInt(originalStatementCount); - //System.err.println("original statementCount: " + originalStatementCount); - int[] statementCounter = { 0 }; - source.forStatements(graph, r -> { - for (int i = 0; i < 4; ++i) - out.writeInt(r[i]); - statementCounter[0]++; - //System.err.println("stm " + (statementCounter[0]) + ": " + r[0] + " " + r[1] + " " + r[2] + " " + r[3]); - statementProgress.worked(1); - }); - //System.err.println("wrote " + statementCounter[0] + " statements, " + (statementCounter[0]*4)+ " integers"); - - // Rewrite statement count after knowing exactly how many - // statements were written. It is possible that some - // statements get filtered out at this stage and the - // original statement count does not reflect that. - long afterStatementsPos = out.position(); - out.position(statementCountPos); - out.writeInt(statementCounter[0]*4); - out.position(afterStatementsPos); - - if (monitor.isCanceled()) - throw new CancelTransactionException(); - - int valueCount = source.getValueCount(); - TGStatusMonitor.Updater valueProgress = new TGStatusMonitor.Updater(safeMonitor(monitor), 67, 100, valueCount); - out.writeInt(valueCount); -// System.err.println("valueCount: " + valueCount); - CopyingInputStream cis = new CopyingInputStream(); - cis.out = out; - source.forValues2(graph, new TransferableGraphSourceValueProcedure() { - TObjectIntHashMap identities = new TObjectIntHashMap<>(); - - @Override - public void rawCopy(int resource, int length, DataInput input) throws Exception { - out.writeInt(resource); - long copied = copy(buffer, input, out, length); - assert copied == length; - //System.err.println("value " + (num++) + ": raw variant, " + length + " bytes, copied " + copied + " bytes"); - valueProgress.worked(1); - } - - @Override - public void execute(int resource, Datatype type, DataInput input) throws Exception { - out.writeInt(resource); - identities.clear(); - datatypeSerializer.serialize(out, identities, type); - Binding binding = Bindings.getBinding(type); - Serializer serializer = Bindings.getSerializer(binding); - cis.in = input; - serializer.skip(cis); - cis.in = null; - valueProgress.worked(1); - } - }); - - } catch (DatabaseException e) { - throw e; - } catch (Exception e) { - throw new DatabaseException(e); - } - } - }); - - long end = System.nanoTime(); - System.err.println("Wrote transferable graph in " + 1e-9*(end-start) + " seconds."); - } - - public static TransferableGraph1 create(ReadGraph graph, TransferableGraphSource source) throws DatabaseException { - - final TIntArrayList statements = new TIntArrayList(); - final ArrayList values = new ArrayList<>(); - final ArrayList identities = new ArrayList<>(); - - try { - - source.forStatements(graph, r -> statements.addAll(r)); - source.forValues(graph, v -> values.add(v)); - source.forIdentities(graph, i -> identities.add(i)); - - return new TransferableGraph1(source.getResourceCount(), - identities.toArray(new Identity[identities.size()]), - statements.toArray(), - values.toArray(new Value[values.size()]), - source.getExtensions()); - - } catch (Exception e) { - - throw new DatabaseException(e); - - } - - } - -} +/******************************************************************************* + * Copyright (c) 2007, 2010 Association for Decentralized Information Management + * in Industry THTH ry. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * VTT Technical Research Centre of Finland - initial API and implementation + *******************************************************************************/ +package org.simantics.graph.db; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; +import java.util.TreeMap; +import java.util.UUID; +import java.util.function.BiFunction; + +import org.simantics.databoard.Accessors; +import org.simantics.databoard.Bindings; +import org.simantics.databoard.Datatypes; +import org.simantics.databoard.accessor.error.AccessorConstructionException; +import org.simantics.databoard.accessor.error.AccessorException; +import org.simantics.databoard.accessor.file.FileVariantAccessor; +import org.simantics.databoard.binding.Binding; +import org.simantics.databoard.binding.error.BindingConstructionException; +import org.simantics.databoard.binding.error.DatatypeConstructionException; +import org.simantics.databoard.binding.mutable.Variant; +import org.simantics.databoard.container.DataContainer; +import org.simantics.databoard.container.DataContainers; +import org.simantics.databoard.serialization.SerializationException; +import org.simantics.databoard.serialization.Serializer; +import org.simantics.databoard.type.Datatype; +import org.simantics.databoard.util.binary.BinaryFile; +import org.simantics.databoard.util.binary.RandomAccessBinary; +import org.simantics.db.ReadGraph; +import org.simantics.db.RequestProcessor; +import org.simantics.db.Resource; +import org.simantics.db.Session; +import org.simantics.db.VirtualGraph; +import org.simantics.db.WriteGraph; +import org.simantics.db.WriteOnlyGraph; +import org.simantics.db.common.CommentMetadata; +import org.simantics.db.common.request.ReadRequest; +import org.simantics.db.common.request.WriteOnlyRequest; +import org.simantics.db.common.request.WriteRequest; +import org.simantics.db.exception.CancelTransactionException; +import org.simantics.db.exception.DatabaseException; +import org.simantics.db.service.SerialisationSupport; +import org.simantics.db.service.VirtualGraphSupport; +import org.simantics.graph.db.TransferableGraphSource.TransferableGraphSourceValueProcedure; +import org.simantics.graph.diff.TransferableGraphDelta1; +import org.simantics.graph.representation.Extensions; +import org.simantics.graph.representation.External; +import org.simantics.graph.representation.Identity; +import org.simantics.graph.representation.TransferableGraph1; +import org.simantics.graph.representation.Value; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import gnu.trove.list.array.TIntArrayList; +import gnu.trove.map.hash.TObjectIntHashMap; + +public class TransferableGraphs { + final static Logger LOGGER = LoggerFactory.getLogger(TransferableGraphs.class); + + public static long[] importGraph(Session session, Object tg, IImportAdvisor advisor) throws DatabaseException, TransferableGraphException { + if (tg instanceof TransferableGraph1) + { + return importGraph1(session, (TransferableGraph1) tg, advisor); + } + throw new TransferableGraphException("Cannot import "+tg.getClass().getName()); + } + + public static long[] importGraph(WriteGraph g, Object tg, IImportAdvisor advisor) throws DatabaseException, TransferableGraphException { + if (tg instanceof TransferableGraph1) + { + return importGraph1(g, (TransferableGraph1) tg, advisor); + } + throw new TransferableGraphException("Cannot import "+tg.getClass().getName()); + } + + public static long[] importGraph(Session session, Object tg) throws DatabaseException, TransferableGraphException { + if (tg instanceof TransferableGraph1) + { + return importGraph1(session, (TransferableGraph1) tg); + } + throw new TransferableGraphException("Cannot import "+tg.getClass().getName()); + } + + public static long[] importGraph(WriteGraph g, Object tg) throws DatabaseException, TransferableGraphException { + if (tg instanceof TransferableGraph1) + { + return importGraph1(g, (TransferableGraph1) tg); + } + throw new TransferableGraphException("Cannot import "+tg.getClass().getName()); + } + + public static Collection collectExternals(RequestProcessor processor, final TransferableGraph1 tg) throws DatabaseException, TransferableGraphException { + final TransferableGraphImportProcess process = new TransferableGraphImportProcess(tg, new ImportAdvisor()); + processor.syncRequest(new ReadRequest() { + @Override + public void run(ReadGraph graph) throws DatabaseException { + process.prepare(graph); + } + }); + HashSet result = new HashSet(); + for(Identity id : tg.identities) { + if(id.definition instanceof External) { + result.add(process.resources[id.resource]); + } + } + return result; + } + + /** + * Imports transferable graph version 1 to the database. Root advisor is used + * to give identities to roots of the transferable graphs. It may be null, + * in which case new resources are created for all roots but the root library. + * + * @param session + * @param tg + * @param advisor root advisor or null + * @throws DatabaseException + */ + public static long[] importGraph1(Session session, final TransferableGraph1 tg, final IImportAdvisor advisor_) throws DatabaseException, TransferableGraphException { + + final IImportAdvisor2 advisor = (advisor_ instanceof IImportAdvisor2) ? ((IImportAdvisor2)advisor_) : new WrapperAdvisor(advisor_); + + final TransferableGraphImportProcess process = new TransferableGraphImportProcess(tg, + advisor == null ? new ImportAdvisor() : advisor); + session.syncRequest(new ReadRequest() { + @Override + public void run(ReadGraph graph) throws DatabaseException { + process.prepare(graph); + } + }); + session.syncRequest(new WriteOnlyRequest() { + @Override + public void perform(WriteOnlyGraph graph) throws DatabaseException { + advisor.beforeWrite(graph, process); + process.write(graph); + advisor.afterWrite(graph, process); + } + }); + return process.getResourceIds( + session.getService(SerialisationSupport.class)); + } + + public static void importGraph1(Session session, final TransferableGraph1 tg, IImportAdvisor advisor, final BiFunction callback) throws DatabaseException, TransferableGraphException { + final TransferableGraphImportProcess process = new TransferableGraphImportProcess(tg, + advisor == null ? new ImportAdvisor() : advisor); + session.syncRequest(new ReadRequest() { + @Override + public void run(ReadGraph graph) throws DatabaseException { + process.prepare(graph); + } + }); + session.syncRequest(new WriteOnlyRequest() { + @Override + public void perform(WriteOnlyGraph graph) throws DatabaseException { + process.write(graph); + if(callback != null) + callback.apply(graph, process); + } + }); + } + + public static ImportResult importGraph1(Session session, final TransferableGraphSource tg, IImportAdvisor advisor) throws Exception { + return importGraph1(session, tg, advisor, null); + } + + public static ImportResult importGraph1(Session session, final TransferableGraphSource tg, IImportAdvisor advisor, TGStatusMonitor monitor) throws DatabaseException { + return importGraph1(session, null, tg, advisor, monitor); + } + + public static ImportResult importGraph1(Session session, VirtualGraph vg, final TransferableGraphSource tg, IImportAdvisor advisor_, TGStatusMonitor monitor) throws DatabaseException { + + final IImportAdvisor2 advisor = (advisor_ instanceof IImportAdvisor2) ? ((IImportAdvisor2)advisor_) : new WrapperAdvisor(advisor_); + + final StreamingTransferableGraphImportProcess process = new StreamingTransferableGraphImportProcess(session, vg, tg, advisor, monitor); + session.syncRequest(new ReadRequest() { + @Override + public void run(ReadGraph graph) throws DatabaseException { + try { + process.prepare(graph); + } catch (DatabaseException e) { + throw e; + } catch (Exception e) { + throw new DatabaseException(e); + } + } + }); + session.syncRequest(new WriteOnlyRequest(vg) { + @Override + public void perform(WriteOnlyGraph graph) throws DatabaseException { + try { + advisor.beforeWrite(graph, process); + process.write(graph); + advisor.afterWrite(graph, process); + } catch (Exception e) { + throw new DatabaseException(e); + } + } + }); + + return new ImportResult(process.missingExternals); + } + + public static void importGraph1WithMonitor(Session session, final TransferableGraph1 tg, IImportAdvisor advisor_, TGStatusMonitor monitor) throws DatabaseException { + final IImportAdvisor2 advisor = (advisor_ instanceof IImportAdvisor2) ? ((IImportAdvisor2)advisor_) : new WrapperAdvisor(advisor_); + final TransferableGraphImportProcess process = new TransferableGraphImportProcess(tg, + advisor == null ? new ImportAdvisor() : advisor, monitor); + session.syncRequest(new ReadRequest() { + @Override + public void run(ReadGraph graph) throws DatabaseException { + process.prepare(graph); + } + }); + session.syncRequest(new WriteOnlyRequest() { + @Override + public void perform(WriteOnlyGraph graph) throws DatabaseException { + advisor.beforeWrite(graph, process); + process.write2(graph); + advisor.afterWrite(graph, process); + CommentMetadata comments = graph.getMetadata(CommentMetadata.class); + comments.add("Imported transferable graph with " + tg.resourceCount + " resources"); + graph.addMetadata(comments); + } + }); + } + + public static void importGraph1WithChanges(Session session, final TransferableGraph1 tg, IImportAdvisor advisor, final BiFunction callback) throws DatabaseException, TransferableGraphException { + final TransferableGraphImportProcess process = new TransferableGraphImportProcess(tg, + advisor == null ? new ImportAdvisor() : advisor); + session.syncRequest(new ReadRequest() { + @Override + public void run(ReadGraph graph) throws DatabaseException { + process.prepare(graph); + } + }); + session.syncRequest(new WriteRequest() { + @Override + public void perform(WriteGraph graph) throws DatabaseException { + process.write2(graph); + CommentMetadata comments = graph.getMetadata(CommentMetadata.class); + comments.add("Imported transferable graph with " + tg.resourceCount + " resources"); + graph.addMetadata(comments); + if(callback != null) + callback.apply(graph, process); + } + }); + } + + public static long[] importGraph1(Session session, final TransferableGraph1 tg) throws DatabaseException, TransferableGraphException { + final TransferableGraphImportProcess process = + new TransferableGraphImportProcess(tg, + null); + session.syncRequest(new ReadRequest() { + @Override + public void run(ReadGraph graph) throws DatabaseException { + process.prepare(graph); + } + }); + session.syncRequest(new WriteOnlyRequest() { + @Override + public void perform(WriteOnlyGraph graph) throws DatabaseException { + process.write(graph); + } + }); + return process.getResourceIds( + session.getService(SerialisationSupport.class)); + } + + public static long[] importGraph1(WriteGraph graph, final TransferableGraph1 tg) throws DatabaseException, TransferableGraphException { + final TransferableGraphImportProcess process = + new TransferableGraphImportProcess(tg, + null); + process.prepare(graph); + process.write2(graph); + return process.getResourceIds( + graph.getSession().getService(SerialisationSupport.class)); + } + + /** + * Import transferable graph version 1 to the database. Root advisor is used + * to give identities to roots of the transferable graphs. It may be null, + * in which case new resources are created for all roots but the root library. + * + * @param session + * @param tg + * @param advisor root advisor or null + * @throws DatabaseException + */ + public static long[] importGraph1(WriteGraph graph, TransferableGraph1 tg, IImportAdvisor advisor) throws DatabaseException { + return importGraph1(graph, tg, advisor, null); + } + + public static long[] importGraph1(WriteGraph graph, TransferableGraph1 tg, IImportAdvisor advisor, TGStatusMonitor monitor) throws DatabaseException { + TransferableGraphImportProcess process = new TransferableGraphImportProcess(tg, + advisor == null ? new ImportAdvisor() : advisor, monitor); + process.prepare(graph); + if(advisor instanceof IImportAdvisor2) ((IImportAdvisor2)advisor).beforeWrite(graph, process); + process.write2(graph); + if(advisor instanceof IImportAdvisor2) ((IImportAdvisor2)advisor).afterWrite(graph, process); + return process.getResourceIds( + graph.getSession().getService(SerialisationSupport.class)); + } + + public static long[] applyDelta(WriteGraph graph, long[] oldResources, TransferableGraphDelta1 delta) throws DatabaseException { + SerialisationSupport serializer = + graph.getSession().getService(SerialisationSupport.class); + + TGToGraphMap aMap = new TGToGraphMap(delta.a); + aMap.addOldResources(serializer, oldResources); + aMap.deny(graph); + + TGToGraphMap bMap = new TGToGraphMap(delta.b); + bMap.addMappedOldResources(serializer, delta.aToB, aMap.getResources()); + bMap.prepare(graph); + bMap.claim(graph); + + return bMap.getResources(serializer); + } + + public static boolean hasChanges(ReadGraph graph, long[] oldResources, TransferableGraphDelta1 delta) throws DatabaseException { + + SerialisationSupport serializer = + graph.getSession().getService(SerialisationSupport.class); + + TGToGraphMap aMap = new TGToGraphMap(delta.a); + aMap.addOldResources(serializer, oldResources); + if(aMap.checkDeny(graph)) return true; + + TGToGraphMap bMap = new TGToGraphMap(delta.b); + bMap.addMappedOldResources(serializer, delta.aToB, aMap.getResources()); + bMap.prepare(graph); + return bMap.checkClaim(graph); + + } + + public static void uninstallGraph(WriteGraph writeGraph, TransferableGraph1 graph, + ImportAdvisor advisor) throws TransferableGraphException { + // TODO HANNU IMPLEMENTS + throw new UnsupportedOperationException(); + } + + public static long[] importVirtualGraph(Session session, final VirtualGraph vg, final TransferableGraph1 tg, IImportAdvisor advisor) throws DatabaseException { + final TransferableGraphImportProcess process = new TransferableGraphImportProcess(tg, + advisor == null ? new ImportAdvisor() : advisor); + session.syncRequest(new ReadRequest() { + @Override + public void run(ReadGraph graph) throws DatabaseException { + process.prepare(graph); + } + }); + session.syncRequest(new WriteOnlyRequest(vg) { + @Override + public void perform(WriteOnlyGraph graph) throws DatabaseException { + // Needed because process#write does not support virtual WriteOnlyGraph + if (vg != null) + process.write2(graph); + else + process.write(graph); + } + + }); + return process.getResourceIds(session.getService(SerialisationSupport.class)); + } + + public static long[] importVirtualGraph(WriteGraph graph, final TransferableGraph1 tg, IImportAdvisor advisor) throws DatabaseException { + final TransferableGraphImportProcess process = new TransferableGraphImportProcess(tg, + advisor == null ? new ImportAdvisor() : advisor); + process.prepare(graph); + process.write2(graph); + return process.getResourceIds(graph.getService(SerialisationSupport.class)); + } + + public static TransferableGraph1 readGraph(File file) throws TransferableGraphException { + FileVariantAccessor va = null; + try { + va = Accessors.openAccessor(file); + Datatype type = va.getContentType(); + if(type.equals(Datatypes.getDatatype(TransferableGraph1.class))) + return (TransferableGraph1)va.getContentValue(Bindings.getBinding(TransferableGraph1.class)); + else + throw new SerializationException("Unknown transferable graph data type."); + } catch (AccessorException e) { + throw new TransferableGraphException(e); + } catch (BindingConstructionException e) { + throw new TransferableGraphException(e); + } catch (SerializationException e) { + throw new TransferableGraphException(e); + } catch (AccessorConstructionException e) { + throw new TransferableGraphException(e); + } catch (DatatypeConstructionException e) { + throw new TransferableGraphException(e); + } finally { + if(va != null) { + try { + va.close(); + } catch (AccessorException e) { + } + } + } + } + + public static void importVirtualGraph(Session session, VirtualGraph vg, File file) throws TransferableGraphException { + try { + importVirtualGraph(session, vg, readGraph(file), new ImportAdvisor()); + } catch (DatabaseException e) { + throw new TransferableGraphException(e); + } + } + + public static VirtualGraph importVirtualGraph(Session session, File file) throws TransferableGraphException { + VirtualGraphSupport support = session.getService(VirtualGraphSupport.class); + VirtualGraph vg = support.getMemoryPersistent(UUID.randomUUID().toString()); + importVirtualGraph(session, vg, file); + return vg; + } + + public static void writeTransferableGraph(RequestProcessor processor, final String format, final TransferableGraphSource source, File target) throws Exception { + writeTransferableGraph(processor, format, 1, source, target); + } + + public static void writeTransferableGraph(RequestProcessor processor, final String format, final int version, final TransferableGraphSource source, File target) throws Exception { + writeTransferableGraph(processor, format, version, new TreeMap(), source, target); + } + + public static void writeTransferableGraph(RequestProcessor processor, String format, int version, TreeMap metadata, TransferableGraphSource source, File target) throws Exception { + writeTransferableGraph(processor, format, version, metadata, source, target, TGStatusMonitor.NULL_MONITOR); + } + + public static void writeTransferableGraph(RequestProcessor processor, String format, int version, TreeMap metadata, TransferableGraphSource source, File target, TGStatusMonitor monitor) throws Exception { + try (RandomAccessBinary out = new BinaryFile(target, 128*1024)) { + DataContainers.writeHeader(out, new DataContainer(format, version, metadata, null)); + writeTransferableGraphVariant(processor, source, out, monitor); + } + } + + public static void writeTransferableGraph(RequestProcessor processor, TransferableGraphSource source, File target, TGStatusMonitor monitor) throws Exception { + try (RandomAccessBinary out = new BinaryFile(target, 128*1024)) { + writeTransferableGraphVariant(processor, source, out, monitor); + } + } + + public static void writeTransferableGraphVariant(RequestProcessor processor, TransferableGraphSource source, RandomAccessBinary out, TGStatusMonitor monitor) throws Exception { + Bindings.getSerializerUnchecked(Datatype.class).serialize(out, Datatypes.getDatatypeUnchecked(TransferableGraph1.class)); + writeTransferableGraph(processor, source, out, monitor); + } + + private static TGStatusMonitor safeMonitor(TGStatusMonitor mon) { + return mon == null ? TGStatusMonitor.NULL_MONITOR : mon; + } + + private static class CopyingInputStream extends InputStream { + public DataInput in; + public DataOutput out; + + @Override + public int read() throws IOException { + int value = in.readUnsignedByte(); + out.write(value); + return value; + } + } + + private static long copy(byte[] buffer, DataInput in, DataOutput out, long bytesToCopy) throws IOException { + int read = 0; + long bufferLength = buffer.length; + while (read < bytesToCopy) { + int l = (int) Math.min(bufferLength, bytesToCopy-read); + in.readFully(buffer, 0, l); + out.write(buffer, 0, l); + read += l; + } + return read; + } + + private static final int LITERAL_VALUE_IO_BUFFER_SIZE = 128 * 1024; + + private static void writeTransferableGraph(RequestProcessor processor, final TransferableGraphSource source, final RandomAccessBinary out, TGStatusMonitor monitor) throws Exception { + long start = System.nanoTime(); + + final Serializer datatypeSerializer = Bindings.getSerializerUnchecked(Datatype.class); + final Serializer identitySerializer = Bindings.getSerializerUnchecked(Identity.class); + final Serializer extensionSerializer = Bindings.getSerializerUnchecked(Extensions.class); + + int resourceCount = source.getResourceCount(); + //System.err.println("resourceCount: " + resourceCount); + out.writeInt(resourceCount); + extensionSerializer.serialize(out, new Extensions(source.getExtensions())); + +// System.err.println("resource count: " + source.getResourceCount()); +// System.err.println("identity count: " + source.getIdentityCount()); + + byte[] buffer = new byte[LITERAL_VALUE_IO_BUFFER_SIZE]; + + processor.syncRequest(new ReadRequest() { + @Override + public void run(ReadGraph graph) throws DatabaseException { + try { + if (monitor.isCanceled()) + throw new CancelTransactionException(); + + int identityCount = source.getIdentityCount(); + TGStatusMonitor.Updater identityProgress = new TGStatusMonitor.Updater(safeMonitor(monitor), 0, 33, identityCount); + out.writeInt(identityCount); + //System.err.println("identities: " + identityCount); + source.forIdentities(graph, value -> { + //System.err.println("id: " + value); + identitySerializer.serialize(out, value); + identityProgress.worked(1); + }); + + if (monitor.isCanceled()) + throw new CancelTransactionException(); + + long statementCountPos = out.position(); + int originalStatementCount = source.getStatementCount(); + TGStatusMonitor.Updater statementProgress = new TGStatusMonitor.Updater(safeMonitor(monitor), 34, 66, originalStatementCount); + out.writeInt(originalStatementCount); + //System.err.println("original statementCount: " + originalStatementCount); + int[] statementCounter = { 0 }; + source.forStatements(graph, r -> { + for (int i = 0; i < 4; ++i) + out.writeInt(r[i]); + statementCounter[0]++; + //System.err.println("stm " + (statementCounter[0]) + ": " + r[0] + " " + r[1] + " " + r[2] + " " + r[3]); + statementProgress.worked(1); + }); + //System.err.println("wrote " + statementCounter[0] + " statements, " + (statementCounter[0]*4)+ " integers"); + + // Rewrite statement count after knowing exactly how many + // statements were written. It is possible that some + // statements get filtered out at this stage and the + // original statement count does not reflect that. + long afterStatementsPos = out.position(); + out.position(statementCountPos); + out.writeInt(statementCounter[0]*4); + out.position(afterStatementsPos); + + if (monitor.isCanceled()) + throw new CancelTransactionException(); + + int valueCount = source.getValueCount(); + TGStatusMonitor.Updater valueProgress = new TGStatusMonitor.Updater(safeMonitor(monitor), 67, 100, valueCount); + out.writeInt(valueCount); +// System.err.println("valueCount: " + valueCount); + CopyingInputStream cis = new CopyingInputStream(); + cis.out = out; + source.forValues2(graph, new TransferableGraphSourceValueProcedure() { + TObjectIntHashMap identities = new TObjectIntHashMap<>(); + + @Override + public void rawCopy(int resource, int length, DataInput input) throws Exception { + out.writeInt(resource); + long copied = copy(buffer, input, out, length); + assert copied == length; + //System.err.println("value " + (num++) + ": raw variant, " + length + " bytes, copied " + copied + " bytes"); + valueProgress.worked(1); + } + + @Override + public void execute(int resource, Datatype type, DataInput input) throws Exception { + out.writeInt(resource); + identities.clear(); + datatypeSerializer.serialize(out, identities, type); + Binding binding = Bindings.getBinding(type); + Serializer serializer = Bindings.getSerializer(binding); + cis.in = input; + serializer.skip(cis); + cis.in = null; + valueProgress.worked(1); + } + }); + + } catch (DatabaseException e) { + throw e; + } catch (Exception e) { + throw new DatabaseException(e); + } + } + }); + + long end = System.nanoTime(); + LOGGER.info("Wrote transferable graph in {} seconds.", 1e-9*(end-start)); + } + + public static TransferableGraph1 create(ReadGraph graph, TransferableGraphSource source) throws DatabaseException { + + final TIntArrayList statements = new TIntArrayList(); + final ArrayList values = new ArrayList<>(); + final ArrayList identities = new ArrayList<>(); + + try { + + source.forStatements(graph, r -> statements.addAll(r)); + source.forValues(graph, v -> values.add(v)); + source.forIdentities(graph, i -> identities.add(i)); + + return new TransferableGraph1(source.getResourceCount(), + identities.toArray(new Identity[identities.size()]), + statements.toArray(), + values.toArray(new Value[values.size()]), + source.getExtensions()); + + } catch (Exception e) { + + throw new DatabaseException(e); + + } + + } + +}