/******************************************************************************* * Copyright (c) 2007, 2010 Association for Decentralized Information Management * in Industry THTH ry. * All rights reserved. This program and the accompanying materials * are made available under the terms of the Eclipse Public License v1.0 * which accompanies this distribution, and is available at * http://www.eclipse.org/legal/epl-v10.html * * Contributors: * VTT Technical Research Centre of Finland - initial API and implementation *******************************************************************************/ package org.simantics.graph.db; import java.io.BufferedOutputStream; import java.io.DataInput; import java.io.DataOutput; import java.io.DataOutputStream; import java.io.File; import java.io.FileOutputStream; import java.io.IOException; import java.io.InputStream; import java.util.ArrayList; import java.util.Collection; import java.util.HashSet; import java.util.TreeMap; import java.util.UUID; import org.simantics.databoard.Accessors; import org.simantics.databoard.Bindings; import org.simantics.databoard.Datatypes; import org.simantics.databoard.accessor.error.AccessorConstructionException; import org.simantics.databoard.accessor.error.AccessorException; import org.simantics.databoard.accessor.file.FileVariantAccessor; import org.simantics.databoard.binding.Binding; import org.simantics.databoard.binding.error.BindingConstructionException; import org.simantics.databoard.binding.error.DatatypeConstructionException; import org.simantics.databoard.binding.mutable.Variant; import org.simantics.databoard.container.DataContainer; import org.simantics.databoard.container.DataContainers; import org.simantics.databoard.serialization.SerializationException; import org.simantics.databoard.serialization.Serializer; import org.simantics.databoard.type.Datatype; import org.simantics.db.ReadGraph; import org.simantics.db.RequestProcessor; import org.simantics.db.Resource; import org.simantics.db.Session; import org.simantics.db.VirtualGraph; import org.simantics.db.WriteGraph; import org.simantics.db.WriteOnlyGraph; import org.simantics.db.common.CommentMetadata; import org.simantics.db.common.request.ReadRequest; import org.simantics.db.common.request.WriteOnlyRequest; import org.simantics.db.common.request.WriteRequest; import org.simantics.db.exception.CancelTransactionException; import org.simantics.db.exception.DatabaseException; import org.simantics.db.service.SerialisationSupport; import org.simantics.db.service.VirtualGraphSupport; import org.simantics.graph.db.TransferableGraphSource.TransferableGraphSourceValueProcedure; import org.simantics.graph.diff.TransferableGraphDelta1; import org.simantics.graph.representation.Extensions; import org.simantics.graph.representation.External; import org.simantics.graph.representation.Identity; import org.simantics.graph.representation.TransferableGraph1; import org.simantics.graph.representation.Value; import org.simantics.utils.datastructures.BinaryFunction; import gnu.trove.list.array.TIntArrayList; import gnu.trove.map.hash.TObjectIntHashMap; public class TransferableGraphs { public static long[] importGraph(Session session, Object tg, IImportAdvisor advisor) throws DatabaseException, TransferableGraphException { if (tg instanceof TransferableGraph1) { return importGraph1(session, (TransferableGraph1) tg, advisor); } throw new TransferableGraphException("Cannot import "+tg.getClass().getName()); } public static long[] importGraph(WriteGraph g, Object tg, IImportAdvisor advisor) throws DatabaseException, TransferableGraphException { if (tg instanceof TransferableGraph1) { return importGraph1(g, (TransferableGraph1) tg, advisor); } throw new TransferableGraphException("Cannot import "+tg.getClass().getName()); } public static long[] importGraph(Session session, Object tg) throws DatabaseException, TransferableGraphException { if (tg instanceof TransferableGraph1) { return importGraph1(session, (TransferableGraph1) tg); } throw new TransferableGraphException("Cannot import "+tg.getClass().getName()); } public static long[] importGraph(WriteGraph g, Object tg) throws DatabaseException, TransferableGraphException { if (tg instanceof TransferableGraph1) { return importGraph1(g, (TransferableGraph1) tg); } throw new TransferableGraphException("Cannot import "+tg.getClass().getName()); } public static Collection collectExternals(RequestProcessor processor, final TransferableGraph1 tg) throws DatabaseException, TransferableGraphException { final TransferableGraphImportProcess process = new TransferableGraphImportProcess(tg, new ImportAdvisor()); processor.syncRequest(new ReadRequest() { @Override public void run(ReadGraph graph) throws DatabaseException { process.prepare(graph); } }); HashSet result = new HashSet(); for(Identity id : tg.identities) { if(id.definition instanceof External) { result.add(process.resources[id.resource]); } } return result; } /** * Imports transferable graph version 1 to the database. Root advisor is used * to give identities to roots of the transferable graphs. It may be null, * in which case new resources are created for all roots but the root library. * * @param session * @param tg * @param advisor root advisor or null * @throws DatabaseException */ public static long[] importGraph1(Session session, final TransferableGraph1 tg, final IImportAdvisor advisor_) throws DatabaseException, TransferableGraphException { final IImportAdvisor2 advisor = (advisor_ instanceof IImportAdvisor2) ? ((IImportAdvisor2)advisor_) : new WrapperAdvisor(advisor_); final TransferableGraphImportProcess process = new TransferableGraphImportProcess(tg, advisor == null ? new ImportAdvisor() : advisor); session.syncRequest(new ReadRequest() { @Override public void run(ReadGraph graph) throws DatabaseException { process.prepare(graph); } }); session.syncRequest(new WriteOnlyRequest() { @Override public void perform(WriteOnlyGraph graph) throws DatabaseException { advisor.beforeWrite(graph, process); process.write(graph); advisor.afterWrite(graph, process); } }); return process.getResourceIds( session.getService(SerialisationSupport.class)); } public static void importGraph1(Session session, final TransferableGraph1 tg, IImportAdvisor advisor, final BinaryFunction callback) throws DatabaseException, TransferableGraphException { final TransferableGraphImportProcess process = new TransferableGraphImportProcess(tg, advisor == null ? new ImportAdvisor() : advisor); session.syncRequest(new ReadRequest() { @Override public void run(ReadGraph graph) throws DatabaseException { process.prepare(graph); } }); session.syncRequest(new WriteOnlyRequest() { @Override public void perform(WriteOnlyGraph graph) throws DatabaseException { process.write(graph); if(callback != null) callback.call(graph, process); } }); } public static void importGraph1(Session session, final TransferableGraphSource tg, IImportAdvisor advisor) throws Exception { importGraph1(session, tg, advisor, null); } public static void importGraph1(Session session, final TransferableGraphSource tg, IImportAdvisor advisor, TGStatusMonitor monitor) throws DatabaseException { importGraph1(session, null, tg, advisor, monitor); } public static void importGraph1(Session session, VirtualGraph vg, final TransferableGraphSource tg, IImportAdvisor advisor_, TGStatusMonitor monitor) throws DatabaseException { final IImportAdvisor2 advisor = (advisor_ instanceof IImportAdvisor2) ? ((IImportAdvisor2)advisor_) : new WrapperAdvisor(advisor_); final StreamingTransferableGraphImportProcess process = new StreamingTransferableGraphImportProcess(session, vg, tg, advisor); session.syncRequest(new ReadRequest() { @Override public void run(ReadGraph graph) throws DatabaseException { try { process.prepare(graph); } catch (DatabaseException e) { throw e; } catch (Exception e) { throw new DatabaseException(e); } } }); session.syncRequest(new WriteOnlyRequest(vg) { @Override public void perform(WriteOnlyGraph graph) throws DatabaseException { try { advisor.beforeWrite(graph, process); process.write(graph); advisor.afterWrite(graph, process); } catch (Exception e) { throw new DatabaseException(e); } } }); } public static void importGraph1WithMonitor(Session session, final TransferableGraph1 tg, IImportAdvisor advisor_, TGStatusMonitor monitor) throws DatabaseException { final IImportAdvisor2 advisor = (advisor_ instanceof IImportAdvisor2) ? ((IImportAdvisor2)advisor_) : new WrapperAdvisor(advisor_); final TransferableGraphImportProcess process = new TransferableGraphImportProcess(tg, advisor == null ? new ImportAdvisor() : advisor, monitor); session.syncRequest(new ReadRequest() { @Override public void run(ReadGraph graph) throws DatabaseException { process.prepare(graph); } }); session.syncRequest(new WriteOnlyRequest() { @Override public void perform(WriteOnlyGraph graph) throws DatabaseException { advisor.beforeWrite(graph, process); process.write2(graph); advisor.afterWrite(graph, process); CommentMetadata comments = graph.getMetadata(CommentMetadata.class); comments.add("Imported transferable graph with " + tg.resourceCount + " resources"); graph.addMetadata(comments); } }); } public static void importGraph1WithChanges(Session session, final TransferableGraph1 tg, IImportAdvisor advisor, final BinaryFunction callback) throws DatabaseException, TransferableGraphException { final TransferableGraphImportProcess process = new TransferableGraphImportProcess(tg, advisor == null ? new ImportAdvisor() : advisor); session.syncRequest(new ReadRequest() { @Override public void run(ReadGraph graph) throws DatabaseException { process.prepare(graph); } }); session.syncRequest(new WriteRequest() { @Override public void perform(WriteGraph graph) throws DatabaseException { process.write2(graph); CommentMetadata comments = graph.getMetadata(CommentMetadata.class); comments.add("Imported transferable graph with " + tg.resourceCount + " resources"); graph.addMetadata(comments); if(callback != null) callback.call(graph, process); } }); } public static long[] importGraph1(Session session, final TransferableGraph1 tg) throws DatabaseException, TransferableGraphException { final TransferableGraphImportProcess process = new TransferableGraphImportProcess(tg, null); session.syncRequest(new ReadRequest() { @Override public void run(ReadGraph graph) throws DatabaseException { process.prepare(graph); } }); session.syncRequest(new WriteOnlyRequest() { @Override public void perform(WriteOnlyGraph graph) throws DatabaseException { process.write(graph); } }); return process.getResourceIds( session.getService(SerialisationSupport.class)); } public static long[] importGraph1(WriteGraph graph, final TransferableGraph1 tg) throws DatabaseException, TransferableGraphException { final TransferableGraphImportProcess process = new TransferableGraphImportProcess(tg, null); process.prepare(graph); process.write2(graph); return process.getResourceIds( graph.getSession().getService(SerialisationSupport.class)); } /** * Import transferable graph version 1 to the database. Root advisor is used * to give identities to roots of the transferable graphs. It may be null, * in which case new resources are created for all roots but the root library. * * @param session * @param tg * @param advisor root advisor or null * @throws DatabaseException */ public static long[] importGraph1(WriteGraph graph, TransferableGraph1 tg, IImportAdvisor advisor) throws DatabaseException { return importGraph1(graph, tg, advisor, null); } public static long[] importGraph1(WriteGraph graph, TransferableGraph1 tg, IImportAdvisor advisor, TGStatusMonitor monitor) throws DatabaseException { TransferableGraphImportProcess process = new TransferableGraphImportProcess(tg, advisor == null ? new ImportAdvisor() : advisor, monitor); process.prepare(graph); if(advisor instanceof IImportAdvisor2) ((IImportAdvisor2)advisor).beforeWrite(graph, process); process.write2(graph); if(advisor instanceof IImportAdvisor2) ((IImportAdvisor2)advisor).afterWrite(graph, process); return process.getResourceIds( graph.getSession().getService(SerialisationSupport.class)); } public static long[] applyDelta(WriteGraph graph, long[] oldResources, TransferableGraphDelta1 delta) throws DatabaseException { SerialisationSupport serializer = graph.getSession().getService(SerialisationSupport.class); TGToGraphMap aMap = new TGToGraphMap(delta.a); aMap.addOldResources(serializer, oldResources); aMap.deny(graph); TGToGraphMap bMap = new TGToGraphMap(delta.b); bMap.addMappedOldResources(serializer, delta.aToB, aMap.getResources()); bMap.prepare(graph); bMap.claim(graph); return bMap.getResources(serializer); } public static boolean hasChanges(ReadGraph graph, long[] oldResources, TransferableGraphDelta1 delta) throws DatabaseException { SerialisationSupport serializer = graph.getSession().getService(SerialisationSupport.class); TGToGraphMap aMap = new TGToGraphMap(delta.a); aMap.addOldResources(serializer, oldResources); if(aMap.checkDeny(graph)) return true; TGToGraphMap bMap = new TGToGraphMap(delta.b); bMap.addMappedOldResources(serializer, delta.aToB, aMap.getResources()); bMap.prepare(graph); return bMap.checkClaim(graph); } public static void uninstallGraph(WriteGraph writeGraph, TransferableGraph1 graph, ImportAdvisor advisor) throws TransferableGraphException { // TODO HANNU IMPLEMENTS throw new UnsupportedOperationException(); } public static long[] importVirtualGraph(Session session, final VirtualGraph vg, final TransferableGraph1 tg, IImportAdvisor advisor) throws DatabaseException { final TransferableGraphImportProcess process = new TransferableGraphImportProcess(tg, advisor == null ? new ImportAdvisor() : advisor); session.syncRequest(new ReadRequest() { @Override public void run(ReadGraph graph) throws DatabaseException { process.prepare(graph); } }); session.syncRequest(new WriteOnlyRequest(vg) { @Override public void perform(WriteOnlyGraph graph) throws DatabaseException { // Needed because process#write does not support virtual WriteOnlyGraph if (vg != null) process.write2(graph); else process.write(graph); } }); return process.getResourceIds(session.getService(SerialisationSupport.class)); } public static long[] importVirtualGraph(WriteGraph graph, final TransferableGraph1 tg, IImportAdvisor advisor) throws DatabaseException { final TransferableGraphImportProcess process = new TransferableGraphImportProcess(tg, advisor == null ? new ImportAdvisor() : advisor); process.prepare(graph); process.write2(graph); return process.getResourceIds(graph.getService(SerialisationSupport.class)); } public static TransferableGraph1 readGraph(File file) throws TransferableGraphException { FileVariantAccessor va = null; try { va = Accessors.openAccessor(file); Datatype type = va.getContentType(); if(type.equals(Datatypes.getDatatype(TransferableGraph1.class))) return (TransferableGraph1)va.getContentValue(Bindings.getBinding(TransferableGraph1.class)); else throw new SerializationException("Unknown transferable graph data type."); } catch (AccessorException e) { throw new TransferableGraphException(e); } catch (BindingConstructionException e) { throw new TransferableGraphException(e); } catch (SerializationException e) { throw new TransferableGraphException(e); } catch (AccessorConstructionException e) { throw new TransferableGraphException(e); } catch (DatatypeConstructionException e) { throw new TransferableGraphException(e); } finally { if(va != null) { try { va.close(); } catch (AccessorException e) { } } } } public static void importVirtualGraph(Session session, VirtualGraph vg, File file) throws TransferableGraphException { try { importVirtualGraph(session, vg, readGraph(file), new ImportAdvisor()); } catch (DatabaseException e) { throw new TransferableGraphException(e); } } public static VirtualGraph importVirtualGraph(Session session, File file) throws TransferableGraphException { VirtualGraphSupport support = session.getService(VirtualGraphSupport.class); VirtualGraph vg = support.getMemoryPersistent(UUID.randomUUID().toString()); importVirtualGraph(session, vg, file); return vg; } public static void writeTransferableGraph(RequestProcessor processor, final String format, final TransferableGraphSource source, File target) throws Exception { writeTransferableGraph(processor, format, 1, source, target); } public static void writeTransferableGraph(RequestProcessor processor, final String format, final int version, final TransferableGraphSource source, File target) throws Exception { writeTransferableGraph(processor, format, version, new TreeMap(), source, target); } public static void writeTransferableGraph(RequestProcessor processor, String format, int version, TreeMap metadata, TransferableGraphSource source, File target) throws Exception { writeTransferableGraph(processor, format, version, metadata, source, target, TGStatusMonitor.NULL_MONITOR); } public static void writeTransferableGraph(RequestProcessor processor, String format, int version, TreeMap metadata, TransferableGraphSource source, File target, TGStatusMonitor monitor) throws Exception { final Serializer datatypeSerializer = Bindings.getSerializerUnchecked(Datatype.class); try (DataOutputStream out = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(target), 128*1024))) { DataContainer container = new DataContainer(format, version, metadata, null); DataContainers.writeHeader(out, container); datatypeSerializer.serialize((DataOutput) out, Datatypes.getDatatypeUnchecked(TransferableGraph1.class)); writeTransferableGraph(processor, source, out, monitor); } } private static TGStatusMonitor safeMonitor(TGStatusMonitor mon) { return mon == null ? TGStatusMonitor.NULL_MONITOR : mon; } private static class CopyingInputStream extends InputStream { public DataInput in; public DataOutput out; @Override public int read() throws IOException { int value = in.readUnsignedByte(); out.write(value); return value; } } private static long copy(byte[] buffer, DataInput in, DataOutput out, long bytesToCopy) throws IOException { int read = 0; long bufferLength = buffer.length; while (read < bytesToCopy) { int l = (int) Math.min(bufferLength, bytesToCopy-read); in.readFully(buffer, 0, l); out.write(buffer, 0, l); read += l; } return read; } private static final int LITERAL_VALUE_IO_BUFFER_SIZE = 128 * 1024; private static void writeTransferableGraph(RequestProcessor processor, final TransferableGraphSource source, final DataOutput out, TGStatusMonitor monitor) throws Exception { long start = System.nanoTime(); final Serializer datatypeSerializer = Bindings.getSerializerUnchecked(Datatype.class); final Serializer identitySerializer = Bindings.getSerializerUnchecked(Identity.class); final Serializer extensionSerializer = Bindings.getSerializerUnchecked(Extensions.class); int totalCount = source.getIdentityCount() + source.getStatementCount()/4 + source.getValueCount(); TGStatusMonitor.Updater progress = new TGStatusMonitor.Updater(safeMonitor(monitor), totalCount); out.writeInt(source.getResourceCount()); extensionSerializer.serialize(out, new Extensions(source.getExtensions())); // System.err.println("resource count: " + source.getResourceCount()); // System.err.println("identity count: " + source.getIdentityCount()); byte[] buffer = new byte[LITERAL_VALUE_IO_BUFFER_SIZE]; processor.syncRequest(new ReadRequest() { @Override public void run(ReadGraph graph) throws DatabaseException { try { if (monitor.isCanceled()) throw new CancelTransactionException(); out.writeInt(source.getIdentityCount()); source.forIdentities(graph, value -> { //System.err.println("id: " + value); identitySerializer.serialize(out, value); progress.worked(1); }); if (monitor.isCanceled()) throw new CancelTransactionException(); out.writeInt(source.getStatementCount()); //System.err.println("stms: " + source.getStatementCount()); //int[] counter = {0}; source.forStatements(graph, r -> { for (int i = 0; i < 4; ++i) out.writeInt(r[i]); //System.err.println("stm " + (counter[0]++) + ": " + r[0] + " " + r[1] + " " + r[2] + " " + r[3]); progress.worked(1); }); if (monitor.isCanceled()) throw new CancelTransactionException(); out.writeInt(source.getValueCount()); //System.err.println("vals: " + source.getValueCount()); CopyingInputStream cis = new CopyingInputStream(); cis.out = out; source.forValues2(graph, new TransferableGraphSourceValueProcedure() { TObjectIntHashMap identities = new TObjectIntHashMap<>(); @Override public void rawCopy(int resource, int length, DataInput input) throws Exception { out.writeInt(resource); long copied = copy(buffer, input, out, length); assert copied == length; //System.err.println("value " + (num++) + ": raw variant, " + length + " bytes, copied " + copied + " bytes"); progress.worked(1); } @Override public void execute(int resource, Datatype type, DataInput input) throws Exception { out.writeInt(resource); identities.clear(); datatypeSerializer.serialize(out, identities, type); Binding binding = Bindings.getBinding(type); Serializer serializer = Bindings.getSerializer(binding); cis.in = input; serializer.skip(cis); cis.in = null; progress.worked(1); } }); } catch (DatabaseException e) { throw e; } catch (Exception e) { throw new DatabaseException(e); } } }); long end = System.nanoTime(); System.err.println("Wrote transferable graph in " + 1e-9*(end-start) + " seconds."); } public static TransferableGraph1 create(ReadGraph graph, TransferableGraphSource source) throws DatabaseException { final TIntArrayList statements = new TIntArrayList(); final ArrayList values = new ArrayList<>(); final ArrayList identities = new ArrayList<>(); try { source.forStatements(graph, r -> statements.addAll(r)); source.forValues(graph, v -> values.add(v)); source.forIdentities(graph, i -> identities.add(i)); return new TransferableGraph1(source.getResourceCount(), identities.toArray(new Identity[identities.size()]), statements.toArray(), values.toArray(new Value[values.size()]), source.getExtensions()); } catch (Exception e) { throw new DatabaseException(e); } } }