X-Git-Url: https://gerrit.simantics.org/r/gitweb?p=simantics%2Fplatform.git;a=blobdiff_plain;f=bundles%2Forg.simantics.db.layer0%2Fsrc%2Forg%2Fsimantics%2Fdb%2Flayer0%2Futil%2FDomainProcessor3.java;fp=bundles%2Forg.simantics.db.layer0%2Fsrc%2Forg%2Fsimantics%2Fdb%2Flayer0%2Futil%2FDomainProcessor3.java;h=fb4f04dee0bd6667546a9a4ed539319f9398b915;hp=0000000000000000000000000000000000000000;hb=969bd23cab98a79ca9101af33334000879fb60c5;hpb=866dba5cd5a3929bbeae85991796acb212338a08 diff --git a/bundles/org.simantics.db.layer0/src/org/simantics/db/layer0/util/DomainProcessor3.java b/bundles/org.simantics.db.layer0/src/org/simantics/db/layer0/util/DomainProcessor3.java new file mode 100644 index 000000000..fb4f04dee --- /dev/null +++ b/bundles/org.simantics.db.layer0/src/org/simantics/db/layer0/util/DomainProcessor3.java @@ -0,0 +1,560 @@ +package org.simantics.db.layer0.util; + +import java.io.IOException; +import java.io.InputStream; +import java.util.Collection; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.Set; +import java.util.TreeSet; + +import org.simantics.databoard.Bindings; +import org.simantics.databoard.Databoard; +import org.simantics.databoard.binding.Binding; +import org.simantics.databoard.serialization.Serializer; +import org.simantics.databoard.type.Datatype; +import org.simantics.db.DirectStatements; +import org.simantics.db.ReadGraph; +import org.simantics.db.Resource; +import org.simantics.db.ResourceMap; +import org.simantics.db.Statement; +import org.simantics.db.common.primitiverequest.Value; +import org.simantics.db.common.procedure.adapter.TransientCacheListener; +import org.simantics.db.common.utils.NameUtils; +import org.simantics.db.exception.CancelTransactionException; +import org.simantics.db.exception.DatabaseException; +import org.simantics.db.layer0.adapter.SubgraphExtent.ExtentStatus; +import org.simantics.db.layer0.util.ModelTransferableGraphSourceRequest.Expansion3; +import org.simantics.db.service.CollectionSupport; +import org.simantics.db.service.SerialisationSupport; +import org.simantics.db.service.TransferableGraphSupport; +import org.simantics.graph.db.TransferableGraphSource; +import org.simantics.layer0.Layer0; +import org.simantics.scl.runtime.function.Function1; + +import gnu.trove.list.array.TIntArrayList; +import gnu.trove.map.hash.TIntIntHashMap; + +public class DomainProcessor3 { + + public enum ExclusionDecision { + INCLUDE, EXCLUDE_OBJECT + } + + final static private boolean PROFILE = false; + + Serializer variantSerializer; + Serializer datatypeSerializer; + Binding datatypeBinding; + boolean ignoreVirtual; + + int id = 0; + + Set fringe = null; + Set exclusions = null; + Function1 exclusionFunction = null; + Set predicates = null; + Map isRelatedToPredicates = null; + Set deadPredicates = null; + Set strongInverseSet = null; + + TIntIntHashMap ids = null; + ResourceMap status = null; + Map bindings = new HashMap(); + final SerialisationSupport support; + final TransferableGraphConfiguration2 conf; + final TransferableGraphSupport tgs; + + private Layer0 L0; + + private long composedObjectCounter = 0; + private long fastInternalCounter = 0; + private long parentExternalCounter = 0; + private long fullInternalCounter = 0; + private long fullExternalCounter = 0; + + long startupTime = 0; + long expandTime = 0; + long fullResolveTime = 0; + long fastResolveTime = 0; + long otherStatementTime = 0; + long parentResolveTime = 0; + long extentSeedTime = 0; + long classifyPredicateTime = 0; + long processFringeTime = 0; + long valueOutputTime = 0; + long statementOutputTime = 0; + + public DomainProcessor3(ReadGraph graph, TransferableGraphConfiguration2 conf, DomainProcessorState state, boolean ignoreVirtual) throws DatabaseException { + + this.L0 = Layer0.getInstance(graph); + this.tgs = graph.getService(TransferableGraphSupport.class); + + this.support = graph.getService(SerialisationSupport.class); + this.ignoreVirtual = ignoreVirtual; + this.conf = conf; + + if(PROFILE) + startupTime -= System.nanoTime(); + + CollectionSupport cs = graph.getService(CollectionSupport.class); + + ids = state.ids; + status = cs.createMap(ExtentStatus.class); + predicates = cs.createSet(); + exclusions = cs.createSet(); + exclusionFunction = conf.exclusionFunction; + fringe = new TreeSet(); + isRelatedToPredicates = cs.createMap(Boolean.class); + deadPredicates = cs.createSet(); + strongInverseSet = cs.createSet(); + + for(Map.Entry entry : conf.preStatus.entrySet()) { + status.put(entry.getKey(), entry.getValue()); + if(ExtentStatus.EXCLUDED.equals(entry.getValue())) exclusions.add(entry.getKey()); + if(ExtentStatus.INTERNAL.equals(entry.getValue())) fringe.add(entry.getKey()); + } + + if(PROFILE) + startupTime += System.nanoTime(); + +// for(RootSpec p : conf.roots) { +// if(p.internal) +// fringe.add(p.resource); +// } + + } + + public ResourceMap getStatus() { + return status; + } + + public Collection extractFromFringe(ReadGraph graph, int maxAmount) throws DatabaseException { + + CollectionSupport cs = graph.getService(CollectionSupport.class); + Collection list = cs.createList(); + Iterator it = fringe.iterator(); + for(int i=0;i expand(ReadGraph graph) throws DatabaseException { + + if(PROFILE) + expandTime -= System.nanoTime(); + + Collection result = extractFromFringe(graph, 2<<12); + + if(PROFILE) + expandTime += System.nanoTime(); + + return result; + + } + + public void classifyPredicates(ReadGraph graph, final Set schedule, DomainProcessorState state) throws DatabaseException { + + for(Resource predicate : schedule) { + + Boolean isRelatedTo = Boolean.FALSE; + + Resource single = graph.getPossibleSuperrelation(predicate); + if(single != null) { + + Boolean singleIsRelatedTo = isRelatedToPredicates.get(single); + if(singleIsRelatedTo == null) { + singleIsRelatedTo = graph.isSubrelationOf(single, L0.IsRelatedTo); + isRelatedToPredicates.put(single, singleIsRelatedTo); + } + + isRelatedTo = singleIsRelatedTo; + + } else { + + if(graph.isSubrelationOf(predicate, L0.IsRelatedTo)) { + isRelatedTo = Boolean.TRUE; + if(ModelTransferableGraphSourceRequest.LOG) ModelTransferableGraphSourceRequest.log("isRelatedToPredicates4 => " + NameUtils.getSafeName(graph, predicate)); + } else { + if (!graph.hasStatement(predicate)) { + if(ModelTransferableGraphSourceRequest.LOG) ModelTransferableGraphSourceRequest.log("FOUND DEAD PREDICATE (no statements): " + predicate); + deadPredicates.add(predicate); + // Prevents ModelTransferableGraphSource from + // trying to export these statements. + state.inverses.remove(support.getTransientId(predicate)); + } + } + + } + + isRelatedToPredicates.put(predicate, isRelatedTo); + + } + + } + + public void classifyPredicates(ReadGraph graph, DomainProcessorState state, final Collection expansion) throws DatabaseException { + + if(PROFILE) + classifyPredicateTime -= System.nanoTime(); + + CollectionSupport cs = graph.getService(CollectionSupport.class); + final Set schedule = cs.createSet(); + final Map newPredicates = cs.createMap(Resource.class); + + for (DirectStatements stms : expansion) { + for(Statement stm : stms) { + + Resource predicate = stm.getPredicate(); + Resource object = stm.getObject(); + + if (exclusions.contains(object) || exclusions.contains(predicate)) + continue; + + if (exclusionFunction != null) { + ExclusionDecision decision = exclusionFunction.apply(stm); + if(ExclusionDecision.EXCLUDE_OBJECT.equals(decision)) { + status.put(object, ExtentStatus.EXCLUDED); + exclusions.add(object); + continue; + } + } + + if(predicates.add(predicate)) { + Resource inverse = graph.getPossibleInverse(predicate); + schedule.add(predicate); + if(inverse != null) { + newPredicates.put(predicate, inverse); + if(predicates.add(inverse)) schedule.add(inverse); + state.inverses.put(support.getTransientId(predicate), support.getTransientId(inverse)); + state.inverses.put(support.getTransientId(inverse), support.getTransientId(predicate)); + if(ModelTransferableGraphSourceRequest.LOG) ModelTransferableGraphSourceRequest.log("INVERSE FOR " + graph.getPossibleURI(predicate) + " => " + graph.getPossibleURI(inverse)); + } else { + state.inverses.put(support.getTransientId(predicate), 0); + if(ModelTransferableGraphSourceRequest.LOG) ModelTransferableGraphSourceRequest.log("NO INVERSE FOR " + graph.getPossibleURI(predicate)); + } + + } + + } + } + + classifyPredicates(graph, schedule, state); + + for(Map.Entry entry : newPredicates.entrySet()) { + // Inverse is strong => this has strong inverse + Boolean isRelatedToValue = isRelatedToPredicates.get(entry.getValue()); + if(isRelatedToValue) strongInverseSet.add(entry.getKey()); + Boolean isRelatedToKey = isRelatedToPredicates.get(entry.getKey()); + if(isRelatedToKey) strongInverseSet.add(entry.getValue()); + } + + if(PROFILE) + classifyPredicateTime += System.nanoTime(); + + } + + /* + * Composed objects are internal. Mark them for expansion. + */ + + private Datatype getDatatype(ReadGraph graph, Resource subject) throws DatabaseException { + Resource object = graph.getSingleObject(subject, L0.HasDataType); + return graph.syncRequest(new Value(object, datatypeBinding), TransientCacheListener.instance()); + } + + public void processValue(ReadGraph graph, Resource subject, int sId, final DomainProcessorState state) throws DatabaseException, IOException { + final InputStream valueStream = tgs.getValueStream(graph, subject); + if (valueStream != null) { + if(ModelTransferableGraphSourceRequest.LOG) ModelTransferableGraphSourceRequest.log("[VALUE] " + NameUtils.getSafeName(graph, subject, true)); + state.valueOutput.writeInt(sId); + + if (conf.values) { + Datatype dt = getDatatype(graph, subject); + + boolean canWriteRawVariant = !state.valueModifier.mayNeedModification(dt); + long rawVariantSizePos = 0; + state.valueOutput.writeByte(canWriteRawVariant + ? TransferableGraphSource.TAG_RAW_COPY_VARIANT_VALUE + : TransferableGraphSource.TAG_POTENTIALLY_MODIFIED_VARIANT_VALUE); + if (canWriteRawVariant) { + // Add space for raw variant byte size before the data + rawVariantSizePos = state.valueOutput.position(); + state.valueOutput.writeInt(0); + } + + byte[] typeBytes = bindings.get(dt); + if (typeBytes == null) { + typeBytes = datatypeSerializer.serialize(dt); + bindings.put(dt, typeBytes); + } + + state.valueOutput.write(typeBytes); + Serializer s = Bindings.getSerializerUnchecked(Bindings.getBinding(dt)); + s.skip(new InputStream() { + @Override + public int read() throws IOException { + int value = valueStream.read(); + state.valueOutput.write(value); + return value; + } + }); + + if (canWriteRawVariant) { + long currentPos = state.valueOutput.position(); + int variantSize = (int)(currentPos - rawVariantSizePos - 4); + state.valueOutput.position(rawVariantSizePos); + state.valueOutput.writeInt(variantSize); + state.valueOutput.position(currentPos); + } + } + + state.valueCount++; + } + } + + private TIntArrayList stream = new TIntArrayList(); + + public void addToStream(Resource predicate, Resource object) throws DatabaseException { + stream.add(support.getTransientId(predicate)); + stream.add(support.getTransientId(object)); + } + + public void processStatement(ReadGraph graph, Resource subject, Statement stm) throws DatabaseException, IOException { + + Resource predicate = stm.getPredicate(); + + Resource object = stm.getObject(); + + ExtentStatus objectStatus = status.get(object); + + // Strong predicate + Boolean isRelatedTo = isRelatedToPredicates.get(predicate); + if ((objectStatus != ExtentStatus.EXCLUDED) && isRelatedTo) { + + if(ModelTransferableGraphSourceRequest.LOG) logStatementWithExtent(graph, "related", objectStatus, subject, predicate, object); + + addToStream(predicate, object); + + if(objectStatus == null || objectStatus == ExtentStatus.PENDING) { + if(ModelTransferableGraphSourceRequest.LOG) ModelTransferableGraphSourceRequest.log("[ADDED TO FRINGE] " + NameUtils.getSafeName(graph, object)); + fringe.add(object); + } + + } else { + + // Dead predicate + if (deadPredicates.contains(predicate)) { + if(ModelTransferableGraphSourceRequest.LOG) logStatementWithExtent(graph, "excluding statement with dead predicate ", objectStatus, subject, predicate, object); + return; + } + + // Weak predicate + if(objectStatus == ExtentStatus.EXCLUDED) { + + if(ModelTransferableGraphSourceRequest.LOG) logStatementWithExtent(graph, "weak reference to excluded object ", objectStatus, subject, predicate, object); + + } else { + + // The inverse is also weak (or there is no inverse) + if(!strongInverseSet.contains(predicate)) { + + addToStream(predicate, object); + + if(objectStatus == null) { + status.put(object, ExtentStatus.PENDING); + } + + if(ModelTransferableGraphSourceRequest.LOG) logStatementWithExtent(graph, "fully weak internal", objectStatus, subject, predicate, object); + + } else { + + if(ModelTransferableGraphSourceRequest.LOG) logStatementWithExtent(graph, "strong inverse internal ", objectStatus, subject, predicate, object); + + } + + } + + } + + } + + public void flushStatementStream(int sId, DomainProcessorState state) throws IOException { + if(!stream.isEmpty()) { + state.statementsOutput.writeInt(sId); + int streamSize = stream.size(); + int statementCount = stream.size() / 2; + state.statementsOutput.writeInt(statementCount); + for (int i = 0; i < streamSize; i++) + state.statementsOutput.writeInt(stream.getQuick(i)); + state.statementCount += 2*streamSize; + stream.resetQuick(); + } + } + + // For progress monitor book-keeping + private long internalResourceNumber = 0; + private long startTime = 0; + private long lastUpdateTime = 0; + + public void processInternal(ReadGraph graph, Resource subject, DirectStatements stms, DomainProcessorState state) throws DatabaseException, IOException { + + internalResourceNumber++; + + // Update progress monitor with controlled frequency + long t = System.nanoTime(); + long dt = t - lastUpdateTime; + if (dt > 200_000_000L) { + if (startTime == 0) + startTime = t; + lastUpdateTime = t; + double totalTime = (t - startTime) * 1e-9; + if (totalTime > 0) { + long speed = Math.round((double)internalResourceNumber / totalTime); + state.monitor.subTask("Included " + internalResourceNumber + " resources (" + speed + " resources/s)"); + } + } + + status.put(subject, ExtentStatus.INTERNAL); + if(ModelTransferableGraphSourceRequest.LOG) ModelTransferableGraphSourceRequest.log("[INTERNAL] " + NameUtils.getSafeName(graph, subject, true)); + + int sId = support.getTransientId(subject); + + if(PROFILE) + valueOutputTime -= System.nanoTime(); + + processValue(graph, subject, sId, state); + + if(PROFILE) + valueOutputTime += System.nanoTime(); + + if(PROFILE) + statementOutputTime -= System.nanoTime(); + + for(Statement stm : stms) { + processStatement(graph, subject, stm); + } + + flushStatementStream(sId, state); + + if(PROFILE) + statementOutputTime += System.nanoTime(); + + // Logarithmic progress monitor for unknown amount of work. + state.monitor.setWorkRemaining(100000); + state.monitor.worked(1); + } + + public void processFringe(ReadGraph graph, Collection expansion, final DomainProcessorState state) throws DatabaseException, IOException { + + if(PROFILE) + processFringeTime -= System.nanoTime(); + + for (DirectStatements stms : expansion) { + + Resource subject = stms.getSubject(); + + boolean partOf = false; + for(Statement stm : stms) { + Resource predicate = stm.getPredicate(); + if(L0.PartOf.equals(predicate)) { + partOf = true; + break; + } + } + + ExtentStatus subjectStatus = status.get(subject); + if(ModelTransferableGraphSourceRequest.LOG && subjectStatus != null) ModelTransferableGraphSourceRequest.log("EXISTING STATUS FOR " + graph.getPossibleURI(subject) + " - " + subjectStatus); + if(subjectStatus == ExtentStatus.EXTERNAL || subjectStatus == ExtentStatus.EXCLUDED) continue; + if(partOf && (subjectStatus == null || ExtentStatus.PENDING == subjectStatus) && graph.getPossibleURI(subject) != null) { + + status.put(subject, ExtentStatus.EXTERNAL); + if(ModelTransferableGraphSourceRequest.LOG) { + String uri = graph.getPossibleURI(subject); + if(uri == null) ModelTransferableGraphSourceRequest.log("[EXTERNAL]: No URI for " + subject); + else ModelTransferableGraphSourceRequest.log("[EXTERNAL] " + uri); + } + + } else { + + processInternal(graph, subject, stms, state); + + } + + } + + if(PROFILE) + processFringeTime += System.nanoTime(); + + } + + public void process(ReadGraph graph, DomainProcessorState state) throws DatabaseException { + + try { + + this.variantSerializer = graph.getService(Databoard.class).getSerializerUnchecked(Bindings.VARIANT); + this.datatypeBinding = Bindings.getBindingUnchecked(Datatype.class); + this.datatypeSerializer = graph.getService(Databoard.class).getSerializerUnchecked(this.datatypeBinding); + + for(Resource r : ConsistsOfProcess.walk(graph, fringe, exclusions, ignoreVirtual)) { + if (status.put(r, ExtentStatus.INTERNAL) == null) { + if(ModelTransferableGraphSourceRequest.LOG) { + String URI = graph.getPossibleURI(r); + if(URI != null) ModelTransferableGraphSourceRequest.log("URI INTERNAL " + URI); + else ModelTransferableGraphSourceRequest.log("URI has no URI for " + r); + } + fringe.add(r); + } + } + + if (state.monitor.isCanceled()) + throw new CancelTransactionException(); + + while(!fringe.isEmpty()) { + + Collection expansion = expand(graph); + classifyPredicates(graph, state, expansion); + processFringe(graph, expansion, state); + + if (state.monitor.isCanceled()) + throw new CancelTransactionException(); + } + + if (ModelTransferableGraphSourceRequest.PROFILE) { + System.out.println(composedObjectCounter + " " + fastInternalCounter + + " " + parentExternalCounter + " " + + fullExternalCounter + " " + fullInternalCounter); + } + + } catch (IOException e) { + throw new DatabaseException(e); + } + + } + + void logStatementWithExtent(ReadGraph graph, String header, ExtentStatus status, int sId, int pId, int oId) throws DatabaseException { + if(ModelTransferableGraphSourceRequest.LOG) { + SerialisationSupport support = graph.getService(SerialisationSupport.class); + String s = NameUtils.getURIOrSafeNameInternal(graph, support.getResource(sId)); + String p = NameUtils.getURIOrSafeNameInternal(graph, support.getResource(pId)); + String o = NameUtils.getURIOrSafeNameInternal(graph, support.getResource(oId)); + ModelTransferableGraphSourceRequest.log(header + " [" + status + "] " + s + " - " + p + " - " + o); + } + } + + void logStatementWithExtent(ReadGraph graph, String header, ExtentStatus status, Resource sId, Resource pId, Resource oId) throws DatabaseException { + if(ModelTransferableGraphSourceRequest.LOG) { + String s = NameUtils.getURIOrSafeNameInternal(graph, sId); + String p = NameUtils.getURIOrSafeNameInternal(graph, pId); + String o = NameUtils.getURIOrSafeNameInternal(graph, oId); + ModelTransferableGraphSourceRequest.log(header + " [" + status + "] " + s + " - " + p + " - " + o); + } + } + +} \ No newline at end of file