1 package org.simantics.db.layer0.util;
3 import java.io.IOException;
4 import java.io.InputStream;
5 import java.util.ArrayList;
6 import java.util.Collection;
7 import java.util.HashMap;
8 import java.util.Iterator;
12 import java.util.TreeSet;
14 import org.simantics.databoard.Bindings;
15 import org.simantics.databoard.Databoard;
16 import org.simantics.databoard.binding.Binding;
17 import org.simantics.databoard.serialization.Serializer;
18 import org.simantics.databoard.type.Datatype;
19 import org.simantics.db.DirectStatements;
20 import org.simantics.db.ReadGraph;
21 import org.simantics.db.Resource;
22 import org.simantics.db.ResourceMap;
23 import org.simantics.db.Statement;
24 import org.simantics.db.common.StandardStatement;
25 import org.simantics.db.common.primitiverequest.Value;
26 import org.simantics.db.common.procedure.adapter.TransientCacheListener;
27 import org.simantics.db.common.utils.NameUtils;
28 import org.simantics.db.exception.CancelTransactionException;
29 import org.simantics.db.exception.DatabaseException;
30 import org.simantics.db.layer0.adapter.SubgraphExtent.ExtentStatus;
31 import org.simantics.db.layer0.util.ConsistsOfProcess.ConsistsOfProcessEntry;
32 import org.simantics.db.layer0.util.ModelTransferableGraphSourceRequest.Expansion3;
33 import org.simantics.db.layer0.util.TransferableGraphConfiguration2.SeedSpec;
34 import org.simantics.db.layer0.util.TransferableGraphConfiguration2.SeedSpec.SeedSpecType;
35 import org.simantics.db.service.CollectionSupport;
36 import org.simantics.db.service.SerialisationSupport;
37 import org.simantics.db.service.TransferableGraphSupport;
38 import org.simantics.graph.db.TransferableGraphSource;
39 import org.simantics.layer0.Layer0;
40 import org.simantics.scl.runtime.function.Function1;
41 import org.simantics.utils.datastructures.Pair;
43 import gnu.trove.list.array.TIntArrayList;
44 import gnu.trove.map.hash.TIntIntHashMap;
46 public class DomainProcessor3 {
48 public enum ExclusionDecision {
49 INCLUDE, EXCLUDE_OBJECT
52 final static private boolean PROFILE = false;
54 Serializer variantSerializer;
55 Serializer datatypeSerializer;
56 Binding datatypeBinding;
57 boolean ignoreVirtual;
61 Set<Resource> fringe = null;
62 Function1<Statement,ExclusionDecision> exclusionFunction = null;
63 Set<Resource> predicates = null;
64 Map<Resource,Boolean> isRelatedToPredicates = null;
65 Set<Resource> deadPredicates = null;
66 Set<Resource> strongInverseSet = null;
67 List<Statement> unresolvedWeakLinks = new ArrayList<>();
68 TIntIntHashMap ids = null;
69 ResourceMap<ExtentStatus> status = null;
70 Map<Datatype, byte[]> bindings = new HashMap<Datatype, byte[]>();
71 final SerialisationSupport support;
72 final TransferableGraphConfiguration2 conf;
73 final TransferableGraphSupport tgs;
77 private long composedObjectCounter = 0;
78 private long fastInternalCounter = 0;
79 private long parentExternalCounter = 0;
80 private long fullInternalCounter = 0;
81 private long fullExternalCounter = 0;
85 long fullResolveTime = 0;
86 long fastResolveTime = 0;
87 long otherStatementTime = 0;
88 long parentResolveTime = 0;
89 long extentSeedTime = 0;
90 long classifyPredicateTime = 0;
91 long processFringeTime = 0;
92 long valueOutputTime = 0;
93 long statementOutputTime = 0;
95 public DomainProcessor3(ReadGraph graph, TransferableGraphConfiguration2 conf, DomainProcessorState state, boolean ignoreVirtual) throws DatabaseException {
97 this.L0 = Layer0.getInstance(graph);
98 this.tgs = graph.getService(TransferableGraphSupport.class);
100 this.support = graph.getService(SerialisationSupport.class);
101 this.ignoreVirtual = ignoreVirtual;
105 startupTime -= System.nanoTime();
107 CollectionSupport cs = graph.getService(CollectionSupport.class);
110 status = cs.createMap(ExtentStatus.class);
111 predicates = cs.createSet();
112 exclusionFunction = conf.exclusionFunction;
113 fringe = new TreeSet<Resource>();
114 isRelatedToPredicates = cs.createMap(Boolean.class);
115 deadPredicates = cs.createSet();
116 strongInverseSet = cs.createSet();
118 for(Map.Entry<Resource, ExtentStatus> entry : conf.preStatus.entrySet()) {
119 // INTERNAL prestatus shall be ignored. Domain processor will initialize statuses based on seeds.
120 if (entry.getValue().equals(ExtentStatus.INTERNAL)) continue;
121 status.put(entry.getKey(), entry.getValue());
124 for(SeedSpec ss : conf.seeds) {
125 if(SeedSpecType.INTERNAL.equals(ss.specType)) continue;
126 // Non-internal resources are not reported as internals by ConsistsOfProcess so they are manually entered into fringe
127 fringe.add(ss.resource);
128 // Roots are classified in status as INTERNAL
129 status.put(ss.resource, ExtentStatus.INTERNAL);
133 startupTime += System.nanoTime();
137 public ResourceMap<ExtentStatus> getStatus() {
141 public Collection<DirectStatements> extractFromFringe(ReadGraph graph, int maxAmount) throws DatabaseException {
143 CollectionSupport cs = graph.getService(CollectionSupport.class);
144 Collection<Resource> list = cs.createList();
145 Iterator<Resource> it = fringe.iterator();
146 for(int i=0;i<maxAmount;i++) {
147 if(!it.hasNext()) break;
152 return graph.syncRequest(new Expansion3(list, ignoreVirtual));
156 public Collection<DirectStatements> expand(ReadGraph graph) throws DatabaseException {
159 expandTime -= System.nanoTime();
161 Collection<DirectStatements> result = extractFromFringe(graph, 2<<12);
164 expandTime += System.nanoTime();
170 public void classifyPredicates(ReadGraph graph, final Set<Resource> schedule, DomainProcessorState state) throws DatabaseException {
172 for(Resource predicate : schedule) {
174 Boolean isRelatedTo = Boolean.FALSE;
176 Resource single = graph.getPossibleSuperrelation(predicate);
179 Boolean singleIsRelatedTo = isRelatedToPredicates.get(single);
180 if(singleIsRelatedTo == null) {
181 singleIsRelatedTo = graph.isSubrelationOf(single, L0.IsRelatedTo);
182 isRelatedToPredicates.put(single, singleIsRelatedTo);
185 isRelatedTo = singleIsRelatedTo;
189 if(graph.isSubrelationOf(predicate, L0.IsRelatedTo)) {
190 isRelatedTo = Boolean.TRUE;
191 if(ModelTransferableGraphSourceRequest.DEBUG) ModelTransferableGraphSourceRequest.log("isRelatedToPredicates4 => " + NameUtils.getSafeName(graph, predicate));
193 if (!graph.hasStatement(predicate)) {
194 if(ModelTransferableGraphSourceRequest.DEBUG) ModelTransferableGraphSourceRequest.log("FOUND DEAD PREDICATE (no statements): " + predicate);
195 deadPredicates.add(predicate);
196 // Prevents ModelTransferableGraphSource from
197 // trying to export these statements.
198 state.inverses.remove(support.getTransientId(predicate));
204 isRelatedToPredicates.put(predicate, isRelatedTo);
210 public void classifyPredicates(ReadGraph graph, DomainProcessorState state, final Collection<DirectStatements> expansion) throws DatabaseException {
213 classifyPredicateTime -= System.nanoTime();
215 CollectionSupport cs = graph.getService(CollectionSupport.class);
216 final Set<Resource> schedule = cs.createSet();
217 final Map<Resource, Resource> newPredicates = cs.createMap(Resource.class);
219 for (DirectStatements stms : expansion) {
220 for(Statement stm : stms) {
222 Resource predicate = stm.getPredicate();
223 Resource object = stm.getObject();
225 if(ExtentStatus.EXCLUDED.equals(status.get(predicate))) continue;
226 if(ExtentStatus.EXCLUDED.equals(status.get(object))) continue;
228 if (exclusionFunction != null) {
229 ExclusionDecision decision = exclusionFunction.apply(stm);
230 if(ExclusionDecision.EXCLUDE_OBJECT.equals(decision)) {
231 status.put(object, ExtentStatus.EXCLUDED);
236 if(predicates.add(predicate)) {
237 Resource inverse = graph.getPossibleInverse(predicate);
238 schedule.add(predicate);
239 if(inverse != null) {
240 newPredicates.put(predicate, inverse);
241 if(predicates.add(inverse)) schedule.add(inverse);
242 state.inverses.put(support.getTransientId(predicate), support.getTransientId(inverse));
243 state.inverses.put(support.getTransientId(inverse), support.getTransientId(predicate));
244 if(ModelTransferableGraphSourceRequest.DEBUG) ModelTransferableGraphSourceRequest.log("INVERSE FOR " + graph.getPossibleURI(predicate) + " => " + graph.getPossibleURI(inverse));
246 state.inverses.put(support.getTransientId(predicate), 0);
247 if(ModelTransferableGraphSourceRequest.DEBUG) ModelTransferableGraphSourceRequest.log("NO INVERSE FOR " + graph.getPossibleURI(predicate));
255 classifyPredicates(graph, schedule, state);
257 for(Map.Entry<Resource, Resource> entry : newPredicates.entrySet()) {
258 // Inverse is strong => this has strong inverse
259 Boolean isRelatedToValue = isRelatedToPredicates.get(entry.getValue());
260 if(isRelatedToValue) strongInverseSet.add(entry.getKey());
261 Boolean isRelatedToKey = isRelatedToPredicates.get(entry.getKey());
262 if(isRelatedToKey) strongInverseSet.add(entry.getValue());
266 classifyPredicateTime += System.nanoTime();
271 * Composed objects are internal. Mark them for expansion.
274 private Datatype getDatatype(ReadGraph graph, Resource subject) throws DatabaseException {
275 Resource object = graph.getSingleObject(subject, L0.HasDataType);
276 return graph.syncRequest(new Value<Datatype>(object, datatypeBinding), TransientCacheListener.<Datatype>instance());
279 public void processValue(ReadGraph graph, Resource subject, int sId, final DomainProcessorState state) throws DatabaseException, IOException {
280 final InputStream valueStream = tgs.getValueStream(graph, subject);
281 if (valueStream != null) {
282 if(ModelTransferableGraphSourceRequest.DEBUG) ModelTransferableGraphSourceRequest.log("[VALUE] " + NameUtils.getSafeName(graph, subject, true));
283 state.valueOutput.writeInt(sId);
286 Datatype dt = getDatatype(graph, subject);
288 boolean canWriteRawVariant = !state.valueModifier.mayNeedModification(dt);
289 long rawVariantSizePos = 0;
290 state.valueOutput.writeByte(canWriteRawVariant
291 ? TransferableGraphSource.TAG_RAW_COPY_VARIANT_VALUE
292 : TransferableGraphSource.TAG_POTENTIALLY_MODIFIED_VARIANT_VALUE);
293 if (canWriteRawVariant) {
294 // Add space for raw variant byte size before the data
295 rawVariantSizePos = state.valueOutput.position();
296 state.valueOutput.writeInt(0);
299 byte[] typeBytes = bindings.get(dt);
300 if (typeBytes == null) {
301 typeBytes = datatypeSerializer.serialize(dt);
302 bindings.put(dt, typeBytes);
305 state.valueOutput.write(typeBytes);
306 Serializer s = Bindings.getSerializerUnchecked(Bindings.getBinding(dt));
307 s.skip(new InputStream() {
309 public int read() throws IOException {
310 int value = valueStream.read();
311 state.valueOutput.write(value);
316 if (canWriteRawVariant) {
317 long currentPos = state.valueOutput.position();
318 int variantSize = (int)(currentPos - rawVariantSizePos - 4);
319 state.valueOutput.position(rawVariantSizePos);
320 state.valueOutput.writeInt(variantSize);
321 state.valueOutput.position(currentPos);
329 private TIntArrayList stream = new TIntArrayList();
331 public void addToStream(Resource predicate, Resource object) throws DatabaseException {
332 stream.add(support.getTransientId(predicate));
333 stream.add(support.getTransientId(object));
336 public void processStatement(ReadGraph graph, Resource subject, Statement stm) throws DatabaseException, IOException {
338 Resource predicate = stm.getPredicate();
340 Resource object = stm.getObject();
342 ExtentStatus objectStatus = status.get(object);
345 Boolean isRelatedTo = isRelatedToPredicates.get(predicate);
346 if ((objectStatus != ExtentStatus.EXCLUDED) && isRelatedTo) {
348 if(ModelTransferableGraphSourceRequest.DEBUG) logStatementWithExtent(graph, "related", objectStatus, subject, predicate, object);
350 addToStream(predicate, object);
352 if(objectStatus == null || objectStatus == ExtentStatus.PENDING) {
353 if(ModelTransferableGraphSourceRequest.DEBUG) ModelTransferableGraphSourceRequest.log("[ADDED TO FRINGE] " + NameUtils.getSafeName(graph, object));
360 if (deadPredicates.contains(predicate)) {
361 if(ModelTransferableGraphSourceRequest.DEBUG) logStatementWithExtent(graph, "excluding statement with dead predicate ", objectStatus, subject, predicate, object);
366 if(objectStatus == ExtentStatus.EXCLUDED) {
368 if(ModelTransferableGraphSourceRequest.DEBUG) logStatementWithExtent(graph, "weak reference to excluded object ", objectStatus, subject, predicate, object);
372 // The inverse is also weak (or there is no inverse)
373 if(!strongInverseSet.contains(predicate)) {
375 unresolvedWeakLinks.add(new StandardStatement(subject, predicate, object));
376 //addToStream(predicate, object);
378 if(objectStatus == null) {
379 status.put(object, ExtentStatus.PENDING);
382 if(ModelTransferableGraphSourceRequest.DEBUG) logStatementWithExtent(graph, "fully weak internal", objectStatus, subject, predicate, object);
386 if(ModelTransferableGraphSourceRequest.DEBUG) logStatementWithExtent(graph, "strong inverse internal ", objectStatus, subject, predicate, object);
396 public void flushStatementStream(int sId, DomainProcessorState state) throws IOException {
397 if(!stream.isEmpty()) {
398 state.statementsOutput.writeInt(sId);
399 int streamSize = stream.size();
400 int statementCount = stream.size() / 2;
401 state.statementsOutput.writeInt(statementCount);
402 for (int i = 0; i < streamSize; i++)
403 state.statementsOutput.writeInt(stream.getQuick(i));
404 state.statementCount += 2*streamSize;
409 // For progress monitor book-keeping
410 private long internalResourceNumber = 0;
411 private long startTime = 0;
412 private long lastUpdateTime = 0;
414 public void processInternal(ReadGraph graph, Resource subject, DirectStatements stms, DomainProcessorState state) throws DatabaseException, IOException {
416 internalResourceNumber++;
418 // Update progress monitor with controlled frequency
419 long t = System.nanoTime();
420 long dt = t - lastUpdateTime;
421 if (dt > 200_000_000L) {
425 double totalTime = (t - startTime) * 1e-9;
427 long speed = Math.round((double)internalResourceNumber / totalTime);
428 state.monitor.subTask("Included " + internalResourceNumber + " resources (" + speed + " resources/s)");
432 status.put(subject, ExtentStatus.INTERNAL);
433 if(ModelTransferableGraphSourceRequest.DEBUG) ModelTransferableGraphSourceRequest.log("[INTERNAL] " + NameUtils.getSafeName(graph, subject, true));
435 int sId = support.getTransientId(subject);
438 valueOutputTime -= System.nanoTime();
440 processValue(graph, subject, sId, state);
443 valueOutputTime += System.nanoTime();
446 statementOutputTime -= System.nanoTime();
448 for(Statement stm : stms) {
449 processStatement(graph, subject, stm);
452 flushStatementStream(sId, state);
455 statementOutputTime += System.nanoTime();
457 // Logarithmic progress monitor for unknown amount of work.
458 state.monitor.setWorkRemaining(100000);
459 state.monitor.worked(1);
462 public void processFringe(ReadGraph graph, Collection<DirectStatements> expansion, final DomainProcessorState state) throws DatabaseException, IOException {
465 processFringeTime -= System.nanoTime();
467 for (DirectStatements stms : expansion) {
469 Resource subject = stms.getSubject();
471 boolean partOf = false;
472 for(Statement stm : stms) {
473 Resource predicate = stm.getPredicate();
474 if(L0.PartOf.equals(predicate)) {
480 ExtentStatus subjectStatus = status.get(subject);
481 if(ModelTransferableGraphSourceRequest.DEBUG && subjectStatus != null) ModelTransferableGraphSourceRequest.log("EXISTING STATUS FOR " + graph.getPossibleURI(subject) + " - " + subjectStatus);
482 if(subjectStatus == ExtentStatus.EXTERNAL || subjectStatus == ExtentStatus.EXCLUDED) continue;
483 if(partOf && (subjectStatus == null || ExtentStatus.PENDING == subjectStatus) && graph.getPossibleURI(subject) != null) {
485 status.put(subject, ExtentStatus.EXTERNAL);
486 if(ModelTransferableGraphSourceRequest.DEBUG) {
487 String uri = graph.getPossibleURI(subject);
488 if(uri == null) ModelTransferableGraphSourceRequest.log("[EXTERNAL]: No URI for " + subject);
489 else ModelTransferableGraphSourceRequest.log("[EXTERNAL] " + uri);
494 processInternal(graph, subject, stms, state);
501 processFringeTime += System.nanoTime();
505 public void process(ReadGraph graph, DomainProcessorState state) throws DatabaseException {
509 this.variantSerializer = graph.getService(Databoard.class).getSerializerUnchecked(Bindings.VARIANT);
510 this.datatypeBinding = Bindings.getBindingUnchecked(Datatype.class);
511 this.datatypeSerializer = graph.getService(Databoard.class).getSerializerUnchecked(this.datatypeBinding);
513 Pair<List<ConsistsOfProcessEntry>,Set<Resource>> pair = ConsistsOfProcess.walk(graph, status, conf.seeds, ignoreVirtual);
514 state.internalEntries = pair.first;
516 for(ConsistsOfProcessEntry entry : state.internalEntries) {
517 Resource r = entry.resource;
518 if (status.put(r, ExtentStatus.INTERNAL) == null) {
519 if(ModelTransferableGraphSourceRequest.DEBUG) {
520 String URI = graph.getPossibleURI(r);
521 if(URI != null) ModelTransferableGraphSourceRequest.log("URI INTERNAL " + URI);
522 else ModelTransferableGraphSourceRequest.log("URI has no URI for " + r);
528 for(Resource unnamedChild : pair.second) {
529 if (status.put(unnamedChild, ExtentStatus.INTERNAL) == null) {
530 fringe.add(unnamedChild);
534 if (state.monitor.isCanceled())
535 throw new CancelTransactionException();
537 while(!fringe.isEmpty()) {
539 Collection<DirectStatements> expansion = expand(graph);
540 classifyPredicates(graph, state, expansion);
541 processFringe(graph, expansion, state);
543 if (state.monitor.isCanceled())
544 throw new CancelTransactionException();
547 if (ModelTransferableGraphSourceRequest.PROFILE) {
548 System.out.println(composedObjectCounter + " " + fastInternalCounter
549 + " " + parentExternalCounter + " "
550 + fullExternalCounter + " " + fullInternalCounter);
553 } catch (IOException e) {
554 throw new DatabaseException(e);
559 void logStatementWithExtent(ReadGraph graph, String header, ExtentStatus status, int sId, int pId, int oId) throws DatabaseException {
560 if(ModelTransferableGraphSourceRequest.DEBUG) {
561 SerialisationSupport support = graph.getService(SerialisationSupport.class);
562 String s = NameUtils.getURIOrSafeNameInternal(graph, support.getResource(sId));
563 String p = NameUtils.getURIOrSafeNameInternal(graph, support.getResource(pId));
564 String o = NameUtils.getURIOrSafeNameInternal(graph, support.getResource(oId));
565 ModelTransferableGraphSourceRequest.log(header + " [" + status + "] " + s + " - " + p + " - " + o);
569 void logStatementWithExtent(ReadGraph graph, String header, ExtentStatus status, Resource sId, Resource pId, Resource oId) throws DatabaseException {
570 if(ModelTransferableGraphSourceRequest.DEBUG) {
571 String s = NameUtils.getURIOrSafeNameInternal(graph, sId);
572 String p = NameUtils.getURIOrSafeNameInternal(graph, pId);
573 String o = NameUtils.getURIOrSafeNameInternal(graph, oId);
574 ModelTransferableGraphSourceRequest.log(header + " [" + status + "] " + s + " - " + p + " - " + o);