1 /*******************************************************************************
2 * Copyright (c) 2012, 2017 Association for Decentralized Information Management
4 * All rights reserved. This program and the accompanying materials
5 * are made available under the terms of the Eclipse Public License v1.0
6 * which accompanies this distribution, and is available at
7 * http://www.eclipse.org/legal/epl-v10.html
10 * VTT Technical Research Centre of Finland - initial API and implementation
11 * Semantum Oy - e.g. #7016
12 *******************************************************************************/
13 package org.simantics.graph.db;
15 import java.io.DataInput;
16 import java.io.IOException;
17 import java.io.InputStream;
18 import java.util.ArrayList;
19 import java.util.Collections;
20 import java.util.HashMap;
21 import java.util.HashSet;
24 import java.util.TreeMap;
26 import org.simantics.databoard.Bindings;
27 import org.simantics.databoard.adapter.AdaptException;
28 import org.simantics.databoard.binding.Binding;
29 import org.simantics.databoard.binding.mutable.Variant;
30 import org.simantics.databoard.serialization.Serializer;
31 import org.simantics.databoard.type.Datatype;
32 import org.simantics.databoard.util.URIStringUtils;
33 import org.simantics.db.ReadGraph;
34 import org.simantics.db.Resource;
35 import org.simantics.db.Session;
36 import org.simantics.db.VirtualGraph;
37 import org.simantics.db.WriteOnlyGraph;
38 import org.simantics.db.common.WriteBindings;
39 import org.simantics.db.common.procedure.adapter.TransientCacheAsyncListener;
40 import org.simantics.db.common.request.PossibleIndexRoot;
41 import org.simantics.db.common.uri.UnescapedChildMapOfResource;
42 import org.simantics.db.common.utils.Logger;
43 import org.simantics.db.exception.DatabaseException;
44 import org.simantics.db.service.ClusterBuilder2;
45 import org.simantics.db.service.ClusterBuilderFactory;
46 import org.simantics.db.service.ClusteringSupport;
47 import org.simantics.db.service.SerialisationSupport;
48 import org.simantics.db.service.XSupport;
49 import org.simantics.graph.db.TransferableGraphSource.TransferableGraphSourceProcedure;
50 import org.simantics.graph.db.TransferableGraphSource.TransferableGraphSourceValueProcedure;
51 import org.simantics.graph.representation.Extensions;
52 import org.simantics.graph.representation.External;
53 import org.simantics.graph.representation.Identity;
54 import org.simantics.graph.representation.IdentityDefinition;
55 import org.simantics.graph.representation.Internal;
56 import org.simantics.graph.representation.Optional;
57 import org.simantics.graph.representation.Root;
58 import org.simantics.graph.representation.TransferableGraphUtils;
59 import org.simantics.graph.utils.TGResourceUtil;
60 import org.simantics.graph.utils.TGResourceUtil.LongAdapter;
61 import org.simantics.utils.datastructures.Pair;
62 import org.slf4j.LoggerFactory;
64 import gnu.trove.map.TIntObjectMap;
65 import gnu.trove.map.hash.TIntObjectHashMap;
/**
 * Imports the contents of a {@link TransferableGraphSource} into a database.
 * The visible phases are {@code prepare(ReadGraph)}, which resolves the identities of
 * the transferable graph against existing resources, and {@code write(WriteOnlyGraph)},
 * which creates the missing resources and then writes statements and values.
 *
 * NOTE(review): several fields used by the methods (e.g. handles, vg, indexRoot,
 * resourceCount, RootLibrary, InstanceOf) are declared on lines not visible in this excerpt.
 */
