--- /dev/null
+/*******************************************************************************\r
+ * Copyright (c) 2012, 2016 Association for Decentralized Information Management\r
+ * in Industry THTH ry.\r
+ * All rights reserved. This program and the accompanying materials\r
+ * are made available under the terms of the Eclipse Public License v1.0\r
+ * which accompanies this distribution, and is available at\r
+ * http://www.eclipse.org/legal/epl-v10.html\r
+ *\r
+ * Contributors:\r
+ * VTT Technical Research Centre of Finland - initial API and implementation\r
+ * Semantum Oy\r
+ *******************************************************************************/\r
+package org.simantics.graph.db;\r
+\r
+import java.io.DataInput;\r
+import java.io.DataOutput;\r
+import java.io.DataOutputStream;\r
+import java.io.FileNotFoundException;\r
+import java.io.FileOutputStream;\r
+import java.io.IOException;\r
+import java.io.InputStream;\r
+import java.util.HashSet;\r
+import java.util.Map;\r
+import java.util.Set;\r
+import java.util.TreeMap;\r
+\r
+import org.simantics.databoard.Bindings;\r
+import org.simantics.databoard.adapter.AdaptException;\r
+import org.simantics.databoard.binding.Binding;\r
+import org.simantics.databoard.binding.mutable.Variant;\r
+import org.simantics.databoard.serialization.Serializer;\r
+import org.simantics.databoard.type.Datatype;\r
+import org.simantics.db.ReadGraph;\r
+import org.simantics.db.Resource;\r
+import org.simantics.db.Session;\r
+import org.simantics.db.VirtualGraph;\r
+import org.simantics.db.WriteOnlyGraph;\r
+import org.simantics.db.common.WriteBindings;\r
+import org.simantics.db.common.procedure.adapter.TransientCacheAsyncListener;\r
+import org.simantics.db.common.uri.UnescapedChildMapOfResource;\r
+import org.simantics.db.common.utils.Logger;\r
+import org.simantics.db.exception.DatabaseException;\r
+import org.simantics.db.service.ClusterBuilder2;\r
+import org.simantics.db.service.ClusterBuilderFactory;\r
+import org.simantics.db.service.ClusteringSupport;\r
+import org.simantics.db.service.SerialisationSupport;\r
+import org.simantics.graph.db.TransferableGraphSource.TransferableGraphSourceProcedure;\r
+import org.simantics.graph.db.TransferableGraphSource.TransferableGraphSourceValueProcedure;\r
+import org.simantics.graph.representation.Extensions;\r
+import org.simantics.graph.representation.External;\r
+import org.simantics.graph.representation.Identity;\r
+import org.simantics.graph.representation.IdentityDefinition;\r
+import org.simantics.graph.representation.Internal;\r
+import org.simantics.graph.representation.Optional;\r
+import org.simantics.graph.representation.Root;\r
+import org.simantics.graph.representation.TransferableGraphUtils;\r
+import org.simantics.graph.utils.TGResourceUtil;\r
+import org.simantics.graph.utils.TGResourceUtil.LongAdapter;\r
+\r
+public class StreamingTransferableGraphImportProcess implements TransferableGraphImporter {\r
+ \r
+ public static String LOG_FILE = "transferableGraphs.log";\r
+ final static private boolean LOG = false;\r
+ \r
+ static DataOutput log;\r
+\r
+ static {\r
+\r
+ if (LOG) {\r
+ \r
+ try {\r
+ FileOutputStream stream = new FileOutputStream(LOG_FILE);\r
+ log = new DataOutputStream(stream);\r
+ } catch (FileNotFoundException e) {\r
+ e.printStackTrace();\r
+ }\r
+ }\r
+\r
+ }\r
+ \r
+ private static void log(String line) {\r
+ if (LOG) {\r
+ try {\r
+ log.writeUTF(line + "\n");\r
+ } catch (IOException e) {\r
+ e.printStackTrace();\r
+ }\r
+ }\r
+ }\r
+\r
+ Resource indexRoot;\r
+ TransferableGraphSource tg;\r
+ VirtualGraph vg;\r
+ IImportAdvisor2 advisor;\r
+ ClusterBuilder2 builder;\r
+ final TGResourceUtil resourceUtil = new TGResourceUtil();\r
+ \r
+ int[] handles;\r
+ \r
+ Set<String> missingExternals = new HashSet<String>(); \r
+ \r
+ int resourceCount;\r
+ Identity[] identities;\r
+ TreeMap<String, Variant> extensions;\r
+ \r
+ // Builtins\r
+ Resource RootLibrary;\r
+ Resource String;\r
+ Resource Library;\r
+ \r
+ Resource InstanceOf;\r
+ Resource ConsistsOf;\r
+ Resource PartOf;\r
+ Resource HasName;\r
+ Resource NameOf; \r
+ \r
+ public StreamingTransferableGraphImportProcess(Session session, VirtualGraph vg, TransferableGraphSource tg, IImportAdvisor2 advisor) {\r
+ this.tg = tg;\r
+ this.vg = vg;\r
+ this.advisor = advisor;\r
+ }\r
+ \r
+ public void readIdentities(ReadGraph g) throws Exception {\r
+ extensions = tg.getExtensions();\r
+ resourceCount = tg.getResourceCount();\r
+ identities = new Identity[tg.getIdentityCount()];\r
+ tg.forIdentities(g, new TransferableGraphSourceProcedure<Identity>() {\r
+ \r
+ int counter = 0;\r
+ \r
+ @Override\r
+ public void execute(Identity value) throws Exception {\r
+ identities[counter++] = value;\r
+ }\r
+ });\r
+ }\r
+ \r
+ public void findBuiltins(WriteOnlyGraph g) throws DatabaseException {\r
+ RootLibrary = g.getBuiltin("http:/");\r
+ String = g.getBuiltin(CoreInitialization.LAYER0 + "String");\r
+ Library = g.getBuiltin(CoreInitialization.LAYER0 + "Library");\r
+ InstanceOf = g.getBuiltin(CoreInitialization.LAYER0 + "InstanceOf");\r
+ ConsistsOf = g.getBuiltin(CoreInitialization.LAYER0 + "ConsistsOf");\r
+ PartOf = g.getBuiltin(CoreInitialization.LAYER0 + "PartOf");\r
+ HasName = g.getBuiltin(CoreInitialization.LAYER0 + "HasName");\r
+ NameOf = g.getBuiltin(CoreInitialization.LAYER0 + "NameOf");\r
+ }\r
+ \r
+ public void findBuiltins(ReadGraph g) throws DatabaseException {\r
+ RootLibrary = g.getBuiltin("http:/");\r
+ String = g.getBuiltin(CoreInitialization.LAYER0 + "String");\r
+ Library = g.getBuiltin(CoreInitialization.LAYER0 + "Library");\r
+ InstanceOf = g.getBuiltin(CoreInitialization.LAYER0 + "InstanceOf");\r
+ ConsistsOf = g.getBuiltin(CoreInitialization.LAYER0 + "ConsistsOf");\r
+ PartOf = g.getBuiltin(CoreInitialization.LAYER0 + "PartOf");\r
+ HasName = g.getBuiltin(CoreInitialization.LAYER0 + "HasName");\r
+ NameOf = g.getBuiltin(CoreInitialization.LAYER0 + "NameOf");\r
+ }\r
+\r
+// /* Preparation that is used when the core is empty. \r
+// */\r
+// void initialPrepare(WriteOnlyGraph graph) throws DatabaseException {\r
+// findBuiltins(graph);\r
+// \r
+// resources = new Resource[tg.resourceCount];\r
+// \r
+// int Root = -1;\r
+// int SimanticsDomain = -1;\r
+// int Layer0 = -1;\r
+// \r
+// for(Identity identity : tg.identities) {\r
+// if(identity.definition instanceof Internal) {\r
+// Internal def = (Internal)identity.definition;\r
+// Resource res = null;\r
+// if(def.parent == Layer0) {\r
+// try {\r
+// res = graph.getBuiltin(CoreInitialization.LAYER0 + def.name);\r
+// } catch(ResourceNotFoundException e) { \r
+// }\r
+// }\r
+// else if(def.parent == SimanticsDomain) {\r
+// if(def.name.equals("Layer0-1.0"))\r
+// Layer0 = identity.resource;\r
+// }\r
+// else if(def.parent == Root) {\r
+// if(def.name.equals("www.simantics.org"))\r
+// SimanticsDomain = identity.resource;\r
+// }\r
+//\r
+// if(res == null)\r
+// res = createChild(graph, resources[def.parent], def.name);\r
+// else\r
+// createChild(graph, res, resources[def.parent], def.name);\r
+// resources[identity.resource] = res;\r
+// }\r
+// else if(identity.definition instanceof Root) {\r
+// Root = identity.resource;\r
+// resources[identity.resource] = RootLibrary; \r
+// } \r
+// }\r
+// }\r
+ \r
+ void addMissing(String external) {\r
+ Set<String> removals = new HashSet<String>();\r
+ for(String ext : missingExternals) if(ext.startsWith(external)) return;\r
+ for(String ext : missingExternals) if(external.startsWith(ext)) removals.add(ext);\r
+ missingExternals.removeAll(removals);\r
+ missingExternals.add(external);\r
+ }\r
+ \r
+ void prepare(ReadGraph graph) throws Exception {\r
+\r
+// indexRoot = graph.syncRequest(new PossibleIndexRoot(((IImportAdvisor2)advisor).getTarget()));\r
+ \r
+ findBuiltins(graph);\r
+ readIdentities(graph);\r
+ \r
+// System.err.println("ext: " + extensions);\r
+// System.err.println("rc: " + resourceCount);\r
+// System.err.println("ic: " + identities.length);\r
+ \r
+ ClusterBuilderFactory factory = graph.getService(ClusterBuilderFactory.class);\r
+ ClusterBuilder2 builder = factory.create(vg, false);\r
+ \r
+ this.handles = new int[resourceCount];\r
+ \r
+ for(Identity identity : identities) {\r
+ IdentityDefinition definition = identity.definition;\r
+ if(definition instanceof External) {\r
+ External def = (External)definition;\r
+ if(def.parent == -1) {\r
+ handles[identity.resource] = builder.handle(RootLibrary);\r
+ } else {\r
+ if("@inverse".equals(def.name)) {\r
+ int parent = handles[def.parent];\r
+ int child = builder.handle(graph.getInverse(builder.resource(parent)));\r
+ handles[identity.resource] = child;\r
+ } else {\r
+ int handle = handles[def.parent];\r
+ Resource parent = handle != 0 ? builder.resource(handle) : null;\r
+ // TODO: escape should be removed when names become well-behaving\r
+ if(parent != null) {\r
+ Map<String,Resource> childMap = graph\r
+ .syncRequest(new UnescapedChildMapOfResource(parent),\r
+ new TransientCacheAsyncListener<Map<String, Resource>>()); \r
+ Resource child = childMap.get(def.name); \r
+ if(child == null) {\r
+ addMissing(graph.getURI(parent) + "/" + def.name);\r
+ } else {\r
+ handles[identity.resource] = builder.handle(child);\r
+ }\r
+ } else {\r
+ addMissing(TransferableGraphUtils.getURI(resourceCount, identities, def.parent) + "/" + def.name);\r
+ }\r
+ }\r
+ }\r
+ }\r
+ else if(definition instanceof Internal) {\r
+ // Do not do anything for now\r
+ }\r
+ else if(definition instanceof Root) {\r
+ Root root = (Root)definition;\r
+ if(root.name.equals(""))\r
+ handles[identity.resource] = builder.handle(RootLibrary);\r
+ else {\r
+ Resource existing = advisor.analyzeRoot(graph, root);\r
+ if(existing != null)\r
+ handles[identity.resource] = builder.handle(existing);\r
+ }\r
+ }\r
+ else if(definition instanceof Optional) {\r
+ External def = (External)definition;\r
+ Resource parent = builder.resource(handles[def.parent]);\r
+ if(parent != null)\r
+ handles[identity.resource] = builder.handle(graph.syncRequest(new UnescapedChildMapOfResource(parent)).get(def.name)); \r
+ }\r
+ } \r
+ \r
+ if(!missingExternals.isEmpty()) throw new MissingDependencyException(this);\r
+ \r
+ }\r
+\r
+ @Override\r
+ public Resource createChild(WriteOnlyGraph graph, Resource parent, Resource child, String name) throws DatabaseException {\r
+ if(child == null) child = graph.newResource();\r
+ Resource nameResource = graph.newResource();\r
+ graph.claim(nameResource, InstanceOf, null, String);\r
+ graph.claimValue(nameResource, name, WriteBindings.STRING);\r
+ graph.claim(child, HasName, NameOf, nameResource);\r
+ return child;\r
+ }\r
+ \r
+ int[] getClustering() {\r
+ Variant v = extensions.get(Extensions.CLUSTERING);\r
+ if(v == null) return null;\r
+ try {\r
+ return (int[])v.getValue(Bindings.INT_ARRAY);\r
+ } catch (AdaptException e) {\r
+ Logger.defaultLogError(e);\r
+ return null;\r
+ }\r
+ }\r
+\r
+ int[] getClusterSets() {\r
+ Variant v = extensions.get(Extensions.CLUSTER_SETS);\r
+ if(v == null) return null;\r
+ try {\r
+ return (int[])v.getValue(Bindings.INT_ARRAY);\r
+ } catch (AdaptException e) {\r
+ Logger.defaultLogError(e);\r
+ return null;\r
+ }\r
+ }\r
+\r
+ boolean needTranslation(Datatype type) {\r
+ return resourceUtil.mayHaveResource(type);\r
+ }\r
+ \r
+ void findClusterSet(WriteOnlyGraph graph, Resource rootLibrary, Resource indexRoot, int[] clustering, int[] clusterSets, long[] clusters, int id) throws DatabaseException {\r
+ ClusteringSupport support = graph.getService(ClusteringSupport.class);\r
+ if(id == Extensions.ROOT_LIBRARY_CLUSTER_SET || id == Extensions.INDEX_ROOT_CLUSTER_SET) return;\r
+ for(int pos=0,index=0;index<clustering.length;index++) {\r
+ pos += clustering[index];\r
+ if(id < pos) {\r
+ int cs = clusterSets[index]; \r
+ if(handles[id] == 0) {\r
+ int csHandle = 0;\r
+ if(cs == Extensions.ROOT_LIBRARY_CLUSTER_SET) csHandle = builder.handle(rootLibrary);\r
+ else if(cs == Extensions.INDEX_ROOT_CLUSTER_SET) csHandle = builder.handle(indexRoot);\r
+ else {\r
+ findClusterSet(graph, rootLibrary, indexRoot, clustering, clusterSets, clusters, cs);\r
+ csHandle = handles[cs];\r
+ }\r
+ \r
+ if(clusters[index] != 0)\r
+ builder.selectCluster(clusters[index]);\r
+ else if(cs >= 0)\r
+ builder.newCluster(csHandle);\r
+ \r
+ handles[id] = builder.newResource(csHandle);\r
+ clusters[index] = support.getCluster(builder.resource(handles[id]));\r
+ \r
+ builder.createClusterSet(handles[id]);\r
+ }\r
+ return;\r
+ }\r
+ }\r
+ }\r
+ \r
+ void write(final WriteOnlyGraph graph) throws Exception {\r
+ \r
+ final SerialisationSupport ss = graph.getService(SerialisationSupport.class);\r
+ \r
+ ClusterBuilderFactory factory = graph.getService(ClusterBuilderFactory.class);\r
+ if(advisor instanceof IImportAdvisor2) {\r
+ boolean allowImmutable = ((IImportAdvisor2)advisor).allowImmutableModifications();\r
+ builder = factory.create(vg, allowImmutable);\r
+ } else {\r
+ builder = factory.create(vg, false);\r
+ }\r
+ \r
+ final int[] handles = this.handles; \r
+ \r
+ int[] clustering = getClustering();\r
+ if(clustering != null) {\r
+ \r
+ int[] clusterSets = getClusterSets();\r
+ if(clusterSets != null) {\r
+\r
+ assert(clustering.length == clusterSets.length);\r
+\r
+ long[] clusters = new long[clustering.length];\r
+ \r
+ // Create clustering\r
+ for(int i=0;i<clusterSets.length;i++) {\r
+ findClusterSet(graph, graph.getRootLibrary(), indexRoot, clustering, clusterSets, clusters, clusterSets[i]);\r
+ }\r
+ \r
+ // Then create all resources\r
+ int i=0;\r
+ for(int j=0;j<clustering.length;j++) {\r
+ int c = clustering[j];\r
+ int s = clusterSets[j];\r
+ int setHandle = 0;\r
+ if(s == Extensions.ROOT_LIBRARY_CLUSTER_SET)\r
+ setHandle = builder.handle(graph.getRootLibrary());\r
+ else if(s == Extensions.INDEX_ROOT_CLUSTER_SET)\r
+ setHandle = builder.handle(indexRoot);\r
+ else setHandle = handles[s];\r
+ // Preserve clustering only for internal resources\r
+ if(clusters[j] != 0)\r
+ builder.selectCluster(clusters[j]);\r
+ else if(s >= 0)\r
+ builder.newCluster(setHandle);\r
+ for(int r=0;r<c;r++, i++)\r
+ if(handles[i] == 0) handles[i] = builder.newResource();\r
+ }\r
+\r
+ for(;i<handles.length;++i)\r
+ if(handles[i] == 0) handles[i] = builder.newResource();\r
+ \r
+ } else {\r
+\r
+ int i = 0;\r
+ for(int c : clustering) {\r
+ builder.newCluster();\r
+ for(int r=0;r<c;r++, i++)\r
+ if(handles[i] == 0) handles[i] = builder.newResource();\r
+ }\r
+\r
+ for(;i<handles.length;++i)\r
+ if(handles[i] == 0) handles[i] = builder.newResource();\r
+ \r
+ }\r
+ \r
+ } else {\r
+ \r
+ // Create blank resources\r
+ for(int i=0;i<handles.length;++i)\r
+ if(handles[i] == 0) handles[i] = builder.newResource();\r
+\r
+ }\r
+ \r
+ // Internal identities \r
+ for(Identity identity : identities) {\r
+ IdentityDefinition definition = identity.definition;\r
+// if(handles[identity.resource] != 0)\r
+// continue;\r
+ if(definition instanceof External) {\r
+ // Already done everything\r
+ }\r
+ else if(definition instanceof Internal) {\r
+ Internal def = (Internal)definition;\r
+ if(handles[identity.resource] != 0)\r
+ handles[identity.resource] = builder.handle(advisor.createChild(graph, this, builder.resource(handles[def.parent]), builder.resource(handles[identity.resource]), def.name));\r
+ else\r
+ handles[identity.resource] = builder.handle(advisor.createChild(graph, this, builder.resource(handles[def.parent]), null, def.name));\r
+ }\r
+ else if(definition instanceof Root) {\r
+ \r
+ Root root = (Root)definition;\r
+ if(handles[identity.resource] != 0)\r
+ handles[identity.resource] = builder.handle(advisor.createRoot(graph, root, builder.resource(handles[identity.resource])));\r
+ else\r
+ handles[identity.resource] = builder.handle(advisor.createRoot(graph, root, null));\r
+ }\r
+ else if(definition instanceof Optional) {\r
+ Optional def = (Optional)definition;\r
+ if(handles[identity.resource] != 0) {\r
+ Resource child = advisor.createChild(graph, this, builder.resource(handles[def.parent]), builder.resource(handles[identity.resource]), def.name);\r
+ graph.claim(child, InstanceOf, null, Library); // ???\r
+ handles[identity.resource] = builder.handle(child);\r
+ } else {\r
+ Resource child = advisor.createChild(graph, this, builder.resource(handles[def.parent]), null, def.name);\r
+ graph.claim(child, InstanceOf, null, Library); // ???\r
+ handles[identity.resource] = builder.handle(child);\r
+ }\r
+ }\r
+ } \r
+ \r
+ tg.getStatementCount();\r
+ tg.forStatements(null, new TransferableGraphSourceProcedure<int[]>() {\r
+\r
+ @Override\r
+ public void execute(int[] value) throws Exception {\r
+ \r
+ int sub = value[0];\r
+ int pred = value[1];\r
+ int inv = value[2];\r
+ int obj = value[3];\r
+\r
+ int subject = handles[sub];\r
+ int predicate = handles[pred];\r
+ int object = handles[obj];\r
+\r
+ builder.addStatement(graph, subject, predicate, object); \r
+ if(inv >= 0) {\r
+ int inverse = handles[inv];\r
+ builder.addStatement(graph, object, inverse, subject); \r
+ }\r
+ \r
+ }\r
+ \r
+ }); \r
+ \r
+ tg.getValueCount();\r
+\r
+ class ValueProcedure extends InputStream implements TransferableGraphSourceValueProcedure {\r
+\r
+ private TGResourceUtil util = new TGResourceUtil();\r
+ private DataInput source;\r
+\r
+ @Override\r
+ public void execute(int _resource, Datatype type, DataInput stream) throws Exception {\r
+\r
+ source = stream;\r
+\r
+ //int file = _resource & 0x80000000;\r
+ int resource = _resource & 0x7FFFFFFF;\r
+\r
+ Binding binding = Bindings.getBinding(type);\r
+ Serializer s = Bindings.getSerializer(binding);\r
+\r
+ builder.beginValue(handles[resource]);\r
+ if(util.mayHaveResource(type)) {\r
+ Object value = s.deserialize(stream);\r
+ util.adaptValue( binding, value, new LongAdapter() {\r
+ @Override\r
+ public long adapt(long in) {\r
+ try {\r
+ return ss.getRandomAccessId(handles[(int)in]);\r
+ } catch (DatabaseException e) {\r
+ throw new IllegalStateException(e);\r
+ }\r
+ }\r
+ });\r
+ byte[] bytes = s.serialize(value);\r
+ for(byte b : bytes) {\r
+ int val = b;\r
+ if(val < 0) val += 256;\r
+ builder.appendValue(val);\r
+ }\r
+ } else {\r
+ s.skip(this);\r
+ }\r
+ builder.endValue();\r
+ \r
+ }\r
+\r
+ @Override\r
+ public int read() throws IOException {\r
+ int value = source.readUnsignedByte();\r
+ try {\r
+ builder.appendValue(value);\r
+ } catch (DatabaseException e) {\r
+ e.printStackTrace();\r
+ }\r
+ return value;\r
+ }\r
+\r
+ @Override\r
+ public void rawCopy(int resource, int length, DataInput input) throws Exception {\r
+ builder.beginValue(handles[resource]);\r
+ for (int i = 0; i < length; ++i)\r
+ builder.appendValue(input.readUnsignedByte());\r
+ builder.endValue();\r
+ }\r
+\r
+ };\r
+ \r
+ tg.forValues2(null, new ValueProcedure());\r
+ \r
+ }\r
+ \r
+ @Override\r
+ public long[] getResourceIds(SerialisationSupport serializer) throws DatabaseException {\r
+ final int count = handles.length;\r
+ long[] resourceIds = new long[count];\r
+ for(int i=0;i<count;++i)\r
+ resourceIds[i] = serializer.getRandomAccessId(handles[i]);\r
+ return resourceIds;\r
+ }\r
+}
\ No newline at end of file