package org.simantics.db.layer0.util;

import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

import org.simantics.databoard.Bindings;
import org.simantics.databoard.Databoard;
import org.simantics.databoard.binding.Binding;
import org.simantics.databoard.serialization.Serializer;
import org.simantics.databoard.type.Datatype;
import org.simantics.db.DirectStatements;
import org.simantics.db.ReadGraph;
import org.simantics.db.Resource;
import org.simantics.db.ResourceMap;
import org.simantics.db.Statement;
import org.simantics.db.common.StandardStatement;
import org.simantics.db.common.primitiverequest.Value;
import org.simantics.db.common.procedure.adapter.TransientCacheListener;
import org.simantics.db.common.utils.NameUtils;
import org.simantics.db.exception.CancelTransactionException;
import org.simantics.db.exception.DatabaseException;
import org.simantics.db.layer0.adapter.SubgraphExtent.ExtentStatus;
import org.simantics.db.layer0.util.ConsistsOfProcess.ConsistsOfProcessEntry;
import org.simantics.db.layer0.util.ModelTransferableGraphSourceRequest.Expansion3;
import org.simantics.db.layer0.util.TransferableGraphConfiguration2.SeedSpec;
import org.simantics.db.layer0.util.TransferableGraphConfiguration2.SeedSpec.SeedSpecType;
import org.simantics.db.service.CollectionSupport;
import org.simantics.db.service.SerialisationSupport;
import org.simantics.db.service.TransferableGraphSupport;
import org.simantics.graph.db.TransferableGraphSource;
import org.simantics.layer0.Layer0;
import org.simantics.scl.runtime.function.Function1;
import org.simantics.utils.datastructures.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import gnu.trove.list.array.TIntArrayList;
import gnu.trove.map.hash.TIntIntHashMap;

/**
 * Walks the resources reachable from the seeds of a
 * {@link TransferableGraphConfiguration2}, classifies each of them with an
 * {@link ExtentStatus}, and writes the values and strongly related statements
 * of the resources found to be internal into the outputs of a
 * {@link DomainProcessorState}.
 *
 * <p>
 * The work is driven by {@link #process(ReadGraph, DomainProcessorState)},
 * which repeatedly takes a batch of resources from the fringe, classifies the
 * predicates seen in their statements and then processes the batch. Statements
 * whose predicate and inverse are both weak are collected into
 * {@link #unresolvedWeakLinks} instead of being written.
 *
 * <p>
 * NOTE(review): the generic type arguments in this file and the body of
 * {@link #extractFromFringe(ReadGraph, int)} were reconstructed from how the
 * values are used; confirm them against the declarations of
 * {@code CollectionSupport}, {@code ConsistsOfProcess},
 * {@code DomainProcessorState} and {@code Expansion3}.
 */
public class DomainProcessor3 {

    private static final Logger LOGGER = LoggerFactory.getLogger(DomainProcessor3.class);

    /** Result of the optional per-statement exclusion function. */
    public enum ExclusionDecision {
        INCLUDE, EXCLUDE_OBJECT
    }

    /** Compile-time switch for the nanosecond timers kept in the {@code *Time} fields. */
    final static private boolean PROFILE = false;

    Serializer variantSerializer;
    Serializer datatypeSerializer;
    Binding datatypeBinding;
    boolean ignoreVirtual;

    int id = 0;

    /** Resources waiting to be expanded. */
    Set<Resource> fringe = null;
    /** Optional filter taken from the configuration; may be {@code null}. */
    Function1<Statement, ExclusionDecision> exclusionFunction = null;
    /** Every predicate (and inverse) that has been scheduled for classification. */
    Set<Resource> predicates = null;
    /** Predicate to "is a subrelation of L0.IsRelatedTo" cache. */
    Map<Resource, Boolean> isRelatedToPredicates = null;
    /** Predicates that were found to have no statements at all. */
    Set<Resource> deadPredicates = null;
    /** Predicates whose inverse is a subrelation of L0.IsRelatedTo. */
    Set<Resource> strongInverseSet = null;
    /** Statements whose predicate and inverse are both weak. */
    List<Statement> unresolvedWeakLinks = new ArrayList<>();
    TIntIntHashMap ids = null;
    ResourceMap<ExtentStatus> status = null;
    /** Cache of serialized datatypes, keyed by datatype. */
    Map<Datatype, byte[]> bindings = new HashMap<Datatype, byte[]>();
    final SerialisationSupport support;
    final TransferableGraphConfiguration2 conf;
    final TransferableGraphSupport tgs;