67 public class StreamingTransferableGraphImportProcess implements TransferableGraphImporter {
// SLF4J logger; the type is fully qualified because org.simantics.db.common.utils.Logger is imported too.
69 private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(StreamingTransferableGraphImportProcess.class);
// Source that supplies identities, statements and values of the graph being imported.
72 TransferableGraphSource tg;
// Consulted for the import target, for analysing/creating roots and for creating children.
74 IImportAdvisor2 advisor;
// Optional progress receiver; updatePercentage() skips reporting when this is null.
75 TGStatusMonitor monitor;
// Cluster builder used for writing; assigned at the start of write().
76 ClusterBuilder2 builder;
// Backs needTranslation(Datatype).
77 final TGResourceUtil resourceUtil = new TGResourceUtil();
// URI -> transferable graph resource index for every external reported through addMissing().
81 Map<String,Integer> allMissingExternals = new HashMap<>();
// Missing external URIs with path-ancestors of other entries filtered out by addMissing().
82 Set<String> missingExternals = new HashSet<>();
// URI -> resource for parents resolved in prepare() and entities created in createMissing().
83 Map<String,Resource> resolvedParents = new HashMap<>();
// Transferable graph resource index -> already existing database resource of an Internal identity.
84 TIntObjectHashMap<Resource> existingInternalMap = new TIntObjectHashMap<>();
// Identity table, filled by readIdentities().
87 Identity[] identities;
// Extension values of the source, read by getClustering() and getClusterSets().
88 TreeMap<String, Variant> extensions;
// Builtin resource CoreInitialization.LAYER0 + "ExternalEntity", assigned in findBuiltins().
93 Resource ExternalEntity;
/**
 * Creates an import process without a status monitor by delegating to the
 * five-argument constructor with a {@code null} monitor.
 */
102 public StreamingTransferableGraphImportProcess(Session session, VirtualGraph vg, TransferableGraphSource tg, IImportAdvisor2 advisor) {
103 this(session, vg, tg, advisor, null);
/**
 * Creates an import process.
 *
 * @param advisor stored in the advisor field
 * @param monitor stored in the monitor field; {@code null} disables progress reporting
 *
 * NOTE(review): the handling of session, vg and tg is on lines not visible in this
 * excerpt - presumably they are stored in fields as well; confirm.
 */
106 public StreamingTransferableGraphImportProcess(Session session, VirtualGraph vg, TransferableGraphSource tg, IImportAdvisor2 advisor, TGStatusMonitor monitor) {
109 this.advisor = advisor;
110 this.monitor = monitor;
113 private int updatePercentage(int percentage, int done, int total) {
114 if (monitor != null && (done & 63) == 0) {
115 int current = 100*done / total;
116 if (current > percentage) {
117 percentage = current;
118 monitor.status(percentage);
/**
 * Loads header data and the identity table from the transferable graph source:
 * stores tg.getExtensions() and tg.getResourceCount(), allocates the identities
 * array with tg.getIdentityCount() slots and fills it through tg.forIdentities.
 *
 * @param g read graph handed on to tg.forIdentities
 * @throws Exception declared by the source's callbacks
 */
124 public void readIdentities(ReadGraph g) throws Exception {
125 extensions = tg.getExtensions();
126 resourceCount = tg.getResourceCount();
127 identities = new Identity[tg.getIdentityCount()];
128 tg.forIdentities(g, new TransferableGraphSourceProcedure<Identity>() {
// Stores each delivered identity in the next slot; counter is declared on a line not visible here.
133 public void execute(Identity value) throws Exception {
134 identities[counter++] = value;
139 public void findBuiltins(WriteOnlyGraph g) throws DatabaseException {
140 RootLibrary = g.getBuiltin("http:/");
141 String = g.getBuiltin(CoreInitialization.LAYER0 + "String");
142 Library = g.getBuiltin(CoreInitialization.LAYER0 + "Library");
143 InstanceOf = g.getBuiltin(CoreInitialization.LAYER0 + "InstanceOf");
144 ConsistsOf = g.getBuiltin(CoreInitialization.LAYER0 + "ConsistsOf");
145 PartOf = g.getBuiltin(CoreInitialization.LAYER0 + "PartOf");
146 HasName = g.getBuiltin(CoreInitialization.LAYER0 + "HasName");
147 NameOf = g.getBuiltin(CoreInitialization.LAYER0 + "NameOf");
148 ExternalEntity = g.getBuiltin(CoreInitialization.LAYER0 + "ExternalEntity");
151 public void findBuiltins(ReadGraph g) throws DatabaseException {
152 RootLibrary = g.getBuiltin("http:/");
153 String = g.getBuiltin(CoreInitialization.LAYER0 + "String");
154 Library = g.getBuiltin(CoreInitialization.LAYER0 + "Library");
155 InstanceOf = g.getBuiltin(CoreInitialization.LAYER0 + "InstanceOf");
156 ConsistsOf = g.getBuiltin(CoreInitialization.LAYER0 + "ConsistsOf");
157 PartOf = g.getBuiltin(CoreInitialization.LAYER0 + "PartOf");
158 HasName = g.getBuiltin(CoreInitialization.LAYER0 + "HasName");
159 NameOf = g.getBuiltin(CoreInitialization.LAYER0 + "NameOf");
160 ExternalEntity = g.getBuiltin(CoreInitialization.LAYER0 + "ExternalEntity");
163 void addMissing(int handleIndex, String external) {
164 allMissingExternals.put(external, handleIndex);
165 Set<String> removals = new HashSet<>();
166 for(String ext : missingExternals) if(ext.startsWith(external + "/")) return;
167 for(String ext : missingExternals) if(external.startsWith(ext + "/")) removals.add(ext);
168 missingExternals.removeAll(removals);
169 missingExternals.add(external);
/**
 * Read phase of the import: loads the identities and maps every identity that can be
 * resolved against the database to a cluster builder handle in handles[]. Externals that
 * cannot be found are reported through addMissing(); Internal identities whose URI already
 * exists are remembered in existingInternalMap.
 *
 * NOTE(review): several else/if lines of this method are not visible in this excerpt;
 * the branch comments below describe the visible statements only.
 *
 * @param graph graph used for all lookups
 * @throws Exception including MissingDependencyException when externals are missing
 *         and failOnMissingEntities() is true
 */
172 void prepare(ReadGraph graph) throws Exception {
174 Resource target = advisor.getTarget();
// Possible index root of the import target; findClusterSet() and write() check it for null.
176 indexRoot = graph.syncRequest(new PossibleIndexRoot(target));
179 readIdentities(graph);
181 // System.err.println("ext: " + extensions);
182 // System.err.println("rc: " + resourceCount);
183 // System.err.println("ic: " + identities.length);
185 ClusterBuilderFactory factory = graph.getService(ClusterBuilderFactory.class);
// Local builder that hides the builder field inside this method; the field itself is assigned in write().
186 ClusterBuilder2 builder = factory.create(vg, false);
// One handle slot per resource of the transferable graph; 0 is treated as "not resolved" below.
188 this.handles = new int[resourceCount];
189 TIntObjectMap<Identity> identityMap = TransferableGraphUtils.mapIdentities(identities);
191 // We must process roots first, because internal identifiers depend on them.
192 for(Identity identity : identities) {
193 IdentityDefinition definition = identity.definition;
194 if (definition instanceof Root) {
195 Root root = (Root) definition;
// A root with an empty name maps to the root library.
196 if (root.name.equals(""))
197 handles[identity.resource] = builder.handle(RootLibrary);
// Ask the advisor whether a resource already exists for this root.
199 Resource existing = advisor.analyzeRoot(graph, root);
200 if (existing != null)
201 handles[identity.resource] = builder.handle(existing);
// Second pass: externals, internals and optionals.
206 for(Identity identity : identities) {
207 IdentityDefinition definition = identity.definition;
208 if(definition instanceof External) {
209 External def = (External)definition;
// Parent index -1 maps the external to the root library.
210 if(def.parent == -1) {
211 handles[identity.resource] = builder.handle(RootLibrary);
// The special name "@inverse" resolves to the inverse of the parent resource.
213 if("@inverse".equals(def.name)) {
214 int parent = handles[def.parent];
215 int child = builder.handle(graph.getInverse(builder.resource(parent)));
216 handles[identity.resource] = child;
// Ordinary external: look the child up by name under its parent, if the parent has a handle.
218 int handle = handles[def.parent];
219 Resource parent = handle != 0 ? builder.resource(handle) : null;
220 // TODO: escape should be removed when names become well-behaving
// Remember the parent by URI so that createMissing() can attach new entities to it.
222 resolvedParents.put(graph.getURI(parent), parent);
223 Map<String,Resource> childMap = graph
224 .syncRequest(new UnescapedChildMapOfResource(parent),
225 TransientCacheAsyncListener.instance());
226 Resource child = childMap.get(def.name);
// Reported as missing under the database URI of the parent (presumably when child is null - guard not visible).
228 addMissing(identity.resource, graph.getURI(parent) + "/" + URIStringUtils.escape(def.name));
230 handles[identity.resource] = builder.handle(child);
// Reported as missing under the URI derived from the transferable graph
// (presumably the branch for an unresolved parent - guard not visible).
233 addMissing(identity.resource, TransferableGraphUtils.getURI(resourceCount, identityMap, def.parent) + "/" + URIStringUtils.escape(def.name));
238 else if(definition instanceof Internal) {
// An Internal identity whose URI already resolves is remembered so that write() reuses the resource.
239 String uri = TransferableGraphUtils.getURI(resourceCount, identityMap, identity.resource);
240 Resource existing = graph.getPossibleResource(uri);
241 if(existing != null) {
242 existingInternalMap.put(identity.resource, existing);
245 else if(definition instanceof Optional) {
// NOTE(review): this cast only works if Optional is a subtype of External; in that case the
// External branch above would already have matched and this branch is unreachable - confirm.
246 External def = (External)definition;
247 Resource parent = builder.resource(handles[def.parent]);
249 handles[identity.resource] = builder.handle(graph.syncRequest(new UnescapedChildMapOfResource(parent)).get(def.name));
// Abort only when something is missing and failing has been requested.
253 if (!missingExternals.isEmpty() && failOnMissingEntities())
254 throw new MissingDependencyException(this);
/**
 * Tells whether prepare() should fail when external entities are missing.
 * Compares "true", ignoring case, with a value looked up under the key
 * "org.simantics.tg.import.failOnMissingEntities".
 *
 * NOTE(review): the lookup call and its default value are on lines not visible in
 * this excerpt - presumably a system property read; confirm.
 */
257 private boolean failOnMissingEntities() {
258 return "true".equalsIgnoreCase(
260 "org.simantics.tg.import.failOnMissingEntities",
/**
 * Gives {@code child} a name and attaches it under {@code parent}.
 * A new name resource is created, typed as String and given {@code name} as its value;
 * it is linked to the child with HasName/NameOf, and the child is linked to the parent
 * with ConsistsOf/PartOf.
 *
 * @param graph  graph to write into
 * @param parent resource that will contain the child
 * @param child  resource to name, or {@code null} to have a new resource created
 * @param name   name value for the child
 * @throws DatabaseException if a write fails
 *
 * NOTE(review): the return statement is not visible in this excerpt - presumably the
 * (possibly newly created) child is returned; confirm.
 */
265 public Resource createChild(WriteOnlyGraph graph, Resource parent, Resource child, String name) throws DatabaseException {
266 //System.err.println("child " + parent + " - " + child + " = " + name);
267 if(child == null) child = graph.newResource();
268 Resource nameResource = graph.newResource();
269 graph.claim(nameResource, InstanceOf, null, String);
270 graph.claimValue(nameResource, name, WriteBindings.STRING);
271 graph.claim(child, HasName, NameOf, nameResource);
272 graph.claim(parent, ConsistsOf, PartOf, child);
/**
 * Reads the Extensions.CLUSTERING extension as an int array.
 *
 * @return the array, or {@code null} when the extension is absent
 *
 * NOTE(review): an AdaptException is logged through Logger.defaultLogError; the value
 * returned after logging is on lines not visible in this excerpt.
 */
276 int[] getClustering() {
279 Variant v = extensions.get(Extensions.CLUSTERING);
280 if(v == null) return null;
282 return (int[])v.getValue(Bindings.INT_ARRAY);
283 } catch (AdaptException e) {
284 Logger.defaultLogError(e);
/**
 * Reads the Extensions.CLUSTER_SETS extension as an int array.
 *
 * @return the array, or {@code null} when the extension is absent
 *
 * NOTE(review): an AdaptException is logged through Logger.defaultLogError; the value
 * returned after logging is on lines not visible in this excerpt.
 */
289 int[] getClusterSets() {
292 Variant v = extensions.get(Extensions.CLUSTER_SETS);
293 if(v == null) return null;
295 return (int[])v.getValue(Bindings.INT_ARRAY);
296 } catch (AdaptException e) {
297 Logger.defaultLogError(e);
302 boolean needTranslation(Datatype type) {
303 return resourceUtil.mayHaveResource(type);
/**
 * Makes sure that the cluster set resource with transferable graph index {@code id} has a
 * handle, creating it (and, recursively, the cluster set it belongs to) when necessary.
 *
 * NOTE(review): some guard and else lines of this method are not visible in this excerpt;
 * the comments describe the visible statements only.
 *
 * @param graph       graph to write into
 * @param rootLibrary root library resource, used as the default cluster set
 * @param clustering  resource counts per cluster, summed into a running position
 * @param clusterSets cluster set id per cluster, parallel to {@code clustering}
 * @param clusters    cluster ids per cluster; entries are filled in here
 * @param id          transferable graph index of the cluster set resource to find
 * @throws DatabaseException if the index root is needed but was not available
 */
306 void findClusterSet(WriteOnlyGraph graph, Resource rootLibrary, int[] clustering, int[] clusterSets, long[] clusters, int id) throws DatabaseException {
307 ClusteringSupport support = graph.getService(ClusteringSupport.class);
// The two predefined cluster set ids do not refer to a resource of the transferable graph.
308 if(id == Extensions.ROOT_LIBRARY_CLUSTER_SET || id == Extensions.INDEX_ROOT_CLUSTER_SET) return;
// Use the index root as its own cluster set when it is one; otherwise use the root library.
309 Resource indexRootClusterSetResource = rootLibrary;
310 if(indexRoot != null && support.isClusterSet(indexRoot)) {
311 indexRootClusterSetResource = indexRoot;
313 graph.setClusterSet4NewResource(rootLibrary);
314 graph.flushCluster();
316 int indexRootCsHandle = builder.handle(indexRootClusterSetResource);
// pos accumulates the resource counts; presumably used to find the cluster containing id (test not visible).
317 for(int pos=0,index=0;index<clustering.length;index++) {
318 pos += clustering[index];
320 int cs = clusterSets[index];
// Only create the resource if it has no handle yet.
321 if(handles[id] == 0) {
323 if(cs == Extensions.ROOT_LIBRARY_CLUSTER_SET) csHandle = builder.handle(rootLibrary);
324 else if(cs == Extensions.INDEX_ROOT_CLUSTER_SET) {
325 if(indexRoot == null) throw new DatabaseException("No index root was available in TG import.");
326 csHandle = indexRootCsHandle;
// The owning cluster set is itself a resource of the transferable graph: resolve it first.
329 findClusterSet(graph, rootLibrary, clustering, clusterSets, clusters, cs);
330 csHandle = handles[cs];
// Reuse the cluster recorded for this position, or start a new one in the cluster set.
333 if(clusters[index] != 0)
334 builder.selectCluster(clusters[index]);
336 builder.newCluster(csHandle);
// Create the resource, record the cluster it landed in and make it a cluster set.
338 handles[id] = builder.newResource(csHandle);
339 clusters[index] = support.getCluster(builder.resource(handles[id]));
341 builder.createClusterSet(handles[id]);
/**
 * Creates a placeholder entity, typed ExternalEntity, for every URI in allMissingExternals
 * and stores its handle in handles[]. URIs are processed in sorted order and every created
 * entity is added to resolvedParents, so an entity is available as a parent for URIs below it.
 * The XSupport service mode is switched to (true, false) for the duration and restored
 * from the saved value at the end.
 *
 * NOTE(review): the lines between the visible statements (e.g. any try/finally around the
 * service mode change, and the null check before the IllegalStateException) are not
 * visible in this excerpt.
 *
 * @param graph graph to write into
 * @throws Exception IllegalStateException with "Missing URI: ..." for an unresolvable parent
 */
348 void createMissing(final WriteOnlyGraph graph) throws Exception {
350 if(allMissingExternals.isEmpty()) return;
352 XSupport xs = graph.getService(XSupport.class);
// Remember the current service mode so it can be restored below.
353 Pair<Boolean,Boolean> serviceMode = xs.getServiceMode();
354 xs.setServiceMode(true, false);
// Sorted order puts a URI before the URIs that extend it with "/...".
356 ArrayList<String> missing = new ArrayList<>(allMissingExternals.keySet());
357 Collections.sort(missing);
358 for(String uri : missing) {
// parts[0] is used as the parent URI and parts[1] as the escaped child name below.
359 String[] parts = URIStringUtils.splitURI(uri);
360 // URIStringUtils.splitURI returns root URI in non-standard format, so fix it manually as a workaround
361 if (parts[0].equals("http://")) {
365 Resource parent = resolvedParents.get(parts[0]);
366 // TODO: proper exception message
368 throw new IllegalStateException("Missing URI: " + uri);
// New entity marked as ExternalEntity.
371 Resource childResource = graph.newResource();
372 graph.claim(childResource, InstanceOf, null, ExternalEntity);
// Name resource holding the unescaped last URI segment.
374 Resource nameResource = graph.newResource();
375 graph.claim(nameResource, InstanceOf, null, String);
376 graph.claimValue(nameResource, URIStringUtils.unescape(parts[1]), WriteBindings.STRING);
377 graph.claim(childResource, HasName, NameOf, nameResource);
379 graph.claim(parent, ConsistsOf, PartOf, childResource);
// Make the new entity available as a parent for later URIs.
381 resolvedParents.put(uri, childResource);
383 handles[allMissingExternals.get(uri)] = builder.handle(childResource);
386 xs.setServiceMode(serviceMode.first, serviceMode.second);
/**
 * Write phase of the import. In order: creates the cluster builder, creates placeholder
 * entities for missing externals, allocates a resource for every still unresolved handle
 * (following the clustering extensions when present), lets the advisor create roots and
 * named children for the identities, writes all statements, writes all values and finally
 * removes the ExternalEntity type from Internal resources that already existed.
 *
 * NOTE(review): many lines of this method (else branches, local declarations such as the
 * running index i, done, sub/pred/inv/obj, setHandle and val) are not visible in this
 * excerpt; the comments describe the visible statements only.
 *
 * @param graph graph to write into
 * @throws Exception from the source callbacks or the database
 */
390 void write(final WriteOnlyGraph graph) throws Exception {
392 final SerialisationSupport ss = graph.getService(SerialisationSupport.class);
394 ClusterBuilderFactory factory = graph.getService(ClusterBuilderFactory.class);
// The advisor field is declared as IImportAdvisor2, so this test can only fail for a null advisor.
395 if(advisor instanceof IImportAdvisor2) {
396 boolean allowImmutable = ((IImportAdvisor2)advisor).allowImmutableModifications();
397 builder = factory.create(vg, allowImmutable);
399 builder = factory.create(vg, false);
// Placeholders for externals that prepare() could not resolve.
402 createMissing(graph);
404 final int[] handles = this.handles;
406 int[] clustering = getClustering();
407 if(clustering != null) {
409 int[] clusterSets = getClusterSets();
410 if(clusterSets != null) {
412 assert(clustering.length == clusterSets.length);
414 long[] clusters = new long[clustering.length];
// First make sure every referenced cluster set resource exists.
417 for(int i=0;i<clusterSets.length;i++) {
418 findClusterSet(graph, graph.getRootLibrary(), clustering, clusterSets, clusters, clusterSets[i]);
421 // Then create all resources
423 for(int j=0;j<clustering.length;j++) {
// c resources belong to cluster j, which lives in cluster set s.
424 int c = clustering[j];
425 int s = clusterSets[j];
427 if(s == Extensions.ROOT_LIBRARY_CLUSTER_SET)
428 setHandle = builder.handle(graph.getRootLibrary());
429 else if(s == Extensions.INDEX_ROOT_CLUSTER_SET) {
430 if(indexRoot == null) throw new DatabaseException("No index root was available in TG import.");
431 setHandle = builder.handle(indexRoot);
433 else setHandle = handles[s];
434 // Preserve clustering only for internal resources
436 builder.selectCluster(clusters[j]);
438 builder.newCluster(setHandle);
// Allocate the resources of this cluster that have no handle yet.
439 for(int r=0;r<c;r++, i++)
440 if(handles[i] == 0) handles[i] = builder.newResource();
// Resources beyond the clustered range.
443 for(;i<handles.length;++i)
444 if(handles[i] == 0) handles[i] = builder.newResource();
// Clustering without cluster set information: one new cluster per entry.
449 for(int c : clustering) {
450 builder.newCluster();
451 for(int r=0;r<c;r++, i++)
452 if(handles[i] == 0) handles[i] = builder.newResource();
455 for(;i<handles.length;++i)
456 if(handles[i] == 0) handles[i] = builder.newResource();
462 // Create blank resources
463 for(int i=0;i<handles.length;++i)
464 if(handles[i] == 0) handles[i] = builder.newResource();
468 // Internal identities
469 for(Identity identity : identities) {
470 IdentityDefinition definition = identity.definition;
471 // if(handles[identity.resource] != 0)
473 if(definition instanceof External) {
474 // Already done everything
476 else if(definition instanceof Internal) {
477 Internal def = (Internal)definition;
// Reuse the resource found in prepare() when the Internal identity already existed.
478 Resource external = existingInternalMap.get(identity.resource);
479 if(external != null) {
480 handles[identity.resource] = builder.handle(external);
// Otherwise let the advisor create the named child, passing the pre-allocated resource if there is one.
482 if(handles[identity.resource] != 0)
483 handles[identity.resource] = builder.handle(advisor.createChild(graph, this, builder.resource(handles[def.parent]), builder.resource(handles[identity.resource]), def.name));
485 handles[identity.resource] = builder.handle(advisor.createChild(graph, this, builder.resource(handles[def.parent]), null, def.name));
489 else if(definition instanceof Root) {
// Roots are created by the advisor, again passing the pre-allocated resource if there is one.
491 Root root = (Root)definition;
492 if(handles[identity.resource] != 0)
493 handles[identity.resource] = builder.handle(advisor.createRoot(graph, root, builder.resource(handles[identity.resource])));
495 handles[identity.resource] = builder.handle(advisor.createRoot(graph, root, null));
497 else if(definition instanceof Optional) {
// Optionals are created like internals and additionally typed as Library.
498 Optional def = (Optional)definition;
499 if(handles[identity.resource] != 0) {
500 Resource child = advisor.createChild(graph, this, builder.resource(handles[def.parent]), builder.resource(handles[identity.resource]), def.name);
501 graph.claim(child, InstanceOf, null, Library); // ???
502 handles[identity.resource] = builder.handle(child);
504 Resource child = advisor.createChild(graph, this, builder.resource(handles[def.parent]), null, def.name);
505 graph.claim(child, InstanceOf, null, Library); // ???
506 handles[identity.resource] = builder.handle(child);
// Single-element array so that the anonymous classes below can update the value.
512 int[] percentage = { 0 };
514 int statementCount = tg.getStatementCount();
515 tg.forStatements(null, new TransferableGraphSourceProcedure<int[]>() {
// Translates the transferable graph indices of one statement into handles and writes it.
518 public void execute(int[] value) throws Exception {
525 int subject = handles[sub];
526 int predicate = handles[pred];
527 int object = handles[obj];
529 builder.addStatement(graph, subject, predicate, object);
// Inverse statement (presumably only written when the statement carries an inverse - guard not visible).
531 int inverse = handles[inv];
532 builder.addStatement(graph, object, inverse, subject);
535 // Count from 0% -> 50% => total = statementCount*2
536 percentage[0] = updatePercentage(percentage[0], done[0]++, statementCount*2);
542 int valueCount = tg.getValueCount();
// Receives the values of the source; it is also an InputStream whose read() copies each byte into the builder.
545 class ValueProcedure extends InputStream implements TransferableGraphSourceValueProcedure {
547 private TGResourceUtil util = new TGResourceUtil();
548 private DataInput source;
551 public void execute(int _resource, Datatype type, DataInput stream) throws Exception {
555 //int file = _resource & 0x80000000;
// Clear the top bit to obtain the resource index.
556 int resource = _resource & 0x7FFFFFFF;
558 Binding binding = Bindings.getBinding(type);
559 Serializer s = Bindings.getSerializer(binding);
561 builder.beginValue(handles[resource]);
// Values that may contain resource references are deserialized, rewritten and serialized again.
562 if(util.mayHaveResource(type)) {
563 Object value = s.deserialize(stream);
564 util.adaptValue( binding, value, new LongAdapter() {
// Maps a transferable graph resource index to the random access id of its database resource.
566 public long adapt(long in) {
568 return ss.getRandomAccessId(handles[(int)in]);
569 } catch (DatabaseException e) {
570 throw new IllegalStateException(e);
574 byte[] bytes = s.serialize(value);
575 for(byte b : bytes) {
// Convert a negative byte value to its unsigned 0..255 form before appending.
577 if(val < 0) val += 256;
578 builder.appendValue(val);
// Reads one unsigned byte from source and appends it to the value being built.
589 public int read() throws IOException {
590 int value = source.readUnsignedByte();
592 builder.appendValue(value);
593 } catch (DatabaseException e) {
594 LOGGER.error("Failed to write value into database", e);
// Copies length bytes from input into the value of the given resource without interpreting them.
600 public void rawCopy(int resource, int length, DataInput input) throws Exception {
601 builder.beginValue(handles[resource]);
602 for (int i = 0; i < length; ++i)
603 builder.appendValue(input.readUnsignedByte());
608 private void work() {
609 // Count from 50% -> 100% => [valueCount, valueCount*2)
610 percentage[0] = updatePercentage(percentage[0], valueCount + done[0]++, valueCount*2);
614 tg.forValues2(null, new ValueProcedure());
// Internal resources that already existed lose the ExternalEntity type.
616 for(Resource r : existingInternalMap.valueCollection()) {
618 graph.deny(r, InstanceOf, null, ExternalEntity, null);
// If the first attempt throws, retry with vg as the last argument.
619 } catch (DatabaseException e) {
620 graph.deny(r, InstanceOf, null, ExternalEntity, vg);
/**
 * Translates every entry of handles[] into a random access id.
 *
 * @param serializer service used for the translation
 * @return presumably the filled array, one id per transferable graph resource
 *         (the return statement is not visible in this excerpt)
 * @throws DatabaseException if a handle cannot be translated
 */
627 public long[] getResourceIds(SerialisationSupport serializer) throws DatabaseException {
628 final int count = handles.length;
629 long[] resourceIds = new long[count];
630 for(int i=0;i<count;++i)
631 resourceIds[i] = serializer.getRandomAccessId(handles[i]);