1 package org.simantics.db.layer0.util;
3 import java.io.IOException;
4 import java.io.InputStream;
5 import java.util.ArrayList;
6 import java.util.Collection;
7 import java.util.HashMap;
8 import java.util.Iterator;
12 import java.util.TreeSet;
14 import org.simantics.databoard.Bindings;
15 import org.simantics.databoard.Databoard;
16 import org.simantics.databoard.binding.Binding;
17 import org.simantics.databoard.serialization.Serializer;
18 import org.simantics.databoard.type.Datatype;
19 import org.simantics.db.DirectStatements;
20 import org.simantics.db.ReadGraph;
21 import org.simantics.db.Resource;
22 import org.simantics.db.ResourceMap;
23 import org.simantics.db.Statement;
24 import org.simantics.db.common.StandardStatement;
25 import org.simantics.db.common.primitiverequest.Value;
26 import org.simantics.db.common.procedure.adapter.TransientCacheListener;
27 import org.simantics.db.common.utils.NameUtils;
28 import org.simantics.db.exception.CancelTransactionException;
29 import org.simantics.db.exception.DatabaseException;
30 import org.simantics.db.layer0.adapter.SubgraphExtent.ExtentStatus;
31 import org.simantics.db.layer0.util.ConsistsOfProcess.ConsistsOfProcessEntry;
32 import org.simantics.db.layer0.util.ModelTransferableGraphSourceRequest.Expansion3;
33 import org.simantics.db.layer0.util.TransferableGraphConfiguration2.SeedSpec;
34 import org.simantics.db.layer0.util.TransferableGraphConfiguration2.SeedSpec.SeedSpecType;
35 import org.simantics.db.service.CollectionSupport;
36 import org.simantics.db.service.SerialisationSupport;
37 import org.simantics.db.service.TransferableGraphSupport;
38 import org.simantics.graph.db.TransferableGraphSource;
39 import org.simantics.layer0.Layer0;
40 import org.simantics.scl.runtime.function.Function1;
41 import org.simantics.utils.datastructures.Pair;
42 import org.slf4j.Logger;
43 import org.slf4j.LoggerFactory;
45 import gnu.trove.list.array.TIntArrayList;
46 import gnu.trove.map.hash.TIntIntHashMap;
/**
 * Processes the export domain of a transferable graph: starting from the seeds of a
 * {@link TransferableGraphConfiguration2}, it classifies resources with an
 * {@link ExtentStatus}, expands a fringe of resources still to be visited and writes
 * the values and statements of internal resources into the outputs of a
 * {@code DomainProcessorState}.
 */