    private Layer0 L0;

    private long composedObjectCounter = 0;
    private long fastInternalCounter = 0;
    private long parentExternalCounter = 0;
    private long fullInternalCounter = 0;
    private long fullExternalCounter = 0;

    long startupTime = 0;
    long expandTime = 0;
    long fullResolveTime = 0;
    long fastResolveTime = 0;
    long otherStatementTime = 0;
    long parentResolveTime = 0;
    long extentSeedTime = 0;
    long classifyPredicateTime = 0;
    long processFringeTime = 0;
    long valueOutputTime = 0;
    long statementOutputTime = 0;

    /**
     * Creates a processor and seeds its status map and fringe from the
     * configuration.
     *
     * @param graph         graph used to obtain Layer0 and the required services
     * @param conf          export configuration providing pre-statuses, seeds and
     *                      the optional exclusion function
     * @param state         shared processing state; only {@code state.ids} is read here
     * @param ignoreVirtual stored and later forwarded to the statement expansion
     *                      and to {@code ConsistsOfProcess.walk}
     * @throws DatabaseException if a graph access fails
     */
    public DomainProcessor3(ReadGraph graph, TransferableGraphConfiguration2 conf, DomainProcessorState state, boolean ignoreVirtual) throws DatabaseException {

        this.L0 = Layer0.getInstance(graph);
        this.tgs = graph.getService(TransferableGraphSupport.class);
        this.support = graph.getService(SerialisationSupport.class);
        this.ignoreVirtual = ignoreVirtual;
        this.conf = conf;

        if (PROFILE)
            startupTime -= System.nanoTime();

        CollectionSupport cs = graph.getService(CollectionSupport.class);

        ids = state.ids;
        status = cs.createMap(ExtentStatus.class);
        predicates = cs.createSet();
        exclusionFunction = conf.exclusionFunction;
        fringe = new TreeSet<Resource>();
        isRelatedToPredicates = cs.createMap(Boolean.class);
        deadPredicates = cs.createSet();
        strongInverseSet = cs.createSet();

        if (LOGGER.isDebugEnabled()) {
            for (Map.Entry<Resource, ExtentStatus> entry : conf.preStatus.entrySet()) {
                LOGGER.debug("prestatus: " + NameUtils.getSafeName(graph, entry.getKey()) + " " + entry.getValue());
            }
            for (SeedSpec ss : conf.seeds) {
                LOGGER.debug("seed: " + NameUtils.getSafeName(graph, ss.resource) + " " + ss.name + " " + ss.specType + " " + ss.type);
            }
        }

        for (Map.Entry<Resource, ExtentStatus> entry : conf.preStatus.entrySet()) {
            // INTERNAL prestatus shall be ignored. Domain processor will
            // initialize statuses based on seeds.
            if (entry.getValue().equals(ExtentStatus.INTERNAL)) {
                LOGGER.info("Unexpected INTERNAL preStatus in DomainProcessor3 {}", entry.getKey());
            } else {
                status.put(entry.getKey(), entry.getValue());
            }
        }

        for (SeedSpec ss : conf.seeds) {
            ExtentStatus pre = status.get(ss.resource);
            if (SeedSpecType.INTERNAL.equals(ss.specType)) {
                // INTERNAL seeds are fed into ConsistsOfProcess
                if (pre != null && !ExtentStatus.INTERNAL.equals(pre))
                    LOGGER.info("Internal seed preclassification problem, expected INTERNAL preclassification, got {}", pre.name());
                continue;
            } else if (SeedSpecType.ROOT.equals(ss.specType)) {
                // Non-internal resources are not reported as internals by
                // ConsistsOfProcess so they are manually entered into fringe
                fringe.add(ss.resource);
                if (pre != null)
                    LOGGER.info("Root preclassification problem, expected no preclassification, got {}", pre.name());
                // Roots are classified in status as INTERNAL
                status.put(ss.resource, ExtentStatus.INTERNAL);
            } else if (SeedSpecType.SPECIAL_ROOT.equals(ss.specType)) {
                // Special roots e.g. %model are marked as EXTERNAL
                if (pre != null && !ExtentStatus.EXTERNAL.equals(pre))
                    LOGGER.info("Special root preclassification problem, expected EXTERNAL preclassification, got {}", pre.name());
                status.put(ss.resource, ExtentStatus.EXTERNAL);
            }
        }

        if (PROFILE)
            startupTime += System.nanoTime();

    }

