/******************************************************************************* * Copyright (c) 2012, 2017 Association for Decentralized Information Management * in Industry THTH ry. * All rights reserved. This program and the accompanying materials * are made available under the terms of the Eclipse Public License v1.0 * which accompanies this distribution, and is available at * http://www.eclipse.org/legal/epl-v10.html * * Contributors: * VTT Technical Research Centre of Finland - initial API and implementation * Semantum Oy - e.g. #7016 *******************************************************************************/ package org.simantics.graph.db; import java.io.DataInput; import java.io.IOException; import java.io.InputStream; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.Map; import java.util.Set; import java.util.TreeMap; import org.simantics.databoard.Bindings; import org.simantics.databoard.adapter.AdaptException; import org.simantics.databoard.binding.Binding; import org.simantics.databoard.binding.mutable.Variant; import org.simantics.databoard.serialization.Serializer; import org.simantics.databoard.type.Datatype; import org.simantics.databoard.util.URIStringUtils; import org.simantics.db.ReadGraph; import org.simantics.db.Resource; import org.simantics.db.Session; import org.simantics.db.VirtualGraph; import org.simantics.db.WriteOnlyGraph; import org.simantics.db.common.WriteBindings; import org.simantics.db.common.procedure.adapter.TransientCacheAsyncListener; import org.simantics.db.common.request.PossibleIndexRoot; import org.simantics.db.common.uri.UnescapedChildMapOfResource; import org.simantics.db.common.utils.Logger; import org.simantics.db.exception.DatabaseException; import org.simantics.db.service.ClusterBuilder2; import org.simantics.db.service.ClusterBuilderFactory; import org.simantics.db.service.ClusteringSupport; import org.simantics.db.service.SerialisationSupport; import org.simantics.db.service.XSupport; import org.simantics.graph.db.TransferableGraphSource.TransferableGraphSourceProcedure; import org.simantics.graph.db.TransferableGraphSource.TransferableGraphSourceValueProcedure; import org.simantics.graph.representation.Extensions; import org.simantics.graph.representation.External; import org.simantics.graph.representation.Identity; import org.simantics.graph.representation.IdentityDefinition; import org.simantics.graph.representation.Internal; import org.simantics.graph.representation.Optional; import org.simantics.graph.representation.Root; import org.simantics.graph.representation.TransferableGraphUtils; import org.simantics.graph.utils.TGResourceUtil; import org.simantics.graph.utils.TGResourceUtil.LongAdapter; import org.simantics.utils.datastructures.Pair; import org.slf4j.LoggerFactory; import gnu.trove.map.TIntObjectMap; import gnu.trove.map.hash.TIntObjectHashMap; public class StreamingTransferableGraphImportProcess implements TransferableGraphImporter { private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(StreamingTransferableGraphImportProcess.class); Resource indexRoot; TransferableGraphSource tg; VirtualGraph vg; IImportAdvisor2 advisor; TGStatusMonitor monitor; ClusterBuilder2 builder; final TGResourceUtil resourceUtil = new TGResourceUtil(); int[] handles; Map allMissingExternals = new HashMap<>(); Set missingExternals = new HashSet<>(); Map resolvedParents = new HashMap<>(); TIntObjectHashMap existingInternalMap = new TIntObjectHashMap<>(); int resourceCount; Identity[] identities; TreeMap extensions; // Builtins Resource RootLibrary; Resource String; Resource ExternalEntity; Resource Library; Resource InstanceOf; Resource ConsistsOf; Resource PartOf; Resource HasName; Resource NameOf; public StreamingTransferableGraphImportProcess(Session session, VirtualGraph vg, TransferableGraphSource tg, IImportAdvisor2 advisor) { this(session, vg, tg, advisor, null); } public StreamingTransferableGraphImportProcess(Session session, VirtualGraph vg, TransferableGraphSource tg, IImportAdvisor2 advisor, TGStatusMonitor monitor) { this.tg = tg; this.vg = vg; this.advisor = advisor; this.monitor = monitor; } private int updatePercentage(int percentage, int done, int total) { if (monitor != null && (done & 63) == 0) { int current = 100*done / total; if (current > percentage) { percentage = current; monitor.status(percentage); } } return percentage; } public void readIdentities(ReadGraph g) throws Exception { extensions = tg.getExtensions(); resourceCount = tg.getResourceCount(); identities = new Identity[tg.getIdentityCount()]; tg.forIdentities(g, new TransferableGraphSourceProcedure() { int counter = 0; @Override public void execute(Identity value) throws Exception { identities[counter++] = value; } }); } public void findBuiltins(WriteOnlyGraph g) throws DatabaseException { RootLibrary = g.getBuiltin("http:/"); String = g.getBuiltin(CoreInitialization.LAYER0 + "String"); Library = g.getBuiltin(CoreInitialization.LAYER0 + "Library"); InstanceOf = g.getBuiltin(CoreInitialization.LAYER0 + "InstanceOf"); ConsistsOf = g.getBuiltin(CoreInitialization.LAYER0 + "ConsistsOf"); PartOf = g.getBuiltin(CoreInitialization.LAYER0 + "PartOf"); HasName = g.getBuiltin(CoreInitialization.LAYER0 + "HasName"); NameOf = g.getBuiltin(CoreInitialization.LAYER0 + "NameOf"); ExternalEntity = g.getBuiltin(CoreInitialization.LAYER0 + "ExternalEntity"); } public void findBuiltins(ReadGraph g) throws DatabaseException { RootLibrary = g.getBuiltin("http:/"); String = g.getBuiltin(CoreInitialization.LAYER0 + "String"); Library = g.getBuiltin(CoreInitialization.LAYER0 + "Library"); InstanceOf = g.getBuiltin(CoreInitialization.LAYER0 + "InstanceOf"); ConsistsOf = g.getBuiltin(CoreInitialization.LAYER0 + "ConsistsOf"); PartOf = g.getBuiltin(CoreInitialization.LAYER0 + "PartOf"); HasName = g.getBuiltin(CoreInitialization.LAYER0 + "HasName"); NameOf = g.getBuiltin(CoreInitialization.LAYER0 + "NameOf"); ExternalEntity = g.getBuiltin(CoreInitialization.LAYER0 + "ExternalEntity"); } void addMissing(int handleIndex, String external) { allMissingExternals.put(external, handleIndex); Set removals = new HashSet<>(); for(String ext : missingExternals) if(ext.startsWith(external + "/")) return; for(String ext : missingExternals) if(external.startsWith(ext + "/")) removals.add(ext); missingExternals.removeAll(removals); missingExternals.add(external); } void prepare(ReadGraph graph) throws Exception { Resource target = advisor.getTarget(); if(target != null) indexRoot = graph.syncRequest(new PossibleIndexRoot(target)); findBuiltins(graph); readIdentities(graph); // System.err.println("ext: " + extensions); // System.err.println("rc: " + resourceCount); // System.err.println("ic: " + identities.length); ClusterBuilderFactory factory = graph.getService(ClusterBuilderFactory.class); ClusterBuilder2 builder = factory.create(vg, false); this.handles = new int[resourceCount]; TIntObjectMap identityMap = TransferableGraphUtils.mapIdentities(identities); for(Identity identity : identities) { IdentityDefinition definition = identity.definition; if(definition instanceof External) { External def = (External)definition; if(def.parent == -1) { handles[identity.resource] = builder.handle(RootLibrary); } else { if("@inverse".equals(def.name)) { int parent = handles[def.parent]; int child = builder.handle(graph.getInverse(builder.resource(parent))); handles[identity.resource] = child; } else { int handle = handles[def.parent]; Resource parent = handle != 0 ? builder.resource(handle) : null; // TODO: escape should be removed when names become well-behaving if(parent != null) { resolvedParents.put(graph.getURI(parent), parent); Map childMap = graph .syncRequest(new UnescapedChildMapOfResource(parent), TransientCacheAsyncListener.instance()); Resource child = childMap.get(def.name); if(child == null) { addMissing(identity.resource, graph.getURI(parent) + "/" + URIStringUtils.escape(def.name)); } else { handles[identity.resource] = builder.handle(child); } } else { addMissing(identity.resource, TransferableGraphUtils.getURI(resourceCount, identityMap, def.parent) + "/" + URIStringUtils.escape(def.name)); } } } } else if(definition instanceof Internal) { String uri = TransferableGraphUtils.getURI(resourceCount, identityMap, identity.resource); Resource existing = graph.getPossibleResource(uri); if(existing != null) { existingInternalMap.put(identity.resource, existing); } } else if(definition instanceof Root) { Root root = (Root)definition; if(root.name.equals("")) handles[identity.resource] = builder.handle(RootLibrary); else { Resource existing = advisor.analyzeRoot(graph, root); if(existing != null) handles[identity.resource] = builder.handle(existing); } } else if(definition instanceof Optional) { External def = (External)definition; Resource parent = builder.resource(handles[def.parent]); if(parent != null) handles[identity.resource] = builder.handle(graph.syncRequest(new UnescapedChildMapOfResource(parent)).get(def.name)); } } //if(!missingExternals.isEmpty()) throw new MissingDependencyException(this); } @Override public Resource createChild(WriteOnlyGraph graph, Resource parent, Resource child, String name) throws DatabaseException { if(child == null) child = graph.newResource(); Resource nameResource = graph.newResource(); graph.claim(nameResource, InstanceOf, null, String); graph.claimValue(nameResource, name, WriteBindings.STRING); graph.claim(child, HasName, NameOf, nameResource); return child; } int[] getClustering() { Variant v = extensions.get(Extensions.CLUSTERING); if(v == null) return null; try { return (int[])v.getValue(Bindings.INT_ARRAY); } catch (AdaptException e) { Logger.defaultLogError(e); return null; } } int[] getClusterSets() { Variant v = extensions.get(Extensions.CLUSTER_SETS); if(v == null) return null; try { return (int[])v.getValue(Bindings.INT_ARRAY); } catch (AdaptException e) { Logger.defaultLogError(e); return null; } } boolean needTranslation(Datatype type) { return resourceUtil.mayHaveResource(type); } void findClusterSet(WriteOnlyGraph graph, Resource rootLibrary, int[] clustering, int[] clusterSets, long[] clusters, int id) throws DatabaseException { ClusteringSupport support = graph.getService(ClusteringSupport.class); if(id == Extensions.ROOT_LIBRARY_CLUSTER_SET || id == Extensions.INDEX_ROOT_CLUSTER_SET) return; Resource indexRootClusterSetResource = rootLibrary; if(indexRoot != null && support.isClusterSet(indexRoot)) { indexRootClusterSetResource = indexRoot; } else { graph.setClusterSet4NewResource(rootLibrary); graph.flushCluster(); } int indexRootCsHandle = builder.handle(indexRootClusterSetResource); for(int pos=0,index=0;index= 0) builder.newCluster(csHandle); handles[id] = builder.newResource(csHandle); clusters[index] = support.getCluster(builder.resource(handles[id])); builder.createClusterSet(handles[id]); } return; } } } void createMissing(final WriteOnlyGraph graph) throws Exception { if(allMissingExternals.isEmpty()) return; XSupport xs = graph.getService(XSupport.class); Pair serviceMode = xs.getServiceMode(); xs.setServiceMode(true, false); try { ArrayList missing = new ArrayList<>(allMissingExternals.keySet()); Collections.sort(missing); for(String uri : missing) { String[] parts = URIStringUtils.splitURI(uri); // URIStringUtils.splitURI returns root URI in non-standard format, so fix it manually as a workaround if (parts[0].equals("http://")) { parts[0] = "http:/"; } Resource parent = resolvedParents.get(parts[0]); // TODO: proper exception message if(parent == null) { throw new IllegalStateException("!!"); } Resource childResource = graph.newResource(); graph.claim(childResource, InstanceOf, null, ExternalEntity); Resource nameResource = graph.newResource(); graph.claim(nameResource, InstanceOf, null, String); graph.claimValue(nameResource, URIStringUtils.unescape(parts[1]), WriteBindings.STRING); graph.claim(childResource, HasName, NameOf, nameResource); graph.claim(parent, ConsistsOf, PartOf, childResource); resolvedParents.put(uri, childResource); handles[allMissingExternals.get(uri)] = builder.handle(childResource); } } finally { xs.setServiceMode(serviceMode.first, serviceMode.second); } } void write(final WriteOnlyGraph graph) throws Exception { final SerialisationSupport ss = graph.getService(SerialisationSupport.class); ClusterBuilderFactory factory = graph.getService(ClusterBuilderFactory.class); if(advisor instanceof IImportAdvisor2) { boolean allowImmutable = ((IImportAdvisor2)advisor).allowImmutableModifications(); builder = factory.create(vg, allowImmutable); } else { builder = factory.create(vg, false); } createMissing(graph); final int[] handles = this.handles; int[] clustering = getClustering(); if(clustering != null) { int[] clusterSets = getClusterSets(); if(clusterSets != null) { assert(clustering.length == clusterSets.length); long[] clusters = new long[clustering.length]; // Create clustering for(int i=0;i= 0) builder.newCluster(setHandle); for(int r=0;r() { @Override public void execute(int[] value) throws Exception { int sub = value[0]; int pred = value[1]; int inv = value[2]; int obj = value[3]; int subject = handles[sub]; int predicate = handles[pred]; int object = handles[obj]; builder.addStatement(graph, subject, predicate, object); if(inv >= 0) { int inverse = handles[inv]; builder.addStatement(graph, object, inverse, subject); } // Count from 0% -> 50% => total = statementCount*2 percentage[0] = updatePercentage(percentage[0], done[0]++, statementCount*2); } }); int valueCount = tg.getValueCount(); done[0] = 0; class ValueProcedure extends InputStream implements TransferableGraphSourceValueProcedure { private TGResourceUtil util = new TGResourceUtil(); private DataInput source; @Override public void execute(int _resource, Datatype type, DataInput stream) throws Exception { source = stream; //int file = _resource & 0x80000000; int resource = _resource & 0x7FFFFFFF; Binding binding = Bindings.getBinding(type); Serializer s = Bindings.getSerializer(binding); builder.beginValue(handles[resource]); if(util.mayHaveResource(type)) { Object value = s.deserialize(stream); util.adaptValue( binding, value, new LongAdapter() { @Override public long adapt(long in) { try { return ss.getRandomAccessId(handles[(int)in]); } catch (DatabaseException e) { throw new IllegalStateException(e); } } }); byte[] bytes = s.serialize(value); for(byte b : bytes) { int val = b; if(val < 0) val += 256; builder.appendValue(val); } } else { s.skip(this); } builder.endValue(); work(); } @Override public int read() throws IOException { int value = source.readUnsignedByte(); try { builder.appendValue(value); } catch (DatabaseException e) { LOGGER.error("Failed to write value into database", e); } return value; } @Override public void rawCopy(int resource, int length, DataInput input) throws Exception { builder.beginValue(handles[resource]); for (int i = 0; i < length; ++i) builder.appendValue(input.readUnsignedByte()); builder.endValue(); work(); } private void work() { // Count from 50% -> 100% => [valueCount, valueCount*2) percentage[0] = updatePercentage(percentage[0], valueCount + done[0]++, valueCount*2); } }; tg.forValues2(null, new ValueProcedure()); for(Resource r : existingInternalMap.valueCollection()) { graph.deny(r, InstanceOf, null, ExternalEntity, null); } } @Override public long[] getResourceIds(SerialisationSupport serializer) throws DatabaseException { final int count = handles.length; long[] resourceIds = new long[count]; for(int i=0;i