-/*******************************************************************************\r
- * Copyright (c) 2007, 2010 Association for Decentralized Information Management\r
- * in Industry THTH ry.\r
- * All rights reserved. This program and the accompanying materials\r
- * are made available under the terms of the Eclipse Public License v1.0\r
- * which accompanies this distribution, and is available at\r
- * http://www.eclipse.org/legal/epl-v10.html\r
- *\r
- * Contributors:\r
- * VTT Technical Research Centre of Finland - initial API and implementation\r
- *******************************************************************************/\r
-package org.simantics.graph.db;\r
-\r
-import java.io.DataInput;\r
-import java.io.DataOutput;\r
-import java.io.File;\r
-import java.io.IOException;\r
-import java.io.InputStream;\r
-import java.util.ArrayList;\r
-import java.util.Collection;\r
-import java.util.HashSet;\r
-import java.util.TreeMap;\r
-import java.util.UUID;\r
-\r
-import org.simantics.databoard.Accessors;\r
-import org.simantics.databoard.Bindings;\r
-import org.simantics.databoard.Datatypes;\r
-import org.simantics.databoard.accessor.error.AccessorConstructionException;\r
-import org.simantics.databoard.accessor.error.AccessorException;\r
-import org.simantics.databoard.accessor.file.FileVariantAccessor;\r
-import org.simantics.databoard.binding.Binding;\r
-import org.simantics.databoard.binding.error.BindingConstructionException;\r
-import org.simantics.databoard.binding.error.DatatypeConstructionException;\r
-import org.simantics.databoard.binding.mutable.Variant;\r
-import org.simantics.databoard.container.DataContainer;\r
-import org.simantics.databoard.container.DataContainers;\r
-import org.simantics.databoard.serialization.SerializationException;\r
-import org.simantics.databoard.serialization.Serializer;\r
-import org.simantics.databoard.type.Datatype;\r
-import org.simantics.databoard.util.binary.BinaryFile;\r
-import org.simantics.databoard.util.binary.RandomAccessBinary;\r
-import org.simantics.db.ReadGraph;\r
-import org.simantics.db.RequestProcessor;\r
-import org.simantics.db.Resource;\r
-import org.simantics.db.Session;\r
-import org.simantics.db.VirtualGraph;\r
-import org.simantics.db.WriteGraph;\r
-import org.simantics.db.WriteOnlyGraph;\r
-import org.simantics.db.common.CommentMetadata;\r
-import org.simantics.db.common.request.ReadRequest;\r
-import org.simantics.db.common.request.WriteOnlyRequest;\r
-import org.simantics.db.common.request.WriteRequest;\r
-import org.simantics.db.exception.CancelTransactionException;\r
-import org.simantics.db.exception.DatabaseException;\r
-import org.simantics.db.service.SerialisationSupport;\r
-import org.simantics.db.service.VirtualGraphSupport;\r
-import org.simantics.graph.db.TransferableGraphSource.TransferableGraphSourceValueProcedure;\r
-import org.simantics.graph.diff.TransferableGraphDelta1;\r
-import org.simantics.graph.representation.Extensions;\r
-import org.simantics.graph.representation.External;\r
-import org.simantics.graph.representation.Identity;\r
-import org.simantics.graph.representation.TransferableGraph1;\r
-import org.simantics.graph.representation.Value;\r
-import org.simantics.utils.datastructures.BinaryFunction;\r
-\r
-import gnu.trove.list.array.TIntArrayList;\r
-import gnu.trove.map.hash.TObjectIntHashMap;\r
-\r
-public class TransferableGraphs {\r
-\r
- public static long[] importGraph(Session session, Object tg, IImportAdvisor advisor) throws DatabaseException, TransferableGraphException {\r
- if (tg instanceof TransferableGraph1) \r
- {\r
- return importGraph1(session, (TransferableGraph1) tg, advisor);\r
- }\r
- throw new TransferableGraphException("Cannot import "+tg.getClass().getName());\r
- }\r
-\r
- public static long[] importGraph(WriteGraph g, Object tg, IImportAdvisor advisor) throws DatabaseException, TransferableGraphException {\r
- if (tg instanceof TransferableGraph1) \r
- {\r
- return importGraph1(g, (TransferableGraph1) tg, advisor);\r
- }\r
- throw new TransferableGraphException("Cannot import "+tg.getClass().getName());\r
- }\r
- \r
- public static long[] importGraph(Session session, Object tg) throws DatabaseException, TransferableGraphException {\r
- if (tg instanceof TransferableGraph1) \r
- {\r
- return importGraph1(session, (TransferableGraph1) tg);\r
- }\r
- throw new TransferableGraphException("Cannot import "+tg.getClass().getName());\r
- }\r
-\r
- public static long[] importGraph(WriteGraph g, Object tg) throws DatabaseException, TransferableGraphException {\r
- if (tg instanceof TransferableGraph1) \r
- {\r
- return importGraph1(g, (TransferableGraph1) tg);\r
- }\r
- throw new TransferableGraphException("Cannot import "+tg.getClass().getName());\r
- }\r
- \r
- public static Collection<Resource> collectExternals(RequestProcessor processor, final TransferableGraph1 tg) throws DatabaseException, TransferableGraphException {\r
- final TransferableGraphImportProcess process = new TransferableGraphImportProcess(tg, new ImportAdvisor());\r
- processor.syncRequest(new ReadRequest() {\r
- @Override\r
- public void run(ReadGraph graph) throws DatabaseException {\r
- process.prepare(graph);\r
- }\r
- });\r
- HashSet<Resource> result = new HashSet<Resource>();\r
- for(Identity id : tg.identities) {\r
- if(id.definition instanceof External) {\r
- result.add(process.resources[id.resource]);\r
- }\r
- }\r
- return result;\r
- }\r
- \r
- /**\r
- * Imports transferable graph version 1 to the database. Root advisor is used\r
- * to give identities to roots of the transferable graphs. It may be null,\r
- * in which case new resources are created for all roots but the root library.\r
- * \r
- * @param session\r
- * @param tg\r
- * @param advisor root advisor or <code>null</code>\r
- * @throws DatabaseException\r
- */\r
- public static long[] importGraph1(Session session, final TransferableGraph1 tg, final IImportAdvisor advisor_) throws DatabaseException, TransferableGraphException {\r
- \r
- final IImportAdvisor2 advisor = (advisor_ instanceof IImportAdvisor2) ? ((IImportAdvisor2)advisor_) : new WrapperAdvisor(advisor_);\r
- \r
- final TransferableGraphImportProcess process = new TransferableGraphImportProcess(tg, \r
- advisor == null ? new ImportAdvisor() : advisor);\r
- session.syncRequest(new ReadRequest() {\r
- @Override\r
- public void run(ReadGraph graph) throws DatabaseException {\r
- process.prepare(graph);\r
- }\r
- }); \r
- session.syncRequest(new WriteOnlyRequest() {\r
- @Override\r
- public void perform(WriteOnlyGraph graph) throws DatabaseException {\r
- advisor.beforeWrite(graph, process);\r
- process.write(graph);\r
- advisor.afterWrite(graph, process);\r
- }\r
- });\r
- return process.getResourceIds(\r
- session.getService(SerialisationSupport.class));\r
- }\r
-\r
- public static void importGraph1(Session session, final TransferableGraph1 tg, IImportAdvisor advisor, final BinaryFunction<Boolean, WriteOnlyGraph, TransferableGraphImportProcess> callback) throws DatabaseException, TransferableGraphException {\r
- final TransferableGraphImportProcess process = new TransferableGraphImportProcess(tg, \r
- advisor == null ? new ImportAdvisor() : advisor);\r
- session.syncRequest(new ReadRequest() {\r
- @Override\r
- public void run(ReadGraph graph) throws DatabaseException {\r
- process.prepare(graph);\r
- }\r
- });\r
- session.syncRequest(new WriteOnlyRequest() {\r
- @Override\r
- public void perform(WriteOnlyGraph graph) throws DatabaseException {\r
- process.write(graph);\r
- if(callback != null)\r
- callback.call(graph, process);\r
- }\r
- });\r
- }\r
-\r
- public static void importGraph1(Session session, final TransferableGraphSource tg, IImportAdvisor advisor) throws Exception {\r
- importGraph1(session, tg, advisor, null);\r
- }\r
-\r
- public static void importGraph1(Session session, final TransferableGraphSource tg, IImportAdvisor advisor, TGStatusMonitor monitor) throws DatabaseException {\r
- importGraph1(session, null, tg, advisor, monitor);\r
- }\r
-\r
- public static void importGraph1(Session session, VirtualGraph vg, final TransferableGraphSource tg, IImportAdvisor advisor_, TGStatusMonitor monitor) throws DatabaseException {\r
- \r
- final IImportAdvisor2 advisor = (advisor_ instanceof IImportAdvisor2) ? ((IImportAdvisor2)advisor_) : new WrapperAdvisor(advisor_);\r
-\r
- final StreamingTransferableGraphImportProcess process = new StreamingTransferableGraphImportProcess(session, vg, tg, advisor);\r
- session.syncRequest(new ReadRequest() {\r
- @Override\r
- public void run(ReadGraph graph) throws DatabaseException {\r
- try {\r
- process.prepare(graph);\r
- } catch (DatabaseException e) {\r
- throw e;\r
- } catch (Exception e) {\r
- throw new DatabaseException(e);\r
- }\r
- }\r
- });\r
- session.syncRequest(new WriteOnlyRequest(vg) {\r
- @Override\r
- public void perform(WriteOnlyGraph graph) throws DatabaseException {\r
- try {\r
- advisor.beforeWrite(graph, process);\r
- process.write(graph);\r
- advisor.afterWrite(graph, process);\r
- } catch (Exception e) {\r
- throw new DatabaseException(e);\r
- }\r
- }\r
- });\r
- }\r
-\r
- public static void importGraph1WithMonitor(Session session, final TransferableGraph1 tg, IImportAdvisor advisor_, TGStatusMonitor monitor) throws DatabaseException {\r
- final IImportAdvisor2 advisor = (advisor_ instanceof IImportAdvisor2) ? ((IImportAdvisor2)advisor_) : new WrapperAdvisor(advisor_);\r
- final TransferableGraphImportProcess process = new TransferableGraphImportProcess(tg, \r
- advisor == null ? new ImportAdvisor() : advisor, monitor);\r
- session.syncRequest(new ReadRequest() {\r
- @Override\r
- public void run(ReadGraph graph) throws DatabaseException {\r
- process.prepare(graph);\r
- }\r
- });\r
- session.syncRequest(new WriteOnlyRequest() {\r
- @Override\r
- public void perform(WriteOnlyGraph graph) throws DatabaseException {\r
- advisor.beforeWrite(graph, process);\r
- process.write2(graph);\r
- advisor.afterWrite(graph, process);\r
- CommentMetadata comments = graph.getMetadata(CommentMetadata.class);\r
- comments.add("Imported transferable graph with " + tg.resourceCount + " resources");\r
- graph.addMetadata(comments);\r
- }\r
- });\r
- }\r
-\r
- public static void importGraph1WithChanges(Session session, final TransferableGraph1 tg, IImportAdvisor advisor, final BinaryFunction<Boolean, WriteGraph, TransferableGraphImportProcess> callback) throws DatabaseException, TransferableGraphException {\r
- final TransferableGraphImportProcess process = new TransferableGraphImportProcess(tg, \r
- advisor == null ? new ImportAdvisor() : advisor);\r
- session.syncRequest(new ReadRequest() {\r
- @Override\r
- public void run(ReadGraph graph) throws DatabaseException {\r
- process.prepare(graph);\r
- }\r
- });\r
- session.syncRequest(new WriteRequest() {\r
- @Override\r
- public void perform(WriteGraph graph) throws DatabaseException {\r
- process.write2(graph);\r
- CommentMetadata comments = graph.getMetadata(CommentMetadata.class);\r
- comments.add("Imported transferable graph with " + tg.resourceCount + " resources");\r
- graph.addMetadata(comments);\r
- if(callback != null)\r
- callback.call(graph, process);\r
- }\r
- });\r
- }\r
- \r
- public static long[] importGraph1(Session session, final TransferableGraph1 tg) throws DatabaseException, TransferableGraphException {\r
- final TransferableGraphImportProcess process = \r
- new TransferableGraphImportProcess(tg, \r
- null);\r
- session.syncRequest(new ReadRequest() {\r
- @Override\r
- public void run(ReadGraph graph) throws DatabaseException {\r
- process.prepare(graph);\r
- }\r
- }); \r
- session.syncRequest(new WriteOnlyRequest() {\r
- @Override\r
- public void perform(WriteOnlyGraph graph) throws DatabaseException {\r
- process.write(graph);\r
- }\r
- });\r
- return process.getResourceIds(\r
- session.getService(SerialisationSupport.class));\r
- }\r
- \r
- public static long[] importGraph1(WriteGraph graph, final TransferableGraph1 tg) throws DatabaseException, TransferableGraphException {\r
- final TransferableGraphImportProcess process = \r
- new TransferableGraphImportProcess(tg, \r
- null);\r
- process.prepare(graph);\r
- process.write2(graph);\r
- return process.getResourceIds(\r
- graph.getSession().getService(SerialisationSupport.class));\r
- }\r
- \r
- /**\r
- * Import transferable graph version 1 to the database. Root advisor is used\r
- * to give identities to roots of the transferable graphs. It may be null,\r
- * in which case new resources are created for all roots but the root library.\r
- * \r
- * @param session\r
- * @param tg\r
- * @param advisor root advisor or <code>null</code>\r
- * @throws DatabaseException\r
- */\r
- public static long[] importGraph1(WriteGraph graph, TransferableGraph1 tg, IImportAdvisor advisor) throws DatabaseException {\r
- return importGraph1(graph, tg, advisor, null);\r
- }\r
-\r
- public static long[] importGraph1(WriteGraph graph, TransferableGraph1 tg, IImportAdvisor advisor, TGStatusMonitor monitor) throws DatabaseException {\r
- TransferableGraphImportProcess process = new TransferableGraphImportProcess(tg, \r
- advisor == null ? new ImportAdvisor() : advisor, monitor);\r
- process.prepare(graph);\r
- if(advisor instanceof IImportAdvisor2) ((IImportAdvisor2)advisor).beforeWrite(graph, process);\r
- process.write2(graph);\r
- if(advisor instanceof IImportAdvisor2) ((IImportAdvisor2)advisor).afterWrite(graph, process);\r
- return process.getResourceIds(\r
- graph.getSession().getService(SerialisationSupport.class));\r
- }\r
-\r
- public static long[] applyDelta(WriteGraph graph, long[] oldResources, TransferableGraphDelta1 delta) throws DatabaseException {\r
- SerialisationSupport serializer = \r
- graph.getSession().getService(SerialisationSupport.class);\r
- \r
- TGToGraphMap aMap = new TGToGraphMap(delta.a);\r
- aMap.addOldResources(serializer, oldResources);\r
- aMap.deny(graph);\r
- \r
- TGToGraphMap bMap = new TGToGraphMap(delta.b);\r
- bMap.addMappedOldResources(serializer, delta.aToB, aMap.getResources());\r
- bMap.prepare(graph);\r
- bMap.claim(graph);\r
- \r
- return bMap.getResources(serializer);\r
- }\r
- \r
- public static boolean hasChanges(ReadGraph graph, long[] oldResources, TransferableGraphDelta1 delta) throws DatabaseException {\r
- \r
- SerialisationSupport serializer = \r
- graph.getSession().getService(SerialisationSupport.class);\r
- \r
- TGToGraphMap aMap = new TGToGraphMap(delta.a);\r
- aMap.addOldResources(serializer, oldResources);\r
- if(aMap.checkDeny(graph)) return true;\r
- \r
- TGToGraphMap bMap = new TGToGraphMap(delta.b);\r
- bMap.addMappedOldResources(serializer, delta.aToB, aMap.getResources());\r
- bMap.prepare(graph);\r
- return bMap.checkClaim(graph);\r
- \r
- }\r
-\r
- public static void uninstallGraph(WriteGraph writeGraph, TransferableGraph1 graph,\r
- ImportAdvisor advisor) throws TransferableGraphException {\r
- // TODO HANNU IMPLEMENTS\r
- throw new UnsupportedOperationException();\r
- } \r
- \r
- public static long[] importVirtualGraph(Session session, final VirtualGraph vg, final TransferableGraph1 tg, IImportAdvisor advisor) throws DatabaseException {\r
- final TransferableGraphImportProcess process = new TransferableGraphImportProcess(tg, \r
- advisor == null ? new ImportAdvisor() : advisor);\r
- session.syncRequest(new ReadRequest() {\r
- @Override\r
- public void run(ReadGraph graph) throws DatabaseException {\r
- process.prepare(graph);\r
- }\r
- });\r
- session.syncRequest(new WriteOnlyRequest(vg) {\r
- @Override\r
- public void perform(WriteOnlyGraph graph) throws DatabaseException {\r
- // Needed because process#write does not support virtual WriteOnlyGraph\r
- if (vg != null)\r
- process.write2(graph);\r
- else\r
- process.write(graph);\r
- }\r
- \r
- });\r
- return process.getResourceIds(session.getService(SerialisationSupport.class));\r
- } \r
-\r
- public static long[] importVirtualGraph(WriteGraph graph, final TransferableGraph1 tg, IImportAdvisor advisor) throws DatabaseException {\r
- final TransferableGraphImportProcess process = new TransferableGraphImportProcess(tg, \r
- advisor == null ? new ImportAdvisor() : advisor);\r
- process.prepare(graph);\r
- process.write2(graph);\r
- return process.getResourceIds(graph.getService(SerialisationSupport.class));\r
- } \r
- \r
- public static TransferableGraph1 readGraph(File file) throws TransferableGraphException {\r
- FileVariantAccessor va = null;\r
- try {\r
- va = Accessors.openAccessor(file);\r
- Datatype type = va.getContentType();\r
- if(type.equals(Datatypes.getDatatype(TransferableGraph1.class))) \r
- return (TransferableGraph1)va.getContentValue(Bindings.getBinding(TransferableGraph1.class));\r
- else\r
- throw new SerializationException("Unknown transferable graph data type.");\r
- } catch (AccessorException e) {\r
- throw new TransferableGraphException(e);\r
- } catch (BindingConstructionException e) {\r
- throw new TransferableGraphException(e);\r
- } catch (SerializationException e) {\r
- throw new TransferableGraphException(e);\r
- } catch (AccessorConstructionException e) {\r
- throw new TransferableGraphException(e);\r
- } catch (DatatypeConstructionException e) {\r
- throw new TransferableGraphException(e);\r
- } finally {\r
- if(va != null) {\r
- try {\r
- va.close();\r
- } catch (AccessorException e) {\r
- }\r
- }\r
- } \r
- }\r
- \r
- public static void importVirtualGraph(Session session, VirtualGraph vg, File file) throws TransferableGraphException {\r
- try {\r
- importVirtualGraph(session, vg, readGraph(file), new ImportAdvisor());\r
- } catch (DatabaseException e) {\r
- throw new TransferableGraphException(e);\r
- }\r
- }\r
- \r
- public static VirtualGraph importVirtualGraph(Session session, File file) throws TransferableGraphException {\r
- VirtualGraphSupport support = session.getService(VirtualGraphSupport.class);\r
- VirtualGraph vg = support.getMemoryPersistent(UUID.randomUUID().toString());\r
- importVirtualGraph(session, vg, file);\r
- return vg;\r
- }\r
- \r
- public static void writeTransferableGraph(RequestProcessor processor, final String format, final TransferableGraphSource source, File target) throws Exception {\r
- writeTransferableGraph(processor, format, 1, source, target);\r
- }\r
-\r
- public static void writeTransferableGraph(RequestProcessor processor, final String format, final int version, final TransferableGraphSource source, File target) throws Exception {\r
- writeTransferableGraph(processor, format, version, new TreeMap<String,Variant>(), source, target);\r
- }\r
-\r
- public static void writeTransferableGraph(RequestProcessor processor, String format, int version, TreeMap<String, Variant> metadata, TransferableGraphSource source, File target) throws Exception {\r
- writeTransferableGraph(processor, format, version, metadata, source, target, TGStatusMonitor.NULL_MONITOR);\r
- }\r
-\r
- public static void writeTransferableGraph(RequestProcessor processor, String format, int version, TreeMap<String, Variant> metadata, TransferableGraphSource source, File target, TGStatusMonitor monitor) throws Exception {\r
- final Serializer datatypeSerializer = Bindings.getSerializerUnchecked(Datatype.class);\r
- try (RandomAccessBinary out = new BinaryFile(target, 128*1024)) {\r
- DataContainer container = new DataContainer(format, version, metadata, null);\r
- DataContainers.writeHeader(out, container);\r
- datatypeSerializer.serialize((DataOutput) out, Datatypes.getDatatypeUnchecked(TransferableGraph1.class));\r
- writeTransferableGraph(processor, source, out, monitor);\r
- }\r
- }\r
-\r
- private static TGStatusMonitor safeMonitor(TGStatusMonitor mon) {\r
- return mon == null ? TGStatusMonitor.NULL_MONITOR : mon;\r
- }\r
-\r
- private static class CopyingInputStream extends InputStream {\r
- public DataInput in;\r
- public DataOutput out;\r
-\r
- @Override\r
- public int read() throws IOException {\r
- int value = in.readUnsignedByte();\r
- out.write(value);\r
- return value;\r
- }\r
- }\r
-\r
- private static long copy(byte[] buffer, DataInput in, DataOutput out, long bytesToCopy) throws IOException {\r
- int read = 0;\r
- long bufferLength = buffer.length;\r
- while (read < bytesToCopy) {\r
- int l = (int) Math.min(bufferLength, bytesToCopy-read);\r
- in.readFully(buffer, 0, l);\r
- out.write(buffer, 0, l);\r
- read += l;\r
- }\r
- return read;\r
- }\r
-\r
- private static final int LITERAL_VALUE_IO_BUFFER_SIZE = 128 * 1024;\r
-\r
- private static void writeTransferableGraph(RequestProcessor processor, final TransferableGraphSource source, final RandomAccessBinary out, TGStatusMonitor monitor) throws Exception {\r
- long start = System.nanoTime();\r
-\r
- final Serializer datatypeSerializer = Bindings.getSerializerUnchecked(Datatype.class);\r
- final Serializer identitySerializer = Bindings.getSerializerUnchecked(Identity.class);\r
- final Serializer extensionSerializer = Bindings.getSerializerUnchecked(Extensions.class);\r
-\r
- int resourceCount = source.getResourceCount();\r
- //System.err.println("resourceCount: " + resourceCount);\r
- out.writeInt(resourceCount);\r
- extensionSerializer.serialize(out, new Extensions(source.getExtensions()));\r
-\r
-// System.err.println("resource count: " + source.getResourceCount());\r
-// System.err.println("identity count: " + source.getIdentityCount());\r
-\r
- byte[] buffer = new byte[LITERAL_VALUE_IO_BUFFER_SIZE];\r
-\r
- processor.syncRequest(new ReadRequest() {\r
- @Override\r
- public void run(ReadGraph graph) throws DatabaseException {\r
- try {\r
- if (monitor.isCanceled())\r
- throw new CancelTransactionException();\r
-\r
- int identityCount = source.getIdentityCount();\r
- TGStatusMonitor.Updater identityProgress = new TGStatusMonitor.Updater(safeMonitor(monitor), 0, 33, identityCount);\r
- out.writeInt(identityCount);\r
- //System.err.println("identities: " + identityCount);\r
- source.forIdentities(graph, value -> {\r
- //System.err.println("id: " + value);\r
- identitySerializer.serialize(out, value);\r
- identityProgress.worked(1);\r
- });\r
-\r
- if (monitor.isCanceled())\r
- throw new CancelTransactionException();\r
-\r
- long statementCountPos = out.position();\r
- int originalStatementCount = source.getStatementCount();\r
- TGStatusMonitor.Updater statementProgress = new TGStatusMonitor.Updater(safeMonitor(monitor), 34, 66, originalStatementCount);\r
- out.writeInt(originalStatementCount);\r
- //System.err.println("original statementCount: " + originalStatementCount);\r
- int[] statementCounter = { 0 };\r
- source.forStatements(graph, r -> {\r
- for (int i = 0; i < 4; ++i)\r
- out.writeInt(r[i]);\r
- statementCounter[0]++;\r
- //System.err.println("stm " + (statementCounter[0]) + ": " + r[0] + " " + r[1] + " " + r[2] + " " + r[3]);\r
- statementProgress.worked(1);\r
- });\r
- //System.err.println("wrote " + statementCounter[0] + " statements, " + (statementCounter[0]*4)+ " integers");\r
-\r
- // Rewrite statement count after knowing exactly how many\r
- // statements were written. It is possible that some\r
- // statements get filtered out at this stage and the\r
- // original statement count does not reflect that.\r
- long afterStatementsPos = out.position();\r
- out.position(statementCountPos);\r
- out.writeInt(statementCounter[0]*4);\r
- out.position(afterStatementsPos);\r
-\r
- if (monitor.isCanceled())\r
- throw new CancelTransactionException();\r
-\r
- int valueCount = source.getValueCount();\r
- TGStatusMonitor.Updater valueProgress = new TGStatusMonitor.Updater(safeMonitor(monitor), 67, 100, valueCount);\r
- out.writeInt(valueCount);\r
-// System.err.println("valueCount: " + valueCount);\r
- CopyingInputStream cis = new CopyingInputStream();\r
- cis.out = out;\r
- source.forValues2(graph, new TransferableGraphSourceValueProcedure() {\r
- TObjectIntHashMap<Object> identities = new TObjectIntHashMap<>();\r
-\r
- @Override\r
- public void rawCopy(int resource, int length, DataInput input) throws Exception {\r
- out.writeInt(resource);\r
- long copied = copy(buffer, input, out, length);\r
- assert copied == length;\r
- //System.err.println("value " + (num++) + ": raw variant, " + length + " bytes, copied " + copied + " bytes");\r
- valueProgress.worked(1);\r
- }\r
-\r
- @Override\r
- public void execute(int resource, Datatype type, DataInput input) throws Exception {\r
- out.writeInt(resource);\r
- identities.clear();\r
- datatypeSerializer.serialize(out, identities, type);\r
- Binding binding = Bindings.getBinding(type);\r
- Serializer serializer = Bindings.getSerializer(binding);\r
- cis.in = input;\r
- serializer.skip(cis);\r
- cis.in = null;\r
- valueProgress.worked(1);\r
- }\r
- });\r
-\r
- } catch (DatabaseException e) {\r
- throw e;\r
- } catch (Exception e) {\r
- throw new DatabaseException(e);\r
- }\r
- }\r
- });\r
-\r
- long end = System.nanoTime();\r
- System.err.println("Wrote transferable graph in " + 1e-9*(end-start) + " seconds.");\r
- }\r
-\r
- public static TransferableGraph1 create(ReadGraph graph, TransferableGraphSource source) throws DatabaseException {\r
- \r
- final TIntArrayList statements = new TIntArrayList();\r
- final ArrayList<Value> values = new ArrayList<>();\r
- final ArrayList<Identity> identities = new ArrayList<>();\r
- \r
- try {\r
-\r
- source.forStatements(graph, r -> statements.addAll(r));\r
- source.forValues(graph, v -> values.add(v));\r
- source.forIdentities(graph, i -> identities.add(i));\r
-\r
- return new TransferableGraph1(source.getResourceCount(), \r
- identities.toArray(new Identity[identities.size()]),\r
- statements.toArray(),\r
- values.toArray(new Value[values.size()]),\r
- source.getExtensions());\r
- \r
- } catch (Exception e) {\r
- \r
- throw new DatabaseException(e);\r
- \r
- }\r
- \r
- }\r
-\r
-}\r
+/*******************************************************************************
+ * Copyright (c) 2007, 2010 Association for Decentralized Information Management
+ * in Industry THTH ry.
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * which accompanies this distribution, and is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Contributors:
+ * VTT Technical Research Centre of Finland - initial API and implementation
+ *******************************************************************************/
+package org.simantics.graph.db;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.TreeMap;
+import java.util.UUID;
+import java.util.function.BiFunction;
+
+import org.simantics.databoard.Accessors;
+import org.simantics.databoard.Bindings;
+import org.simantics.databoard.Datatypes;
+import org.simantics.databoard.accessor.error.AccessorConstructionException;
+import org.simantics.databoard.accessor.error.AccessorException;
+import org.simantics.databoard.accessor.file.FileVariantAccessor;
+import org.simantics.databoard.binding.Binding;
+import org.simantics.databoard.binding.error.BindingConstructionException;
+import org.simantics.databoard.binding.error.DatatypeConstructionException;
+import org.simantics.databoard.binding.mutable.Variant;
+import org.simantics.databoard.container.DataContainer;
+import org.simantics.databoard.container.DataContainers;
+import org.simantics.databoard.serialization.SerializationException;
+import org.simantics.databoard.serialization.Serializer;
+import org.simantics.databoard.type.Datatype;
+import org.simantics.databoard.util.binary.BinaryFile;
+import org.simantics.databoard.util.binary.RandomAccessBinary;
+import org.simantics.db.ReadGraph;
+import org.simantics.db.RequestProcessor;
+import org.simantics.db.Resource;
+import org.simantics.db.Session;
+import org.simantics.db.VirtualGraph;
+import org.simantics.db.WriteGraph;
+import org.simantics.db.WriteOnlyGraph;
+import org.simantics.db.common.CommentMetadata;
+import org.simantics.db.common.request.ReadRequest;
+import org.simantics.db.common.request.WriteOnlyRequest;
+import org.simantics.db.common.request.WriteRequest;
+import org.simantics.db.exception.CancelTransactionException;
+import org.simantics.db.exception.DatabaseException;
+import org.simantics.db.service.SerialisationSupport;
+import org.simantics.db.service.VirtualGraphSupport;
+import org.simantics.graph.db.TransferableGraphSource.TransferableGraphSourceValueProcedure;
+import org.simantics.graph.diff.TransferableGraphDelta1;
+import org.simantics.graph.representation.Extensions;
+import org.simantics.graph.representation.External;
+import org.simantics.graph.representation.Identity;
+import org.simantics.graph.representation.TransferableGraph1;
+import org.simantics.graph.representation.Value;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import gnu.trove.list.array.TIntArrayList;
+import gnu.trove.map.hash.TObjectIntHashMap;
+
+public class TransferableGraphs {
+ final static Logger LOGGER = LoggerFactory.getLogger(TransferableGraphs.class);
+
+ public static long[] importGraph(Session session, Object tg, IImportAdvisor advisor) throws DatabaseException, TransferableGraphException {
+ if (tg instanceof TransferableGraph1)
+ {
+ return importGraph1(session, (TransferableGraph1) tg, advisor);
+ }
+ throw new TransferableGraphException("Cannot import "+tg.getClass().getName());
+ }
+
+ public static long[] importGraph(WriteGraph g, Object tg, IImportAdvisor advisor) throws DatabaseException, TransferableGraphException {
+ if (tg instanceof TransferableGraph1)
+ {
+ return importGraph1(g, (TransferableGraph1) tg, advisor);
+ }
+ throw new TransferableGraphException("Cannot import "+tg.getClass().getName());
+ }
+
+ public static long[] importGraph(Session session, Object tg) throws DatabaseException, TransferableGraphException {
+ if (tg instanceof TransferableGraph1)
+ {
+ return importGraph1(session, (TransferableGraph1) tg);
+ }
+ throw new TransferableGraphException("Cannot import "+tg.getClass().getName());
+ }
+
+ public static long[] importGraph(WriteGraph g, Object tg) throws DatabaseException, TransferableGraphException {
+ if (tg instanceof TransferableGraph1)
+ {
+ return importGraph1(g, (TransferableGraph1) tg);
+ }
+ throw new TransferableGraphException("Cannot import "+tg.getClass().getName());
+ }
+
+ public static Collection<Resource> collectExternals(RequestProcessor processor, final TransferableGraph1 tg) throws DatabaseException, TransferableGraphException {
+ final TransferableGraphImportProcess process = new TransferableGraphImportProcess(tg, new ImportAdvisor());
+ processor.syncRequest(new ReadRequest() {
+ @Override
+ public void run(ReadGraph graph) throws DatabaseException {
+ process.prepare(graph);
+ }
+ });
+ HashSet<Resource> result = new HashSet<Resource>();
+ for(Identity id : tg.identities) {
+ if(id.definition instanceof External) {
+ result.add(process.resources[id.resource]);
+ }
+ }
+ return result;
+ }
+
+ /**
+ * Imports transferable graph version 1 to the database. Root advisor is used
+ * to give identities to roots of the transferable graphs. It may be null,
+ * in which case new resources are created for all roots but the root library.
+ *
+ * @param session
+ * @param tg
+ * @param advisor root advisor or <code>null</code>
+ * @throws DatabaseException
+ */
+ public static long[] importGraph1(Session session, final TransferableGraph1 tg, final IImportAdvisor advisor_) throws DatabaseException, TransferableGraphException {
+
+ final IImportAdvisor2 advisor = (advisor_ instanceof IImportAdvisor2) ? ((IImportAdvisor2)advisor_) : new WrapperAdvisor(advisor_);
+
+ final TransferableGraphImportProcess process = new TransferableGraphImportProcess(tg,
+ advisor == null ? new ImportAdvisor() : advisor);
+ session.syncRequest(new ReadRequest() {
+ @Override
+ public void run(ReadGraph graph) throws DatabaseException {
+ process.prepare(graph);
+ }
+ });
+ session.syncRequest(new WriteOnlyRequest() {
+ @Override
+ public void perform(WriteOnlyGraph graph) throws DatabaseException {
+ advisor.beforeWrite(graph, process);
+ process.write(graph);
+ advisor.afterWrite(graph, process);
+ }
+ });
+ return process.getResourceIds(
+ session.getService(SerialisationSupport.class));
+ }
+
+ public static void importGraph1(Session session, final TransferableGraph1 tg, IImportAdvisor advisor, final BiFunction<WriteOnlyGraph, TransferableGraphImportProcess, Boolean> callback) throws DatabaseException, TransferableGraphException {
+ final TransferableGraphImportProcess process = new TransferableGraphImportProcess(tg,
+ advisor == null ? new ImportAdvisor() : advisor);
+ session.syncRequest(new ReadRequest() {
+ @Override
+ public void run(ReadGraph graph) throws DatabaseException {
+ process.prepare(graph);
+ }
+ });
+ session.syncRequest(new WriteOnlyRequest() {
+ @Override
+ public void perform(WriteOnlyGraph graph) throws DatabaseException {
+ process.write(graph);
+ if(callback != null)
+ callback.apply(graph, process);
+ }
+ });
+ }
+
+ public static ImportResult importGraph1(Session session, final TransferableGraphSource tg, IImportAdvisor advisor) throws Exception {
+ return importGraph1(session, tg, advisor, null);
+ }
+
+ public static ImportResult importGraph1(Session session, final TransferableGraphSource tg, IImportAdvisor advisor, TGStatusMonitor monitor) throws DatabaseException {
+ return importGraph1(session, null, tg, advisor, monitor);
+ }
+
+ public static ImportResult importGraph1(Session session, VirtualGraph vg, final TransferableGraphSource tg, IImportAdvisor advisor_, TGStatusMonitor monitor) throws DatabaseException {
+
+ final IImportAdvisor2 advisor = (advisor_ instanceof IImportAdvisor2) ? ((IImportAdvisor2)advisor_) : new WrapperAdvisor(advisor_);
+
+ final StreamingTransferableGraphImportProcess process = new StreamingTransferableGraphImportProcess(session, vg, tg, advisor, monitor);
+ session.syncRequest(new ReadRequest() {
+ @Override
+ public void run(ReadGraph graph) throws DatabaseException {
+ try {
+ process.prepare(graph);
+ } catch (DatabaseException e) {
+ throw e;
+ } catch (Exception e) {
+ throw new DatabaseException(e);
+ }
+ }
+ });
+ session.syncRequest(new WriteOnlyRequest(vg) {
+ @Override
+ public void perform(WriteOnlyGraph graph) throws DatabaseException {
+ try {
+ advisor.beforeWrite(graph, process);
+ process.write(graph);
+ advisor.afterWrite(graph, process);
+ } catch (Exception e) {
+ throw new DatabaseException(e);
+ }
+ }
+ });
+
+ return new ImportResult(process.missingExternals);
+ }
+
+ public static void importGraph1WithMonitor(Session session, final TransferableGraph1 tg, IImportAdvisor advisor_, TGStatusMonitor monitor) throws DatabaseException {
+ final IImportAdvisor2 advisor = (advisor_ instanceof IImportAdvisor2) ? ((IImportAdvisor2)advisor_) : new WrapperAdvisor(advisor_);
+ final TransferableGraphImportProcess process = new TransferableGraphImportProcess(tg,
+ advisor == null ? new ImportAdvisor() : advisor, monitor);
+ session.syncRequest(new ReadRequest() {
+ @Override
+ public void run(ReadGraph graph) throws DatabaseException {
+ process.prepare(graph);
+ }
+ });
+ session.syncRequest(new WriteOnlyRequest() {
+ @Override
+ public void perform(WriteOnlyGraph graph) throws DatabaseException {
+ advisor.beforeWrite(graph, process);
+ process.write2(graph);
+ advisor.afterWrite(graph, process);
+ CommentMetadata comments = graph.getMetadata(CommentMetadata.class);
+ comments.add("Imported transferable graph with " + tg.resourceCount + " resources");
+ graph.addMetadata(comments);
+ }
+ });
+ }
+
+ public static void importGraph1WithChanges(Session session, final TransferableGraph1 tg, IImportAdvisor advisor, final BiFunction<WriteGraph, TransferableGraphImportProcess, Boolean> callback) throws DatabaseException, TransferableGraphException {
+ final TransferableGraphImportProcess process = new TransferableGraphImportProcess(tg,
+ advisor == null ? new ImportAdvisor() : advisor);
+ session.syncRequest(new ReadRequest() {
+ @Override
+ public void run(ReadGraph graph) throws DatabaseException {
+ process.prepare(graph);
+ }
+ });
+ session.syncRequest(new WriteRequest() {
+ @Override
+ public void perform(WriteGraph graph) throws DatabaseException {
+ process.write2(graph);
+ CommentMetadata comments = graph.getMetadata(CommentMetadata.class);
+ comments.add("Imported transferable graph with " + tg.resourceCount + " resources");
+ graph.addMetadata(comments);
+ if(callback != null)
+ callback.apply(graph, process);
+ }
+ });
+ }
+
+ public static long[] importGraph1(Session session, final TransferableGraph1 tg) throws DatabaseException, TransferableGraphException {
+ final TransferableGraphImportProcess process =
+ new TransferableGraphImportProcess(tg,
+ null);
+ session.syncRequest(new ReadRequest() {
+ @Override
+ public void run(ReadGraph graph) throws DatabaseException {
+ process.prepare(graph);
+ }
+ });
+ session.syncRequest(new WriteOnlyRequest() {
+ @Override
+ public void perform(WriteOnlyGraph graph) throws DatabaseException {
+ process.write(graph);
+ }
+ });
+ return process.getResourceIds(
+ session.getService(SerialisationSupport.class));
+ }
+
+ public static long[] importGraph1(WriteGraph graph, final TransferableGraph1 tg) throws DatabaseException, TransferableGraphException {
+ final TransferableGraphImportProcess process =
+ new TransferableGraphImportProcess(tg,
+ null);
+ process.prepare(graph);
+ process.write2(graph);
+ return process.getResourceIds(
+ graph.getSession().getService(SerialisationSupport.class));
+ }
+
+ /**
+ * Import transferable graph version 1 to the database. Root advisor is used
+ * to give identities to roots of the transferable graphs. It may be null,
+ * in which case new resources are created for all roots but the root library.
+ *
+ * @param session
+ * @param tg
+ * @param advisor root advisor or <code>null</code>
+ * @throws DatabaseException
+ */
+ public static long[] importGraph1(WriteGraph graph, TransferableGraph1 tg, IImportAdvisor advisor) throws DatabaseException {
+ return importGraph1(graph, tg, advisor, null);
+ }
+
+ public static long[] importGraph1(WriteGraph graph, TransferableGraph1 tg, IImportAdvisor advisor, TGStatusMonitor monitor) throws DatabaseException {
+ TransferableGraphImportProcess process = new TransferableGraphImportProcess(tg,
+ advisor == null ? new ImportAdvisor() : advisor, monitor);
+ process.prepare(graph);
+ if(advisor instanceof IImportAdvisor2) ((IImportAdvisor2)advisor).beforeWrite(graph, process);
+ process.write2(graph);
+ if(advisor instanceof IImportAdvisor2) ((IImportAdvisor2)advisor).afterWrite(graph, process);
+ return process.getResourceIds(
+ graph.getSession().getService(SerialisationSupport.class));
+ }
+
+ public static long[] applyDelta(WriteGraph graph, long[] oldResources, TransferableGraphDelta1 delta) throws DatabaseException {
+ SerialisationSupport serializer =
+ graph.getSession().getService(SerialisationSupport.class);
+
+ TGToGraphMap aMap = new TGToGraphMap(delta.a);
+ aMap.addOldResources(serializer, oldResources);
+ aMap.deny(graph);
+
+ TGToGraphMap bMap = new TGToGraphMap(delta.b);
+ bMap.addMappedOldResources(serializer, delta.aToB, aMap.getResources());
+ bMap.prepare(graph);
+ bMap.claim(graph);
+
+ return bMap.getResources(serializer);
+ }
+
+ public static boolean hasChanges(ReadGraph graph, long[] oldResources, TransferableGraphDelta1 delta) throws DatabaseException {
+
+ SerialisationSupport serializer =
+ graph.getSession().getService(SerialisationSupport.class);
+
+ TGToGraphMap aMap = new TGToGraphMap(delta.a);
+ aMap.addOldResources(serializer, oldResources);
+ if(aMap.checkDeny(graph)) return true;
+
+ TGToGraphMap bMap = new TGToGraphMap(delta.b);
+ bMap.addMappedOldResources(serializer, delta.aToB, aMap.getResources());
+ bMap.prepare(graph);
+ return bMap.checkClaim(graph);
+
+ }
+
+ public static void uninstallGraph(WriteGraph writeGraph, TransferableGraph1 graph,
+ ImportAdvisor advisor) throws TransferableGraphException {
+ // TODO HANNU IMPLEMENTS
+ throw new UnsupportedOperationException();
+ }
+
+ public static long[] importVirtualGraph(Session session, final VirtualGraph vg, final TransferableGraph1 tg, IImportAdvisor advisor) throws DatabaseException {
+ final TransferableGraphImportProcess process = new TransferableGraphImportProcess(tg,
+ advisor == null ? new ImportAdvisor() : advisor);
+ session.syncRequest(new ReadRequest() {
+ @Override
+ public void run(ReadGraph graph) throws DatabaseException {
+ process.prepare(graph);
+ }
+ });
+ session.syncRequest(new WriteOnlyRequest(vg) {
+ @Override
+ public void perform(WriteOnlyGraph graph) throws DatabaseException {
+ // Needed because process#write does not support virtual WriteOnlyGraph
+ if (vg != null)
+ process.write2(graph);
+ else
+ process.write(graph);
+ }
+
+ });
+ return process.getResourceIds(session.getService(SerialisationSupport.class));
+ }
+
+ public static long[] importVirtualGraph(WriteGraph graph, final TransferableGraph1 tg, IImportAdvisor advisor) throws DatabaseException {
+ final TransferableGraphImportProcess process = new TransferableGraphImportProcess(tg,
+ advisor == null ? new ImportAdvisor() : advisor);
+ process.prepare(graph);
+ process.write2(graph);
+ return process.getResourceIds(graph.getService(SerialisationSupport.class));
+ }
+
+ public static TransferableGraph1 readGraph(File file) throws TransferableGraphException {
+ FileVariantAccessor va = null;
+ try {
+ va = Accessors.openAccessor(file);
+ Datatype type = va.getContentType();
+ if(type.equals(Datatypes.getDatatype(TransferableGraph1.class)))
+ return (TransferableGraph1)va.getContentValue(Bindings.getBinding(TransferableGraph1.class));
+ else
+ throw new SerializationException("Unknown transferable graph data type.");
+ } catch (AccessorException e) {
+ throw new TransferableGraphException(e);
+ } catch (BindingConstructionException e) {
+ throw new TransferableGraphException(e);
+ } catch (SerializationException e) {
+ throw new TransferableGraphException(e);
+ } catch (AccessorConstructionException e) {
+ throw new TransferableGraphException(e);
+ } catch (DatatypeConstructionException e) {
+ throw new TransferableGraphException(e);
+ } finally {
+ if(va != null) {
+ try {
+ va.close();
+ } catch (AccessorException e) {
+ }
+ }
+ }
+ }
+
+ public static void importVirtualGraph(Session session, VirtualGraph vg, File file) throws TransferableGraphException {
+ try {
+ importVirtualGraph(session, vg, readGraph(file), new ImportAdvisor());
+ } catch (DatabaseException e) {
+ throw new TransferableGraphException(e);
+ }
+ }
+
+ public static VirtualGraph importVirtualGraph(Session session, File file) throws TransferableGraphException {
+ VirtualGraphSupport support = session.getService(VirtualGraphSupport.class);
+ VirtualGraph vg = support.getMemoryPersistent(UUID.randomUUID().toString());
+ importVirtualGraph(session, vg, file);
+ return vg;
+ }
+
+ public static void writeTransferableGraph(RequestProcessor processor, final String format, final TransferableGraphSource source, File target) throws Exception {
+ writeTransferableGraph(processor, format, 1, source, target);
+ }
+
+ public static void writeTransferableGraph(RequestProcessor processor, final String format, final int version, final TransferableGraphSource source, File target) throws Exception {
+ writeTransferableGraph(processor, format, version, new TreeMap<String,Variant>(), source, target);
+ }
+
+ public static void writeTransferableGraph(RequestProcessor processor, String format, int version, TreeMap<String, Variant> metadata, TransferableGraphSource source, File target) throws Exception {
+ writeTransferableGraph(processor, format, version, metadata, source, target, TGStatusMonitor.NULL_MONITOR);
+ }
+
+ public static void writeTransferableGraph(RequestProcessor processor, String format, int version, TreeMap<String, Variant> metadata, TransferableGraphSource source, File target, TGStatusMonitor monitor) throws Exception {
+ try (RandomAccessBinary out = new BinaryFile(target, 128*1024)) {
+ DataContainers.writeHeader(out, new DataContainer(format, version, metadata, null));
+ writeTransferableGraphVariant(processor, source, out, monitor);
+ }
+ }
+
+ public static void writeTransferableGraph(RequestProcessor processor, TransferableGraphSource source, File target, TGStatusMonitor monitor) throws Exception {
+ try (RandomAccessBinary out = new BinaryFile(target, 128*1024)) {
+ writeTransferableGraphVariant(processor, source, out, monitor);
+ }
+ }
+
+ public static void writeTransferableGraphVariant(RequestProcessor processor, TransferableGraphSource source, RandomAccessBinary out, TGStatusMonitor monitor) throws Exception {
+ Bindings.getSerializerUnchecked(Datatype.class).serialize(out, Datatypes.getDatatypeUnchecked(TransferableGraph1.class));
+ writeTransferableGraph(processor, source, out, monitor);
+ }
+
+ private static TGStatusMonitor safeMonitor(TGStatusMonitor mon) {
+ return mon == null ? TGStatusMonitor.NULL_MONITOR : mon;
+ }
+
+ private static class CopyingInputStream extends InputStream {
+ public DataInput in;
+ public DataOutput out;
+
+ @Override
+ public int read() throws IOException {
+ int value = in.readUnsignedByte();
+ out.write(value);
+ return value;
+ }
+ }
+
+ private static long copy(byte[] buffer, DataInput in, DataOutput out, long bytesToCopy) throws IOException {
+ int read = 0;
+ long bufferLength = buffer.length;
+ while (read < bytesToCopy) {
+ int l = (int) Math.min(bufferLength, bytesToCopy-read);
+ in.readFully(buffer, 0, l);
+ out.write(buffer, 0, l);
+ read += l;
+ }
+ return read;
+ }
+
+ private static final int LITERAL_VALUE_IO_BUFFER_SIZE = 128 * 1024;
+
+ private static void writeTransferableGraph(RequestProcessor processor, final TransferableGraphSource source, final RandomAccessBinary out, TGStatusMonitor monitor) throws Exception {
+ long start = System.nanoTime();
+
+ final Serializer datatypeSerializer = Bindings.getSerializerUnchecked(Datatype.class);
+ final Serializer identitySerializer = Bindings.getSerializerUnchecked(Identity.class);
+ final Serializer extensionSerializer = Bindings.getSerializerUnchecked(Extensions.class);
+
+ int resourceCount = source.getResourceCount();
+ //System.err.println("resourceCount: " + resourceCount);
+ out.writeInt(resourceCount);
+ extensionSerializer.serialize(out, new Extensions(source.getExtensions()));
+
+// System.err.println("resource count: " + source.getResourceCount());
+// System.err.println("identity count: " + source.getIdentityCount());
+
+ byte[] buffer = new byte[LITERAL_VALUE_IO_BUFFER_SIZE];
+
+ processor.syncRequest(new ReadRequest() {
+ @Override
+ public void run(ReadGraph graph) throws DatabaseException {
+ try {
+ if (monitor.isCanceled())
+ throw new CancelTransactionException();
+
+ int identityCount = source.getIdentityCount();
+ TGStatusMonitor.Updater identityProgress = new TGStatusMonitor.Updater(safeMonitor(monitor), 0, 33, identityCount);
+ out.writeInt(identityCount);
+ //System.err.println("identities: " + identityCount);
+ source.forIdentities(graph, value -> {
+ //System.err.println("id: " + value);
+ identitySerializer.serialize(out, value);
+ identityProgress.worked(1);
+ });
+
+ if (monitor.isCanceled())
+ throw new CancelTransactionException();
+
+ long statementCountPos = out.position();
+ int originalStatementCount = source.getStatementCount();
+ TGStatusMonitor.Updater statementProgress = new TGStatusMonitor.Updater(safeMonitor(monitor), 34, 66, originalStatementCount);
+ out.writeInt(originalStatementCount);
+ //System.err.println("original statementCount: " + originalStatementCount);
+ int[] statementCounter = { 0 };
+ source.forStatements(graph, r -> {
+ for (int i = 0; i < 4; ++i)
+ out.writeInt(r[i]);
+ statementCounter[0]++;
+ //System.err.println("stm " + (statementCounter[0]) + ": " + r[0] + " " + r[1] + " " + r[2] + " " + r[3]);
+ statementProgress.worked(1);
+ });
+ //System.err.println("wrote " + statementCounter[0] + " statements, " + (statementCounter[0]*4)+ " integers");
+
+ // Rewrite statement count after knowing exactly how many
+ // statements were written. It is possible that some
+ // statements get filtered out at this stage and the
+ // original statement count does not reflect that.
+ long afterStatementsPos = out.position();
+ out.position(statementCountPos);
+ out.writeInt(statementCounter[0]*4);
+ out.position(afterStatementsPos);
+
+ if (monitor.isCanceled())
+ throw new CancelTransactionException();
+
+ int valueCount = source.getValueCount();
+ TGStatusMonitor.Updater valueProgress = new TGStatusMonitor.Updater(safeMonitor(monitor), 67, 100, valueCount);
+ out.writeInt(valueCount);
+// System.err.println("valueCount: " + valueCount);
+ CopyingInputStream cis = new CopyingInputStream();
+ cis.out = out;
+ source.forValues2(graph, new TransferableGraphSourceValueProcedure() {
+ TObjectIntHashMap<Object> identities = new TObjectIntHashMap<>();
+
+ @Override
+ public void rawCopy(int resource, int length, DataInput input) throws Exception {
+ out.writeInt(resource);
+ long copied = copy(buffer, input, out, length);
+ assert copied == length;
+ //System.err.println("value " + (num++) + ": raw variant, " + length + " bytes, copied " + copied + " bytes");
+ valueProgress.worked(1);
+ }
+
+ @Override
+ public void execute(int resource, Datatype type, DataInput input) throws Exception {
+ out.writeInt(resource);
+ identities.clear();
+ datatypeSerializer.serialize(out, identities, type);
+ Binding binding = Bindings.getBinding(type);
+ Serializer serializer = Bindings.getSerializer(binding);
+ cis.in = input;
+ serializer.skip(cis);
+ cis.in = null;
+ valueProgress.worked(1);
+ }
+ });
+
+ } catch (DatabaseException e) {
+ throw e;
+ } catch (Exception e) {
+ throw new DatabaseException(e);
+ }
+ }
+ });
+
+ long end = System.nanoTime();
+ LOGGER.info("Wrote transferable graph in {} seconds.", 1e-9*(end-start));
+ }
+
+ public static TransferableGraph1 create(ReadGraph graph, TransferableGraphSource source) throws DatabaseException {
+
+ try {
+
+ ArrayList<Identity> identities = new ArrayList<>(source.getIdentityCount());
+ source.forIdentities(graph, i -> identities.add(i));
+ TIntArrayList statements = new TIntArrayList(source.getStatementCount());
+ source.forStatements(graph, r -> statements.addAll(r));
+ ArrayList<Value> values = new ArrayList<>(source.getValueCount());
+ source.forValues(graph, v -> values.add(v));
+
+ return new TransferableGraph1(source.getResourceCount(),
+ identities.toArray(new Identity[identities.size()]),
+ statements.toArray(),
+ values.toArray(new Value[values.size()]),
+ source.getExtensions());
+
+ } catch (Exception e) {
+
+ throw new DatabaseException(e);
+
+ }
+
+ }
+
+}