    /**
     * @return the live resource-to-status map maintained by this processor
     */
    public ResourceMap<ExtentStatus> getStatus() {
        return status;
    }

    /**
     * Removes up to {@code maxAmount} resources from the fringe and returns
     * their direct statements.
     *
     * <p>
     * NOTE(review): everything after the loop header was missing from the
     * reviewed text. The loop body and the {@code Expansion3} request below are
     * a reconstruction based on the prepared list and iterator and on the
     * otherwise unused {@code Expansion3} import; verify against version
     * control before merging.
     *
     * @param graph     graph used for the expansion request
     * @param maxAmount maximum number of resources to take from the fringe
     * @return direct statements of the extracted resources
     * @throws DatabaseException if the expansion request fails
     */
    public Collection<DirectStatements> extractFromFringe(ReadGraph graph, int maxAmount) throws DatabaseException {

        CollectionSupport cs = graph.getService(CollectionSupport.class);
        Collection<Resource> list = cs.createList();
        Iterator<Resource> it = fringe.iterator();
        for (int i = 0; i < maxAmount; i++) {
            if (!it.hasNext())
                break;
            list.add(it.next());
            it.remove();
        }

        return graph.syncRequest(new Expansion3(list, ignoreVirtual));

    }

    /**
     * Extracts the next batch (at most {@code 2<<12}, i.e. 8192, resources)
     * from the fringe.
     *
     * @param graph graph used for the expansion request
     * @return direct statements of the extracted resources
     * @throws DatabaseException if the expansion request fails
     */
    public Collection<DirectStatements> expand(ReadGraph graph) throws DatabaseException {

        if (PROFILE)
            expandTime -= System.nanoTime();

        Collection<DirectStatements> result = extractFromFringe(graph, 2 << 12);

        if (PROFILE)
            expandTime += System.nanoTime();

        return result;

    }

    /**
     * Records for every scheduled predicate whether it counts as a subrelation
     * of {@code L0.IsRelatedTo}, and registers predicates without any
     * statements as dead.
     *
     * @param graph    graph used for relation queries
     * @param schedule predicates to classify
     * @param state    processing state; the inverse entry of a dead predicate
     *                 is removed from {@code state.inverses}
     * @throws DatabaseException if a graph access fails
     */
    public void classifyPredicates(ReadGraph graph, final Set<Resource> schedule, DomainProcessorState state) throws DatabaseException {

        for (Resource predicate : schedule) {

            Boolean isRelatedTo = Boolean.FALSE;

            Resource single = graph.getPossibleSuperrelation(predicate);
            if (single != null) {

                // Classify through the superrelation and cache its answer
                Boolean singleIsRelatedTo = isRelatedToPredicates.get(single);
                if (singleIsRelatedTo == null) {
                    singleIsRelatedTo = graph.isSubrelationOf(single, L0.IsRelatedTo);
                    isRelatedToPredicates.put(single, singleIsRelatedTo);
                }

                isRelatedTo = singleIsRelatedTo;

            } else {

                if (graph.isSubrelationOf(predicate, L0.IsRelatedTo)) {
                    isRelatedTo = Boolean.TRUE;
                    if (ModelTransferableGraphSourceRequest.DEBUG)
                        ModelTransferableGraphSourceRequest.log("isRelatedToPredicates4 => " + NameUtils.getSafeName(graph, predicate));
                } else {
                    if (!graph.hasStatement(predicate)) {
                        if (ModelTransferableGraphSourceRequest.DEBUG)
                            ModelTransferableGraphSourceRequest.log("FOUND DEAD PREDICATE (no statements): " + predicate);
                        deadPredicates.add(predicate);
                        // Prevents ModelTransferableGraphSource from
                        // trying to export these statements.
                        state.inverses.remove(support.getTransientId(predicate));
                    }
                }

            }

            isRelatedToPredicates.put(predicate, isRelatedTo);

        }

    }