48 public class DomainProcessor3 {
// Class-wide SLF4J logger.
50 private static final Logger LOGGER = LoggerFactory.getLogger(DomainProcessor3.class);
/**
 * Decision returned by the optional statement exclusion function
 * ({@code exclusionFunction}) that is applied to expanded statements.
 */
52 public enum ExclusionDecision {
// EXCLUDE_OBJECT makes classifyPredicates put the object of the statement into the
// status map as EXCLUDED; no explicit handling of INCLUDE is visible in this file.
53 INCLUDE, EXCLUDE_OBJECT
// Compile-time profiling switch.
// NOTE(review): presumably guards the nanoTime bookkeeping below; the guarding lines are not visible here.
56 final static private boolean PROFILE = false;
// Serializers and the Datatype binding are assigned at the start of process().
58 Serializer variantSerializer;
59 Serializer datatypeSerializer;
60 Binding datatypeBinding;
// Passed on to Expansion3 and ConsistsOfProcess.walk.
61 boolean ignoreVirtual;
// Resources waiting to be expanded; created as a TreeSet in the constructor.
65 Set<Resource> fringe = null;
// Optional per-statement exclusion hook, copied from conf.exclusionFunction.
66 Function1<Statement,ExclusionDecision> exclusionFunction = null;
// Every predicate (and inverse) encountered so far.
67 Set<Resource> predicates = null;
// Cache: relation -> whether it is classified as a subrelation of L0.IsRelatedTo.
68 Map<Resource,Boolean> isRelatedToPredicates = null;
// Predicates for which graph.hasStatement(predicate) returned false.
69 Set<Resource> deadPredicates = null;
// Predicates whose inverse is classified as "is related to".
70 Set<Resource> strongInverseSet = null;
// Statements recorded by processStatement when neither the predicate nor its inverse is strong.
71 List<Statement> unresolvedWeakLinks = new ArrayList<>();
// Not referenced by any line visible in this file.
72 TIntIntHashMap ids = null;
// Extent status per resource.
73 ResourceMap<ExtentStatus> status = null;
// Cache of serialized datatypes used by processValue.
74 Map<Datatype, byte[]> bindings = new HashMap<Datatype, byte[]>();
// Services and configuration; support and tgs are obtained from the graph in the constructor.
75 final SerialisationSupport support;
76 final TransferableGraphConfiguration2 conf;
77 final TransferableGraphSupport tgs;
// Counters; in the visible code they are only read by the profiling printout at the end of process().
81 private long composedObjectCounter = 0;
82 private long fastInternalCounter = 0;
83 private long parentExternalCounter = 0;
84 private long fullInternalCounter = 0;
85 private long fullExternalCounter = 0;
// Time accumulators. The ones updated in this file use the pattern
// "x -= System.nanoTime(); ...; x += System.nanoTime();", i.e. they hold elapsed nanoseconds.
89 long fullResolveTime = 0;
90 long fastResolveTime = 0;
91 long otherStatementTime = 0;
92 long parentResolveTime = 0;
93 long extentSeedTime = 0;
94 long classifyPredicateTime = 0;
95 long processFringeTime = 0;
96 long valueOutputTime = 0;
97 long statementOutputTime = 0;
/**
 * Creates a processor and initializes the status map and the fringe from the configuration.
 *
 * @param graph read access to the database, also used to obtain the required services
 * @param conf configuration providing the seeds, the pre-classified statuses and the exclusion function
 * @param state processing state (not referenced by the lines visible in this constructor)
 * @param ignoreVirtual stored and later passed to Expansion3 and ConsistsOfProcess.walk
 * @throws DatabaseException if a database operation fails
 */
99 public DomainProcessor3(ReadGraph graph, TransferableGraphConfiguration2 conf, DomainProcessorState state, boolean ignoreVirtual) throws DatabaseException {
101 this.L0 = Layer0.getInstance(graph);
102 this.tgs = graph.getService(TransferableGraphSupport.class);
104 this.support = graph.getService(SerialisationSupport.class);
105 this.ignoreVirtual = ignoreVirtual;
// Start of the startup-time measurement (closed at the end of the constructor).
109 startupTime -= System.nanoTime();
111 CollectionSupport cs = graph.getService(CollectionSupport.class);
// Working collections; all except the fringe come from CollectionSupport.
114 status = cs.createMap(ExtentStatus.class);
115 predicates = cs.createSet();
116 exclusionFunction = conf.exclusionFunction;
117 fringe = new TreeSet<Resource>();
118 isRelatedToPredicates = cs.createMap(Boolean.class);
119 deadPredicates = cs.createSet();
120 strongInverseSet = cs.createSet();
// Debug dump of the configuration.
122 if(LOGGER.isDebugEnabled()) {
124 for(Map.Entry<Resource, ExtentStatus> entry : conf.preStatus.entrySet()) {
125 LOGGER.debug("prestatus: " + NameUtils.getSafeName(graph, entry.getKey()) + " " + entry.getValue());
128 for(SeedSpec ss : conf.seeds) {
129 LOGGER.debug("seed: " + NameUtils.getSafeName(graph, ss.resource) + " " + ss.name + " " + ss.specType + " " + ss.type);
// Copy the pre-classified statuses into the status map.
134 for(Map.Entry<Resource, ExtentStatus> entry : conf.preStatus.entrySet()) {
135 // INTERNAL prestatus shall be ignored. Domain processor will initialize statuses based on seeds.
136 if (entry.getValue().equals(ExtentStatus.INTERNAL)) {
137 LOGGER.info("Unexpected INTERNAL preStatus in DomainProcessor3 " + entry.getKey());
139 status.put(entry.getKey(), entry.getValue());
// Classify the seeds according to their spec type.
143 for(SeedSpec ss : conf.seeds) {
144 ExtentStatus pre = status.get(ss.resource);
145 // INTERNAL seeds are fed into ConsistsOfProcess
146 if(SeedSpecType.INTERNAL.equals(ss.specType)) {
147 if(pre != null && !ExtentStatus.INTERNAL.equals(pre))
148 LOGGER.info("Internal seed preclassification problem, expected INTERNAL preclassification, got " + pre.name());
150 } else if(SeedSpecType.ROOT.equals(ss.specType)) {
151 // Non-internal resources are not reported as internals by ConsistsOfProcess so they are manually entered into fringe
152 fringe.add(ss.resource);
154 LOGGER.info("Root preclassification problem, expected no preclassification, got " + pre.name());
155 // Roots are classified in status as INTERNAL
156 status.put(ss.resource, ExtentStatus.INTERNAL);
// This branch must test SPECIAL_ROOT: ROOT is already consumed by the previous branch,
// so testing ROOT again would leave special roots unclassified and this code unreachable.
157 } else if(SeedSpecType.SPECIAL_ROOT.equals(ss.specType)) {
158 // Special roots e.g. %model are marked as EXTERNAL
159 if(pre != null && !ExtentStatus.EXTERNAL.equals(pre))
160 LOGGER.info("Special root preclassification problem, expected EXTERNAL preclassification, got " + pre.name());
161 status.put(ss.resource, ExtentStatus.EXTERNAL);
// End of the startup-time measurement.
166 startupTime += System.nanoTime();
/**
 * Accessor for the extent-status map maintained by this processor.
 * NOTE(review): the method body is not visible in this listing; presumably it returns
 * the {@code status} field - confirm.
 */
170 public ResourceMap<ExtentStatus> getStatus() {
/**
 * Walks the fringe for at most {@code maxAmount} steps and expands the collected
 * resources with an {@link Expansion3} request.
 *
 * @param graph read access to the database
 * @param maxAmount upper bound for the number of loop iterations
 * @return the result of the Expansion3 request run on the collected list
 * @throws DatabaseException if the request fails
 */
174 public Collection<DirectStatements> extractFromFringe(ReadGraph graph, int maxAmount) throws DatabaseException {
176 CollectionSupport cs = graph.getService(CollectionSupport.class);
177 Collection<Resource> list = cs.createList();
178 Iterator<Resource> it = fringe.iterator();
179 for(int i=0;i<maxAmount;i++) {
// Stop early when the fringe has no more elements.
180 if(!it.hasNext()) break;
// NOTE(review): the statements that move the next fringe element into list are not visible
// here; presumably the element is taken with it.next(), added to list and removed from the fringe.
185 return graph.syncRequest(new Expansion3(list, ignoreVirtual));
/**
 * Expands the next batch of the fringe by delegating to
 * {@link #extractFromFringe(ReadGraph, int)} with a limit of {@code 2<<12} (8192).
 * The visible lines add the elapsed nanoseconds of the call to {@code expandTime}.
 * NOTE(review): the return statement is not visible here; presumably {@code result} is returned.
 *
 * @param graph read access to the database
 * @throws DatabaseException if the expansion request fails
 */
189 public Collection<DirectStatements> expand(ReadGraph graph) throws DatabaseException {
192 expandTime -= System.nanoTime();
194 Collection<DirectStatements> result = extractFromFringe(graph, 2<<12);
197 expandTime += System.nanoTime();
/**
 * Classifies every predicate of {@code schedule}: stores in {@code isRelatedToPredicates}
 * whether the predicate counts as a subrelation of {@code L0.IsRelatedTo}, and records
 * predicates without statements as dead.
 *
 * @param graph read access to the database
 * @param schedule predicates to classify
 * @param state processing state; its {@code inverses} map loses the entry of each dead predicate
 * @throws DatabaseException if a database query fails
 */
203 public void classifyPredicates(ReadGraph graph, final Set<Resource> schedule, DomainProcessorState state) throws DatabaseException {
205 for(Resource predicate : schedule) {
// Default classification unless one of the checks below says otherwise.
207 Boolean isRelatedTo = Boolean.FALSE;
// NOTE(review): the branch headers between the superrelation path and the direct check
// are not visible in this listing; presumably the first path is taken when single != null.
209 Resource single = graph.getPossibleSuperrelation(predicate);
// The classification of the superrelation is looked up in the cache and computed on a miss.
212 Boolean singleIsRelatedTo = isRelatedToPredicates.get(single);
213 if(singleIsRelatedTo == null) {
214 singleIsRelatedTo = graph.isSubrelationOf(single, L0.IsRelatedTo);
215 isRelatedToPredicates.put(single, singleIsRelatedTo);
// The predicate takes over the classification of its superrelation.
218 isRelatedTo = singleIsRelatedTo;
// Direct check of the predicate itself.
222 if(graph.isSubrelationOf(predicate, L0.IsRelatedTo)) {
223 isRelatedTo = Boolean.TRUE;
224 if(ModelTransferableGraphSourceRequest.DEBUG) ModelTransferableGraphSourceRequest.log("isRelatedToPredicates4 => " + NameUtils.getSafeName(graph, predicate));
// A predicate resource without any statements is treated as dead.
226 if (!graph.hasStatement(predicate)) {
227 if(ModelTransferableGraphSourceRequest.DEBUG) ModelTransferableGraphSourceRequest.log("FOUND DEAD PREDICATE (no statements): " + predicate);
228 deadPredicates.add(predicate);
229 // Prevents ModelTransferableGraphSource from
230 // trying to export these statements.
231 state.inverses.remove(support.getTransientId(predicate));
// Remember the final classification of this predicate.
237 isRelatedToPredicates.put(predicate, isRelatedTo);
/**
 * Scans the expanded statements for predicates not seen before, registers them and their
 * inverses in {@code state.inverses}, classifies them with
 * {@link #classifyPredicates(ReadGraph, Set, DomainProcessorState)} and updates
 * {@code strongInverseSet}. The exclusion function, when present, is applied to each
 * statement on the way.
 *
 * @param graph read access to the database
 * @param state processing state whose {@code inverses} map is updated
 * @param expansion statements of the resources expanded in this round
 * @throws DatabaseException if a database query fails
 */
243 public void classifyPredicates(ReadGraph graph, DomainProcessorState state, final Collection<DirectStatements> expansion) throws DatabaseException {
246 classifyPredicateTime -= System.nanoTime();
248 CollectionSupport cs = graph.getService(CollectionSupport.class);
// schedule: predicates still to classify; newPredicates: new predicate -> its inverse.
249 final Set<Resource> schedule = cs.createSet();
250 final Map<Resource, Resource> newPredicates = cs.createMap(Resource.class);
252 for (DirectStatements stms : expansion) {
253 for(Statement stm : stms) {
255 Resource predicate = stm.getPredicate();
256 Resource object = stm.getObject();
// Statements with an excluded predicate or object are skipped.
258 if(ExtentStatus.EXCLUDED.equals(status.get(predicate))) continue;
259 if(ExtentStatus.EXCLUDED.equals(status.get(object))) continue;
// The optional exclusion function may exclude the object of this statement.
261 if (exclusionFunction != null) {
262 ExclusionDecision decision = exclusionFunction.apply(stm);
263 if(ExclusionDecision.EXCLUDE_OBJECT.equals(decision)) {
264 status.put(object, ExtentStatus.EXCLUDED);
// Set.add returns true only for a predicate that has not been seen before.
269 if(predicates.add(predicate)) {
270 Resource inverse = graph.getPossibleInverse(predicate);
271 schedule.add(predicate);
272 if(inverse != null) {
273 newPredicates.put(predicate, inverse);
274 if(predicates.add(inverse)) schedule.add(inverse);
// The inverse relation is recorded in both directions using transient ids.
275 state.inverses.put(support.getTransientId(predicate), support.getTransientId(inverse));
276 state.inverses.put(support.getTransientId(inverse), support.getTransientId(predicate));
277 if(ModelTransferableGraphSourceRequest.DEBUG) ModelTransferableGraphSourceRequest.log("INVERSE FOR " + graph.getPossibleURI(predicate) + " => " + graph.getPossibleURI(inverse));
// NOTE(review): the else header is not visible here; judging by the log message below,
// 0 is stored for a predicate without an inverse.
279 state.inverses.put(support.getTransientId(predicate), 0);
280 if(ModelTransferableGraphSourceRequest.DEBUG) ModelTransferableGraphSourceRequest.log("NO INVERSE FOR " + graph.getPossibleURI(predicate));
// Classify everything collected above before the classifications are read below.
288 classifyPredicates(graph, schedule, state);
290 for(Map.Entry<Resource, Resource> entry : newPredicates.entrySet()) {
291 // Inverse is strong => this has strong inverse
292 Boolean isRelatedToValue = isRelatedToPredicates.get(entry.getValue());
293 if(isRelatedToValue) strongInverseSet.add(entry.getKey());
// Same rule in the other direction: a strong predicate makes its inverse have a strong inverse.
294 Boolean isRelatedToKey = isRelatedToPredicates.get(entry.getKey());
295 if(isRelatedToKey) strongInverseSet.add(entry.getValue());
299 classifyPredicateTime += System.nanoTime();
304 * Composed objects are internal. Mark them for expansion.
/**
 * Reads the datatype of the value of {@code subject}: follows {@code L0.HasDataType} to
 * its single object and reads the value of that object with {@code datatypeBinding}.
 * The request is issued with a {@link TransientCacheListener}.
 *
 * @throws DatabaseException if the object cannot be resolved or the value cannot be read
 */
307 private Datatype getDatatype(ReadGraph graph, Resource subject) throws DatabaseException {
308 Resource object = graph.getSingleObject(subject, L0.HasDataType);
309 return graph.syncRequest(new Value<Datatype>(object, datatypeBinding), TransientCacheListener.<Datatype>instance());
/**
 * Writes the value of {@code subject}, if it has a value stream, to {@code state.valueOutput}.
 * The visible layout is: subject id, a tag byte, an int size slot (raw copy only),
 * the serialized datatype and the value bytes.
 *
 * @param graph read access to the database
 * @param subject resource whose value is written
 * @param sId transient id of the subject, written in front of the value
 * @param state processing state providing {@code valueOutput} and {@code valueModifier}
 * @throws DatabaseException if reading from the database fails
 * @throws IOException if writing to the output fails
 */
312 public void processValue(ReadGraph graph, Resource subject, int sId, final DomainProcessorState state) throws DatabaseException, IOException {
313 final InputStream valueStream = tgs.getValueStream(graph, subject);
// Nothing is written for a subject without a value stream.
314 if (valueStream != null) {
315 if(ModelTransferableGraphSourceRequest.DEBUG) ModelTransferableGraphSourceRequest.log("[VALUE] " + NameUtils.getSafeName(graph, subject, true));
316 state.valueOutput.writeInt(sId);
319 Datatype dt = getDatatype(graph, subject);
// The raw-copy tag is used only when the value modifier reports that the datatype needs no modification.
321 boolean canWriteRawVariant = !state.valueModifier.mayNeedModification(dt);
322 long rawVariantSizePos = 0;
323 state.valueOutput.writeByte(canWriteRawVariant
324 ? TransferableGraphSource.TAG_RAW_COPY_VARIANT_VALUE
325 : TransferableGraphSource.TAG_POTENTIALLY_MODIFIED_VARIANT_VALUE);
326 if (canWriteRawVariant) {
327 // Add space for raw variant byte size before the data
328 rawVariantSizePos = state.valueOutput.position();
329 state.valueOutput.writeInt(0);
// Serialized datatypes are cached per Datatype in bindings.
332 byte[] typeBytes = bindings.get(dt);
333 if (typeBytes == null) {
334 typeBytes = datatypeSerializer.serialize(dt);
335 bindings.put(dt, typeBytes);
338 state.valueOutput.write(typeBytes);
// The value is copied without being deserialized: the serializer skips over a wrapping
// stream whose read() forwards every byte it reads from valueStream to valueOutput.
339 Serializer s = Bindings.getSerializerUnchecked(Bindings.getBinding(dt));
340 s.skip(new InputStream() {
342 public int read() throws IOException {
343 int value = valueStream.read();
344 state.valueOutput.write(value);
// Back-patch the size slot: bytes written after the 4-byte slot (datatype + value),
// then restore the output position.
349 if (canWriteRawVariant) {
350 long currentPos = state.valueOutput.position();
351 int variantSize = (int)(currentPos - rawVariantSizePos - 4);
352 state.valueOutput.position(rawVariantSizePos);
353 state.valueOutput.writeInt(variantSize);
354 state.valueOutput.position(currentPos);
// Buffer of (predicate id, object id) pairs collected for the subject being processed;
// written out by flushStatementStream.
362 private TIntArrayList stream = new TIntArrayList();
/**
 * Appends one statement to the buffer as two transient ids: predicate first, then object.
 *
 * @throws DatabaseException if a transient id cannot be resolved
 */
364 public void addToStream(Resource predicate, Resource object) throws DatabaseException {
365 stream.add(support.getTransientId(predicate));
366 stream.add(support.getTransientId(object));
/**
 * Decides what to do with one statement of an internal subject, based on the extent
 * status of the object and the classification of the predicate.
 * NOTE(review): several branch headers and statements of this method are not visible in
 * this listing; the comments below describe only the visible lines.
 *
 * @param graph read access to the database
 * @param subject subject of the statement
 * @param stm statement to process
 * @throws DatabaseException if a database query fails
 * @throws IOException declared by the signature; no direct I/O is visible here
 */
369 public void processStatement(ReadGraph graph, Resource subject, Statement stm) throws DatabaseException, IOException {
371 Resource predicate = stm.getPredicate();
373 Resource object = stm.getObject();
375 ExtentStatus objectStatus = status.get(object);
// The lookup result is unboxed below, so the predicate must have been classified before this call.
378 Boolean isRelatedTo = isRelatedToPredicates.get(predicate);
// Strong ("is related to") statement to a non-excluded object: the statement is exported.
379 if ((objectStatus != ExtentStatus.EXCLUDED) && isRelatedTo) {
381 if(ModelTransferableGraphSourceRequest.DEBUG) logStatementWithExtent(graph, "related", objectStatus, subject, predicate, object);
383 addToStream(predicate, object);
// Objects without a definite status are candidates for expansion.
// NOTE(review): the statement that adds the object to the fringe is not visible here.
385 if(objectStatus == null || objectStatus == ExtentStatus.PENDING) {
386 if(ModelTransferableGraphSourceRequest.DEBUG) ModelTransferableGraphSourceRequest.log("[ADDED TO FRINGE] " + NameUtils.getSafeName(graph, object));
// Statements with a dead predicate are only logged in the visible code.
393 if (deadPredicates.contains(predicate)) {
394 if(ModelTransferableGraphSourceRequest.DEBUG) logStatementWithExtent(graph, "excluding statement with dead predicate ", objectStatus, subject, predicate, object);
// Weak statement pointing to an excluded object: only logged in the visible code.
399 if(objectStatus == ExtentStatus.EXCLUDED) {
401 if(ModelTransferableGraphSourceRequest.DEBUG) logStatementWithExtent(graph, "weak reference to excluded object ", objectStatus, subject, predicate, object);
405 // The inverse is also weak (or there is no inverse)
406 if(!strongInverseSet.contains(predicate)) {
// The statement is kept aside as an unresolved weak link instead of being written to the stream.
408 unresolvedWeakLinks.add(new StandardStatement(subject, predicate, object));
409 //addToStream(predicate, object);
// An object seen for the first time is marked PENDING.
411 if(objectStatus == null) {
412 status.put(object, ExtentStatus.PENDING);
415 if(ModelTransferableGraphSourceRequest.DEBUG) logStatementWithExtent(graph, "fully weak internal", objectStatus, subject, predicate, object);
// Predicate whose inverse is strong: only logged in the visible code.
419 if(ModelTransferableGraphSourceRequest.DEBUG) logStatementWithExtent(graph, "strong inverse internal ", objectStatus, subject, predicate, object);
/**
 * Writes the buffered statements of one subject to {@code state.statementsOutput}:
 * the subject id, the number of (predicate, object) pairs and then every buffered int.
 * Nothing is written when the buffer is empty.
 *
 * @param sId transient id of the subject
 * @param state processing state providing {@code statementsOutput} and {@code statementCount}
 * @throws IOException if writing to the output fails
 */
429 public void flushStatementStream(int sId, DomainProcessorState state) throws IOException {
430 if(!stream.isEmpty()) {
431 state.statementsOutput.writeInt(sId);
432 int streamSize = stream.size();
// Two ints are buffered per statement.
433 int statementCount = stream.size() / 2;
434 state.statementsOutput.writeInt(statementCount);
435 for (int i = 0; i < streamSize; i++)
436 state.statementsOutput.writeInt(stream.getQuick(i));
// NOTE(review): this adds twice the number of buffered ints, i.e. four times the number of
// statements just written - confirm that this is the unit statementCount is meant to use.
437 state.statementCount += 2*streamSize;
// NOTE(review): no reset of the buffer is visible in this listing; presumably it is cleared
// in a line not shown here - confirm.
442 // For progress monitor book-keeping
443 private long internalResourceNumber = 0;
444 private long startTime = 0;
445 private long lastUpdateTime = 0;
/**
 * Processes one internal resource: marks it INTERNAL, writes its value and its statements
 * and reports progress to {@code state.monitor}.
 *
 * @param graph read access to the database
 * @param subject resource to include
 * @param stms direct statements of the subject
 * @param state processing state providing the outputs and the progress monitor
 * @throws DatabaseException if a database query fails
 * @throws IOException if writing to an output fails
 */
447 public void processInternal(ReadGraph graph, Resource subject, DirectStatements stms, DomainProcessorState state) throws DatabaseException, IOException {
449 internalResourceNumber++;
451 // Update progress monitor with controlled frequency
452 long t = System.nanoTime();
453 long dt = t - lastUpdateTime;
// 200_000_000 ns = 0.2 s between two updates of the sub-task text.
// NOTE(review): the lines that maintain startTime and lastUpdateTime are not visible here.
454 if (dt > 200_000_000L) {
// Elapsed time in seconds.
458 double totalTime = (t - startTime) * 1e-9;
460 long speed = Math.round((double)internalResourceNumber / totalTime);
461 state.monitor.subTask("Included " + internalResourceNumber + " resources (" + speed + " resources/s)");
465 status.put(subject, ExtentStatus.INTERNAL);
466 if(ModelTransferableGraphSourceRequest.DEBUG) ModelTransferableGraphSourceRequest.log("[INTERNAL] " + NameUtils.getSafeName(graph, subject, true));
468 int sId = support.getTransientId(subject);
// Value output, with its elapsed time accumulated in valueOutputTime.
471 valueOutputTime -= System.nanoTime();
473 processValue(graph, subject, sId, state);
476 valueOutputTime += System.nanoTime();
// Statement output: statements are buffered one by one and flushed once per subject.
479 statementOutputTime -= System.nanoTime();
481 for(Statement stm : stms) {
482 processStatement(graph, subject, stm);
485 flushStatementStream(sId, state);
488 statementOutputTime += System.nanoTime();
490 // Logarithmic progress monitor for unknown amount of work.
491 state.monitor.setWorkRemaining(100000);
492 state.monitor.worked(1);
/**
 * Handles one round of expanded resources: a subject is skipped, classified as EXTERNAL
 * or processed as an internal resource.
 *
 * @param graph read access to the database
 * @param expansion direct statements of the resources expanded in this round
 * @param state processing state passed on to {@link #processInternal}
 * @throws DatabaseException if a database query fails
 * @throws IOException if writing to an output fails
 */
495 public void processFringe(ReadGraph graph, Collection<DirectStatements> expansion, final DomainProcessorState state) throws DatabaseException, IOException {
498 processFringeTime -= System.nanoTime();
500 for (DirectStatements stms : expansion) {
502 Resource subject = stms.getSubject();
// Look for an L0.PartOf statement among the statements of the subject.
// NOTE(review): the assignment inside the if is not visible here; presumably partOf is set to true.
504 boolean partOf = false;
505 for(Statement stm : stms) {
506 Resource predicate = stm.getPredicate();
507 if(L0.PartOf.equals(predicate)) {
513 ExtentStatus subjectStatus = status.get(subject);
514 if(ModelTransferableGraphSourceRequest.DEBUG && subjectStatus != null) ModelTransferableGraphSourceRequest.log("EXISTING STATUS FOR " + graph.getPossibleURI(subject) + " - " + subjectStatus);
// Subjects already known to be external or excluded are not processed.
515 if(subjectStatus == ExtentStatus.EXTERNAL || subjectStatus == ExtentStatus.EXCLUDED) continue;
// A subject with a PartOf statement, no definite status and a URI is classified as EXTERNAL.
516 if(partOf && (subjectStatus == null || ExtentStatus.PENDING == subjectStatus) && graph.getPossibleURI(subject) != null) {
518 status.put(subject, ExtentStatus.EXTERNAL);
519 if(ModelTransferableGraphSourceRequest.DEBUG) {
520 String uri = graph.getPossibleURI(subject);
521 if(uri == null) ModelTransferableGraphSourceRequest.log("[EXTERNAL]: No URI for " + subject);
522 else ModelTransferableGraphSourceRequest.log("[EXTERNAL] " + uri);
// NOTE(review): presumably the else branch of the EXTERNAL classification; the header is not visible here.
527 processInternal(graph, subject, stms, state);
534 processFringeTime += System.nanoTime();
/**
 * Main entry point: initializes the serializers, runs {@code ConsistsOfProcess.walk} over
 * the configured seeds, marks the returned resources INTERNAL and then repeats
 * expand, classifyPredicates and processFringe until the fringe is empty.
 *
 * @param graph read access to the database
 * @param state processing state receiving the internal entries and all output
 * @throws DatabaseException on database failures; an {@link IOException} is wrapped into one,
 *         and a {@link CancelTransactionException} is thrown when the monitor reports cancellation
 */
538 public void process(ReadGraph graph, DomainProcessorState state) throws DatabaseException {
542 this.variantSerializer = graph.getService(Databoard.class).getSerializerUnchecked(Bindings.VARIANT);
543 this.datatypeBinding = Bindings.getBindingUnchecked(Datatype.class);
544 this.datatypeSerializer = graph.getService(Databoard.class).getSerializerUnchecked(this.datatypeBinding);
// pair.first is stored as the internal entries; pair.second is iterated below as "unnamed children".
546 Pair<List<ConsistsOfProcessEntry>,Set<Resource>> pair = ConsistsOfProcess.walk(graph, status, conf.seeds, ignoreVirtual);
547 state.internalEntries = pair.first;
549 for(ConsistsOfProcessEntry entry : state.internalEntries) {
550 Resource r = entry.resource;
// put returns the previous status; the debug output covers only resources without one.
551 if (status.put(r, ExtentStatus.INTERNAL) == null) {
552 if(ModelTransferableGraphSourceRequest.DEBUG) {
553 String URI = graph.getPossibleURI(r);
554 if(URI != null) ModelTransferableGraphSourceRequest.log("URI INTERNAL " + URI);
555 else ModelTransferableGraphSourceRequest.log("URI has no URI for " + r);
// Resources of pair.second enter the fringe only if they had no previous status.
561 for(Resource unnamedChild : pair.second) {
562 if (status.put(unnamedChild, ExtentStatus.INTERNAL) == null) {
563 fringe.add(unnamedChild);
567 if (state.monitor.isCanceled())
568 throw new CancelTransactionException();
// Main loop: one batch of the fringe per iteration.
570 while(!fringe.isEmpty()) {
572 Collection<DirectStatements> expansion = expand(graph);
573 classifyPredicates(graph, state, expansion);
574 processFringe(graph, expansion, state);
576 if (state.monitor.isCanceled())
577 throw new CancelTransactionException();
// Optional dump of the profiling counters.
580 if (ModelTransferableGraphSourceRequest.PROFILE) {
581 System.out.println(composedObjectCounter + " " + fastInternalCounter
582 + " " + parentExternalCounter + " "
583 + fullExternalCounter + " " + fullInternalCounter);
// I/O failures of the outputs are reported as database exceptions, keeping the cause.
586 } catch (IOException e) {
587 throw new DatabaseException(e);
/**
 * Debug helper: logs a statement given as transient ids together with an extent status.
 * Does nothing unless {@code ModelTransferableGraphSourceRequest.DEBUG} is set.
 *
 * @param header text printed in front of the statement
 * @param status extent status printed in brackets
 * @param sId transient id of the subject
 * @param pId transient id of the predicate
 * @param oId transient id of the object
 * @throws DatabaseException if an id or a name cannot be resolved
 */
592 void logStatementWithExtent(ReadGraph graph, String header, ExtentStatus status, int sId, int pId, int oId) throws DatabaseException {
593 if(ModelTransferableGraphSourceRequest.DEBUG) {
// The ids are resolved back to resources with a SerialisationSupport fetched from the graph.
594 SerialisationSupport support = graph.getService(SerialisationSupport.class);
595 String s = NameUtils.getURIOrSafeNameInternal(graph, support.getResource(sId));
596 String p = NameUtils.getURIOrSafeNameInternal(graph, support.getResource(pId));
597 String o = NameUtils.getURIOrSafeNameInternal(graph, support.getResource(oId));
598 ModelTransferableGraphSourceRequest.log(header + " [" + status + "] " + s + " - " + p + " - " + o);
/**
 * Debug helper: logs a statement given as resources together with an extent status.
 * Does nothing unless {@code ModelTransferableGraphSourceRequest.DEBUG} is set.
 *
 * @param header text printed in front of the statement
 * @param status extent status printed in brackets
 * @param sId subject resource
 * @param pId predicate resource
 * @param oId object resource
 * @throws DatabaseException if a name cannot be resolved
 */
602 void logStatementWithExtent(ReadGraph graph, String header, ExtentStatus status, Resource sId, Resource pId, Resource oId) throws DatabaseException {
603 if(ModelTransferableGraphSourceRequest.DEBUG) {
// Each resource is printed with NameUtils.getURIOrSafeNameInternal.
604 String s = NameUtils.getURIOrSafeNameInternal(graph, sId);
605 String p = NameUtils.getURIOrSafeNameInternal(graph, pId);
606 String o = NameUtils.getURIOrSafeNameInternal(graph, oId);
607 ModelTransferableGraphSourceRequest.log(header + " [" + status + "] " + s + " - " + p + " - " + o);