/******************************************************************************* * Copyright (c) 2012, 2016 Association for Decentralized Information Management * in Industry THTH ry. * All rights reserved. This program and the accompanying materials * are made available under the terms of the Eclipse Public License v1.0 * which accompanies this distribution, and is available at * http://www.eclipse.org/legal/epl-v10.html * * Contributors: * VTT Technical Research Centre of Finland - initial API and implementation * Semantum Oy *******************************************************************************/ package org.simantics.graph.db; import java.io.DataInput; import java.io.DataOutput; import java.io.DataOutputStream; import java.io.FileNotFoundException; import java.io.FileOutputStream; import java.io.IOException; import java.io.InputStream; import java.util.HashSet; import java.util.Map; import java.util.Set; import java.util.TreeMap; import org.simantics.databoard.Bindings; import org.simantics.databoard.adapter.AdaptException; import org.simantics.databoard.binding.Binding; import org.simantics.databoard.binding.mutable.Variant; import org.simantics.databoard.serialization.Serializer; import org.simantics.databoard.type.Datatype; import org.simantics.db.ReadGraph; import org.simantics.db.Resource; import org.simantics.db.Session; import org.simantics.db.VirtualGraph; import org.simantics.db.WriteOnlyGraph; import org.simantics.db.common.WriteBindings; import org.simantics.db.common.procedure.adapter.TransientCacheAsyncListener; import org.simantics.db.common.uri.UnescapedChildMapOfResource; import org.simantics.db.common.utils.Logger; import org.simantics.db.exception.DatabaseException; import org.simantics.db.service.ClusterBuilder2; import org.simantics.db.service.ClusterBuilderFactory; import org.simantics.db.service.ClusteringSupport; import org.simantics.db.service.SerialisationSupport; import org.simantics.graph.db.TransferableGraphSource.TransferableGraphSourceProcedure; import org.simantics.graph.db.TransferableGraphSource.TransferableGraphSourceValueProcedure; import org.simantics.graph.representation.Extensions; import org.simantics.graph.representation.External; import org.simantics.graph.representation.Identity; import org.simantics.graph.representation.IdentityDefinition; import org.simantics.graph.representation.Internal; import org.simantics.graph.representation.Optional; import org.simantics.graph.representation.Root; import org.simantics.graph.representation.TransferableGraphUtils; import org.simantics.graph.utils.TGResourceUtil; import org.simantics.graph.utils.TGResourceUtil.LongAdapter; public class StreamingTransferableGraphImportProcess implements TransferableGraphImporter { public static String LOG_FILE = "transferableGraphs.log"; final static private boolean LOG = false; static DataOutput log; static { if (LOG) { try { FileOutputStream stream = new FileOutputStream(LOG_FILE); log = new DataOutputStream(stream); } catch (FileNotFoundException e) { e.printStackTrace(); } } } private static void log(String line) { if (LOG) { try { log.writeUTF(line + "\n"); } catch (IOException e) { e.printStackTrace(); } } } Resource indexRoot; TransferableGraphSource tg; VirtualGraph vg; IImportAdvisor2 advisor; ClusterBuilder2 builder; final TGResourceUtil resourceUtil = new TGResourceUtil(); int[] handles; Set missingExternals = new HashSet(); int resourceCount; Identity[] identities; TreeMap extensions; // Builtins Resource RootLibrary; Resource String; Resource Library; Resource InstanceOf; Resource ConsistsOf; Resource PartOf; Resource HasName; Resource NameOf; public StreamingTransferableGraphImportProcess(Session session, VirtualGraph vg, TransferableGraphSource tg, IImportAdvisor2 advisor) { this.tg = tg; this.vg = vg; this.advisor = advisor; } public void readIdentities(ReadGraph g) throws Exception { extensions = tg.getExtensions(); resourceCount = tg.getResourceCount(); identities = new Identity[tg.getIdentityCount()]; tg.forIdentities(g, new TransferableGraphSourceProcedure() { int counter = 0; @Override public void execute(Identity value) throws Exception { identities[counter++] = value; } }); } public void findBuiltins(WriteOnlyGraph g) throws DatabaseException { RootLibrary = g.getBuiltin("http:/"); String = g.getBuiltin(CoreInitialization.LAYER0 + "String"); Library = g.getBuiltin(CoreInitialization.LAYER0 + "Library"); InstanceOf = g.getBuiltin(CoreInitialization.LAYER0 + "InstanceOf"); ConsistsOf = g.getBuiltin(CoreInitialization.LAYER0 + "ConsistsOf"); PartOf = g.getBuiltin(CoreInitialization.LAYER0 + "PartOf"); HasName = g.getBuiltin(CoreInitialization.LAYER0 + "HasName"); NameOf = g.getBuiltin(CoreInitialization.LAYER0 + "NameOf"); } public void findBuiltins(ReadGraph g) throws DatabaseException { RootLibrary = g.getBuiltin("http:/"); String = g.getBuiltin(CoreInitialization.LAYER0 + "String"); Library = g.getBuiltin(CoreInitialization.LAYER0 + "Library"); InstanceOf = g.getBuiltin(CoreInitialization.LAYER0 + "InstanceOf"); ConsistsOf = g.getBuiltin(CoreInitialization.LAYER0 + "ConsistsOf"); PartOf = g.getBuiltin(CoreInitialization.LAYER0 + "PartOf"); HasName = g.getBuiltin(CoreInitialization.LAYER0 + "HasName"); NameOf = g.getBuiltin(CoreInitialization.LAYER0 + "NameOf"); } // /* Preparation that is used when the core is empty. // */ // void initialPrepare(WriteOnlyGraph graph) throws DatabaseException { // findBuiltins(graph); // // resources = new Resource[tg.resourceCount]; // // int Root = -1; // int SimanticsDomain = -1; // int Layer0 = -1; // // for(Identity identity : tg.identities) { // if(identity.definition instanceof Internal) { // Internal def = (Internal)identity.definition; // Resource res = null; // if(def.parent == Layer0) { // try { // res = graph.getBuiltin(CoreInitialization.LAYER0 + def.name); // } catch(ResourceNotFoundException e) { // } // } // else if(def.parent == SimanticsDomain) { // if(def.name.equals("Layer0-1.0")) // Layer0 = identity.resource; // } // else if(def.parent == Root) { // if(def.name.equals("www.simantics.org")) // SimanticsDomain = identity.resource; // } // // if(res == null) // res = createChild(graph, resources[def.parent], def.name); // else // createChild(graph, res, resources[def.parent], def.name); // resources[identity.resource] = res; // } // else if(identity.definition instanceof Root) { // Root = identity.resource; // resources[identity.resource] = RootLibrary; // } // } // } void addMissing(String external) { Set removals = new HashSet(); for(String ext : missingExternals) if(ext.startsWith(external)) return; for(String ext : missingExternals) if(external.startsWith(ext)) removals.add(ext); missingExternals.removeAll(removals); missingExternals.add(external); } void prepare(ReadGraph graph) throws Exception { // indexRoot = graph.syncRequest(new PossibleIndexRoot(((IImportAdvisor2)advisor).getTarget())); findBuiltins(graph); readIdentities(graph); // System.err.println("ext: " + extensions); // System.err.println("rc: " + resourceCount); // System.err.println("ic: " + identities.length); ClusterBuilderFactory factory = graph.getService(ClusterBuilderFactory.class); ClusterBuilder2 builder = factory.create(vg, false); this.handles = new int[resourceCount]; for(Identity identity : identities) { IdentityDefinition definition = identity.definition; if(definition instanceof External) { External def = (External)definition; if(def.parent == -1) { handles[identity.resource] = builder.handle(RootLibrary); } else { if("@inverse".equals(def.name)) { int parent = handles[def.parent]; int child = builder.handle(graph.getInverse(builder.resource(parent))); handles[identity.resource] = child; } else { int handle = handles[def.parent]; Resource parent = handle != 0 ? builder.resource(handle) : null; // TODO: escape should be removed when names become well-behaving if(parent != null) { Map childMap = graph .syncRequest(new UnescapedChildMapOfResource(parent), new TransientCacheAsyncListener>()); Resource child = childMap.get(def.name); if(child == null) { addMissing(graph.getURI(parent) + "/" + def.name); } else { handles[identity.resource] = builder.handle(child); } } else { addMissing(TransferableGraphUtils.getURI(resourceCount, identities, def.parent) + "/" + def.name); } } } } else if(definition instanceof Internal) { // Do not do anything for now } else if(definition instanceof Root) { Root root = (Root)definition; if(root.name.equals("")) handles[identity.resource] = builder.handle(RootLibrary); else { Resource existing = advisor.analyzeRoot(graph, root); if(existing != null) handles[identity.resource] = builder.handle(existing); } } else if(definition instanceof Optional) { External def = (External)definition; Resource parent = builder.resource(handles[def.parent]); if(parent != null) handles[identity.resource] = builder.handle(graph.syncRequest(new UnescapedChildMapOfResource(parent)).get(def.name)); } } if(!missingExternals.isEmpty()) throw new MissingDependencyException(this); } @Override public Resource createChild(WriteOnlyGraph graph, Resource parent, Resource child, String name) throws DatabaseException { if(child == null) child = graph.newResource(); Resource nameResource = graph.newResource(); graph.claim(nameResource, InstanceOf, null, String); graph.claimValue(nameResource, name, WriteBindings.STRING); graph.claim(child, HasName, NameOf, nameResource); return child; } int[] getClustering() { Variant v = extensions.get(Extensions.CLUSTERING); if(v == null) return null; try { return (int[])v.getValue(Bindings.INT_ARRAY); } catch (AdaptException e) { Logger.defaultLogError(e); return null; } } int[] getClusterSets() { Variant v = extensions.get(Extensions.CLUSTER_SETS); if(v == null) return null; try { return (int[])v.getValue(Bindings.INT_ARRAY); } catch (AdaptException e) { Logger.defaultLogError(e); return null; } } boolean needTranslation(Datatype type) { return resourceUtil.mayHaveResource(type); } void findClusterSet(WriteOnlyGraph graph, Resource rootLibrary, Resource indexRoot, int[] clustering, int[] clusterSets, long[] clusters, int id) throws DatabaseException { ClusteringSupport support = graph.getService(ClusteringSupport.class); if(id == Extensions.ROOT_LIBRARY_CLUSTER_SET || id == Extensions.INDEX_ROOT_CLUSTER_SET) return; for(int pos=0,index=0;index= 0) builder.newCluster(csHandle); handles[id] = builder.newResource(csHandle); clusters[index] = support.getCluster(builder.resource(handles[id])); builder.createClusterSet(handles[id]); } return; } } } void write(final WriteOnlyGraph graph) throws Exception { final SerialisationSupport ss = graph.getService(SerialisationSupport.class); ClusterBuilderFactory factory = graph.getService(ClusterBuilderFactory.class); if(advisor instanceof IImportAdvisor2) { boolean allowImmutable = ((IImportAdvisor2)advisor).allowImmutableModifications(); builder = factory.create(vg, allowImmutable); } else { builder = factory.create(vg, false); } final int[] handles = this.handles; int[] clustering = getClustering(); if(clustering != null) { int[] clusterSets = getClusterSets(); if(clusterSets != null) { assert(clustering.length == clusterSets.length); long[] clusters = new long[clustering.length]; // Create clustering for(int i=0;i= 0) builder.newCluster(setHandle); for(int r=0;r() { @Override public void execute(int[] value) throws Exception { int sub = value[0]; int pred = value[1]; int inv = value[2]; int obj = value[3]; int subject = handles[sub]; int predicate = handles[pred]; int object = handles[obj]; builder.addStatement(graph, subject, predicate, object); if(inv >= 0) { int inverse = handles[inv]; builder.addStatement(graph, object, inverse, subject); } } }); tg.getValueCount(); class ValueProcedure extends InputStream implements TransferableGraphSourceValueProcedure { private TGResourceUtil util = new TGResourceUtil(); private DataInput source; @Override public void execute(int _resource, Datatype type, DataInput stream) throws Exception { source = stream; //int file = _resource & 0x80000000; int resource = _resource & 0x7FFFFFFF; Binding binding = Bindings.getBinding(type); Serializer s = Bindings.getSerializer(binding); builder.beginValue(handles[resource]); if(util.mayHaveResource(type)) { Object value = s.deserialize(stream); util.adaptValue( binding, value, new LongAdapter() { @Override public long adapt(long in) { try { return ss.getRandomAccessId(handles[(int)in]); } catch (DatabaseException e) { throw new IllegalStateException(e); } } }); byte[] bytes = s.serialize(value); for(byte b : bytes) { int val = b; if(val < 0) val += 256; builder.appendValue(val); } } else { s.skip(this); } builder.endValue(); } @Override public int read() throws IOException { int value = source.readUnsignedByte(); try { builder.appendValue(value); } catch (DatabaseException e) { e.printStackTrace(); } return value; } @Override public void rawCopy(int resource, int length, DataInput input) throws Exception { builder.beginValue(handles[resource]); for (int i = 0; i < length; ++i) builder.appendValue(input.readUnsignedByte()); builder.endValue(); } }; tg.forValues2(null, new ValueProcedure()); } @Override public long[] getResourceIds(SerialisationSupport serializer) throws DatabaseException { final int count = handles.length; long[] resourceIds = new long[count]; for(int i=0;i