    /**
     * Collects the previously unseen predicates (and their inverses) from the
     * given statements, classifies them and updates the strong inverse set.
     *
     * <p>
     * Statements whose predicate or object already has status
     * {@code EXCLUDED} are skipped. When the exclusion function answers
     * {@code EXCLUDE_OBJECT} for a statement, its object is marked
     * {@code EXCLUDED} and the statement is skipped as well.
     *
     * @param graph     graph used for relation queries
     * @param state     processing state; {@code state.inverses} receives the
     *                  transient id of each predicate's inverse, or 0 when
     *                  there is none
     * @param expansion direct statements of the current batch
     * @throws DatabaseException if a graph access fails
     */
    public void classifyPredicates(ReadGraph graph, DomainProcessorState state, final Collection<DirectStatements> expansion) throws DatabaseException {

        if (PROFILE)
            classifyPredicateTime -= System.nanoTime();

        CollectionSupport cs = graph.getService(CollectionSupport.class);
        final Set<Resource> schedule = cs.createSet();
        final Map<Resource, Resource> newPredicates = cs.createMap(Resource.class);

        for (DirectStatements stms : expansion) {
            for (Statement stm : stms) {

                Resource predicate = stm.getPredicate();
                Resource object = stm.getObject();

                if (ExtentStatus.EXCLUDED.equals(status.get(predicate)))
                    continue;
                if (ExtentStatus.EXCLUDED.equals(status.get(object)))
                    continue;

                if (exclusionFunction != null) {
                    ExclusionDecision decision = exclusionFunction.apply(stm);
                    if (ExclusionDecision.EXCLUDE_OBJECT.equals(decision)) {
                        status.put(object, ExtentStatus.EXCLUDED);
                        continue;
                    }
                }

                if (predicates.add(predicate)) {
                    Resource inverse = graph.getPossibleInverse(predicate);
                    schedule.add(predicate);
                    if (inverse != null) {
                        newPredicates.put(predicate, inverse);
                        if (predicates.add(inverse))
                            schedule.add(inverse);
                        state.inverses.put(support.getTransientId(predicate), support.getTransientId(inverse));
                        state.inverses.put(support.getTransientId(inverse), support.getTransientId(predicate));
                        if (ModelTransferableGraphSourceRequest.DEBUG)
                            ModelTransferableGraphSourceRequest.log("INVERSE FOR " + graph.getPossibleURI(predicate) + " => " + graph.getPossibleURI(inverse));
                    } else {
                        state.inverses.put(support.getTransientId(predicate), 0);
                        if (ModelTransferableGraphSourceRequest.DEBUG)
                            ModelTransferableGraphSourceRequest.log("NO INVERSE FOR " + graph.getPossibleURI(predicate));
                    }

                }

            }
        }

        classifyPredicates(graph, schedule, state);

        for (Map.Entry<Resource, Resource> entry : newPredicates.entrySet()) {
            // Inverse is strong => this has strong inverse
            Boolean isRelatedToValue = isRelatedToPredicates.get(entry.getValue());
            if (isRelatedToValue)
                strongInverseSet.add(entry.getKey());
            Boolean isRelatedToKey = isRelatedToPredicates.get(entry.getKey());
            if (isRelatedToKey)
                strongInverseSet.add(entry.getValue());
        }

        if (PROFILE)
            classifyPredicateTime += System.nanoTime();

    }

    /**
     * Reads the datatype of a resource through its single
     * {@code L0.HasDataType} object.
     *
     * @param graph   graph to read from
     * @param subject resource whose datatype is wanted
     * @return the datatype read with {@link #datatypeBinding}
     * @throws DatabaseException if the datatype object or its value cannot be read
     */
    private Datatype getDatatype(ReadGraph graph, Resource subject) throws DatabaseException {
        Resource object = graph.getSingleObject(subject, L0.HasDataType);
        return graph.syncRequest(new Value<Datatype>(object, datatypeBinding), TransientCacheListener.<Datatype>instance());
    }

