-/*******************************************************************************\r
- * Copyright (c) 2007, 2010 Association for Decentralized Information Management\r
- * in Industry THTH ry.\r
- * All rights reserved. This program and the accompanying materials\r
- * are made available under the terms of the Eclipse Public License v1.0\r
- * which accompanies this distribution, and is available at\r
- * http://www.eclipse.org/legal/epl-v10.html\r
- *\r
- * Contributors:\r
- * VTT Technical Research Centre of Finland - initial API and implementation\r
- *******************************************************************************/\r
-package org.simantics.db.layer0.util;\r
-\r
-import gnu.trove.list.array.TIntArrayList;\r
-import gnu.trove.map.hash.TIntIntHashMap;\r
-import gnu.trove.map.hash.TLongObjectHashMap;\r
-import gnu.trove.procedure.TIntProcedure;\r
-import gnu.trove.procedure.TLongObjectProcedure;\r
-import gnu.trove.set.TIntSet;\r
-import gnu.trove.set.hash.THashSet;\r
-import gnu.trove.set.hash.TIntHashSet;\r
-\r
-import java.io.DataOutput;\r
-import java.io.DataOutputStream;\r
-import java.io.FileNotFoundException;\r
-import java.io.FileOutputStream;\r
-import java.io.IOException;\r
-import java.io.ObjectOutputStream;\r
-import java.util.ArrayList;\r
-import java.util.Collection;\r
-import java.util.Collections;\r
-import java.util.HashSet;\r
-import java.util.Map;\r
-import java.util.Set;\r
-import java.util.TreeMap;\r
-import java.util.concurrent.ConcurrentLinkedQueue;\r
-import java.util.concurrent.ConcurrentSkipListSet;\r
-\r
-import org.simantics.databoard.Bindings;\r
-import org.simantics.databoard.Databoard;\r
-import org.simantics.databoard.binding.Binding;\r
-import org.simantics.databoard.binding.mutable.Variant;\r
-import org.simantics.databoard.serialization.Serializer;\r
-import org.simantics.databoard.type.Datatype;\r
-import org.simantics.db.AsyncReadGraph;\r
-import org.simantics.db.DirectStatements;\r
-import org.simantics.db.ReadGraph;\r
-import org.simantics.db.Resource;\r
-import org.simantics.db.Statement;\r
-import org.simantics.db.common.request.AsyncReadRequest;\r
-import org.simantics.db.common.request.ReadRequest;\r
-import org.simantics.db.common.request.ResourceAsyncRead;\r
-import org.simantics.db.common.utils.NameUtils;\r
-import org.simantics.db.exception.DatabaseException;\r
-import org.simantics.db.layer0.adapter.SubgraphAdvisor;\r
-import org.simantics.db.layer0.adapter.SubgraphExtent.ExtentStatus;\r
-import org.simantics.db.procedure.AsyncProcedure;\r
-import org.simantics.db.request.AsyncRead;\r
-import org.simantics.db.service.ClusteringSupport;\r
-import org.simantics.db.service.CollectionSupport;\r
-import org.simantics.db.service.DirectQuerySupport;\r
-import org.simantics.db.service.QueryControl;\r
-import org.simantics.db.service.QueryControl.ControlProcedure;\r
-import org.simantics.db.service.SerialisationSupport;\r
-import org.simantics.db.service.StatementSupport;\r
-import org.simantics.db.service.TransferableGraphSupport;\r
-import org.simantics.graph.representation.Extensions;\r
-import org.simantics.layer0.Layer0;\r
-import org.simantics.utils.datastructures.Pair;\r
-import org.simantics.utils.threads.logger.ITask;\r
-import org.simantics.utils.threads.logger.ThreadLogger;\r
-\r
-public class Subgraphs {\r
-\r
- public static String LOG_FILE = "export.log";\r
- final static private boolean LOG = false;\r
- final static private boolean DEBUG = false;\r
- final static private boolean PARENT_DEBUG = DEBUG | false;\r
- final static private boolean EXTERNAL_DEBUG = DEBUG | false;\r
- final static private boolean ADVISOR_LOG = LOG & false;\r
- final static private boolean EXPANSION_LOG = LOG & false;\r
- final static private boolean INTERNAL_LOG = LOG & false;\r
- final static private boolean COMPOSED_LOG = LOG & false;\r
- final static private boolean RESOLVE_LOG = LOG & false;\r
- final static private boolean CLASSIFY_LOG = LOG & false;\r
- final static private boolean EXTERNAL_LOG = LOG & false;\r
- final static private boolean PROFILE = false;\r
-\r
- static enum WeakStatus {\r
- STRONG, WEAK\r
- }\r
-\r
- \r
- static DataOutput log;\r
-\r
- static {\r
-\r
- if (LOG) {\r
- try {\r
- FileOutputStream stream = new FileOutputStream(LOG_FILE, false);\r
- log = new DataOutputStream(stream);\r
- } catch (FileNotFoundException e) {\r
- e.printStackTrace();\r
- }\r
- }\r
-\r
- }\r
- \r
- private static void log(String line) {\r
- if (LOG) {\r
- try {\r
- log.write((line + "\n").getBytes());\r
- } catch (IOException e) {\r
- e.printStackTrace();\r
- }\r
- }\r
- }\r
-\r
- public static Collection<Resource> getParents(ReadGraph g, Resource r)\r
- throws DatabaseException {\r
- return getParents(g, r, false);\r
- }\r
-\r
- static class FastInternalRequest extends ResourceAsyncRead<Boolean> {\r
-\r
- final DirectQuerySupport dqs;\r
- final ConcurrentLinkedQueue<Resource> queue;\r
- final Map<Resource, WeakStatus> weakInverses;\r
- final Map<Resource, ExtentStatus> status;\r
-\r
- public FastInternalRequest(DirectQuerySupport dqs, Resource resource,\r
- Map<Resource, ExtentStatus> status,\r
- Map<Resource, WeakStatus> weakInverses,\r
- ConcurrentLinkedQueue<Resource> queue) {\r
- super(resource);\r
- this.dqs = dqs;\r
- this.status = status;\r
- this.weakInverses = weakInverses;\r
- this.queue = queue;\r
- }\r
-\r
- @Override\r
- public int getFlags() {\r
- return 0;\r
- }\r
-\r
- @Override\r
- public void perform(AsyncReadGraph graph, final AsyncProcedure<Boolean> procedure) {\r
-\r
- dqs.forEachDirectStatement(graph, resource, new AsyncProcedure<DirectStatements>() {\r
-\r
- @Override\r
- public void execute(AsyncReadGraph graph, DirectStatements ss) {\r
- boolean ok = true;\r
- for(Statement statement : ss) {\r
- if (status.get(statement.getObject()) == ExtentStatus.INTERNAL) continue;\r
- WeakStatus status = weakInverses.get(statement.getPredicate()); \r
- if(status == WeakStatus.WEAK) continue;\r
- else if (status == null) {\r
- queue.add(statement.getPredicate());\r
- }\r
- ok = false;\r
- }\r
- procedure.execute(graph, ok);\r
- }\r
-\r
- @Override\r
- public void exception(AsyncReadGraph graph, Throwable throwable) {\r
- throwable.printStackTrace();\r
- }\r
-\r
- });\r
- \r
- }\r
-\r
- }\r
-\r
- static class ClassifyStatementsRequest implements AsyncRead<Boolean> {\r
-\r
- final Set<Resource> schedule;\r
- final Map<Resource, WeakStatus> weakMap;\r
-\r
- public ClassifyStatementsRequest(Set<Resource> schedule, Map<Resource, WeakStatus> weakMap) {\r
- this.weakMap = weakMap;\r
- this.schedule = schedule;\r
- }\r
-\r
- @Override\r
- public int threadHash() {\r
- return hashCode();\r
- }\r
- \r
- @Override\r
- public int getFlags() {\r
- return 0;\r
- }\r
-\r
- @Override\r
- public void perform(AsyncReadGraph graph, final AsyncProcedure<Boolean> procedure) {\r
-\r
- for (final Resource p : schedule) {\r
-\r
- graph.forPossibleInverse(p, new AsyncProcedure<Resource>() {\r
-\r
- private void register(AsyncReadGraph graph, Resource predicate, Resource superRelation, WeakStatus status) {\r
- synchronized (weakMap) {\r
- weakMap.put(predicate, status);\r
- if(superRelation != null) weakMap.put(superRelation, status);\r
- }\r
- }\r
-\r
- @Override\r
- public void exception(AsyncReadGraph graph, Throwable throwable) {\r
- throwable.printStackTrace();\r
- }\r
-\r
- @Override\r
- public void execute(AsyncReadGraph graph, final Resource inverse) {\r
-\r
- if (inverse == null) {\r
- \r
- register(graph, p, null, WeakStatus.WEAK);\r
- \r
- } else {\r
- \r
- graph.forPossibleSuperrelation(inverse, new AsyncProcedure<Resource>() {\r
-\r
- @Override\r
- public void exception(AsyncReadGraph graph, Throwable throwable) {\r
- throwable.printStackTrace();\r
- }\r
-\r
- @Override\r
- public void execute(AsyncReadGraph graph, final Resource superRelation) {\r
- \r
- if(superRelation != null && weakMap.containsKey(superRelation)) {\r
- register(graph, p, null, weakMap.get(superRelation));\r
- return;\r
- }\r
-\r
- graph.forIsSubrelationOf(inverse, graph.getService(Layer0.class).IsRelatedTo, new AsyncProcedure<Boolean>() {\r
-\r
- @Override\r
- public void exception(AsyncReadGraph graph,Throwable throwable) {\r
- throwable.printStackTrace();\r
- }\r
-\r
- @Override\r
- public void execute(AsyncReadGraph graph,Boolean strong) {\r
- register(graph, p, superRelation, strong ? WeakStatus.STRONG : WeakStatus.WEAK);\r
- }\r
-\r
- });\r
- \r
- }\r
- \r
- });\r
- \r
- }\r
-\r
- }\r
-\r
- });\r
-\r
- }\r
-\r
- procedure.execute(graph, false);\r
-\r
- }\r
-\r
- }\r
-\r
- private static Collection<Resource> getParents(ReadGraph g, Resource r, boolean isStrong) throws DatabaseException {\r
-\r
- System.out.println("getParents " + NameUtils.getSafeName(g, r));\r
-\r
- Layer0 l0 = Layer0.getInstance(g);\r
-\r
- Collection<Resource> predicates = g.getPredicates(r);\r
-\r
- // --- Consists Of ----------------------------------------------------\r
-\r
- if (predicates.contains(l0.PartOf)) {\r
- Collection<Resource> parents = g.getObjects(r, l0.PartOf);\r
- if (parents.size() == 1)\r
- return parents;\r
- ArrayList<Resource> libraryParents = new ArrayList<Resource>(1);\r
- for (Resource p : parents)\r
- if (g.isInstanceOf(p, l0.Library))\r
- libraryParents.add(p);\r
- if (!libraryParents.isEmpty())\r
- return libraryParents;\r
- else\r
- return parents;\r
- }\r
-\r
- // --- Ordered sets ---------------------------------------------------\r
-\r
- {\r
- Collection<Resource> parents = null;\r
- for (Resource p : predicates)\r
- if (g.isInstanceOf(p, l0.OrderedSet) && !p.equals(r)) {\r
- if (parents == null)\r
- parents = new ArrayList<Resource>(1);\r
- parents.add(p);\r
- }\r
- if (parents != null) {\r
- if (DEBUG)\r
- System.out.println("ORDERED SET");\r
- return parents;\r
- }\r
- }\r
-\r
-\r
- if (isStrong)\r
- return Collections.emptyList();\r
- else {\r
-\r
- if (predicates.contains(l0.InverseOf)) {\r
- \r
- Resource inv = g.getInverse(r);\r
- return getParents(g, inv, true);\r
- \r
- } else {\r
- \r
- /*\r
- * Depends On\r
- * \r
- * If there are DependsOn parents, then IsRelatedTo parents are discarded\r
- * \r
- */\r
- HashSet<Resource> result = new HashSet<Resource>();\r
- for(Resource predicate : predicates) {\r
- if(g.isSubrelationOf(predicate, l0.IsDependencyOf)) result.addAll(g.getObjects(r, predicate));\r
- }\r
- if(!result.isEmpty()) return result;\r
- \r
- /*\r
- * Is Related To\r
- * \r
- * At this point all Is Related To are parents.\r
- * \r
- */\r
- for(Resource predicate : predicates) {\r
- Resource inv = g.getPossibleInverse(predicate);\r
- if(inv != null) {\r
- if(g.isSubrelationOf(inv, l0.IsRelatedTo)) result.addAll(g.getObjects(r, predicate)); \r
- }\r
- }\r
- \r
- return result;\r
- \r
- }\r
- \r
- /*\r
- Collection<Resource> invR = g.getObjects(r, b.IsRelatedTo_Inverse);\r
- if (predicates.contains(b.InverseOf)) {\r
- if (invR.size() > 1) {\r
- if (DEBUG)\r
- System.out\r
- .println("###########################################");\r
- Resource inv = g.getInverse(r);\r
- Collection<Resource> ret = new ArrayList<Resource>();\r
- for (Statement pp : g.getStatements(r,\r
- b.IsRelatedTo_Inverse))\r
- if (!pp.getPredicate().equals(inv)) {\r
- if (DEBUG) {\r
- System.out.println("<"\r
- + NameUtils.getSafeName(g, pp\r
- .getSubject())\r
- + ","\r
- + NameUtils.getSafeName(g, pp\r
- .getPredicate())\r
- + ","\r
- + NameUtils.getSafeName(g, pp\r
- .getObject()) + ">");\r
- }\r
- ret.add(pp.getObject());\r
- }\r
- return ret;\r
- }\r
- // System.out.println("?????????????????");\r
- Collection<Resource> invParents = getParents(g,\r
- g.getInverse(r), true);\r
- if (!invParents.isEmpty())\r
- return invParents;\r
- }\r
- if (DEBUG) {\r
- System.out.print("invR");\r
- for (Resource res : invR)\r
- System.out.print(" " + NameUtils.getSafeName(g, res));\r
- System.out.println();\r
- }\r
- return invR;\r
- */\r
- }\r
-\r
- }\r
-\r
-// public static String getIdentifier(ReadGraph g, Resource r)\r
-// throws DatabaseException {\r
-// Layer0 L0 = Layer0.getInstance(g);\r
-// if (r.equals(g.getRootLibrary()))\r
-// return "";\r
-// String name = g.getPossibleRelatedValue(r, L0.HasName);\r
-// if (name == null)\r
-// return null;\r
-// Collection<Resource> parents = getParents(g, r, true);\r
-// if (parents.size() != 1)\r
-// return null;\r
-// for (Resource p : parents) {\r
-// String parentIdentifier = getIdentifier(g, p);\r
-// if (parentIdentifier == null)\r
-// return null;\r
-// return parentIdentifier + "/" + name;\r
-// }\r
-// return null;\r
-// }\r
-\r
- static int kess = 0;\r
-\r
- static class Expansion extends AsyncReadRequest {\r
-\r
- final private Collection<Resource> roots;\r
- final Collection<DirectStatements>[] results;\r
- final Collection<Resource>[] listElements;\r
- \r
- public Expansion(Collection<Resource> roots, Collection<DirectStatements>[] results, Collection<Resource>[] listElements) {\r
- this.roots = roots;\r
- this.results = results;\r
- this.listElements = listElements;\r
- }\r
-\r
- @Override\r
- public void run(AsyncReadGraph graph) {\r
-\r
- QueryControl control = graph.getService(QueryControl.class);\r
- final DirectQuerySupport dqs = graph.getService(DirectQuerySupport.class);\r
-\r
- final DomainStatementProcedure proc = new DomainStatementProcedure(dqs, graph.getService(StatementSupport.class), graph.getService(Layer0.class), results, listElements);\r
-\r
- int slice = (int) (roots.size() / control.getAmountOfQueryThreads()) + 1;\r
-\r
- final Resource[] rootArray = roots.toArray(Resource.NONE);\r
- for (int i = 0; i < control.getAmountOfQueryThreads(); i++) {\r
-\r
- final int start = i * slice;\r
- final int end = Math.min(start + slice, rootArray.length);\r
-\r
- control.schedule(graph, i, new ControlProcedure() {\r
-\r
- @Override\r
- public void execute(AsyncReadGraph graph) {\r
- for (int index = start; index < end; index++) {\r
- dqs.forEachDirectStatement(graph, rootArray[index], proc);\r
- }\r
-\r
- }\r
-\r
- });\r
-\r
- }\r
-\r
- }\r
-\r
- @Override\r
- public int getFlags() {\r
- return 0;\r
- }\r
-\r
- }\r
-\r
- static class Expansion2 extends AsyncReadRequest {\r
-\r
- final private Collection<Resource> roots;\r
- final Collection<DirectStatements>[] results;\r
- final boolean ignoreVirtual;\r
-\r
- public Expansion2(Collection<Resource> roots, Collection<DirectStatements>[] results) {\r
- this(roots, results, true);\r
- }\r
-\r
- public Expansion2(Collection<Resource> roots, Collection<DirectStatements>[] results, boolean ignoreVirtual) {\r
- this.roots = roots;\r
- this.results = results;\r
- this.ignoreVirtual = ignoreVirtual;\r
- }\r
-\r
- @Override\r
- public void run(AsyncReadGraph graph) {\r
-\r
- QueryControl control = graph.getService(QueryControl.class);\r
- final DirectQuerySupport dqs = graph.getService(DirectQuerySupport.class);\r
-\r
- final DomainStatementProcedure2 proc = \r
- new DomainStatementProcedure2(results);\r
-\r
- int slice = (int) (roots.size() / control.getAmountOfQueryThreads()) + 1;\r
-\r
- final Resource[] rootArray = roots.toArray(Resource.NONE);\r
- for (int i = 0; i < control.getAmountOfQueryThreads(); i++) {\r
-\r
- final int start = i * slice;\r
- final int end = Math.min(start + slice, rootArray.length);\r
-\r
- control.schedule(graph, i, new ControlProcedure() {\r
- @Override\r
- public void execute(AsyncReadGraph graph) {\r
- if (ignoreVirtual) {\r
- for (int index = start; index < end; index++) {\r
- dqs.forEachDirectPersistentStatement(graph, rootArray[index], proc);\r
- }\r
- } else {\r
- for (int index = start; index < end; index++) {\r
- dqs.forEachDirectStatement(graph, rootArray[index], proc);\r
- }\r
- }\r
- }\r
- });\r
-\r
- }\r
-\r
- }\r
-\r
- @Override\r
- public int getFlags() {\r
- return 0;\r
- }\r
-\r
- }\r
- \r
- static class DomainProcessor2 {\r
-\r
- Serializer variantSerializer;\r
-\r
- int id = 0;\r
-\r
- Set<Resource> fringe = null;\r
- Set<Resource> exclusions = new HashSet<Resource>();\r
- Set<Resource> internalDomain = new HashSet<Resource>();\r
- Set<Resource> sharedExternalReferences = null;\r
- TIntSet sharedExternalIds = null;\r
- Set<Resource> sharedExternalFringe = null;\r
- Set<Resource> predicates = null;\r
- Set<Resource> isRelatedToPredicates = null;\r
- Set<Resource> sharedPredicates = null;\r
- TIntIntHashMap ids = null;\r
- Map<Resource, Statement> specials = null;\r
- Map<Resource, ExtentStatus> status = null;\r
- Map<Resource, WeakStatus> weakInverses = null;\r
- \r
-// final ArrayList<Double> priorityList = new ArrayList<Double>();\r
-\r
- private long composedObjectCounter = 0;\r
- private long fastInternalCounter = 0;\r
- private long parentExternalCounter = 0;\r
- private long fullInternalCounter = 0;\r
- private long fullExternalCounter = 0;\r
-\r
- private long startupTime = 0;\r
- private long expandTime = 0;\r
- private long fullResolveTime = 0;\r
- private long fastResolveTime = 0;\r
- private long otherStatementTime = 0;\r
- private long parentResolveTime = 0;\r
- private long extentSeedTime = 0;\r
- private long composedPredicateTime = 0;\r
- private long composedObjectTime = 0;\r
-\r
- public void expand(ReadGraph graph, Set<Resource> fringe, Collection<DirectStatements>[] expansion) throws DatabaseException {\r
-\r
- long start = System.nanoTime();\r
-\r
- Collection<Collection<DirectStatements>[]> fullExpansion = new ArrayList<Collection<DirectStatements>[]>();\r
- QueryControl control = graph.getService(QueryControl.class);\r
- for (int i = 0; i < control.getAmountOfQueryThreads(); i++) {\r
- expansion[i] = new ArrayList<DirectStatements>();\r
- }\r
- \r
- graph.syncRequest(new Expansion2(fringe, expansion));\r
-\r
- fringe.clear();\r
-\r
- expandTime += (System.nanoTime() - start);\r
-\r
- }\r
-\r
- public void classifyPredicates(ReadGraph graph, final Set<Resource> schedule) throws DatabaseException {\r
-\r
- CollectionSupport cs = graph.getService(CollectionSupport.class);\r
-\r
- final Layer0 L0 = Layer0.getInstance(graph);\r
- \r
- long start = System.nanoTime();\r
-\r
- final ConcurrentLinkedQueue<Resource> composedResult = new ConcurrentLinkedQueue<Resource>();\r
- final ConcurrentLinkedQueue<Resource> singleResult = new ConcurrentLinkedQueue<Resource>();\r
- final ConcurrentLinkedQueue<Resource> sharedResult = new ConcurrentLinkedQueue<Resource>();\r
- final ConcurrentLinkedQueue<Pair<Resource, Resource>> singles = new ConcurrentLinkedQueue<Pair<Resource, Resource>>();\r
-\r
- // Discover singles\r
- graph.syncRequest(new AsyncReadRequest() {\r
-\r
- @Override\r
- public void run(AsyncReadGraph graph) {\r
-\r
- for (final Resource predicate : schedule) {\r
- \r
- graph.forPossibleSuperrelation(predicate, new AsyncProcedure<Resource>() {\r
-\r
- @Override\r
- public void exception(AsyncReadGraph graph, Throwable throwable) {\r
- throwable.printStackTrace();\r
- }\r
-\r
- @Override\r
- public void execute(AsyncReadGraph graph, final Resource single) {\r
- singles.add(Pair.make(predicate, single));\r
- }\r
-\r
- });\r
-\r
- graph.forHasStatement(predicate, L0.SharedRange, new AsyncProcedure<Boolean>() {\r
-\r
- @Override\r
- public void exception(AsyncReadGraph graph, Throwable throwable) {\r
- throwable.printStackTrace();\r
- }\r
-\r
- @Override\r
- public void execute(AsyncReadGraph graph, final Boolean shared) {\r
- if(shared) sharedResult.add(predicate);\r
- }\r
-\r
- });\r
- \r
- }\r
-\r
- }\r
- \r
- });\r
-\r
- // Determine singles\r
- final Set<Resource> singleSchedule = cs.createSet();\r
- for(Pair<Resource, Resource> pair : singles) {\r
- \r
- Resource single = pair.second;\r
- if(single != null && predicates.add(single)) singleSchedule.add(single);\r
- \r
- }\r
- \r
- graph.syncRequest(new AsyncReadRequest() {\r
-\r
- @Override\r
- public void run(AsyncReadGraph graph) {\r
-\r
- for (final Resource predicate : singleSchedule) {\r
-\r
- graph.forIsSubrelationOf(predicate, L0.IsRelatedTo, new AsyncProcedure<Boolean>() {\r
-\r
- @Override\r
- public void exception(AsyncReadGraph graph, Throwable throwable) {\r
- throwable.printStackTrace();\r
- }\r
-\r
- @Override\r
- public void execute(AsyncReadGraph graph, Boolean strong) {\r
- if (strong) singleResult.add(predicate);\r
- }\r
-\r
- });\r
- \r
- }\r
-\r
- }\r
- \r
- });\r
-\r
- isRelatedToPredicates.addAll(singleResult);\r
- sharedPredicates.addAll(sharedResult);\r
-\r
- final Set<Resource> specialSchedule = cs.createSet();\r
- \r
- // Classify\r
- for(Pair<Resource, Resource> pair : singles) {\r
- \r
- Resource single = pair.second;\r
- if(single != null) {\r
- if(isRelatedToPredicates.contains(single)) {\r
- isRelatedToPredicates.add(pair.first);\r
- }\r
- } else {\r
- specialSchedule.add(pair.first);\r
- }\r
- \r
- }\r
- \r
- graph.syncRequest(new AsyncReadRequest() {\r
-\r
- @Override\r
- public void run(AsyncReadGraph graph) {\r
-\r
- for (final Resource predicate : specialSchedule) {\r
-\r
- graph.forIsSubrelationOf(predicate, L0.IsRelatedTo, new AsyncProcedure<Boolean>() {\r
-\r
- @Override\r
- public void exception(AsyncReadGraph graph, Throwable throwable) {\r
- throwable.printStackTrace();\r
- }\r
-\r
- @Override\r
- public void execute(AsyncReadGraph graph, Boolean composed) {\r
- if (composed) composedResult.add(predicate);\r
- }\r
-\r
- });\r
- \r
- }\r
-\r
- }\r
- \r
- });\r
- \r
- isRelatedToPredicates.addAll(composedResult);\r
-\r
- composedPredicateTime += (System.nanoTime() - start);\r
- \r
- }\r
- \r
- private Set<Resource> strongInverseSet = new HashSet<Resource>();\r
-\r
- public void classifyPredicates(ReadGraph graph, final Collection<DirectStatements>[] expansion) throws DatabaseException {\r
-\r
- CollectionSupport cs = graph.getService(CollectionSupport.class);\r
- final Set<Resource> schedule = cs.createSet();\r
- final Map<Resource, Resource> newPredicates = cs.createMap(Resource.class);\r
- \r
- for (Collection<DirectStatements> coll : expansion)\r
- for (DirectStatements stms : coll)\r
- for(Statement stm : stms) {\r
- \r
- Resource predicate = stm.getPredicate();\r
- \r
- if(predicates.add(predicate)) {\r
- Resource inverse = graph.getPossibleInverse(predicate);\r
- schedule.add(predicate);\r
- if(inverse != null) {\r
- newPredicates.put(predicate, inverse);\r
- if(predicates.add(inverse)) schedule.add(inverse);\r
- }\r
- \r
- }\r
- \r
- }\r
-\r
- classifyPredicates(graph, schedule);\r
-\r
- for(Map.Entry<Resource, Resource> entry : newPredicates.entrySet()) {\r
- // Inverse is strong => this has strong inverse\r
- if(isRelatedToPredicates.contains(entry.getValue())) {\r
- strongInverseSet.add(entry.getKey());\r
- }\r
- // This is strong => inverse has strong inverse\r
- if(isRelatedToPredicates.contains(entry.getKey())) {\r
- strongInverseSet.add(entry.getValue());\r
- }\r
- }\r
- \r
- }\r
-\r
- /*\r
- * Composed objects are internal. Mark them for expansion.\r
- */\r
-\r
- public void processFringe(ReadGraph graph, Collection<DirectStatements>[] expansion,\r
- ObjectOutputStream otherStatementsOutput, ObjectOutputStream valueOutput) throws DatabaseException, IOException {\r
-\r
- SerialisationSupport support = graph.getService(SerialisationSupport.class);\r
- TransferableGraphSupport tgs = graph.getService(TransferableGraphSupport.class);\r
-\r
- Layer0 L0 = Layer0.getInstance(graph);\r
- \r
- long start = System.nanoTime();\r
- \r
- for (Collection<DirectStatements> coll : expansion)\r
- for (DirectStatements stms : coll) {\r
- \r
- Resource subject = stms.getSubject();\r
- \r
- boolean partOf = false;\r
- for(Statement stm : stms) {\r
- Resource predicate = stm.getPredicate();\r
- if(L0.PartOf.equals(predicate)) {\r
- partOf = true;\r
- break;\r
- }\r
- }\r
- \r
- ExtentStatus subjectStatus = status.get(subject);\r
- if(LOG && subjectStatus != null) log("EXISTING STATUS FOR " + graph.getPossibleURI(subject) + " - " + subjectStatus);\r
- if(subjectStatus == ExtentStatus.EXTERNAL) continue;\r
- if(partOf && (subjectStatus == null) && graph.getPossibleURI(subject) != null) {\r
- \r
- status.put(subject, ExtentStatus.EXTERNAL);\r
- if(LOG) {\r
- String uri = graph.getPossibleURI(subject);\r
- if(uri == null) log("[EXTERNAL]: No URI for " + subject);\r
- else log("[EXTERNAL] " + uri);\r
- }\r
-\r
- // Check for SharedRange statements\r
- for(Statement stm : stms) {\r
- Resource predicate = stm.getPredicate();\r
- if(sharedPredicates.contains(predicate)) {\r
- sharedExternalFringe.add(stm.getObject());\r
- if(LOG) {\r
- log("[SHARED EXTERNAL FRINGE]: " + NameUtils.getSafeName(graph, stm.getObject()));\r
- }\r
- }\r
- }\r
- \r
- } else {\r
- \r
- boolean special = specials.containsKey(subject);\r
- if(LOG) {\r
- if(special) {\r
- log("[SPECIAL] " + NameUtils.getSafeName(graph, subject));\r
- }\r
- }\r
- \r
- status.put(subject, ExtentStatus.INTERNAL);\r
- if(LOG) log("[INTERNAL] " + NameUtils.getSafeName(graph, subject));\r
-\r
- int sId = support.getTransientId(subject);\r
-\r
- if(graph.hasValue(subject)) {\r
- Datatype dt = graph.getRelatedValue(subject, L0.HasDataType, Bindings.getBindingUnchecked(Datatype.class));\r
- Binding b = Bindings.getBinding(dt);\r
- Object _value = graph.getValue(subject, b);\r
- Variant variant = new Variant(b, _value);\r
- byte[] value = variantSerializer.serialize(variant);\r
- if(LOG) log("[VALUE] " + NameUtils.getSafeName(graph, subject));\r
- valueOutput.writeInt(sId);\r
- valueOutput.writeInt(value.length);\r
- assert (value.length > 0);\r
- valueOutput.write(value);\r
- }\r
- \r
- TIntArrayList stream = new TIntArrayList();\r
- \r
- for(Statement stm : stms) {\r
-\r
- if(special) {\r
- \r
-// System.err.println("stm=" + stm + " special=" + specials.get(subject));\r
- \r
- }\r
-\r
- Resource predicate = stm.getPredicate();\r
- Resource object = stm.getObject();\r
-\r
- ExtentStatus objectStatus = status.get(object);\r
-\r
- // Strong predicate\r
- if (isRelatedToPredicates.contains(predicate) && (objectStatus != ExtentStatus.EXCLUDED)) {\r
- \r
- int pId = support.getTransientId(predicate);\r
- int oId = support.getTransientId(object);\r
- \r
- if(LOG) {\r
- String s = NameUtils.getSafeName(graph, subject);\r
- String p = NameUtils.getSafeName(graph, predicate);\r
- String o = NameUtils.getSafeName(graph, object);\r
- log("related=" + s + " - " + p + " - " + o);\r
- }\r
- \r
- stream.add(pId);\r
- stream.add(oId);\r
- \r
- if(objectStatus == null)\r
- fringe.add(object);\r
- \r
- } else {\r
- \r
- // Weak predicate\r
- if(objectStatus == ExtentStatus.INTERNAL) {\r
-\r
- // The inverse is also weak (or there is no inverse)\r
- if(!strongInverseSet.contains(predicate)) {\r
-\r
- int pId = support.getTransientId(predicate);\r
- int oId = support.getTransientId(object);\r
- \r
- stream.add(pId);\r
- stream.add(oId);\r
- \r
- if(LOG) {\r
- String s = NameUtils.getSafeName(graph, subject);\r
- String p = NameUtils.getSafeName(graph, predicate);\r
- String o = NameUtils.getSafeName(graph, object);\r
- log("fully weak internal=" + s + " - " + p + " - " + o + " - " + objectStatus);\r
- }\r
- \r
- } else {\r
-\r
- if(LOG) {\r
- String s = NameUtils.getSafeName(graph, subject);\r
- String p = NameUtils.getSafeName(graph, predicate);\r
- String o = NameUtils.getSafeName(graph, object);\r
- log("strong inverse internals=" + s + " - " + p + " - " + o + " - " + objectStatus);\r
- }\r
- \r
- }\r
- \r
- } else {\r
-\r
- if(special) {\r
- \r
-// System.err.println("stm=" + stm + " special=" + specials.get(subject));\r
- \r
- Statement spec = specials.get(subject);\r
- \r
- // This statement can be specially treated\r
- if(stm.getPredicate().equals(spec.getPredicate()) && stm.getObject().equals(spec.getObject())) {\r
-\r
- int pId = support.getTransientId(predicate);\r
- int oId = support.getTransientId(object);\r
- \r
- if(LOG) {\r
- String s = NameUtils.getSafeName(graph, subject);\r
- String p = NameUtils.getSafeName(graph, predicate);\r
- String o = NameUtils.getSafeName(graph, object);\r
- log("special=" + s + " - " + p + " - " + o);\r
- }\r
- \r
- stream.add(pId);\r
- stream.add(oId);\r
- \r
- }\r
- \r
- } else {\r
- \r
- if(LOG) {\r
- String s = NameUtils.getSafeName(graph, subject);\r
- String p = NameUtils.getSafeName(graph, predicate);\r
- String o = NameUtils.getSafeName(graph, object);\r
- log("weak with unknown object=" + s + " - " + p + " - " + o + " - " + objectStatus);\r
- }\r
- \r
- }\r
- \r
- }\r
- \r
- }\r
- \r
- }\r
- \r
- if(!stream.isEmpty()) {\r
- otherStatementsOutput.writeInt(sId);\r
- otherStatementsOutput.writeInt(stream.size() / 2);\r
- for (int i = 0; i < stream.size(); i++)\r
- otherStatementsOutput.writeInt(stream.getQuick(i));\r
- }\r
- \r
- }\r
- \r
- }\r
-\r
- composedObjectTime += System.nanoTime() - start;\r
-\r
- }\r
-\r
- public void process(ReadGraph graph,\r
- ObjectOutputStream otherStatementsOutput,\r
- ObjectOutputStream valueOutput)\r
- throws DatabaseException, IOException {\r
-\r
- this.variantSerializer = graph.getService(Databoard.class).getSerializerUnchecked(Bindings.VARIANT);\r
-\r
- QueryControl control = graph.getService(QueryControl.class);\r
-\r
-// System.err.println("Begin ConsistsOfProcess");\r
- \r
- /*\r
- * Browse all stm = (s, ConsistsOf, o)\r
- * � All internal URIs are found => from now on, if unidentified resource has PartOf it is external.\r
- * � All s are internal\r
- * � All o are internal\r
- * � All stm are included\r
- */\r
- for(Resource r : ConsistsOfProcess.walk(graph, fringe, exclusions, true)) {\r
- if (status.put(r, ExtentStatus.INTERNAL) == null) {\r
- String URI = graph.getPossibleURI(r);\r
- if(URI != null) log("URI INTERNAL " + URI);\r
- else log("URI has no URI for " + r);\r
- fringe.add(r);\r
- internalDomain.add(r);\r
- }\r
- }\r
- \r
- /*\r
- * This loop resolves the transitive closure of all p < IsRelatedTo such that p does not contain the SharedRange tag.\r
- * Such resources are guaranteed to be internal.\r
- */\r
- while(!fringe.isEmpty()) {\r
-\r
-// System.err.println("Process Fringe with " + fringe.size() + ".");\r
-\r
- Collection<DirectStatements>[] expansion = new ArrayList[control.getAmountOfQueryThreads()];\r
-\r
-// System.err.println("-expand");\r
-\r
- // Expand fringe\r
- expand(graph, fringe, expansion);\r
-\r
- /*\r
- * classify all p\r
- * -IsRelatedTo\r
- * -SharedRange\r
- * -Internal / External\r
- */\r
- \r
-// System.err.println("-classify");\r
-\r
- classifyPredicates(graph, expansion);\r
-\r
- /*\r
- * for stms in [stms] \r
- * if stms contains predicate PartOf => s is External\r
- * else s is Internal\r
- * for all stm=(s,p,o) in stms\r
- * if p <R IsRelatedTo => stm is included\r
- * Fringe <- o\r
- */\r
-\r
-// System.err.println("-process");\r
-\r
- processFringe(graph, expansion, otherStatementsOutput, valueOutput);\r
- \r
- }\r
-\r
- while(!sharedExternalFringe.isEmpty()) {\r
-\r
- Collection<DirectStatements>[] expansion = new ArrayList[control.getAmountOfQueryThreads()];\r
- expand(graph, sharedExternalFringe, expansion);\r
- \r
- for (Collection<DirectStatements> coll : expansion)\r
- for (DirectStatements stms : coll) {\r
-\r
- Resource subject = stms.getSubject();\r
- ExtentStatus subjectStatus = status.get(subject);\r
- \r
- if(ExtentStatus.INTERNAL == subjectStatus) {\r
-\r
- if(internalDomain.contains(subject)) continue;\r
- \r
- status.put(subject, ExtentStatus.EXTERNAL);\r
- sharedExternalReferences.add(subject);\r
-\r
- if(LOG) {\r
- log("[SHARED EXTERNAL REFERENCE]: " + NameUtils.getSafeName(graph, subject));\r
- }\r
- \r
- for(Statement stm : stms) {\r
- Resource predicate = stm.getPredicate();\r
- if (isRelatedToPredicates.contains(predicate)) {\r
- sharedExternalFringe.add(stm.getObject());\r
- }\r
- }\r
- \r
- }\r
- }\r
- \r
- }\r
-\r
- if (PROFILE) {\r
- System.out.println(composedObjectCounter + " " + fastInternalCounter\r
- + " " + parentExternalCounter + " "\r
- + fullExternalCounter + " " + fullInternalCounter);\r
- }\r
-\r
- }\r
-\r
- }\r
-\r
- static class DomainProcessor {\r
-\r
- Serializer variantSerializer;\r
-\r
- int id = 0;\r
-\r
- Set<Resource> predicates = null;\r
- Set<Resource> composedPredicates = null;\r
- Set<Resource> expansionSeeds = null;\r
- Map<Resource, Integer> ids = null;\r
- Map<Resource, ExtentStatus> status = null;\r
- Map<Resource, WeakStatus> weakInverses = null;\r
- \r
- final Set<SubgraphAdvisor> advisors;\r
- final ArrayList<Double> priorityList = new ArrayList<Double>();\r
-\r
- private long composedObjectCounter = 0;\r
- private long fastInternalCounter = 0;\r
- private long parentExternalCounter = 0;\r
- private long fullInternalCounter = 0;\r
- private long fullExternalCounter = 0;\r
-\r
- private long startupTime = 0;\r
- private long expandTime = 0;\r
- private long fullResolveTime = 0;\r
- private long fastResolveTime = 0;\r
- private long otherStatementTime = 0;\r
- private long parentResolveTime = 0;\r
- private long extentSeedTime = 0;\r
- private long composedPredicateTime = 0;\r
- private long composedObjectTime = 0;\r
-\r
- public DomainProcessor(Set<SubgraphAdvisor> advisors) {\r
- this.advisors = advisors;\r
- HashSet<Double> prioritySet = new HashSet<Double>();\r
- for (SubgraphAdvisor advisor : advisors)\r
- prioritySet.add(advisor.priority());\r
- priorityList.addAll(prioritySet);\r
- Collections.sort(priorityList);\r
- }\r
-\r
- public void expand(ReadGraph graph, Collection<DirectStatements>[] expansion, Set<Resource> schedule) throws DatabaseException {\r
-\r
- long start = System.nanoTime();\r
-\r
-// if (DEBUG)\r
-// System.out.println("expanding " + expansionSeeds.size() + " resources.");\r
-\r
- QueryControl control = graph.getService(QueryControl.class);\r
-// final Collection<DirectStatements>[] results = new ArrayList[control.getAmountOfQueryThreads()];\r
- final ArrayList<Resource>[] listElements = new ArrayList[control.getAmountOfQueryThreads()];\r
- for (int i = 0; i < control.getAmountOfQueryThreads(); i++) {\r
-// results[i] = new ArrayList<DirectStatements>();\r
- listElements[i] = new ArrayList<Resource>();\r
- }\r
- \r
-// if(DEBUG) {\r
-// for(Resource r : expansionSeeds)\r
-// System.out.println("Expanding " + NameUtils.getSafeName(graph, r, true));\r
-// }\r
- \r
- graph.syncRequest(new Expansion(expansionSeeds, expansion, listElements));\r
- for (int i = 0; i < control.getAmountOfQueryThreads(); i++) {\r
-// for (DirectStatements s : results[i]) {\r
-// expansion.put(s.getSubject(), s);\r
-// }\r
- for (Resource s : listElements[i]) {\r
- schedule.add(s);\r
-// if(status.put(s, ExtentStatus.INTERNAL) == null) {\r
-// ids.put(s, id++);\r
-// }\r
- }\r
- }\r
-\r
- expandTime += (System.nanoTime() - start);\r
-\r
- }\r
-\r
- public void extractComposedPredicates(ReadGraph graph, final Collection<DirectStatements>[] expansion) throws DatabaseException {\r
-\r
- long start = System.nanoTime();\r
-\r
- CollectionSupport cs = graph.getService(CollectionSupport.class);\r
- \r
- final ConcurrentLinkedQueue<Resource> composedResult = new ConcurrentLinkedQueue<Resource>();\r
- final ConcurrentLinkedQueue<Resource> singleResult = new ConcurrentLinkedQueue<Resource>();\r
- final ConcurrentLinkedQueue<Pair<Resource, Resource>> singles = new ConcurrentLinkedQueue<Pair<Resource, Resource>>();\r
- \r
- final Set<Resource> schedule = cs.createSet();\r
- \r
- for (Collection<DirectStatements> coll : expansion)\r
- for (DirectStatements stms : coll)\r
- for(Statement stm : stms) {\r
- Resource predicate = stm.getPredicate();\r
- if(predicates.add(predicate)) schedule.add(predicate); \r
- }\r
-\r
- // Discover singles\r
- graph.syncRequest(new AsyncReadRequest() {\r
-\r
- @Override\r
- public void run(AsyncReadGraph graph) {\r
-\r
- for (final Resource predicate : schedule) {\r
- \r
- graph.forPossibleSuperrelation(predicate, new AsyncProcedure<Resource>() {\r
-\r
- @Override\r
- public void exception(AsyncReadGraph graph, Throwable throwable) {\r
- throwable.printStackTrace();\r
- }\r
-\r
- @Override\r
- public void execute(AsyncReadGraph graph, final Resource single) {\r
- singles.add(Pair.make(predicate, single));\r
- }\r
-\r
- });\r
-\r
- }\r
-\r
- }\r
- \r
- });\r
-\r
- // Determine singles\r
- final Set<Resource> singleSchedule = cs.createSet();\r
- for(Pair<Resource, Resource> pair : singles) {\r
- \r
- Resource single = pair.second;\r
- if(single != null && predicates.add(single)) singleSchedule.add(single);\r
- \r
- }\r
- \r
- graph.syncRequest(new AsyncReadRequest() {\r
-\r
- @Override\r
- public void run(AsyncReadGraph graph) {\r
-\r
- for (final Resource predicate : singleSchedule) {\r
-\r
- graph.forIsSubrelationOf(predicate, graph.getService(Layer0.class).IsComposedOf, new AsyncProcedure<Boolean>() {\r
-\r
- @Override\r
- public void exception(AsyncReadGraph graph, Throwable throwable) {\r
- throwable.printStackTrace();\r
- }\r
-\r
- @Override\r
- public void execute(AsyncReadGraph graph, Boolean composed) {\r
- if (composed) singleResult.add(predicate);\r
- }\r
-\r
- });\r
- \r
- }\r
-\r
- }\r
- \r
- });\r
-\r
- composedPredicates.addAll(singleResult);\r
-\r
- final Set<Resource> specialSchedule = cs.createSet();\r
- \r
- // Classify\r
- for(Pair<Resource, Resource> pair : singles) {\r
- \r
- Resource single = pair.second;\r
- if(single != null) {\r
- if(composedPredicates.contains(single)) {\r
- composedPredicates.add(pair.first);\r
- }\r
- } else {\r
- specialSchedule.add(pair.first);\r
- }\r
- \r
- }\r
- \r
- graph.syncRequest(new AsyncReadRequest() {\r
-\r
- @Override\r
- public void run(AsyncReadGraph graph) {\r
-\r
- for (final Resource predicate : specialSchedule) {\r
-\r
- graph.forIsSubrelationOf(predicate, graph.getService(Layer0.class).IsComposedOf, new AsyncProcedure<Boolean>() {\r
-\r
- @Override\r
- public void exception(AsyncReadGraph graph, Throwable throwable) {\r
- throwable.printStackTrace();\r
- }\r
-\r
- @Override\r
- public void execute(AsyncReadGraph graph, Boolean composed) {\r
- if (composed) composedResult.add(predicate);\r
- }\r
-\r
- });\r
- \r
- }\r
-\r
- }\r
- \r
- });\r
- \r
- composedPredicates.addAll(composedResult);\r
-\r
- composedPredicateTime += (System.nanoTime() - start);\r
-\r
- }\r
-\r
- /*\r
- * Composed objects are internal. Mark them for expansion.\r
- */\r
-\r
- public void collectComposedObjects(ReadGraph graph, Collection<DirectStatements>[] expansion, Set<Resource> typeTodo, Set<Resource> objectTodo,\r
- Set<Resource> predicateTodo) throws DatabaseException {\r
-\r
- long start = System.nanoTime();\r
-\r
- Layer0 l0 = Layer0.getInstance(graph);\r
- \r
- for (Collection<DirectStatements> coll : expansion)\r
- for (DirectStatements stms : coll)\r
- for(Statement stm : stms) {\r
-\r
- Resource predicate = stm.getPredicate();\r
- Resource object = stm.getObject();\r
- \r
- if (composedPredicates.contains(predicate)) {\r
- \r
- ExtentStatus existing = status.put(object, ExtentStatus.INTERNAL);\r
- if(existing == null) {\r
- ids.put(object, id++);\r
- composedObjectCounter++;\r
- expansionSeeds.add(object);\r
-// System.err.println("internal: " + NameUtils.getSafeName(graph, object, true));\r
- if(LOG) log("[INTERNAL] (composed object) " + NameUtils.getSafeName(graph, object, true));\r
- } else if (existing == ExtentStatus.EXCLUDED) {\r
- System.err.println("preExcluded: " + NameUtils.getSafeName(graph, object, true));\r
- status.put(object, ExtentStatus.EXCLUDED);\r
- } else if (existing == ExtentStatus.EXTERNAL) {\r
- System.err.println("preExternal: " + NameUtils.getSafeName(graph, object, true));\r
- status.put(object, ExtentStatus.EXTERNAL);\r
- }\r
- \r
- } else {\r
-\r
-// System.err.println("internal2: " + NameUtils.getSafeName(graph, object, true));\r
- \r
- if (!status.containsKey(object)) {\r
- if (l0.InstanceOf.equalsResource(predicate)) {\r
- typeTodo.add(object);\r
- } else {\r
- objectTodo.add(object);\r
- }\r
- }\r
-\r
- if (!status.containsKey(predicate)) {\r
- predicateTodo.add(predicate);\r
- }\r
- \r
- }\r
- }\r
-\r
- composedObjectTime += System.nanoTime() - start;\r
-\r
- }\r
-\r
- public void writeOtherStatements(ReadGraph graph, Collection<Collection<DirectStatements>[]> expansion, ObjectOutputStream composedStatementsOutput, ObjectOutputStream otherStatementsOutput,\r
- ObjectOutputStream valueOutput) throws DatabaseException {\r
-\r
- long start = System.nanoTime();\r
-\r
- Layer0 l0 = Layer0.getInstance(graph);\r
- SerialisationSupport support = graph.getService(SerialisationSupport.class);\r
- TransferableGraphSupport tgs = graph.getService(TransferableGraphSupport.class);\r
-\r
- TIntArrayList other = new TIntArrayList();\r
- TIntArrayList composed = new TIntArrayList();\r
-\r
- try {\r
-\r
- for (Collection<DirectStatements>[] colls : expansion)\r
- for (Collection<DirectStatements> coll : colls)\r
- for (DirectStatements stms : coll) {\r
-\r
- Resource subject = stms.getSubject();\r
- composed.resetQuick();\r
-\r
- int sId = support.getTransientId(subject);\r
-\r
- composedStatementsOutput.writeInt(sId);\r
-\r
- if(graph.hasValue(subject)) {\r
- Datatype dt = graph.getRelatedValue(subject, l0.HasDataType, Bindings.getBindingUnchecked(Datatype.class));\r
- Binding b = Bindings.getBinding(dt);\r
- Object _value = graph.getValue(subject, b);\r
- Variant variant = new Variant(b, _value);\r
- byte[] value = variantSerializer.serialize(variant);\r
- if(LOG) log("[VALUE] " + NameUtils.getSafeName(graph, subject));\r
- valueOutput.writeInt(sId);\r
- valueOutput.writeInt(value.length);\r
- assert (value.length > 0);\r
- valueOutput.write(value);\r
- }\r
-\r
- for (Statement s : stms) {\r
-\r
- Resource object = s.getObject();\r
- Resource predicate = s.getPredicate();\r
-\r
- ExtentStatus objectStatus = status.get(object); \r
- \r
- if(objectStatus == ExtentStatus.INTERNAL) {\r
- composed.add(support.getTransientId(predicate));\r
- composed.add(support.getTransientId(object));\r
- if(LOG) log("[COMPOSED] (internal object) " + NameUtils.toIdString(graph, s));\r
- } else if (l0.InstanceOf.equalsResource(predicate)) {\r
- composed.add(support.getTransientId(predicate));\r
- composed.add(support.getTransientId(object));\r
- if(LOG) log("[COMPOSED] (instanceOf) " + NameUtils.toIdString(graph, s));\r
- } else if (l0.SubrelationOf.equalsResource(predicate)) {\r
- composed.add(support.getTransientId(predicate));\r
- composed.add(support.getTransientId(object));\r
- if(LOG) log("[COMPOSED] (subrelationOf) " + NameUtils.toIdString(graph, s));\r
- } else {\r
- if(objectStatus == ExtentStatus.EXTERNAL) {\r
- if(DEBUG)\r
- System.out.println("other " + NameUtils.toIdString(graph, s));\r
- //System.out.println("other.add " + predicate + " - " + object);\r
- other.add(support.getTransientId(predicate));\r
- other.add(support.getTransientId(object));\r
- if(LOG) log("[OTHER] (object is external) " + NameUtils.toIdString(graph, s));\r
- }\r
- }\r
-\r
- }\r
-\r
- if(!other.isEmpty()) {\r
- otherStatementsOutput.writeInt(sId);\r
- otherStatementsOutput.writeInt(other.size() / 2);\r
- for (int i = 0; i < other.size(); i++)\r
- otherStatementsOutput.writeInt(other.getQuick(i));\r
- other.resetQuick();\r
- }\r
-\r
- composedStatementsOutput.writeInt(composed.size() / 2);\r
- for (int i = 0; i < composed.size(); i++)\r
- composedStatementsOutput.writeInt(composed.getQuick(i));\r
-\r
- }\r
-\r
- } catch (IOException e) {\r
- e.printStackTrace();\r
- }\r
-\r
- otherStatementTime += (System.nanoTime() - start);\r
-\r
- }\r
-\r
- boolean hasStrictParents(ReadGraph g, Resource r)\r
- throws DatabaseException {\r
- if (g.getPossibleURI(r) != null)\r
- return true;\r
- return false;\r
- }\r
-\r
- public boolean getExpansionSeedsFromExtents(ReadGraph graph, final Collection<DirectStatements>[] expansion) throws DatabaseException {\r
-\r
- long start = System.nanoTime();\r
-\r
- final ConcurrentLinkedQueue<Resource> accepts = new ConcurrentLinkedQueue<Resource>();\r
-\r
- /*\r
- * Determine statements which could accept statements with todo\r
- * objects\r
- */\r
- search: for (Double priority : priorityList) {\r
- \r
- for (final SubgraphAdvisor advisor : advisors) {\r
- \r
- if (advisor.priority() > 0)\r
- continue;\r
- \r
- if (advisor.priority() == priority) {\r
- \r
- graph.syncRequest(new ReadRequest() {\r
-\r
- @Override\r
- public void run(ReadGraph graph) throws DatabaseException {\r
-\r
- for (Collection<DirectStatements> coll : expansion)\r
- for (DirectStatements stms : coll)\r
- for(final Statement stm : stms) {\r
-\r
- advisor.advise(graph, stm, new AsyncProcedure<Boolean>() {\r
-\r
- @Override\r
- public void exception(AsyncReadGraph graph, Throwable throwable) {\r
- throwable.printStackTrace();\r
- }\r
-\r
- @Override\r
- public void execute(AsyncReadGraph graph, Boolean accept) {\r
- if (accept) {\r
- accepts.add(stm.getObject());\r
- }\r
- }\r
-\r
- });\r
-\r
- }\r
-\r
- }\r
-\r
- });\r
- }\r
- if (!accepts.isEmpty())\r
- break search;\r
- }\r
- }\r
-\r
- CollectionSupport cs = graph.getService(CollectionSupport.class);\r
- Set<Resource> schedule = cs.createSet();\r
- for (Resource r : accepts) {\r
- if(!status.containsKey(r))\r
- schedule.add(r);\r
- }\r
-\r
- extentSeedTime += (System.nanoTime() - start);\r
-\r
- if (schedule.isEmpty())\r
- return false;\r
-\r
- fastResolve(graph, schedule);\r
- uriResolve(graph, schedule);\r
- fullResolve(graph, schedule, "accepts");\r
-\r
- return true;\r
-\r
- }\r
-\r
- ConcurrentLinkedQueue<Resource> fastInternals = new ConcurrentLinkedQueue<Resource>();\r
-\r
- public void fastResolve(ReadGraph graph, final Set<Resource> rs)\r
- throws DatabaseException {\r
- // This collects and resolves weaks\r
- if(fastResolveLoop(graph, rs))\r
- // Weaks are now resolved\r
- fastResolveLoop(graph, rs);\r
- }\r
-\r
- public boolean fastResolveLoop(ReadGraph graph, final Set<Resource> rs)\r
- throws DatabaseException {\r
-\r
- long start = System.nanoTime();\r
-\r
- final ConcurrentLinkedQueue<Resource> weakSchedule = new ConcurrentLinkedQueue<Resource>();\r
-\r
- graph.syncRequest(new AsyncRead<Boolean>() {\r
-\r
- @Override\r
- public int threadHash() {\r
- return hashCode();\r
- }\r
- \r
- @Override\r
- public int getFlags() {\r
- return 0;\r
- }\r
-\r
- @Override\r
- public void perform(AsyncReadGraph graph,\r
- AsyncProcedure<Boolean> procedure) {\r
-\r
- QueryControl control = graph.getService(QueryControl.class);\r
- final DirectQuerySupport dqs = graph.getService(DirectQuerySupport.class);\r
-\r
- int slice = (int) (rs.size() / control\r
- .getAmountOfQueryThreads()) + 1;\r
-\r
- final Resource[] rootArray = rs.toArray(Resource.NONE);\r
- for (int i = 0; i < control.getAmountOfQueryThreads(); i++) {\r
-\r
- final int start = i * slice;\r
- final int end = Math.min(start + slice,\r
- rootArray.length);\r
-\r
- control.schedule(graph, i, new ControlProcedure() {\r
-\r
- @Override\r
- public void execute(AsyncReadGraph graph) {\r
-\r
- for (int index = start; index < end; index++) {\r
-\r
- final Resource r = rootArray[index];\r
-\r
- //if (status.containsKey(r)) continue;\r
-\r
- graph.asyncRequest(new FastInternalRequest(dqs, r, status, weakInverses, weakSchedule),new AsyncProcedure<Boolean>() {\r
-\r
- @Override\r
- public void exception(AsyncReadGraph graph,Throwable throwable) {\r
- throwable.printStackTrace();\r
- }\r
-\r
- @Override\r
- public void execute(AsyncReadGraph graph,Boolean isInternal) {\r
- if (isInternal) {\r
- fastInternals.add(r);\r
- }\r
- }\r
-\r
- });\r
-\r
- }\r
-\r
- }\r
-\r
- });\r
-\r
- }\r
-\r
- procedure.execute(graph, true);\r
-\r
- }\r
-\r
- });\r
-\r
- if (!weakSchedule.isEmpty()) {\r
- THashSet<Resource> weaks = new THashSet<Resource>(weakSchedule); \r
- if (CLASSIFY_LOG)\r
- for (Resource p : weakSchedule)\r
- System.out.println("classify "\r
- + NameUtils.getSafeName(graph, p));\r
- graph.syncRequest(new ClassifyStatementsRequest(weaks, weakInverses));\r
- } \r
-\r
- for (Resource r : fastInternals) {\r
- rs.remove(r);\r
- if (status.put(r, ExtentStatus.INTERNAL) == null) {\r
- if(LOG) log("[INTERNAL] (fast) " + NameUtils.getSafeName(graph, r, true));\r
- ids.put(r, id++);\r
- fastInternalCounter++;\r
- expansionSeeds.add(r);\r
- }\r
- }\r
-\r
- fastResolveTime += (System.nanoTime() - start);\r
- \r
- return !weakSchedule.isEmpty(); \r
-\r
- }\r
-\r
- private ExtentStatus resolveExtent(ReadGraph graph, Resource r, Map<Resource, ExtentStatus> statuses, Set<Resource> expansionSeeds, THashSet<Resource> pending,\r
- ArrayList<Resource> stack) throws DatabaseException {\r
-\r
- ExtentStatus current = statuses.get(r);\r
- if(current != null) return current;\r
- \r
- if (pending.contains(r))\r
- return ExtentStatus.PENDING;\r
-\r
- // In order to break cyclic dependencies\r
- pending.add(r);\r
-\r
- if (PARENT_DEBUG)\r
- System.out.println("resolveExtent "\r
- + NameUtils.getSafeName(graph, r));\r
-\r
- ExtentStatus status = ExtentStatus.INTERNAL;\r
- for (Resource p : getParents(graph, r)) {\r
- if (PARENT_DEBUG) {\r
- ExtentStatus ps = statuses.get(p);\r
- System.out.println(" parent " + NameUtils.getSafeName(graph, p) + "(" + ps + ")");\r
- }\r
- switch (resolveExtent(graph, p, statuses,\r
- expansionSeeds, pending, stack)) {\r
- case EXTERNAL:\r
- return ExtentStatus.EXTERNAL;\r
- case PENDING:\r
- status = ExtentStatus.PENDING;\r
- }\r
- }\r
- if (status == ExtentStatus.INTERNAL) {\r
- pending.remove(r);\r
- stack.add(r);\r
- if (DEBUG)\r
- System.out.println(NameUtils.getSafeName(graph, r, true)\r
- + " is internal.");\r
- }\r
- return status;\r
- }\r
-\r
- public void uriResolve(ReadGraph graph, final Set<Resource> todo)\r
- throws DatabaseException {\r
-\r
- long start = System.nanoTime();\r
-\r
- for(Resource r : todo) System.out.println("uriResolve " +\r
- NameUtils.getSafeName(graph, r));\r
-\r
- final ConcurrentSkipListSet<Resource> found = new ConcurrentSkipListSet<Resource>();\r
-\r
- graph.syncRequest(new AsyncReadRequest() {\r
- \r
- @Override\r
- public void run(AsyncReadGraph graph) {\r
-\r
- for (final Resource r : todo) {\r
-\r
- // System.out.println("uriresolve before " + r);\r
-\r
- if (status.containsKey(r)) continue;\r
-\r
- // System.out.println("uriresolve " + r);\r
-\r
- graph.forURI(r, new AsyncProcedure<String>() {\r
-\r
- @Override\r
- public void exception(AsyncReadGraph graph, Throwable throwable) {\r
- throwable.printStackTrace();\r
- }\r
-\r
- @Override\r
- public void execute(AsyncReadGraph graph, String uri) {\r
-\r
- if (uri != null) {\r
-\r
- // System.out.println("uriresolve has uri "\r
- // + r);\r
-\r
- if(found.add(r)) {\r
- parentExternalCounter++;\r
- }\r
-\r
- } else {\r
-\r
- // System.out.println("uriresolve ask inverse "\r
- // + r);\r
-\r
- graph.forPossibleInverse(r, new AsyncProcedure<Resource>() {\r
-\r
- @Override\r
- public void exception(AsyncReadGraph graph, Throwable throwable) {\r
- throwable.printStackTrace();\r
- }\r
-\r
- @Override\r
- public void execute(AsyncReadGraph graph, Resource inverse) {\r
-\r
- if (inverse != null) {\r
-\r
- graph.forURI(inverse, new AsyncProcedure<String>() {\r
-\r
- @Override\r
- public void exception(AsyncReadGraph graph, Throwable throwable) {\r
- throwable.printStackTrace();\r
- }\r
-\r
- @Override\r
- public void execute(AsyncReadGraph graph, String uri) {\r
-\r
- if (uri != null) {\r
-\r
- if(found.add(r)) {\r
- parentExternalCounter++;\r
- }\r
-\r
- }\r
-\r
- }\r
-\r
- });\r
-\r
- }\r
-\r
- }\r
- \r
- });\r
-\r
- }\r
- }\r
-\r
- });\r
-\r
- }\r
-\r
- }\r
-\r
- });\r
-\r
- todo.removeAll(found);\r
- for(Resource r : found) {\r
- status.put(r, ExtentStatus.EXTERNAL);\r
- if(LOG) log("[EXTERNAL] (uriResolve) " + NameUtils.getSafeName(graph, r, true));\r
- }\r
- \r
- parentResolveTime += System.nanoTime() - start;\r
-\r
- }\r
-\r
- public void fullResolve(ReadGraph graph, Collection<Resource> rs,\r
- String koss) throws DatabaseException {\r
-\r
- long start = System.nanoTime();\r
-\r
- for (final Resource r : rs) {\r
-\r
- if(status.containsKey(r)) continue;\r
-\r
- THashSet<Resource> pending = new THashSet<Resource>();\r
- ArrayList<Resource> stack = new ArrayList<Resource>();\r
-\r
- ExtentStatus s = resolveExtent(graph, r, status, expansionSeeds, pending, stack);\r
- if (ExtentStatus.INTERNAL == s || ExtentStatus.PENDING == s) {\r
- if (status.put(r, ExtentStatus.INTERNAL) == null) {\r
- if(LOG) log("[INTERNAL] (resolveExtent) " + NameUtils.getSafeName(graph, r, true));\r
- ids.put(r, id++);\r
- fullInternalCounter++;\r
- expansionSeeds.add(r);\r
- }\r
- }\r
- if (ExtentStatus.EXTERNAL == s) {\r
- if (status.put(r, ExtentStatus.EXTERNAL) == null) {\r
- if(LOG) log("[EXTERNAL] (resolveExtent) " + NameUtils.getSafeName(graph, r, true));\r
- fullExternalCounter++;\r
- }\r
- }\r
-\r
- }\r
-\r
- fullResolveTime += (System.nanoTime() - start);\r
-\r
- }\r
-\r
- public void process(ReadGraph graph,\r
- ObjectOutputStream composedStatementsOutput,\r
- ObjectOutputStream otherStatementsOutput,\r
- ObjectOutputStream valueOutput)\r
- throws DatabaseException {\r
-\r
- this.variantSerializer = graph.getService(Databoard.class).getSerializerUnchecked(Bindings.VARIANT);\r
-\r
- CollectionSupport cs = graph.getService(CollectionSupport.class);\r
-\r
- Set<Resource> typeTodo = cs.createSet();\r
- Set<Resource> objectTodo = cs.createSet();\r
- Set<Resource> predicateTodo = cs.createSet();\r
- \r
- Collection<Collection<DirectStatements>[]> fullExpansion = new ArrayList<Collection<DirectStatements>[]>();\r
- \r
- do {\r
-\r
- QueryControl control = graph.getService(QueryControl.class);\r
- Collection<DirectStatements>[] expansion = new ArrayList[control.getAmountOfQueryThreads()];\r
- for (int i = 0; i < control.getAmountOfQueryThreads(); i++) {\r
- expansion[i] = new ArrayList<DirectStatements>();\r
- }\r
- \r
- // Expand expansionSeeds\r
- expand(graph, expansion, objectTodo);\r
- \r
- // Start collecting new seeds\r
- expansionSeeds = cs.createSet();\r
-\r
- // Collect predicates which are <R IsComposedOf\r
- extractComposedPredicates(graph, expansion);\r
-\r
- // Make composed objects internal and make sure they are further\r
- // expanded\r
- collectComposedObjects(graph, expansion, typeTodo, objectTodo, predicateTodo);\r
-\r
- /*\r
- * Use the expansion seed heuristic to find new resources to\r
- * expand before full analysis.\r
- */\r
- getExpansionSeedsFromExtents(graph, expansion);\r
-\r
- fullExpansion.add(expansion);\r
-\r
- } while (!expansionSeeds.isEmpty());\r
-\r
- fastResolve(graph, objectTodo);\r
- uriResolve(graph, predicateTodo);\r
- uriResolve(graph, objectTodo);\r
- uriResolve(graph, typeTodo);\r
- fullResolve(graph, objectTodo, "objectTodo");\r
- fullResolve(graph, predicateTodo, "predicateTodo");\r
- fullResolve(graph, typeTodo, "typeTodo");\r
-\r
- writeOtherStatements(graph, fullExpansion, composedStatementsOutput, otherStatementsOutput, valueOutput);\r
-\r
- if (PROFILE) {\r
- System.out.println(composedObjectCounter + " " + fastInternalCounter\r
- + " " + parentExternalCounter + " "\r
- + fullExternalCounter + " " + fullInternalCounter);\r
- }\r
-\r
- }\r
-\r
- }\r
-\r
- public static void getDomain2(ReadGraph graph, TIntIntHashMap ids,\r
- Collection<Resource> roots, Map<Resource, ExtentStatus> preStatus,\r
- Map<Resource, Statement> specials,\r
- ObjectOutputStream otherStatementsOutput,\r
- ObjectOutputStream valueOutput,\r
- TreeMap<String, Variant> extensions,\r
- TIntHashSet excludedShared) throws DatabaseException {\r
-\r
- ITask task = ThreadLogger.getInstance().begin("getDomain2");\r
-\r
- final DomainProcessor2 processor = new DomainProcessor2();\r
- \r
- processor.startupTime = System.nanoTime();\r
-\r
- Layer0 l0 = Layer0.getInstance(graph);\r
- \r
- CollectionSupport cs = graph.getService(CollectionSupport.class);\r
- SerialisationSupport support = graph.getService(SerialisationSupport.class);\r
-\r
- processor.ids = ids;\r
- processor.specials = specials;\r
- processor.status = cs.createMap(ExtentStatus.class);\r
- processor.weakInverses = cs.createMap(WeakStatus.class);\r
- processor.predicates = cs.createSet();\r
- processor.isRelatedToPredicates = cs.createSet();\r
- processor.sharedPredicates = cs.createSet();\r
-// processor.expansionSeeds = cs.createSet();\r
-\r
- for(Map.Entry<Resource, ExtentStatus> entry : preStatus.entrySet()) {\r
- processor.status.put(entry.getKey(), entry.getValue());\r
- if(ExtentStatus.EXCLUDED.equals(entry.getValue())) processor.exclusions.add(entry.getKey());\r
- }\r
- \r
-// for (Resource r : excluded) {\r
-// processor.status.put(r, ExtentStatus.EXCLUDED);\r
-// }\r
- \r
- Resource rootLibrary = graph.getResource("http:/");\r
- \r
- if (!roots.contains(rootLibrary))\r
- processor.status.put(rootLibrary, ExtentStatus.EXTERNAL);\r
-\r
- for (Resource root : roots) {\r
- processor.status.put(root, ExtentStatus.INTERNAL);\r
- //processor.ids.put(support.getTransientId(root), processor.ids.size());\r
- for (Resource owner : graph.getObjects(root, l0.IsOwnedBy)) {\r
- processor.status.put(owner, ExtentStatus.EXTERNAL);\r
- }\r
- }\r
-\r
- processor.startupTime = System.nanoTime() - processor.startupTime;\r
-\r
- processor.fringe = new HashSet<Resource>();\r
- processor.fringe.addAll(roots);\r
- \r
- processor.internalDomain.addAll(roots);\r
- \r
- processor.sharedExternalReferences = new HashSet<Resource>();\r
- processor.sharedExternalFringe = new HashSet<Resource>();\r
-\r
- try {\r
- \r
- processor.process(graph, otherStatementsOutput, valueOutput);\r
- \r
- } catch (IOException e) {\r
- e.printStackTrace();\r
- }\r
- \r
- for(Resource r : processor.sharedExternalReferences) excludedShared.add(support.getTransientId(r));\r
- \r
- ClusteringSupport cls = graph.getService(ClusteringSupport.class);\r
- TLongObjectHashMap<TIntArrayList> clusterMap = new TLongObjectHashMap<TIntArrayList>();\r
- for(Map.Entry<Resource, ExtentStatus> entry : processor.status.entrySet()) {\r
- if(ExtentStatus.INTERNAL == entry.getValue()) {\r
- \r
- long cluster = cls.getCluster(entry.getKey());\r
- TIntArrayList list = clusterMap.get(cluster);\r
- if(list == null) {\r
- list = new TIntArrayList();\r
- clusterMap.put(cluster, list);\r
- }\r
- list.add(support.getTransientId(entry.getKey()));\r
- \r
- }\r
- }\r
- final TIntArrayList clustering = new TIntArrayList();\r
- clusterMap.forEachEntry(new TLongObjectProcedure<TIntArrayList>() {\r
- \r
- @Override\r
- public boolean execute(long cluster, TIntArrayList b) {\r
- clustering.add(b.size());\r
- b.forEach(new TIntProcedure() {\r
- \r
- @Override\r
- public boolean execute(int rId) {\r
- processor.ids.put(rId, processor.id++);\r
- return true;\r
- }\r
- \r
- });\r
- return true;\r
- }\r
- \r
- });\r
- \r
- extensions.put(Extensions.CLUSTERING, new Variant(Bindings.INT_ARRAY, clustering.toArray()));\r
- \r
- long total = processor.startupTime + processor.expandTime\r
- + processor.composedPredicateTime\r
- + processor.composedObjectTime + processor.extentSeedTime\r
- + processor.fullResolveTime + processor.fastResolveTime + \r
- + processor.parentResolveTime + processor.otherStatementTime;\r
-\r
- if (PROFILE) {\r
- System.out.println("startup took " + 1e-9 * processor.startupTime\r
- + "s.");\r
- System.out.println("expand took " + 1e-9 * processor.expandTime\r
- + "s.");\r
- System.out.println("composedPredicates took " + 1e-9\r
- * processor.composedPredicateTime + "s.");\r
- System.out.println("composedObjects took " + 1e-9\r
- * processor.composedObjectTime + "s.");\r
- System.out.println("extentSeeding took " + 1e-9\r
- * processor.extentSeedTime + "s.");\r
- System.out.println("fullResolve took " + 1e-9\r
- * processor.fullResolveTime + "s.");\r
- System.out.println("fastResolve took " + 1e-9\r
- * processor.fastResolveTime + "s.");\r
- System.out.println("parentResolve took " + 1e-9\r
- * processor.parentResolveTime + "s.");\r
- System.out.println("otherStatements took " + 1e-9\r
- * processor.otherStatementTime + "s.");\r
- System.out.println("total " + 1e-9 * total + "s.");\r
- }\r
-\r
- task.finish();\r
-\r
- \r
- }\r
-\r
- \r
- public static void getDomain(ReadGraph graph, Map<Resource, Integer> ids,\r
- Collection<Resource> roots, Map<Resource, ExtentStatus> preStatus, Set<SubgraphAdvisor> advisors,\r
- ObjectOutputStream composedStatementsOutput,\r
- ObjectOutputStream otherStatementsOutput,\r
- ObjectOutputStream valueOutput) throws DatabaseException {\r
-\r
- ITask task = ThreadLogger.getInstance().begin("getDomain");\r
-\r
- DomainProcessor processor = new DomainProcessor(advisors);\r
- \r
- processor.startupTime = System.nanoTime();\r
-\r
- Layer0 l0 = Layer0.getInstance(graph);\r
-\r
- CollectionSupport cs = graph.getService(CollectionSupport.class);\r
-\r
- processor.ids = ids;\r
- processor.status = cs.createMap(ExtentStatus.class);\r
- processor.weakInverses = cs.createMap(WeakStatus.class);\r
- processor.predicates = cs.createSet();\r
- processor.composedPredicates = cs.createSet();\r
- processor.expansionSeeds = cs.createSet();\r
-\r
- for(Map.Entry<Resource, ExtentStatus> entry : preStatus.entrySet()) {\r
- processor.status.put(entry.getKey(), entry.getValue());\r
- }\r
-\r
-// for (Resource r : excluded) {\r
-// processor.status.put(r, ExtentStatus.EXCLUDED);\r
-// }\r
- \r
- if (!roots.contains(graph.getRootLibrary()))\r
- processor.status.put(graph.getRootLibrary(), ExtentStatus.EXTERNAL);\r
-\r
- for (Resource root : roots) {\r
- processor.status.put(root, ExtentStatus.INTERNAL);\r
- processor.ids.put(root, processor.id++);\r
- for (Resource owner : graph.getObjects(root, l0.IsOwnedBy)) {\r
- processor.status.put(owner, ExtentStatus.EXTERNAL);\r
- }\r
- }\r
-\r
-\r
- processor.expansionSeeds.addAll(roots);\r
-\r
- processor.startupTime = System.nanoTime() - processor.startupTime;\r
-\r
- while (!processor.expansionSeeds.isEmpty()) {\r
-\r
- processor.process(graph, composedStatementsOutput,\r
- otherStatementsOutput, valueOutput);\r
-\r
- }\r
-\r
- long total = processor.startupTime + processor.expandTime\r
- + processor.composedPredicateTime\r
- + processor.composedObjectTime + processor.extentSeedTime\r
- + processor.fullResolveTime + processor.fastResolveTime + \r
- + processor.parentResolveTime + processor.otherStatementTime;\r
-\r
- if (PROFILE) {\r
- System.out.println("startup took " + 1e-9 * processor.startupTime\r
- + "s.");\r
- System.out.println("expand took " + 1e-9 * processor.expandTime\r
- + "s.");\r
- System.out.println("composedPredicates took " + 1e-9\r
- * processor.composedPredicateTime + "s.");\r
- System.out.println("composedObjects took " + 1e-9\r
- * processor.composedObjectTime + "s.");\r
- System.out.println("extentSeeding took " + 1e-9\r
- * processor.extentSeedTime + "s.");\r
- System.out.println("fullResolve took " + 1e-9\r
- * processor.fullResolveTime + "s.");\r
- System.out.println("fastResolve took " + 1e-9\r
- * processor.fastResolveTime + "s.");\r
- System.out.println("parentResolve took " + 1e-9\r
- * processor.parentResolveTime + "s.");\r
- System.out.println("otherStatements took " + 1e-9\r
- * processor.otherStatementTime + "s.");\r
- System.out.println("total " + 1e-9 * total + "s.");\r
- }\r
-\r
- task.finish();\r
-\r
- }\r
-\r
-}\r
+/*******************************************************************************
+ * Copyright (c) 2007, 2010 Association for Decentralized Information Management
+ * in Industry THTH ry.
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * which accompanies this distribution, and is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Contributors:
+ * VTT Technical Research Centre of Finland - initial API and implementation
+ *******************************************************************************/
+package org.simantics.db.layer0.util;
+
+import gnu.trove.list.array.TIntArrayList;
+import gnu.trove.map.hash.TIntIntHashMap;
+import gnu.trove.map.hash.TLongObjectHashMap;
+import gnu.trove.procedure.TIntProcedure;
+import gnu.trove.procedure.TLongObjectProcedure;
+import gnu.trove.set.TIntSet;
+import gnu.trove.set.hash.THashSet;
+import gnu.trove.set.hash.TIntHashSet;
+
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.ObjectOutputStream;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ConcurrentSkipListSet;
+
+import org.simantics.databoard.Bindings;
+import org.simantics.databoard.Databoard;
+import org.simantics.databoard.binding.Binding;
+import org.simantics.databoard.binding.mutable.Variant;
+import org.simantics.databoard.serialization.Serializer;
+import org.simantics.databoard.type.Datatype;
+import org.simantics.db.AsyncReadGraph;
+import org.simantics.db.DirectStatements;
+import org.simantics.db.ReadGraph;
+import org.simantics.db.Resource;
+import org.simantics.db.Statement;
+import org.simantics.db.common.request.AsyncReadRequest;
+import org.simantics.db.common.request.ReadRequest;
+import org.simantics.db.common.request.ResourceAsyncRead;
+import org.simantics.db.common.utils.NameUtils;
+import org.simantics.db.exception.DatabaseException;
+import org.simantics.db.layer0.adapter.SubgraphAdvisor;
+import org.simantics.db.layer0.adapter.SubgraphExtent.ExtentStatus;
+import org.simantics.db.procedure.AsyncProcedure;
+import org.simantics.db.request.AsyncRead;
+import org.simantics.db.service.ClusteringSupport;
+import org.simantics.db.service.CollectionSupport;
+import org.simantics.db.service.DirectQuerySupport;
+import org.simantics.db.service.QueryControl;
+import org.simantics.db.service.QueryControl.ControlProcedure;
+import org.simantics.db.service.SerialisationSupport;
+import org.simantics.db.service.StatementSupport;
+import org.simantics.db.service.TransferableGraphSupport;
+import org.simantics.graph.representation.Extensions;
+import org.simantics.layer0.Layer0;
+import org.simantics.utils.datastructures.Pair;
+import org.simantics.utils.threads.logger.ITask;
+import org.simantics.utils.threads.logger.ThreadLogger;
+
+public class Subgraphs {
+
+ public static String LOG_FILE = "export.log";
+ final static private boolean LOG = false;
+ final static private boolean DEBUG = false;
+ final static private boolean PARENT_DEBUG = DEBUG | false;
+ final static private boolean EXTERNAL_DEBUG = DEBUG | false;
+ final static private boolean ADVISOR_LOG = LOG & false;
+ final static private boolean EXPANSION_LOG = LOG & false;
+ final static private boolean INTERNAL_LOG = LOG & false;
+ final static private boolean COMPOSED_LOG = LOG & false;
+ final static private boolean RESOLVE_LOG = LOG & false;
+ final static private boolean CLASSIFY_LOG = LOG & false;
+ final static private boolean EXTERNAL_LOG = LOG & false;
+ final static private boolean PROFILE = false;
+
+ static enum WeakStatus {
+ STRONG, WEAK
+ }
+
+
+ static DataOutput log;
+
+ static {
+
+ if (LOG) {
+ try {
+ FileOutputStream stream = new FileOutputStream(LOG_FILE, false);
+ log = new DataOutputStream(stream);
+ } catch (FileNotFoundException e) {
+ e.printStackTrace();
+ }
+ }
+
+ }
+
+ private static void log(String line) {
+ if (LOG) {
+ try {
+ log.write((line + "\n").getBytes());
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+
+ public static Collection<Resource> getParents(ReadGraph g, Resource r)
+ throws DatabaseException {
+ return getParents(g, r, false);
+ }
+
+ static class FastInternalRequest extends ResourceAsyncRead<Boolean> {
+
+ final DirectQuerySupport dqs;
+ final ConcurrentLinkedQueue<Resource> queue;
+ final Map<Resource, WeakStatus> weakInverses;
+ final Map<Resource, ExtentStatus> status;
+
+ public FastInternalRequest(DirectQuerySupport dqs, Resource resource,
+ Map<Resource, ExtentStatus> status,
+ Map<Resource, WeakStatus> weakInverses,
+ ConcurrentLinkedQueue<Resource> queue) {
+ super(resource);
+ this.dqs = dqs;
+ this.status = status;
+ this.weakInverses = weakInverses;
+ this.queue = queue;
+ }
+
+ @Override
+ public int getFlags() {
+ return 0;
+ }
+
+ @Override
+ public void perform(AsyncReadGraph graph, final AsyncProcedure<Boolean> procedure) {
+
+ dqs.forEachDirectStatement(graph, resource, new AsyncProcedure<DirectStatements>() {
+
+ @Override
+ public void execute(AsyncReadGraph graph, DirectStatements ss) {
+ boolean ok = true;
+ for(Statement statement : ss) {
+ if (status.get(statement.getObject()) == ExtentStatus.INTERNAL) continue;
+ WeakStatus status = weakInverses.get(statement.getPredicate());
+ if(status == WeakStatus.WEAK) continue;
+ else if (status == null) {
+ queue.add(statement.getPredicate());
+ }
+ ok = false;
+ }
+ procedure.execute(graph, ok);
+ }
+
+ @Override
+ public void exception(AsyncReadGraph graph, Throwable throwable) {
+ throwable.printStackTrace();
+ }
+
+ });
+
+ }
+
+ }
+
+ static class ClassifyStatementsRequest implements AsyncRead<Boolean> {
+
+ final Set<Resource> schedule;
+ final Map<Resource, WeakStatus> weakMap;
+
+ public ClassifyStatementsRequest(Set<Resource> schedule, Map<Resource, WeakStatus> weakMap) {
+ this.weakMap = weakMap;
+ this.schedule = schedule;
+ }
+
+ @Override
+ public int threadHash() {
+ return hashCode();
+ }
+
+ @Override
+ public int getFlags() {
+ return 0;
+ }
+
+ @Override
+ public void perform(AsyncReadGraph graph, final AsyncProcedure<Boolean> procedure) {
+
+ for (final Resource p : schedule) {
+
+ graph.forPossibleInverse(p, new AsyncProcedure<Resource>() {
+
+ private void register(AsyncReadGraph graph, Resource predicate, Resource superRelation, WeakStatus status) {
+ synchronized (weakMap) {
+ weakMap.put(predicate, status);
+ if(superRelation != null) weakMap.put(superRelation, status);
+ }
+ }
+
+ @Override
+ public void exception(AsyncReadGraph graph, Throwable throwable) {
+ throwable.printStackTrace();
+ }
+
+ @Override
+ public void execute(AsyncReadGraph graph, final Resource inverse) {
+
+ if (inverse == null) {
+
+ register(graph, p, null, WeakStatus.WEAK);
+
+ } else {
+
+ graph.forPossibleSuperrelation(inverse, new AsyncProcedure<Resource>() {
+
+ @Override
+ public void exception(AsyncReadGraph graph, Throwable throwable) {
+ throwable.printStackTrace();
+ }
+
+ @Override
+ public void execute(AsyncReadGraph graph, final Resource superRelation) {
+
+ if(superRelation != null && weakMap.containsKey(superRelation)) {
+ register(graph, p, null, weakMap.get(superRelation));
+ return;
+ }
+
+ graph.forIsSubrelationOf(inverse, graph.getService(Layer0.class).IsRelatedTo, new AsyncProcedure<Boolean>() {
+
+ @Override
+ public void exception(AsyncReadGraph graph,Throwable throwable) {
+ throwable.printStackTrace();
+ }
+
+ @Override
+ public void execute(AsyncReadGraph graph,Boolean strong) {
+ register(graph, p, superRelation, strong ? WeakStatus.STRONG : WeakStatus.WEAK);
+ }
+
+ });
+
+ }
+
+ });
+
+ }
+
+ }
+
+ });
+
+ }
+
+ procedure.execute(graph, false);
+
+ }
+
+ }
+
+ private static Collection<Resource> getParents(ReadGraph g, Resource r, boolean isStrong) throws DatabaseException {
+
+ System.out.println("getParents " + NameUtils.getSafeName(g, r));
+
+ Layer0 l0 = Layer0.getInstance(g);
+
+ Collection<Resource> predicates = g.getPredicates(r);
+
+ // --- Consists Of ----------------------------------------------------
+
+ if (predicates.contains(l0.PartOf)) {
+ Collection<Resource> parents = g.getObjects(r, l0.PartOf);
+ if (parents.size() == 1)
+ return parents;
+ ArrayList<Resource> libraryParents = new ArrayList<Resource>(1);
+ for (Resource p : parents)
+ if (g.isInstanceOf(p, l0.Library))
+ libraryParents.add(p);
+ if (!libraryParents.isEmpty())
+ return libraryParents;
+ else
+ return parents;
+ }
+
+ // --- Ordered sets ---------------------------------------------------
+
+ {
+ Collection<Resource> parents = null;
+ for (Resource p : predicates)
+ if (g.isInstanceOf(p, l0.OrderedSet) && !p.equals(r)) {
+ if (parents == null)
+ parents = new ArrayList<Resource>(1);
+ parents.add(p);
+ }
+ if (parents != null) {
+ if (DEBUG)
+ System.out.println("ORDERED SET");
+ return parents;
+ }
+ }
+
+
+ if (isStrong)
+ return Collections.emptyList();
+ else {
+
+ if (predicates.contains(l0.InverseOf)) {
+
+ Resource inv = g.getInverse(r);
+ return getParents(g, inv, true);
+
+ } else {
+
+ /*
+ * Depends On
+ *
+ * If there are DependsOn parents, then IsRelatedTo parents are discarded
+ *
+ */
+ HashSet<Resource> result = new HashSet<Resource>();
+ for(Resource predicate : predicates) {
+ if(g.isSubrelationOf(predicate, l0.IsDependencyOf)) result.addAll(g.getObjects(r, predicate));
+ }
+ if(!result.isEmpty()) return result;
+
+ /*
+ * Is Related To
+ *
+ * At this point all Is Related To are parents.
+ *
+ */
+ for(Resource predicate : predicates) {
+ Resource inv = g.getPossibleInverse(predicate);
+ if(inv != null) {
+ if(g.isSubrelationOf(inv, l0.IsRelatedTo)) result.addAll(g.getObjects(r, predicate));
+ }
+ }
+
+ return result;
+
+ }
+
+ /*
+ Collection<Resource> invR = g.getObjects(r, b.IsRelatedTo_Inverse);
+ if (predicates.contains(b.InverseOf)) {
+ if (invR.size() > 1) {
+ if (DEBUG)
+ System.out
+ .println("###########################################");
+ Resource inv = g.getInverse(r);
+ Collection<Resource> ret = new ArrayList<Resource>();
+ for (Statement pp : g.getStatements(r,
+ b.IsRelatedTo_Inverse))
+ if (!pp.getPredicate().equals(inv)) {
+ if (DEBUG) {
+ System.out.println("<"
+ + NameUtils.getSafeName(g, pp
+ .getSubject())
+ + ","
+ + NameUtils.getSafeName(g, pp
+ .getPredicate())
+ + ","
+ + NameUtils.getSafeName(g, pp
+ .getObject()) + ">");
+ }
+ ret.add(pp.getObject());
+ }
+ return ret;
+ }
+ // System.out.println("?????????????????");
+ Collection<Resource> invParents = getParents(g,
+ g.getInverse(r), true);
+ if (!invParents.isEmpty())
+ return invParents;
+ }
+ if (DEBUG) {
+ System.out.print("invR");
+ for (Resource res : invR)
+ System.out.print(" " + NameUtils.getSafeName(g, res));
+ System.out.println();
+ }
+ return invR;
+ */
+ }
+
+ }
+
+// public static String getIdentifier(ReadGraph g, Resource r)
+// throws DatabaseException {
+// Layer0 L0 = Layer0.getInstance(g);
+// if (r.equals(g.getRootLibrary()))
+// return "";
+// String name = g.getPossibleRelatedValue(r, L0.HasName);
+// if (name == null)
+// return null;
+// Collection<Resource> parents = getParents(g, r, true);
+// if (parents.size() != 1)
+// return null;
+// for (Resource p : parents) {
+// String parentIdentifier = getIdentifier(g, p);
+// if (parentIdentifier == null)
+// return null;
+// return parentIdentifier + "/" + name;
+// }
+// return null;
+// }
+
+ static int kess = 0;
+
+ static class Expansion extends AsyncReadRequest {
+
+ final private Collection<Resource> roots;
+ final Collection<DirectStatements>[] results;
+ final Collection<Resource>[] listElements;
+
+ public Expansion(Collection<Resource> roots, Collection<DirectStatements>[] results, Collection<Resource>[] listElements) {
+ this.roots = roots;
+ this.results = results;
+ this.listElements = listElements;
+ }
+
+ @Override
+ public void run(AsyncReadGraph graph) {
+
+ QueryControl control = graph.getService(QueryControl.class);
+ final DirectQuerySupport dqs = graph.getService(DirectQuerySupport.class);
+
+ final DomainStatementProcedure proc = new DomainStatementProcedure(dqs, graph.getService(StatementSupport.class), graph.getService(Layer0.class), results, listElements);
+
+ int slice = (int) (roots.size() / control.getAmountOfQueryThreads()) + 1;
+
+ final Resource[] rootArray = roots.toArray(Resource.NONE);
+ for (int i = 0; i < control.getAmountOfQueryThreads(); i++) {
+
+ final int start = i * slice;
+ final int end = Math.min(start + slice, rootArray.length);
+
+ control.schedule(graph, i, new ControlProcedure() {
+
+ @Override
+ public void execute(AsyncReadGraph graph) {
+ for (int index = start; index < end; index++) {
+ dqs.forEachDirectStatement(graph, rootArray[index], proc);
+ }
+
+ }
+
+ });
+
+ }
+
+ }
+
+ @Override
+ public int getFlags() {
+ return 0;
+ }
+
+ }
+
+ static class Expansion2 extends AsyncReadRequest {
+
+ final private Collection<Resource> roots;
+ final Collection<DirectStatements>[] results;
+ final boolean ignoreVirtual;
+
+ public Expansion2(Collection<Resource> roots, Collection<DirectStatements>[] results) {
+ this(roots, results, true);
+ }
+
+ public Expansion2(Collection<Resource> roots, Collection<DirectStatements>[] results, boolean ignoreVirtual) {
+ this.roots = roots;
+ this.results = results;
+ this.ignoreVirtual = ignoreVirtual;
+ }
+
+ @Override
+ public void run(AsyncReadGraph graph) {
+
+ QueryControl control = graph.getService(QueryControl.class);
+ final DirectQuerySupport dqs = graph.getService(DirectQuerySupport.class);
+
+ final DomainStatementProcedure2 proc =
+ new DomainStatementProcedure2(results);
+
+ int slice = (int) (roots.size() / control.getAmountOfQueryThreads()) + 1;
+
+ final Resource[] rootArray = roots.toArray(Resource.NONE);
+ for (int i = 0; i < control.getAmountOfQueryThreads(); i++) {
+
+ final int start = i * slice;
+ final int end = Math.min(start + slice, rootArray.length);
+
+ control.schedule(graph, i, new ControlProcedure() {
+ @Override
+ public void execute(AsyncReadGraph graph) {
+ if (ignoreVirtual) {
+ for (int index = start; index < end; index++) {
+ dqs.forEachDirectPersistentStatement(graph, rootArray[index], proc);
+ }
+ } else {
+ for (int index = start; index < end; index++) {
+ dqs.forEachDirectStatement(graph, rootArray[index], proc);
+ }
+ }
+ }
+ });
+
+ }
+
+ }
+
+ @Override
+ public int getFlags() {
+ return 0;
+ }
+
+ }
+
+ static class DomainProcessor2 {
+
+ Serializer variantSerializer;
+
+ int id = 0;
+
+ Set<Resource> fringe = null;
+ Set<Resource> exclusions = new HashSet<Resource>();
+ Set<Resource> internalDomain = new HashSet<Resource>();
+ Set<Resource> sharedExternalReferences = null;
+ TIntSet sharedExternalIds = null;
+ Set<Resource> sharedExternalFringe = null;
+ Set<Resource> predicates = null;
+ Set<Resource> isRelatedToPredicates = null;
+ Set<Resource> sharedPredicates = null;
+ TIntIntHashMap ids = null;
+ Map<Resource, Statement> specials = null;
+ Map<Resource, ExtentStatus> status = null;
+ Map<Resource, WeakStatus> weakInverses = null;
+
+// final ArrayList<Double> priorityList = new ArrayList<Double>();
+
+ private long composedObjectCounter = 0;
+ private long fastInternalCounter = 0;
+ private long parentExternalCounter = 0;
+ private long fullInternalCounter = 0;
+ private long fullExternalCounter = 0;
+
+ private long startupTime = 0;
+ private long expandTime = 0;
+ private long fullResolveTime = 0;
+ private long fastResolveTime = 0;
+ private long otherStatementTime = 0;
+ private long parentResolveTime = 0;
+ private long extentSeedTime = 0;
+ private long composedPredicateTime = 0;
+ private long composedObjectTime = 0;
+
+ public void expand(ReadGraph graph, Set<Resource> fringe, Collection<DirectStatements>[] expansion) throws DatabaseException {
+
+ long start = System.nanoTime();
+
+ Collection<Collection<DirectStatements>[]> fullExpansion = new ArrayList<Collection<DirectStatements>[]>();
+ QueryControl control = graph.getService(QueryControl.class);
+ for (int i = 0; i < control.getAmountOfQueryThreads(); i++) {
+ expansion[i] = new ArrayList<DirectStatements>();
+ }
+
+ graph.syncRequest(new Expansion2(fringe, expansion));
+
+ fringe.clear();
+
+ expandTime += (System.nanoTime() - start);
+
+ }
+
+ public void classifyPredicates(ReadGraph graph, final Set<Resource> schedule) throws DatabaseException {
+
+ CollectionSupport cs = graph.getService(CollectionSupport.class);
+
+ final Layer0 L0 = Layer0.getInstance(graph);
+
+ long start = System.nanoTime();
+
+ final ConcurrentLinkedQueue<Resource> composedResult = new ConcurrentLinkedQueue<Resource>();
+ final ConcurrentLinkedQueue<Resource> singleResult = new ConcurrentLinkedQueue<Resource>();
+ final ConcurrentLinkedQueue<Resource> sharedResult = new ConcurrentLinkedQueue<Resource>();
+ final ConcurrentLinkedQueue<Pair<Resource, Resource>> singles = new ConcurrentLinkedQueue<Pair<Resource, Resource>>();
+
+ // Discover singles
+ graph.syncRequest(new AsyncReadRequest() {
+
+ @Override
+ public void run(AsyncReadGraph graph) {
+
+ for (final Resource predicate : schedule) {
+
+ graph.forPossibleSuperrelation(predicate, new AsyncProcedure<Resource>() {
+
+ @Override
+ public void exception(AsyncReadGraph graph, Throwable throwable) {
+ throwable.printStackTrace();
+ }
+
+ @Override
+ public void execute(AsyncReadGraph graph, final Resource single) {
+ singles.add(Pair.make(predicate, single));
+ }
+
+ });
+
+ graph.forHasStatement(predicate, L0.SharedRange, new AsyncProcedure<Boolean>() {
+
+ @Override
+ public void exception(AsyncReadGraph graph, Throwable throwable) {
+ throwable.printStackTrace();
+ }
+
+ @Override
+ public void execute(AsyncReadGraph graph, final Boolean shared) {
+ if(shared) sharedResult.add(predicate);
+ }
+
+ });
+
+ }
+
+ }
+
+ });
+
+ // Determine singles
+ final Set<Resource> singleSchedule = cs.createSet();
+ for(Pair<Resource, Resource> pair : singles) {
+
+ Resource single = pair.second;
+ if(single != null && predicates.add(single)) singleSchedule.add(single);
+
+ }
+
+ graph.syncRequest(new AsyncReadRequest() {
+
+ @Override
+ public void run(AsyncReadGraph graph) {
+
+ for (final Resource predicate : singleSchedule) {
+
+ graph.forIsSubrelationOf(predicate, L0.IsRelatedTo, new AsyncProcedure<Boolean>() {
+
+ @Override
+ public void exception(AsyncReadGraph graph, Throwable throwable) {
+ throwable.printStackTrace();
+ }
+
+ @Override
+ public void execute(AsyncReadGraph graph, Boolean strong) {
+ if (strong) singleResult.add(predicate);
+ }
+
+ });
+
+ }
+
+ }
+
+ });
+
+ isRelatedToPredicates.addAll(singleResult);
+ sharedPredicates.addAll(sharedResult);
+
+ final Set<Resource> specialSchedule = cs.createSet();
+
+ // Classify
+ for(Pair<Resource, Resource> pair : singles) {
+
+ Resource single = pair.second;
+ if(single != null) {
+ if(isRelatedToPredicates.contains(single)) {
+ isRelatedToPredicates.add(pair.first);
+ }
+ } else {
+ specialSchedule.add(pair.first);
+ }
+
+ }
+
+ graph.syncRequest(new AsyncReadRequest() {
+
+ @Override
+ public void run(AsyncReadGraph graph) {
+
+ for (final Resource predicate : specialSchedule) {
+
+ graph.forIsSubrelationOf(predicate, L0.IsRelatedTo, new AsyncProcedure<Boolean>() {
+
+ @Override
+ public void exception(AsyncReadGraph graph, Throwable throwable) {
+ throwable.printStackTrace();
+ }
+
+ @Override
+ public void execute(AsyncReadGraph graph, Boolean composed) {
+ if (composed) composedResult.add(predicate);
+ }
+
+ });
+
+ }
+
+ }
+
+ });
+
+ isRelatedToPredicates.addAll(composedResult);
+
+ composedPredicateTime += (System.nanoTime() - start);
+
+ }
+
+ private Set<Resource> strongInverseSet = new HashSet<Resource>();
+
+ public void classifyPredicates(ReadGraph graph, final Collection<DirectStatements>[] expansion) throws DatabaseException {
+
+ CollectionSupport cs = graph.getService(CollectionSupport.class);
+ final Set<Resource> schedule = cs.createSet();
+ final Map<Resource, Resource> newPredicates = cs.createMap(Resource.class);
+
+ for (Collection<DirectStatements> coll : expansion)
+ for (DirectStatements stms : coll)
+ for(Statement stm : stms) {
+
+ Resource predicate = stm.getPredicate();
+
+ if(predicates.add(predicate)) {
+ Resource inverse = graph.getPossibleInverse(predicate);
+ schedule.add(predicate);
+ if(inverse != null) {
+ newPredicates.put(predicate, inverse);
+ if(predicates.add(inverse)) schedule.add(inverse);
+ }
+
+ }
+
+ }
+
+ classifyPredicates(graph, schedule);
+
+ for(Map.Entry<Resource, Resource> entry : newPredicates.entrySet()) {
+ // Inverse is strong => this has strong inverse
+ if(isRelatedToPredicates.contains(entry.getValue())) {
+ strongInverseSet.add(entry.getKey());
+ }
+ // This is strong => inverse has strong inverse
+ if(isRelatedToPredicates.contains(entry.getKey())) {
+ strongInverseSet.add(entry.getValue());
+ }
+ }
+
+ }
+
+ /*
+ * Composed objects are internal. Mark them for expansion.
+ */
+
+ public void processFringe(ReadGraph graph, Collection<DirectStatements>[] expansion,
+ ObjectOutputStream otherStatementsOutput, ObjectOutputStream valueOutput) throws DatabaseException, IOException {
+
+ SerialisationSupport support = graph.getService(SerialisationSupport.class);
+ TransferableGraphSupport tgs = graph.getService(TransferableGraphSupport.class);
+
+ Layer0 L0 = Layer0.getInstance(graph);
+
+ long start = System.nanoTime();
+
+ for (Collection<DirectStatements> coll : expansion)
+ for (DirectStatements stms : coll) {
+
+ Resource subject = stms.getSubject();
+
+ boolean partOf = false;
+ for(Statement stm : stms) {
+ Resource predicate = stm.getPredicate();
+ if(L0.PartOf.equals(predicate)) {
+ partOf = true;
+ break;
+ }
+ }
+
+ ExtentStatus subjectStatus = status.get(subject);
+ if(LOG && subjectStatus != null) log("EXISTING STATUS FOR " + graph.getPossibleURI(subject) + " - " + subjectStatus);
+ if(subjectStatus == ExtentStatus.EXTERNAL) continue;
+ if(partOf && (subjectStatus == null) && graph.getPossibleURI(subject) != null) {
+
+ status.put(subject, ExtentStatus.EXTERNAL);
+ if(LOG) {
+ String uri = graph.getPossibleURI(subject);
+ if(uri == null) log("[EXTERNAL]: No URI for " + subject);
+ else log("[EXTERNAL] " + uri);
+ }
+
+ // Check for SharedRange statements
+ for(Statement stm : stms) {
+ Resource predicate = stm.getPredicate();
+ if(sharedPredicates.contains(predicate)) {
+ sharedExternalFringe.add(stm.getObject());
+ if(LOG) {
+ log("[SHARED EXTERNAL FRINGE]: " + NameUtils.getSafeName(graph, stm.getObject()));
+ }
+ }
+ }
+
+ } else {
+
+ boolean special = specials.containsKey(subject);
+ if(LOG) {
+ if(special) {
+ log("[SPECIAL] " + NameUtils.getSafeName(graph, subject));
+ }
+ }
+
+ status.put(subject, ExtentStatus.INTERNAL);
+ if(LOG) log("[INTERNAL] " + NameUtils.getSafeName(graph, subject));
+
+ int sId = support.getTransientId(subject);
+
+ if(graph.hasValue(subject)) {
+ Datatype dt = graph.getRelatedValue(subject, L0.HasDataType, Bindings.getBindingUnchecked(Datatype.class));
+ Binding b = Bindings.getBinding(dt);
+ Object _value = graph.getValue(subject, b);
+ Variant variant = new Variant(b, _value);
+ byte[] value = variantSerializer.serialize(variant);
+ if(LOG) log("[VALUE] " + NameUtils.getSafeName(graph, subject));
+ valueOutput.writeInt(sId);
+ valueOutput.writeInt(value.length);
+ assert (value.length > 0);
+ valueOutput.write(value);
+ }
+
+ TIntArrayList stream = new TIntArrayList();
+
+ for(Statement stm : stms) {
+
+ if(special) {
+
+// System.err.println("stm=" + stm + " special=" + specials.get(subject));
+
+ }
+
+ Resource predicate = stm.getPredicate();
+ Resource object = stm.getObject();
+
+ ExtentStatus objectStatus = status.get(object);
+
+ // Strong predicate
+ if (isRelatedToPredicates.contains(predicate) && (objectStatus != ExtentStatus.EXCLUDED)) {
+
+ int pId = support.getTransientId(predicate);
+ int oId = support.getTransientId(object);
+
+ if(LOG) {
+ String s = NameUtils.getSafeName(graph, subject);
+ String p = NameUtils.getSafeName(graph, predicate);
+ String o = NameUtils.getSafeName(graph, object);
+ log("related=" + s + " - " + p + " - " + o);
+ }
+
+ stream.add(pId);
+ stream.add(oId);
+
+ if(objectStatus == null)
+ fringe.add(object);
+
+ } else {
+
+ // Weak predicate
+ if(objectStatus == ExtentStatus.INTERNAL) {
+
+ // The inverse is also weak (or there is no inverse)
+ if(!strongInverseSet.contains(predicate)) {
+
+ int pId = support.getTransientId(predicate);
+ int oId = support.getTransientId(object);
+
+ stream.add(pId);
+ stream.add(oId);
+
+ if(LOG) {
+ String s = NameUtils.getSafeName(graph, subject);
+ String p = NameUtils.getSafeName(graph, predicate);
+ String o = NameUtils.getSafeName(graph, object);
+ log("fully weak internal=" + s + " - " + p + " - " + o + " - " + objectStatus);
+ }
+
+ } else {
+
+ if(LOG) {
+ String s = NameUtils.getSafeName(graph, subject);
+ String p = NameUtils.getSafeName(graph, predicate);
+ String o = NameUtils.getSafeName(graph, object);
+ log("strong inverse internals=" + s + " - " + p + " - " + o + " - " + objectStatus);
+ }
+
+ }
+
+ } else {
+
+ if(special) {
+
+// System.err.println("stm=" + stm + " special=" + specials.get(subject));
+
+ Statement spec = specials.get(subject);
+
+ // This statement can be specially treated
+ if(stm.getPredicate().equals(spec.getPredicate()) && stm.getObject().equals(spec.getObject())) {
+
+ int pId = support.getTransientId(predicate);
+ int oId = support.getTransientId(object);
+
+ if(LOG) {
+ String s = NameUtils.getSafeName(graph, subject);
+ String p = NameUtils.getSafeName(graph, predicate);
+ String o = NameUtils.getSafeName(graph, object);
+ log("special=" + s + " - " + p + " - " + o);
+ }
+
+ stream.add(pId);
+ stream.add(oId);
+
+ }
+
+ } else {
+
+ if(LOG) {
+ String s = NameUtils.getSafeName(graph, subject);
+ String p = NameUtils.getSafeName(graph, predicate);
+ String o = NameUtils.getSafeName(graph, object);
+ log("weak with unknown object=" + s + " - " + p + " - " + o + " - " + objectStatus);
+ }
+
+ }
+
+ }
+
+ }
+
+ }
+
+ if(!stream.isEmpty()) {
+ otherStatementsOutput.writeInt(sId);
+ otherStatementsOutput.writeInt(stream.size() / 2);
+ for (int i = 0; i < stream.size(); i++)
+ otherStatementsOutput.writeInt(stream.getQuick(i));
+ }
+
+ }
+
+ }
+
+ composedObjectTime += System.nanoTime() - start;
+
+ }
+
+ public void process(ReadGraph graph,
+ ObjectOutputStream otherStatementsOutput,
+ ObjectOutputStream valueOutput)
+ throws DatabaseException, IOException {
+
+ this.variantSerializer = graph.getService(Databoard.class).getSerializerUnchecked(Bindings.VARIANT);
+
+ QueryControl control = graph.getService(QueryControl.class);
+
+// System.err.println("Begin ConsistsOfProcess");
+
+ /*
+ * Browse all stm = (s, ConsistsOf, o)
+ * � All internal URIs are found => from now on, if unidentified resource has PartOf it is external.
+ * � All s are internal
+ * � All o are internal
+ * � All stm are included
+ */
+ for(Resource r : ConsistsOfProcess.walk(graph, fringe, exclusions, true)) {
+ if (status.put(r, ExtentStatus.INTERNAL) == null) {
+ String URI = graph.getPossibleURI(r);
+ if(URI != null) log("URI INTERNAL " + URI);
+ else log("URI has no URI for " + r);
+ fringe.add(r);
+ internalDomain.add(r);
+ }
+ }
+
+ /*
+ * This loop resolves the transitive closure of all p < IsRelatedTo such that p does not contain the SharedRange tag.
+ * Such resources are guaranteed to be internal.
+ */
+ while(!fringe.isEmpty()) {
+
+// System.err.println("Process Fringe with " + fringe.size() + ".");
+
+ Collection<DirectStatements>[] expansion = new ArrayList[control.getAmountOfQueryThreads()];
+
+// System.err.println("-expand");
+
+ // Expand fringe
+ expand(graph, fringe, expansion);
+
+ /*
+ * classify all p
+ * -IsRelatedTo
+ * -SharedRange
+ * -Internal / External
+ */
+
+// System.err.println("-classify");
+
+ classifyPredicates(graph, expansion);
+
+ /*
+ * for stms in [stms]
+ * if stms contains predicate PartOf => s is External
+ * else s is Internal
+ * for all stm=(s,p,o) in stms
+ * if p <R IsRelatedTo => stm is included
+ * Fringe <- o
+ */
+
+// System.err.println("-process");
+
+ processFringe(graph, expansion, otherStatementsOutput, valueOutput);
+
+ }
+
+ while(!sharedExternalFringe.isEmpty()) {
+
+ Collection<DirectStatements>[] expansion = new ArrayList[control.getAmountOfQueryThreads()];
+ expand(graph, sharedExternalFringe, expansion);
+
+ for (Collection<DirectStatements> coll : expansion)
+ for (DirectStatements stms : coll) {
+
+ Resource subject = stms.getSubject();
+ ExtentStatus subjectStatus = status.get(subject);
+
+ if(ExtentStatus.INTERNAL == subjectStatus) {
+
+ if(internalDomain.contains(subject)) continue;
+
+ status.put(subject, ExtentStatus.EXTERNAL);
+ sharedExternalReferences.add(subject);
+
+ if(LOG) {
+ log("[SHARED EXTERNAL REFERENCE]: " + NameUtils.getSafeName(graph, subject));
+ }
+
+ for(Statement stm : stms) {
+ Resource predicate = stm.getPredicate();
+ if (isRelatedToPredicates.contains(predicate)) {
+ sharedExternalFringe.add(stm.getObject());
+ }
+ }
+
+ }
+ }
+
+ }
+
+ if (PROFILE) {
+ System.out.println(composedObjectCounter + " " + fastInternalCounter
+ + " " + parentExternalCounter + " "
+ + fullExternalCounter + " " + fullInternalCounter);
+ }
+
+ }
+
+ }
+
+ static class DomainProcessor {
+
+ Serializer variantSerializer;
+
+ int id = 0;
+
+ Set<Resource> predicates = null;
+ Set<Resource> composedPredicates = null;
+ Set<Resource> expansionSeeds = null;
+ Map<Resource, Integer> ids = null;
+ Map<Resource, ExtentStatus> status = null;
+ Map<Resource, WeakStatus> weakInverses = null;
+
+ final Set<SubgraphAdvisor> advisors;
+ final ArrayList<Double> priorityList = new ArrayList<Double>();
+
+ private long composedObjectCounter = 0;
+ private long fastInternalCounter = 0;
+ private long parentExternalCounter = 0;
+ private long fullInternalCounter = 0;
+ private long fullExternalCounter = 0;
+
+ private long startupTime = 0;
+ private long expandTime = 0;
+ private long fullResolveTime = 0;
+ private long fastResolveTime = 0;
+ private long otherStatementTime = 0;
+ private long parentResolveTime = 0;
+ private long extentSeedTime = 0;
+ private long composedPredicateTime = 0;
+ private long composedObjectTime = 0;
+
+ public DomainProcessor(Set<SubgraphAdvisor> advisors) {
+ this.advisors = advisors;
+ HashSet<Double> prioritySet = new HashSet<Double>();
+ for (SubgraphAdvisor advisor : advisors)
+ prioritySet.add(advisor.priority());
+ priorityList.addAll(prioritySet);
+ Collections.sort(priorityList);
+ }
+
+ public void expand(ReadGraph graph, Collection<DirectStatements>[] expansion, Set<Resource> schedule) throws DatabaseException {
+
+ long start = System.nanoTime();
+
+// if (DEBUG)
+// System.out.println("expanding " + expansionSeeds.size() + " resources.");
+
+ QueryControl control = graph.getService(QueryControl.class);
+// final Collection<DirectStatements>[] results = new ArrayList[control.getAmountOfQueryThreads()];
+ final ArrayList<Resource>[] listElements = new ArrayList[control.getAmountOfQueryThreads()];
+ for (int i = 0; i < control.getAmountOfQueryThreads(); i++) {
+// results[i] = new ArrayList<DirectStatements>();
+ listElements[i] = new ArrayList<Resource>();
+ }
+
+// if(DEBUG) {
+// for(Resource r : expansionSeeds)
+// System.out.println("Expanding " + NameUtils.getSafeName(graph, r, true));
+// }
+
+ graph.syncRequest(new Expansion(expansionSeeds, expansion, listElements));
+ for (int i = 0; i < control.getAmountOfQueryThreads(); i++) {
+// for (DirectStatements s : results[i]) {
+// expansion.put(s.getSubject(), s);
+// }
+ for (Resource s : listElements[i]) {
+ schedule.add(s);
+// if(status.put(s, ExtentStatus.INTERNAL) == null) {
+// ids.put(s, id++);
+// }
+ }
+ }
+
+ expandTime += (System.nanoTime() - start);
+
+ }
+
+ public void extractComposedPredicates(ReadGraph graph, final Collection<DirectStatements>[] expansion) throws DatabaseException {
+
+ long start = System.nanoTime();
+
+ CollectionSupport cs = graph.getService(CollectionSupport.class);
+
+ final ConcurrentLinkedQueue<Resource> composedResult = new ConcurrentLinkedQueue<Resource>();
+ final ConcurrentLinkedQueue<Resource> singleResult = new ConcurrentLinkedQueue<Resource>();
+ final ConcurrentLinkedQueue<Pair<Resource, Resource>> singles = new ConcurrentLinkedQueue<Pair<Resource, Resource>>();
+
+ final Set<Resource> schedule = cs.createSet();
+
+ for (Collection<DirectStatements> coll : expansion)
+ for (DirectStatements stms : coll)
+ for(Statement stm : stms) {
+ Resource predicate = stm.getPredicate();
+ if(predicates.add(predicate)) schedule.add(predicate);
+ }
+
+ // Discover singles
+ graph.syncRequest(new AsyncReadRequest() {
+
+ @Override
+ public void run(AsyncReadGraph graph) {
+
+ for (final Resource predicate : schedule) {
+
+ graph.forPossibleSuperrelation(predicate, new AsyncProcedure<Resource>() {
+
+ @Override
+ public void exception(AsyncReadGraph graph, Throwable throwable) {
+ throwable.printStackTrace();
+ }
+
+ @Override
+ public void execute(AsyncReadGraph graph, final Resource single) {
+ singles.add(Pair.make(predicate, single));
+ }
+
+ });
+
+ }
+
+ }
+
+ });
+
+ // Determine singles
+ final Set<Resource> singleSchedule = cs.createSet();
+ for(Pair<Resource, Resource> pair : singles) {
+
+ Resource single = pair.second;
+ if(single != null && predicates.add(single)) singleSchedule.add(single);
+
+ }
+
+ graph.syncRequest(new AsyncReadRequest() {
+
+ @Override
+ public void run(AsyncReadGraph graph) {
+
+ for (final Resource predicate : singleSchedule) {
+
+ graph.forIsSubrelationOf(predicate, graph.getService(Layer0.class).IsComposedOf, new AsyncProcedure<Boolean>() {
+
+ @Override
+ public void exception(AsyncReadGraph graph, Throwable throwable) {
+ throwable.printStackTrace();
+ }
+
+ @Override
+ public void execute(AsyncReadGraph graph, Boolean composed) {
+ if (composed) singleResult.add(predicate);
+ }
+
+ });
+
+ }
+
+ }
+
+ });
+
+ composedPredicates.addAll(singleResult);
+
+ final Set<Resource> specialSchedule = cs.createSet();
+
+ // Classify
+ for(Pair<Resource, Resource> pair : singles) {
+
+ Resource single = pair.second;
+ if(single != null) {
+ if(composedPredicates.contains(single)) {
+ composedPredicates.add(pair.first);
+ }
+ } else {
+ specialSchedule.add(pair.first);
+ }
+
+ }
+
+ graph.syncRequest(new AsyncReadRequest() {
+
+ @Override
+ public void run(AsyncReadGraph graph) {
+
+ for (final Resource predicate : specialSchedule) {
+
+ graph.forIsSubrelationOf(predicate, graph.getService(Layer0.class).IsComposedOf, new AsyncProcedure<Boolean>() {
+
+ @Override
+ public void exception(AsyncReadGraph graph, Throwable throwable) {
+ throwable.printStackTrace();
+ }
+
+ @Override
+ public void execute(AsyncReadGraph graph, Boolean composed) {
+ if (composed) composedResult.add(predicate);
+ }
+
+ });
+
+ }
+
+ }
+
+ });
+
+ composedPredicates.addAll(composedResult);
+
+ composedPredicateTime += (System.nanoTime() - start);
+
+ }
+
+ /*
+ * Composed objects are internal. Mark them for expansion.
+ */
+
+ public void collectComposedObjects(ReadGraph graph, Collection<DirectStatements>[] expansion, Set<Resource> typeTodo, Set<Resource> objectTodo,
+ Set<Resource> predicateTodo) throws DatabaseException {
+
+ long start = System.nanoTime();
+
+ Layer0 l0 = Layer0.getInstance(graph);
+
+ for (Collection<DirectStatements> coll : expansion)
+ for (DirectStatements stms : coll)
+ for(Statement stm : stms) {
+
+ Resource predicate = stm.getPredicate();
+ Resource object = stm.getObject();
+
+ if (composedPredicates.contains(predicate)) {
+
+ ExtentStatus existing = status.put(object, ExtentStatus.INTERNAL);
+ if(existing == null) {
+ ids.put(object, id++);
+ composedObjectCounter++;
+ expansionSeeds.add(object);
+// System.err.println("internal: " + NameUtils.getSafeName(graph, object, true));
+ if(LOG) log("[INTERNAL] (composed object) " + NameUtils.getSafeName(graph, object, true));
+ } else if (existing == ExtentStatus.EXCLUDED) {
+ System.err.println("preExcluded: " + NameUtils.getSafeName(graph, object, true));
+ status.put(object, ExtentStatus.EXCLUDED);
+ } else if (existing == ExtentStatus.EXTERNAL) {
+ System.err.println("preExternal: " + NameUtils.getSafeName(graph, object, true));
+ status.put(object, ExtentStatus.EXTERNAL);
+ }
+
+ } else {
+
+// System.err.println("internal2: " + NameUtils.getSafeName(graph, object, true));
+
+ if (!status.containsKey(object)) {
+ if (l0.InstanceOf.equalsResource(predicate)) {
+ typeTodo.add(object);
+ } else {
+ objectTodo.add(object);
+ }
+ }
+
+ if (!status.containsKey(predicate)) {
+ predicateTodo.add(predicate);
+ }
+
+ }
+ }
+
+ composedObjectTime += System.nanoTime() - start;
+
+ }
+
+ public void writeOtherStatements(ReadGraph graph, Collection<Collection<DirectStatements>[]> expansion, ObjectOutputStream composedStatementsOutput, ObjectOutputStream otherStatementsOutput,
+ ObjectOutputStream valueOutput) throws DatabaseException {
+
+ long start = System.nanoTime();
+
+ Layer0 l0 = Layer0.getInstance(graph);
+ SerialisationSupport support = graph.getService(SerialisationSupport.class);
+ TransferableGraphSupport tgs = graph.getService(TransferableGraphSupport.class);
+
+ TIntArrayList other = new TIntArrayList();
+ TIntArrayList composed = new TIntArrayList();
+
+ try {
+
+ for (Collection<DirectStatements>[] colls : expansion)
+ for (Collection<DirectStatements> coll : colls)
+ for (DirectStatements stms : coll) {
+
+ Resource subject = stms.getSubject();
+ composed.resetQuick();
+
+ int sId = support.getTransientId(subject);
+
+ composedStatementsOutput.writeInt(sId);
+
+ if(graph.hasValue(subject)) {
+ Datatype dt = graph.getRelatedValue(subject, l0.HasDataType, Bindings.getBindingUnchecked(Datatype.class));
+ Binding b = Bindings.getBinding(dt);
+ Object _value = graph.getValue(subject, b);
+ Variant variant = new Variant(b, _value);
+ byte[] value = variantSerializer.serialize(variant);
+ if(LOG) log("[VALUE] " + NameUtils.getSafeName(graph, subject));
+ valueOutput.writeInt(sId);
+ valueOutput.writeInt(value.length);
+ assert (value.length > 0);
+ valueOutput.write(value);
+ }
+
+ for (Statement s : stms) {
+
+ Resource object = s.getObject();
+ Resource predicate = s.getPredicate();
+
+ ExtentStatus objectStatus = status.get(object);
+
+ if(objectStatus == ExtentStatus.INTERNAL) {
+ composed.add(support.getTransientId(predicate));
+ composed.add(support.getTransientId(object));
+ if(LOG) log("[COMPOSED] (internal object) " + NameUtils.toIdString(graph, s));
+ } else if (l0.InstanceOf.equalsResource(predicate)) {
+ composed.add(support.getTransientId(predicate));
+ composed.add(support.getTransientId(object));
+ if(LOG) log("[COMPOSED] (instanceOf) " + NameUtils.toIdString(graph, s));
+ } else if (l0.SubrelationOf.equalsResource(predicate)) {
+ composed.add(support.getTransientId(predicate));
+ composed.add(support.getTransientId(object));
+ if(LOG) log("[COMPOSED] (subrelationOf) " + NameUtils.toIdString(graph, s));
+ } else {
+ if(objectStatus == ExtentStatus.EXTERNAL) {
+ if(DEBUG)
+ System.out.println("other " + NameUtils.toIdString(graph, s));
+ //System.out.println("other.add " + predicate + " - " + object);
+ other.add(support.getTransientId(predicate));
+ other.add(support.getTransientId(object));
+ if(LOG) log("[OTHER] (object is external) " + NameUtils.toIdString(graph, s));
+ }
+ }
+
+ }
+
+ if(!other.isEmpty()) {
+ otherStatementsOutput.writeInt(sId);
+ otherStatementsOutput.writeInt(other.size() / 2);
+ for (int i = 0; i < other.size(); i++)
+ otherStatementsOutput.writeInt(other.getQuick(i));
+ other.resetQuick();
+ }
+
+ composedStatementsOutput.writeInt(composed.size() / 2);
+ for (int i = 0; i < composed.size(); i++)
+ composedStatementsOutput.writeInt(composed.getQuick(i));
+
+ }
+
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+
+ otherStatementTime += (System.nanoTime() - start);
+
+ }
+
+ boolean hasStrictParents(ReadGraph g, Resource r)
+ throws DatabaseException {
+ if (g.getPossibleURI(r) != null)
+ return true;
+ return false;
+ }
+
+ public boolean getExpansionSeedsFromExtents(ReadGraph graph, final Collection<DirectStatements>[] expansion) throws DatabaseException {
+
+ long start = System.nanoTime();
+
+ final ConcurrentLinkedQueue<Resource> accepts = new ConcurrentLinkedQueue<Resource>();
+
+ /*
+ * Determine statements which could accept statements with todo
+ * objects
+ */
+ search: for (Double priority : priorityList) {
+
+ for (final SubgraphAdvisor advisor : advisors) {
+
+ if (advisor.priority() > 0)
+ continue;
+
+ if (advisor.priority() == priority) {
+
+ graph.syncRequest(new ReadRequest() {
+
+ @Override
+ public void run(ReadGraph graph) throws DatabaseException {
+
+ for (Collection<DirectStatements> coll : expansion)
+ for (DirectStatements stms : coll)
+ for(final Statement stm : stms) {
+
+ advisor.advise(graph, stm, new AsyncProcedure<Boolean>() {
+
+ @Override
+ public void exception(AsyncReadGraph graph, Throwable throwable) {
+ throwable.printStackTrace();
+ }
+
+ @Override
+ public void execute(AsyncReadGraph graph, Boolean accept) {
+ if (accept) {
+ accepts.add(stm.getObject());
+ }
+ }
+
+ });
+
+ }
+
+ }
+
+ });
+ }
+ if (!accepts.isEmpty())
+ break search;
+ }
+ }
+
+ CollectionSupport cs = graph.getService(CollectionSupport.class);
+ Set<Resource> schedule = cs.createSet();
+ for (Resource r : accepts) {
+ if(!status.containsKey(r))
+ schedule.add(r);
+ }
+
+ extentSeedTime += (System.nanoTime() - start);
+
+ if (schedule.isEmpty())
+ return false;
+
+ fastResolve(graph, schedule);
+ uriResolve(graph, schedule);
+ fullResolve(graph, schedule, "accepts");
+
+ return true;
+
+ }
+
+ ConcurrentLinkedQueue<Resource> fastInternals = new ConcurrentLinkedQueue<Resource>();
+
+ public void fastResolve(ReadGraph graph, final Set<Resource> rs)
+ throws DatabaseException {
+ // This collects and resolves weaks
+ if(fastResolveLoop(graph, rs))
+ // Weaks are now resolved
+ fastResolveLoop(graph, rs);
+ }
+
+ public boolean fastResolveLoop(ReadGraph graph, final Set<Resource> rs)
+ throws DatabaseException {
+
+ long start = System.nanoTime();
+
+ final ConcurrentLinkedQueue<Resource> weakSchedule = new ConcurrentLinkedQueue<Resource>();
+
+ graph.syncRequest(new AsyncRead<Boolean>() {
+
+ @Override
+ public int threadHash() {
+ return hashCode();
+ }
+
+ @Override
+ public int getFlags() {
+ return 0;
+ }
+
+ @Override
+ public void perform(AsyncReadGraph graph,
+ AsyncProcedure<Boolean> procedure) {
+
+ QueryControl control = graph.getService(QueryControl.class);
+ final DirectQuerySupport dqs = graph.getService(DirectQuerySupport.class);
+
+ int slice = (int) (rs.size() / control
+ .getAmountOfQueryThreads()) + 1;
+
+ final Resource[] rootArray = rs.toArray(Resource.NONE);
+ for (int i = 0; i < control.getAmountOfQueryThreads(); i++) {
+
+ final int start = i * slice;
+ final int end = Math.min(start + slice,
+ rootArray.length);
+
+ control.schedule(graph, i, new ControlProcedure() {
+
+ @Override
+ public void execute(AsyncReadGraph graph) {
+
+ for (int index = start; index < end; index++) {
+
+ final Resource r = rootArray[index];
+
+ //if (status.containsKey(r)) continue;
+
+ graph.asyncRequest(new FastInternalRequest(dqs, r, status, weakInverses, weakSchedule),new AsyncProcedure<Boolean>() {
+
+ @Override
+ public void exception(AsyncReadGraph graph,Throwable throwable) {
+ throwable.printStackTrace();
+ }
+
+ @Override
+ public void execute(AsyncReadGraph graph,Boolean isInternal) {
+ if (isInternal) {
+ fastInternals.add(r);
+ }
+ }
+
+ });
+
+ }
+
+ }
+
+ });
+
+ }
+
+ procedure.execute(graph, true);
+
+ }
+
+ });
+
+ if (!weakSchedule.isEmpty()) {
+ THashSet<Resource> weaks = new THashSet<Resource>(weakSchedule);
+ if (CLASSIFY_LOG)
+ for (Resource p : weakSchedule)
+ System.out.println("classify "
+ + NameUtils.getSafeName(graph, p));
+ graph.syncRequest(new ClassifyStatementsRequest(weaks, weakInverses));
+ }
+
+ for (Resource r : fastInternals) {
+ rs.remove(r);
+ if (status.put(r, ExtentStatus.INTERNAL) == null) {
+ if(LOG) log("[INTERNAL] (fast) " + NameUtils.getSafeName(graph, r, true));
+ ids.put(r, id++);
+ fastInternalCounter++;
+ expansionSeeds.add(r);
+ }
+ }
+
+ fastResolveTime += (System.nanoTime() - start);
+
+ return !weakSchedule.isEmpty();
+
+ }
+
+ private ExtentStatus resolveExtent(ReadGraph graph, Resource r, Map<Resource, ExtentStatus> statuses, Set<Resource> expansionSeeds, THashSet<Resource> pending,
+ ArrayList<Resource> stack) throws DatabaseException {
+
+ ExtentStatus current = statuses.get(r);
+ if(current != null) return current;
+
+ if (pending.contains(r))
+ return ExtentStatus.PENDING;
+
+ // In order to break cyclic dependencies
+ pending.add(r);
+
+ if (PARENT_DEBUG)
+ System.out.println("resolveExtent "
+ + NameUtils.getSafeName(graph, r));
+
+ ExtentStatus status = ExtentStatus.INTERNAL;
+ for (Resource p : getParents(graph, r)) {
+ if (PARENT_DEBUG) {
+ ExtentStatus ps = statuses.get(p);
+ System.out.println(" parent " + NameUtils.getSafeName(graph, p) + "(" + ps + ")");
+ }
+ switch (resolveExtent(graph, p, statuses,
+ expansionSeeds, pending, stack)) {
+ case EXTERNAL:
+ return ExtentStatus.EXTERNAL;
+ case PENDING:
+ status = ExtentStatus.PENDING;
+ }
+ }
+ if (status == ExtentStatus.INTERNAL) {
+ pending.remove(r);
+ stack.add(r);
+ if (DEBUG)
+ System.out.println(NameUtils.getSafeName(graph, r, true)
+ + " is internal.");
+ }
+ return status;
+ }
+
+ public void uriResolve(ReadGraph graph, final Set<Resource> todo)
+ throws DatabaseException {
+
+ long start = System.nanoTime();
+
+ for(Resource r : todo) System.out.println("uriResolve " +
+ NameUtils.getSafeName(graph, r));
+
+ final ConcurrentSkipListSet<Resource> found = new ConcurrentSkipListSet<Resource>();
+
+ graph.syncRequest(new AsyncReadRequest() {
+
+ @Override
+ public void run(AsyncReadGraph graph) {
+
+ for (final Resource r : todo) {
+
+ // System.out.println("uriresolve before " + r);
+
+ if (status.containsKey(r)) continue;
+
+ // System.out.println("uriresolve " + r);
+
+ graph.forURI(r, new AsyncProcedure<String>() {
+
+ @Override
+ public void exception(AsyncReadGraph graph, Throwable throwable) {
+ throwable.printStackTrace();
+ }
+
+ @Override
+ public void execute(AsyncReadGraph graph, String uri) {
+
+ if (uri != null) {
+
+ // System.out.println("uriresolve has uri "
+ // + r);
+
+ if(found.add(r)) {
+ parentExternalCounter++;
+ }
+
+ } else {
+
+ // System.out.println("uriresolve ask inverse "
+ // + r);
+
+ graph.forPossibleInverse(r, new AsyncProcedure<Resource>() {
+
+ @Override
+ public void exception(AsyncReadGraph graph, Throwable throwable) {
+ throwable.printStackTrace();
+ }
+
+ @Override
+ public void execute(AsyncReadGraph graph, Resource inverse) {
+
+ if (inverse != null) {
+
+ graph.forURI(inverse, new AsyncProcedure<String>() {
+
+ @Override
+ public void exception(AsyncReadGraph graph, Throwable throwable) {
+ throwable.printStackTrace();
+ }
+
+ @Override
+ public void execute(AsyncReadGraph graph, String uri) {
+
+ if (uri != null) {
+
+ if(found.add(r)) {
+ parentExternalCounter++;
+ }
+
+ }
+
+ }
+
+ });
+
+ }
+
+ }
+
+ });
+
+ }
+ }
+
+ });
+
+ }
+
+ }
+
+ });
+
+ todo.removeAll(found);
+ for(Resource r : found) {
+ status.put(r, ExtentStatus.EXTERNAL);
+ if(LOG) log("[EXTERNAL] (uriResolve) " + NameUtils.getSafeName(graph, r, true));
+ }
+
+ parentResolveTime += System.nanoTime() - start;
+
+ }
+
+ public void fullResolve(ReadGraph graph, Collection<Resource> rs,
+ String koss) throws DatabaseException {
+
+ long start = System.nanoTime();
+
+ for (final Resource r : rs) {
+
+ if(status.containsKey(r)) continue;
+
+ THashSet<Resource> pending = new THashSet<Resource>();
+ ArrayList<Resource> stack = new ArrayList<Resource>();
+
+ ExtentStatus s = resolveExtent(graph, r, status, expansionSeeds, pending, stack);
+ if (ExtentStatus.INTERNAL == s || ExtentStatus.PENDING == s) {
+ if (status.put(r, ExtentStatus.INTERNAL) == null) {
+ if(LOG) log("[INTERNAL] (resolveExtent) " + NameUtils.getSafeName(graph, r, true));
+ ids.put(r, id++);
+ fullInternalCounter++;
+ expansionSeeds.add(r);
+ }
+ }
+ if (ExtentStatus.EXTERNAL == s) {
+ if (status.put(r, ExtentStatus.EXTERNAL) == null) {
+ if(LOG) log("[EXTERNAL] (resolveExtent) " + NameUtils.getSafeName(graph, r, true));
+ fullExternalCounter++;
+ }
+ }
+
+ }
+
+ fullResolveTime += (System.nanoTime() - start);
+
+ }
+
+ public void process(ReadGraph graph,
+ ObjectOutputStream composedStatementsOutput,
+ ObjectOutputStream otherStatementsOutput,
+ ObjectOutputStream valueOutput)
+ throws DatabaseException {
+
+ this.variantSerializer = graph.getService(Databoard.class).getSerializerUnchecked(Bindings.VARIANT);
+
+ CollectionSupport cs = graph.getService(CollectionSupport.class);
+
+ Set<Resource> typeTodo = cs.createSet();
+ Set<Resource> objectTodo = cs.createSet();
+ Set<Resource> predicateTodo = cs.createSet();
+
+ Collection<Collection<DirectStatements>[]> fullExpansion = new ArrayList<Collection<DirectStatements>[]>();
+
+ do {
+
+ QueryControl control = graph.getService(QueryControl.class);
+ Collection<DirectStatements>[] expansion = new ArrayList[control.getAmountOfQueryThreads()];
+ for (int i = 0; i < control.getAmountOfQueryThreads(); i++) {
+ expansion[i] = new ArrayList<DirectStatements>();
+ }
+
+ // Expand expansionSeeds
+ expand(graph, expansion, objectTodo);
+
+ // Start collecting new seeds
+ expansionSeeds = cs.createSet();
+
+ // Collect predicates which are <R IsComposedOf
+ extractComposedPredicates(graph, expansion);
+
+ // Make composed objects internal and make sure they are further
+ // expanded
+ collectComposedObjects(graph, expansion, typeTodo, objectTodo, predicateTodo);
+
+ /*
+ * Use the expansion seed heuristic to find new resources to
+ * expand before full analysis.
+ */
+ getExpansionSeedsFromExtents(graph, expansion);
+
+ fullExpansion.add(expansion);
+
+ } while (!expansionSeeds.isEmpty());
+
+ fastResolve(graph, objectTodo);
+ uriResolve(graph, predicateTodo);
+ uriResolve(graph, objectTodo);
+ uriResolve(graph, typeTodo);
+ fullResolve(graph, objectTodo, "objectTodo");
+ fullResolve(graph, predicateTodo, "predicateTodo");
+ fullResolve(graph, typeTodo, "typeTodo");
+
+ writeOtherStatements(graph, fullExpansion, composedStatementsOutput, otherStatementsOutput, valueOutput);
+
+ if (PROFILE) {
+ System.out.println(composedObjectCounter + " " + fastInternalCounter
+ + " " + parentExternalCounter + " "
+ + fullExternalCounter + " " + fullInternalCounter);
+ }
+
+ }
+
+ }
+
+ public static void getDomain2(ReadGraph graph, TIntIntHashMap ids,
+ Collection<Resource> roots, Map<Resource, ExtentStatus> preStatus,
+ Map<Resource, Statement> specials,
+ ObjectOutputStream otherStatementsOutput,
+ ObjectOutputStream valueOutput,
+ TreeMap<String, Variant> extensions,
+ TIntHashSet excludedShared) throws DatabaseException {
+
+ ITask task = ThreadLogger.getInstance().begin("getDomain2");
+
+ final DomainProcessor2 processor = new DomainProcessor2();
+
+ processor.startupTime = System.nanoTime();
+
+ Layer0 l0 = Layer0.getInstance(graph);
+
+ CollectionSupport cs = graph.getService(CollectionSupport.class);
+ SerialisationSupport support = graph.getService(SerialisationSupport.class);
+
+ processor.ids = ids;
+ processor.specials = specials;
+ processor.status = cs.createMap(ExtentStatus.class);
+ processor.weakInverses = cs.createMap(WeakStatus.class);
+ processor.predicates = cs.createSet();
+ processor.isRelatedToPredicates = cs.createSet();
+ processor.sharedPredicates = cs.createSet();
+// processor.expansionSeeds = cs.createSet();
+
+ for(Map.Entry<Resource, ExtentStatus> entry : preStatus.entrySet()) {
+ processor.status.put(entry.getKey(), entry.getValue());
+ if(ExtentStatus.EXCLUDED.equals(entry.getValue())) processor.exclusions.add(entry.getKey());
+ }
+
+// for (Resource r : excluded) {
+// processor.status.put(r, ExtentStatus.EXCLUDED);
+// }
+
+ Resource rootLibrary = graph.getResource("http:/");
+
+ if (!roots.contains(rootLibrary))
+ processor.status.put(rootLibrary, ExtentStatus.EXTERNAL);
+
+ for (Resource root : roots) {
+ processor.status.put(root, ExtentStatus.INTERNAL);
+ //processor.ids.put(support.getTransientId(root), processor.ids.size());
+ for (Resource owner : graph.getObjects(root, l0.IsOwnedBy)) {
+ processor.status.put(owner, ExtentStatus.EXTERNAL);
+ }
+ }
+
+ processor.startupTime = System.nanoTime() - processor.startupTime;
+
+ processor.fringe = new HashSet<Resource>();
+ processor.fringe.addAll(roots);
+
+ processor.internalDomain.addAll(roots);
+
+ processor.sharedExternalReferences = new HashSet<Resource>();
+ processor.sharedExternalFringe = new HashSet<Resource>();
+
+ try {
+
+ processor.process(graph, otherStatementsOutput, valueOutput);
+
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+
+ for(Resource r : processor.sharedExternalReferences) excludedShared.add(support.getTransientId(r));
+
+ ClusteringSupport cls = graph.getService(ClusteringSupport.class);
+ TLongObjectHashMap<TIntArrayList> clusterMap = new TLongObjectHashMap<TIntArrayList>();
+ for(Map.Entry<Resource, ExtentStatus> entry : processor.status.entrySet()) {
+ if(ExtentStatus.INTERNAL == entry.getValue()) {
+
+ long cluster = cls.getCluster(entry.getKey());
+ TIntArrayList list = clusterMap.get(cluster);
+ if(list == null) {
+ list = new TIntArrayList();
+ clusterMap.put(cluster, list);
+ }
+ list.add(support.getTransientId(entry.getKey()));
+
+ }
+ }
+ final TIntArrayList clustering = new TIntArrayList();
+ clusterMap.forEachEntry(new TLongObjectProcedure<TIntArrayList>() {
+
+ @Override
+ public boolean execute(long cluster, TIntArrayList b) {
+ clustering.add(b.size());
+ b.forEach(new TIntProcedure() {
+
+ @Override
+ public boolean execute(int rId) {
+ processor.ids.put(rId, processor.id++);
+ return true;
+ }
+
+ });
+ return true;
+ }
+
+ });
+
+ extensions.put(Extensions.CLUSTERING, new Variant(Bindings.INT_ARRAY, clustering.toArray()));
+
+ long total = processor.startupTime + processor.expandTime
+ + processor.composedPredicateTime
+ + processor.composedObjectTime + processor.extentSeedTime
+ + processor.fullResolveTime + processor.fastResolveTime +
+ + processor.parentResolveTime + processor.otherStatementTime;
+
+ if (PROFILE) {
+ System.out.println("startup took " + 1e-9 * processor.startupTime
+ + "s.");
+ System.out.println("expand took " + 1e-9 * processor.expandTime
+ + "s.");
+ System.out.println("composedPredicates took " + 1e-9
+ * processor.composedPredicateTime + "s.");
+ System.out.println("composedObjects took " + 1e-9
+ * processor.composedObjectTime + "s.");
+ System.out.println("extentSeeding took " + 1e-9
+ * processor.extentSeedTime + "s.");
+ System.out.println("fullResolve took " + 1e-9
+ * processor.fullResolveTime + "s.");
+ System.out.println("fastResolve took " + 1e-9
+ * processor.fastResolveTime + "s.");
+ System.out.println("parentResolve took " + 1e-9
+ * processor.parentResolveTime + "s.");
+ System.out.println("otherStatements took " + 1e-9
+ * processor.otherStatementTime + "s.");
+ System.out.println("total " + 1e-9 * total + "s.");
+ }
+
+ task.finish();
+
+
+ }
+
+
+ public static void getDomain(ReadGraph graph, Map<Resource, Integer> ids,
+ Collection<Resource> roots, Map<Resource, ExtentStatus> preStatus, Set<SubgraphAdvisor> advisors,
+ ObjectOutputStream composedStatementsOutput,
+ ObjectOutputStream otherStatementsOutput,
+ ObjectOutputStream valueOutput) throws DatabaseException {
+
+ ITask task = ThreadLogger.getInstance().begin("getDomain");
+
+ DomainProcessor processor = new DomainProcessor(advisors);
+
+ processor.startupTime = System.nanoTime();
+
+ Layer0 l0 = Layer0.getInstance(graph);
+
+ CollectionSupport cs = graph.getService(CollectionSupport.class);
+
+ processor.ids = ids;
+ processor.status = cs.createMap(ExtentStatus.class);
+ processor.weakInverses = cs.createMap(WeakStatus.class);
+ processor.predicates = cs.createSet();
+ processor.composedPredicates = cs.createSet();
+ processor.expansionSeeds = cs.createSet();
+
+ for(Map.Entry<Resource, ExtentStatus> entry : preStatus.entrySet()) {
+ processor.status.put(entry.getKey(), entry.getValue());
+ }
+
+// for (Resource r : excluded) {
+// processor.status.put(r, ExtentStatus.EXCLUDED);
+// }
+
+ if (!roots.contains(graph.getRootLibrary()))
+ processor.status.put(graph.getRootLibrary(), ExtentStatus.EXTERNAL);
+
+ for (Resource root : roots) {
+ processor.status.put(root, ExtentStatus.INTERNAL);
+ processor.ids.put(root, processor.id++);
+ for (Resource owner : graph.getObjects(root, l0.IsOwnedBy)) {
+ processor.status.put(owner, ExtentStatus.EXTERNAL);
+ }
+ }
+
+
+ processor.expansionSeeds.addAll(roots);
+
+ processor.startupTime = System.nanoTime() - processor.startupTime;
+
+ while (!processor.expansionSeeds.isEmpty()) {
+
+ processor.process(graph, composedStatementsOutput,
+ otherStatementsOutput, valueOutput);
+
+ }
+
+ long total = processor.startupTime + processor.expandTime
+ + processor.composedPredicateTime
+ + processor.composedObjectTime + processor.extentSeedTime
+ + processor.fullResolveTime + processor.fastResolveTime +
+ + processor.parentResolveTime + processor.otherStatementTime;
+
+ if (PROFILE) {
+ System.out.println("startup took " + 1e-9 * processor.startupTime
+ + "s.");
+ System.out.println("expand took " + 1e-9 * processor.expandTime
+ + "s.");
+ System.out.println("composedPredicates took " + 1e-9
+ * processor.composedPredicateTime + "s.");
+ System.out.println("composedObjects took " + 1e-9
+ * processor.composedObjectTime + "s.");
+ System.out.println("extentSeeding took " + 1e-9
+ * processor.extentSeedTime + "s.");
+ System.out.println("fullResolve took " + 1e-9
+ * processor.fullResolveTime + "s.");
+ System.out.println("fastResolve took " + 1e-9
+ * processor.fastResolveTime + "s.");
+ System.out.println("parentResolve took " + 1e-9
+ * processor.parentResolveTime + "s.");
+ System.out.println("otherStatements took " + 1e-9
+ * processor.otherStatementTime + "s.");
+ System.out.println("total " + 1e-9 * total + "s.");
+ }
+
+ task.finish();
+
+ }
+
+}