-package org.simantics.db.layer0.util;\r
-\r
-import java.io.IOException;\r
-import java.io.InputStream;\r
-import java.util.Collection;\r
-import java.util.HashMap;\r
-import java.util.Iterator;\r
-import java.util.Map;\r
-import java.util.Set;\r
-import java.util.TreeSet;\r
-\r
-import org.simantics.databoard.Bindings;\r
-import org.simantics.databoard.Databoard;\r
-import org.simantics.databoard.binding.Binding;\r
-import org.simantics.databoard.serialization.Serializer;\r
-import org.simantics.databoard.type.Datatype;\r
-import org.simantics.db.DirectStatements;\r
-import org.simantics.db.ReadGraph;\r
-import org.simantics.db.Resource;\r
-import org.simantics.db.ResourceMap;\r
-import org.simantics.db.Statement;\r
-import org.simantics.db.common.primitiverequest.Value;\r
-import org.simantics.db.common.procedure.adapter.TransientCacheListener;\r
-import org.simantics.db.common.utils.NameUtils;\r
-import org.simantics.db.exception.CancelTransactionException;\r
-import org.simantics.db.exception.DatabaseException;\r
-import org.simantics.db.layer0.adapter.SubgraphExtent.ExtentStatus;\r
-import org.simantics.db.layer0.util.ModelTransferableGraphSourceRequest.Expansion3;\r
-import org.simantics.db.service.CollectionSupport;\r
-import org.simantics.db.service.SerialisationSupport;\r
-import org.simantics.db.service.TransferableGraphSupport;\r
-import org.simantics.graph.db.TransferableGraphSource;\r
-import org.simantics.layer0.Layer0;\r
-import org.simantics.scl.runtime.function.Function1;\r
-\r
-import gnu.trove.list.array.TIntArrayList;\r
-import gnu.trove.map.hash.TIntIntHashMap;\r
-\r
-public class DomainProcessor3 {\r
- \r
- public enum ExclusionDecision {\r
- INCLUDE, EXCLUDE_OBJECT\r
- }\r
-\r
- final static private boolean PROFILE = false;\r
-\r
- Serializer variantSerializer;\r
- Serializer datatypeSerializer;\r
- Binding datatypeBinding;\r
- boolean ignoreVirtual;\r
-\r
- int id = 0;\r
-\r
- Set<Resource> fringe = null;\r
- Set<Resource> exclusions = null;\r
- Function1<Statement,ExclusionDecision> exclusionFunction = null;\r
- Set<Resource> predicates = null;\r
- Map<Resource,Boolean> isRelatedToPredicates = null;\r
- Set<Resource> deadPredicates = null;\r
- Set<Resource> strongInverseSet = null;\r
-\r
- TIntIntHashMap ids = null;\r
- ResourceMap<ExtentStatus> status = null;\r
- Map<Datatype, byte[]> bindings = new HashMap<Datatype, byte[]>();\r
- final SerialisationSupport support;\r
- final TransferableGraphConfiguration2 conf;\r
- final TransferableGraphSupport tgs;\r
-\r
- private Layer0 L0;\r
-\r
- private long composedObjectCounter = 0;\r
- private long fastInternalCounter = 0;\r
- private long parentExternalCounter = 0;\r
- private long fullInternalCounter = 0;\r
- private long fullExternalCounter = 0;\r
-\r
- long startupTime = 0;\r
- long expandTime = 0;\r
- long fullResolveTime = 0;\r
- long fastResolveTime = 0;\r
- long otherStatementTime = 0;\r
- long parentResolveTime = 0;\r
- long extentSeedTime = 0;\r
- long classifyPredicateTime = 0;\r
- long processFringeTime = 0;\r
- long valueOutputTime = 0;\r
- long statementOutputTime = 0;\r
-\r
- public DomainProcessor3(ReadGraph graph, TransferableGraphConfiguration2 conf, DomainProcessorState state, boolean ignoreVirtual) throws DatabaseException {\r
-\r
- this.L0 = Layer0.getInstance(graph);\r
- this.tgs = graph.getService(TransferableGraphSupport.class);\r
-\r
- this.support = graph.getService(SerialisationSupport.class);\r
- this.ignoreVirtual = ignoreVirtual;\r
- this.conf = conf;\r
-\r
- if(PROFILE)\r
- startupTime -= System.nanoTime();\r
-\r
- CollectionSupport cs = graph.getService(CollectionSupport.class);\r
-\r
- ids = state.ids;\r
- status = cs.createMap(ExtentStatus.class);\r
- predicates = cs.createSet();\r
- exclusions = cs.createSet();\r
- exclusionFunction = conf.exclusionFunction;\r
- fringe = new TreeSet<Resource>();\r
- isRelatedToPredicates = cs.createMap(Boolean.class);\r
- deadPredicates = cs.createSet();\r
- strongInverseSet = cs.createSet();\r
-\r
- for(Map.Entry<Resource, ExtentStatus> entry : conf.preStatus.entrySet()) {\r
- status.put(entry.getKey(), entry.getValue());\r
- if(ExtentStatus.EXCLUDED.equals(entry.getValue())) exclusions.add(entry.getKey());\r
- if(ExtentStatus.INTERNAL.equals(entry.getValue())) fringe.add(entry.getKey());\r
- }\r
-\r
- if(PROFILE)\r
- startupTime += System.nanoTime();\r
- \r
-// for(RootSpec p : conf.roots) {\r
-// if(p.internal)\r
-// fringe.add(p.resource);\r
-// }\r
-\r
- }\r
- \r
- public ResourceMap<ExtentStatus> getStatus() {\r
- return status;\r
- }\r
-\r
- public Collection<DirectStatements> extractFromFringe(ReadGraph graph, int maxAmount) throws DatabaseException {\r
-\r
- CollectionSupport cs = graph.getService(CollectionSupport.class);\r
- Collection<Resource> list = cs.createList();\r
- Iterator<Resource> it = fringe.iterator();\r
- for(int i=0;i<maxAmount;i++) {\r
- if(!it.hasNext()) break;\r
- list.add(it.next());\r
- it.remove();\r
- }\r
-\r
- return graph.syncRequest(new Expansion3(list, ignoreVirtual));\r
-\r
- }\r
-\r
- public Collection<DirectStatements> expand(ReadGraph graph) throws DatabaseException {\r
-\r
- if(PROFILE)\r
- expandTime -= System.nanoTime();\r
-\r
- Collection<DirectStatements> result = extractFromFringe(graph, 2<<12);\r
-\r
- if(PROFILE)\r
- expandTime += System.nanoTime();\r
-\r
- return result;\r
-\r
- }\r
-\r
- public void classifyPredicates(ReadGraph graph, final Set<Resource> schedule, DomainProcessorState state) throws DatabaseException {\r
-\r
- for(Resource predicate : schedule) {\r
- \r
- Boolean isRelatedTo = Boolean.FALSE;\r
- \r
- Resource single = graph.getPossibleSuperrelation(predicate);\r
- if(single != null) {\r
- \r
- Boolean singleIsRelatedTo = isRelatedToPredicates.get(single);\r
- if(singleIsRelatedTo == null) {\r
- singleIsRelatedTo = graph.isSubrelationOf(single, L0.IsRelatedTo);\r
- isRelatedToPredicates.put(single, singleIsRelatedTo);\r
- }\r
-\r
- isRelatedTo = singleIsRelatedTo;\r
- \r
- } else {\r
- \r
- if(graph.isSubrelationOf(predicate, L0.IsRelatedTo)) {\r
- isRelatedTo = Boolean.TRUE;\r
- if(ModelTransferableGraphSourceRequest.LOG) ModelTransferableGraphSourceRequest.log("isRelatedToPredicates4 => " + NameUtils.getSafeName(graph, predicate));\r
- } else {\r
- if (!graph.hasStatement(predicate)) {\r
- if(ModelTransferableGraphSourceRequest.LOG) ModelTransferableGraphSourceRequest.log("FOUND DEAD PREDICATE (no statements): " + predicate);\r
- deadPredicates.add(predicate);\r
- // Prevents ModelTransferableGraphSource from\r
- // trying to export these statements.\r
- state.inverses.remove(support.getTransientId(predicate));\r
- }\r
- }\r
- \r
- }\r
- \r
- isRelatedToPredicates.put(predicate, isRelatedTo);\r
-\r
- }\r
-\r
- }\r
-\r
- public void classifyPredicates(ReadGraph graph, DomainProcessorState state, final Collection<DirectStatements> expansion) throws DatabaseException {\r
-\r
- if(PROFILE)\r
- classifyPredicateTime -= System.nanoTime();\r
-\r
- CollectionSupport cs = graph.getService(CollectionSupport.class);\r
- final Set<Resource> schedule = cs.createSet();\r
- final Map<Resource, Resource> newPredicates = cs.createMap(Resource.class);\r
-\r
- for (DirectStatements stms : expansion) {\r
- for(Statement stm : stms) {\r
-\r
- Resource predicate = stm.getPredicate();\r
- Resource object = stm.getObject();\r
-\r
- if (exclusions.contains(object) || exclusions.contains(predicate))\r
- continue;\r
- \r
- if (exclusionFunction != null) {\r
- ExclusionDecision decision = exclusionFunction.apply(stm);\r
- if(ExclusionDecision.EXCLUDE_OBJECT.equals(decision)) {\r
- status.put(object, ExtentStatus.EXCLUDED);\r
- exclusions.add(object);\r
- continue;\r
- }\r
- }\r
-\r
- if(predicates.add(predicate)) {\r
- Resource inverse = graph.getPossibleInverse(predicate);\r
- schedule.add(predicate);\r
- if(inverse != null) {\r
- newPredicates.put(predicate, inverse);\r
- if(predicates.add(inverse)) schedule.add(inverse);\r
- state.inverses.put(support.getTransientId(predicate), support.getTransientId(inverse));\r
- state.inverses.put(support.getTransientId(inverse), support.getTransientId(predicate));\r
- if(ModelTransferableGraphSourceRequest.LOG) ModelTransferableGraphSourceRequest.log("INVERSE FOR " + graph.getPossibleURI(predicate) + " => " + graph.getPossibleURI(inverse));\r
- } else {\r
- state.inverses.put(support.getTransientId(predicate), 0);\r
- if(ModelTransferableGraphSourceRequest.LOG) ModelTransferableGraphSourceRequest.log("NO INVERSE FOR " + graph.getPossibleURI(predicate));\r
- }\r
-\r
- }\r
-\r
- }\r
- }\r
-\r
- classifyPredicates(graph, schedule, state);\r
-\r
- for(Map.Entry<Resource, Resource> entry : newPredicates.entrySet()) {\r
- // Inverse is strong => this has strong inverse\r
- Boolean isRelatedToValue = isRelatedToPredicates.get(entry.getValue());\r
- if(isRelatedToValue) strongInverseSet.add(entry.getKey());\r
- Boolean isRelatedToKey = isRelatedToPredicates.get(entry.getKey());\r
- if(isRelatedToKey) strongInverseSet.add(entry.getValue());\r
- }\r
-\r
- if(PROFILE)\r
- classifyPredicateTime += System.nanoTime();\r
-\r
- }\r
-\r
- /*\r
- * Composed objects are internal. Mark them for expansion.\r
- */\r
-\r
- private Datatype getDatatype(ReadGraph graph, Resource subject) throws DatabaseException {\r
- Resource object = graph.getSingleObject(subject, L0.HasDataType);\r
- return graph.syncRequest(new Value<Datatype>(object, datatypeBinding), TransientCacheListener.<Datatype>instance());\r
- }\r
-\r
- public void processValue(ReadGraph graph, Resource subject, int sId, final DomainProcessorState state) throws DatabaseException, IOException {\r
- final InputStream valueStream = tgs.getValueStream(graph, subject);\r
- if (valueStream != null) {\r
- if(ModelTransferableGraphSourceRequest.LOG) ModelTransferableGraphSourceRequest.log("[VALUE] " + NameUtils.getSafeName(graph, subject, true));\r
- state.valueOutput.writeInt(sId);\r
-\r
- if (conf.values) {\r
- Datatype dt = getDatatype(graph, subject);\r
-\r
- boolean canWriteRawVariant = !state.valueModifier.mayNeedModification(dt);\r
- long rawVariantSizePos = 0;\r
- state.valueOutput.writeByte(canWriteRawVariant\r
- ? TransferableGraphSource.TAG_RAW_COPY_VARIANT_VALUE\r
- : TransferableGraphSource.TAG_POTENTIALLY_MODIFIED_VARIANT_VALUE);\r
- if (canWriteRawVariant) {\r
- // Add space for raw variant byte size before the data\r
- rawVariantSizePos = state.valueOutput.position();\r
- state.valueOutput.writeInt(0);\r
- }\r
-\r
- byte[] typeBytes = bindings.get(dt);\r
- if (typeBytes == null) {\r
- typeBytes = datatypeSerializer.serialize(dt);\r
- bindings.put(dt, typeBytes);\r
- }\r
-\r
- state.valueOutput.write(typeBytes);\r
- Serializer s = Bindings.getSerializerUnchecked(Bindings.getBinding(dt));\r
- s.skip(new InputStream() {\r
- @Override\r
- public int read() throws IOException {\r
- int value = valueStream.read();\r
- state.valueOutput.write(value);\r
- return value;\r
- }\r
- });\r
-\r
- if (canWriteRawVariant) {\r
- long currentPos = state.valueOutput.position();\r
- int variantSize = (int)(currentPos - rawVariantSizePos - 4);\r
- state.valueOutput.position(rawVariantSizePos);\r
- state.valueOutput.writeInt(variantSize);\r
- state.valueOutput.position(currentPos);\r
- }\r
- }\r
-\r
- state.valueCount++;\r
- }\r
- }\r
-\r
- private TIntArrayList stream = new TIntArrayList();\r
-\r
- public void addToStream(Resource predicate, Resource object) throws DatabaseException {\r
- stream.add(support.getTransientId(predicate));\r
- stream.add(support.getTransientId(object));\r
- }\r
-\r
- public void processStatement(ReadGraph graph, Resource subject, Statement stm) throws DatabaseException, IOException {\r
-\r
- Resource predicate = stm.getPredicate();\r
-\r
- Resource object = stm.getObject();\r
-\r
- ExtentStatus objectStatus = status.get(object);\r
-\r
- // Strong predicate\r
- Boolean isRelatedTo = isRelatedToPredicates.get(predicate);\r
- if ((objectStatus != ExtentStatus.EXCLUDED) && isRelatedTo) {\r
-\r
- if(ModelTransferableGraphSourceRequest.LOG) logStatementWithExtent(graph, "related", objectStatus, subject, predicate, object);\r
-\r
- addToStream(predicate, object);\r
-\r
- if(objectStatus == null || objectStatus == ExtentStatus.PENDING) {\r
- if(ModelTransferableGraphSourceRequest.LOG) ModelTransferableGraphSourceRequest.log("[ADDED TO FRINGE] " + NameUtils.getSafeName(graph, object));\r
- fringe.add(object);\r
- }\r
-\r
- } else {\r
-\r
- // Dead predicate\r
- if (deadPredicates.contains(predicate)) {\r
- if(ModelTransferableGraphSourceRequest.LOG) logStatementWithExtent(graph, "excluding statement with dead predicate ", objectStatus, subject, predicate, object);\r
- return;\r
- }\r
-\r
- // Weak predicate\r
- if(objectStatus == ExtentStatus.EXCLUDED) {\r
-\r
- if(ModelTransferableGraphSourceRequest.LOG) logStatementWithExtent(graph, "weak reference to excluded object ", objectStatus, subject, predicate, object);\r
-\r
- } else {\r
-\r
- // The inverse is also weak (or there is no inverse)\r
- if(!strongInverseSet.contains(predicate)) {\r
-\r
- addToStream(predicate, object);\r
-\r
- if(objectStatus == null) {\r
- status.put(object, ExtentStatus.PENDING);\r
- }\r
-\r
- if(ModelTransferableGraphSourceRequest.LOG) logStatementWithExtent(graph, "fully weak internal", objectStatus, subject, predicate, object);\r
-\r
- } else {\r
-\r
- if(ModelTransferableGraphSourceRequest.LOG) logStatementWithExtent(graph, "strong inverse internal ", objectStatus, subject, predicate, object);\r
-\r
- }\r
-\r
- }\r
-\r
- }\r
- \r
- }\r
- \r
- public void flushStatementStream(int sId, DomainProcessorState state) throws IOException {\r
- if(!stream.isEmpty()) {\r
- state.statementsOutput.writeInt(sId);\r
- int streamSize = stream.size();\r
- int statementCount = stream.size() / 2;\r
- state.statementsOutput.writeInt(statementCount);\r
- for (int i = 0; i < streamSize; i++)\r
- state.statementsOutput.writeInt(stream.getQuick(i));\r
- state.statementCount += 2*streamSize;\r
- stream.resetQuick();\r
- }\r
- }\r
-\r
- // For progress monitor book-keeping\r
- private long internalResourceNumber = 0;\r
- private long startTime = 0;\r
- private long lastUpdateTime = 0;\r
-\r
- public void processInternal(ReadGraph graph, Resource subject, DirectStatements stms, DomainProcessorState state) throws DatabaseException, IOException {\r
-\r
- internalResourceNumber++;\r
-\r
- // Update progress monitor with controlled frequency\r
- long t = System.nanoTime();\r
- long dt = t - lastUpdateTime;\r
- if (dt > 200_000_000L) {\r
- if (startTime == 0)\r
- startTime = t;\r
- lastUpdateTime = t;\r
- double totalTime = (t - startTime) * 1e-9;\r
- if (totalTime > 0) {\r
- long speed = Math.round((double)internalResourceNumber / totalTime);\r
- state.monitor.subTask("Included " + internalResourceNumber + " resources (" + speed + " resources/s)");\r
- }\r
- }\r
-\r
- status.put(subject, ExtentStatus.INTERNAL);\r
- if(ModelTransferableGraphSourceRequest.LOG) ModelTransferableGraphSourceRequest.log("[INTERNAL] " + NameUtils.getSafeName(graph, subject, true));\r
-\r
- int sId = support.getTransientId(subject);\r
-\r
- if(PROFILE)\r
- valueOutputTime -= System.nanoTime();\r
-\r
- processValue(graph, subject, sId, state);\r
-\r
- if(PROFILE)\r
- valueOutputTime += System.nanoTime();\r
-\r
- if(PROFILE)\r
- statementOutputTime -= System.nanoTime();\r
-\r
- for(Statement stm : stms) {\r
- processStatement(graph, subject, stm);\r
- }\r
-\r
- flushStatementStream(sId, state);\r
-\r
- if(PROFILE)\r
- statementOutputTime += System.nanoTime();\r
-\r
- // Logarithmic progress monitor for unknown amount of work.\r
- state.monitor.setWorkRemaining(100000);\r
- state.monitor.worked(1);\r
- }\r
-\r
- public void processFringe(ReadGraph graph, Collection<DirectStatements> expansion, final DomainProcessorState state) throws DatabaseException, IOException {\r
-\r
- if(PROFILE)\r
- processFringeTime -= System.nanoTime();\r
-\r
- for (DirectStatements stms : expansion) {\r
-\r
- Resource subject = stms.getSubject();\r
-\r
- boolean partOf = false;\r
- for(Statement stm : stms) {\r
- Resource predicate = stm.getPredicate();\r
- if(L0.PartOf.equals(predicate)) {\r
- partOf = true;\r
- break;\r
- }\r
- }\r
-\r
- ExtentStatus subjectStatus = status.get(subject);\r
- if(ModelTransferableGraphSourceRequest.LOG && subjectStatus != null) ModelTransferableGraphSourceRequest.log("EXISTING STATUS FOR " + graph.getPossibleURI(subject) + " - " + subjectStatus);\r
- if(subjectStatus == ExtentStatus.EXTERNAL || subjectStatus == ExtentStatus.EXCLUDED) continue;\r
- if(partOf && (subjectStatus == null || ExtentStatus.PENDING == subjectStatus) && graph.getPossibleURI(subject) != null) {\r
-\r
- status.put(subject, ExtentStatus.EXTERNAL);\r
- if(ModelTransferableGraphSourceRequest.LOG) {\r
- String uri = graph.getPossibleURI(subject);\r
- if(uri == null) ModelTransferableGraphSourceRequest.log("[EXTERNAL]: No URI for " + subject);\r
- else ModelTransferableGraphSourceRequest.log("[EXTERNAL] " + uri);\r
- }\r
-\r
- } else {\r
-\r
- processInternal(graph, subject, stms, state);\r
- \r
- }\r
-\r
- }\r
-\r
- if(PROFILE)\r
- processFringeTime += System.nanoTime();\r
-\r
- }\r
-\r
- public void process(ReadGraph graph, DomainProcessorState state) throws DatabaseException {\r
-\r
- try {\r
-\r
- this.variantSerializer = graph.getService(Databoard.class).getSerializerUnchecked(Bindings.VARIANT);\r
- this.datatypeBinding = Bindings.getBindingUnchecked(Datatype.class);\r
- this.datatypeSerializer = graph.getService(Databoard.class).getSerializerUnchecked(this.datatypeBinding);\r
-\r
- for(Resource r : ConsistsOfProcess.walk(graph, fringe, exclusions, ignoreVirtual)) {\r
- if (status.put(r, ExtentStatus.INTERNAL) == null) {\r
- if(ModelTransferableGraphSourceRequest.LOG) {\r
- String URI = graph.getPossibleURI(r);\r
- if(URI != null) ModelTransferableGraphSourceRequest.log("URI INTERNAL " + URI);\r
- else ModelTransferableGraphSourceRequest.log("URI has no URI for " + r);\r
- }\r
- fringe.add(r);\r
- }\r
- }\r
-\r
- if (state.monitor.isCanceled())\r
- throw new CancelTransactionException();\r
-\r
- while(!fringe.isEmpty()) {\r
-\r
- Collection<DirectStatements> expansion = expand(graph);\r
- classifyPredicates(graph, state, expansion);\r
- processFringe(graph, expansion, state);\r
-\r
- if (state.monitor.isCanceled())\r
- throw new CancelTransactionException();\r
- }\r
-\r
- if (ModelTransferableGraphSourceRequest.PROFILE) {\r
- System.out.println(composedObjectCounter + " " + fastInternalCounter\r
- + " " + parentExternalCounter + " "\r
- + fullExternalCounter + " " + fullInternalCounter);\r
- }\r
-\r
- } catch (IOException e) {\r
- throw new DatabaseException(e);\r
- }\r
-\r
- }\r
-\r
- void logStatementWithExtent(ReadGraph graph, String header, ExtentStatus status, int sId, int pId, int oId) throws DatabaseException {\r
- if(ModelTransferableGraphSourceRequest.LOG) {\r
- SerialisationSupport support = graph.getService(SerialisationSupport.class);\r
- String s = NameUtils.getURIOrSafeNameInternal(graph, support.getResource(sId));\r
- String p = NameUtils.getURIOrSafeNameInternal(graph, support.getResource(pId));\r
- String o = NameUtils.getURIOrSafeNameInternal(graph, support.getResource(oId));\r
- ModelTransferableGraphSourceRequest.log(header + " [" + status + "] " + s + " - " + p + " - " + o);\r
- }\r
- }\r
-\r
- void logStatementWithExtent(ReadGraph graph, String header, ExtentStatus status, Resource sId, Resource pId, Resource oId) throws DatabaseException {\r
- if(ModelTransferableGraphSourceRequest.LOG) {\r
- String s = NameUtils.getURIOrSafeNameInternal(graph, sId);\r
- String p = NameUtils.getURIOrSafeNameInternal(graph, pId);\r
- String o = NameUtils.getURIOrSafeNameInternal(graph, oId);\r
- ModelTransferableGraphSourceRequest.log(header + " [" + status + "] " + s + " - " + p + " - " + o);\r
- }\r
- }\r
-\r
+package org.simantics.db.layer0.util;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.simantics.databoard.Bindings;
+import org.simantics.databoard.Databoard;
+import org.simantics.databoard.binding.Binding;
+import org.simantics.databoard.serialization.Serializer;
+import org.simantics.databoard.type.Datatype;
+import org.simantics.db.DirectStatements;
+import org.simantics.db.ReadGraph;
+import org.simantics.db.Resource;
+import org.simantics.db.ResourceMap;
+import org.simantics.db.Statement;
+import org.simantics.db.common.StandardStatement;
+import org.simantics.db.common.primitiverequest.Value;
+import org.simantics.db.common.procedure.adapter.TransientCacheListener;
+import org.simantics.db.common.utils.NameUtils;
+import org.simantics.db.exception.CancelTransactionException;
+import org.simantics.db.exception.DatabaseException;
+import org.simantics.db.layer0.adapter.SubgraphExtent.ExtentStatus;
+import org.simantics.db.layer0.util.ConsistsOfProcess.ConsistsOfProcessEntry;
+import org.simantics.db.layer0.util.ModelTransferableGraphSourceRequest.Expansion3;
+import org.simantics.db.layer0.util.TransferableGraphConfiguration2.SeedSpec;
+import org.simantics.db.layer0.util.TransferableGraphConfiguration2.SeedSpec.SeedSpecType;
+import org.simantics.db.service.CollectionSupport;
+import org.simantics.db.service.SerialisationSupport;
+import org.simantics.db.service.TransferableGraphSupport;
+import org.simantics.graph.db.TransferableGraphSource;
+import org.simantics.layer0.Layer0;
+import org.simantics.scl.runtime.function.Function1;
+import org.simantics.utils.datastructures.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import gnu.trove.list.array.TIntArrayList;
+import gnu.trove.map.hash.TIntIntHashMap;
+
+public class DomainProcessor3 {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(DomainProcessor3.class);
+
+ public enum ExclusionDecision {
+ INCLUDE, EXCLUDE_OBJECT
+ }
+
+ final static private boolean PROFILE = false;
+
+ Serializer variantSerializer;
+ Serializer datatypeSerializer;
+ Binding datatypeBinding;
+ boolean ignoreVirtual;
+
+ int id = 0;
+
+ Set<Resource> fringe = null;
+ Function1<Statement,ExclusionDecision> exclusionFunction = null;
+ Set<Resource> predicates = null;
+ Map<Resource,Boolean> isRelatedToPredicates = null;
+ Set<Resource> deadPredicates = null;
+ Set<Resource> strongInverseSet = null;
+ List<Statement> unresolvedWeakLinks = new ArrayList<>();
+ TIntIntHashMap ids = null;
+ ResourceMap<ExtentStatus> status = null;
+ Map<Datatype, byte[]> bindings = new HashMap<Datatype, byte[]>();
+ final SerialisationSupport support;
+ final TransferableGraphConfiguration2 conf;
+ final TransferableGraphSupport tgs;
+
+ private Layer0 L0;
+
+ private long composedObjectCounter = 0;
+ private long fastInternalCounter = 0;
+ private long parentExternalCounter = 0;
+ private long fullInternalCounter = 0;
+ private long fullExternalCounter = 0;
+
+ long startupTime = 0;
+ long expandTime = 0;
+ long fullResolveTime = 0;
+ long fastResolveTime = 0;
+ long otherStatementTime = 0;
+ long parentResolveTime = 0;
+ long extentSeedTime = 0;
+ long classifyPredicateTime = 0;
+ long processFringeTime = 0;
+ long valueOutputTime = 0;
+ long statementOutputTime = 0;
+
+ public DomainProcessor3(ReadGraph graph, TransferableGraphConfiguration2 conf, DomainProcessorState state, boolean ignoreVirtual) throws DatabaseException {
+
+ this.L0 = Layer0.getInstance(graph);
+ this.tgs = graph.getService(TransferableGraphSupport.class);
+
+ this.support = graph.getService(SerialisationSupport.class);
+ this.ignoreVirtual = ignoreVirtual;
+ this.conf = conf;
+
+ if(PROFILE)
+ startupTime -= System.nanoTime();
+
+ CollectionSupport cs = graph.getService(CollectionSupport.class);
+
+ ids = state.ids;
+ status = cs.createMap(ExtentStatus.class);
+ predicates = cs.createSet();
+ exclusionFunction = conf.exclusionFunction;
+ fringe = new TreeSet<Resource>();
+ isRelatedToPredicates = cs.createMap(Boolean.class);
+ deadPredicates = cs.createSet();
+ strongInverseSet = cs.createSet();
+
+ if(LOGGER.isDebugEnabled()) {
+
+ for(Map.Entry<Resource, ExtentStatus> entry : conf.preStatus.entrySet()) {
+ LOGGER.debug("prestatus: " + NameUtils.getSafeName(graph, entry.getKey()) + " " + entry.getValue());
+ }
+
+ for(SeedSpec ss : conf.seeds) {
+ LOGGER.debug("seed: " + NameUtils.getSafeName(graph, ss.resource) + " " + ss.name + " " + ss.specType + " " + ss.type);
+ }
+
+ }
+
+ for(Map.Entry<Resource, ExtentStatus> entry : conf.preStatus.entrySet()) {
+ // INTERNAL prestatus shall be ignored. Domain processor will initialize statuses based on seeds.
+ if (entry.getValue().equals(ExtentStatus.INTERNAL)) {
+ LOGGER.info("Unexpected INTERNAL preStatus in DomainProcessor3 " + entry.getKey());
+ } else {
+ status.put(entry.getKey(), entry.getValue());
+ }
+ }
+
+ for(SeedSpec ss : conf.seeds) {
+ ExtentStatus pre = status.get(ss.resource);
+ // INTERNAL seeds are feed into ConsistsOfProcess
+ if(SeedSpecType.INTERNAL.equals(ss.specType)) {
+ if(pre != null && !ExtentStatus.INTERNAL.equals(pre))
+ LOGGER.info("Internal seed preclassification problem, expected INTERNAL preclassification, got " + pre.name());
+ continue;
+ } else if(SeedSpecType.ROOT.equals(ss.specType)) {
+ // Non-internal resources are not reported as internals by ConsistsOfProcess so they are manually entered into fringe
+ fringe.add(ss.resource);
+ if(pre != null)
+ LOGGER.info("Root preclassification problem, expected no preclassification, got " + pre.name());
+ // Roots are classified in status as INTERNAL
+ status.put(ss.resource, ExtentStatus.INTERNAL);
+ } else if(SeedSpecType.ROOT.equals(ss.specType)) {
+ // Special roots e.g. %model are marked as EXTERNAL
+ if(pre != null && !ExtentStatus.EXTERNAL.equals(pre))
+ LOGGER.info("Special root preclassification problem, expected EXTERNAL preclassification, got " + pre.name());
+ status.put(ss.resource, ExtentStatus.EXTERNAL);
+ }
+ }
+
+ if(PROFILE)
+ startupTime += System.nanoTime();
+
+ }
+
+ public ResourceMap<ExtentStatus> getStatus() {
+ return status;
+ }
+
+ public Collection<DirectStatements> extractFromFringe(ReadGraph graph, int maxAmount) throws DatabaseException {
+
+ CollectionSupport cs = graph.getService(CollectionSupport.class);
+ Collection<Resource> list = cs.createList();
+ Iterator<Resource> it = fringe.iterator();
+ for(int i=0;i<maxAmount;i++) {
+ if(!it.hasNext()) break;
+ list.add(it.next());
+ it.remove();
+ }
+
+ return graph.syncRequest(new Expansion3(list, ignoreVirtual));
+
+ }
+
+ public Collection<DirectStatements> expand(ReadGraph graph) throws DatabaseException {
+
+ if(PROFILE)
+ expandTime -= System.nanoTime();
+
+ Collection<DirectStatements> result = extractFromFringe(graph, 2<<12);
+
+ if(PROFILE)
+ expandTime += System.nanoTime();
+
+ return result;
+
+ }
+
+ public void classifyPredicates(ReadGraph graph, final Set<Resource> schedule, DomainProcessorState state) throws DatabaseException {
+
+ for(Resource predicate : schedule) {
+
+ Boolean isRelatedTo = Boolean.FALSE;
+
+ Resource single = graph.getPossibleSuperrelation(predicate);
+ if(single != null) {
+
+ Boolean singleIsRelatedTo = isRelatedToPredicates.get(single);
+ if(singleIsRelatedTo == null) {
+ singleIsRelatedTo = graph.isSubrelationOf(single, L0.IsRelatedTo);
+ isRelatedToPredicates.put(single, singleIsRelatedTo);
+ }
+
+ isRelatedTo = singleIsRelatedTo;
+
+ } else {
+
+ if(graph.isSubrelationOf(predicate, L0.IsRelatedTo)) {
+ isRelatedTo = Boolean.TRUE;
+ if(ModelTransferableGraphSourceRequest.DEBUG) ModelTransferableGraphSourceRequest.log("isRelatedToPredicates4 => " + NameUtils.getSafeName(graph, predicate));
+ } else {
+ if (!graph.hasStatement(predicate)) {
+ if(ModelTransferableGraphSourceRequest.DEBUG) ModelTransferableGraphSourceRequest.log("FOUND DEAD PREDICATE (no statements): " + predicate);
+ deadPredicates.add(predicate);
+ // Prevents ModelTransferableGraphSource from
+ // trying to export these statements.
+ state.inverses.remove(support.getTransientId(predicate));
+ }
+ }
+
+ }
+
+ isRelatedToPredicates.put(predicate, isRelatedTo);
+
+ }
+
+ }
+
+ public void classifyPredicates(ReadGraph graph, DomainProcessorState state, final Collection<DirectStatements> expansion) throws DatabaseException {
+
+ if(PROFILE)
+ classifyPredicateTime -= System.nanoTime();
+
+ CollectionSupport cs = graph.getService(CollectionSupport.class);
+ final Set<Resource> schedule = cs.createSet();
+ final Map<Resource, Resource> newPredicates = cs.createMap(Resource.class);
+
+ for (DirectStatements stms : expansion) {
+ for(Statement stm : stms) {
+
+ Resource predicate = stm.getPredicate();
+ Resource object = stm.getObject();
+
+ if(ExtentStatus.EXCLUDED.equals(status.get(predicate))) continue;
+ if(ExtentStatus.EXCLUDED.equals(status.get(object))) continue;
+
+ if (exclusionFunction != null) {
+ ExclusionDecision decision = exclusionFunction.apply(stm);
+ if(ExclusionDecision.EXCLUDE_OBJECT.equals(decision)) {
+ status.put(object, ExtentStatus.EXCLUDED);
+ continue;
+ }
+ }
+
+ if(predicates.add(predicate)) {
+ Resource inverse = graph.getPossibleInverse(predicate);
+ schedule.add(predicate);
+ if(inverse != null) {
+ newPredicates.put(predicate, inverse);
+ if(predicates.add(inverse)) schedule.add(inverse);
+ state.inverses.put(support.getTransientId(predicate), support.getTransientId(inverse));
+ state.inverses.put(support.getTransientId(inverse), support.getTransientId(predicate));
+ if(ModelTransferableGraphSourceRequest.DEBUG) ModelTransferableGraphSourceRequest.log("INVERSE FOR " + graph.getPossibleURI(predicate) + " => " + graph.getPossibleURI(inverse));
+ } else {
+ state.inverses.put(support.getTransientId(predicate), 0);
+ if(ModelTransferableGraphSourceRequest.DEBUG) ModelTransferableGraphSourceRequest.log("NO INVERSE FOR " + graph.getPossibleURI(predicate));
+ }
+
+ }
+
+ }
+ }
+
+ classifyPredicates(graph, schedule, state);
+
+ for(Map.Entry<Resource, Resource> entry : newPredicates.entrySet()) {
+ // Inverse is strong => this has strong inverse
+ Boolean isRelatedToValue = isRelatedToPredicates.get(entry.getValue());
+ if(isRelatedToValue) strongInverseSet.add(entry.getKey());
+ Boolean isRelatedToKey = isRelatedToPredicates.get(entry.getKey());
+ if(isRelatedToKey) strongInverseSet.add(entry.getValue());
+ }
+
+ if(PROFILE)
+ classifyPredicateTime += System.nanoTime();
+
+ }
+
+ /*
+ * Composed objects are internal. Mark them for expansion.
+ */
+
+ private Datatype getDatatype(ReadGraph graph, Resource subject) throws DatabaseException {
+ Resource object = graph.getSingleObject(subject, L0.HasDataType);
+ return graph.syncRequest(new Value<Datatype>(object, datatypeBinding), TransientCacheListener.<Datatype>instance());
+ }
+
+ public void processValue(ReadGraph graph, Resource subject, int sId, final DomainProcessorState state) throws DatabaseException, IOException {
+ final InputStream valueStream = tgs.getValueStream(graph, subject);
+ if (valueStream != null) {
+ if(ModelTransferableGraphSourceRequest.DEBUG) ModelTransferableGraphSourceRequest.log("[VALUE] " + NameUtils.getSafeName(graph, subject, true));
+ state.valueOutput.writeInt(sId);
+
+ if (conf.values) {
+ Datatype dt = getDatatype(graph, subject);
+
+ boolean canWriteRawVariant = !state.valueModifier.mayNeedModification(dt);
+ long rawVariantSizePos = 0;
+ state.valueOutput.writeByte(canWriteRawVariant
+ ? TransferableGraphSource.TAG_RAW_COPY_VARIANT_VALUE
+ : TransferableGraphSource.TAG_POTENTIALLY_MODIFIED_VARIANT_VALUE);
+ if (canWriteRawVariant) {
+ // Add space for raw variant byte size before the data
+ rawVariantSizePos = state.valueOutput.position();
+ state.valueOutput.writeInt(0);
+ }
+
+ byte[] typeBytes = bindings.get(dt);
+ if (typeBytes == null) {
+ typeBytes = datatypeSerializer.serialize(dt);
+ bindings.put(dt, typeBytes);
+ }
+
+ state.valueOutput.write(typeBytes);
+ Serializer s = Bindings.getSerializerUnchecked(Bindings.getBinding(dt));
+ s.skip(new InputStream() {
+ @Override
+ public int read() throws IOException {
+ int value = valueStream.read();
+ state.valueOutput.write(value);
+ return value;
+ }
+ });
+
+ if (canWriteRawVariant) {
+ long currentPos = state.valueOutput.position();
+ int variantSize = (int)(currentPos - rawVariantSizePos - 4);
+ state.valueOutput.position(rawVariantSizePos);
+ state.valueOutput.writeInt(variantSize);
+ state.valueOutput.position(currentPos);
+ }
+ }
+
+ state.valueCount++;
+ }
+ }
+
+ private TIntArrayList stream = new TIntArrayList();
+
+ public void addToStream(Resource predicate, Resource object) throws DatabaseException {
+ stream.add(support.getTransientId(predicate));
+ stream.add(support.getTransientId(object));
+ }
+
+ public void processStatement(ReadGraph graph, Resource subject, Statement stm) throws DatabaseException, IOException {
+
+ Resource predicate = stm.getPredicate();
+
+ Resource object = stm.getObject();
+
+ ExtentStatus objectStatus = status.get(object);
+
+ // Strong predicate
+ Boolean isRelatedTo = isRelatedToPredicates.get(predicate);
+ if ((objectStatus != ExtentStatus.EXCLUDED) && isRelatedTo) {
+
+ if(ModelTransferableGraphSourceRequest.DEBUG) logStatementWithExtent(graph, "related", objectStatus, subject, predicate, object);
+
+ addToStream(predicate, object);
+
+ if(objectStatus == null || objectStatus == ExtentStatus.PENDING) {
+ if(ModelTransferableGraphSourceRequest.DEBUG) ModelTransferableGraphSourceRequest.log("[ADDED TO FRINGE] " + NameUtils.getSafeName(graph, object));
+ fringe.add(object);
+ }
+
+ } else {
+
+ // Dead predicate
+ if (deadPredicates.contains(predicate)) {
+ if(ModelTransferableGraphSourceRequest.DEBUG) logStatementWithExtent(graph, "excluding statement with dead predicate ", objectStatus, subject, predicate, object);
+ return;
+ }
+
+ // Weak predicate
+ if(objectStatus == ExtentStatus.EXCLUDED) {
+
+ if(ModelTransferableGraphSourceRequest.DEBUG) logStatementWithExtent(graph, "weak reference to excluded object ", objectStatus, subject, predicate, object);
+
+ } else {
+
+ // The inverse is also weak (or there is no inverse)
+ if(!strongInverseSet.contains(predicate)) {
+
+ unresolvedWeakLinks.add(new StandardStatement(subject, predicate, object));
+ //addToStream(predicate, object);
+
+ if(objectStatus == null) {
+ status.put(object, ExtentStatus.PENDING);
+ }
+
+ if(ModelTransferableGraphSourceRequest.DEBUG) logStatementWithExtent(graph, "fully weak internal", objectStatus, subject, predicate, object);
+
+ } else {
+
+ if(ModelTransferableGraphSourceRequest.DEBUG) logStatementWithExtent(graph, "strong inverse internal ", objectStatus, subject, predicate, object);
+
+ }
+
+ }
+
+ }
+
+ }
+
+ public void flushStatementStream(int sId, DomainProcessorState state) throws IOException {
+ if(!stream.isEmpty()) {
+ state.statementsOutput.writeInt(sId);
+ int streamSize = stream.size();
+ int statementCount = stream.size() / 2;
+ state.statementsOutput.writeInt(statementCount);
+ for (int i = 0; i < streamSize; i++)
+ state.statementsOutput.writeInt(stream.getQuick(i));
+ state.statementCount += 2*streamSize;
+ stream.resetQuick();
+ }
+ }
+
+ // For progress monitor book-keeping
+ private long internalResourceNumber = 0;
+ private long startTime = 0;
+ private long lastUpdateTime = 0;
+
+ public void processInternal(ReadGraph graph, Resource subject, DirectStatements stms, DomainProcessorState state) throws DatabaseException, IOException {
+
+ internalResourceNumber++;
+
+ // Update progress monitor with controlled frequency
+ long t = System.nanoTime();
+ long dt = t - lastUpdateTime;
+ if (dt > 200_000_000L) {
+ if (startTime == 0)
+ startTime = t;
+ lastUpdateTime = t;
+ double totalTime = (t - startTime) * 1e-9;
+ if (totalTime > 0) {
+ long speed = Math.round((double)internalResourceNumber / totalTime);
+ state.monitor.subTask("Included " + internalResourceNumber + " resources (" + speed + " resources/s)");
+ }
+ }
+
+ status.put(subject, ExtentStatus.INTERNAL);
+ if(ModelTransferableGraphSourceRequest.DEBUG) ModelTransferableGraphSourceRequest.log("[INTERNAL] " + NameUtils.getSafeName(graph, subject, true));
+
+ int sId = support.getTransientId(subject);
+
+ if(PROFILE)
+ valueOutputTime -= System.nanoTime();
+
+ processValue(graph, subject, sId, state);
+
+ if(PROFILE)
+ valueOutputTime += System.nanoTime();
+
+ if(PROFILE)
+ statementOutputTime -= System.nanoTime();
+
+ for(Statement stm : stms) {
+ processStatement(graph, subject, stm);
+ }
+
+ flushStatementStream(sId, state);
+
+ if(PROFILE)
+ statementOutputTime += System.nanoTime();
+
+ // Logarithmic progress monitor for unknown amount of work.
+ state.monitor.setWorkRemaining(100000);
+ state.monitor.worked(1);
+ }
+
+ public void processFringe(ReadGraph graph, Collection<DirectStatements> expansion, final DomainProcessorState state) throws DatabaseException, IOException {
+
+ if(PROFILE)
+ processFringeTime -= System.nanoTime();
+
+ for (DirectStatements stms : expansion) {
+
+ Resource subject = stms.getSubject();
+
+ boolean partOf = false;
+ for(Statement stm : stms) {
+ Resource predicate = stm.getPredicate();
+ if(L0.PartOf.equals(predicate)) {
+ partOf = true;
+ break;
+ }
+ }
+
+ ExtentStatus subjectStatus = status.get(subject);
+ if(ModelTransferableGraphSourceRequest.DEBUG && subjectStatus != null) ModelTransferableGraphSourceRequest.log("EXISTING STATUS FOR " + graph.getPossibleURI(subject) + " - " + subjectStatus);
+ if(subjectStatus == ExtentStatus.EXTERNAL || subjectStatus == ExtentStatus.EXCLUDED) continue;
+ if(partOf && (subjectStatus == null || ExtentStatus.PENDING == subjectStatus) && graph.getPossibleURI(subject) != null) {
+
+ status.put(subject, ExtentStatus.EXTERNAL);
+ if(ModelTransferableGraphSourceRequest.DEBUG) {
+ String uri = graph.getPossibleURI(subject);
+ if(uri == null) ModelTransferableGraphSourceRequest.log("[EXTERNAL]: No URI for " + subject);
+ else ModelTransferableGraphSourceRequest.log("[EXTERNAL] " + uri);
+ }
+
+ } else {
+
+ processInternal(graph, subject, stms, state);
+
+ }
+
+ }
+
+ if(PROFILE)
+ processFringeTime += System.nanoTime();
+
+ }
+
+ public void process(ReadGraph graph, DomainProcessorState state) throws DatabaseException {
+
+ try {
+
+ this.variantSerializer = graph.getService(Databoard.class).getSerializerUnchecked(Bindings.VARIANT);
+ this.datatypeBinding = Bindings.getBindingUnchecked(Datatype.class);
+ this.datatypeSerializer = graph.getService(Databoard.class).getSerializerUnchecked(this.datatypeBinding);
+
+ Pair<List<ConsistsOfProcessEntry>,Set<Resource>> pair = ConsistsOfProcess.walk(graph, status, conf.seeds, ignoreVirtual);
+ state.internalEntries = pair.first;
+
+ for(ConsistsOfProcessEntry entry : state.internalEntries) {
+ Resource r = entry.resource;
+ if (status.put(r, ExtentStatus.INTERNAL) == null) {
+ if(ModelTransferableGraphSourceRequest.DEBUG) {
+ String URI = graph.getPossibleURI(r);
+ if(URI != null) ModelTransferableGraphSourceRequest.log("URI INTERNAL " + URI);
+ else ModelTransferableGraphSourceRequest.log("URI has no URI for " + r);
+ }
+ fringe.add(r);
+ }
+ }
+
+ for(Resource unnamedChild : pair.second) {
+ if (status.put(unnamedChild, ExtentStatus.INTERNAL) == null) {
+ fringe.add(unnamedChild);
+ }
+ }
+
+ if (state.monitor.isCanceled())
+ throw new CancelTransactionException();
+
+ while(!fringe.isEmpty()) {
+
+ Collection<DirectStatements> expansion = expand(graph);
+ classifyPredicates(graph, state, expansion);
+ processFringe(graph, expansion, state);
+
+ if (state.monitor.isCanceled())
+ throw new CancelTransactionException();
+ }
+
+ if (ModelTransferableGraphSourceRequest.PROFILE) {
+ System.out.println(composedObjectCounter + " " + fastInternalCounter
+ + " " + parentExternalCounter + " "
+ + fullExternalCounter + " " + fullInternalCounter);
+ }
+
+ } catch (IOException e) {
+ throw new DatabaseException(e);
+ }
+
+ }
+
+ void logStatementWithExtent(ReadGraph graph, String header, ExtentStatus status, int sId, int pId, int oId) throws DatabaseException {
+ if(ModelTransferableGraphSourceRequest.DEBUG) {
+ SerialisationSupport support = graph.getService(SerialisationSupport.class);
+ String s = NameUtils.getURIOrSafeNameInternal(graph, support.getResource(sId));
+ String p = NameUtils.getURIOrSafeNameInternal(graph, support.getResource(pId));
+ String o = NameUtils.getURIOrSafeNameInternal(graph, support.getResource(oId));
+ ModelTransferableGraphSourceRequest.log(header + " [" + status + "] " + s + " - " + p + " - " + o);
+ }
+ }
+
+ void logStatementWithExtent(ReadGraph graph, String header, ExtentStatus status, Resource sId, Resource pId, Resource oId) throws DatabaseException {
+ if(ModelTransferableGraphSourceRequest.DEBUG) {
+ String s = NameUtils.getURIOrSafeNameInternal(graph, sId);
+ String p = NameUtils.getURIOrSafeNameInternal(graph, pId);
+ String o = NameUtils.getURIOrSafeNameInternal(graph, oId);
+ ModelTransferableGraphSourceRequest.log(header + " [" + status + "] " + s + " - " + p + " - " + o);
+ }
+ }
+