    /**
     * Writes the value of {@code subject}, if it has one, to
     * {@code state.valueOutput}.
     *
     * <p>
     * The subject id is always written for a resource with a value. When
     * {@code conf.values} is set it is followed by a tag byte, an int size
     * placeholder for raw-copy variants only, the serialized datatype and the
     * value bytes.
     *
     * @param graph   graph to read from
     * @param subject resource whose value is exported
     * @param sId     transient id of {@code subject}
     * @param state   processing state receiving the output
     * @throws DatabaseException if a graph access fails
     * @throws IOException       if reading the value or writing the output fails
     */
    public void processValue(ReadGraph graph, Resource subject, int sId, final DomainProcessorState state) throws DatabaseException, IOException {
        final InputStream valueStream = tgs.getValueStream(graph, subject);
        if (valueStream != null) {
            if (ModelTransferableGraphSourceRequest.DEBUG)
                ModelTransferableGraphSourceRequest.log("[VALUE] " + NameUtils.getSafeName(graph, subject, true));
            state.valueOutput.writeInt(sId);

            if (conf.values) {
                Datatype dt = getDatatype(graph, subject);

                boolean canWriteRawVariant = !state.valueModifier.mayNeedModification(dt);
                long rawVariantSizePos = 0;
                state.valueOutput.writeByte(canWriteRawVariant
                        ? TransferableGraphSource.TAG_RAW_COPY_VARIANT_VALUE
                        : TransferableGraphSource.TAG_POTENTIALLY_MODIFIED_VARIANT_VALUE);
                if (canWriteRawVariant) {
                    // Add space for raw variant byte size before the data
                    rawVariantSizePos = state.valueOutput.position();
                    state.valueOutput.writeInt(0);
                }

                byte[] typeBytes = bindings.get(dt);
                if (typeBytes == null) {
                    typeBytes = datatypeSerializer.serialize(dt);
                    bindings.put(dt, typeBytes);
                }

                state.valueOutput.write(typeBytes);

                // Skipping over the value with a tee stream copies every byte
                // the serializer reads into the value output.
                Serializer s = Bindings.getSerializerUnchecked(Bindings.getBinding(dt));
                s.skip(new InputStream() {
                    @Override
                    public int read() throws IOException {
                        int value = valueStream.read();
                        state.valueOutput.write(value);
                        return value;
                    }
                });

                if (canWriteRawVariant) {
                    // Go back and patch the size placeholder; the 4 accounts
                    // for the placeholder int itself.
                    long currentPos = state.valueOutput.position();
                    int variantSize = (int) (currentPos - rawVariantSizePos - 4);
                    state.valueOutput.position(rawVariantSizePos);
                    state.valueOutput.writeInt(variantSize);
                    state.valueOutput.position(currentPos);
                }
            }

            state.valueCount++;
        }
    }

    /** Pending (predicate id, object id) pairs of the subject currently being processed. */
    private TIntArrayList stream = new TIntArrayList();

    /**
     * Appends the transient ids of a predicate/object pair to the pending
     * statement stream.
     *
     * @param predicate statement predicate
     * @param object    statement object
     * @throws DatabaseException if a transient id cannot be resolved
     */
    public void addToStream(Resource predicate, Resource object) throws DatabaseException {
        stream.add(support.getTransientId(predicate));
        stream.add(support.getTransientId(object));
    }

