gerrit.simantics Code Review - simantics/platform.git/blob - bundles/org.simantics.graph.db/src/org/simantics/graph/db/StreamingTransferableGraphImportProcess.java
StreamingTransferableGraphImportProcess assumes index root cluster set
[simantics/platform.git] / bundles / org.simantics.graph.db / src / org / simantics / graph / db / StreamingTransferableGraphImportProcess.java
1 /*******************************************************************************
2  * Copyright (c) 2012, 2016 Association for Decentralized Information Management
3  * in Industry THTH ry.
4  * All rights reserved. This program and the accompanying materials
5  * are made available under the terms of the Eclipse Public License v1.0
6  * which accompanies this distribution, and is available at
7  * http://www.eclipse.org/legal/epl-v10.html
8  *
9  * Contributors:
10  *     VTT Technical Research Centre of Finland - initial API and implementation
11  *     Semantum Oy
12  *******************************************************************************/
13 package org.simantics.graph.db;
14
15 import java.io.DataInput;
16 import java.io.DataOutput;
17 import java.io.DataOutputStream;
18 import java.io.FileNotFoundException;
19 import java.io.FileOutputStream;
20 import java.io.IOException;
21 import java.io.InputStream;
22 import java.util.HashSet;
23 import java.util.Map;
24 import java.util.Set;
25 import java.util.TreeMap;
26
27 import org.simantics.databoard.Bindings;
28 import org.simantics.databoard.adapter.AdaptException;
29 import org.simantics.databoard.binding.Binding;
30 import org.simantics.databoard.binding.mutable.Variant;
31 import org.simantics.databoard.serialization.Serializer;
32 import org.simantics.databoard.type.Datatype;
33 import org.simantics.db.ReadGraph;
34 import org.simantics.db.Resource;
35 import org.simantics.db.Session;
36 import org.simantics.db.VirtualGraph;
37 import org.simantics.db.WriteOnlyGraph;
38 import org.simantics.db.common.WriteBindings;
39 import org.simantics.db.common.procedure.adapter.TransientCacheAsyncListener;
40 import org.simantics.db.common.request.PossibleIndexRoot;
41 import org.simantics.db.common.uri.UnescapedChildMapOfResource;
42 import org.simantics.db.common.utils.Logger;
43 import org.simantics.db.exception.DatabaseException;
44 import org.simantics.db.service.ClusterBuilder2;
45 import org.simantics.db.service.ClusterBuilderFactory;
46 import org.simantics.db.service.ClusteringSupport;
47 import org.simantics.db.service.SerialisationSupport;
48 import org.simantics.graph.db.TransferableGraphSource.TransferableGraphSourceProcedure;
49 import org.simantics.graph.db.TransferableGraphSource.TransferableGraphSourceValueProcedure;
50 import org.simantics.graph.representation.Extensions;
51 import org.simantics.graph.representation.External;
52 import org.simantics.graph.representation.Identity;
53 import org.simantics.graph.representation.IdentityDefinition;
54 import org.simantics.graph.representation.Internal;
55 import org.simantics.graph.representation.Optional;
56 import org.simantics.graph.representation.Root;
57 import org.simantics.graph.representation.TransferableGraphUtils;
58 import org.simantics.graph.utils.TGResourceUtil;
59 import org.simantics.graph.utils.TGResourceUtil.LongAdapter;
60
61 public class StreamingTransferableGraphImportProcess implements TransferableGraphImporter {
62         
63         public static String LOG_FILE = "transferableGraphs.log";
64         final static private boolean LOG = false;
65         
66         static DataOutput log;
67
68         static {
69
70                 if (LOG) {
71                         
72                         try {
73                                 FileOutputStream stream = new FileOutputStream(LOG_FILE);
74                                 log = new DataOutputStream(stream);
75                         } catch (FileNotFoundException e) {
76                                 e.printStackTrace();
77                         }
78                 }
79
80         }
81         
82         private static void log(String line) {
83                 if (LOG) {
84                         try {
85                                 log.writeUTF(line + "\n");
86                         } catch (IOException e) {
87                                 e.printStackTrace();
88                         }
89                 }
90         }
91
92         Resource indexRoot;
93         TransferableGraphSource tg;
94         VirtualGraph vg;
95         IImportAdvisor2 advisor;
96         TGStatusMonitor monitor;
97         ClusterBuilder2 builder;
98         final TGResourceUtil resourceUtil = new TGResourceUtil();
99         
100         int[] handles;
101         
102         Set<String> missingExternals = new HashSet<String>(); 
103         
104         int resourceCount;
105         Identity[] identities;
106         TreeMap<String, Variant> extensions;
107         
108         // Builtins
109         Resource RootLibrary;
110         Resource String;
111         Resource Library;
112         
113         Resource InstanceOf;
114         Resource ConsistsOf;
115         Resource PartOf;
116         Resource HasName;
117         Resource NameOf;        
118
119         public StreamingTransferableGraphImportProcess(Session session, VirtualGraph vg, TransferableGraphSource tg, IImportAdvisor2 advisor) {
120                 this(session, vg, tg, advisor, null);
121         }
122
123         public StreamingTransferableGraphImportProcess(Session session, VirtualGraph vg, TransferableGraphSource tg, IImportAdvisor2 advisor, TGStatusMonitor monitor) {
124                 this.tg = tg;
125                 this.vg = vg;
126                 this.advisor = advisor;
127                 this.monitor = monitor;
128         }
129
130         private int updatePercentage(int percentage, int done, int total) {
131                 if (monitor != null && (done & 63) == 0) {
132                         int current = 100*done / total;
133                         if (current > percentage) {
134                                 percentage = current;
135                                 monitor.status(percentage);
136                         }
137                 }
138                 return percentage;
139         }
140
141         public void readIdentities(ReadGraph g) throws Exception {
142                 extensions = tg.getExtensions();
143                 resourceCount = tg.getResourceCount();
144                 identities = new Identity[tg.getIdentityCount()];
145                 tg.forIdentities(g, new TransferableGraphSourceProcedure<Identity>() {
146                         
147                         int counter = 0;
148                         
149                         @Override
150                         public void execute(Identity value) throws Exception {
151                                 identities[counter++] = value;
152                         }
153                 });
154         }
155         
156         public void findBuiltins(WriteOnlyGraph g) throws DatabaseException {
157                 RootLibrary = g.getBuiltin("http:/");
158                 String = g.getBuiltin(CoreInitialization.LAYER0 + "String");
159                 Library = g.getBuiltin(CoreInitialization.LAYER0 + "Library");
160                 InstanceOf = g.getBuiltin(CoreInitialization.LAYER0 + "InstanceOf");
161                 ConsistsOf = g.getBuiltin(CoreInitialization.LAYER0 + "ConsistsOf");
162                 PartOf = g.getBuiltin(CoreInitialization.LAYER0 + "PartOf");
163                 HasName = g.getBuiltin(CoreInitialization.LAYER0 + "HasName");
164                 NameOf = g.getBuiltin(CoreInitialization.LAYER0 + "NameOf");
165         }
166         
167         public void findBuiltins(ReadGraph g) throws DatabaseException {
168                 RootLibrary = g.getBuiltin("http:/");
169                 String = g.getBuiltin(CoreInitialization.LAYER0 + "String");
170                 Library = g.getBuiltin(CoreInitialization.LAYER0 + "Library");
171                 InstanceOf = g.getBuiltin(CoreInitialization.LAYER0 + "InstanceOf");
172                 ConsistsOf = g.getBuiltin(CoreInitialization.LAYER0 + "ConsistsOf");
173                 PartOf = g.getBuiltin(CoreInitialization.LAYER0 + "PartOf");
174                 HasName = g.getBuiltin(CoreInitialization.LAYER0 + "HasName");
175                 NameOf = g.getBuiltin(CoreInitialization.LAYER0 + "NameOf");
176         }
177
178 //      /* Preparation that is used when the core is empty. 
179 //       */
180 //      void initialPrepare(WriteOnlyGraph graph) throws DatabaseException {
181 //              findBuiltins(graph);
182 //              
183 //              resources = new Resource[tg.resourceCount];
184 //              
185 //              int Root = -1;
186 //              int SimanticsDomain = -1;
187 //              int Layer0 = -1;
188 //              
189 //              for(Identity identity : tg.identities) {
190 //                      if(identity.definition instanceof Internal) {
191 //                              Internal def = (Internal)identity.definition;
192 //                              Resource res = null;
193 //                              if(def.parent == Layer0) {
194 //                                      try {
195 //                                              res = graph.getBuiltin(CoreInitialization.LAYER0 + def.name);
196 //                                      } catch(ResourceNotFoundException e) {                                                                          
197 //                                      }
198 //                              }
199 //                              else if(def.parent == SimanticsDomain) {
200 //                                      if(def.name.equals("Layer0-1.0"))
201 //                                              Layer0 = identity.resource;
202 //                              }
203 //                              else if(def.parent == Root) {
204 //                                      if(def.name.equals("www.simantics.org"))
205 //                                              SimanticsDomain = identity.resource;
206 //                              }
207 //
208 //                              if(res == null)
209 //                                      res = createChild(graph, resources[def.parent], def.name);
210 //                              else
211 //                                      createChild(graph, res, resources[def.parent], def.name);
212 //                              resources[identity.resource] = res;
213 //                      }
214 //                      else if(identity.definition instanceof Root) {
215 //                              Root = identity.resource;
216 //                              resources[identity.resource] = RootLibrary;                             
217 //                      } 
218 //              }
219 //      }
220         
221         void addMissing(String external) {
222                 Set<String> removals = new HashSet<String>();
223                 for(String ext : missingExternals) if(ext.startsWith(external)) return;
224                 for(String ext : missingExternals) if(external.startsWith(ext)) removals.add(ext);
225                 missingExternals.removeAll(removals);
226                 missingExternals.add(external);
227         }
228         
229         void prepare(ReadGraph graph) throws Exception {
230
231                 Resource target = advisor.getTarget();
232                 if(target != null)
233                         indexRoot = graph.syncRequest(new PossibleIndexRoot(target));
234                 
235                 findBuiltins(graph);
236                 readIdentities(graph);
237                 
238 //              System.err.println("ext: " + extensions);
239 //              System.err.println("rc: " + resourceCount);
240 //              System.err.println("ic: " + identities.length);
241                 
242                 ClusterBuilderFactory factory = graph.getService(ClusterBuilderFactory.class);
243                 ClusterBuilder2 builder = factory.create(vg, false);
244                 
245         this.handles = new int[resourceCount];
246                 
247                 for(Identity identity : identities) {
248                         IdentityDefinition definition = identity.definition;
249                         if(definition instanceof External) {
250                                 External def = (External)definition;
251                                 if(def.parent == -1) {
252                                     handles[identity.resource] = builder.handle(RootLibrary);
253                                 } else {
254                                         if("@inverse".equals(def.name)) {
255                                                 int parent = handles[def.parent];
256                                                 int child = builder.handle(graph.getInverse(builder.resource(parent)));
257                                                 handles[identity.resource] = child;
258                                         } else {
259                                                 int handle = handles[def.parent];
260                                                 Resource parent = handle != 0 ? builder.resource(handle) : null;
261                                                 // TODO: escape should be removed when names become well-behaving
262                                                 if(parent != null) {
263                                                     Map<String,Resource> childMap = graph
264                                             .syncRequest(new UnescapedChildMapOfResource(parent),
265                                                     new TransientCacheAsyncListener<Map<String, Resource>>()); 
266                                                         Resource child = childMap.get(def.name); 
267                                                         if(child == null) {
268                                                                 addMissing(graph.getURI(parent) + "/" + def.name);
269                                                         } else {
270                                                                 handles[identity.resource] = builder.handle(child);
271                                                         }
272                                                 } else {
273                                                     addMissing(TransferableGraphUtils.getURI(resourceCount, identities, def.parent) + "/" + def.name);
274                                                 }
275                                         }
276                                 }
277                         }
278                         else if(definition instanceof Internal) {
279                                 // Do not do anything for now
280                         }
281                         else if(definition instanceof Root) {
282                                 Root root = (Root)definition;
283                                 if(root.name.equals(""))
284                                     handles[identity.resource] = builder.handle(RootLibrary);
285                                 else  {
286                                         Resource existing = advisor.analyzeRoot(graph, root);
287                                         if(existing != null)
288                                             handles[identity.resource] = builder.handle(existing);
289                                 }
290                         }
291                         else if(definition instanceof Optional) {
292                                 External def = (External)definition;
293                                 Resource parent = builder.resource(handles[def.parent]);
294                                 if(parent != null)
295                                         handles[identity.resource] = builder.handle(graph.syncRequest(new UnescapedChildMapOfResource(parent)).get(def.name));                          
296                         }
297                 }               
298                 
299                 if(!missingExternals.isEmpty()) throw new MissingDependencyException(this);
300                 
301         }
302
303         @Override
304         public Resource createChild(WriteOnlyGraph graph, Resource parent, Resource child, String name) throws DatabaseException {
305             if(child == null) child = graph.newResource();
306                 Resource nameResource = graph.newResource();
307                 graph.claim(nameResource, InstanceOf, null, String);
308                 graph.claimValue(nameResource, name, WriteBindings.STRING);
309                 graph.claim(child, HasName, NameOf, nameResource);
310                 return child;
311         }
312         
313         int[] getClustering() {
314                 Variant v = extensions.get(Extensions.CLUSTERING);
315                 if(v == null) return null;
316                 try {
317                         return (int[])v.getValue(Bindings.INT_ARRAY);
318                 } catch (AdaptException e) {
319                         Logger.defaultLogError(e);
320                         return null;
321                 }
322         }
323
324         int[] getClusterSets() {
325                 Variant v = extensions.get(Extensions.CLUSTER_SETS);
326                 if(v == null) return null;
327                 try {
328                         return (int[])v.getValue(Bindings.INT_ARRAY);
329                 } catch (AdaptException e) {
330                         Logger.defaultLogError(e);
331                         return null;
332                 }
333         }
334
335         boolean needTranslation(Datatype type) {
336             return resourceUtil.mayHaveResource(type);
337         }
338         
339         void findClusterSet(WriteOnlyGraph graph, Resource rootLibrary, int[] clustering, int[] clusterSets, long[] clusters, int id) throws DatabaseException {
340                 ClusteringSupport support = graph.getService(ClusteringSupport.class);
341                 if(id == Extensions.ROOT_LIBRARY_CLUSTER_SET || id == Extensions.INDEX_ROOT_CLUSTER_SET) return;
342                 Resource indexRootClusterSetResource = rootLibrary;
343                 if(support.isClusterSet(indexRoot)) {
344                         indexRootClusterSetResource = indexRoot;
345                 } else {
346                         graph.setClusterSet4NewResource(rootLibrary);
347                         graph.flushCluster();                   
348                 }
349                 int indexRootCsHandle = builder.handle(indexRootClusterSetResource);
350                 for(int pos=0,index=0;index<clustering.length;index++) {
351                         pos += clustering[index];
352                         if(id < pos) {
353                                 int cs = clusterSets[index]; 
354                                 if(handles[id] == 0) {
355                                         int csHandle = 0;
356                                         if(cs == Extensions.ROOT_LIBRARY_CLUSTER_SET) csHandle = builder.handle(rootLibrary);
357                                         else if(cs == Extensions.INDEX_ROOT_CLUSTER_SET) {
358                                                 if(indexRoot == null) throw new DatabaseException("No index root was available in TG import.");
359                                                 csHandle = indexRootCsHandle;
360                                         }
361                                         else {
362                                                 findClusterSet(graph, rootLibrary, clustering, clusterSets, clusters, cs);
363                                                 csHandle = handles[cs];
364                                         }
365                                         
366                                         if(clusters[index] != 0)
367                                                 builder.selectCluster(clusters[index]);
368                                         else if(cs >= 0)
369                                                 builder.newCluster(csHandle);
370                                         
371                                         handles[id] = builder.newResource(csHandle);
372                                         clusters[index] = support.getCluster(builder.resource(handles[id]));
373                                                         
374                                         builder.createClusterSet(handles[id]);
375                                 }
376                                 return;
377                         }
378                 }
379         }
380         
381         void write(final WriteOnlyGraph graph) throws Exception {
382         
383         final SerialisationSupport ss = graph.getService(SerialisationSupport.class);
384             
385         ClusterBuilderFactory factory = graph.getService(ClusterBuilderFactory.class);
386         if(advisor instanceof IImportAdvisor2) {
387             boolean allowImmutable = ((IImportAdvisor2)advisor).allowImmutableModifications();
388             builder = factory.create(vg, allowImmutable);
389         } else {
390             builder = factory.create(vg, false);
391         }
392         
393                 final int[] handles = this.handles; 
394                 
395                 int[] clustering = getClustering();
396                 if(clustering != null) {
397                         
398                         int[] clusterSets = getClusterSets();
399                         if(clusterSets != null) {
400
401                                 assert(clustering.length == clusterSets.length);
402
403                                 long[] clusters = new long[clustering.length];
404                                 
405                                 // Create clustering
406                                 for(int i=0;i<clusterSets.length;i++) {
407                                         findClusterSet(graph, graph.getRootLibrary(), clustering, clusterSets, clusters, clusterSets[i]);
408                                 }
409                                 
410                                 // Then create all resources
411                                 int i=0;
412                             for(int j=0;j<clustering.length;j++) {
413                                 int c = clustering[j];
414                                 int s = clusterSets[j];
415                                 int setHandle = 0;
416                                         if(s == Extensions.ROOT_LIBRARY_CLUSTER_SET)
417                                                 setHandle = builder.handle(graph.getRootLibrary());
418                                         else if(s == Extensions.INDEX_ROOT_CLUSTER_SET) {
419                                                 if(indexRoot == null) throw new DatabaseException("No index root was available in TG import.");
420                                                 setHandle = builder.handle(indexRoot);
421                                         }
422                                         else setHandle = handles[s];
423                                         // Preserve clustering only for internal resources
424                                         if(clusters[j] != 0)
425                                                 builder.selectCluster(clusters[j]);
426                                         else if(s >= 0)
427                                                 builder.newCluster(setHandle);
428                                         for(int r=0;r<c;r++, i++)
429                                                 if(handles[i] == 0) handles[i] = builder.newResource();
430                                 }
431
432                                 for(;i<handles.length;++i)
433                                         if(handles[i] == 0) handles[i] = builder.newResource();
434                                 
435                         } else {
436
437                         int i = 0;
438                                 for(int c : clustering) {
439                                         builder.newCluster();
440                                         for(int r=0;r<c;r++, i++)
441                                                 if(handles[i] == 0) handles[i] = builder.newResource();
442                                 }
443
444                                 for(;i<handles.length;++i)
445                                         if(handles[i] == 0) handles[i] = builder.newResource();
446                                 
447                         }
448                         
449                 } else {
450                 
451                         // Create blank resources
452                         for(int i=0;i<handles.length;++i)
453                                 if(handles[i] == 0) handles[i] = builder.newResource();
454
455                 }
456                 
457                 // Internal identities          
458                 for(Identity identity : identities) {
459                         IdentityDefinition definition = identity.definition;
460 //                      if(handles[identity.resource] != 0)
461 //                              continue;
462                         if(definition instanceof External) {
463                                 // Already done everything
464                         }
465                         else if(definition instanceof Internal) {
466                                 Internal def = (Internal)definition;
467                                 if(handles[identity.resource] != 0)
468                                         handles[identity.resource] = builder.handle(advisor.createChild(graph, this, builder.resource(handles[def.parent]), builder.resource(handles[identity.resource]), def.name));
469                                 else
470                                         handles[identity.resource] = builder.handle(advisor.createChild(graph, this, builder.resource(handles[def.parent]), null, def.name));
471                         }
472                         else if(definition instanceof Root) {
473                                 
474                                 Root root = (Root)definition;
475                                 if(handles[identity.resource] != 0)
476                                         handles[identity.resource] = builder.handle(advisor.createRoot(graph, root, builder.resource(handles[identity.resource])));
477                                 else
478                                         handles[identity.resource] = builder.handle(advisor.createRoot(graph, root, null));
479                         }
480                         else if(definition instanceof Optional) {
481                                 Optional def = (Optional)definition;
482                                 if(handles[identity.resource] != 0) {
483                                         Resource child = advisor.createChild(graph, this, builder.resource(handles[def.parent]), builder.resource(handles[identity.resource]), def.name);
484                                         graph.claim(child, InstanceOf, null, Library); // ???
485                                         handles[identity.resource] = builder.handle(child);
486                                 } else {
487                                         Resource child = advisor.createChild(graph, this, builder.resource(handles[def.parent]), null, def.name);
488                                         graph.claim(child, InstanceOf, null, Library); // ???
489                                         handles[identity.resource] = builder.handle(child);
490                                 }
491                         }
492                 }               
493
494                 int[] done = { 0 };
495                 int[] percentage = { 0 };
496
497                 int statementCount = tg.getStatementCount();
498                 tg.forStatements(null, new TransferableGraphSourceProcedure<int[]>() {
499
500                         @Override
501                         public void execute(int[] value) throws Exception {
502                                 
503                                 int sub = value[0];
504                                 int pred = value[1];
505                                 int inv = value[2];
506                                 int obj = value[3];
507
508                                 int subject = handles[sub];
509                                 int predicate = handles[pred];
510                                 int object = handles[obj];
511
512                                 builder.addStatement(graph, subject, predicate, object);        
513                                 if(inv >= 0) {
514                                     int inverse = handles[inv];
515                                     builder.addStatement(graph, object, inverse, subject);    
516                                 }
517
518                                 // Count from 0% -> 50% => total = statementCount*2
519                                 percentage[0] = updatePercentage(percentage[0], done[0]++, statementCount*2);
520                 
521                         }
522                         
523                 }); 
524                 
525                 int valueCount = tg.getValueCount();
526                 done[0] = 0;
527
528                 class ValueProcedure extends InputStream implements TransferableGraphSourceValueProcedure {
529
530             private TGResourceUtil util = new TGResourceUtil();
531                     private DataInput source;
532
533             @Override
534             public void execute(int _resource, Datatype type, DataInput stream) throws Exception {
535
536                 source = stream;
537
538                 //int file = _resource & 0x80000000;
539                 int resource = _resource & 0x7FFFFFFF;
540
541                 Binding binding = Bindings.getBinding(type);
542                 Serializer s = Bindings.getSerializer(binding);
543
544                 builder.beginValue(handles[resource]);
545                 if(util.mayHaveResource(type)) {
546                     Object value = s.deserialize(stream);
547                     util.adaptValue( binding,  value, new LongAdapter() {
548                                 @Override
549                                 public long adapt(long in) {
550                                     try {
551                                         return ss.getRandomAccessId(handles[(int)in]);
552                                     } catch (DatabaseException e) {
553                                         throw new IllegalStateException(e);
554                                     }
555                                 }
556                             });
557                     byte[] bytes = s.serialize(value);
558                     for(byte b : bytes) {
559                         int val = b;
560                         if(val < 0) val += 256;
561                         builder.appendValue(val);
562                     }
563                 } else {
564                     s.skip(this);
565                 }
566                 builder.endValue();
567                 worked();
568
569             }
570
571             @Override
572             public int read() throws IOException {
573                 int value = source.readUnsignedByte();
574                 try {
575                     builder.appendValue(value);
576                 } catch (DatabaseException e) {
577                     e.printStackTrace();
578                 }
579                 return value;
580             }
581
582             @Override
583             public void rawCopy(int resource, int length, DataInput input) throws Exception {
584                 builder.beginValue(handles[resource]);
585                 for (int i = 0; i < length; ++i)
586                     builder.appendValue(input.readUnsignedByte());
587                 builder.endValue();
588                 worked();
589             }
590
591             private void worked() {
592                 // Count from 50% -> 100% => [valueCount, valueCount*2)
593                 percentage[0] = updatePercentage(percentage[0], valueCount + done[0]++, valueCount*2);
594             }
595                 };
596                 
597                 tg.forValues2(null, new ValueProcedure());
598                 
599         }
600         
601         @Override
602         public long[] getResourceIds(SerialisationSupport serializer) throws DatabaseException {
603                 final int count = handles.length;
604                 long[] resourceIds = new long[count];
605                 for(int i=0;i<count;++i)
606                     resourceIds[i] = serializer.getRandomAccessId(handles[i]);
607                 return resourceIds;
608         }
609 }