--- /dev/null
+/*******************************************************************************\r
+ * Copyright (c) 2007, 2010 Association for Decentralized Information Management\r
+ * in Industry THTH ry.\r
+ * All rights reserved. This program and the accompanying materials\r
+ * are made available under the terms of the Eclipse Public License v1.0\r
+ * which accompanies this distribution, and is available at\r
+ * http://www.eclipse.org/legal/epl-v10.html\r
+ *\r
+ * Contributors:\r
+ * VTT Technical Research Centre of Finland - initial API and implementation\r
+ *******************************************************************************/\r
+package org.simantics.graph.db;\r
+\r
+import java.io.BufferedOutputStream;\r
+import java.io.DataInput;\r
+import java.io.DataOutput;\r
+import java.io.DataOutputStream;\r
+import java.io.File;\r
+import java.io.FileOutputStream;\r
+import java.io.IOException;\r
+import java.io.InputStream;\r
+import java.util.ArrayList;\r
+import java.util.Collection;\r
+import java.util.HashSet;\r
+import java.util.TreeMap;\r
+import java.util.UUID;\r
+\r
+import org.simantics.databoard.Accessors;\r
+import org.simantics.databoard.Bindings;\r
+import org.simantics.databoard.Datatypes;\r
+import org.simantics.databoard.accessor.error.AccessorConstructionException;\r
+import org.simantics.databoard.accessor.error.AccessorException;\r
+import org.simantics.databoard.accessor.file.FileVariantAccessor;\r
+import org.simantics.databoard.binding.Binding;\r
+import org.simantics.databoard.binding.error.BindingConstructionException;\r
+import org.simantics.databoard.binding.error.DatatypeConstructionException;\r
+import org.simantics.databoard.binding.mutable.Variant;\r
+import org.simantics.databoard.container.DataContainer;\r
+import org.simantics.databoard.container.DataContainers;\r
+import org.simantics.databoard.serialization.SerializationException;\r
+import org.simantics.databoard.serialization.Serializer;\r
+import org.simantics.databoard.type.Datatype;\r
+import org.simantics.db.ReadGraph;\r
+import org.simantics.db.RequestProcessor;\r
+import org.simantics.db.Resource;\r
+import org.simantics.db.Session;\r
+import org.simantics.db.VirtualGraph;\r
+import org.simantics.db.WriteGraph;\r
+import org.simantics.db.WriteOnlyGraph;\r
+import org.simantics.db.common.CommentMetadata;\r
+import org.simantics.db.common.request.ReadRequest;\r
+import org.simantics.db.common.request.WriteOnlyRequest;\r
+import org.simantics.db.common.request.WriteRequest;\r
+import org.simantics.db.exception.CancelTransactionException;\r
+import org.simantics.db.exception.DatabaseException;\r
+import org.simantics.db.service.SerialisationSupport;\r
+import org.simantics.db.service.VirtualGraphSupport;\r
+import org.simantics.graph.db.TransferableGraphSource.TransferableGraphSourceValueProcedure;\r
+import org.simantics.graph.diff.TransferableGraphDelta1;\r
+import org.simantics.graph.representation.Extensions;\r
+import org.simantics.graph.representation.External;\r
+import org.simantics.graph.representation.Identity;\r
+import org.simantics.graph.representation.TransferableGraph1;\r
+import org.simantics.graph.representation.Value;\r
+import org.simantics.utils.datastructures.BinaryFunction;\r
+\r
+import gnu.trove.list.array.TIntArrayList;\r
+import gnu.trove.map.hash.TObjectIntHashMap;\r
+\r
+public class TransferableGraphs {\r
+\r
+ public static long[] importGraph(Session session, Object tg, IImportAdvisor advisor) throws DatabaseException, TransferableGraphException {\r
+ if (tg instanceof TransferableGraph1) \r
+ {\r
+ return importGraph1(session, (TransferableGraph1) tg, advisor);\r
+ }\r
+ throw new TransferableGraphException("Cannot import "+tg.getClass().getName());\r
+ }\r
+\r
+ public static long[] importGraph(WriteGraph g, Object tg, IImportAdvisor advisor) throws DatabaseException, TransferableGraphException {\r
+ if (tg instanceof TransferableGraph1) \r
+ {\r
+ return importGraph1(g, (TransferableGraph1) tg, advisor);\r
+ }\r
+ throw new TransferableGraphException("Cannot import "+tg.getClass().getName());\r
+ }\r
+ \r
+ public static long[] importGraph(Session session, Object tg) throws DatabaseException, TransferableGraphException {\r
+ if (tg instanceof TransferableGraph1) \r
+ {\r
+ return importGraph1(session, (TransferableGraph1) tg);\r
+ }\r
+ throw new TransferableGraphException("Cannot import "+tg.getClass().getName());\r
+ }\r
+\r
+ public static long[] importGraph(WriteGraph g, Object tg) throws DatabaseException, TransferableGraphException {\r
+ if (tg instanceof TransferableGraph1) \r
+ {\r
+ return importGraph1(g, (TransferableGraph1) tg);\r
+ }\r
+ throw new TransferableGraphException("Cannot import "+tg.getClass().getName());\r
+ }\r
+ \r
+ public static Collection<Resource> collectExternals(RequestProcessor processor, final TransferableGraph1 tg) throws DatabaseException, TransferableGraphException {\r
+ final TransferableGraphImportProcess process = new TransferableGraphImportProcess(tg, new ImportAdvisor());\r
+ processor.syncRequest(new ReadRequest() {\r
+ @Override\r
+ public void run(ReadGraph graph) throws DatabaseException {\r
+ process.prepare(graph);\r
+ }\r
+ });\r
+ HashSet<Resource> result = new HashSet<Resource>();\r
+ for(Identity id : tg.identities) {\r
+ if(id.definition instanceof External) {\r
+ result.add(process.resources[id.resource]);\r
+ }\r
+ }\r
+ return result;\r
+ }\r
+ \r
+ /**\r
+ * Imports transferable graph version 1 to the database. Root advisor is used\r
+ * to give identities to roots of the transferable graphs. It may be null,\r
+ * in which case new resources are created for all roots but the root library.\r
+ * \r
+ * @param session\r
+ * @param tg\r
+ * @param advisor root advisor or <code>null</code>\r
+ * @throws DatabaseException\r
+ */\r
+ public static long[] importGraph1(Session session, final TransferableGraph1 tg, final IImportAdvisor advisor_) throws DatabaseException, TransferableGraphException {\r
+ \r
+ final IImportAdvisor2 advisor = (advisor_ instanceof IImportAdvisor2) ? ((IImportAdvisor2)advisor_) : new WrapperAdvisor(advisor_);\r
+ \r
+ final TransferableGraphImportProcess process = new TransferableGraphImportProcess(tg, \r
+ advisor == null ? new ImportAdvisor() : advisor);\r
+ session.syncRequest(new ReadRequest() {\r
+ @Override\r
+ public void run(ReadGraph graph) throws DatabaseException {\r
+ process.prepare(graph);\r
+ }\r
+ }); \r
+ session.syncRequest(new WriteOnlyRequest() {\r
+ @Override\r
+ public void perform(WriteOnlyGraph graph) throws DatabaseException {\r
+ advisor.beforeWrite(graph, process);\r
+ process.write(graph);\r
+ advisor.afterWrite(graph, process);\r
+ }\r
+ });\r
+ return process.getResourceIds(\r
+ session.getService(SerialisationSupport.class));\r
+ }\r
+\r
+ public static void importGraph1(Session session, final TransferableGraph1 tg, IImportAdvisor advisor, final BinaryFunction<Boolean, WriteOnlyGraph, TransferableGraphImportProcess> callback) throws DatabaseException, TransferableGraphException {\r
+ final TransferableGraphImportProcess process = new TransferableGraphImportProcess(tg, \r
+ advisor == null ? new ImportAdvisor() : advisor);\r
+ session.syncRequest(new ReadRequest() {\r
+ @Override\r
+ public void run(ReadGraph graph) throws DatabaseException {\r
+ process.prepare(graph);\r
+ }\r
+ });\r
+ session.syncRequest(new WriteOnlyRequest() {\r
+ @Override\r
+ public void perform(WriteOnlyGraph graph) throws DatabaseException {\r
+ process.write(graph);\r
+ if(callback != null)\r
+ callback.call(graph, process);\r
+ }\r
+ });\r
+ }\r
+\r
+ public static void importGraph1(Session session, final TransferableGraphSource tg, IImportAdvisor advisor) throws Exception {\r
+ importGraph1(session, tg, advisor, null);\r
+ }\r
+\r
+ public static void importGraph1(Session session, final TransferableGraphSource tg, IImportAdvisor advisor, TGStatusMonitor monitor) throws DatabaseException {\r
+ importGraph1(session, null, tg, advisor, monitor);\r
+ }\r
+\r
+ public static void importGraph1(Session session, VirtualGraph vg, final TransferableGraphSource tg, IImportAdvisor advisor_, TGStatusMonitor monitor) throws DatabaseException {\r
+ \r
+ final IImportAdvisor2 advisor = (advisor_ instanceof IImportAdvisor2) ? ((IImportAdvisor2)advisor_) : new WrapperAdvisor(advisor_);\r
+\r
+ final StreamingTransferableGraphImportProcess process = new StreamingTransferableGraphImportProcess(session, vg, tg, advisor);\r
+ session.syncRequest(new ReadRequest() {\r
+ @Override\r
+ public void run(ReadGraph graph) throws DatabaseException {\r
+ try {\r
+ process.prepare(graph);\r
+ } catch (DatabaseException e) {\r
+ throw e;\r
+ } catch (Exception e) {\r
+ throw new DatabaseException(e);\r
+ }\r
+ }\r
+ });\r
+ session.syncRequest(new WriteOnlyRequest(vg) {\r
+ @Override\r
+ public void perform(WriteOnlyGraph graph) throws DatabaseException {\r
+ try {\r
+ advisor.beforeWrite(graph, process);\r
+ process.write(graph);\r
+ advisor.afterWrite(graph, process);\r
+ } catch (Exception e) {\r
+ throw new DatabaseException(e);\r
+ }\r
+ }\r
+ });\r
+ }\r
+\r
+ public static void importGraph1WithMonitor(Session session, final TransferableGraph1 tg, IImportAdvisor advisor_, TGStatusMonitor monitor) throws DatabaseException {\r
+ final IImportAdvisor2 advisor = (advisor_ instanceof IImportAdvisor2) ? ((IImportAdvisor2)advisor_) : new WrapperAdvisor(advisor_);\r
+ final TransferableGraphImportProcess process = new TransferableGraphImportProcess(tg, \r
+ advisor == null ? new ImportAdvisor() : advisor, monitor);\r
+ session.syncRequest(new ReadRequest() {\r
+ @Override\r
+ public void run(ReadGraph graph) throws DatabaseException {\r
+ process.prepare(graph);\r
+ }\r
+ });\r
+ session.syncRequest(new WriteOnlyRequest() {\r
+ @Override\r
+ public void perform(WriteOnlyGraph graph) throws DatabaseException {\r
+ advisor.beforeWrite(graph, process);\r
+ process.write2(graph);\r
+ advisor.afterWrite(graph, process);\r
+ CommentMetadata comments = graph.getMetadata(CommentMetadata.class);\r
+ comments.add("Imported transferable graph with " + tg.resourceCount + " resources");\r
+ graph.addMetadata(comments);\r
+ }\r
+ });\r
+ }\r
+\r
+ public static void importGraph1WithChanges(Session session, final TransferableGraph1 tg, IImportAdvisor advisor, final BinaryFunction<Boolean, WriteGraph, TransferableGraphImportProcess> callback) throws DatabaseException, TransferableGraphException {\r
+ final TransferableGraphImportProcess process = new TransferableGraphImportProcess(tg, \r
+ advisor == null ? new ImportAdvisor() : advisor);\r
+ session.syncRequest(new ReadRequest() {\r
+ @Override\r
+ public void run(ReadGraph graph) throws DatabaseException {\r
+ process.prepare(graph);\r
+ }\r
+ });\r
+ session.syncRequest(new WriteRequest() {\r
+ @Override\r
+ public void perform(WriteGraph graph) throws DatabaseException {\r
+ process.write2(graph);\r
+ CommentMetadata comments = graph.getMetadata(CommentMetadata.class);\r
+ comments.add("Imported transferable graph with " + tg.resourceCount + " resources");\r
+ graph.addMetadata(comments);\r
+ if(callback != null)\r
+ callback.call(graph, process);\r
+ }\r
+ });\r
+ }\r
+ \r
+ public static long[] importGraph1(Session session, final TransferableGraph1 tg) throws DatabaseException, TransferableGraphException {\r
+ final TransferableGraphImportProcess process = \r
+ new TransferableGraphImportProcess(tg, \r
+ null);\r
+ session.syncRequest(new ReadRequest() {\r
+ @Override\r
+ public void run(ReadGraph graph) throws DatabaseException {\r
+ process.prepare(graph);\r
+ }\r
+ }); \r
+ session.syncRequest(new WriteOnlyRequest() {\r
+ @Override\r
+ public void perform(WriteOnlyGraph graph) throws DatabaseException {\r
+ process.write(graph);\r
+ }\r
+ });\r
+ return process.getResourceIds(\r
+ session.getService(SerialisationSupport.class));\r
+ }\r
+ \r
+ public static long[] importGraph1(WriteGraph graph, final TransferableGraph1 tg) throws DatabaseException, TransferableGraphException {\r
+ final TransferableGraphImportProcess process = \r
+ new TransferableGraphImportProcess(tg, \r
+ null);\r
+ process.prepare(graph);\r
+ process.write2(graph);\r
+ return process.getResourceIds(\r
+ graph.getSession().getService(SerialisationSupport.class));\r
+ }\r
+ \r
+ /**\r
+ * Import transferable graph version 1 to the database. Root advisor is used\r
+ * to give identities to roots of the transferable graphs. It may be null,\r
+ * in which case new resources are created for all roots but the root library.\r
+ * \r
+ * @param session\r
+ * @param tg\r
+ * @param advisor root advisor or <code>null</code>\r
+ * @throws DatabaseException\r
+ */\r
+ public static long[] importGraph1(WriteGraph graph, TransferableGraph1 tg, IImportAdvisor advisor) throws DatabaseException {\r
+ return importGraph1(graph, tg, advisor, null);\r
+ }\r
+\r
+ public static long[] importGraph1(WriteGraph graph, TransferableGraph1 tg, IImportAdvisor advisor, TGStatusMonitor monitor) throws DatabaseException {\r
+ TransferableGraphImportProcess process = new TransferableGraphImportProcess(tg, \r
+ advisor == null ? new ImportAdvisor() : advisor, monitor);\r
+ process.prepare(graph);\r
+ if(advisor instanceof IImportAdvisor2) ((IImportAdvisor2)advisor).beforeWrite(graph, process);\r
+ process.write2(graph);\r
+ if(advisor instanceof IImportAdvisor2) ((IImportAdvisor2)advisor).afterWrite(graph, process);\r
+ return process.getResourceIds(\r
+ graph.getSession().getService(SerialisationSupport.class));\r
+ }\r
+\r
+ public static long[] applyDelta(WriteGraph graph, long[] oldResources, TransferableGraphDelta1 delta) throws DatabaseException {\r
+ SerialisationSupport serializer = \r
+ graph.getSession().getService(SerialisationSupport.class);\r
+ \r
+ TGToGraphMap aMap = new TGToGraphMap(delta.a);\r
+ aMap.addOldResources(serializer, oldResources);\r
+ aMap.deny(graph);\r
+ \r
+ TGToGraphMap bMap = new TGToGraphMap(delta.b);\r
+ bMap.addMappedOldResources(serializer, delta.aToB, aMap.getResources());\r
+ bMap.prepare(graph);\r
+ bMap.claim(graph);\r
+ \r
+ return bMap.getResources(serializer);\r
+ }\r
+ \r
+ public static boolean hasChanges(ReadGraph graph, long[] oldResources, TransferableGraphDelta1 delta) throws DatabaseException {\r
+ \r
+ SerialisationSupport serializer = \r
+ graph.getSession().getService(SerialisationSupport.class);\r
+ \r
+ TGToGraphMap aMap = new TGToGraphMap(delta.a);\r
+ aMap.addOldResources(serializer, oldResources);\r
+ if(aMap.checkDeny(graph)) return true;\r
+ \r
+ TGToGraphMap bMap = new TGToGraphMap(delta.b);\r
+ bMap.addMappedOldResources(serializer, delta.aToB, aMap.getResources());\r
+ bMap.prepare(graph);\r
+ return bMap.checkClaim(graph);\r
+ \r
+ }\r
+\r
+ public static void uninstallGraph(WriteGraph writeGraph, TransferableGraph1 graph,\r
+ ImportAdvisor advisor) throws TransferableGraphException {\r
+ // TODO HANNU IMPLEMENTS\r
+ throw new UnsupportedOperationException();\r
+ } \r
+ \r
+ public static long[] importVirtualGraph(Session session, final VirtualGraph vg, final TransferableGraph1 tg, IImportAdvisor advisor) throws DatabaseException {\r
+ final TransferableGraphImportProcess process = new TransferableGraphImportProcess(tg, \r
+ advisor == null ? new ImportAdvisor() : advisor);\r
+ session.syncRequest(new ReadRequest() {\r
+ @Override\r
+ public void run(ReadGraph graph) throws DatabaseException {\r
+ process.prepare(graph);\r
+ }\r
+ });\r
+ session.syncRequest(new WriteOnlyRequest(vg) {\r
+ @Override\r
+ public void perform(WriteOnlyGraph graph) throws DatabaseException {\r
+ // Needed because process#write does not support virtual WriteOnlyGraph\r
+ if (vg != null)\r
+ process.write2(graph);\r
+ else\r
+ process.write(graph);\r
+ }\r
+ \r
+ });\r
+ return process.getResourceIds(session.getService(SerialisationSupport.class));\r
+ } \r
+\r
+ public static long[] importVirtualGraph(WriteGraph graph, final TransferableGraph1 tg, IImportAdvisor advisor) throws DatabaseException {\r
+ final TransferableGraphImportProcess process = new TransferableGraphImportProcess(tg, \r
+ advisor == null ? new ImportAdvisor() : advisor);\r
+ process.prepare(graph);\r
+ process.write2(graph);\r
+ return process.getResourceIds(graph.getService(SerialisationSupport.class));\r
+ } \r
+ \r
+ public static TransferableGraph1 readGraph(File file) throws TransferableGraphException {\r
+ FileVariantAccessor va = null;\r
+ try {\r
+ va = Accessors.openAccessor(file);\r
+ Datatype type = va.getContentType();\r
+ if(type.equals(Datatypes.getDatatype(TransferableGraph1.class))) \r
+ return (TransferableGraph1)va.getContentValue(Bindings.getBinding(TransferableGraph1.class));\r
+ else\r
+ throw new SerializationException("Unknown transferable graph data type.");\r
+ } catch (AccessorException e) {\r
+ throw new TransferableGraphException(e);\r
+ } catch (BindingConstructionException e) {\r
+ throw new TransferableGraphException(e);\r
+ } catch (SerializationException e) {\r
+ throw new TransferableGraphException(e);\r
+ } catch (AccessorConstructionException e) {\r
+ throw new TransferableGraphException(e);\r
+ } catch (DatatypeConstructionException e) {\r
+ throw new TransferableGraphException(e);\r
+ } finally {\r
+ if(va != null) {\r
+ try {\r
+ va.close();\r
+ } catch (AccessorException e) {\r
+ }\r
+ }\r
+ } \r
+ }\r
+ \r
+ public static void importVirtualGraph(Session session, VirtualGraph vg, File file) throws TransferableGraphException {\r
+ try {\r
+ importVirtualGraph(session, vg, readGraph(file), new ImportAdvisor());\r
+ } catch (DatabaseException e) {\r
+ throw new TransferableGraphException(e);\r
+ }\r
+ }\r
+ \r
+ public static VirtualGraph importVirtualGraph(Session session, File file) throws TransferableGraphException {\r
+ VirtualGraphSupport support = session.getService(VirtualGraphSupport.class);\r
+ VirtualGraph vg = support.getMemoryPersistent(UUID.randomUUID().toString());\r
+ importVirtualGraph(session, vg, file);\r
+ return vg;\r
+ }\r
+ \r
+ public static void writeTransferableGraph(RequestProcessor processor, final String format, final TransferableGraphSource source, File target) throws Exception {\r
+ writeTransferableGraph(processor, format, 1, source, target);\r
+ }\r
+\r
+ public static void writeTransferableGraph(RequestProcessor processor, final String format, final int version, final TransferableGraphSource source, File target) throws Exception {\r
+ writeTransferableGraph(processor, format, version, new TreeMap<String,Variant>(), source, target);\r
+ }\r
+\r
+ public static void writeTransferableGraph(RequestProcessor processor, String format, int version, TreeMap<String, Variant> metadata, TransferableGraphSource source, File target) throws Exception {\r
+ writeTransferableGraph(processor, format, version, metadata, source, target, TGStatusMonitor.NULL_MONITOR);\r
+ }\r
+\r
+ public static void writeTransferableGraph(RequestProcessor processor, String format, int version, TreeMap<String, Variant> metadata, TransferableGraphSource source, File target, TGStatusMonitor monitor) throws Exception {\r
+ final Serializer datatypeSerializer = Bindings.getSerializerUnchecked(Datatype.class);\r
+ try (DataOutputStream out = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(target), 128*1024))) {\r
+ DataContainer container = new DataContainer(format, version, metadata, null);\r
+ DataContainers.writeHeader(out, container);\r
+ datatypeSerializer.serialize((DataOutput) out, Datatypes.getDatatypeUnchecked(TransferableGraph1.class));\r
+ writeTransferableGraph(processor, source, out, monitor);\r
+ }\r
+ }\r
+\r
+ private static TGStatusMonitor safeMonitor(TGStatusMonitor mon) {\r
+ return mon == null ? TGStatusMonitor.NULL_MONITOR : mon;\r
+ }\r
+\r
+ private static class CopyingInputStream extends InputStream {\r
+ public DataInput in;\r
+ public DataOutput out;\r
+\r
+ @Override\r
+ public int read() throws IOException {\r
+ int value = in.readUnsignedByte();\r
+ out.write(value);\r
+ return value;\r
+ }\r
+ }\r
+\r
+ private static long copy(byte[] buffer, DataInput in, DataOutput out, long bytesToCopy) throws IOException {\r
+ int read = 0;\r
+ long bufferLength = buffer.length;\r
+ while (read < bytesToCopy) {\r
+ int l = (int) Math.min(bufferLength, bytesToCopy-read);\r
+ in.readFully(buffer, 0, l);\r
+ out.write(buffer, 0, l);\r
+ read += l;\r
+ }\r
+ return read;\r
+ }\r
+\r
+ private static final int LITERAL_VALUE_IO_BUFFER_SIZE = 128 * 1024;\r
+\r
+ private static void writeTransferableGraph(RequestProcessor processor, final TransferableGraphSource source, final DataOutput out, TGStatusMonitor monitor) throws Exception {\r
+ long start = System.nanoTime();\r
+\r
+ final Serializer datatypeSerializer = Bindings.getSerializerUnchecked(Datatype.class);\r
+ final Serializer identitySerializer = Bindings.getSerializerUnchecked(Identity.class);\r
+ final Serializer extensionSerializer = Bindings.getSerializerUnchecked(Extensions.class);\r
+\r
+ int totalCount = source.getIdentityCount() + source.getStatementCount()/4 + source.getValueCount();\r
+ TGStatusMonitor.Updater progress = new TGStatusMonitor.Updater(safeMonitor(monitor), totalCount);\r
+\r
+ out.writeInt(source.getResourceCount());\r
+ extensionSerializer.serialize(out, new Extensions(source.getExtensions()));\r
+\r
+// System.err.println("resource count: " + source.getResourceCount());\r
+// System.err.println("identity count: " + source.getIdentityCount());\r
+\r
+ byte[] buffer = new byte[LITERAL_VALUE_IO_BUFFER_SIZE];\r
+\r
+ processor.syncRequest(new ReadRequest() {\r
+ @Override\r
+ public void run(ReadGraph graph) throws DatabaseException {\r
+ try {\r
+ if (monitor.isCanceled())\r
+ throw new CancelTransactionException();\r
+\r
+ out.writeInt(source.getIdentityCount());\r
+ source.forIdentities(graph, value -> {\r
+ //System.err.println("id: " + value);\r
+ identitySerializer.serialize(out, value);\r
+ progress.worked(1);\r
+ });\r
+\r
+ if (monitor.isCanceled())\r
+ throw new CancelTransactionException();\r
+\r
+ out.writeInt(source.getStatementCount());\r
+ //System.err.println("stms: " + source.getStatementCount());\r
+ //int[] counter = {0};\r
+ source.forStatements(graph, r -> {\r
+ for (int i = 0; i < 4; ++i)\r
+ out.writeInt(r[i]);\r
+ //System.err.println("stm " + (counter[0]++) + ": " + r[0] + " " + r[1] + " " + r[2] + " " + r[3]);\r
+ progress.worked(1);\r
+ });\r
+\r
+ if (monitor.isCanceled())\r
+ throw new CancelTransactionException();\r
+\r
+ out.writeInt(source.getValueCount());\r
+ //System.err.println("vals: " + source.getValueCount());\r
+ CopyingInputStream cis = new CopyingInputStream();\r
+ cis.out = out;\r
+ source.forValues2(graph, new TransferableGraphSourceValueProcedure() {\r
+ TObjectIntHashMap<Object> identities = new TObjectIntHashMap<>();\r
+\r
+ @Override\r
+ public void rawCopy(int resource, int length, DataInput input) throws Exception {\r
+ out.writeInt(resource);\r
+ long copied = copy(buffer, input, out, length);\r
+ assert copied == length;\r
+ //System.err.println("value " + (num++) + ": raw variant, " + length + " bytes, copied " + copied + " bytes");\r
+ progress.worked(1);\r
+ }\r
+\r
+ @Override\r
+ public void execute(int resource, Datatype type, DataInput input) throws Exception {\r
+ out.writeInt(resource);\r
+ identities.clear();\r
+ datatypeSerializer.serialize(out, identities, type);\r
+ Binding binding = Bindings.getBinding(type);\r
+ Serializer serializer = Bindings.getSerializer(binding);\r
+ cis.in = input;\r
+ serializer.skip(cis);\r
+ cis.in = null;\r
+ progress.worked(1);\r
+ }\r
+ });\r
+\r
+ } catch (DatabaseException e) {\r
+ throw e;\r
+ } catch (Exception e) {\r
+ throw new DatabaseException(e);\r
+ }\r
+ }\r
+ });\r
+\r
+ long end = System.nanoTime();\r
+ System.err.println("Wrote transferable graph in " + 1e-9*(end-start) + " seconds.");\r
+ }\r
+\r
+ public static TransferableGraph1 create(ReadGraph graph, TransferableGraphSource source) throws DatabaseException {\r
+ \r
+ final TIntArrayList statements = new TIntArrayList();\r
+ final ArrayList<Value> values = new ArrayList<>();\r
+ final ArrayList<Identity> identities = new ArrayList<>();\r
+ \r
+ try {\r
+\r
+ source.forStatements(graph, r -> statements.addAll(r));\r
+ source.forValues(graph, v -> values.add(v));\r
+ source.forIdentities(graph, i -> identities.add(i));\r
+\r
+ return new TransferableGraph1(source.getResourceCount(), \r
+ identities.toArray(new Identity[identities.size()]),\r
+ statements.toArray(),\r
+ values.toArray(new Value[values.size()]),\r
+ source.getExtensions());\r
+ \r
+ } catch (Exception e) {\r
+ \r
+ throw new DatabaseException(e);\r
+ \r
+ }\r
+ \r
+ }\r
+\r
+}\r