    /**
     * Handles one statement of an internal subject.
     *
     * <p>
     * A statement with a strong predicate and a non-excluded object is queued
     * for output, and its object is added to the fringe when it has no status
     * or is {@code PENDING}. Statements with dead predicates are dropped. A
     * weak statement to a non-excluded object is recorded in
     * {@link #unresolvedWeakLinks} unless its predicate has a strong inverse.
     *
     * @param graph   graph, used for debug logging only
     * @param subject subject of the statement
     * @param stm     statement to process
     * @throws DatabaseException if a graph access fails
     * @throws IOException       declared for callers; not thrown by the visible code
     */
    public void processStatement(ReadGraph graph, Resource subject, Statement stm) throws DatabaseException, IOException {

        Resource predicate = stm.getPredicate();

        Resource object = stm.getObject();

        ExtentStatus objectStatus = status.get(object);

        // Strong predicate
        // NOTE(review): isRelatedTo is unboxed below; this relies on the
        // predicate having been classified by classifyPredicates first.
        Boolean isRelatedTo = isRelatedToPredicates.get(predicate);
        if ((objectStatus != ExtentStatus.EXCLUDED) && isRelatedTo) {

            if (ModelTransferableGraphSourceRequest.DEBUG)
                logStatementWithExtent(graph, "related", objectStatus, subject, predicate, object);

            addToStream(predicate, object);

            if (objectStatus == null || objectStatus == ExtentStatus.PENDING) {
                if (ModelTransferableGraphSourceRequest.DEBUG)
                    ModelTransferableGraphSourceRequest.log("[ADDED TO FRINGE] " + NameUtils.getSafeName(graph, object));
                fringe.add(object);
            }

        } else {

            // Dead predicate
            if (deadPredicates.contains(predicate)) {
                if (ModelTransferableGraphSourceRequest.DEBUG)
                    logStatementWithExtent(graph, "excluding statement with dead predicate ", objectStatus, subject, predicate, object);
                return;
            }

            // Weak predicate
            if (objectStatus == ExtentStatus.EXCLUDED) {

                if (ModelTransferableGraphSourceRequest.DEBUG)
                    logStatementWithExtent(graph, "weak reference to excluded object ", objectStatus, subject, predicate, object);

            } else {

                // The inverse is also weak (or there is no inverse)
                if (!strongInverseSet.contains(predicate)) {

                    unresolvedWeakLinks.add(new StandardStatement(subject, predicate, object));
                    //addToStream(predicate, object);

                    if (objectStatus == null) {
                        status.put(object, ExtentStatus.PENDING);
                    }

                    if (ModelTransferableGraphSourceRequest.DEBUG)
                        logStatementWithExtent(graph, "fully weak internal", objectStatus, subject, predicate, object);

                } else {

                    if (ModelTransferableGraphSourceRequest.DEBUG)
                        logStatementWithExtent(graph, "strong inverse internal ", objectStatus, subject, predicate, object);

                }

            }

        }

    }

    /**
     * Writes the pending statement stream of one subject to
     * {@code state.statementsOutput} and clears it. Nothing is written when
     * the stream is empty.
     *
     * <p>
     * Output layout: subject id, statement count, then the queued
     * (predicate id, object id) pairs.
     *
     * @param sId   transient id of the subject
     * @param state processing state receiving the output
     * @throws IOException if writing fails
     */
    public void flushStatementStream(int sId, DomainProcessorState state) throws IOException {
        if (!stream.isEmpty()) {
            state.statementsOutput.writeInt(sId);
            int streamSize = stream.size();
            int statementCount = stream.size() / 2;
            state.statementsOutput.writeInt(statementCount);
            for (int i = 0; i < streamSize; i++)
                state.statementsOutput.writeInt(stream.getQuick(i));
            // NOTE(review): advances the counter by four per statement written,
            // presumably the int size of a statement in the final graph - confirm.
            state.statementCount += 2 * streamSize;
            stream.resetQuick();
        }
    }

    // For progress monitor book-keeping
    private long internalResourceNumber = 0;
    private long startTime = 0;
    private long lastUpdateTime = 0;

