package org.simantics.db.layer0.util; import java.io.IOException; import java.io.InputStream; import java.util.Collection; import java.util.HashMap; import java.util.Iterator; import java.util.Map; import java.util.Set; import java.util.TreeSet; import org.simantics.databoard.Bindings; import org.simantics.databoard.Databoard; import org.simantics.databoard.binding.Binding; import org.simantics.databoard.serialization.Serializer; import org.simantics.databoard.type.Datatype; import org.simantics.db.DirectStatements; import org.simantics.db.ReadGraph; import org.simantics.db.Resource; import org.simantics.db.ResourceMap; import org.simantics.db.Statement; import org.simantics.db.common.primitiverequest.Value; import org.simantics.db.common.procedure.adapter.TransientCacheListener; import org.simantics.db.common.utils.NameUtils; import org.simantics.db.exception.CancelTransactionException; import org.simantics.db.exception.DatabaseException; import org.simantics.db.layer0.adapter.SubgraphExtent.ExtentStatus; import org.simantics.db.layer0.util.ModelTransferableGraphSourceRequest.Expansion3; import org.simantics.db.service.CollectionSupport; import org.simantics.db.service.SerialisationSupport; import org.simantics.db.service.TransferableGraphSupport; import org.simantics.graph.db.TransferableGraphSource; import org.simantics.layer0.Layer0; import org.simantics.scl.runtime.function.Function1; import gnu.trove.list.array.TIntArrayList; import gnu.trove.map.hash.TIntIntHashMap; public class DomainProcessor3 { public enum ExclusionDecision { INCLUDE, EXCLUDE_OBJECT } final static private boolean PROFILE = false; Serializer variantSerializer; Serializer datatypeSerializer; Binding datatypeBinding; boolean ignoreVirtual; int id = 0; Set fringe = null; Set exclusions = null; Function1 exclusionFunction = null; Set predicates = null; Map isRelatedToPredicates = null; Set deadPredicates = null; Set strongInverseSet = null; TIntIntHashMap ids = null; ResourceMap status = null; Map bindings = new HashMap(); final SerialisationSupport support; final TransferableGraphConfiguration2 conf; final TransferableGraphSupport tgs; private Layer0 L0; private long composedObjectCounter = 0; private long fastInternalCounter = 0; private long parentExternalCounter = 0; private long fullInternalCounter = 0; private long fullExternalCounter = 0; long startupTime = 0; long expandTime = 0; long fullResolveTime = 0; long fastResolveTime = 0; long otherStatementTime = 0; long parentResolveTime = 0; long extentSeedTime = 0; long classifyPredicateTime = 0; long processFringeTime = 0; long valueOutputTime = 0; long statementOutputTime = 0; public DomainProcessor3(ReadGraph graph, TransferableGraphConfiguration2 conf, DomainProcessorState state, boolean ignoreVirtual) throws DatabaseException { this.L0 = Layer0.getInstance(graph); this.tgs = graph.getService(TransferableGraphSupport.class); this.support = graph.getService(SerialisationSupport.class); this.ignoreVirtual = ignoreVirtual; this.conf = conf; if(PROFILE) startupTime -= System.nanoTime(); CollectionSupport cs = graph.getService(CollectionSupport.class); ids = state.ids; status = cs.createMap(ExtentStatus.class); predicates = cs.createSet(); exclusions = cs.createSet(); exclusionFunction = conf.exclusionFunction; fringe = new TreeSet(); isRelatedToPredicates = cs.createMap(Boolean.class); deadPredicates = cs.createSet(); strongInverseSet = cs.createSet(); for(Map.Entry entry : conf.preStatus.entrySet()) { status.put(entry.getKey(), entry.getValue()); if(ExtentStatus.EXCLUDED.equals(entry.getValue())) exclusions.add(entry.getKey()); if(ExtentStatus.INTERNAL.equals(entry.getValue())) fringe.add(entry.getKey()); } if(PROFILE) startupTime += System.nanoTime(); // for(RootSpec p : conf.roots) { // if(p.internal) // fringe.add(p.resource); // } } public ResourceMap getStatus() { return status; } public Collection extractFromFringe(ReadGraph graph, int maxAmount) throws DatabaseException { CollectionSupport cs = graph.getService(CollectionSupport.class); Collection list = cs.createList(); Iterator it = fringe.iterator(); for(int i=0;i expand(ReadGraph graph) throws DatabaseException { if(PROFILE) expandTime -= System.nanoTime(); Collection result = extractFromFringe(graph, 2<<12); if(PROFILE) expandTime += System.nanoTime(); return result; } public void classifyPredicates(ReadGraph graph, final Set schedule, DomainProcessorState state) throws DatabaseException { for(Resource predicate : schedule) { Boolean isRelatedTo = Boolean.FALSE; Resource single = graph.getPossibleSuperrelation(predicate); if(single != null) { Boolean singleIsRelatedTo = isRelatedToPredicates.get(single); if(singleIsRelatedTo == null) { singleIsRelatedTo = graph.isSubrelationOf(single, L0.IsRelatedTo); isRelatedToPredicates.put(single, singleIsRelatedTo); } isRelatedTo = singleIsRelatedTo; } else { if(graph.isSubrelationOf(predicate, L0.IsRelatedTo)) { isRelatedTo = Boolean.TRUE; if(ModelTransferableGraphSourceRequest.LOG) ModelTransferableGraphSourceRequest.log("isRelatedToPredicates4 => " + NameUtils.getSafeName(graph, predicate)); } else { if (!graph.hasStatement(predicate)) { if(ModelTransferableGraphSourceRequest.LOG) ModelTransferableGraphSourceRequest.log("FOUND DEAD PREDICATE (no statements): " + predicate); deadPredicates.add(predicate); // Prevents ModelTransferableGraphSource from // trying to export these statements. state.inverses.remove(support.getTransientId(predicate)); } } } isRelatedToPredicates.put(predicate, isRelatedTo); } } public void classifyPredicates(ReadGraph graph, DomainProcessorState state, final Collection expansion) throws DatabaseException { if(PROFILE) classifyPredicateTime -= System.nanoTime(); CollectionSupport cs = graph.getService(CollectionSupport.class); final Set schedule = cs.createSet(); final Map newPredicates = cs.createMap(Resource.class); for (DirectStatements stms : expansion) { for(Statement stm : stms) { Resource predicate = stm.getPredicate(); Resource object = stm.getObject(); if (exclusions.contains(object) || exclusions.contains(predicate)) continue; if (exclusionFunction != null) { ExclusionDecision decision = exclusionFunction.apply(stm); if(ExclusionDecision.EXCLUDE_OBJECT.equals(decision)) { status.put(object, ExtentStatus.EXCLUDED); exclusions.add(object); continue; } } if(predicates.add(predicate)) { Resource inverse = graph.getPossibleInverse(predicate); schedule.add(predicate); if(inverse != null) { newPredicates.put(predicate, inverse); if(predicates.add(inverse)) schedule.add(inverse); state.inverses.put(support.getTransientId(predicate), support.getTransientId(inverse)); state.inverses.put(support.getTransientId(inverse), support.getTransientId(predicate)); if(ModelTransferableGraphSourceRequest.LOG) ModelTransferableGraphSourceRequest.log("INVERSE FOR " + graph.getPossibleURI(predicate) + " => " + graph.getPossibleURI(inverse)); } else { state.inverses.put(support.getTransientId(predicate), 0); if(ModelTransferableGraphSourceRequest.LOG) ModelTransferableGraphSourceRequest.log("NO INVERSE FOR " + graph.getPossibleURI(predicate)); } } } } classifyPredicates(graph, schedule, state); for(Map.Entry entry : newPredicates.entrySet()) { // Inverse is strong => this has strong inverse Boolean isRelatedToValue = isRelatedToPredicates.get(entry.getValue()); if(isRelatedToValue) strongInverseSet.add(entry.getKey()); Boolean isRelatedToKey = isRelatedToPredicates.get(entry.getKey()); if(isRelatedToKey) strongInverseSet.add(entry.getValue()); } if(PROFILE) classifyPredicateTime += System.nanoTime(); } /* * Composed objects are internal. Mark them for expansion. */ private Datatype getDatatype(ReadGraph graph, Resource subject) throws DatabaseException { Resource object = graph.getSingleObject(subject, L0.HasDataType); return graph.syncRequest(new Value(object, datatypeBinding), TransientCacheListener.instance()); } public void processValue(ReadGraph graph, Resource subject, int sId, final DomainProcessorState state) throws DatabaseException, IOException { final InputStream valueStream = tgs.getValueStream(graph, subject); if (valueStream != null) { if(ModelTransferableGraphSourceRequest.LOG) ModelTransferableGraphSourceRequest.log("[VALUE] " + NameUtils.getSafeName(graph, subject, true)); state.valueOutput.writeInt(sId); if (conf.values) { Datatype dt = getDatatype(graph, subject); boolean canWriteRawVariant = !state.valueModifier.mayNeedModification(dt); long rawVariantSizePos = 0; state.valueOutput.writeByte(canWriteRawVariant ? TransferableGraphSource.TAG_RAW_COPY_VARIANT_VALUE : TransferableGraphSource.TAG_POTENTIALLY_MODIFIED_VARIANT_VALUE); if (canWriteRawVariant) { // Add space for raw variant byte size before the data rawVariantSizePos = state.valueOutput.position(); state.valueOutput.writeInt(0); } byte[] typeBytes = bindings.get(dt); if (typeBytes == null) { typeBytes = datatypeSerializer.serialize(dt); bindings.put(dt, typeBytes); } state.valueOutput.write(typeBytes); Serializer s = Bindings.getSerializerUnchecked(Bindings.getBinding(dt)); s.skip(new InputStream() { @Override public int read() throws IOException { int value = valueStream.read(); state.valueOutput.write(value); return value; } }); if (canWriteRawVariant) { long currentPos = state.valueOutput.position(); int variantSize = (int)(currentPos - rawVariantSizePos - 4); state.valueOutput.position(rawVariantSizePos); state.valueOutput.writeInt(variantSize); state.valueOutput.position(currentPos); } } state.valueCount++; } } private TIntArrayList stream = new TIntArrayList(); public void addToStream(Resource predicate, Resource object) throws DatabaseException { stream.add(support.getTransientId(predicate)); stream.add(support.getTransientId(object)); } public void processStatement(ReadGraph graph, Resource subject, Statement stm) throws DatabaseException, IOException { Resource predicate = stm.getPredicate(); Resource object = stm.getObject(); ExtentStatus objectStatus = status.get(object); // Strong predicate Boolean isRelatedTo = isRelatedToPredicates.get(predicate); if ((objectStatus != ExtentStatus.EXCLUDED) && isRelatedTo) { if(ModelTransferableGraphSourceRequest.LOG) logStatementWithExtent(graph, "related", objectStatus, subject, predicate, object); addToStream(predicate, object); if(objectStatus == null || objectStatus == ExtentStatus.PENDING) { if(ModelTransferableGraphSourceRequest.LOG) ModelTransferableGraphSourceRequest.log("[ADDED TO FRINGE] " + NameUtils.getSafeName(graph, object)); fringe.add(object); } } else { // Dead predicate if (deadPredicates.contains(predicate)) { if(ModelTransferableGraphSourceRequest.LOG) logStatementWithExtent(graph, "excluding statement with dead predicate ", objectStatus, subject, predicate, object); return; } // Weak predicate if(objectStatus == ExtentStatus.EXCLUDED) { if(ModelTransferableGraphSourceRequest.LOG) logStatementWithExtent(graph, "weak reference to excluded object ", objectStatus, subject, predicate, object); } else { // The inverse is also weak (or there is no inverse) if(!strongInverseSet.contains(predicate)) { addToStream(predicate, object); if(objectStatus == null) { status.put(object, ExtentStatus.PENDING); } if(ModelTransferableGraphSourceRequest.LOG) logStatementWithExtent(graph, "fully weak internal", objectStatus, subject, predicate, object); } else { if(ModelTransferableGraphSourceRequest.LOG) logStatementWithExtent(graph, "strong inverse internal ", objectStatus, subject, predicate, object); } } } } public void flushStatementStream(int sId, DomainProcessorState state) throws IOException { if(!stream.isEmpty()) { state.statementsOutput.writeInt(sId); int streamSize = stream.size(); int statementCount = stream.size() / 2; state.statementsOutput.writeInt(statementCount); for (int i = 0; i < streamSize; i++) state.statementsOutput.writeInt(stream.getQuick(i)); state.statementCount += 2*streamSize; stream.resetQuick(); } } // For progress monitor book-keeping private long internalResourceNumber = 0; private long startTime = 0; private long lastUpdateTime = 0; public void processInternal(ReadGraph graph, Resource subject, DirectStatements stms, DomainProcessorState state) throws DatabaseException, IOException { internalResourceNumber++; // Update progress monitor with controlled frequency long t = System.nanoTime(); long dt = t - lastUpdateTime; if (dt > 200_000_000L) { if (startTime == 0) startTime = t; lastUpdateTime = t; double totalTime = (t - startTime) * 1e-9; if (totalTime > 0) { long speed = Math.round((double)internalResourceNumber / totalTime); state.monitor.subTask("Included " + internalResourceNumber + " resources (" + speed + " resources/s)"); } } status.put(subject, ExtentStatus.INTERNAL); if(ModelTransferableGraphSourceRequest.LOG) ModelTransferableGraphSourceRequest.log("[INTERNAL] " + NameUtils.getSafeName(graph, subject, true)); int sId = support.getTransientId(subject); if(PROFILE) valueOutputTime -= System.nanoTime(); processValue(graph, subject, sId, state); if(PROFILE) valueOutputTime += System.nanoTime(); if(PROFILE) statementOutputTime -= System.nanoTime(); for(Statement stm : stms) { processStatement(graph, subject, stm); } flushStatementStream(sId, state); if(PROFILE) statementOutputTime += System.nanoTime(); // Logarithmic progress monitor for unknown amount of work. state.monitor.setWorkRemaining(100000); state.monitor.worked(1); } public void processFringe(ReadGraph graph, Collection expansion, final DomainProcessorState state) throws DatabaseException, IOException { if(PROFILE) processFringeTime -= System.nanoTime(); for (DirectStatements stms : expansion) { Resource subject = stms.getSubject(); boolean partOf = false; for(Statement stm : stms) { Resource predicate = stm.getPredicate(); if(L0.PartOf.equals(predicate)) { partOf = true; break; } } ExtentStatus subjectStatus = status.get(subject); if(ModelTransferableGraphSourceRequest.LOG && subjectStatus != null) ModelTransferableGraphSourceRequest.log("EXISTING STATUS FOR " + graph.getPossibleURI(subject) + " - " + subjectStatus); if(subjectStatus == ExtentStatus.EXTERNAL || subjectStatus == ExtentStatus.EXCLUDED) continue; if(partOf && (subjectStatus == null || ExtentStatus.PENDING == subjectStatus) && graph.getPossibleURI(subject) != null) { status.put(subject, ExtentStatus.EXTERNAL); if(ModelTransferableGraphSourceRequest.LOG) { String uri = graph.getPossibleURI(subject); if(uri == null) ModelTransferableGraphSourceRequest.log("[EXTERNAL]: No URI for " + subject); else ModelTransferableGraphSourceRequest.log("[EXTERNAL] " + uri); } } else { processInternal(graph, subject, stms, state); } } if(PROFILE) processFringeTime += System.nanoTime(); } public void process(ReadGraph graph, DomainProcessorState state) throws DatabaseException { try { this.variantSerializer = graph.getService(Databoard.class).getSerializerUnchecked(Bindings.VARIANT); this.datatypeBinding = Bindings.getBindingUnchecked(Datatype.class); this.datatypeSerializer = graph.getService(Databoard.class).getSerializerUnchecked(this.datatypeBinding); for(Resource r : ConsistsOfProcess.walk(graph, fringe, exclusions, ignoreVirtual)) { if (status.put(r, ExtentStatus.INTERNAL) == null) { if(ModelTransferableGraphSourceRequest.LOG) { String URI = graph.getPossibleURI(r); if(URI != null) ModelTransferableGraphSourceRequest.log("URI INTERNAL " + URI); else ModelTransferableGraphSourceRequest.log("URI has no URI for " + r); } fringe.add(r); } } if (state.monitor.isCanceled()) throw new CancelTransactionException(); while(!fringe.isEmpty()) { Collection expansion = expand(graph); classifyPredicates(graph, state, expansion); processFringe(graph, expansion, state); if (state.monitor.isCanceled()) throw new CancelTransactionException(); } if (ModelTransferableGraphSourceRequest.PROFILE) { System.out.println(composedObjectCounter + " " + fastInternalCounter + " " + parentExternalCounter + " " + fullExternalCounter + " " + fullInternalCounter); } } catch (IOException e) { throw new DatabaseException(e); } } void logStatementWithExtent(ReadGraph graph, String header, ExtentStatus status, int sId, int pId, int oId) throws DatabaseException { if(ModelTransferableGraphSourceRequest.LOG) { SerialisationSupport support = graph.getService(SerialisationSupport.class); String s = NameUtils.getURIOrSafeNameInternal(graph, support.getResource(sId)); String p = NameUtils.getURIOrSafeNameInternal(graph, support.getResource(pId)); String o = NameUtils.getURIOrSafeNameInternal(graph, support.getResource(oId)); ModelTransferableGraphSourceRequest.log(header + " [" + status + "] " + s + " - " + p + " - " + o); } } void logStatementWithExtent(ReadGraph graph, String header, ExtentStatus status, Resource sId, Resource pId, Resource oId) throws DatabaseException { if(ModelTransferableGraphSourceRequest.LOG) { String s = NameUtils.getURIOrSafeNameInternal(graph, sId); String p = NameUtils.getURIOrSafeNameInternal(graph, pId); String o = NameUtils.getURIOrSafeNameInternal(graph, oId); ModelTransferableGraphSourceRequest.log(header + " [" + status + "] " + s + " - " + p + " - " + o); } } }