X-Git-Url: https://gerrit.simantics.org/r/gitweb?p=simantics%2Fplatform.git;a=blobdiff_plain;f=bundles%2Forg.simantics.graph.db%2Fsrc%2Forg%2Fsimantics%2Fgraph%2Fdb%2FStreamingTransferableGraphImportProcess.java;h=e61937fc9e90aaf92d2dc57767d35c4545d6b91e;hp=48cebf342c2cfe5566a3fb325bd33994437b20f7;hb=55feb8ea3a74bf5ce73619d62d4c73e576ae89fc;hpb=0ae2b770234dfc3cbb18bd38f324125cf0faca07 diff --git a/bundles/org.simantics.graph.db/src/org/simantics/graph/db/StreamingTransferableGraphImportProcess.java b/bundles/org.simantics.graph.db/src/org/simantics/graph/db/StreamingTransferableGraphImportProcess.java index 48cebf342..e61937fc9 100644 --- a/bundles/org.simantics.graph.db/src/org/simantics/graph/db/StreamingTransferableGraphImportProcess.java +++ b/bundles/org.simantics.graph.db/src/org/simantics/graph/db/StreamingTransferableGraphImportProcess.java @@ -1,5 +1,5 @@ /******************************************************************************* - * Copyright (c) 2012, 2016 Association for Decentralized Information Management + * Copyright (c) 2012, 2017 Association for Decentralized Information Management * in Industry THTH ry. * All rights reserved. This program and the accompanying materials * are made available under the terms of the Eclipse Public License v1.0 @@ -8,17 +8,16 @@ * * Contributors: * VTT Technical Research Centre of Finland - initial API and implementation - * Semantum Oy + * Semantum Oy - e.g. #7016 *******************************************************************************/ package org.simantics.graph.db; import java.io.DataInput; -import java.io.DataOutput; -import java.io.DataOutputStream; -import java.io.FileNotFoundException; -import java.io.FileOutputStream; import java.io.IOException; import java.io.InputStream; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.Map; import java.util.Set; @@ -30,6 +29,7 @@ import org.simantics.databoard.binding.Binding; import org.simantics.databoard.binding.mutable.Variant; import org.simantics.databoard.serialization.Serializer; import org.simantics.databoard.type.Datatype; +import org.simantics.databoard.util.URIStringUtils; import org.simantics.db.ReadGraph; import org.simantics.db.Resource; import org.simantics.db.Session; @@ -45,6 +45,7 @@ import org.simantics.db.service.ClusterBuilder2; import org.simantics.db.service.ClusterBuilderFactory; import org.simantics.db.service.ClusteringSupport; import org.simantics.db.service.SerialisationSupport; +import org.simantics.db.service.XSupport; import org.simantics.graph.db.TransferableGraphSource.TransferableGraphSourceProcedure; import org.simantics.graph.db.TransferableGraphSource.TransferableGraphSourceValueProcedure; import org.simantics.graph.representation.Extensions; @@ -57,70 +58,69 @@ import org.simantics.graph.representation.Root; import org.simantics.graph.representation.TransferableGraphUtils; import org.simantics.graph.utils.TGResourceUtil; import org.simantics.graph.utils.TGResourceUtil.LongAdapter; +import org.simantics.utils.datastructures.Pair; +import org.slf4j.LoggerFactory; -public class StreamingTransferableGraphImportProcess implements TransferableGraphImporter { - - public static String LOG_FILE = "transferableGraphs.log"; - final static private boolean LOG = false; - - static DataOutput log; - - static { +import gnu.trove.map.TIntObjectMap; +import gnu.trove.map.hash.TIntObjectHashMap; - if (LOG) { - - try { - FileOutputStream stream = new FileOutputStream(LOG_FILE); - log = new DataOutputStream(stream); - } catch (FileNotFoundException e) { - e.printStackTrace(); - } - } +public class StreamingTransferableGraphImportProcess implements TransferableGraphImporter { - } - - private static void log(String line) { - if (LOG) { - try { - log.writeUTF(line + "\n"); - } catch (IOException e) { - e.printStackTrace(); - } - } - } + private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(StreamingTransferableGraphImportProcess.class); Resource indexRoot; TransferableGraphSource tg; VirtualGraph vg; IImportAdvisor2 advisor; + TGStatusMonitor monitor; ClusterBuilder2 builder; final TGResourceUtil resourceUtil = new TGResourceUtil(); - + int[] handles; - - Set missingExternals = new HashSet(); - + + Map allMissingExternals = new HashMap<>(); + Set missingExternals = new HashSet<>(); + Map resolvedParents = new HashMap<>(); + TIntObjectHashMap existingInternalMap = new TIntObjectHashMap<>(); + int resourceCount; Identity[] identities; TreeMap extensions; - + // Builtins Resource RootLibrary; Resource String; + Resource ExternalEntity; Resource Library; - + Resource InstanceOf; Resource ConsistsOf; Resource PartOf; Resource HasName; Resource NameOf; - + public StreamingTransferableGraphImportProcess(Session session, VirtualGraph vg, TransferableGraphSource tg, IImportAdvisor2 advisor) { + this(session, vg, tg, advisor, null); + } + + public StreamingTransferableGraphImportProcess(Session session, VirtualGraph vg, TransferableGraphSource tg, IImportAdvisor2 advisor, TGStatusMonitor monitor) { this.tg = tg; this.vg = vg; this.advisor = advisor; + this.monitor = monitor; + } + + private int updatePercentage(int percentage, int done, int total) { + if (monitor != null && (done & 63) == 0) { + int current = 100*done / total; + if (current > percentage) { + percentage = current; + monitor.status(percentage); + } + } + return percentage; } - + public void readIdentities(ReadGraph g) throws Exception { extensions = tg.getExtensions(); resourceCount = tg.getResourceCount(); @@ -135,7 +135,7 @@ public class StreamingTransferableGraphImportProcess implements TransferableGrap } }); } - + public void findBuiltins(WriteOnlyGraph g) throws DatabaseException { RootLibrary = g.getBuiltin("http:/"); String = g.getBuiltin(CoreInitialization.LAYER0 + "String"); @@ -145,8 +145,9 @@ public class StreamingTransferableGraphImportProcess implements TransferableGrap PartOf = g.getBuiltin(CoreInitialization.LAYER0 + "PartOf"); HasName = g.getBuiltin(CoreInitialization.LAYER0 + "HasName"); NameOf = g.getBuiltin(CoreInitialization.LAYER0 + "NameOf"); + ExternalEntity = g.getBuiltin(CoreInitialization.LAYER0 + "ExternalEntity"); } - + public void findBuiltins(ReadGraph g) throws DatabaseException { RootLibrary = g.getBuiltin("http:/"); String = g.getBuiltin(CoreInitialization.LAYER0 + "String"); @@ -156,59 +157,18 @@ public class StreamingTransferableGraphImportProcess implements TransferableGrap PartOf = g.getBuiltin(CoreInitialization.LAYER0 + "PartOf"); HasName = g.getBuiltin(CoreInitialization.LAYER0 + "HasName"); NameOf = g.getBuiltin(CoreInitialization.LAYER0 + "NameOf"); + ExternalEntity = g.getBuiltin(CoreInitialization.LAYER0 + "ExternalEntity"); } -// /* Preparation that is used when the core is empty. -// */ -// void initialPrepare(WriteOnlyGraph graph) throws DatabaseException { -// findBuiltins(graph); -// -// resources = new Resource[tg.resourceCount]; -// -// int Root = -1; -// int SimanticsDomain = -1; -// int Layer0 = -1; -// -// for(Identity identity : tg.identities) { -// if(identity.definition instanceof Internal) { -// Internal def = (Internal)identity.definition; -// Resource res = null; -// if(def.parent == Layer0) { -// try { -// res = graph.getBuiltin(CoreInitialization.LAYER0 + def.name); -// } catch(ResourceNotFoundException e) { -// } -// } -// else if(def.parent == SimanticsDomain) { -// if(def.name.equals("Layer0-1.0")) -// Layer0 = identity.resource; -// } -// else if(def.parent == Root) { -// if(def.name.equals("www.simantics.org")) -// SimanticsDomain = identity.resource; -// } -// -// if(res == null) -// res = createChild(graph, resources[def.parent], def.name); -// else -// createChild(graph, res, resources[def.parent], def.name); -// resources[identity.resource] = res; -// } -// else if(identity.definition instanceof Root) { -// Root = identity.resource; -// resources[identity.resource] = RootLibrary; -// } -// } -// } - - void addMissing(String external) { - Set removals = new HashSet(); - for(String ext : missingExternals) if(ext.startsWith(external)) return; - for(String ext : missingExternals) if(external.startsWith(ext)) removals.add(ext); + void addMissing(int handleIndex, String external) { + allMissingExternals.put(external, handleIndex); + Set removals = new HashSet<>(); + for(String ext : missingExternals) if(ext.startsWith(external + "/")) return; + for(String ext : missingExternals) if(external.startsWith(ext + "/")) removals.add(ext); missingExternals.removeAll(removals); missingExternals.add(external); } - + void prepare(ReadGraph graph) throws Exception { Resource target = advisor.getTarget(); @@ -225,7 +185,23 @@ public class StreamingTransferableGraphImportProcess implements TransferableGrap ClusterBuilderFactory factory = graph.getService(ClusterBuilderFactory.class); ClusterBuilder2 builder = factory.create(vg, false); - this.handles = new int[resourceCount]; + this.handles = new int[resourceCount]; + TIntObjectMap identityMap = TransferableGraphUtils.mapIdentities(identities); + + // We must process roots first, because internal identifiers depend on them. + for(Identity identity : identities) { + IdentityDefinition definition = identity.definition; + if (definition instanceof Root) { + Root root = (Root) definition; + if (root.name.equals("")) + handles[identity.resource] = builder.handle(RootLibrary); + else { + Resource existing = advisor.analyzeRoot(graph, root); + if (existing != null) + handles[identity.resource] = builder.handle(existing); + } + } + } for(Identity identity : identities) { IdentityDefinition definition = identity.definition; @@ -243,32 +219,27 @@ public class StreamingTransferableGraphImportProcess implements TransferableGrap Resource parent = handle != 0 ? builder.resource(handle) : null; // TODO: escape should be removed when names become well-behaving if(parent != null) { + resolvedParents.put(graph.getURI(parent), parent); Map childMap = graph .syncRequest(new UnescapedChildMapOfResource(parent), - new TransientCacheAsyncListener>()); + TransientCacheAsyncListener.instance()); Resource child = childMap.get(def.name); if(child == null) { - addMissing(graph.getURI(parent) + "/" + def.name); + addMissing(identity.resource, graph.getURI(parent) + "/" + URIStringUtils.escape(def.name)); } else { handles[identity.resource] = builder.handle(child); } } else { - addMissing(TransferableGraphUtils.getURI(resourceCount, identities, def.parent) + "/" + def.name); + addMissing(identity.resource, TransferableGraphUtils.getURI(resourceCount, identityMap, def.parent) + "/" + URIStringUtils.escape(def.name)); } } } } else if(definition instanceof Internal) { - // Do not do anything for now - } - else if(definition instanceof Root) { - Root root = (Root)definition; - if(root.name.equals("")) - handles[identity.resource] = builder.handle(RootLibrary); - else { - Resource existing = advisor.analyzeRoot(graph, root); - if(existing != null) - handles[identity.resource] = builder.handle(existing); + String uri = TransferableGraphUtils.getURI(resourceCount, identityMap, identity.resource); + Resource existing = graph.getPossibleResource(uri); + if(existing != null) { + existingInternalMap.put(identity.resource, existing); } } else if(definition instanceof Optional) { @@ -278,22 +249,33 @@ public class StreamingTransferableGraphImportProcess implements TransferableGrap handles[identity.resource] = builder.handle(graph.syncRequest(new UnescapedChildMapOfResource(parent)).get(def.name)); } } - - if(!missingExternals.isEmpty()) throw new MissingDependencyException(this); - + + if (!missingExternals.isEmpty() && failOnMissingEntities()) + throw new MissingDependencyException(this); + } + + private boolean failOnMissingEntities() { + return "true".equalsIgnoreCase( + System.getProperty( + "org.simantics.tg.import.failOnMissingEntities", + "false") ); } @Override public Resource createChild(WriteOnlyGraph graph, Resource parent, Resource child, String name) throws DatabaseException { + //System.err.println("child " + parent + " - " + child + " = " + name); if(child == null) child = graph.newResource(); Resource nameResource = graph.newResource(); graph.claim(nameResource, InstanceOf, null, String); graph.claimValue(nameResource, name, WriteBindings.STRING); graph.claim(child, HasName, NameOf, nameResource); + graph.claim(parent, ConsistsOf, PartOf, child); return child; } - + int[] getClustering() { + if (vg != null) + return null; Variant v = extensions.get(Extensions.CLUSTERING); if(v == null) return null; try { @@ -305,6 +287,8 @@ public class StreamingTransferableGraphImportProcess implements TransferableGrap } int[] getClusterSets() { + if (vg != null) + return null; Variant v = extensions.get(Extensions.CLUSTER_SETS); if(v == null) return null; try { @@ -318,10 +302,18 @@ public class StreamingTransferableGraphImportProcess implements TransferableGrap boolean needTranslation(Datatype type) { return resourceUtil.mayHaveResource(type); } - + void findClusterSet(WriteOnlyGraph graph, Resource rootLibrary, int[] clustering, int[] clusterSets, long[] clusters, int id) throws DatabaseException { ClusteringSupport support = graph.getService(ClusteringSupport.class); if(id == Extensions.ROOT_LIBRARY_CLUSTER_SET || id == Extensions.INDEX_ROOT_CLUSTER_SET) return; + Resource indexRootClusterSetResource = rootLibrary; + if(indexRoot != null && support.isClusterSet(indexRoot)) { + indexRootClusterSetResource = indexRoot; + } else { + graph.setClusterSet4NewResource(rootLibrary); + graph.flushCluster(); + } + int indexRootCsHandle = builder.handle(indexRootClusterSetResource); for(int pos=0,index=0;index serviceMode = xs.getServiceMode(); + xs.setServiceMode(true, false); + try { + ArrayList missing = new ArrayList<>(allMissingExternals.keySet()); + Collections.sort(missing); + for(String uri : missing) { + String[] parts = URIStringUtils.splitURI(uri); + // URIStringUtils.splitURI returns root URI in non-standard format, so fix it manually as a workaround + if (parts[0].equals("http://")) { + parts[0] = "http:/"; + } + + Resource parent = resolvedParents.get(parts[0]); + // TODO: proper exception message + if(parent == null) { + throw new IllegalStateException("Missing URI: " + uri); + } + + Resource childResource = graph.newResource(); + graph.claim(childResource, InstanceOf, null, ExternalEntity); + + Resource nameResource = graph.newResource(); + graph.claim(nameResource, InstanceOf, null, String); + graph.claimValue(nameResource, URIStringUtils.unescape(parts[1]), WriteBindings.STRING); + graph.claim(childResource, HasName, NameOf, nameResource); + + graph.claim(parent, ConsistsOf, PartOf, childResource); + + resolvedParents.put(uri, childResource); + + handles[allMissingExternals.get(uri)] = builder.handle(childResource); + } + } finally { + xs.setServiceMode(serviceMode.first, serviceMode.second); + } + } + void write(final WriteOnlyGraph graph) throws Exception { final SerialisationSupport ss = graph.getService(SerialisationSupport.class); - + ClusterBuilderFactory factory = graph.getService(ClusterBuilderFactory.class); if(advisor instanceof IImportAdvisor2) { boolean allowImmutable = ((IImportAdvisor2)advisor).allowImmutableModifications(); @@ -365,6 +399,8 @@ public class StreamingTransferableGraphImportProcess implements TransferableGrap builder = factory.create(vg, false); } + createMissing(graph); + final int[] handles = this.handles; int[] clustering = getClustering(); @@ -439,10 +475,16 @@ public class StreamingTransferableGraphImportProcess implements TransferableGrap } else if(definition instanceof Internal) { Internal def = (Internal)definition; - if(handles[identity.resource] != 0) - handles[identity.resource] = builder.handle(advisor.createChild(graph, this, builder.resource(handles[def.parent]), builder.resource(handles[identity.resource]), def.name)); - else - handles[identity.resource] = builder.handle(advisor.createChild(graph, this, builder.resource(handles[def.parent]), null, def.name)); + Resource external = existingInternalMap.get(identity.resource); + if(external != null) { + handles[identity.resource] = builder.handle(external); + } else { + if(handles[identity.resource] != 0) + handles[identity.resource] = builder.handle(advisor.createChild(graph, this, builder.resource(handles[def.parent]), builder.resource(handles[identity.resource]), def.name)); + else + handles[identity.resource] = builder.handle(advisor.createChild(graph, this, builder.resource(handles[def.parent]), null, def.name)); + } + } else if(definition instanceof Root) { @@ -465,8 +507,11 @@ public class StreamingTransferableGraphImportProcess implements TransferableGrap } } } - - tg.getStatementCount(); + + int[] done = { 0 }; + int[] percentage = { 0 }; + + int statementCount = tg.getStatementCount(); tg.forStatements(null, new TransferableGraphSourceProcedure() { @Override @@ -486,12 +531,16 @@ public class StreamingTransferableGraphImportProcess implements TransferableGrap int inverse = handles[inv]; builder.addStatement(graph, object, inverse, subject); } - + + // Count from 0% -> 50% => total = statementCount*2 + percentage[0] = updatePercentage(percentage[0], done[0]++, statementCount*2); + } }); - tg.getValueCount(); + int valueCount = tg.getValueCount(); + done[0] = 0; class ValueProcedure extends InputStream implements TransferableGraphSourceValueProcedure { @@ -532,7 +581,8 @@ public class StreamingTransferableGraphImportProcess implements TransferableGrap s.skip(this); } builder.endValue(); - + work(); + } @Override @@ -541,7 +591,7 @@ public class StreamingTransferableGraphImportProcess implements TransferableGrap try { builder.appendValue(value); } catch (DatabaseException e) { - e.printStackTrace(); + LOGGER.error("Failed to write value into database", e); } return value; } @@ -552,14 +602,27 @@ public class StreamingTransferableGraphImportProcess implements TransferableGrap for (int i = 0; i < length; ++i) builder.appendValue(input.readUnsignedByte()); builder.endValue(); + work(); } + private void work() { + // Count from 50% -> 100% => [valueCount, valueCount*2) + percentage[0] = updatePercentage(percentage[0], valueCount + done[0]++, valueCount*2); + } }; tg.forValues2(null, new ValueProcedure()); + for(Resource r : existingInternalMap.valueCollection()) { + try { + graph.deny(r, InstanceOf, null, ExternalEntity, null); + } catch (DatabaseException e) { + graph.deny(r, InstanceOf, null, ExternalEntity, vg); + } + } + } - + @Override public long[] getResourceIds(SerialisationSupport serializer) throws DatabaseException { final int count = handles.length;