    /**
     * Marks {@code subject} as {@code INTERNAL} and exports its value and
     * statements, updating the progress monitor along the way.
     *
     * @param graph   graph to read from
     * @param subject resource to export
     * @param stms    direct statements of {@code subject}
     * @param state   processing state receiving the output
     * @throws DatabaseException if a graph access fails
     * @throws IOException       if writing the output fails
     */
    public void processInternal(ReadGraph graph, Resource subject, DirectStatements stms, DomainProcessorState state) throws DatabaseException, IOException {

        internalResourceNumber++;

        // Update progress monitor with controlled frequency
        // (at most once per 200 ms)
        long t = System.nanoTime();
        long dt = t - lastUpdateTime;
        if (dt > 200_000_000L) {
            if (startTime == 0)
                startTime = t;
            lastUpdateTime = t;
            double totalTime = (t - startTime) * 1e-9;
            if (totalTime > 0) {
                long speed = Math.round((double) internalResourceNumber / totalTime);
                state.monitor.subTask("Included " + internalResourceNumber + " resources (" + speed + " resources/s)");
            }
        }

        status.put(subject, ExtentStatus.INTERNAL);
        if (ModelTransferableGraphSourceRequest.DEBUG)
            ModelTransferableGraphSourceRequest.log("[INTERNAL] " + NameUtils.getSafeName(graph, subject, true));

        int sId = support.getTransientId(subject);

        if (PROFILE)
            valueOutputTime -= System.nanoTime();

        processValue(graph, subject, sId, state);

        if (PROFILE)
            valueOutputTime += System.nanoTime();

        if (PROFILE)
            statementOutputTime -= System.nanoTime();

        for (Statement stm : stms) {
            processStatement(graph, subject, stm);
        }

        flushStatementStream(sId, state);

        if (PROFILE)
            statementOutputTime += System.nanoTime();

        // Logarithmic progress monitor for unknown amount of work.
        state.monitor.setWorkRemaining(100000);
        state.monitor.worked(1);
    }

    /**
     * Processes one batch of expanded resources.
     *
     * <p>
     * Subjects already classified {@code EXTERNAL} or {@code EXCLUDED} are
     * skipped. A subject that has an {@code L0.PartOf} statement, has no
     * status or is {@code PENDING}, and has a URI is marked {@code EXTERNAL}.
     * Every other subject is processed as internal.
     *
     * @param graph     graph to read from
     * @param expansion direct statements of the batch
     * @param state     processing state receiving the output
     * @throws DatabaseException if a graph access fails
     * @throws IOException       if writing the output fails
     */
    public void processFringe(ReadGraph graph, Collection<DirectStatements> expansion, final DomainProcessorState state) throws DatabaseException, IOException {

        if (PROFILE)
            processFringeTime -= System.nanoTime();

        for (DirectStatements stms : expansion) {

            Resource subject = stms.getSubject();

            boolean partOf = false;
            for (Statement stm : stms) {
                Resource predicate = stm.getPredicate();
                if (L0.PartOf.equals(predicate)) {
                    partOf = true;
                    break;
                }
            }

            ExtentStatus subjectStatus = status.get(subject);
            if (ModelTransferableGraphSourceRequest.DEBUG && subjectStatus != null)
                ModelTransferableGraphSourceRequest.log("EXISTING STATUS FOR " + graph.getPossibleURI(subject) + " - " + subjectStatus);

            if (subjectStatus == ExtentStatus.EXTERNAL || subjectStatus == ExtentStatus.EXCLUDED)
                continue;

            if (partOf && (subjectStatus == null || ExtentStatus.PENDING == subjectStatus) && graph.getPossibleURI(subject) != null) {

                status.put(subject, ExtentStatus.EXTERNAL);
                if (ModelTransferableGraphSourceRequest.DEBUG) {
                    String uri = graph.getPossibleURI(subject);
                    if (uri == null)
                        ModelTransferableGraphSourceRequest.log("[EXTERNAL]: No URI for " + subject);
                    else
                        ModelTransferableGraphSourceRequest.log("[EXTERNAL] " + uri);
                }

            } else {

                processInternal(graph, subject, stms, state);

            }

        }

        if (PROFILE)
            processFringeTime += System.nanoTime();

    }

