X-Git-Url: https://gerrit.simantics.org/r/gitweb?p=simantics%2Fplatform.git;a=blobdiff_plain;f=bundles%2Forg.simantics.db.layer0%2Fsrc%2Forg%2Fsimantics%2Fdb%2Flayer0%2Futil%2FSubgraphs.java;h=518efc882e45964dafb612ced7068636972a6f6b;hp=2280fbe41168b3cfccd41d01989c9e1b07908fef;hb=e19c37f84fd1ce2d946578f7c05f3e45444ba67a;hpb=969bd23cab98a79ca9101af33334000879fb60c5 diff --git a/bundles/org.simantics.db.layer0/src/org/simantics/db/layer0/util/Subgraphs.java b/bundles/org.simantics.db.layer0/src/org/simantics/db/layer0/util/Subgraphs.java index 2280fbe41..518efc882 100644 --- a/bundles/org.simantics.db.layer0/src/org/simantics/db/layer0/util/Subgraphs.java +++ b/bundles/org.simantics.db.layer0/src/org/simantics/db/layer0/util/Subgraphs.java @@ -1,2119 +1,2130 @@ -/******************************************************************************* - * Copyright (c) 2007, 2010 Association for Decentralized Information Management - * in Industry THTH ry. - * All rights reserved. This program and the accompanying materials - * are made available under the terms of the Eclipse Public License v1.0 - * which accompanies this distribution, and is available at - * http://www.eclipse.org/legal/epl-v10.html - * - * Contributors: - * VTT Technical Research Centre of Finland - initial API and implementation - *******************************************************************************/ -package org.simantics.db.layer0.util; - -import gnu.trove.list.array.TIntArrayList; -import gnu.trove.map.hash.TIntIntHashMap; -import gnu.trove.map.hash.TLongObjectHashMap; -import gnu.trove.procedure.TIntProcedure; -import gnu.trove.procedure.TLongObjectProcedure; -import gnu.trove.set.TIntSet; -import gnu.trove.set.hash.THashSet; -import gnu.trove.set.hash.TIntHashSet; - -import java.io.DataOutput; -import java.io.DataOutputStream; -import java.io.FileNotFoundException; -import java.io.FileOutputStream; -import java.io.IOException; -import java.io.ObjectOutputStream; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.HashSet; -import java.util.Map; -import java.util.Set; -import java.util.TreeMap; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.ConcurrentSkipListSet; - -import org.simantics.databoard.Bindings; -import org.simantics.databoard.Databoard; -import org.simantics.databoard.binding.Binding; -import org.simantics.databoard.binding.mutable.Variant; -import org.simantics.databoard.serialization.Serializer; -import org.simantics.databoard.type.Datatype; -import org.simantics.db.AsyncReadGraph; -import org.simantics.db.DirectStatements; -import org.simantics.db.ReadGraph; -import org.simantics.db.Resource; -import org.simantics.db.Statement; -import org.simantics.db.common.request.AsyncReadRequest; -import org.simantics.db.common.request.ReadRequest; -import org.simantics.db.common.request.ResourceAsyncRead; -import org.simantics.db.common.utils.NameUtils; -import org.simantics.db.exception.DatabaseException; -import org.simantics.db.layer0.adapter.SubgraphAdvisor; -import org.simantics.db.layer0.adapter.SubgraphExtent.ExtentStatus; -import org.simantics.db.procedure.AsyncProcedure; -import org.simantics.db.request.AsyncRead; -import org.simantics.db.service.ClusteringSupport; -import org.simantics.db.service.CollectionSupport; -import org.simantics.db.service.DirectQuerySupport; -import org.simantics.db.service.QueryControl; -import org.simantics.db.service.QueryControl.ControlProcedure; -import org.simantics.db.service.SerialisationSupport; -import org.simantics.db.service.StatementSupport; -import org.simantics.db.service.TransferableGraphSupport; -import org.simantics.graph.representation.Extensions; -import org.simantics.layer0.Layer0; -import org.simantics.utils.datastructures.Pair; -import org.simantics.utils.threads.logger.ITask; -import org.simantics.utils.threads.logger.ThreadLogger; - -public class Subgraphs { - - public static String LOG_FILE = "export.log"; - final static private boolean LOG = false; - final static private boolean DEBUG = false; - final static private boolean PARENT_DEBUG = DEBUG | false; - final static private boolean EXTERNAL_DEBUG = DEBUG | false; - final static private boolean ADVISOR_LOG = LOG & false; - final static private boolean EXPANSION_LOG = LOG & false; - final static private boolean INTERNAL_LOG = LOG & false; - final static private boolean COMPOSED_LOG = LOG & false; - final static private boolean RESOLVE_LOG = LOG & false; - final static private boolean CLASSIFY_LOG = LOG & false; - final static private boolean EXTERNAL_LOG = LOG & false; - final static private boolean PROFILE = false; - - static enum WeakStatus { - STRONG, WEAK - } - - - static DataOutput log; - - static { - - if (LOG) { - try { - FileOutputStream stream = new FileOutputStream(LOG_FILE, false); - log = new DataOutputStream(stream); - } catch (FileNotFoundException e) { - e.printStackTrace(); - } - } - - } - - private static void log(String line) { - if (LOG) { - try { - log.write((line + "\n").getBytes()); - } catch (IOException e) { - e.printStackTrace(); - } - } - } - - public static Collection getParents(ReadGraph g, Resource r) - throws DatabaseException { - return getParents(g, r, false); - } - - static class FastInternalRequest extends ResourceAsyncRead { - - final DirectQuerySupport dqs; - final ConcurrentLinkedQueue queue; - final Map weakInverses; - final Map status; - - public FastInternalRequest(DirectQuerySupport dqs, Resource resource, - Map status, - Map weakInverses, - ConcurrentLinkedQueue queue) { - super(resource); - this.dqs = dqs; - this.status = status; - this.weakInverses = weakInverses; - this.queue = queue; - } - - @Override - public int getFlags() { - return 0; - } - - @Override - public void perform(AsyncReadGraph graph, final AsyncProcedure procedure) { - - dqs.forEachDirectStatement(graph, resource, new AsyncProcedure() { - - @Override - public void execute(AsyncReadGraph graph, DirectStatements ss) { - boolean ok = true; - for(Statement statement : ss) { - if (status.get(statement.getObject()) == ExtentStatus.INTERNAL) continue; - WeakStatus status = weakInverses.get(statement.getPredicate()); - if(status == WeakStatus.WEAK) continue; - else if (status == null) { - queue.add(statement.getPredicate()); - } - ok = false; - } - procedure.execute(graph, ok); - } - - @Override - public void exception(AsyncReadGraph graph, Throwable throwable) { - throwable.printStackTrace(); - } - - }); - - } - - } - - static class ClassifyStatementsRequest implements AsyncRead { - - final Set schedule; - final Map weakMap; - - public ClassifyStatementsRequest(Set schedule, Map weakMap) { - this.weakMap = weakMap; - this.schedule = schedule; - } - - @Override - public int threadHash() { - return hashCode(); - } - - @Override - public int getFlags() { - return 0; - } - - @Override - public void perform(AsyncReadGraph graph, final AsyncProcedure procedure) { - - for (final Resource p : schedule) { - - graph.forPossibleInverse(p, new AsyncProcedure() { - - private void register(AsyncReadGraph graph, Resource predicate, Resource superRelation, WeakStatus status) { - synchronized (weakMap) { - weakMap.put(predicate, status); - if(superRelation != null) weakMap.put(superRelation, status); - } - } - - @Override - public void exception(AsyncReadGraph graph, Throwable throwable) { - throwable.printStackTrace(); - } - - @Override - public void execute(AsyncReadGraph graph, final Resource inverse) { - - if (inverse == null) { - - register(graph, p, null, WeakStatus.WEAK); - - } else { - - graph.forPossibleSuperrelation(inverse, new AsyncProcedure() { - - @Override - public void exception(AsyncReadGraph graph, Throwable throwable) { - throwable.printStackTrace(); - } - - @Override - public void execute(AsyncReadGraph graph, final Resource superRelation) { - - if(superRelation != null && weakMap.containsKey(superRelation)) { - register(graph, p, null, weakMap.get(superRelation)); - return; - } - - graph.forIsSubrelationOf(inverse, graph.getService(Layer0.class).IsRelatedTo, new AsyncProcedure() { - - @Override - public void exception(AsyncReadGraph graph,Throwable throwable) { - throwable.printStackTrace(); - } - - @Override - public void execute(AsyncReadGraph graph,Boolean strong) { - register(graph, p, superRelation, strong ? WeakStatus.STRONG : WeakStatus.WEAK); - } - - }); - - } - - }); - - } - - } - - }); - - } - - procedure.execute(graph, false); - - } - - } - - private static Collection getParents(ReadGraph g, Resource r, boolean isStrong) throws DatabaseException { - - System.out.println("getParents " + NameUtils.getSafeName(g, r)); - - Layer0 l0 = Layer0.getInstance(g); - - Collection predicates = g.getPredicates(r); - - // --- Consists Of ---------------------------------------------------- - - if (predicates.contains(l0.PartOf)) { - Collection parents = g.getObjects(r, l0.PartOf); - if (parents.size() == 1) - return parents; - ArrayList libraryParents = new ArrayList(1); - for (Resource p : parents) - if (g.isInstanceOf(p, l0.Library)) - libraryParents.add(p); - if (!libraryParents.isEmpty()) - return libraryParents; - else - return parents; - } - - // --- Ordered sets --------------------------------------------------- - - { - Collection parents = null; - for (Resource p : predicates) - if (g.isInstanceOf(p, l0.OrderedSet) && !p.equals(r)) { - if (parents == null) - parents = new ArrayList(1); - parents.add(p); - } - if (parents != null) { - if (DEBUG) - System.out.println("ORDERED SET"); - return parents; - } - } - - - if (isStrong) - return Collections.emptyList(); - else { - - if (predicates.contains(l0.InverseOf)) { - - Resource inv = g.getInverse(r); - return getParents(g, inv, true); - - } else { - - /* - * Depends On - * - * If there are DependsOn parents, then IsRelatedTo parents are discarded - * - */ - HashSet result = new HashSet(); - for(Resource predicate : predicates) { - if(g.isSubrelationOf(predicate, l0.IsDependencyOf)) result.addAll(g.getObjects(r, predicate)); - } - if(!result.isEmpty()) return result; - - /* - * Is Related To - * - * At this point all Is Related To are parents. - * - */ - for(Resource predicate : predicates) { - Resource inv = g.getPossibleInverse(predicate); - if(inv != null) { - if(g.isSubrelationOf(inv, l0.IsRelatedTo)) result.addAll(g.getObjects(r, predicate)); - } - } - - return result; - - } - - /* - Collection invR = g.getObjects(r, b.IsRelatedTo_Inverse); - if (predicates.contains(b.InverseOf)) { - if (invR.size() > 1) { - if (DEBUG) - System.out - .println("###########################################"); - Resource inv = g.getInverse(r); - Collection ret = new ArrayList(); - for (Statement pp : g.getStatements(r, - b.IsRelatedTo_Inverse)) - if (!pp.getPredicate().equals(inv)) { - if (DEBUG) { - System.out.println("<" - + NameUtils.getSafeName(g, pp - .getSubject()) - + "," - + NameUtils.getSafeName(g, pp - .getPredicate()) - + "," - + NameUtils.getSafeName(g, pp - .getObject()) + ">"); - } - ret.add(pp.getObject()); - } - return ret; - } - // System.out.println("?????????????????"); - Collection invParents = getParents(g, - g.getInverse(r), true); - if (!invParents.isEmpty()) - return invParents; - } - if (DEBUG) { - System.out.print("invR"); - for (Resource res : invR) - System.out.print(" " + NameUtils.getSafeName(g, res)); - System.out.println(); - } - return invR; - */ - } - - } - -// public static String getIdentifier(ReadGraph g, Resource r) -// throws DatabaseException { -// Layer0 L0 = Layer0.getInstance(g); -// if (r.equals(g.getRootLibrary())) -// return ""; -// String name = g.getPossibleRelatedValue(r, L0.HasName); -// if (name == null) -// return null; -// Collection parents = getParents(g, r, true); -// if (parents.size() != 1) -// return null; -// for (Resource p : parents) { -// String parentIdentifier = getIdentifier(g, p); -// if (parentIdentifier == null) -// return null; -// return parentIdentifier + "/" + name; -// } -// return null; -// } - - static int kess = 0; - - static class Expansion extends AsyncReadRequest { - - final private Collection roots; - final Collection[] results; - final Collection[] listElements; - - public Expansion(Collection roots, Collection[] results, Collection[] listElements) { - this.roots = roots; - this.results = results; - this.listElements = listElements; - } - - @Override - public void run(AsyncReadGraph graph) { - - QueryControl control = graph.getService(QueryControl.class); - final DirectQuerySupport dqs = graph.getService(DirectQuerySupport.class); - - final DomainStatementProcedure proc = new DomainStatementProcedure(dqs, graph.getService(StatementSupport.class), graph.getService(Layer0.class), results, listElements); - - int slice = (int) (roots.size() / control.getAmountOfQueryThreads()) + 1; - - final Resource[] rootArray = roots.toArray(Resource.NONE); - for (int i = 0; i < control.getAmountOfQueryThreads(); i++) { - - final int start = i * slice; - final int end = Math.min(start + slice, rootArray.length); - - control.schedule(graph, i, new ControlProcedure() { - - @Override - public void execute(AsyncReadGraph graph) { - for (int index = start; index < end; index++) { - dqs.forEachDirectStatement(graph, rootArray[index], proc); - } - - } - - }); - - } - - } - - @Override - public int getFlags() { - return 0; - } - - } - - static class Expansion2 extends AsyncReadRequest { - - final private Collection roots; - final Collection[] results; - final boolean ignoreVirtual; - - public Expansion2(Collection roots, Collection[] results) { - this(roots, results, true); - } - - public Expansion2(Collection roots, Collection[] results, boolean ignoreVirtual) { - this.roots = roots; - this.results = results; - this.ignoreVirtual = ignoreVirtual; - } - - @Override - public void run(AsyncReadGraph graph) { - - QueryControl control = graph.getService(QueryControl.class); - final DirectQuerySupport dqs = graph.getService(DirectQuerySupport.class); - - final DomainStatementProcedure2 proc = - new DomainStatementProcedure2(results); - - int slice = (int) (roots.size() / control.getAmountOfQueryThreads()) + 1; - - final Resource[] rootArray = roots.toArray(Resource.NONE); - for (int i = 0; i < control.getAmountOfQueryThreads(); i++) { - - final int start = i * slice; - final int end = Math.min(start + slice, rootArray.length); - - control.schedule(graph, i, new ControlProcedure() { - @Override - public void execute(AsyncReadGraph graph) { - if (ignoreVirtual) { - for (int index = start; index < end; index++) { - dqs.forEachDirectPersistentStatement(graph, rootArray[index], proc); - } - } else { - for (int index = start; index < end; index++) { - dqs.forEachDirectStatement(graph, rootArray[index], proc); - } - } - } - }); - - } - - } - - @Override - public int getFlags() { - return 0; - } - - } - - static class DomainProcessor2 { - - Serializer variantSerializer; - - int id = 0; - - Set fringe = null; - Set exclusions = new HashSet(); - Set internalDomain = new HashSet(); - Set sharedExternalReferences = null; - TIntSet sharedExternalIds = null; - Set sharedExternalFringe = null; - Set predicates = null; - Set isRelatedToPredicates = null; - Set sharedPredicates = null; - TIntIntHashMap ids = null; - Map specials = null; - Map status = null; - Map weakInverses = null; - -// final ArrayList priorityList = new ArrayList(); - - private long composedObjectCounter = 0; - private long fastInternalCounter = 0; - private long parentExternalCounter = 0; - private long fullInternalCounter = 0; - private long fullExternalCounter = 0; - - private long startupTime = 0; - private long expandTime = 0; - private long fullResolveTime = 0; - private long fastResolveTime = 0; - private long otherStatementTime = 0; - private long parentResolveTime = 0; - private long extentSeedTime = 0; - private long composedPredicateTime = 0; - private long composedObjectTime = 0; - - public void expand(ReadGraph graph, Set fringe, Collection[] expansion) throws DatabaseException { - - long start = System.nanoTime(); - - Collection[]> fullExpansion = new ArrayList[]>(); - QueryControl control = graph.getService(QueryControl.class); - for (int i = 0; i < control.getAmountOfQueryThreads(); i++) { - expansion[i] = new ArrayList(); - } - - graph.syncRequest(new Expansion2(fringe, expansion)); - - fringe.clear(); - - expandTime += (System.nanoTime() - start); - - } - - public void classifyPredicates(ReadGraph graph, final Set schedule) throws DatabaseException { - - CollectionSupport cs = graph.getService(CollectionSupport.class); - - final Layer0 L0 = Layer0.getInstance(graph); - - long start = System.nanoTime(); - - final ConcurrentLinkedQueue composedResult = new ConcurrentLinkedQueue(); - final ConcurrentLinkedQueue singleResult = new ConcurrentLinkedQueue(); - final ConcurrentLinkedQueue sharedResult = new ConcurrentLinkedQueue(); - final ConcurrentLinkedQueue> singles = new ConcurrentLinkedQueue>(); - - // Discover singles - graph.syncRequest(new AsyncReadRequest() { - - @Override - public void run(AsyncReadGraph graph) { - - for (final Resource predicate : schedule) { - - graph.forPossibleSuperrelation(predicate, new AsyncProcedure() { - - @Override - public void exception(AsyncReadGraph graph, Throwable throwable) { - throwable.printStackTrace(); - } - - @Override - public void execute(AsyncReadGraph graph, final Resource single) { - singles.add(Pair.make(predicate, single)); - } - - }); - - graph.forHasStatement(predicate, L0.SharedRange, new AsyncProcedure() { - - @Override - public void exception(AsyncReadGraph graph, Throwable throwable) { - throwable.printStackTrace(); - } - - @Override - public void execute(AsyncReadGraph graph, final Boolean shared) { - if(shared) sharedResult.add(predicate); - } - - }); - - } - - } - - }); - - // Determine singles - final Set singleSchedule = cs.createSet(); - for(Pair pair : singles) { - - Resource single = pair.second; - if(single != null && predicates.add(single)) singleSchedule.add(single); - - } - - graph.syncRequest(new AsyncReadRequest() { - - @Override - public void run(AsyncReadGraph graph) { - - for (final Resource predicate : singleSchedule) { - - graph.forIsSubrelationOf(predicate, L0.IsRelatedTo, new AsyncProcedure() { - - @Override - public void exception(AsyncReadGraph graph, Throwable throwable) { - throwable.printStackTrace(); - } - - @Override - public void execute(AsyncReadGraph graph, Boolean strong) { - if (strong) singleResult.add(predicate); - } - - }); - - } - - } - - }); - - isRelatedToPredicates.addAll(singleResult); - sharedPredicates.addAll(sharedResult); - - final Set specialSchedule = cs.createSet(); - - // Classify - for(Pair pair : singles) { - - Resource single = pair.second; - if(single != null) { - if(isRelatedToPredicates.contains(single)) { - isRelatedToPredicates.add(pair.first); - } - } else { - specialSchedule.add(pair.first); - } - - } - - graph.syncRequest(new AsyncReadRequest() { - - @Override - public void run(AsyncReadGraph graph) { - - for (final Resource predicate : specialSchedule) { - - graph.forIsSubrelationOf(predicate, L0.IsRelatedTo, new AsyncProcedure() { - - @Override - public void exception(AsyncReadGraph graph, Throwable throwable) { - throwable.printStackTrace(); - } - - @Override - public void execute(AsyncReadGraph graph, Boolean composed) { - if (composed) composedResult.add(predicate); - } - - }); - - } - - } - - }); - - isRelatedToPredicates.addAll(composedResult); - - composedPredicateTime += (System.nanoTime() - start); - - } - - private Set strongInverseSet = new HashSet(); - - public void classifyPredicates(ReadGraph graph, final Collection[] expansion) throws DatabaseException { - - CollectionSupport cs = graph.getService(CollectionSupport.class); - final Set schedule = cs.createSet(); - final Map newPredicates = cs.createMap(Resource.class); - - for (Collection coll : expansion) - for (DirectStatements stms : coll) - for(Statement stm : stms) { - - Resource predicate = stm.getPredicate(); - - if(predicates.add(predicate)) { - Resource inverse = graph.getPossibleInverse(predicate); - schedule.add(predicate); - if(inverse != null) { - newPredicates.put(predicate, inverse); - if(predicates.add(inverse)) schedule.add(inverse); - } - - } - - } - - classifyPredicates(graph, schedule); - - for(Map.Entry entry : newPredicates.entrySet()) { - // Inverse is strong => this has strong inverse - if(isRelatedToPredicates.contains(entry.getValue())) { - strongInverseSet.add(entry.getKey()); - } - // This is strong => inverse has strong inverse - if(isRelatedToPredicates.contains(entry.getKey())) { - strongInverseSet.add(entry.getValue()); - } - } - - } - - /* - * Composed objects are internal. Mark them for expansion. - */ - - public void processFringe(ReadGraph graph, Collection[] expansion, - ObjectOutputStream otherStatementsOutput, ObjectOutputStream valueOutput) throws DatabaseException, IOException { - - SerialisationSupport support = graph.getService(SerialisationSupport.class); - TransferableGraphSupport tgs = graph.getService(TransferableGraphSupport.class); - - Layer0 L0 = Layer0.getInstance(graph); - - long start = System.nanoTime(); - - for (Collection coll : expansion) - for (DirectStatements stms : coll) { - - Resource subject = stms.getSubject(); - - boolean partOf = false; - for(Statement stm : stms) { - Resource predicate = stm.getPredicate(); - if(L0.PartOf.equals(predicate)) { - partOf = true; - break; - } - } - - ExtentStatus subjectStatus = status.get(subject); - if(LOG && subjectStatus != null) log("EXISTING STATUS FOR " + graph.getPossibleURI(subject) + " - " + subjectStatus); - if(subjectStatus == ExtentStatus.EXTERNAL) continue; - if(partOf && (subjectStatus == null) && graph.getPossibleURI(subject) != null) { - - status.put(subject, ExtentStatus.EXTERNAL); - if(LOG) { - String uri = graph.getPossibleURI(subject); - if(uri == null) log("[EXTERNAL]: No URI for " + subject); - else log("[EXTERNAL] " + uri); - } - - // Check for SharedRange statements - for(Statement stm : stms) { - Resource predicate = stm.getPredicate(); - if(sharedPredicates.contains(predicate)) { - sharedExternalFringe.add(stm.getObject()); - if(LOG) { - log("[SHARED EXTERNAL FRINGE]: " + NameUtils.getSafeName(graph, stm.getObject())); - } - } - } - - } else { - - boolean special = specials.containsKey(subject); - if(LOG) { - if(special) { - log("[SPECIAL] " + NameUtils.getSafeName(graph, subject)); - } - } - - status.put(subject, ExtentStatus.INTERNAL); - if(LOG) log("[INTERNAL] " + NameUtils.getSafeName(graph, subject)); - - int sId = support.getTransientId(subject); - - if(graph.hasValue(subject)) { - Datatype dt = graph.getRelatedValue(subject, L0.HasDataType, Bindings.getBindingUnchecked(Datatype.class)); - Binding b = Bindings.getBinding(dt); - Object _value = graph.getValue(subject, b); - Variant variant = new Variant(b, _value); - byte[] value = variantSerializer.serialize(variant); - if(LOG) log("[VALUE] " + NameUtils.getSafeName(graph, subject)); - valueOutput.writeInt(sId); - valueOutput.writeInt(value.length); - assert (value.length > 0); - valueOutput.write(value); - } - - TIntArrayList stream = new TIntArrayList(); - - for(Statement stm : stms) { - - if(special) { - -// System.err.println("stm=" + stm + " special=" + specials.get(subject)); - - } - - Resource predicate = stm.getPredicate(); - Resource object = stm.getObject(); - - ExtentStatus objectStatus = status.get(object); - - // Strong predicate - if (isRelatedToPredicates.contains(predicate) && (objectStatus != ExtentStatus.EXCLUDED)) { - - int pId = support.getTransientId(predicate); - int oId = support.getTransientId(object); - - if(LOG) { - String s = NameUtils.getSafeName(graph, subject); - String p = NameUtils.getSafeName(graph, predicate); - String o = NameUtils.getSafeName(graph, object); - log("related=" + s + " - " + p + " - " + o); - } - - stream.add(pId); - stream.add(oId); - - if(objectStatus == null) - fringe.add(object); - - } else { - - // Weak predicate - if(objectStatus == ExtentStatus.INTERNAL) { - - // The inverse is also weak (or there is no inverse) - if(!strongInverseSet.contains(predicate)) { - - int pId = support.getTransientId(predicate); - int oId = support.getTransientId(object); - - stream.add(pId); - stream.add(oId); - - if(LOG) { - String s = NameUtils.getSafeName(graph, subject); - String p = NameUtils.getSafeName(graph, predicate); - String o = NameUtils.getSafeName(graph, object); - log("fully weak internal=" + s + " - " + p + " - " + o + " - " + objectStatus); - } - - } else { - - if(LOG) { - String s = NameUtils.getSafeName(graph, subject); - String p = NameUtils.getSafeName(graph, predicate); - String o = NameUtils.getSafeName(graph, object); - log("strong inverse internals=" + s + " - " + p + " - " + o + " - " + objectStatus); - } - - } - - } else { - - if(special) { - -// System.err.println("stm=" + stm + " special=" + specials.get(subject)); - - Statement spec = specials.get(subject); - - // This statement can be specially treated - if(stm.getPredicate().equals(spec.getPredicate()) && stm.getObject().equals(spec.getObject())) { - - int pId = support.getTransientId(predicate); - int oId = support.getTransientId(object); - - if(LOG) { - String s = NameUtils.getSafeName(graph, subject); - String p = NameUtils.getSafeName(graph, predicate); - String o = NameUtils.getSafeName(graph, object); - log("special=" + s + " - " + p + " - " + o); - } - - stream.add(pId); - stream.add(oId); - - } - - } else { - - if(LOG) { - String s = NameUtils.getSafeName(graph, subject); - String p = NameUtils.getSafeName(graph, predicate); - String o = NameUtils.getSafeName(graph, object); - log("weak with unknown object=" + s + " - " + p + " - " + o + " - " + objectStatus); - } - - } - - } - - } - - } - - if(!stream.isEmpty()) { - otherStatementsOutput.writeInt(sId); - otherStatementsOutput.writeInt(stream.size() / 2); - for (int i = 0; i < stream.size(); i++) - otherStatementsOutput.writeInt(stream.getQuick(i)); - } - - } - - } - - composedObjectTime += System.nanoTime() - start; - - } - - public void process(ReadGraph graph, - ObjectOutputStream otherStatementsOutput, - ObjectOutputStream valueOutput) - throws DatabaseException, IOException { - - this.variantSerializer = graph.getService(Databoard.class).getSerializerUnchecked(Bindings.VARIANT); - - QueryControl control = graph.getService(QueryControl.class); - -// System.err.println("Begin ConsistsOfProcess"); - - /* - * Browse all stm = (s, ConsistsOf, o) - * � All internal URIs are found => from now on, if unidentified resource has PartOf it is external. - * � All s are internal - * � All o are internal - * � All stm are included - */ - for(Resource r : ConsistsOfProcess.walk(graph, fringe, exclusions, true)) { - if (status.put(r, ExtentStatus.INTERNAL) == null) { - String URI = graph.getPossibleURI(r); - if(URI != null) log("URI INTERNAL " + URI); - else log("URI has no URI for " + r); - fringe.add(r); - internalDomain.add(r); - } - } - - /* - * This loop resolves the transitive closure of all p < IsRelatedTo such that p does not contain the SharedRange tag. - * Such resources are guaranteed to be internal. - */ - while(!fringe.isEmpty()) { - -// System.err.println("Process Fringe with " + fringe.size() + "."); - - Collection[] expansion = new ArrayList[control.getAmountOfQueryThreads()]; - -// System.err.println("-expand"); - - // Expand fringe - expand(graph, fringe, expansion); - - /* - * classify all p - * -IsRelatedTo - * -SharedRange - * -Internal / External - */ - -// System.err.println("-classify"); - - classifyPredicates(graph, expansion); - - /* - * for stms in [stms] - * if stms contains predicate PartOf => s is External - * else s is Internal - * for all stm=(s,p,o) in stms - * if p stm is included - * Fringe <- o - */ - -// System.err.println("-process"); - - processFringe(graph, expansion, otherStatementsOutput, valueOutput); - - } - - while(!sharedExternalFringe.isEmpty()) { - - Collection[] expansion = new ArrayList[control.getAmountOfQueryThreads()]; - expand(graph, sharedExternalFringe, expansion); - - for (Collection coll : expansion) - for (DirectStatements stms : coll) { - - Resource subject = stms.getSubject(); - ExtentStatus subjectStatus = status.get(subject); - - if(ExtentStatus.INTERNAL == subjectStatus) { - - if(internalDomain.contains(subject)) continue; - - status.put(subject, ExtentStatus.EXTERNAL); - sharedExternalReferences.add(subject); - - if(LOG) { - log("[SHARED EXTERNAL REFERENCE]: " + NameUtils.getSafeName(graph, subject)); - } - - for(Statement stm : stms) { - Resource predicate = stm.getPredicate(); - if (isRelatedToPredicates.contains(predicate)) { - sharedExternalFringe.add(stm.getObject()); - } - } - - } - } - - } - - if (PROFILE) { - System.out.println(composedObjectCounter + " " + fastInternalCounter - + " " + parentExternalCounter + " " - + fullExternalCounter + " " + fullInternalCounter); - } - - } - - } - - static class DomainProcessor { - - Serializer variantSerializer; - - int id = 0; - - Set predicates = null; - Set composedPredicates = null; - Set expansionSeeds = null; - Map ids = null; - Map status = null; - Map weakInverses = null; - - final Set advisors; - final ArrayList priorityList = new ArrayList(); - - private long composedObjectCounter = 0; - private long fastInternalCounter = 0; - private long parentExternalCounter = 0; - private long fullInternalCounter = 0; - private long fullExternalCounter = 0; - - private long startupTime = 0; - private long expandTime = 0; - private long fullResolveTime = 0; - private long fastResolveTime = 0; - private long otherStatementTime = 0; - private long parentResolveTime = 0; - private long extentSeedTime = 0; - private long composedPredicateTime = 0; - private long composedObjectTime = 0; - - public DomainProcessor(Set advisors) { - this.advisors = advisors; - HashSet prioritySet = new HashSet(); - for (SubgraphAdvisor advisor : advisors) - prioritySet.add(advisor.priority()); - priorityList.addAll(prioritySet); - Collections.sort(priorityList); - } - - public void expand(ReadGraph graph, Collection[] expansion, Set schedule) throws DatabaseException { - - long start = System.nanoTime(); - -// if (DEBUG) -// System.out.println("expanding " + expansionSeeds.size() + " resources."); - - QueryControl control = graph.getService(QueryControl.class); -// final Collection[] results = new ArrayList[control.getAmountOfQueryThreads()]; - final ArrayList[] listElements = new ArrayList[control.getAmountOfQueryThreads()]; - for (int i = 0; i < control.getAmountOfQueryThreads(); i++) { -// results[i] = new ArrayList(); - listElements[i] = new ArrayList(); - } - -// if(DEBUG) { -// for(Resource r : expansionSeeds) -// System.out.println("Expanding " + NameUtils.getSafeName(graph, r, true)); -// } - - graph.syncRequest(new Expansion(expansionSeeds, expansion, listElements)); - for (int i = 0; i < control.getAmountOfQueryThreads(); i++) { -// for (DirectStatements s : results[i]) { -// expansion.put(s.getSubject(), s); -// } - for (Resource s : listElements[i]) { - schedule.add(s); -// if(status.put(s, ExtentStatus.INTERNAL) == null) { -// ids.put(s, id++); -// } - } - } - - expandTime += (System.nanoTime() - start); - - } - - public void extractComposedPredicates(ReadGraph graph, final Collection[] expansion) throws DatabaseException { - - long start = System.nanoTime(); - - CollectionSupport cs = graph.getService(CollectionSupport.class); - - final ConcurrentLinkedQueue composedResult = new ConcurrentLinkedQueue(); - final ConcurrentLinkedQueue singleResult = new ConcurrentLinkedQueue(); - final ConcurrentLinkedQueue> singles = new ConcurrentLinkedQueue>(); - - final Set schedule = cs.createSet(); - - for (Collection coll : expansion) - for (DirectStatements stms : coll) - for(Statement stm : stms) { - Resource predicate = stm.getPredicate(); - if(predicates.add(predicate)) schedule.add(predicate); - } - - // Discover singles - graph.syncRequest(new AsyncReadRequest() { - - @Override - public void run(AsyncReadGraph graph) { - - for (final Resource predicate : schedule) { - - graph.forPossibleSuperrelation(predicate, new AsyncProcedure() { - - @Override - public void exception(AsyncReadGraph graph, Throwable throwable) { - throwable.printStackTrace(); - } - - @Override - public void execute(AsyncReadGraph graph, final Resource single) { - singles.add(Pair.make(predicate, single)); - } - - }); - - } - - } - - }); - - // Determine singles - final Set singleSchedule = cs.createSet(); - for(Pair pair : singles) { - - Resource single = pair.second; - if(single != null && predicates.add(single)) singleSchedule.add(single); - - } - - graph.syncRequest(new AsyncReadRequest() { - - @Override - public void run(AsyncReadGraph graph) { - - for (final Resource predicate : singleSchedule) { - - graph.forIsSubrelationOf(predicate, graph.getService(Layer0.class).IsComposedOf, new AsyncProcedure() { - - @Override - public void exception(AsyncReadGraph graph, Throwable throwable) { - throwable.printStackTrace(); - } - - @Override - public void execute(AsyncReadGraph graph, Boolean composed) { - if (composed) singleResult.add(predicate); - } - - }); - - } - - } - - }); - - composedPredicates.addAll(singleResult); - - final Set specialSchedule = cs.createSet(); - - // Classify - for(Pair pair : singles) { - - Resource single = pair.second; - if(single != null) { - if(composedPredicates.contains(single)) { - composedPredicates.add(pair.first); - } - } else { - specialSchedule.add(pair.first); - } - - } - - graph.syncRequest(new AsyncReadRequest() { - - @Override - public void run(AsyncReadGraph graph) { - - for (final Resource predicate : specialSchedule) { - - graph.forIsSubrelationOf(predicate, graph.getService(Layer0.class).IsComposedOf, new AsyncProcedure() { - - @Override - public void exception(AsyncReadGraph graph, Throwable throwable) { - throwable.printStackTrace(); - } - - @Override - public void execute(AsyncReadGraph graph, Boolean composed) { - if (composed) composedResult.add(predicate); - } - - }); - - } - - } - - }); - - composedPredicates.addAll(composedResult); - - composedPredicateTime += (System.nanoTime() - start); - - } - - /* - * Composed objects are internal. Mark them for expansion. - */ - - public void collectComposedObjects(ReadGraph graph, Collection[] expansion, Set typeTodo, Set objectTodo, - Set predicateTodo) throws DatabaseException { - - long start = System.nanoTime(); - - Layer0 l0 = Layer0.getInstance(graph); - - for (Collection coll : expansion) - for (DirectStatements stms : coll) - for(Statement stm : stms) { - - Resource predicate = stm.getPredicate(); - Resource object = stm.getObject(); - - if (composedPredicates.contains(predicate)) { - - ExtentStatus existing = status.put(object, ExtentStatus.INTERNAL); - if(existing == null) { - ids.put(object, id++); - composedObjectCounter++; - expansionSeeds.add(object); -// System.err.println("internal: " + NameUtils.getSafeName(graph, object, true)); - if(LOG) log("[INTERNAL] (composed object) " + NameUtils.getSafeName(graph, object, true)); - } else if (existing == ExtentStatus.EXCLUDED) { - System.err.println("preExcluded: " + NameUtils.getSafeName(graph, object, true)); - status.put(object, ExtentStatus.EXCLUDED); - } else if (existing == ExtentStatus.EXTERNAL) { - System.err.println("preExternal: " + NameUtils.getSafeName(graph, object, true)); - status.put(object, ExtentStatus.EXTERNAL); - } - - } else { - -// System.err.println("internal2: " + NameUtils.getSafeName(graph, object, true)); - - if (!status.containsKey(object)) { - if (l0.InstanceOf.equalsResource(predicate)) { - typeTodo.add(object); - } else { - objectTodo.add(object); - } - } - - if (!status.containsKey(predicate)) { - predicateTodo.add(predicate); - } - - } - } - - composedObjectTime += System.nanoTime() - start; - - } - - public void writeOtherStatements(ReadGraph graph, Collection[]> expansion, ObjectOutputStream composedStatementsOutput, ObjectOutputStream otherStatementsOutput, - ObjectOutputStream valueOutput) throws DatabaseException { - - long start = System.nanoTime(); - - Layer0 l0 = Layer0.getInstance(graph); - SerialisationSupport support = graph.getService(SerialisationSupport.class); - TransferableGraphSupport tgs = graph.getService(TransferableGraphSupport.class); - - TIntArrayList other = new TIntArrayList(); - TIntArrayList composed = new TIntArrayList(); - - try { - - for (Collection[] colls : expansion) - for (Collection coll : colls) - for (DirectStatements stms : coll) { - - Resource subject = stms.getSubject(); - composed.resetQuick(); - - int sId = support.getTransientId(subject); - - composedStatementsOutput.writeInt(sId); - - if(graph.hasValue(subject)) { - Datatype dt = graph.getRelatedValue(subject, l0.HasDataType, Bindings.getBindingUnchecked(Datatype.class)); - Binding b = Bindings.getBinding(dt); - Object _value = graph.getValue(subject, b); - Variant variant = new Variant(b, _value); - byte[] value = variantSerializer.serialize(variant); - if(LOG) log("[VALUE] " + NameUtils.getSafeName(graph, subject)); - valueOutput.writeInt(sId); - valueOutput.writeInt(value.length); - assert (value.length > 0); - valueOutput.write(value); - } - - for (Statement s : stms) { - - Resource object = s.getObject(); - Resource predicate = s.getPredicate(); - - ExtentStatus objectStatus = status.get(object); - - if(objectStatus == ExtentStatus.INTERNAL) { - composed.add(support.getTransientId(predicate)); - composed.add(support.getTransientId(object)); - if(LOG) log("[COMPOSED] (internal object) " + NameUtils.toIdString(graph, s)); - } else if (l0.InstanceOf.equalsResource(predicate)) { - composed.add(support.getTransientId(predicate)); - composed.add(support.getTransientId(object)); - if(LOG) log("[COMPOSED] (instanceOf) " + NameUtils.toIdString(graph, s)); - } else if (l0.SubrelationOf.equalsResource(predicate)) { - composed.add(support.getTransientId(predicate)); - composed.add(support.getTransientId(object)); - if(LOG) log("[COMPOSED] (subrelationOf) " + NameUtils.toIdString(graph, s)); - } else { - if(objectStatus == ExtentStatus.EXTERNAL) { - if(DEBUG) - System.out.println("other " + NameUtils.toIdString(graph, s)); - //System.out.println("other.add " + predicate + " - " + object); - other.add(support.getTransientId(predicate)); - other.add(support.getTransientId(object)); - if(LOG) log("[OTHER] (object is external) " + NameUtils.toIdString(graph, s)); - } - } - - } - - if(!other.isEmpty()) { - otherStatementsOutput.writeInt(sId); - otherStatementsOutput.writeInt(other.size() / 2); - for (int i = 0; i < other.size(); i++) - otherStatementsOutput.writeInt(other.getQuick(i)); - other.resetQuick(); - } - - composedStatementsOutput.writeInt(composed.size() / 2); - for (int i = 0; i < composed.size(); i++) - composedStatementsOutput.writeInt(composed.getQuick(i)); - - } - - } catch (IOException e) { - e.printStackTrace(); - } - - otherStatementTime += (System.nanoTime() - start); - - } - - boolean hasStrictParents(ReadGraph g, Resource r) - throws DatabaseException { - if (g.getPossibleURI(r) != null) - return true; - return false; - } - - public boolean getExpansionSeedsFromExtents(ReadGraph graph, final Collection[] expansion) throws DatabaseException { - - long start = System.nanoTime(); - - final ConcurrentLinkedQueue accepts = new ConcurrentLinkedQueue(); - - /* - * Determine statements which could accept statements with todo - * objects - */ - search: for (Double priority : priorityList) { - - for (final SubgraphAdvisor advisor : advisors) { - - if (advisor.priority() > 0) - continue; - - if (advisor.priority() == priority) { - - graph.syncRequest(new ReadRequest() { - - @Override - public void run(ReadGraph graph) throws DatabaseException { - - for (Collection coll : expansion) - for (DirectStatements stms : coll) - for(final Statement stm : stms) { - - advisor.advise(graph, stm, new AsyncProcedure() { - - @Override - public void exception(AsyncReadGraph graph, Throwable throwable) { - throwable.printStackTrace(); - } - - @Override - public void execute(AsyncReadGraph graph, Boolean accept) { - if (accept) { - accepts.add(stm.getObject()); - } - } - - }); - - } - - } - - }); - } - if (!accepts.isEmpty()) - break search; - } - } - - CollectionSupport cs = graph.getService(CollectionSupport.class); - Set schedule = cs.createSet(); - for (Resource r : accepts) { - if(!status.containsKey(r)) - schedule.add(r); - } - - extentSeedTime += (System.nanoTime() - start); - - if (schedule.isEmpty()) - return false; - - fastResolve(graph, schedule); - uriResolve(graph, schedule); - fullResolve(graph, schedule, "accepts"); - - return true; - - } - - ConcurrentLinkedQueue fastInternals = new ConcurrentLinkedQueue(); - - public void fastResolve(ReadGraph graph, final Set rs) - throws DatabaseException { - // This collects and resolves weaks - if(fastResolveLoop(graph, rs)) - // Weaks are now resolved - fastResolveLoop(graph, rs); - } - - public boolean fastResolveLoop(ReadGraph graph, final Set rs) - throws DatabaseException { - - long start = System.nanoTime(); - - final ConcurrentLinkedQueue weakSchedule = new ConcurrentLinkedQueue(); - - graph.syncRequest(new AsyncRead() { - - @Override - public int threadHash() { - return hashCode(); - } - - @Override - public int getFlags() { - return 0; - } - - @Override - public void perform(AsyncReadGraph graph, - AsyncProcedure procedure) { - - QueryControl control = graph.getService(QueryControl.class); - final DirectQuerySupport dqs = graph.getService(DirectQuerySupport.class); - - int slice = (int) (rs.size() / control - .getAmountOfQueryThreads()) + 1; - - final Resource[] rootArray = rs.toArray(Resource.NONE); - for (int i = 0; i < control.getAmountOfQueryThreads(); i++) { - - final int start = i * slice; - final int end = Math.min(start + slice, - rootArray.length); - - control.schedule(graph, i, new ControlProcedure() { - - @Override - public void execute(AsyncReadGraph graph) { - - for (int index = start; index < end; index++) { - - final Resource r = rootArray[index]; - - //if (status.containsKey(r)) continue; - - graph.asyncRequest(new FastInternalRequest(dqs, r, status, weakInverses, weakSchedule),new AsyncProcedure() { - - @Override - public void exception(AsyncReadGraph graph,Throwable throwable) { - throwable.printStackTrace(); - } - - @Override - public void execute(AsyncReadGraph graph,Boolean isInternal) { - if (isInternal) { - fastInternals.add(r); - } - } - - }); - - } - - } - - }); - - } - - procedure.execute(graph, true); - - } - - }); - - if (!weakSchedule.isEmpty()) { - THashSet weaks = new THashSet(weakSchedule); - if (CLASSIFY_LOG) - for (Resource p : weakSchedule) - System.out.println("classify " - + NameUtils.getSafeName(graph, p)); - graph.syncRequest(new ClassifyStatementsRequest(weaks, weakInverses)); - } - - for (Resource r : fastInternals) { - rs.remove(r); - if (status.put(r, ExtentStatus.INTERNAL) == null) { - if(LOG) log("[INTERNAL] (fast) " + NameUtils.getSafeName(graph, r, true)); - ids.put(r, id++); - fastInternalCounter++; - expansionSeeds.add(r); - } - } - - fastResolveTime += (System.nanoTime() - start); - - return !weakSchedule.isEmpty(); - - } - - private ExtentStatus resolveExtent(ReadGraph graph, Resource r, Map statuses, Set expansionSeeds, THashSet pending, - ArrayList stack) throws DatabaseException { - - ExtentStatus current = statuses.get(r); - if(current != null) return current; - - if (pending.contains(r)) - return ExtentStatus.PENDING; - - // In order to break cyclic dependencies - pending.add(r); - - if (PARENT_DEBUG) - System.out.println("resolveExtent " - + NameUtils.getSafeName(graph, r)); - - ExtentStatus status = ExtentStatus.INTERNAL; - for (Resource p : getParents(graph, r)) { - if (PARENT_DEBUG) { - ExtentStatus ps = statuses.get(p); - System.out.println(" parent " + NameUtils.getSafeName(graph, p) + "(" + ps + ")"); - } - switch (resolveExtent(graph, p, statuses, - expansionSeeds, pending, stack)) { - case EXTERNAL: - return ExtentStatus.EXTERNAL; - case PENDING: - status = ExtentStatus.PENDING; - } - } - if (status == ExtentStatus.INTERNAL) { - pending.remove(r); - stack.add(r); - if (DEBUG) - System.out.println(NameUtils.getSafeName(graph, r, true) - + " is internal."); - } - return status; - } - - public void uriResolve(ReadGraph graph, final Set todo) - throws DatabaseException { - - long start = System.nanoTime(); - - for(Resource r : todo) System.out.println("uriResolve " + - NameUtils.getSafeName(graph, r)); - - final ConcurrentSkipListSet found = new ConcurrentSkipListSet(); - - graph.syncRequest(new AsyncReadRequest() { - - @Override - public void run(AsyncReadGraph graph) { - - for (final Resource r : todo) { - - // System.out.println("uriresolve before " + r); - - if (status.containsKey(r)) continue; - - // System.out.println("uriresolve " + r); - - graph.forURI(r, new AsyncProcedure() { - - @Override - public void exception(AsyncReadGraph graph, Throwable throwable) { - throwable.printStackTrace(); - } - - @Override - public void execute(AsyncReadGraph graph, String uri) { - - if (uri != null) { - - // System.out.println("uriresolve has uri " - // + r); - - if(found.add(r)) { - parentExternalCounter++; - } - - } else { - - // System.out.println("uriresolve ask inverse " - // + r); - - graph.forPossibleInverse(r, new AsyncProcedure() { - - @Override - public void exception(AsyncReadGraph graph, Throwable throwable) { - throwable.printStackTrace(); - } - - @Override - public void execute(AsyncReadGraph graph, Resource inverse) { - - if (inverse != null) { - - graph.forURI(inverse, new AsyncProcedure() { - - @Override - public void exception(AsyncReadGraph graph, Throwable throwable) { - throwable.printStackTrace(); - } - - @Override - public void execute(AsyncReadGraph graph, String uri) { - - if (uri != null) { - - if(found.add(r)) { - parentExternalCounter++; - } - - } - - } - - }); - - } - - } - - }); - - } - } - - }); - - } - - } - - }); - - todo.removeAll(found); - for(Resource r : found) { - status.put(r, ExtentStatus.EXTERNAL); - if(LOG) log("[EXTERNAL] (uriResolve) " + NameUtils.getSafeName(graph, r, true)); - } - - parentResolveTime += System.nanoTime() - start; - - } - - public void fullResolve(ReadGraph graph, Collection rs, - String koss) throws DatabaseException { - - long start = System.nanoTime(); - - for (final Resource r : rs) { - - if(status.containsKey(r)) continue; - - THashSet pending = new THashSet(); - ArrayList stack = new ArrayList(); - - ExtentStatus s = resolveExtent(graph, r, status, expansionSeeds, pending, stack); - if (ExtentStatus.INTERNAL == s || ExtentStatus.PENDING == s) { - if (status.put(r, ExtentStatus.INTERNAL) == null) { - if(LOG) log("[INTERNAL] (resolveExtent) " + NameUtils.getSafeName(graph, r, true)); - ids.put(r, id++); - fullInternalCounter++; - expansionSeeds.add(r); - } - } - if (ExtentStatus.EXTERNAL == s) { - if (status.put(r, ExtentStatus.EXTERNAL) == null) { - if(LOG) log("[EXTERNAL] (resolveExtent) " + NameUtils.getSafeName(graph, r, true)); - fullExternalCounter++; - } - } - - } - - fullResolveTime += (System.nanoTime() - start); - - } - - public void process(ReadGraph graph, - ObjectOutputStream composedStatementsOutput, - ObjectOutputStream otherStatementsOutput, - ObjectOutputStream valueOutput) - throws DatabaseException { - - this.variantSerializer = graph.getService(Databoard.class).getSerializerUnchecked(Bindings.VARIANT); - - CollectionSupport cs = graph.getService(CollectionSupport.class); - - Set typeTodo = cs.createSet(); - Set objectTodo = cs.createSet(); - Set predicateTodo = cs.createSet(); - - Collection[]> fullExpansion = new ArrayList[]>(); - - do { - - QueryControl control = graph.getService(QueryControl.class); - Collection[] expansion = new ArrayList[control.getAmountOfQueryThreads()]; - for (int i = 0; i < control.getAmountOfQueryThreads(); i++) { - expansion[i] = new ArrayList(); - } - - // Expand expansionSeeds - expand(graph, expansion, objectTodo); - - // Start collecting new seeds - expansionSeeds = cs.createSet(); - - // Collect predicates which are roots, Map preStatus, - Map specials, - ObjectOutputStream otherStatementsOutput, - ObjectOutputStream valueOutput, - TreeMap extensions, - TIntHashSet excludedShared) throws DatabaseException { - - ITask task = ThreadLogger.getInstance().begin("getDomain2"); - - final DomainProcessor2 processor = new DomainProcessor2(); - - processor.startupTime = System.nanoTime(); - - Layer0 l0 = Layer0.getInstance(graph); - - CollectionSupport cs = graph.getService(CollectionSupport.class); - SerialisationSupport support = graph.getService(SerialisationSupport.class); - - processor.ids = ids; - processor.specials = specials; - processor.status = cs.createMap(ExtentStatus.class); - processor.weakInverses = cs.createMap(WeakStatus.class); - processor.predicates = cs.createSet(); - processor.isRelatedToPredicates = cs.createSet(); - processor.sharedPredicates = cs.createSet(); -// processor.expansionSeeds = cs.createSet(); - - for(Map.Entry entry : preStatus.entrySet()) { - processor.status.put(entry.getKey(), entry.getValue()); - if(ExtentStatus.EXCLUDED.equals(entry.getValue())) processor.exclusions.add(entry.getKey()); - } - -// for (Resource r : excluded) { -// processor.status.put(r, ExtentStatus.EXCLUDED); -// } - - Resource rootLibrary = graph.getResource("http:/"); - - if (!roots.contains(rootLibrary)) - processor.status.put(rootLibrary, ExtentStatus.EXTERNAL); - - for (Resource root : roots) { - processor.status.put(root, ExtentStatus.INTERNAL); - //processor.ids.put(support.getTransientId(root), processor.ids.size()); - for (Resource owner : graph.getObjects(root, l0.IsOwnedBy)) { - processor.status.put(owner, ExtentStatus.EXTERNAL); - } - } - - processor.startupTime = System.nanoTime() - processor.startupTime; - - processor.fringe = new HashSet(); - processor.fringe.addAll(roots); - - processor.internalDomain.addAll(roots); - - processor.sharedExternalReferences = new HashSet(); - processor.sharedExternalFringe = new HashSet(); - - try { - - processor.process(graph, otherStatementsOutput, valueOutput); - - } catch (IOException e) { - e.printStackTrace(); - } - - for(Resource r : processor.sharedExternalReferences) excludedShared.add(support.getTransientId(r)); - - ClusteringSupport cls = graph.getService(ClusteringSupport.class); - TLongObjectHashMap clusterMap = new TLongObjectHashMap(); - for(Map.Entry entry : processor.status.entrySet()) { - if(ExtentStatus.INTERNAL == entry.getValue()) { - - long cluster = cls.getCluster(entry.getKey()); - TIntArrayList list = clusterMap.get(cluster); - if(list == null) { - list = new TIntArrayList(); - clusterMap.put(cluster, list); - } - list.add(support.getTransientId(entry.getKey())); - - } - } - final TIntArrayList clustering = new TIntArrayList(); - clusterMap.forEachEntry(new TLongObjectProcedure() { - - @Override - public boolean execute(long cluster, TIntArrayList b) { - clustering.add(b.size()); - b.forEach(new TIntProcedure() { - - @Override - public boolean execute(int rId) { - processor.ids.put(rId, processor.id++); - return true; - } - - }); - return true; - } - - }); - - extensions.put(Extensions.CLUSTERING, new Variant(Bindings.INT_ARRAY, clustering.toArray())); - - long total = processor.startupTime + processor.expandTime - + processor.composedPredicateTime - + processor.composedObjectTime + processor.extentSeedTime - + processor.fullResolveTime + processor.fastResolveTime + - + processor.parentResolveTime + processor.otherStatementTime; - - if (PROFILE) { - System.out.println("startup took " + 1e-9 * processor.startupTime - + "s."); - System.out.println("expand took " + 1e-9 * processor.expandTime - + "s."); - System.out.println("composedPredicates took " + 1e-9 - * processor.composedPredicateTime + "s."); - System.out.println("composedObjects took " + 1e-9 - * processor.composedObjectTime + "s."); - System.out.println("extentSeeding took " + 1e-9 - * processor.extentSeedTime + "s."); - System.out.println("fullResolve took " + 1e-9 - * processor.fullResolveTime + "s."); - System.out.println("fastResolve took " + 1e-9 - * processor.fastResolveTime + "s."); - System.out.println("parentResolve took " + 1e-9 - * processor.parentResolveTime + "s."); - System.out.println("otherStatements took " + 1e-9 - * processor.otherStatementTime + "s."); - System.out.println("total " + 1e-9 * total + "s."); - } - - task.finish(); - - - } - - - public static void getDomain(ReadGraph graph, Map ids, - Collection roots, Map preStatus, Set advisors, - ObjectOutputStream composedStatementsOutput, - ObjectOutputStream otherStatementsOutput, - ObjectOutputStream valueOutput) throws DatabaseException { - - ITask task = ThreadLogger.getInstance().begin("getDomain"); - - DomainProcessor processor = new DomainProcessor(advisors); - - processor.startupTime = System.nanoTime(); - - Layer0 l0 = Layer0.getInstance(graph); - - CollectionSupport cs = graph.getService(CollectionSupport.class); - - processor.ids = ids; - processor.status = cs.createMap(ExtentStatus.class); - processor.weakInverses = cs.createMap(WeakStatus.class); - processor.predicates = cs.createSet(); - processor.composedPredicates = cs.createSet(); - processor.expansionSeeds = cs.createSet(); - - for(Map.Entry entry : preStatus.entrySet()) { - processor.status.put(entry.getKey(), entry.getValue()); - } - -// for (Resource r : excluded) { -// processor.status.put(r, ExtentStatus.EXCLUDED); -// } - - if (!roots.contains(graph.getRootLibrary())) - processor.status.put(graph.getRootLibrary(), ExtentStatus.EXTERNAL); - - for (Resource root : roots) { - processor.status.put(root, ExtentStatus.INTERNAL); - processor.ids.put(root, processor.id++); - for (Resource owner : graph.getObjects(root, l0.IsOwnedBy)) { - processor.status.put(owner, ExtentStatus.EXTERNAL); - } - } - - - processor.expansionSeeds.addAll(roots); - - processor.startupTime = System.nanoTime() - processor.startupTime; - - while (!processor.expansionSeeds.isEmpty()) { - - processor.process(graph, composedStatementsOutput, - otherStatementsOutput, valueOutput); - - } - - long total = processor.startupTime + processor.expandTime - + processor.composedPredicateTime - + processor.composedObjectTime + processor.extentSeedTime - + processor.fullResolveTime + processor.fastResolveTime + - + processor.parentResolveTime + processor.otherStatementTime; - - if (PROFILE) { - System.out.println("startup took " + 1e-9 * processor.startupTime - + "s."); - System.out.println("expand took " + 1e-9 * processor.expandTime - + "s."); - System.out.println("composedPredicates took " + 1e-9 - * processor.composedPredicateTime + "s."); - System.out.println("composedObjects took " + 1e-9 - * processor.composedObjectTime + "s."); - System.out.println("extentSeeding took " + 1e-9 - * processor.extentSeedTime + "s."); - System.out.println("fullResolve took " + 1e-9 - * processor.fullResolveTime + "s."); - System.out.println("fastResolve took " + 1e-9 - * processor.fastResolveTime + "s."); - System.out.println("parentResolve took " + 1e-9 - * processor.parentResolveTime + "s."); - System.out.println("otherStatements took " + 1e-9 - * processor.otherStatementTime + "s."); - System.out.println("total " + 1e-9 * total + "s."); - } - - task.finish(); - - } - -} +/******************************************************************************* + * Copyright (c) 2007, 2010 Association for Decentralized Information Management + * in Industry THTH ry. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * VTT Technical Research Centre of Finland - initial API and implementation + *******************************************************************************/ +package org.simantics.db.layer0.util; + +import java.io.DataOutput; +import java.io.DataOutputStream; +import java.io.FileNotFoundException; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.ObjectOutputStream; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TreeMap; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ConcurrentSkipListSet; + +import org.simantics.databoard.Bindings; +import org.simantics.databoard.Databoard; +import org.simantics.databoard.binding.Binding; +import org.simantics.databoard.binding.mutable.Variant; +import org.simantics.databoard.serialization.Serializer; +import org.simantics.databoard.type.Datatype; +import org.simantics.db.AsyncReadGraph; +import org.simantics.db.DirectStatements; +import org.simantics.db.ReadGraph; +import org.simantics.db.Resource; +import org.simantics.db.Statement; +import org.simantics.db.common.request.AsyncReadRequest; +import org.simantics.db.common.request.ReadRequest; +import org.simantics.db.common.request.ResourceAsyncRead; +import org.simantics.db.common.utils.NameUtils; +import org.simantics.db.exception.DatabaseException; +import org.simantics.db.layer0.adapter.SubgraphAdvisor; +import org.simantics.db.layer0.adapter.SubgraphExtent.ExtentStatus; +import org.simantics.db.layer0.util.ConsistsOfProcess.InternalEntry; +import org.simantics.db.procedure.AsyncProcedure; +import org.simantics.db.request.AsyncRead; +import org.simantics.db.service.ClusteringSupport; +import org.simantics.db.service.CollectionSupport; +import org.simantics.db.service.DirectQuerySupport; +import org.simantics.db.service.QueryControl; +import org.simantics.db.service.QueryControl.ControlProcedure; +import org.simantics.db.service.SerialisationSupport; +import org.simantics.db.service.StatementSupport; +import org.simantics.db.service.TransferableGraphSupport; +import org.simantics.graph.representation.Extensions; +import org.simantics.layer0.Layer0; +import org.simantics.utils.datastructures.Pair; +import org.simantics.utils.threads.logger.ITask; +import org.simantics.utils.threads.logger.ThreadLogger; + +import gnu.trove.list.array.TIntArrayList; +import gnu.trove.map.hash.TIntIntHashMap; +import gnu.trove.map.hash.TLongObjectHashMap; +import gnu.trove.procedure.TIntProcedure; +import gnu.trove.procedure.TLongObjectProcedure; +import gnu.trove.set.TIntSet; +import gnu.trove.set.hash.THashSet; +import gnu.trove.set.hash.TIntHashSet; + +public class Subgraphs { + + public static String LOG_FILE = "export.log"; + final static private boolean LOG = false; + final static private boolean DEBUG = false; + final static private boolean PARENT_DEBUG = DEBUG | false; + final static private boolean EXTERNAL_DEBUG = DEBUG | false; + final static private boolean ADVISOR_LOG = LOG & false; + final static private boolean EXPANSION_LOG = LOG & false; + final static private boolean INTERNAL_LOG = LOG & false; + final static private boolean COMPOSED_LOG = LOG & false; + final static private boolean RESOLVE_LOG = LOG & false; + final static private boolean CLASSIFY_LOG = LOG & false; + final static private boolean EXTERNAL_LOG = LOG & false; + final static private boolean PROFILE = false; + + static enum WeakStatus { + STRONG, WEAK + } + + + static DataOutput log; + + static { + + if (LOG) { + try { + FileOutputStream stream = new FileOutputStream(LOG_FILE, false); + log = new DataOutputStream(stream); + } catch (FileNotFoundException e) { + e.printStackTrace(); + } + } + + } + + private static void log(String line) { + if (LOG) { + try { + log.write((line + "\n").getBytes()); + } catch (IOException e) { + e.printStackTrace(); + } + } + } + + public static Collection getParents(ReadGraph g, Resource r) + throws DatabaseException { + return getParents(g, r, false); + } + + static class FastInternalRequest extends ResourceAsyncRead { + + final DirectQuerySupport dqs; + final ConcurrentLinkedQueue queue; + final Map weakInverses; + final Map status; + + public FastInternalRequest(DirectQuerySupport dqs, Resource resource, + Map status, + Map weakInverses, + ConcurrentLinkedQueue queue) { + super(resource); + this.dqs = dqs; + this.status = status; + this.weakInverses = weakInverses; + this.queue = queue; + } + + @Override + public int getFlags() { + return 0; + } + + @Override + public void perform(AsyncReadGraph graph, final AsyncProcedure procedure) { + + dqs.forEachDirectStatement(graph, resource, new AsyncProcedure() { + + @Override + public void execute(AsyncReadGraph graph, DirectStatements ss) { + boolean ok = true; + for(Statement statement : ss) { + if (status.get(statement.getObject()) == ExtentStatus.INTERNAL) continue; + WeakStatus status = weakInverses.get(statement.getPredicate()); + if(status == WeakStatus.WEAK) continue; + else if (status == null) { + queue.add(statement.getPredicate()); + } + ok = false; + } + procedure.execute(graph, ok); + } + + @Override + public void exception(AsyncReadGraph graph, Throwable throwable) { + throwable.printStackTrace(); + } + + }); + + } + + } + + static class ClassifyStatementsRequest implements AsyncRead { + + final Set schedule; + final Map weakMap; + + public ClassifyStatementsRequest(Set schedule, Map weakMap) { + this.weakMap = weakMap; + this.schedule = schedule; + } + + @Override + public int threadHash() { + return hashCode(); + } + + @Override + public int getFlags() { + return 0; + } + + @Override + public void perform(AsyncReadGraph graph, final AsyncProcedure procedure) { + + for (final Resource p : schedule) { + + graph.forPossibleInverse(p, new AsyncProcedure() { + + private void register(AsyncReadGraph graph, Resource predicate, Resource superRelation, WeakStatus status) { + synchronized (weakMap) { + weakMap.put(predicate, status); + if(superRelation != null) weakMap.put(superRelation, status); + } + } + + @Override + public void exception(AsyncReadGraph graph, Throwable throwable) { + throwable.printStackTrace(); + } + + @Override + public void execute(AsyncReadGraph graph, final Resource inverse) { + + if (inverse == null) { + + register(graph, p, null, WeakStatus.WEAK); + + } else { + + graph.forPossibleSuperrelation(inverse, new AsyncProcedure() { + + @Override + public void exception(AsyncReadGraph graph, Throwable throwable) { + throwable.printStackTrace(); + } + + @Override + public void execute(AsyncReadGraph graph, final Resource superRelation) { + + if(superRelation != null && weakMap.containsKey(superRelation)) { + register(graph, p, null, weakMap.get(superRelation)); + return; + } + + graph.forIsSubrelationOf(inverse, graph.getService(Layer0.class).IsRelatedTo, new AsyncProcedure() { + + @Override + public void exception(AsyncReadGraph graph,Throwable throwable) { + throwable.printStackTrace(); + } + + @Override + public void execute(AsyncReadGraph graph,Boolean strong) { + register(graph, p, superRelation, strong ? WeakStatus.STRONG : WeakStatus.WEAK); + } + + }); + + } + + }); + + } + + } + + }); + + } + + procedure.execute(graph, false); + + } + + } + + private static Collection getParents(ReadGraph g, Resource r, boolean isStrong) throws DatabaseException { + + System.out.println("getParents " + NameUtils.getSafeName(g, r)); + + Layer0 l0 = Layer0.getInstance(g); + + Collection predicates = g.getPredicates(r); + + // --- Consists Of ---------------------------------------------------- + + if (predicates.contains(l0.PartOf)) { + Collection parents = g.getObjects(r, l0.PartOf); + if (parents.size() == 1) + return parents; + ArrayList libraryParents = new ArrayList(1); + for (Resource p : parents) + if (g.isInstanceOf(p, l0.Library)) + libraryParents.add(p); + if (!libraryParents.isEmpty()) + return libraryParents; + else + return parents; + } + + // --- Ordered sets --------------------------------------------------- + + { + Collection parents = null; + for (Resource p : predicates) + if (g.isInstanceOf(p, l0.OrderedSet) && !p.equals(r)) { + if (parents == null) + parents = new ArrayList(1); + parents.add(p); + } + if (parents != null) { + if (DEBUG) + System.out.println("ORDERED SET"); + return parents; + } + } + + + if (isStrong) + return Collections.emptyList(); + else { + + if (predicates.contains(l0.InverseOf)) { + + Resource inv = g.getInverse(r); + return getParents(g, inv, true); + + } else { + + /* + * Depends On + * + * If there are DependsOn parents, then IsRelatedTo parents are discarded + * + */ + HashSet result = new HashSet(); + for(Resource predicate : predicates) { + if(g.isSubrelationOf(predicate, l0.IsDependencyOf)) result.addAll(g.getObjects(r, predicate)); + } + if(!result.isEmpty()) return result; + + /* + * Is Related To + * + * At this point all Is Related To are parents. + * + */ + for(Resource predicate : predicates) { + Resource inv = g.getPossibleInverse(predicate); + if(inv != null) { + if(g.isSubrelationOf(inv, l0.IsRelatedTo)) result.addAll(g.getObjects(r, predicate)); + } + } + + return result; + + } + + /* + Collection invR = g.getObjects(r, b.IsRelatedTo_Inverse); + if (predicates.contains(b.InverseOf)) { + if (invR.size() > 1) { + if (DEBUG) + System.out + .println("###########################################"); + Resource inv = g.getInverse(r); + Collection ret = new ArrayList(); + for (Statement pp : g.getStatements(r, + b.IsRelatedTo_Inverse)) + if (!pp.getPredicate().equals(inv)) { + if (DEBUG) { + System.out.println("<" + + NameUtils.getSafeName(g, pp + .getSubject()) + + "," + + NameUtils.getSafeName(g, pp + .getPredicate()) + + "," + + NameUtils.getSafeName(g, pp + .getObject()) + ">"); + } + ret.add(pp.getObject()); + } + return ret; + } + // System.out.println("?????????????????"); + Collection invParents = getParents(g, + g.getInverse(r), true); + if (!invParents.isEmpty()) + return invParents; + } + if (DEBUG) { + System.out.print("invR"); + for (Resource res : invR) + System.out.print(" " + NameUtils.getSafeName(g, res)); + System.out.println(); + } + return invR; + */ + } + + } + +// public static String getIdentifier(ReadGraph g, Resource r) +// throws DatabaseException { +// Layer0 L0 = Layer0.getInstance(g); +// if (r.equals(g.getRootLibrary())) +// return ""; +// String name = g.getPossibleRelatedValue(r, L0.HasName); +// if (name == null) +// return null; +// Collection parents = getParents(g, r, true); +// if (parents.size() != 1) +// return null; +// for (Resource p : parents) { +// String parentIdentifier = getIdentifier(g, p); +// if (parentIdentifier == null) +// return null; +// return parentIdentifier + "/" + name; +// } +// return null; +// } + + static int kess = 0; + + static class Expansion extends AsyncReadRequest { + + final private Collection roots; + final Collection[] results; + final Collection[] listElements; + + public Expansion(Collection roots, Collection[] results, Collection[] listElements) { + this.roots = roots; + this.results = results; + this.listElements = listElements; + } + + @Override + public void run(AsyncReadGraph graph) { + + QueryControl control = graph.getService(QueryControl.class); + final DirectQuerySupport dqs = graph.getService(DirectQuerySupport.class); + + final DomainStatementProcedure proc = new DomainStatementProcedure(dqs, graph.getService(StatementSupport.class), graph.getService(Layer0.class), results, listElements); + + int slice = (int) (roots.size() / control.getAmountOfQueryThreads()) + 1; + + final Resource[] rootArray = roots.toArray(Resource.NONE); + for (int i = 0; i < control.getAmountOfQueryThreads(); i++) { + + final int start = i * slice; + final int end = Math.min(start + slice, rootArray.length); + + control.schedule(graph, i, new ControlProcedure() { + + @Override + public void execute(AsyncReadGraph graph) { + for (int index = start; index < end; index++) { + dqs.forEachDirectStatement(graph, rootArray[index], proc); + } + + } + + }); + + } + + } + + @Override + public int getFlags() { + return 0; + } + + } + + static class Expansion2 extends AsyncReadRequest { + + final private Collection roots; + final Collection[] results; + final boolean ignoreVirtual; + + public Expansion2(Collection roots, Collection[] results) { + this(roots, results, true); + } + + public Expansion2(Collection roots, Collection[] results, boolean ignoreVirtual) { + this.roots = roots; + this.results = results; + this.ignoreVirtual = ignoreVirtual; + } + + @Override + public void run(AsyncReadGraph graph) { + + QueryControl control = graph.getService(QueryControl.class); + final DirectQuerySupport dqs = graph.getService(DirectQuerySupport.class); + + final DomainStatementProcedure2 proc = + new DomainStatementProcedure2(results); + + int slice = (int) (roots.size() / control.getAmountOfQueryThreads()) + 1; + + final Resource[] rootArray = roots.toArray(Resource.NONE); + for (int i = 0; i < control.getAmountOfQueryThreads(); i++) { + + final int start = i * slice; + final int end = Math.min(start + slice, rootArray.length); + + control.schedule(graph, i, new ControlProcedure() { + @Override + public void execute(AsyncReadGraph graph) { + if (ignoreVirtual) { + for (int index = start; index < end; index++) { + dqs.forEachDirectPersistentStatement(graph, rootArray[index], proc); + } + } else { + for (int index = start; index < end; index++) { + dqs.forEachDirectStatement(graph, rootArray[index], proc); + } + } + } + }); + + } + + } + + @Override + public int getFlags() { + return 0; + } + + } + + static class DomainProcessor2 { + + Serializer variantSerializer; + + int id = 0; + + Set fringe = null; + Set exclusions = new HashSet(); + Set internalDomain = new HashSet(); + Set sharedExternalReferences = null; + TIntSet sharedExternalIds = null; + Set sharedExternalFringe = null; + Set predicates = null; + Set isRelatedToPredicates = null; + Set sharedPredicates = null; + TIntIntHashMap ids = null; + Map specials = null; + Map status = null; + Map weakInverses = null; + +// final ArrayList priorityList = new ArrayList(); + + private long composedObjectCounter = 0; + private long fastInternalCounter = 0; + private long parentExternalCounter = 0; + private long fullInternalCounter = 0; + private long fullExternalCounter = 0; + + private long startupTime = 0; + private long expandTime = 0; + private long fullResolveTime = 0; + private long fastResolveTime = 0; + private long otherStatementTime = 0; + private long parentResolveTime = 0; + private long extentSeedTime = 0; + private long composedPredicateTime = 0; + private long composedObjectTime = 0; + + public void expand(ReadGraph graph, Set fringe, Collection[] expansion) throws DatabaseException { + + long start = System.nanoTime(); + + Collection[]> fullExpansion = new ArrayList[]>(); + QueryControl control = graph.getService(QueryControl.class); + for (int i = 0; i < control.getAmountOfQueryThreads(); i++) { + expansion[i] = new ArrayList(); + } + + graph.syncRequest(new Expansion2(fringe, expansion)); + + fringe.clear(); + + expandTime += (System.nanoTime() - start); + + } + + public void classifyPredicates(ReadGraph graph, final Set schedule) throws DatabaseException { + + CollectionSupport cs = graph.getService(CollectionSupport.class); + + final Layer0 L0 = Layer0.getInstance(graph); + + long start = System.nanoTime(); + + final ConcurrentLinkedQueue composedResult = new ConcurrentLinkedQueue(); + final ConcurrentLinkedQueue singleResult = new ConcurrentLinkedQueue(); + final ConcurrentLinkedQueue sharedResult = new ConcurrentLinkedQueue(); + final ConcurrentLinkedQueue> singles = new ConcurrentLinkedQueue>(); + + // Discover singles + graph.syncRequest(new AsyncReadRequest() { + + @Override + public void run(AsyncReadGraph graph) { + + for (final Resource predicate : schedule) { + + graph.forPossibleSuperrelation(predicate, new AsyncProcedure() { + + @Override + public void exception(AsyncReadGraph graph, Throwable throwable) { + throwable.printStackTrace(); + } + + @Override + public void execute(AsyncReadGraph graph, final Resource single) { + singles.add(Pair.make(predicate, single)); + } + + }); + + graph.forHasStatement(predicate, L0.SharedRange, new AsyncProcedure() { + + @Override + public void exception(AsyncReadGraph graph, Throwable throwable) { + throwable.printStackTrace(); + } + + @Override + public void execute(AsyncReadGraph graph, final Boolean shared) { + if(shared) sharedResult.add(predicate); + } + + }); + + } + + } + + }); + + // Determine singles + final Set singleSchedule = cs.createSet(); + for(Pair pair : singles) { + + Resource single = pair.second; + if(single != null && predicates.add(single)) singleSchedule.add(single); + + } + + graph.syncRequest(new AsyncReadRequest() { + + @Override + public void run(AsyncReadGraph graph) { + + for (final Resource predicate : singleSchedule) { + + graph.forIsSubrelationOf(predicate, L0.IsRelatedTo, new AsyncProcedure() { + + @Override + public void exception(AsyncReadGraph graph, Throwable throwable) { + throwable.printStackTrace(); + } + + @Override + public void execute(AsyncReadGraph graph, Boolean strong) { + if (strong) singleResult.add(predicate); + } + + }); + + } + + } + + }); + + isRelatedToPredicates.addAll(singleResult); + sharedPredicates.addAll(sharedResult); + + final Set specialSchedule = cs.createSet(); + + // Classify + for(Pair pair : singles) { + + Resource single = pair.second; + if(single != null) { + if(isRelatedToPredicates.contains(single)) { + isRelatedToPredicates.add(pair.first); + } + } else { + specialSchedule.add(pair.first); + } + + } + + graph.syncRequest(new AsyncReadRequest() { + + @Override + public void run(AsyncReadGraph graph) { + + for (final Resource predicate : specialSchedule) { + + graph.forIsSubrelationOf(predicate, L0.IsRelatedTo, new AsyncProcedure() { + + @Override + public void exception(AsyncReadGraph graph, Throwable throwable) { + throwable.printStackTrace(); + } + + @Override + public void execute(AsyncReadGraph graph, Boolean composed) { + if (composed) composedResult.add(predicate); + } + + }); + + } + + } + + }); + + isRelatedToPredicates.addAll(composedResult); + + composedPredicateTime += (System.nanoTime() - start); + + } + + private Set strongInverseSet = new HashSet(); + + public void classifyPredicates(ReadGraph graph, final Collection[] expansion) throws DatabaseException { + + CollectionSupport cs = graph.getService(CollectionSupport.class); + final Set schedule = cs.createSet(); + final Map newPredicates = cs.createMap(Resource.class); + + for (Collection coll : expansion) + for (DirectStatements stms : coll) + for(Statement stm : stms) { + + Resource predicate = stm.getPredicate(); + + if(predicates.add(predicate)) { + Resource inverse = graph.getPossibleInverse(predicate); + schedule.add(predicate); + if(inverse != null) { + newPredicates.put(predicate, inverse); + if(predicates.add(inverse)) schedule.add(inverse); + } + + } + + } + + classifyPredicates(graph, schedule); + + for(Map.Entry entry : newPredicates.entrySet()) { + // Inverse is strong => this has strong inverse + if(isRelatedToPredicates.contains(entry.getValue())) { + strongInverseSet.add(entry.getKey()); + } + // This is strong => inverse has strong inverse + if(isRelatedToPredicates.contains(entry.getKey())) { + strongInverseSet.add(entry.getValue()); + } + } + + } + + /* + * Composed objects are internal. Mark them for expansion. + */ + + public void processFringe(ReadGraph graph, Collection[] expansion, + ObjectOutputStream otherStatementsOutput, ObjectOutputStream valueOutput) throws DatabaseException, IOException { + + SerialisationSupport support = graph.getService(SerialisationSupport.class); + TransferableGraphSupport tgs = graph.getService(TransferableGraphSupport.class); + + Layer0 L0 = Layer0.getInstance(graph); + + long start = System.nanoTime(); + + for (Collection coll : expansion) + for (DirectStatements stms : coll) { + + Resource subject = stms.getSubject(); + + boolean partOf = false; + for(Statement stm : stms) { + Resource predicate = stm.getPredicate(); + if(L0.PartOf.equals(predicate)) { + partOf = true; + break; + } + } + + ExtentStatus subjectStatus = status.get(subject); + if(LOG && subjectStatus != null) log("EXISTING STATUS FOR " + graph.getPossibleURI(subject) + " - " + subjectStatus); + if(subjectStatus == ExtentStatus.EXTERNAL) continue; + if(partOf && (subjectStatus == null) && graph.getPossibleURI(subject) != null) { + + status.put(subject, ExtentStatus.EXTERNAL); + if(LOG) { + String uri = graph.getPossibleURI(subject); + if(uri == null) log("[EXTERNAL]: No URI for " + subject); + else log("[EXTERNAL] " + uri); + } + + // Check for SharedRange statements + for(Statement stm : stms) { + Resource predicate = stm.getPredicate(); + if(sharedPredicates.contains(predicate)) { + sharedExternalFringe.add(stm.getObject()); + if(LOG) { + log("[SHARED EXTERNAL FRINGE]: " + NameUtils.getSafeName(graph, stm.getObject())); + } + } + } + + } else { + + boolean special = specials.containsKey(subject); + if(LOG) { + if(special) { + log("[SPECIAL] " + NameUtils.getSafeName(graph, subject)); + } + } + + status.put(subject, ExtentStatus.INTERNAL); + if(LOG) log("[INTERNAL] " + NameUtils.getSafeName(graph, subject)); + + int sId = support.getTransientId(subject); + + if(graph.hasValue(subject)) { + Datatype dt = graph.getRelatedValue(subject, L0.HasDataType, Bindings.getBindingUnchecked(Datatype.class)); + Binding b = Bindings.getBinding(dt); + Object _value = graph.getValue(subject, b); + Variant variant = new Variant(b, _value); + byte[] value = variantSerializer.serialize(variant); + if(LOG) log("[VALUE] " + NameUtils.getSafeName(graph, subject)); + valueOutput.writeInt(sId); + valueOutput.writeInt(value.length); + assert (value.length > 0); + valueOutput.write(value); + } + + TIntArrayList stream = new TIntArrayList(); + + for(Statement stm : stms) { + + if(special) { + +// System.err.println("stm=" + stm + " special=" + specials.get(subject)); + + } + + Resource predicate = stm.getPredicate(); + Resource object = stm.getObject(); + + ExtentStatus objectStatus = status.get(object); + + // Strong predicate + if (isRelatedToPredicates.contains(predicate) && (objectStatus != ExtentStatus.EXCLUDED)) { + + int pId = support.getTransientId(predicate); + int oId = support.getTransientId(object); + + if(LOG) { + String s = NameUtils.getSafeName(graph, subject); + String p = NameUtils.getSafeName(graph, predicate); + String o = NameUtils.getSafeName(graph, object); + log("related=" + s + " - " + p + " - " + o); + } + + stream.add(pId); + stream.add(oId); + + if(objectStatus == null) + fringe.add(object); + + } else { + + // Weak predicate + if(objectStatus == ExtentStatus.INTERNAL) { + + // The inverse is also weak (or there is no inverse) + if(!strongInverseSet.contains(predicate)) { + + int pId = support.getTransientId(predicate); + int oId = support.getTransientId(object); + + stream.add(pId); + stream.add(oId); + + if(LOG) { + String s = NameUtils.getSafeName(graph, subject); + String p = NameUtils.getSafeName(graph, predicate); + String o = NameUtils.getSafeName(graph, object); + log("fully weak internal=" + s + " - " + p + " - " + o + " - " + objectStatus); + } + + } else { + + if(LOG) { + String s = NameUtils.getSafeName(graph, subject); + String p = NameUtils.getSafeName(graph, predicate); + String o = NameUtils.getSafeName(graph, object); + log("strong inverse internals=" + s + " - " + p + " - " + o + " - " + objectStatus); + } + + } + + } else { + + if(special) { + +// System.err.println("stm=" + stm + " special=" + specials.get(subject)); + + Statement spec = specials.get(subject); + + // This statement can be specially treated + if(stm.getPredicate().equals(spec.getPredicate()) && stm.getObject().equals(spec.getObject())) { + + int pId = support.getTransientId(predicate); + int oId = support.getTransientId(object); + + if(LOG) { + String s = NameUtils.getSafeName(graph, subject); + String p = NameUtils.getSafeName(graph, predicate); + String o = NameUtils.getSafeName(graph, object); + log("special=" + s + " - " + p + " - " + o); + } + + stream.add(pId); + stream.add(oId); + + } + + } else { + + if(LOG) { + String s = NameUtils.getSafeName(graph, subject); + String p = NameUtils.getSafeName(graph, predicate); + String o = NameUtils.getSafeName(graph, object); + log("weak with unknown object=" + s + " - " + p + " - " + o + " - " + objectStatus); + } + + } + + } + + } + + } + + if(!stream.isEmpty()) { + otherStatementsOutput.writeInt(sId); + otherStatementsOutput.writeInt(stream.size() / 2); + for (int i = 0; i < stream.size(); i++) + otherStatementsOutput.writeInt(stream.getQuick(i)); + } + + } + + } + + composedObjectTime += System.nanoTime() - start; + + } + + public void process(ReadGraph graph, + ObjectOutputStream otherStatementsOutput, + ObjectOutputStream valueOutput) + throws DatabaseException, IOException { + + this.variantSerializer = graph.getService(Databoard.class).getSerializerUnchecked(Bindings.VARIANT); + + QueryControl control = graph.getService(QueryControl.class); + +// System.err.println("Begin ConsistsOfProcess"); + + /* + * Browse all stm = (s, ConsistsOf, o) + * � All internal URIs are found => from now on, if unidentified resource has PartOf it is external. + * � All s are internal + * � All o are internal + * � All stm are included + */ + Pair,Set> pair = ConsistsOfProcess.walk(graph, null, fringe, exclusions, true); + List entries = pair.first; + for(InternalEntry entry : entries) { + Resource r = entry.resource; + if (status.put(r, ExtentStatus.INTERNAL) == null) { + String URI = graph.getPossibleURI(r); + if(URI != null) log("URI INTERNAL " + URI); + else log("URI has no URI for " + r); + fringe.add(r); + internalDomain.add(r); + } + } + + for(Resource unnamedChild : pair.second) { + if (status.put(unnamedChild, ExtentStatus.INTERNAL) == null) { + fringe.add(unnamedChild); + } + } + + /* + * This loop resolves the transitive closure of all p < IsRelatedTo such that p does not contain the SharedRange tag. + * Such resources are guaranteed to be internal. + */ + while(!fringe.isEmpty()) { + +// System.err.println("Process Fringe with " + fringe.size() + "."); + + Collection[] expansion = new ArrayList[control.getAmountOfQueryThreads()]; + +// System.err.println("-expand"); + + // Expand fringe + expand(graph, fringe, expansion); + + /* + * classify all p + * -IsRelatedTo + * -SharedRange + * -Internal / External + */ + +// System.err.println("-classify"); + + classifyPredicates(graph, expansion); + + /* + * for stms in [stms] + * if stms contains predicate PartOf => s is External + * else s is Internal + * for all stm=(s,p,o) in stms + * if p stm is included + * Fringe <- o + */ + +// System.err.println("-process"); + + processFringe(graph, expansion, otherStatementsOutput, valueOutput); + + } + + while(!sharedExternalFringe.isEmpty()) { + + Collection[] expansion = new ArrayList[control.getAmountOfQueryThreads()]; + expand(graph, sharedExternalFringe, expansion); + + for (Collection coll : expansion) + for (DirectStatements stms : coll) { + + Resource subject = stms.getSubject(); + ExtentStatus subjectStatus = status.get(subject); + + if(ExtentStatus.INTERNAL == subjectStatus) { + + if(internalDomain.contains(subject)) continue; + + status.put(subject, ExtentStatus.EXTERNAL); + sharedExternalReferences.add(subject); + + if(LOG) { + log("[SHARED EXTERNAL REFERENCE]: " + NameUtils.getSafeName(graph, subject)); + } + + for(Statement stm : stms) { + Resource predicate = stm.getPredicate(); + if (isRelatedToPredicates.contains(predicate)) { + sharedExternalFringe.add(stm.getObject()); + } + } + + } + } + + } + + if (PROFILE) { + System.out.println(composedObjectCounter + " " + fastInternalCounter + + " " + parentExternalCounter + " " + + fullExternalCounter + " " + fullInternalCounter); + } + + } + + } + + static class DomainProcessor { + + Serializer variantSerializer; + + int id = 0; + + Set predicates = null; + Set composedPredicates = null; + Set expansionSeeds = null; + Map ids = null; + Map status = null; + Map weakInverses = null; + + final Set advisors; + final ArrayList priorityList = new ArrayList(); + + private long composedObjectCounter = 0; + private long fastInternalCounter = 0; + private long parentExternalCounter = 0; + private long fullInternalCounter = 0; + private long fullExternalCounter = 0; + + private long startupTime = 0; + private long expandTime = 0; + private long fullResolveTime = 0; + private long fastResolveTime = 0; + private long otherStatementTime = 0; + private long parentResolveTime = 0; + private long extentSeedTime = 0; + private long composedPredicateTime = 0; + private long composedObjectTime = 0; + + public DomainProcessor(Set advisors) { + this.advisors = advisors; + HashSet prioritySet = new HashSet(); + for (SubgraphAdvisor advisor : advisors) + prioritySet.add(advisor.priority()); + priorityList.addAll(prioritySet); + Collections.sort(priorityList); + } + + public void expand(ReadGraph graph, Collection[] expansion, Set schedule) throws DatabaseException { + + long start = System.nanoTime(); + +// if (DEBUG) +// System.out.println("expanding " + expansionSeeds.size() + " resources."); + + QueryControl control = graph.getService(QueryControl.class); +// final Collection[] results = new ArrayList[control.getAmountOfQueryThreads()]; + final ArrayList[] listElements = new ArrayList[control.getAmountOfQueryThreads()]; + for (int i = 0; i < control.getAmountOfQueryThreads(); i++) { +// results[i] = new ArrayList(); + listElements[i] = new ArrayList(); + } + +// if(DEBUG) { +// for(Resource r : expansionSeeds) +// System.out.println("Expanding " + NameUtils.getSafeName(graph, r, true)); +// } + + graph.syncRequest(new Expansion(expansionSeeds, expansion, listElements)); + for (int i = 0; i < control.getAmountOfQueryThreads(); i++) { +// for (DirectStatements s : results[i]) { +// expansion.put(s.getSubject(), s); +// } + for (Resource s : listElements[i]) { + schedule.add(s); +// if(status.put(s, ExtentStatus.INTERNAL) == null) { +// ids.put(s, id++); +// } + } + } + + expandTime += (System.nanoTime() - start); + + } + + public void extractComposedPredicates(ReadGraph graph, final Collection[] expansion) throws DatabaseException { + + long start = System.nanoTime(); + + CollectionSupport cs = graph.getService(CollectionSupport.class); + + final ConcurrentLinkedQueue composedResult = new ConcurrentLinkedQueue(); + final ConcurrentLinkedQueue singleResult = new ConcurrentLinkedQueue(); + final ConcurrentLinkedQueue> singles = new ConcurrentLinkedQueue>(); + + final Set schedule = cs.createSet(); + + for (Collection coll : expansion) + for (DirectStatements stms : coll) + for(Statement stm : stms) { + Resource predicate = stm.getPredicate(); + if(predicates.add(predicate)) schedule.add(predicate); + } + + // Discover singles + graph.syncRequest(new AsyncReadRequest() { + + @Override + public void run(AsyncReadGraph graph) { + + for (final Resource predicate : schedule) { + + graph.forPossibleSuperrelation(predicate, new AsyncProcedure() { + + @Override + public void exception(AsyncReadGraph graph, Throwable throwable) { + throwable.printStackTrace(); + } + + @Override + public void execute(AsyncReadGraph graph, final Resource single) { + singles.add(Pair.make(predicate, single)); + } + + }); + + } + + } + + }); + + // Determine singles + final Set singleSchedule = cs.createSet(); + for(Pair pair : singles) { + + Resource single = pair.second; + if(single != null && predicates.add(single)) singleSchedule.add(single); + + } + + graph.syncRequest(new AsyncReadRequest() { + + @Override + public void run(AsyncReadGraph graph) { + + for (final Resource predicate : singleSchedule) { + + graph.forIsSubrelationOf(predicate, graph.getService(Layer0.class).IsComposedOf, new AsyncProcedure() { + + @Override + public void exception(AsyncReadGraph graph, Throwable throwable) { + throwable.printStackTrace(); + } + + @Override + public void execute(AsyncReadGraph graph, Boolean composed) { + if (composed) singleResult.add(predicate); + } + + }); + + } + + } + + }); + + composedPredicates.addAll(singleResult); + + final Set specialSchedule = cs.createSet(); + + // Classify + for(Pair pair : singles) { + + Resource single = pair.second; + if(single != null) { + if(composedPredicates.contains(single)) { + composedPredicates.add(pair.first); + } + } else { + specialSchedule.add(pair.first); + } + + } + + graph.syncRequest(new AsyncReadRequest() { + + @Override + public void run(AsyncReadGraph graph) { + + for (final Resource predicate : specialSchedule) { + + graph.forIsSubrelationOf(predicate, graph.getService(Layer0.class).IsComposedOf, new AsyncProcedure() { + + @Override + public void exception(AsyncReadGraph graph, Throwable throwable) { + throwable.printStackTrace(); + } + + @Override + public void execute(AsyncReadGraph graph, Boolean composed) { + if (composed) composedResult.add(predicate); + } + + }); + + } + + } + + }); + + composedPredicates.addAll(composedResult); + + composedPredicateTime += (System.nanoTime() - start); + + } + + /* + * Composed objects are internal. Mark them for expansion. + */ + + public void collectComposedObjects(ReadGraph graph, Collection[] expansion, Set typeTodo, Set objectTodo, + Set predicateTodo) throws DatabaseException { + + long start = System.nanoTime(); + + Layer0 l0 = Layer0.getInstance(graph); + + for (Collection coll : expansion) + for (DirectStatements stms : coll) + for(Statement stm : stms) { + + Resource predicate = stm.getPredicate(); + Resource object = stm.getObject(); + + if (composedPredicates.contains(predicate)) { + + ExtentStatus existing = status.put(object, ExtentStatus.INTERNAL); + if(existing == null) { + ids.put(object, id++); + composedObjectCounter++; + expansionSeeds.add(object); +// System.err.println("internal: " + NameUtils.getSafeName(graph, object, true)); + if(LOG) log("[INTERNAL] (composed object) " + NameUtils.getSafeName(graph, object, true)); + } else if (existing == ExtentStatus.EXCLUDED) { + System.err.println("preExcluded: " + NameUtils.getSafeName(graph, object, true)); + status.put(object, ExtentStatus.EXCLUDED); + } else if (existing == ExtentStatus.EXTERNAL) { + System.err.println("preExternal: " + NameUtils.getSafeName(graph, object, true)); + status.put(object, ExtentStatus.EXTERNAL); + } + + } else { + +// System.err.println("internal2: " + NameUtils.getSafeName(graph, object, true)); + + if (!status.containsKey(object)) { + if (l0.InstanceOf.equalsResource(predicate)) { + typeTodo.add(object); + } else { + objectTodo.add(object); + } + } + + if (!status.containsKey(predicate)) { + predicateTodo.add(predicate); + } + + } + } + + composedObjectTime += System.nanoTime() - start; + + } + + public void writeOtherStatements(ReadGraph graph, Collection[]> expansion, ObjectOutputStream composedStatementsOutput, ObjectOutputStream otherStatementsOutput, + ObjectOutputStream valueOutput) throws DatabaseException { + + long start = System.nanoTime(); + + Layer0 l0 = Layer0.getInstance(graph); + SerialisationSupport support = graph.getService(SerialisationSupport.class); + TransferableGraphSupport tgs = graph.getService(TransferableGraphSupport.class); + + TIntArrayList other = new TIntArrayList(); + TIntArrayList composed = new TIntArrayList(); + + try { + + for (Collection[] colls : expansion) + for (Collection coll : colls) + for (DirectStatements stms : coll) { + + Resource subject = stms.getSubject(); + composed.resetQuick(); + + int sId = support.getTransientId(subject); + + composedStatementsOutput.writeInt(sId); + + if(graph.hasValue(subject)) { + Datatype dt = graph.getRelatedValue(subject, l0.HasDataType, Bindings.getBindingUnchecked(Datatype.class)); + Binding b = Bindings.getBinding(dt); + Object _value = graph.getValue(subject, b); + Variant variant = new Variant(b, _value); + byte[] value = variantSerializer.serialize(variant); + if(LOG) log("[VALUE] " + NameUtils.getSafeName(graph, subject)); + valueOutput.writeInt(sId); + valueOutput.writeInt(value.length); + assert (value.length > 0); + valueOutput.write(value); + } + + for (Statement s : stms) { + + Resource object = s.getObject(); + Resource predicate = s.getPredicate(); + + ExtentStatus objectStatus = status.get(object); + + if(objectStatus == ExtentStatus.INTERNAL) { + composed.add(support.getTransientId(predicate)); + composed.add(support.getTransientId(object)); + if(LOG) log("[COMPOSED] (internal object) " + NameUtils.toIdString(graph, s)); + } else if (l0.InstanceOf.equalsResource(predicate)) { + composed.add(support.getTransientId(predicate)); + composed.add(support.getTransientId(object)); + if(LOG) log("[COMPOSED] (instanceOf) " + NameUtils.toIdString(graph, s)); + } else if (l0.SubrelationOf.equalsResource(predicate)) { + composed.add(support.getTransientId(predicate)); + composed.add(support.getTransientId(object)); + if(LOG) log("[COMPOSED] (subrelationOf) " + NameUtils.toIdString(graph, s)); + } else { + if(objectStatus == ExtentStatus.EXTERNAL) { + if(DEBUG) + System.out.println("other " + NameUtils.toIdString(graph, s)); + //System.out.println("other.add " + predicate + " - " + object); + other.add(support.getTransientId(predicate)); + other.add(support.getTransientId(object)); + if(LOG) log("[OTHER] (object is external) " + NameUtils.toIdString(graph, s)); + } + } + + } + + if(!other.isEmpty()) { + otherStatementsOutput.writeInt(sId); + otherStatementsOutput.writeInt(other.size() / 2); + for (int i = 0; i < other.size(); i++) + otherStatementsOutput.writeInt(other.getQuick(i)); + other.resetQuick(); + } + + composedStatementsOutput.writeInt(composed.size() / 2); + for (int i = 0; i < composed.size(); i++) + composedStatementsOutput.writeInt(composed.getQuick(i)); + + } + + } catch (IOException e) { + e.printStackTrace(); + } + + otherStatementTime += (System.nanoTime() - start); + + } + + boolean hasStrictParents(ReadGraph g, Resource r) + throws DatabaseException { + if (g.getPossibleURI(r) != null) + return true; + return false; + } + + public boolean getExpansionSeedsFromExtents(ReadGraph graph, final Collection[] expansion) throws DatabaseException { + + long start = System.nanoTime(); + + final ConcurrentLinkedQueue accepts = new ConcurrentLinkedQueue(); + + /* + * Determine statements which could accept statements with todo + * objects + */ + search: for (Double priority : priorityList) { + + for (final SubgraphAdvisor advisor : advisors) { + + if (advisor.priority() > 0) + continue; + + if (advisor.priority() == priority) { + + graph.syncRequest(new ReadRequest() { + + @Override + public void run(ReadGraph graph) throws DatabaseException { + + for (Collection coll : expansion) + for (DirectStatements stms : coll) + for(final Statement stm : stms) { + + advisor.advise(graph, stm, new AsyncProcedure() { + + @Override + public void exception(AsyncReadGraph graph, Throwable throwable) { + throwable.printStackTrace(); + } + + @Override + public void execute(AsyncReadGraph graph, Boolean accept) { + if (accept) { + accepts.add(stm.getObject()); + } + } + + }); + + } + + } + + }); + } + if (!accepts.isEmpty()) + break search; + } + } + + CollectionSupport cs = graph.getService(CollectionSupport.class); + Set schedule = cs.createSet(); + for (Resource r : accepts) { + if(!status.containsKey(r)) + schedule.add(r); + } + + extentSeedTime += (System.nanoTime() - start); + + if (schedule.isEmpty()) + return false; + + fastResolve(graph, schedule); + uriResolve(graph, schedule); + fullResolve(graph, schedule, "accepts"); + + return true; + + } + + ConcurrentLinkedQueue fastInternals = new ConcurrentLinkedQueue(); + + public void fastResolve(ReadGraph graph, final Set rs) + throws DatabaseException { + // This collects and resolves weaks + if(fastResolveLoop(graph, rs)) + // Weaks are now resolved + fastResolveLoop(graph, rs); + } + + public boolean fastResolveLoop(ReadGraph graph, final Set rs) + throws DatabaseException { + + long start = System.nanoTime(); + + final ConcurrentLinkedQueue weakSchedule = new ConcurrentLinkedQueue(); + + graph.syncRequest(new AsyncRead() { + + @Override + public int threadHash() { + return hashCode(); + } + + @Override + public int getFlags() { + return 0; + } + + @Override + public void perform(AsyncReadGraph graph, + AsyncProcedure procedure) { + + QueryControl control = graph.getService(QueryControl.class); + final DirectQuerySupport dqs = graph.getService(DirectQuerySupport.class); + + int slice = (int) (rs.size() / control + .getAmountOfQueryThreads()) + 1; + + final Resource[] rootArray = rs.toArray(Resource.NONE); + for (int i = 0; i < control.getAmountOfQueryThreads(); i++) { + + final int start = i * slice; + final int end = Math.min(start + slice, + rootArray.length); + + control.schedule(graph, i, new ControlProcedure() { + + @Override + public void execute(AsyncReadGraph graph) { + + for (int index = start; index < end; index++) { + + final Resource r = rootArray[index]; + + //if (status.containsKey(r)) continue; + + graph.asyncRequest(new FastInternalRequest(dqs, r, status, weakInverses, weakSchedule),new AsyncProcedure() { + + @Override + public void exception(AsyncReadGraph graph,Throwable throwable) { + throwable.printStackTrace(); + } + + @Override + public void execute(AsyncReadGraph graph,Boolean isInternal) { + if (isInternal) { + fastInternals.add(r); + } + } + + }); + + } + + } + + }); + + } + + procedure.execute(graph, true); + + } + + }); + + if (!weakSchedule.isEmpty()) { + THashSet weaks = new THashSet(weakSchedule); + if (CLASSIFY_LOG) + for (Resource p : weakSchedule) + System.out.println("classify " + + NameUtils.getSafeName(graph, p)); + graph.syncRequest(new ClassifyStatementsRequest(weaks, weakInverses)); + } + + for (Resource r : fastInternals) { + rs.remove(r); + if (status.put(r, ExtentStatus.INTERNAL) == null) { + if(LOG) log("[INTERNAL] (fast) " + NameUtils.getSafeName(graph, r, true)); + ids.put(r, id++); + fastInternalCounter++; + expansionSeeds.add(r); + } + } + + fastResolveTime += (System.nanoTime() - start); + + return !weakSchedule.isEmpty(); + + } + + private ExtentStatus resolveExtent(ReadGraph graph, Resource r, Map statuses, Set expansionSeeds, THashSet pending, + ArrayList stack) throws DatabaseException { + + ExtentStatus current = statuses.get(r); + if(current != null) return current; + + if (pending.contains(r)) + return ExtentStatus.PENDING; + + // In order to break cyclic dependencies + pending.add(r); + + if (PARENT_DEBUG) + System.out.println("resolveExtent " + + NameUtils.getSafeName(graph, r)); + + ExtentStatus status = ExtentStatus.INTERNAL; + for (Resource p : getParents(graph, r)) { + if (PARENT_DEBUG) { + ExtentStatus ps = statuses.get(p); + System.out.println(" parent " + NameUtils.getSafeName(graph, p) + "(" + ps + ")"); + } + switch (resolveExtent(graph, p, statuses, + expansionSeeds, pending, stack)) { + case EXTERNAL: + return ExtentStatus.EXTERNAL; + case PENDING: + status = ExtentStatus.PENDING; + } + } + if (status == ExtentStatus.INTERNAL) { + pending.remove(r); + stack.add(r); + if (DEBUG) + System.out.println(NameUtils.getSafeName(graph, r, true) + + " is internal."); + } + return status; + } + + public void uriResolve(ReadGraph graph, final Set todo) + throws DatabaseException { + + long start = System.nanoTime(); + + for(Resource r : todo) System.out.println("uriResolve " + + NameUtils.getSafeName(graph, r)); + + final ConcurrentSkipListSet found = new ConcurrentSkipListSet(); + + graph.syncRequest(new AsyncReadRequest() { + + @Override + public void run(AsyncReadGraph graph) { + + for (final Resource r : todo) { + + // System.out.println("uriresolve before " + r); + + if (status.containsKey(r)) continue; + + // System.out.println("uriresolve " + r); + + graph.forURI(r, new AsyncProcedure() { + + @Override + public void exception(AsyncReadGraph graph, Throwable throwable) { + throwable.printStackTrace(); + } + + @Override + public void execute(AsyncReadGraph graph, String uri) { + + if (uri != null) { + + // System.out.println("uriresolve has uri " + // + r); + + if(found.add(r)) { + parentExternalCounter++; + } + + } else { + + // System.out.println("uriresolve ask inverse " + // + r); + + graph.forPossibleInverse(r, new AsyncProcedure() { + + @Override + public void exception(AsyncReadGraph graph, Throwable throwable) { + throwable.printStackTrace(); + } + + @Override + public void execute(AsyncReadGraph graph, Resource inverse) { + + if (inverse != null) { + + graph.forURI(inverse, new AsyncProcedure() { + + @Override + public void exception(AsyncReadGraph graph, Throwable throwable) { + throwable.printStackTrace(); + } + + @Override + public void execute(AsyncReadGraph graph, String uri) { + + if (uri != null) { + + if(found.add(r)) { + parentExternalCounter++; + } + + } + + } + + }); + + } + + } + + }); + + } + } + + }); + + } + + } + + }); + + todo.removeAll(found); + for(Resource r : found) { + status.put(r, ExtentStatus.EXTERNAL); + if(LOG) log("[EXTERNAL] (uriResolve) " + NameUtils.getSafeName(graph, r, true)); + } + + parentResolveTime += System.nanoTime() - start; + + } + + public void fullResolve(ReadGraph graph, Collection rs, + String koss) throws DatabaseException { + + long start = System.nanoTime(); + + for (final Resource r : rs) { + + if(status.containsKey(r)) continue; + + THashSet pending = new THashSet(); + ArrayList stack = new ArrayList(); + + ExtentStatus s = resolveExtent(graph, r, status, expansionSeeds, pending, stack); + if (ExtentStatus.INTERNAL == s || ExtentStatus.PENDING == s) { + if (status.put(r, ExtentStatus.INTERNAL) == null) { + if(LOG) log("[INTERNAL] (resolveExtent) " + NameUtils.getSafeName(graph, r, true)); + ids.put(r, id++); + fullInternalCounter++; + expansionSeeds.add(r); + } + } + if (ExtentStatus.EXTERNAL == s) { + if (status.put(r, ExtentStatus.EXTERNAL) == null) { + if(LOG) log("[EXTERNAL] (resolveExtent) " + NameUtils.getSafeName(graph, r, true)); + fullExternalCounter++; + } + } + + } + + fullResolveTime += (System.nanoTime() - start); + + } + + public void process(ReadGraph graph, + ObjectOutputStream composedStatementsOutput, + ObjectOutputStream otherStatementsOutput, + ObjectOutputStream valueOutput) + throws DatabaseException { + + this.variantSerializer = graph.getService(Databoard.class).getSerializerUnchecked(Bindings.VARIANT); + + CollectionSupport cs = graph.getService(CollectionSupport.class); + + Set typeTodo = cs.createSet(); + Set objectTodo = cs.createSet(); + Set predicateTodo = cs.createSet(); + + Collection[]> fullExpansion = new ArrayList[]>(); + + do { + + QueryControl control = graph.getService(QueryControl.class); + Collection[] expansion = new ArrayList[control.getAmountOfQueryThreads()]; + for (int i = 0; i < control.getAmountOfQueryThreads(); i++) { + expansion[i] = new ArrayList(); + } + + // Expand expansionSeeds + expand(graph, expansion, objectTodo); + + // Start collecting new seeds + expansionSeeds = cs.createSet(); + + // Collect predicates which are roots, Map preStatus, + Map specials, + ObjectOutputStream otherStatementsOutput, + ObjectOutputStream valueOutput, + TreeMap extensions, + TIntHashSet excludedShared) throws DatabaseException { + + ITask task = ThreadLogger.getInstance().begin("getDomain2"); + + final DomainProcessor2 processor = new DomainProcessor2(); + + processor.startupTime = System.nanoTime(); + + Layer0 l0 = Layer0.getInstance(graph); + + CollectionSupport cs = graph.getService(CollectionSupport.class); + SerialisationSupport support = graph.getService(SerialisationSupport.class); + + processor.ids = ids; + processor.specials = specials; + processor.status = cs.createMap(ExtentStatus.class); + processor.weakInverses = cs.createMap(WeakStatus.class); + processor.predicates = cs.createSet(); + processor.isRelatedToPredicates = cs.createSet(); + processor.sharedPredicates = cs.createSet(); +// processor.expansionSeeds = cs.createSet(); + + for(Map.Entry entry : preStatus.entrySet()) { + processor.status.put(entry.getKey(), entry.getValue()); + if(ExtentStatus.EXCLUDED.equals(entry.getValue())) processor.exclusions.add(entry.getKey()); + } + +// for (Resource r : excluded) { +// processor.status.put(r, ExtentStatus.EXCLUDED); +// } + + Resource rootLibrary = graph.getResource("http:/"); + + if (!roots.contains(rootLibrary)) + processor.status.put(rootLibrary, ExtentStatus.EXTERNAL); + + for (Resource root : roots) { + processor.status.put(root, ExtentStatus.INTERNAL); + //processor.ids.put(support.getTransientId(root), processor.ids.size()); + for (Resource owner : graph.getObjects(root, l0.IsOwnedBy)) { + processor.status.put(owner, ExtentStatus.EXTERNAL); + } + } + + processor.startupTime = System.nanoTime() - processor.startupTime; + + processor.fringe = new HashSet(); + processor.fringe.addAll(roots); + + processor.internalDomain.addAll(roots); + + processor.sharedExternalReferences = new HashSet(); + processor.sharedExternalFringe = new HashSet(); + + try { + + processor.process(graph, otherStatementsOutput, valueOutput); + + } catch (IOException e) { + e.printStackTrace(); + } + + for(Resource r : processor.sharedExternalReferences) excludedShared.add(support.getTransientId(r)); + + ClusteringSupport cls = graph.getService(ClusteringSupport.class); + TLongObjectHashMap clusterMap = new TLongObjectHashMap(); + for(Map.Entry entry : processor.status.entrySet()) { + if(ExtentStatus.INTERNAL == entry.getValue()) { + + long cluster = cls.getCluster(entry.getKey()); + TIntArrayList list = clusterMap.get(cluster); + if(list == null) { + list = new TIntArrayList(); + clusterMap.put(cluster, list); + } + list.add(support.getTransientId(entry.getKey())); + + } + } + final TIntArrayList clustering = new TIntArrayList(); + clusterMap.forEachEntry(new TLongObjectProcedure() { + + @Override + public boolean execute(long cluster, TIntArrayList b) { + clustering.add(b.size()); + b.forEach(new TIntProcedure() { + + @Override + public boolean execute(int rId) { + processor.ids.put(rId, processor.id++); + return true; + } + + }); + return true; + } + + }); + + extensions.put(Extensions.CLUSTERING, new Variant(Bindings.INT_ARRAY, clustering.toArray())); + + long total = processor.startupTime + processor.expandTime + + processor.composedPredicateTime + + processor.composedObjectTime + processor.extentSeedTime + + processor.fullResolveTime + processor.fastResolveTime + + + processor.parentResolveTime + processor.otherStatementTime; + + if (PROFILE) { + System.out.println("startup took " + 1e-9 * processor.startupTime + + "s."); + System.out.println("expand took " + 1e-9 * processor.expandTime + + "s."); + System.out.println("composedPredicates took " + 1e-9 + * processor.composedPredicateTime + "s."); + System.out.println("composedObjects took " + 1e-9 + * processor.composedObjectTime + "s."); + System.out.println("extentSeeding took " + 1e-9 + * processor.extentSeedTime + "s."); + System.out.println("fullResolve took " + 1e-9 + * processor.fullResolveTime + "s."); + System.out.println("fastResolve took " + 1e-9 + * processor.fastResolveTime + "s."); + System.out.println("parentResolve took " + 1e-9 + * processor.parentResolveTime + "s."); + System.out.println("otherStatements took " + 1e-9 + * processor.otherStatementTime + "s."); + System.out.println("total " + 1e-9 * total + "s."); + } + + task.finish(); + + + } + + + public static void getDomain(ReadGraph graph, Map ids, + Collection roots, Map preStatus, Set advisors, + ObjectOutputStream composedStatementsOutput, + ObjectOutputStream otherStatementsOutput, + ObjectOutputStream valueOutput) throws DatabaseException { + + ITask task = ThreadLogger.getInstance().begin("getDomain"); + + DomainProcessor processor = new DomainProcessor(advisors); + + processor.startupTime = System.nanoTime(); + + Layer0 l0 = Layer0.getInstance(graph); + + CollectionSupport cs = graph.getService(CollectionSupport.class); + + processor.ids = ids; + processor.status = cs.createMap(ExtentStatus.class); + processor.weakInverses = cs.createMap(WeakStatus.class); + processor.predicates = cs.createSet(); + processor.composedPredicates = cs.createSet(); + processor.expansionSeeds = cs.createSet(); + + for(Map.Entry entry : preStatus.entrySet()) { + processor.status.put(entry.getKey(), entry.getValue()); + } + +// for (Resource r : excluded) { +// processor.status.put(r, ExtentStatus.EXCLUDED); +// } + + if (!roots.contains(graph.getRootLibrary())) + processor.status.put(graph.getRootLibrary(), ExtentStatus.EXTERNAL); + + for (Resource root : roots) { + processor.status.put(root, ExtentStatus.INTERNAL); + processor.ids.put(root, processor.id++); + for (Resource owner : graph.getObjects(root, l0.IsOwnedBy)) { + processor.status.put(owner, ExtentStatus.EXTERNAL); + } + } + + + processor.expansionSeeds.addAll(roots); + + processor.startupTime = System.nanoTime() - processor.startupTime; + + while (!processor.expansionSeeds.isEmpty()) { + + processor.process(graph, composedStatementsOutput, + otherStatementsOutput, valueOutput); + + } + + long total = processor.startupTime + processor.expandTime + + processor.composedPredicateTime + + processor.composedObjectTime + processor.extentSeedTime + + processor.fullResolveTime + processor.fastResolveTime + + + processor.parentResolveTime + processor.otherStatementTime; + + if (PROFILE) { + System.out.println("startup took " + 1e-9 * processor.startupTime + + "s."); + System.out.println("expand took " + 1e-9 * processor.expandTime + + "s."); + System.out.println("composedPredicates took " + 1e-9 + * processor.composedPredicateTime + "s."); + System.out.println("composedObjects took " + 1e-9 + * processor.composedObjectTime + "s."); + System.out.println("extentSeeding took " + 1e-9 + * processor.extentSeedTime + "s."); + System.out.println("fullResolve took " + 1e-9 + * processor.fullResolveTime + "s."); + System.out.println("fastResolve took " + 1e-9 + * processor.fastResolveTime + "s."); + System.out.println("parentResolve took " + 1e-9 + * processor.parentResolveTime + "s."); + System.out.println("otherStatements took " + 1e-9 + * processor.otherStatementTime + "s."); + System.out.println("total " + 1e-9 * total + "s."); + } + + task.finish(); + + } + +}