]> gerrit.simantics Code Review - simantics/platform.git/blob - bundles/org.simantics.graph.db/src/org/simantics/graph/db/TransferableGraphs.java
Multiple readers and variable optimization
[simantics/platform.git] / bundles / org.simantics.graph.db / src / org / simantics / graph / db / TransferableGraphs.java
1 /*******************************************************************************
2  * Copyright (c) 2007, 2010 Association for Decentralized Information Management
3  * in Industry THTH ry.
4  * All rights reserved. This program and the accompanying materials
5  * are made available under the terms of the Eclipse Public License v1.0
6  * which accompanies this distribution, and is available at
7  * http://www.eclipse.org/legal/epl-v10.html
8  *
9  * Contributors:
10  *     VTT Technical Research Centre of Finland - initial API and implementation
11  *******************************************************************************/
12 package org.simantics.graph.db;
13
14 import java.io.DataInput;
15 import java.io.DataOutput;
16 import java.io.File;
17 import java.io.IOException;
18 import java.io.InputStream;
19 import java.util.ArrayList;
20 import java.util.Collection;
21 import java.util.HashSet;
22 import java.util.TreeMap;
23 import java.util.UUID;
24 import java.util.function.BiFunction;
25
26 import org.simantics.databoard.Accessors;
27 import org.simantics.databoard.Bindings;
28 import org.simantics.databoard.Datatypes;
29 import org.simantics.databoard.accessor.error.AccessorConstructionException;
30 import org.simantics.databoard.accessor.error.AccessorException;
31 import org.simantics.databoard.accessor.file.FileVariantAccessor;
32 import org.simantics.databoard.binding.Binding;
33 import org.simantics.databoard.binding.error.BindingConstructionException;
34 import org.simantics.databoard.binding.error.DatatypeConstructionException;
35 import org.simantics.databoard.binding.mutable.Variant;
36 import org.simantics.databoard.container.DataContainer;
37 import org.simantics.databoard.container.DataContainers;
38 import org.simantics.databoard.serialization.SerializationException;
39 import org.simantics.databoard.serialization.Serializer;
40 import org.simantics.databoard.type.Datatype;
41 import org.simantics.databoard.util.binary.BinaryFile;
42 import org.simantics.databoard.util.binary.RandomAccessBinary;
43 import org.simantics.db.ReadGraph;
44 import org.simantics.db.RequestProcessor;
45 import org.simantics.db.Resource;
46 import org.simantics.db.Session;
47 import org.simantics.db.VirtualGraph;
48 import org.simantics.db.WriteGraph;
49 import org.simantics.db.WriteOnlyGraph;
50 import org.simantics.db.common.CommentMetadata;
51 import org.simantics.db.common.request.ReadRequest;
52 import org.simantics.db.common.request.UniqueRead;
53 import org.simantics.db.common.request.WriteOnlyRequest;
54 import org.simantics.db.common.request.WriteRequest;
55 import org.simantics.db.exception.CancelTransactionException;
56 import org.simantics.db.exception.DatabaseException;
57 import org.simantics.db.service.SerialisationSupport;
58 import org.simantics.db.service.VirtualGraphSupport;
59 import org.simantics.graph.db.TransferableGraphSource.TransferableGraphSourceValueProcedure;
60 import org.simantics.graph.diff.TransferableGraphDelta1;
61 import org.simantics.graph.representation.Extensions;
62 import org.simantics.graph.representation.External;
63 import org.simantics.graph.representation.Identity;
64 import org.simantics.graph.representation.TransferableGraph1;
65 import org.simantics.graph.representation.Value;
66 import org.slf4j.Logger;
67 import org.slf4j.LoggerFactory;
68
69 import gnu.trove.list.array.TIntArrayList;
70 import gnu.trove.map.hash.TObjectIntHashMap;
71
72 public class TransferableGraphs {
73     final static Logger LOGGER = LoggerFactory.getLogger(TransferableGraphs.class); 
74
75         public static long[] importGraph(Session session, Object tg, IImportAdvisor advisor) throws DatabaseException, TransferableGraphException {
76                 if (tg instanceof TransferableGraph1) 
77                 {
78                         return importGraph1(session, (TransferableGraph1) tg, advisor);
79                 }
80                 throw new TransferableGraphException("Cannot import "+tg.getClass().getName());
81         }
82
83         public static long[] importGraph(WriteGraph g, Object tg, IImportAdvisor advisor) throws DatabaseException, TransferableGraphException {
84                 if (tg instanceof TransferableGraph1) 
85                 {
86                         return importGraph1(g, (TransferableGraph1) tg, advisor);
87                 }
88                 throw new TransferableGraphException("Cannot import "+tg.getClass().getName());
89         }
90         
91         public static long[] importGraph(Session session, Object tg) throws DatabaseException, TransferableGraphException {
92                 if (tg instanceof TransferableGraph1) 
93                 {
94                         return importGraph1(session, (TransferableGraph1) tg);
95                 }
96                 throw new TransferableGraphException("Cannot import "+tg.getClass().getName());
97         }
98
99         public static long[] importGraph(WriteGraph g, Object tg) throws DatabaseException, TransferableGraphException {
100                 if (tg instanceof TransferableGraph1) 
101                 {
102                         return importGraph1(g, (TransferableGraph1) tg);
103                 }
104                 throw new TransferableGraphException("Cannot import "+tg.getClass().getName());
105         }
106         
107         public static Collection<Resource> collectExternals(RequestProcessor processor, final TransferableGraph1 tg) throws DatabaseException, TransferableGraphException {
108                 final TransferableGraphImportProcess process = new TransferableGraphImportProcess(tg, new ImportAdvisor());
109                 processor.syncRequest(new ReadRequest() {
110                         @Override
111                         public void run(ReadGraph graph) throws DatabaseException {
112                                 process.prepare(graph);
113                         }
114                 });
115                 HashSet<Resource> result = new HashSet<Resource>();
116                 for(Identity id : tg.identities) {
117                         if(id.definition instanceof External) {
118                                 result.add(process.resources[id.resource]);
119                         }
120                 }
121                 return result;
122         }
123         
124         /**
125          * Imports transferable graph version 1 to the database. Root advisor is used
126          * to give identities to roots of the transferable graphs. It may be null,
127          * in which case new resources are created for all roots but the root library.
128          * 
129          * @param session
130          * @param tg
131      * @param advisor root advisor or <code>null</code>
132          * @throws DatabaseException
133          */
134         public static long[] importGraph1(Session session, final TransferableGraph1 tg, final IImportAdvisor advisor_) throws DatabaseException, TransferableGraphException {
135                 
136                 final IImportAdvisor2 advisor = (advisor_ instanceof IImportAdvisor2) ? ((IImportAdvisor2)advisor_) : new WrapperAdvisor(advisor_);
137                 
138                 final TransferableGraphImportProcess process = new TransferableGraphImportProcess(tg, 
139                                 advisor == null ? new ImportAdvisor() : advisor);
140                 session.syncRequest(new ReadRequest() {
141                         @Override
142                         public void run(ReadGraph graph) throws DatabaseException {
143                                 process.prepare(graph);
144                         }
145                 });             
146                 session.syncRequest(new WriteOnlyRequest() {
147                         @Override
148                         public void perform(WriteOnlyGraph graph) throws DatabaseException {
149                                 advisor.beforeWrite(graph, process);
150                                 process.write(graph);
151                                 advisor.afterWrite(graph, process);
152                         }
153                 });
154                 return process.getResourceIds(
155                                 session.getService(SerialisationSupport.class));
156         }
157
158         public static void importGraph1(Session session, final TransferableGraph1 tg, IImportAdvisor advisor, final BiFunction<WriteOnlyGraph, TransferableGraphImportProcess, Boolean> callback) throws DatabaseException, TransferableGraphException {
159                 final TransferableGraphImportProcess process = new TransferableGraphImportProcess(tg, 
160                                 advisor == null ? new ImportAdvisor() : advisor);
161                 session.syncRequest(new ReadRequest() {
162                         @Override
163                         public void run(ReadGraph graph) throws DatabaseException {
164                                 process.prepare(graph);
165                         }
166                 });
167                 session.syncRequest(new WriteOnlyRequest() {
168                         @Override
169                         public void perform(WriteOnlyGraph graph) throws DatabaseException {
170                                 process.write(graph);
171                                 if(callback != null)
172                                         callback.apply(graph, process);
173                         }
174                 });
175         }
176
177     public static ImportResult importGraph1(Session session, final TransferableGraphSource tg, IImportAdvisor advisor) throws Exception {
178         return importGraph1(session, tg, advisor, null);
179     }
180
181     public static ImportResult importGraph1(Session session, final TransferableGraphSource tg, IImportAdvisor advisor, TGStatusMonitor monitor) throws DatabaseException {
182         return importGraph1(session, null, tg, advisor, monitor);
183     }
184
185     public static ImportResult importGraph1(Session session, VirtualGraph vg, final TransferableGraphSource tg, IImportAdvisor advisor_, TGStatusMonitor monitor) throws DatabaseException {
186         
187         final IImportAdvisor2 advisor = (advisor_ instanceof IImportAdvisor2) ? ((IImportAdvisor2)advisor_) : new WrapperAdvisor(advisor_);
188
189                 final StreamingTransferableGraphImportProcess process = new StreamingTransferableGraphImportProcess(session, vg, tg, advisor, monitor);
190                 session.syncRequest(new ReadRequest() {
191                         @Override
192                         public void run(ReadGraph graph) throws DatabaseException {
193                                 try {
194                                         process.prepare(graph);
195                                 } catch (DatabaseException e) {
196                                         throw e;
197                                 } catch (Exception e) {
198                                         throw new DatabaseException(e);
199                                 }
200                         }
201                 });
202                 session.syncRequest(new WriteOnlyRequest(vg) {
203                         @Override
204                         public void perform(WriteOnlyGraph graph) throws DatabaseException {
205                                 try {
206                                     advisor.beforeWrite(graph, process);
207                                         process.write(graph);
208                     advisor.afterWrite(graph, process);
209                                 } catch (Exception e) {
210                                         throw new DatabaseException(e);
211                                 }
212                         }
213                 });
214
215                 return new ImportResult(process.missingExternals);
216         }
217
218         public static void importGraph1WithMonitor(Session session, final TransferableGraph1 tg, IImportAdvisor advisor_, TGStatusMonitor monitor) throws DatabaseException {
219                 final IImportAdvisor2 advisor = (advisor_ instanceof IImportAdvisor2) ? ((IImportAdvisor2)advisor_) : new WrapperAdvisor(advisor_);
220                 final TransferableGraphImportProcess process = new TransferableGraphImportProcess(tg, 
221                                 advisor == null ? new ImportAdvisor() : advisor, monitor);
222                 session.syncRequest(new ReadRequest() {
223                         @Override
224                         public void run(ReadGraph graph) throws DatabaseException {
225                                 process.prepare(graph);
226                         }
227                 });
228                 session.syncRequest(new WriteOnlyRequest() {
229                         @Override
230                         public void perform(WriteOnlyGraph graph) throws DatabaseException {
231                                 advisor.beforeWrite(graph, process);
232                                 process.write2(graph);
233                                 advisor.afterWrite(graph, process);
234                                 CommentMetadata comments = graph.getMetadata(CommentMetadata.class);
235                                 comments.add("Imported transferable graph with " + tg.resourceCount + " resources");
236                                 graph.addMetadata(comments);
237                         }
238                 });
239         }
240
241         public static void importGraph1WithChanges(Session session, final TransferableGraph1 tg, IImportAdvisor advisor, final BiFunction<WriteGraph, TransferableGraphImportProcess, Boolean> callback) throws DatabaseException, TransferableGraphException {
242                 final TransferableGraphImportProcess process = new TransferableGraphImportProcess(tg, 
243                                 advisor == null ? new ImportAdvisor() : advisor);
244                 session.syncRequest(new ReadRequest() {
245                         @Override
246                         public void run(ReadGraph graph) throws DatabaseException {
247                                 process.prepare(graph);
248                         }
249                 });
250                 session.syncRequest(new WriteRequest() {
251                         @Override
252                         public void perform(WriteGraph graph) throws DatabaseException {
253                                 process.write2(graph);
254                 CommentMetadata comments = graph.getMetadata(CommentMetadata.class);
255                 comments.add("Imported transferable graph with " + tg.resourceCount + " resources");
256                 graph.addMetadata(comments);
257                                 if(callback != null)
258                                         callback.apply(graph, process);
259                         }
260                 });
261         }
262         
263         public static long[] importGraph1(Session session, final TransferableGraph1 tg) throws DatabaseException, TransferableGraphException {
264                 final TransferableGraphImportProcess process = 
265                         new TransferableGraphImportProcess(tg, 
266                                 null);
267                 session.syncRequest(new ReadRequest() {
268                         @Override
269                         public void run(ReadGraph graph) throws DatabaseException {
270                                 process.prepare(graph);
271                         }
272                 });             
273                 session.syncRequest(new WriteOnlyRequest() {
274                         @Override
275                         public void perform(WriteOnlyGraph graph) throws DatabaseException {
276                                 process.write(graph);
277                         }
278                 });
279                 return process.getResourceIds(
280                                 session.getService(SerialisationSupport.class));
281         }
282         
283         public static long[] importGraph1(WriteGraph graph, final TransferableGraph1 tg) throws DatabaseException, TransferableGraphException {
284                 final TransferableGraphImportProcess process = 
285                         new TransferableGraphImportProcess(tg, 
286                                 null);
287                 process.prepare(graph);
288                 process.write2(graph);
289                 return process.getResourceIds(
290                                 graph.getSession().getService(SerialisationSupport.class));
291         }
292         
293         /**
294          * Import transferable graph version 1 to the database. Root advisor is used
295          * to give identities to roots of the transferable graphs. It may be null,
296          * in which case new resources are created for all roots but the root library.
297          * 
298          * @param session
299          * @param tg
300      * @param advisor root advisor or <code>null</code>
301          * @throws DatabaseException
302          */
303         public static long[] importGraph1(WriteGraph graph, TransferableGraph1 tg, IImportAdvisor advisor) throws DatabaseException {
304             return importGraph1(graph, tg, advisor, null);
305         }
306
307         public static long[] importGraph1(WriteGraph graph, TransferableGraph1 tg, IImportAdvisor advisor, TGStatusMonitor monitor) throws DatabaseException {
308                 TransferableGraphImportProcess process = new TransferableGraphImportProcess(tg, 
309                                 advisor == null ? new ImportAdvisor() : advisor, monitor);
310                 process.prepare(graph);
311                 if(advisor instanceof IImportAdvisor2) ((IImportAdvisor2)advisor).beforeWrite(graph, process);
312                 process.write2(graph);
313                 if(advisor instanceof IImportAdvisor2) ((IImportAdvisor2)advisor).afterWrite(graph, process);
314                 return process.getResourceIds(
315                                 graph.getSession().getService(SerialisationSupport.class));
316         }
317
318         public static long[] applyDelta(WriteGraph graph, long[] oldResources, TransferableGraphDelta1 delta) throws DatabaseException {
319                 SerialisationSupport serializer = 
320                         graph.getSession().getService(SerialisationSupport.class);
321                 
322                 TGToGraphMap aMap = new TGToGraphMap(delta.a);
323                 aMap.addOldResources(serializer, oldResources);
324                 aMap.deny(graph);
325                 
326                 TGToGraphMap bMap = new TGToGraphMap(delta.b);
327                 bMap.addMappedOldResources(serializer, delta.aToB, aMap.getResources());
328                 bMap.prepare(graph);
329                 bMap.claim(graph);
330                 
331                 return bMap.getResources(serializer);
332         }
333         
334         public static boolean hasChanges(ReadGraph graph, long[] oldResources, TransferableGraphDelta1 delta) throws DatabaseException {
335                 
336                 SerialisationSupport serializer = 
337                         graph.getSession().getService(SerialisationSupport.class);
338                 
339                 TGToGraphMap aMap = new TGToGraphMap(delta.a);
340                 aMap.addOldResources(serializer, oldResources);
341                 if(aMap.checkDeny(graph)) return true;
342                 
343                 TGToGraphMap bMap = new TGToGraphMap(delta.b);
344                 bMap.addMappedOldResources(serializer, delta.aToB, aMap.getResources());
345                 bMap.prepare(graph);
346                 return bMap.checkClaim(graph);
347                 
348         }
349
350         public static void uninstallGraph(WriteGraph writeGraph, TransferableGraph1 graph,
351                         ImportAdvisor advisor) throws TransferableGraphException {
352                 // TODO HANNU IMPLEMENTS
353                 throw new UnsupportedOperationException();
354         }       
355         
356         public static long[] importVirtualGraph(Session session, final VirtualGraph vg, final TransferableGraph1 tg, IImportAdvisor advisor) throws DatabaseException {
357                 final TransferableGraphImportProcess process = new TransferableGraphImportProcess(tg, 
358                                 advisor == null ? new ImportAdvisor() : advisor);
359                 session.syncRequest(new ReadRequest() {
360                         @Override
361                         public void run(ReadGraph graph) throws DatabaseException {
362                                 process.prepare(graph);
363                         }
364                 });
365                 session.syncRequest(new WriteOnlyRequest(vg) {
366                         @Override
367                         public void perform(WriteOnlyGraph graph) throws DatabaseException {
368                                 // Needed because process#write does not support virtual WriteOnlyGraph
369                                 if (vg != null)
370                                         process.write2(graph);
371                                 else
372                                         process.write(graph);
373                         }
374                         
375                 });
376                 return process.getResourceIds(session.getService(SerialisationSupport.class));
377         }       
378
379         public static long[] importVirtualGraph(WriteGraph graph, final TransferableGraph1 tg, IImportAdvisor advisor) throws DatabaseException {
380                 final TransferableGraphImportProcess process = new TransferableGraphImportProcess(tg, 
381                                 advisor == null ? new ImportAdvisor() : advisor);
382                 process.prepare(graph);
383                 process.write2(graph);
384                 return process.getResourceIds(graph.getService(SerialisationSupport.class));
385         }       
386         
387         public static TransferableGraph1 readGraph(File file) throws TransferableGraphException {
388                 FileVariantAccessor va = null;
389                 try {
390                         va = Accessors.openAccessor(file);
391                         Datatype type = va.getContentType();
392                         if(type.equals(Datatypes.getDatatype(TransferableGraph1.class))) 
393                                 return  (TransferableGraph1)va.getContentValue(Bindings.getBinding(TransferableGraph1.class));
394                         else
395                                 throw new SerializationException("Unknown transferable graph data type.");
396                 } catch (AccessorException e) {
397                         throw new TransferableGraphException(e);
398                 } catch (BindingConstructionException e) {
399                         throw new TransferableGraphException(e);
400                 } catch (SerializationException e) {
401                         throw new TransferableGraphException(e);
402                 } catch (AccessorConstructionException e) {
403                         throw new TransferableGraphException(e);
404                 } catch (DatatypeConstructionException e) {
405                         throw new TransferableGraphException(e);
406                 } finally {
407                         if(va != null) {
408                                 try {
409                                         va.close();
410                                 } catch (AccessorException e) {
411                                 }
412                         }
413                 } 
414         }
415         
416         public static void importVirtualGraph(Session session, VirtualGraph vg, File file) throws TransferableGraphException {
417                 try {
418                         importVirtualGraph(session, vg, readGraph(file), new ImportAdvisor());
419                 } catch (DatabaseException e) {
420                         throw new TransferableGraphException(e);
421                 }
422         }
423         
424         public static VirtualGraph importVirtualGraph(Session session, File file) throws TransferableGraphException {
425                 VirtualGraphSupport support = session.getService(VirtualGraphSupport.class);
426                 VirtualGraph vg = support.getMemoryPersistent(UUID.randomUUID().toString());
427                 importVirtualGraph(session, vg, file);
428                 return vg;
429         }
430         
431         public static void writeTransferableGraph(RequestProcessor processor, final String format, final TransferableGraphSource source, File target) throws Exception {
432                 writeTransferableGraph(processor, format, 1, source, target);
433         }
434
435         public static void writeTransferableGraph(RequestProcessor processor, final String format, final int version, final TransferableGraphSource source, File target) throws Exception {
436                 writeTransferableGraph(processor, format, version, new TreeMap<String,Variant>(), source, target);
437         }
438
439         public static void writeTransferableGraph(RequestProcessor processor, String format, int version, TreeMap<String, Variant> metadata, TransferableGraphSource source, File target) throws Exception {
440                 writeTransferableGraph(processor, format, version, metadata, source, target, TGStatusMonitor.NULL_MONITOR);
441         }
442
443         public static void writeTransferableGraph(RequestProcessor processor, String format, int version, TreeMap<String, Variant> metadata, TransferableGraphSource source, File target, TGStatusMonitor monitor) throws Exception {
444                 try (RandomAccessBinary out = new BinaryFile(target, 128*1024)) {
445                         DataContainers.writeHeader(out, new DataContainer(format, version, metadata, null));
446                         writeTransferableGraphVariant(processor, source, out, monitor);
447                 }
448         }
449
450         public static void writeTransferableGraph(RequestProcessor processor, TransferableGraphSource source, File target, TGStatusMonitor monitor) throws Exception {
451                 try (RandomAccessBinary out = new BinaryFile(target, 128*1024)) {
452                         writeTransferableGraphVariant(processor, source, out, monitor);
453                 }
454         }
455
456         public static void writeTransferableGraphVariant(RequestProcessor processor, TransferableGraphSource source, RandomAccessBinary out, TGStatusMonitor monitor) throws Exception {
457                 Bindings.getSerializerUnchecked(Datatype.class).serialize(out, Datatypes.getDatatypeUnchecked(TransferableGraph1.class));
458                 writeTransferableGraph(processor, source, out, monitor);
459         }
460
461         private static TGStatusMonitor safeMonitor(TGStatusMonitor mon) {
462                 return mon == null ? TGStatusMonitor.NULL_MONITOR : mon;
463         }
464
465         private static class CopyingInputStream extends InputStream {
466                 public DataInput in;
467                 public DataOutput out;
468
469                 @Override
470                 public int read() throws IOException {
471                         int value = in.readUnsignedByte();
472                         out.write(value);
473                         return value;
474                 }
475         }
476
477         private static long copy(byte[] buffer, DataInput in, DataOutput out, long bytesToCopy) throws IOException {
478                 int read = 0;
479                 long bufferLength = buffer.length;
480                 while (read < bytesToCopy) {
481                         int l = (int) Math.min(bufferLength, bytesToCopy-read);
482                         in.readFully(buffer, 0, l);
483                         out.write(buffer, 0, l);
484                         read += l;
485                 }
486                 return read;
487         }
488
489         private static final int LITERAL_VALUE_IO_BUFFER_SIZE = 128 * 1024;
490
491         private static void writeTransferableGraph(RequestProcessor processor, final TransferableGraphSource source, final RandomAccessBinary out, TGStatusMonitor monitor) throws Exception {
492                 long start = System.nanoTime();
493
494                 final Serializer datatypeSerializer = Bindings.getSerializerUnchecked(Datatype.class);
495                 final Serializer identitySerializer = Bindings.getSerializerUnchecked(Identity.class);
496                 final Serializer extensionSerializer = Bindings.getSerializerUnchecked(Extensions.class);
497
498                 int resourceCount = source.getResourceCount();
499                 //System.err.println("resourceCount: " + resourceCount);
500                 out.writeInt(resourceCount);
501                 extensionSerializer.serialize(out, new Extensions(source.getExtensions()));
502
503 //              System.err.println("resource count: " + source.getResourceCount());
504 //              System.err.println("identity count: " + source.getIdentityCount());
505
506                 byte[] buffer = new byte[LITERAL_VALUE_IO_BUFFER_SIZE];
507
508                 processor.syncRequest(new ReadRequest() {
509                         @Override
510                         public void run(ReadGraph graph) throws DatabaseException {
511                                 try {
512                                         if (monitor.isCanceled())
513                                                 throw new CancelTransactionException();
514
515                                         int identityCount = source.getIdentityCount();
516                                         TGStatusMonitor.Updater identityProgress = new TGStatusMonitor.Updater(safeMonitor(monitor), 0, 33, identityCount);
517                                         out.writeInt(identityCount);
518                                         //System.err.println("identities: " + identityCount);
519                                         source.forIdentities(graph, value -> {
520                                                 //System.err.println("id: " + value);
521                                                 identitySerializer.serialize(out, value);
522                                                 identityProgress.worked(1);
523                                         });
524
525                                         if (monitor.isCanceled())
526                                                 throw new CancelTransactionException();
527
528                                         long statementCountPos = out.position();
529                                         int originalStatementCount = source.getStatementCount();
530                                         TGStatusMonitor.Updater statementProgress = new TGStatusMonitor.Updater(safeMonitor(monitor), 34, 66, originalStatementCount);
531                                         out.writeInt(originalStatementCount);
532                                         //System.err.println("original statementCount: " + originalStatementCount);
533                                         int[] statementCounter = { 0 };
534                                         source.forStatements(graph, r -> {
535                                                 for (int i = 0; i < 4; ++i)
536                                                         out.writeInt(r[i]);
537                                                 statementCounter[0]++;
538                                                 //System.err.println("stm " + (statementCounter[0]) + ": " + r[0] + " " + r[1] + " " + r[2] + " " + r[3]);
539                                                 statementProgress.worked(1);
540                                         });
541                                         //System.err.println("wrote " + statementCounter[0] + " statements, " + (statementCounter[0]*4)+ " integers");
542
543                                         // Rewrite statement count after knowing exactly how many
544                                         // statements were written. It is possible that some
545                                         // statements get filtered out at this stage and the
546                                         // original statement count does not reflect that.
547                                         long afterStatementsPos = out.position();
548                                         out.position(statementCountPos);
549                                         out.writeInt(statementCounter[0]*4);
550                                         out.position(afterStatementsPos);
551
552                                         if (monitor.isCanceled())
553                                                 throw new CancelTransactionException();
554
555                                         int valueCount = source.getValueCount();
556                                         TGStatusMonitor.Updater valueProgress = new TGStatusMonitor.Updater(safeMonitor(monitor), 67, 100, valueCount);
557                                         out.writeInt(valueCount);
558 //                                      System.err.println("valueCount: " + valueCount);
559                                         CopyingInputStream cis = new CopyingInputStream();
560                                         cis.out = out;
561                                         source.forValues2(graph, new TransferableGraphSourceValueProcedure() {
562                                                 TObjectIntHashMap<Object> identities = new TObjectIntHashMap<>();
563
564                                                 @Override
565                                                 public void rawCopy(int resource, int length, DataInput input) throws Exception {
566                                                         out.writeInt(resource);
567                                                         long copied = copy(buffer, input, out, length);
568                                                         assert copied == length;
569                                                         //System.err.println("value " + (num++) + ": raw variant, " + length + " bytes, copied " + copied + " bytes");
570                                                         valueProgress.worked(1);
571                                                 }
572
573                                                 @Override
574                                                 public void execute(int resource, Datatype type, DataInput input) throws Exception {
575                                                         out.writeInt(resource);
576                                                         identities.clear();
577                                                         datatypeSerializer.serialize(out, identities, type);
578                                                         Binding binding = Bindings.getBinding(type);
579                                                         Serializer serializer = Bindings.getSerializer(binding);
580                                                         cis.in = input;
581                                                         serializer.skip(cis);
582                                                         cis.in = null;
583                                                         valueProgress.worked(1);
584                                                 }
585                                         });
586
587                                 } catch (DatabaseException e) {
588                                         throw e;
589                                 } catch (Exception e) {
590                                         throw new DatabaseException(e);
591                                 }
592                         }
593                 });
594
595                 long end = System.nanoTime();
596                 LOGGER.info("Wrote transferable graph in {} seconds.", 1e-9*(end-start));
597         }
598
599         public static TransferableGraph1 create(RequestProcessor processor, TransferableGraphSource source) throws DatabaseException {
600                 
601                 return processor.syncRequest(new UniqueRead<TransferableGraph1>() {
602
603                         @Override
604                         public TransferableGraph1 perform(ReadGraph graph) throws DatabaseException {
605                                 return create(graph, source);
606                         }
607                         
608                 });
609                 
610         }
611
612         public static TransferableGraph1 create(ReadGraph graph, TransferableGraphSource source) throws DatabaseException {
613                 
614                 try {
615
616                         ArrayList<Identity> identities = new ArrayList<>(source.getIdentityCount());
617                         source.forIdentities(graph, i -> identities.add(i));
618                         TIntArrayList statements = new TIntArrayList(source.getStatementCount());
619                         source.forStatements(graph, r -> statements.addAll(r));
620                         ArrayList<Value> values = new ArrayList<>(source.getValueCount());
621                         source.forValues(graph, v -> values.add(v));
622
623                         return new TransferableGraph1(source.getResourceCount(), 
624                                         identities.toArray(new Identity[identities.size()]),
625                                         statements.toArray(),
626                                         values.toArray(new Value[values.size()]),
627                                         source.getExtensions());
628                         
629                 } catch (Exception e) {
630                         
631                         throw new DatabaseException(e);
632                         
633                 }
634                 
635         }
636
637 }