    /**
     * Runs the whole domain computation.
     *
     * <p>
     * Resources returned by {@code ConsistsOfProcess.walk} that had no status
     * yet are marked {@code INTERNAL} and added to the fringe. The fringe is
     * then expanded, classified and processed batch by batch until it is empty.
     *
     * @param graph graph to read from
     * @param state processing state receiving entries and output
     * @throws CancelTransactionException if the monitor reports cancellation
     * @throws DatabaseException          if a graph access fails, or wrapping any
     *                                    {@link IOException} raised while writing
     */
    public void process(ReadGraph graph, DomainProcessorState state) throws DatabaseException {

        try {

            this.variantSerializer = graph.getService(Databoard.class).getSerializerUnchecked(Bindings.VARIANT);
            this.datatypeBinding = Bindings.getBindingUnchecked(Datatype.class);
            this.datatypeSerializer = graph.getService(Databoard.class).getSerializerUnchecked(this.datatypeBinding);

            Pair<List<ConsistsOfProcessEntry>, Set<Resource>> pair = ConsistsOfProcess.walk(graph, status, conf.seeds, ignoreVirtual);
            state.internalEntries = pair.first;

            for (ConsistsOfProcessEntry entry : state.internalEntries) {
                Resource r = entry.resource;
                // Only resources without an earlier status enter the fringe
                if (status.put(r, ExtentStatus.INTERNAL) == null) {
                    if (ModelTransferableGraphSourceRequest.DEBUG) {
                        String URI = graph.getPossibleURI(r);
                        if (URI != null)
                            ModelTransferableGraphSourceRequest.log("URI INTERNAL " + URI);
                        else
                            ModelTransferableGraphSourceRequest.log("URI has no URI for " + r);
                    }
                    fringe.add(r);
                }
            }

            for (Resource unnamedChild : pair.second) {
                if (status.put(unnamedChild, ExtentStatus.INTERNAL) == null) {
                    fringe.add(unnamedChild);
                }
            }

            if (state.monitor.isCanceled())
                throw new CancelTransactionException();

            while (!fringe.isEmpty()) {

                Collection<DirectStatements> expansion = expand(graph);
                classifyPredicates(graph, state, expansion);
                processFringe(graph, expansion, state);

                if (state.monitor.isCanceled())
                    throw new CancelTransactionException();
            }

            if (ModelTransferableGraphSourceRequest.PROFILE) {
                System.out.println(composedObjectCounter + " " + fastInternalCounter
                        + " " + parentExternalCounter + " "
                        + fullExternalCounter + " " + fullInternalCounter);
            }

        } catch (IOException e) {
            throw new DatabaseException(e);
        }

    }

    /**
     * Debug-logs a statement given as transient ids. Does nothing unless
     * {@code ModelTransferableGraphSourceRequest.DEBUG} is set.
     *
     * @param graph  graph used to resolve ids and names
     * @param header text placed before the statement
     * @param status extent status shown in brackets
     * @param sId    transient id of the subject
     * @param pId    transient id of the predicate
     * @param oId    transient id of the object
     * @throws DatabaseException if a resource or name cannot be resolved
     */
    void logStatementWithExtent(ReadGraph graph, String header, ExtentStatus status, int sId, int pId, int oId) throws DatabaseException {
        if (ModelTransferableGraphSourceRequest.DEBUG) {
            SerialisationSupport support = graph.getService(SerialisationSupport.class);
            String s = NameUtils.getURIOrSafeNameInternal(graph, support.getResource(sId));
            String p = NameUtils.getURIOrSafeNameInternal(graph, support.getResource(pId));
            String o = NameUtils.getURIOrSafeNameInternal(graph, support.getResource(oId));
            ModelTransferableGraphSourceRequest.log(header + " [" + status + "] " + s + " - " + p + " - " + o);
        }
    }

    /**
     * Debug-logs a statement given as resources. Does nothing unless
     * {@code ModelTransferableGraphSourceRequest.DEBUG} is set.
     *
     * @param graph  graph used to resolve names
     * @param header text placed before the statement
     * @param status extent status shown in brackets
     * @param sId    subject resource
     * @param pId    predicate resource
     * @param oId    object resource
     * @throws DatabaseException if a name cannot be resolved
     */
    void logStatementWithExtent(ReadGraph graph, String header, ExtentStatus status, Resource sId, Resource pId, Resource oId) throws DatabaseException {
        if (ModelTransferableGraphSourceRequest.DEBUG) {
            String s = NameUtils.getURIOrSafeNameInternal(graph, sId);
            String p = NameUtils.getURIOrSafeNameInternal(graph, pId);
            String o = NameUtils.getURIOrSafeNameInternal(graph, oId);
            ModelTransferableGraphSourceRequest.log(header + " [" + status + "] " + s + " - " + p + " - " + o);
        }
    }

}