X-Git-Url: https://gerrit.simantics.org/r/gitweb?p=simantics%2Fplatform.git;a=blobdiff_plain;f=bundles%2Forg.simantics.db.layer0%2Fsrc%2Forg%2Fsimantics%2Fdb%2Flayer0%2Futil%2FDomainProcessor3.java;h=7a0a5f06bee35eb5c4eebf87691dd8e8bdf97c3a;hp=fb4f04dee0bd6667546a9a4ed539319f9398b915;hb=e19c37f84fd1ce2d946578f7c05f3e45444ba67a;hpb=969bd23cab98a79ca9101af33334000879fb60c5 diff --git a/bundles/org.simantics.db.layer0/src/org/simantics/db/layer0/util/DomainProcessor3.java b/bundles/org.simantics.db.layer0/src/org/simantics/db/layer0/util/DomainProcessor3.java index fb4f04dee..7a0a5f06b 100644 --- a/bundles/org.simantics.db.layer0/src/org/simantics/db/layer0/util/DomainProcessor3.java +++ b/bundles/org.simantics.db.layer0/src/org/simantics/db/layer0/util/DomainProcessor3.java @@ -1,560 +1,573 @@ -package org.simantics.db.layer0.util; - -import java.io.IOException; -import java.io.InputStream; -import java.util.Collection; -import java.util.HashMap; -import java.util.Iterator; -import java.util.Map; -import java.util.Set; -import java.util.TreeSet; - -import org.simantics.databoard.Bindings; -import org.simantics.databoard.Databoard; -import org.simantics.databoard.binding.Binding; -import org.simantics.databoard.serialization.Serializer; -import org.simantics.databoard.type.Datatype; -import org.simantics.db.DirectStatements; -import org.simantics.db.ReadGraph; -import org.simantics.db.Resource; -import org.simantics.db.ResourceMap; -import org.simantics.db.Statement; -import org.simantics.db.common.primitiverequest.Value; -import org.simantics.db.common.procedure.adapter.TransientCacheListener; -import org.simantics.db.common.utils.NameUtils; -import org.simantics.db.exception.CancelTransactionException; -import org.simantics.db.exception.DatabaseException; -import org.simantics.db.layer0.adapter.SubgraphExtent.ExtentStatus; -import org.simantics.db.layer0.util.ModelTransferableGraphSourceRequest.Expansion3; -import org.simantics.db.service.CollectionSupport; -import org.simantics.db.service.SerialisationSupport; -import org.simantics.db.service.TransferableGraphSupport; -import org.simantics.graph.db.TransferableGraphSource; -import org.simantics.layer0.Layer0; -import org.simantics.scl.runtime.function.Function1; - -import gnu.trove.list.array.TIntArrayList; -import gnu.trove.map.hash.TIntIntHashMap; - -public class DomainProcessor3 { - - public enum ExclusionDecision { - INCLUDE, EXCLUDE_OBJECT - } - - final static private boolean PROFILE = false; - - Serializer variantSerializer; - Serializer datatypeSerializer; - Binding datatypeBinding; - boolean ignoreVirtual; - - int id = 0; - - Set fringe = null; - Set exclusions = null; - Function1 exclusionFunction = null; - Set predicates = null; - Map isRelatedToPredicates = null; - Set deadPredicates = null; - Set strongInverseSet = null; - - TIntIntHashMap ids = null; - ResourceMap status = null; - Map bindings = new HashMap(); - final SerialisationSupport support; - final TransferableGraphConfiguration2 conf; - final TransferableGraphSupport tgs; - - private Layer0 L0; - - private long composedObjectCounter = 0; - private long fastInternalCounter = 0; - private long parentExternalCounter = 0; - private long fullInternalCounter = 0; - private long fullExternalCounter = 0; - - long startupTime = 0; - long expandTime = 0; - long fullResolveTime = 0; - long fastResolveTime = 0; - long otherStatementTime = 0; - long parentResolveTime = 0; - long extentSeedTime = 0; - long classifyPredicateTime = 0; - long processFringeTime = 0; - long valueOutputTime = 0; - long statementOutputTime = 0; - - public DomainProcessor3(ReadGraph graph, TransferableGraphConfiguration2 conf, DomainProcessorState state, boolean ignoreVirtual) throws DatabaseException { - - this.L0 = Layer0.getInstance(graph); - this.tgs = graph.getService(TransferableGraphSupport.class); - - this.support = graph.getService(SerialisationSupport.class); - this.ignoreVirtual = ignoreVirtual; - this.conf = conf; - - if(PROFILE) - startupTime -= System.nanoTime(); - - CollectionSupport cs = graph.getService(CollectionSupport.class); - - ids = state.ids; - status = cs.createMap(ExtentStatus.class); - predicates = cs.createSet(); - exclusions = cs.createSet(); - exclusionFunction = conf.exclusionFunction; - fringe = new TreeSet(); - isRelatedToPredicates = cs.createMap(Boolean.class); - deadPredicates = cs.createSet(); - strongInverseSet = cs.createSet(); - - for(Map.Entry entry : conf.preStatus.entrySet()) { - status.put(entry.getKey(), entry.getValue()); - if(ExtentStatus.EXCLUDED.equals(entry.getValue())) exclusions.add(entry.getKey()); - if(ExtentStatus.INTERNAL.equals(entry.getValue())) fringe.add(entry.getKey()); - } - - if(PROFILE) - startupTime += System.nanoTime(); - -// for(RootSpec p : conf.roots) { -// if(p.internal) -// fringe.add(p.resource); -// } - - } - - public ResourceMap getStatus() { - return status; - } - - public Collection extractFromFringe(ReadGraph graph, int maxAmount) throws DatabaseException { - - CollectionSupport cs = graph.getService(CollectionSupport.class); - Collection list = cs.createList(); - Iterator it = fringe.iterator(); - for(int i=0;i expand(ReadGraph graph) throws DatabaseException { - - if(PROFILE) - expandTime -= System.nanoTime(); - - Collection result = extractFromFringe(graph, 2<<12); - - if(PROFILE) - expandTime += System.nanoTime(); - - return result; - - } - - public void classifyPredicates(ReadGraph graph, final Set schedule, DomainProcessorState state) throws DatabaseException { - - for(Resource predicate : schedule) { - - Boolean isRelatedTo = Boolean.FALSE; - - Resource single = graph.getPossibleSuperrelation(predicate); - if(single != null) { - - Boolean singleIsRelatedTo = isRelatedToPredicates.get(single); - if(singleIsRelatedTo == null) { - singleIsRelatedTo = graph.isSubrelationOf(single, L0.IsRelatedTo); - isRelatedToPredicates.put(single, singleIsRelatedTo); - } - - isRelatedTo = singleIsRelatedTo; - - } else { - - if(graph.isSubrelationOf(predicate, L0.IsRelatedTo)) { - isRelatedTo = Boolean.TRUE; - if(ModelTransferableGraphSourceRequest.LOG) ModelTransferableGraphSourceRequest.log("isRelatedToPredicates4 => " + NameUtils.getSafeName(graph, predicate)); - } else { - if (!graph.hasStatement(predicate)) { - if(ModelTransferableGraphSourceRequest.LOG) ModelTransferableGraphSourceRequest.log("FOUND DEAD PREDICATE (no statements): " + predicate); - deadPredicates.add(predicate); - // Prevents ModelTransferableGraphSource from - // trying to export these statements. - state.inverses.remove(support.getTransientId(predicate)); - } - } - - } - - isRelatedToPredicates.put(predicate, isRelatedTo); - - } - - } - - public void classifyPredicates(ReadGraph graph, DomainProcessorState state, final Collection expansion) throws DatabaseException { - - if(PROFILE) - classifyPredicateTime -= System.nanoTime(); - - CollectionSupport cs = graph.getService(CollectionSupport.class); - final Set schedule = cs.createSet(); - final Map newPredicates = cs.createMap(Resource.class); - - for (DirectStatements stms : expansion) { - for(Statement stm : stms) { - - Resource predicate = stm.getPredicate(); - Resource object = stm.getObject(); - - if (exclusions.contains(object) || exclusions.contains(predicate)) - continue; - - if (exclusionFunction != null) { - ExclusionDecision decision = exclusionFunction.apply(stm); - if(ExclusionDecision.EXCLUDE_OBJECT.equals(decision)) { - status.put(object, ExtentStatus.EXCLUDED); - exclusions.add(object); - continue; - } - } - - if(predicates.add(predicate)) { - Resource inverse = graph.getPossibleInverse(predicate); - schedule.add(predicate); - if(inverse != null) { - newPredicates.put(predicate, inverse); - if(predicates.add(inverse)) schedule.add(inverse); - state.inverses.put(support.getTransientId(predicate), support.getTransientId(inverse)); - state.inverses.put(support.getTransientId(inverse), support.getTransientId(predicate)); - if(ModelTransferableGraphSourceRequest.LOG) ModelTransferableGraphSourceRequest.log("INVERSE FOR " + graph.getPossibleURI(predicate) + " => " + graph.getPossibleURI(inverse)); - } else { - state.inverses.put(support.getTransientId(predicate), 0); - if(ModelTransferableGraphSourceRequest.LOG) ModelTransferableGraphSourceRequest.log("NO INVERSE FOR " + graph.getPossibleURI(predicate)); - } - - } - - } - } - - classifyPredicates(graph, schedule, state); - - for(Map.Entry entry : newPredicates.entrySet()) { - // Inverse is strong => this has strong inverse - Boolean isRelatedToValue = isRelatedToPredicates.get(entry.getValue()); - if(isRelatedToValue) strongInverseSet.add(entry.getKey()); - Boolean isRelatedToKey = isRelatedToPredicates.get(entry.getKey()); - if(isRelatedToKey) strongInverseSet.add(entry.getValue()); - } - - if(PROFILE) - classifyPredicateTime += System.nanoTime(); - - } - - /* - * Composed objects are internal. Mark them for expansion. - */ - - private Datatype getDatatype(ReadGraph graph, Resource subject) throws DatabaseException { - Resource object = graph.getSingleObject(subject, L0.HasDataType); - return graph.syncRequest(new Value(object, datatypeBinding), TransientCacheListener.instance()); - } - - public void processValue(ReadGraph graph, Resource subject, int sId, final DomainProcessorState state) throws DatabaseException, IOException { - final InputStream valueStream = tgs.getValueStream(graph, subject); - if (valueStream != null) { - if(ModelTransferableGraphSourceRequest.LOG) ModelTransferableGraphSourceRequest.log("[VALUE] " + NameUtils.getSafeName(graph, subject, true)); - state.valueOutput.writeInt(sId); - - if (conf.values) { - Datatype dt = getDatatype(graph, subject); - - boolean canWriteRawVariant = !state.valueModifier.mayNeedModification(dt); - long rawVariantSizePos = 0; - state.valueOutput.writeByte(canWriteRawVariant - ? TransferableGraphSource.TAG_RAW_COPY_VARIANT_VALUE - : TransferableGraphSource.TAG_POTENTIALLY_MODIFIED_VARIANT_VALUE); - if (canWriteRawVariant) { - // Add space for raw variant byte size before the data - rawVariantSizePos = state.valueOutput.position(); - state.valueOutput.writeInt(0); - } - - byte[] typeBytes = bindings.get(dt); - if (typeBytes == null) { - typeBytes = datatypeSerializer.serialize(dt); - bindings.put(dt, typeBytes); - } - - state.valueOutput.write(typeBytes); - Serializer s = Bindings.getSerializerUnchecked(Bindings.getBinding(dt)); - s.skip(new InputStream() { - @Override - public int read() throws IOException { - int value = valueStream.read(); - state.valueOutput.write(value); - return value; - } - }); - - if (canWriteRawVariant) { - long currentPos = state.valueOutput.position(); - int variantSize = (int)(currentPos - rawVariantSizePos - 4); - state.valueOutput.position(rawVariantSizePos); - state.valueOutput.writeInt(variantSize); - state.valueOutput.position(currentPos); - } - } - - state.valueCount++; - } - } - - private TIntArrayList stream = new TIntArrayList(); - - public void addToStream(Resource predicate, Resource object) throws DatabaseException { - stream.add(support.getTransientId(predicate)); - stream.add(support.getTransientId(object)); - } - - public void processStatement(ReadGraph graph, Resource subject, Statement stm) throws DatabaseException, IOException { - - Resource predicate = stm.getPredicate(); - - Resource object = stm.getObject(); - - ExtentStatus objectStatus = status.get(object); - - // Strong predicate - Boolean isRelatedTo = isRelatedToPredicates.get(predicate); - if ((objectStatus != ExtentStatus.EXCLUDED) && isRelatedTo) { - - if(ModelTransferableGraphSourceRequest.LOG) logStatementWithExtent(graph, "related", objectStatus, subject, predicate, object); - - addToStream(predicate, object); - - if(objectStatus == null || objectStatus == ExtentStatus.PENDING) { - if(ModelTransferableGraphSourceRequest.LOG) ModelTransferableGraphSourceRequest.log("[ADDED TO FRINGE] " + NameUtils.getSafeName(graph, object)); - fringe.add(object); - } - - } else { - - // Dead predicate - if (deadPredicates.contains(predicate)) { - if(ModelTransferableGraphSourceRequest.LOG) logStatementWithExtent(graph, "excluding statement with dead predicate ", objectStatus, subject, predicate, object); - return; - } - - // Weak predicate - if(objectStatus == ExtentStatus.EXCLUDED) { - - if(ModelTransferableGraphSourceRequest.LOG) logStatementWithExtent(graph, "weak reference to excluded object ", objectStatus, subject, predicate, object); - - } else { - - // The inverse is also weak (or there is no inverse) - if(!strongInverseSet.contains(predicate)) { - - addToStream(predicate, object); - - if(objectStatus == null) { - status.put(object, ExtentStatus.PENDING); - } - - if(ModelTransferableGraphSourceRequest.LOG) logStatementWithExtent(graph, "fully weak internal", objectStatus, subject, predicate, object); - - } else { - - if(ModelTransferableGraphSourceRequest.LOG) logStatementWithExtent(graph, "strong inverse internal ", objectStatus, subject, predicate, object); - - } - - } - - } - - } - - public void flushStatementStream(int sId, DomainProcessorState state) throws IOException { - if(!stream.isEmpty()) { - state.statementsOutput.writeInt(sId); - int streamSize = stream.size(); - int statementCount = stream.size() / 2; - state.statementsOutput.writeInt(statementCount); - for (int i = 0; i < streamSize; i++) - state.statementsOutput.writeInt(stream.getQuick(i)); - state.statementCount += 2*streamSize; - stream.resetQuick(); - } - } - - // For progress monitor book-keeping - private long internalResourceNumber = 0; - private long startTime = 0; - private long lastUpdateTime = 0; - - public void processInternal(ReadGraph graph, Resource subject, DirectStatements stms, DomainProcessorState state) throws DatabaseException, IOException { - - internalResourceNumber++; - - // Update progress monitor with controlled frequency - long t = System.nanoTime(); - long dt = t - lastUpdateTime; - if (dt > 200_000_000L) { - if (startTime == 0) - startTime = t; - lastUpdateTime = t; - double totalTime = (t - startTime) * 1e-9; - if (totalTime > 0) { - long speed = Math.round((double)internalResourceNumber / totalTime); - state.monitor.subTask("Included " + internalResourceNumber + " resources (" + speed + " resources/s)"); - } - } - - status.put(subject, ExtentStatus.INTERNAL); - if(ModelTransferableGraphSourceRequest.LOG) ModelTransferableGraphSourceRequest.log("[INTERNAL] " + NameUtils.getSafeName(graph, subject, true)); - - int sId = support.getTransientId(subject); - - if(PROFILE) - valueOutputTime -= System.nanoTime(); - - processValue(graph, subject, sId, state); - - if(PROFILE) - valueOutputTime += System.nanoTime(); - - if(PROFILE) - statementOutputTime -= System.nanoTime(); - - for(Statement stm : stms) { - processStatement(graph, subject, stm); - } - - flushStatementStream(sId, state); - - if(PROFILE) - statementOutputTime += System.nanoTime(); - - // Logarithmic progress monitor for unknown amount of work. - state.monitor.setWorkRemaining(100000); - state.monitor.worked(1); - } - - public void processFringe(ReadGraph graph, Collection expansion, final DomainProcessorState state) throws DatabaseException, IOException { - - if(PROFILE) - processFringeTime -= System.nanoTime(); - - for (DirectStatements stms : expansion) { - - Resource subject = stms.getSubject(); - - boolean partOf = false; - for(Statement stm : stms) { - Resource predicate = stm.getPredicate(); - if(L0.PartOf.equals(predicate)) { - partOf = true; - break; - } - } - - ExtentStatus subjectStatus = status.get(subject); - if(ModelTransferableGraphSourceRequest.LOG && subjectStatus != null) ModelTransferableGraphSourceRequest.log("EXISTING STATUS FOR " + graph.getPossibleURI(subject) + " - " + subjectStatus); - if(subjectStatus == ExtentStatus.EXTERNAL || subjectStatus == ExtentStatus.EXCLUDED) continue; - if(partOf && (subjectStatus == null || ExtentStatus.PENDING == subjectStatus) && graph.getPossibleURI(subject) != null) { - - status.put(subject, ExtentStatus.EXTERNAL); - if(ModelTransferableGraphSourceRequest.LOG) { - String uri = graph.getPossibleURI(subject); - if(uri == null) ModelTransferableGraphSourceRequest.log("[EXTERNAL]: No URI for " + subject); - else ModelTransferableGraphSourceRequest.log("[EXTERNAL] " + uri); - } - - } else { - - processInternal(graph, subject, stms, state); - - } - - } - - if(PROFILE) - processFringeTime += System.nanoTime(); - - } - - public void process(ReadGraph graph, DomainProcessorState state) throws DatabaseException { - - try { - - this.variantSerializer = graph.getService(Databoard.class).getSerializerUnchecked(Bindings.VARIANT); - this.datatypeBinding = Bindings.getBindingUnchecked(Datatype.class); - this.datatypeSerializer = graph.getService(Databoard.class).getSerializerUnchecked(this.datatypeBinding); - - for(Resource r : ConsistsOfProcess.walk(graph, fringe, exclusions, ignoreVirtual)) { - if (status.put(r, ExtentStatus.INTERNAL) == null) { - if(ModelTransferableGraphSourceRequest.LOG) { - String URI = graph.getPossibleURI(r); - if(URI != null) ModelTransferableGraphSourceRequest.log("URI INTERNAL " + URI); - else ModelTransferableGraphSourceRequest.log("URI has no URI for " + r); - } - fringe.add(r); - } - } - - if (state.monitor.isCanceled()) - throw new CancelTransactionException(); - - while(!fringe.isEmpty()) { - - Collection expansion = expand(graph); - classifyPredicates(graph, state, expansion); - processFringe(graph, expansion, state); - - if (state.monitor.isCanceled()) - throw new CancelTransactionException(); - } - - if (ModelTransferableGraphSourceRequest.PROFILE) { - System.out.println(composedObjectCounter + " " + fastInternalCounter - + " " + parentExternalCounter + " " - + fullExternalCounter + " " + fullInternalCounter); - } - - } catch (IOException e) { - throw new DatabaseException(e); - } - - } - - void logStatementWithExtent(ReadGraph graph, String header, ExtentStatus status, int sId, int pId, int oId) throws DatabaseException { - if(ModelTransferableGraphSourceRequest.LOG) { - SerialisationSupport support = graph.getService(SerialisationSupport.class); - String s = NameUtils.getURIOrSafeNameInternal(graph, support.getResource(sId)); - String p = NameUtils.getURIOrSafeNameInternal(graph, support.getResource(pId)); - String o = NameUtils.getURIOrSafeNameInternal(graph, support.getResource(oId)); - ModelTransferableGraphSourceRequest.log(header + " [" + status + "] " + s + " - " + p + " - " + o); - } - } - - void logStatementWithExtent(ReadGraph graph, String header, ExtentStatus status, Resource sId, Resource pId, Resource oId) throws DatabaseException { - if(ModelTransferableGraphSourceRequest.LOG) { - String s = NameUtils.getURIOrSafeNameInternal(graph, sId); - String p = NameUtils.getURIOrSafeNameInternal(graph, pId); - String o = NameUtils.getURIOrSafeNameInternal(graph, oId); - ModelTransferableGraphSourceRequest.log(header + " [" + status + "] " + s + " - " + p + " - " + o); - } - } - +package org.simantics.db.layer0.util; + +import java.io.IOException; +import java.io.InputStream; +import java.util.Collection; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TreeSet; + +import org.simantics.databoard.Bindings; +import org.simantics.databoard.Databoard; +import org.simantics.databoard.binding.Binding; +import org.simantics.databoard.serialization.Serializer; +import org.simantics.databoard.type.Datatype; +import org.simantics.db.DirectStatements; +import org.simantics.db.ReadGraph; +import org.simantics.db.Resource; +import org.simantics.db.ResourceMap; +import org.simantics.db.Statement; +import org.simantics.db.common.primitiverequest.Value; +import org.simantics.db.common.procedure.adapter.TransientCacheListener; +import org.simantics.db.common.utils.NameUtils; +import org.simantics.db.exception.CancelTransactionException; +import org.simantics.db.exception.DatabaseException; +import org.simantics.db.layer0.adapter.SubgraphExtent.ExtentStatus; +import org.simantics.db.layer0.util.ConsistsOfProcess.InternalEntry; +import org.simantics.db.layer0.util.ModelTransferableGraphSourceRequest.Expansion3; +import org.simantics.db.service.CollectionSupport; +import org.simantics.db.service.SerialisationSupport; +import org.simantics.db.service.TransferableGraphSupport; +import org.simantics.graph.db.TransferableGraphSource; +import org.simantics.layer0.Layer0; +import org.simantics.scl.runtime.function.Function1; +import org.simantics.utils.datastructures.Pair; + +import gnu.trove.list.array.TIntArrayList; +import gnu.trove.map.hash.TIntIntHashMap; + +public class DomainProcessor3 { + + public enum ExclusionDecision { + INCLUDE, EXCLUDE_OBJECT + } + + final static private boolean PROFILE = false; + + Serializer variantSerializer; + Serializer datatypeSerializer; + Binding datatypeBinding; + boolean ignoreVirtual; + + int id = 0; + + Set fringe = null; + Set exclusions = null; + Function1 exclusionFunction = null; + Set predicates = null; + Map isRelatedToPredicates = null; + Set deadPredicates = null; + Set strongInverseSet = null; + + TIntIntHashMap ids = null; + ResourceMap status = null; + Map bindings = new HashMap(); + final SerialisationSupport support; + final TransferableGraphConfiguration2 conf; + final TransferableGraphSupport tgs; + + private Layer0 L0; + + private long composedObjectCounter = 0; + private long fastInternalCounter = 0; + private long parentExternalCounter = 0; + private long fullInternalCounter = 0; + private long fullExternalCounter = 0; + + long startupTime = 0; + long expandTime = 0; + long fullResolveTime = 0; + long fastResolveTime = 0; + long otherStatementTime = 0; + long parentResolveTime = 0; + long extentSeedTime = 0; + long classifyPredicateTime = 0; + long processFringeTime = 0; + long valueOutputTime = 0; + long statementOutputTime = 0; + + public DomainProcessor3(ReadGraph graph, TransferableGraphConfiguration2 conf, DomainProcessorState state, boolean ignoreVirtual) throws DatabaseException { + + this.L0 = Layer0.getInstance(graph); + this.tgs = graph.getService(TransferableGraphSupport.class); + + this.support = graph.getService(SerialisationSupport.class); + this.ignoreVirtual = ignoreVirtual; + this.conf = conf; + + if(PROFILE) + startupTime -= System.nanoTime(); + + CollectionSupport cs = graph.getService(CollectionSupport.class); + + ids = state.ids; + status = cs.createMap(ExtentStatus.class); + predicates = cs.createSet(); + exclusions = cs.createSet(); + exclusionFunction = conf.exclusionFunction; + fringe = new TreeSet(); + isRelatedToPredicates = cs.createMap(Boolean.class); + deadPredicates = cs.createSet(); + strongInverseSet = cs.createSet(); + + for(Map.Entry entry : conf.preStatus.entrySet()) { + status.put(entry.getKey(), entry.getValue()); + if(ExtentStatus.EXCLUDED.equals(entry.getValue())) exclusions.add(entry.getKey()); + if(ExtentStatus.INTERNAL.equals(entry.getValue())) fringe.add(entry.getKey()); + } + + if(PROFILE) + startupTime += System.nanoTime(); + +// for(RootSpec p : conf.roots) { +// if(p.internal) +// fringe.add(p.resource); +// } + + } + + public ResourceMap getStatus() { + return status; + } + + public Collection extractFromFringe(ReadGraph graph, int maxAmount) throws DatabaseException { + + CollectionSupport cs = graph.getService(CollectionSupport.class); + Collection list = cs.createList(); + Iterator it = fringe.iterator(); + for(int i=0;i expand(ReadGraph graph) throws DatabaseException { + + if(PROFILE) + expandTime -= System.nanoTime(); + + Collection result = extractFromFringe(graph, 2<<12); + + if(PROFILE) + expandTime += System.nanoTime(); + + return result; + + } + + public void classifyPredicates(ReadGraph graph, final Set schedule, DomainProcessorState state) throws DatabaseException { + + for(Resource predicate : schedule) { + + Boolean isRelatedTo = Boolean.FALSE; + + Resource single = graph.getPossibleSuperrelation(predicate); + if(single != null) { + + Boolean singleIsRelatedTo = isRelatedToPredicates.get(single); + if(singleIsRelatedTo == null) { + singleIsRelatedTo = graph.isSubrelationOf(single, L0.IsRelatedTo); + isRelatedToPredicates.put(single, singleIsRelatedTo); + } + + isRelatedTo = singleIsRelatedTo; + + } else { + + if(graph.isSubrelationOf(predicate, L0.IsRelatedTo)) { + isRelatedTo = Boolean.TRUE; + if(ModelTransferableGraphSourceRequest.LOG) ModelTransferableGraphSourceRequest.log("isRelatedToPredicates4 => " + NameUtils.getSafeName(graph, predicate)); + } else { + if (!graph.hasStatement(predicate)) { + if(ModelTransferableGraphSourceRequest.LOG) ModelTransferableGraphSourceRequest.log("FOUND DEAD PREDICATE (no statements): " + predicate); + deadPredicates.add(predicate); + // Prevents ModelTransferableGraphSource from + // trying to export these statements. + state.inverses.remove(support.getTransientId(predicate)); + } + } + + } + + isRelatedToPredicates.put(predicate, isRelatedTo); + + } + + } + + public void classifyPredicates(ReadGraph graph, DomainProcessorState state, final Collection expansion) throws DatabaseException { + + if(PROFILE) + classifyPredicateTime -= System.nanoTime(); + + CollectionSupport cs = graph.getService(CollectionSupport.class); + final Set schedule = cs.createSet(); + final Map newPredicates = cs.createMap(Resource.class); + + for (DirectStatements stms : expansion) { + for(Statement stm : stms) { + + Resource predicate = stm.getPredicate(); + Resource object = stm.getObject(); + + if (exclusions.contains(object) || exclusions.contains(predicate)) + continue; + + if (exclusionFunction != null) { + ExclusionDecision decision = exclusionFunction.apply(stm); + if(ExclusionDecision.EXCLUDE_OBJECT.equals(decision)) { + status.put(object, ExtentStatus.EXCLUDED); + exclusions.add(object); + continue; + } + } + + if(predicates.add(predicate)) { + Resource inverse = graph.getPossibleInverse(predicate); + schedule.add(predicate); + if(inverse != null) { + newPredicates.put(predicate, inverse); + if(predicates.add(inverse)) schedule.add(inverse); + state.inverses.put(support.getTransientId(predicate), support.getTransientId(inverse)); + state.inverses.put(support.getTransientId(inverse), support.getTransientId(predicate)); + if(ModelTransferableGraphSourceRequest.LOG) ModelTransferableGraphSourceRequest.log("INVERSE FOR " + graph.getPossibleURI(predicate) + " => " + graph.getPossibleURI(inverse)); + } else { + state.inverses.put(support.getTransientId(predicate), 0); + if(ModelTransferableGraphSourceRequest.LOG) ModelTransferableGraphSourceRequest.log("NO INVERSE FOR " + graph.getPossibleURI(predicate)); + } + + } + + } + } + + classifyPredicates(graph, schedule, state); + + for(Map.Entry entry : newPredicates.entrySet()) { + // Inverse is strong => this has strong inverse + Boolean isRelatedToValue = isRelatedToPredicates.get(entry.getValue()); + if(isRelatedToValue) strongInverseSet.add(entry.getKey()); + Boolean isRelatedToKey = isRelatedToPredicates.get(entry.getKey()); + if(isRelatedToKey) strongInverseSet.add(entry.getValue()); + } + + if(PROFILE) + classifyPredicateTime += System.nanoTime(); + + } + + /* + * Composed objects are internal. Mark them for expansion. + */ + + private Datatype getDatatype(ReadGraph graph, Resource subject) throws DatabaseException { + Resource object = graph.getSingleObject(subject, L0.HasDataType); + return graph.syncRequest(new Value(object, datatypeBinding), TransientCacheListener.instance()); + } + + public void processValue(ReadGraph graph, Resource subject, int sId, final DomainProcessorState state) throws DatabaseException, IOException { + final InputStream valueStream = tgs.getValueStream(graph, subject); + if (valueStream != null) { + if(ModelTransferableGraphSourceRequest.LOG) ModelTransferableGraphSourceRequest.log("[VALUE] " + NameUtils.getSafeName(graph, subject, true)); + state.valueOutput.writeInt(sId); + + if (conf.values) { + Datatype dt = getDatatype(graph, subject); + + boolean canWriteRawVariant = !state.valueModifier.mayNeedModification(dt); + long rawVariantSizePos = 0; + state.valueOutput.writeByte(canWriteRawVariant + ? TransferableGraphSource.TAG_RAW_COPY_VARIANT_VALUE + : TransferableGraphSource.TAG_POTENTIALLY_MODIFIED_VARIANT_VALUE); + if (canWriteRawVariant) { + // Add space for raw variant byte size before the data + rawVariantSizePos = state.valueOutput.position(); + state.valueOutput.writeInt(0); + } + + byte[] typeBytes = bindings.get(dt); + if (typeBytes == null) { + typeBytes = datatypeSerializer.serialize(dt); + bindings.put(dt, typeBytes); + } + + state.valueOutput.write(typeBytes); + Serializer s = Bindings.getSerializerUnchecked(Bindings.getBinding(dt)); + s.skip(new InputStream() { + @Override + public int read() throws IOException { + int value = valueStream.read(); + state.valueOutput.write(value); + return value; + } + }); + + if (canWriteRawVariant) { + long currentPos = state.valueOutput.position(); + int variantSize = (int)(currentPos - rawVariantSizePos - 4); + state.valueOutput.position(rawVariantSizePos); + state.valueOutput.writeInt(variantSize); + state.valueOutput.position(currentPos); + } + } + + state.valueCount++; + } + } + + private TIntArrayList stream = new TIntArrayList(); + + public void addToStream(Resource predicate, Resource object) throws DatabaseException { + stream.add(support.getTransientId(predicate)); + stream.add(support.getTransientId(object)); + } + + public void processStatement(ReadGraph graph, Resource subject, Statement stm) throws DatabaseException, IOException { + + Resource predicate = stm.getPredicate(); + + Resource object = stm.getObject(); + + ExtentStatus objectStatus = status.get(object); + + // Strong predicate + Boolean isRelatedTo = isRelatedToPredicates.get(predicate); + if ((objectStatus != ExtentStatus.EXCLUDED) && isRelatedTo) { + + if(ModelTransferableGraphSourceRequest.LOG) logStatementWithExtent(graph, "related", objectStatus, subject, predicate, object); + + addToStream(predicate, object); + + if(objectStatus == null || objectStatus == ExtentStatus.PENDING) { + if(ModelTransferableGraphSourceRequest.LOG) ModelTransferableGraphSourceRequest.log("[ADDED TO FRINGE] " + NameUtils.getSafeName(graph, object)); + fringe.add(object); + } + + } else { + + // Dead predicate + if (deadPredicates.contains(predicate)) { + if(ModelTransferableGraphSourceRequest.LOG) logStatementWithExtent(graph, "excluding statement with dead predicate ", objectStatus, subject, predicate, object); + return; + } + + // Weak predicate + if(objectStatus == ExtentStatus.EXCLUDED) { + + if(ModelTransferableGraphSourceRequest.LOG) logStatementWithExtent(graph, "weak reference to excluded object ", objectStatus, subject, predicate, object); + + } else { + + // The inverse is also weak (or there is no inverse) + if(!strongInverseSet.contains(predicate)) { + + addToStream(predicate, object); + + if(objectStatus == null) { + status.put(object, ExtentStatus.PENDING); + } + + if(ModelTransferableGraphSourceRequest.LOG) logStatementWithExtent(graph, "fully weak internal", objectStatus, subject, predicate, object); + + } else { + + if(ModelTransferableGraphSourceRequest.LOG) logStatementWithExtent(graph, "strong inverse internal ", objectStatus, subject, predicate, object); + + } + + } + + } + + } + + public void flushStatementStream(int sId, DomainProcessorState state) throws IOException { + if(!stream.isEmpty()) { + state.statementsOutput.writeInt(sId); + int streamSize = stream.size(); + int statementCount = stream.size() / 2; + state.statementsOutput.writeInt(statementCount); + for (int i = 0; i < streamSize; i++) + state.statementsOutput.writeInt(stream.getQuick(i)); + state.statementCount += 2*streamSize; + stream.resetQuick(); + } + } + + // For progress monitor book-keeping + private long internalResourceNumber = 0; + private long startTime = 0; + private long lastUpdateTime = 0; + + public void processInternal(ReadGraph graph, Resource subject, DirectStatements stms, DomainProcessorState state) throws DatabaseException, IOException { + + internalResourceNumber++; + + // Update progress monitor with controlled frequency + long t = System.nanoTime(); + long dt = t - lastUpdateTime; + if (dt > 200_000_000L) { + if (startTime == 0) + startTime = t; + lastUpdateTime = t; + double totalTime = (t - startTime) * 1e-9; + if (totalTime > 0) { + long speed = Math.round((double)internalResourceNumber / totalTime); + state.monitor.subTask("Included " + internalResourceNumber + " resources (" + speed + " resources/s)"); + } + } + + status.put(subject, ExtentStatus.INTERNAL); + if(ModelTransferableGraphSourceRequest.LOG) ModelTransferableGraphSourceRequest.log("[INTERNAL] " + NameUtils.getSafeName(graph, subject, true)); + + int sId = support.getTransientId(subject); + + if(PROFILE) + valueOutputTime -= System.nanoTime(); + + processValue(graph, subject, sId, state); + + if(PROFILE) + valueOutputTime += System.nanoTime(); + + if(PROFILE) + statementOutputTime -= System.nanoTime(); + + for(Statement stm : stms) { + processStatement(graph, subject, stm); + } + + flushStatementStream(sId, state); + + if(PROFILE) + statementOutputTime += System.nanoTime(); + + // Logarithmic progress monitor for unknown amount of work. + state.monitor.setWorkRemaining(100000); + state.monitor.worked(1); + } + + public void processFringe(ReadGraph graph, Collection expansion, final DomainProcessorState state) throws DatabaseException, IOException { + + if(PROFILE) + processFringeTime -= System.nanoTime(); + + for (DirectStatements stms : expansion) { + + Resource subject = stms.getSubject(); + + boolean partOf = false; + for(Statement stm : stms) { + Resource predicate = stm.getPredicate(); + if(L0.PartOf.equals(predicate)) { + partOf = true; + break; + } + } + + ExtentStatus subjectStatus = status.get(subject); + if(ModelTransferableGraphSourceRequest.LOG && subjectStatus != null) ModelTransferableGraphSourceRequest.log("EXISTING STATUS FOR " + graph.getPossibleURI(subject) + " - " + subjectStatus); + if(subjectStatus == ExtentStatus.EXTERNAL || subjectStatus == ExtentStatus.EXCLUDED) continue; + if(partOf && (subjectStatus == null || ExtentStatus.PENDING == subjectStatus) && graph.getPossibleURI(subject) != null) { + + status.put(subject, ExtentStatus.EXTERNAL); + if(ModelTransferableGraphSourceRequest.LOG) { + String uri = graph.getPossibleURI(subject); + if(uri == null) ModelTransferableGraphSourceRequest.log("[EXTERNAL]: No URI for " + subject); + else ModelTransferableGraphSourceRequest.log("[EXTERNAL] " + uri); + } + + } else { + + processInternal(graph, subject, stms, state); + + } + + } + + if(PROFILE) + processFringeTime += System.nanoTime(); + + } + + public void process(ReadGraph graph, DomainProcessorState state) throws DatabaseException { + + try { + + this.variantSerializer = graph.getService(Databoard.class).getSerializerUnchecked(Bindings.VARIANT); + this.datatypeBinding = Bindings.getBindingUnchecked(Datatype.class); + this.datatypeSerializer = graph.getService(Databoard.class).getSerializerUnchecked(this.datatypeBinding); + + Pair,Set> pair = ConsistsOfProcess.walk(graph, status, fringe, exclusions, ignoreVirtual); + state.internalEntries = pair.first; + + for(InternalEntry entry : state.internalEntries) { + Resource r = entry.resource; + if (status.put(r, ExtentStatus.INTERNAL) == null) { + if(ModelTransferableGraphSourceRequest.LOG) { + String URI = graph.getPossibleURI(r); + if(URI != null) ModelTransferableGraphSourceRequest.log("URI INTERNAL " + URI); + else ModelTransferableGraphSourceRequest.log("URI has no URI for " + r); + } + fringe.add(r); + } + } + + for(Resource unnamedChild : pair.second) { + if (status.put(unnamedChild, ExtentStatus.INTERNAL) == null) { + fringe.add(unnamedChild); + } + } + + if (state.monitor.isCanceled()) + throw new CancelTransactionException(); + + while(!fringe.isEmpty()) { + + Collection expansion = expand(graph); + classifyPredicates(graph, state, expansion); + processFringe(graph, expansion, state); + + if (state.monitor.isCanceled()) + throw new CancelTransactionException(); + } + + if (ModelTransferableGraphSourceRequest.PROFILE) { + System.out.println(composedObjectCounter + " " + fastInternalCounter + + " " + parentExternalCounter + " " + + fullExternalCounter + " " + fullInternalCounter); + } + + } catch (IOException e) { + throw new DatabaseException(e); + } + + } + + void logStatementWithExtent(ReadGraph graph, String header, ExtentStatus status, int sId, int pId, int oId) throws DatabaseException { + if(ModelTransferableGraphSourceRequest.LOG) { + SerialisationSupport support = graph.getService(SerialisationSupport.class); + String s = NameUtils.getURIOrSafeNameInternal(graph, support.getResource(sId)); + String p = NameUtils.getURIOrSafeNameInternal(graph, support.getResource(pId)); + String o = NameUtils.getURIOrSafeNameInternal(graph, support.getResource(oId)); + ModelTransferableGraphSourceRequest.log(header + " [" + status + "] " + s + " - " + p + " - " + o); + } + } + + void logStatementWithExtent(ReadGraph graph, String header, ExtentStatus status, Resource sId, Resource pId, Resource oId) throws DatabaseException { + if(ModelTransferableGraphSourceRequest.LOG) { + String s = NameUtils.getURIOrSafeNameInternal(graph, sId); + String p = NameUtils.getURIOrSafeNameInternal(graph, pId); + String o = NameUtils.getURIOrSafeNameInternal(graph, oId); + ModelTransferableGraphSourceRequest.log(header + " [" + status + "] " + s + " - " + p + " - " + o); + } + } + } \ No newline at end of file