1 package org.simantics.db.layer0.util;
3 import java.io.IOException;
4 import java.io.InputStream;
5 import java.util.Collection;
6 import java.util.HashMap;
7 import java.util.Iterator;
11 import java.util.TreeSet;
13 import org.simantics.databoard.Bindings;
14 import org.simantics.databoard.Databoard;
15 import org.simantics.databoard.binding.Binding;
16 import org.simantics.databoard.serialization.Serializer;
17 import org.simantics.databoard.type.Datatype;
18 import org.simantics.db.DirectStatements;
19 import org.simantics.db.ReadGraph;
20 import org.simantics.db.Resource;
21 import org.simantics.db.ResourceMap;
22 import org.simantics.db.Statement;
23 import org.simantics.db.common.primitiverequest.Value;
24 import org.simantics.db.common.procedure.adapter.TransientCacheListener;
25 import org.simantics.db.common.utils.NameUtils;
26 import org.simantics.db.exception.CancelTransactionException;
27 import org.simantics.db.exception.DatabaseException;
28 import org.simantics.db.layer0.adapter.SubgraphExtent.ExtentStatus;
29 import org.simantics.db.layer0.util.ConsistsOfProcess.InternalEntry;
30 import org.simantics.db.layer0.util.ModelTransferableGraphSourceRequest.Expansion3;
31 import org.simantics.db.service.CollectionSupport;
32 import org.simantics.db.service.SerialisationSupport;
33 import org.simantics.db.service.TransferableGraphSupport;
34 import org.simantics.graph.db.TransferableGraphSource;
35 import org.simantics.layer0.Layer0;
36 import org.simantics.scl.runtime.function.Function1;
37 import org.simantics.utils.datastructures.Pair;
39 import gnu.trove.list.array.TIntArrayList;
40 import gnu.trove.map.hash.TIntIntHashMap;
42 public class DomainProcessor3 {
/**
 * Verdict returned by the configured exclusion function for a single statement.
 * In classifyPredicates, EXCLUDE_OBJECT causes the statement's object to be
 * given ExtentStatus.EXCLUDED and added to the exclusions set.
 */
44 public enum ExclusionDecision {
// NOTE(review): no code here tests for INCLUDE explicitly; it appears to mean "take no exclusion action" - confirm.
45 INCLUDE, EXCLUDE_OBJECT
// Class-local profiling flag. Distinct from ModelTransferableGraphSourceRequest.PROFILE,
// which is the flag that gates the counter dump at the end of process().
48 final static private boolean PROFILE = false;
// Serializers and binding; all three are assigned at the start of process().
50 Serializer variantSerializer;
// Used by processValue to serialize a value's Datatype.
51 Serializer datatypeSerializer;
// Used by getDatatype to read the Datatype value of a resource.
52 Binding datatypeBinding;
// Passed on to Expansion3 and ConsistsOfProcess.walk.
53 boolean ignoreVirtual;
// Resources waiting to be expanded; created as a TreeSet in the constructor.
57 Set<Resource> fringe = null;
// Excluded resources; seeded from conf.preStatus and extended when the exclusion function says EXCLUDE_OBJECT.
58 Set<Resource> exclusions = null;
// Optional per-statement exclusion callback taken from conf.exclusionFunction; null-checked before use.
59 Function1<Statement,ExclusionDecision> exclusionFunction = null;
// Predicates already seen by classifyPredicates (each is scheduled for classification once).
60 Set<Resource> predicates = null;
// Per-predicate classification result computed by classifyPredicates against L0.IsRelatedTo.
61 Map<Resource,Boolean> isRelatedToPredicates = null;
// Predicates for which graph.hasStatement(predicate) returned false.
62 Set<Resource> deadPredicates = null;
// Predicates whose inverse was classified as isRelatedTo (filled at the end of classifyPredicates).
63 Set<Resource> strongInverseSet = null;
// NOTE(review): no read or write of this field is visible in this class - possibly unused; confirm.
65 TIntIntHashMap ids = null;
// Extent status (INTERNAL / EXTERNAL / EXCLUDED / PENDING) recorded per resource.
66 ResourceMap<ExtentStatus> status = null;
// Cache of serialized Datatype bytes, filled lazily in processValue.
67 Map<Datatype, byte[]> bindings = new HashMap<Datatype, byte[]>();
// Graph services obtained in the constructor.
68 final SerialisationSupport support;
69 final TransferableGraphConfiguration2 conf;
70 final TransferableGraphSupport tgs;
// Counters printed by process() when ModelTransferableGraphSourceRequest.PROFILE is set.
// NOTE(review): no increments of these counters are visible - confirm they are still maintained.
74 private long composedObjectCounter = 0;
75 private long fastInternalCounter = 0;
76 private long parentExternalCounter = 0;
77 private long fullInternalCounter = 0;
78 private long fullExternalCounter = 0;
// Accumulated timings in System.nanoTime() units (updated as "-= nanoTime()" ... "+= nanoTime()" pairs).
82 long fullResolveTime = 0;
83 long fastResolveTime = 0;
84 long otherStatementTime = 0;
85 long parentResolveTime = 0;
86 long extentSeedTime = 0;
87 long classifyPredicateTime = 0;
88 long processFringeTime = 0;
89 long valueOutputTime = 0;
90 long statementOutputTime = 0;
/**
 * Creates a processor: fetches the required graph services, allocates the
 * working collections and seeds them from the configuration's pre-assigned
 * extent statuses.
 *
 * @param graph         read access used to obtain Layer0 and the graph services
 * @param conf          transfer configuration; its exclusionFunction and preStatus are read here
 * @param state         NOTE(review): not referenced in the visible constructor statements - confirm whether it is needed
 * @param ignoreVirtual stored and later passed to Expansion3 and ConsistsOfProcess.walk
 * @throws DatabaseException declared by the signature for graph access failures
 */
92 public DomainProcessor3(ReadGraph graph, TransferableGraphConfiguration2 conf, DomainProcessorState state, boolean ignoreVirtual) throws DatabaseException {
94 this.L0 = Layer0.getInstance(graph);
95 this.tgs = graph.getService(TransferableGraphSupport.class);
97 this.support = graph.getService(SerialisationSupport.class);
98 this.ignoreVirtual = ignoreVirtual;
// Start of the startup timing window (closed by the matching "+=" below).
102 startupTime -= System.nanoTime();
104 CollectionSupport cs = graph.getService(CollectionSupport.class);
// Resource-keyed collections come from CollectionSupport; only the fringe is a plain TreeSet.
107 status = cs.createMap(ExtentStatus.class);
108 predicates = cs.createSet();
109 exclusions = cs.createSet();
110 exclusionFunction = conf.exclusionFunction;
111 fringe = new TreeSet<Resource>();
112 isRelatedToPredicates = cs.createMap(Boolean.class);
113 deadPredicates = cs.createSet();
114 strongInverseSet = cs.createSet();
// Copy the pre-assigned statuses: EXCLUDED entries also go to exclusions,
// INTERNAL entries also become initial fringe members.
116 for(Map.Entry<Resource, ExtentStatus> entry : conf.preStatus.entrySet()) {
117 status.put(entry.getKey(), entry.getValue());
118 if(ExtentStatus.EXCLUDED.equals(entry.getValue())) exclusions.add(entry.getKey());
119 if(ExtentStatus.INTERNAL.equals(entry.getValue())) fringe.add(entry.getKey());
123 startupTime += System.nanoTime();
125 // for(RootSpec p : conf.roots) {
127 // fringe.add(p.resource);
/**
 * Accessor for the per-resource extent status map.
 * NOTE(review): presumably returns the {@code status} field directly (no copy) - confirm.
 */
132 public ResourceMap<ExtentStatus> getStatus() {
/**
 * Takes at most {@code maxAmount} resources from the fringe and requests their
 * direct statements with a single Expansion3 request.
 *
 * @param graph     read access used for the request
 * @param maxAmount upper bound on the number of fringe resources visited
 * @return the result of the Expansion3 request for the collected resources
 * @throws DatabaseException if the request fails
 */
136 public Collection<DirectStatements> extractFromFringe(ReadGraph graph, int maxAmount) throws DatabaseException {
138 CollectionSupport cs = graph.getService(CollectionSupport.class);
139 Collection<Resource> list = cs.createList();
140 Iterator<Resource> it = fringe.iterator();
141 for(int i=0;i<maxAmount;i++) {
// Stop early when the fringe holds fewer than maxAmount resources.
142 if(!it.hasNext()) break;
// NOTE(review): presumably each visited resource is moved from the fringe into list
// (process() loops until the fringe is empty) - confirm.
147 return graph.syncRequest(new Expansion3(list, ignoreVirtual));
/**
 * Expands the next batch of fringe resources, accumulating the elapsed time
 * in expandTime.
 *
 * @param graph read access used for the expansion request
 * @return NOTE(review): presumably the batch obtained from extractFromFringe - confirm
 * @throws DatabaseException if the expansion request fails
 */
151 public Collection<DirectStatements> expand(ReadGraph graph) throws DatabaseException {
154 expandTime -= System.nanoTime();
// Batch size: 2<<12 == 8192 resources per expansion.
156 Collection<DirectStatements> result = extractFromFringe(graph, 2<<12);
159 expandTime += System.nanoTime();
/**
 * Classifies every predicate in {@code schedule}: records in
 * isRelatedToPredicates whether it counts as an L0.IsRelatedTo relation, and
 * registers predicates without statements as dead.
 *
 * @param graph    read access for super-relation and statement queries
 * @param schedule predicates to classify
 * @param state    processor state; the inverse entry of a dead predicate is removed from state.inverses
 * @throws DatabaseException if a graph query fails
 */
165 public void classifyPredicates(ReadGraph graph, final Set<Resource> schedule, DomainProcessorState state) throws DatabaseException {
167 for(Resource predicate : schedule) {
169 Boolean isRelatedTo = Boolean.FALSE;
// Shortcut: look at the predicate's possible super-relation and reuse (or compute and cache) its classification.
// NOTE(review): the guard for a missing super-relation is not visible here - confirm the null case is handled.
171 Resource single = graph.getPossibleSuperrelation(predicate);
174 Boolean singleIsRelatedTo = isRelatedToPredicates.get(single);
175 if(singleIsRelatedTo == null) {
176 singleIsRelatedTo = graph.isSubrelationOf(single, L0.IsRelatedTo);
177 isRelatedToPredicates.put(single, singleIsRelatedTo);
180 isRelatedTo = singleIsRelatedTo;
// Direct check of the predicate itself against L0.IsRelatedTo.
184 if(graph.isSubrelationOf(predicate, L0.IsRelatedTo)) {
185 isRelatedTo = Boolean.TRUE;
186 if(ModelTransferableGraphSourceRequest.LOG) ModelTransferableGraphSourceRequest.log("isRelatedToPredicates4 => " + NameUtils.getSafeName(graph, predicate));
// A predicate resource with no statements of its own is treated as dead.
188 if (!graph.hasStatement(predicate)) {
189 if(ModelTransferableGraphSourceRequest.LOG) ModelTransferableGraphSourceRequest.log("FOUND DEAD PREDICATE (no statements): " + predicate);
190 deadPredicates.add(predicate);
191 // Prevents ModelTransferableGraphSource from
192 // trying to export these statements.
193 state.inverses.remove(support.getTransientId(predicate));
// Cache the final classification for this predicate.
199 isRelatedToPredicates.put(predicate, isRelatedTo);
/**
 * Scans an expansion batch for predicates not seen before, records their
 * inverses in {@code state.inverses}, classifies them through
 * classifyPredicates(graph, schedule, state) and finally fills
 * strongInverseSet. Also applies the optional exclusion function to each
 * statement.
 *
 * @param graph     read access for inverse and URI queries
 * @param state     processor state; state.inverses receives transient-id pairs (0 when there is no inverse)
 * @param expansion statements of the resources expanded in this round
 * @throws DatabaseException if a graph query fails
 */
205 public void classifyPredicates(ReadGraph graph, DomainProcessorState state, final Collection<DirectStatements> expansion) throws DatabaseException {
208 classifyPredicateTime -= System.nanoTime();
210 CollectionSupport cs = graph.getService(CollectionSupport.class);
// schedule: predicates to classify in this round; newPredicates: predicate -> inverse pairs found in this round.
211 final Set<Resource> schedule = cs.createSet();
212 final Map<Resource, Resource> newPredicates = cs.createMap(Resource.class);
214 for (DirectStatements stms : expansion) {
215 for(Statement stm : stms) {
217 Resource predicate = stm.getPredicate();
218 Resource object = stm.getObject();
// NOTE(review): the action taken for excluded objects/predicates is not visible here - presumably the statement is skipped; confirm.
220 if (exclusions.contains(object) || exclusions.contains(predicate))
223 if (exclusionFunction != null) {
224 ExclusionDecision decision = exclusionFunction.apply(stm);
225 if(ExclusionDecision.EXCLUDE_OBJECT.equals(decision)) {
226 status.put(object, ExtentStatus.EXCLUDED);
227 exclusions.add(object);
// Set.add returns true only for a predicate seen for the first time.
232 if(predicates.add(predicate)) {
233 Resource inverse = graph.getPossibleInverse(predicate);
234 schedule.add(predicate);
235 if(inverse != null) {
236 newPredicates.put(predicate, inverse);
237 if(predicates.add(inverse)) schedule.add(inverse);
// Register the inverse mapping in both directions, keyed by transient ids.
238 state.inverses.put(support.getTransientId(predicate), support.getTransientId(inverse));
239 state.inverses.put(support.getTransientId(inverse), support.getTransientId(predicate));
240 if(ModelTransferableGraphSourceRequest.LOG) ModelTransferableGraphSourceRequest.log("INVERSE FOR " + graph.getPossibleURI(predicate) + " => " + graph.getPossibleURI(inverse));
// No inverse: recorded with id 0.
242 state.inverses.put(support.getTransientId(predicate), 0);
243 if(ModelTransferableGraphSourceRequest.LOG) ModelTransferableGraphSourceRequest.log("NO INVERSE FOR " + graph.getPossibleURI(predicate));
251 classifyPredicates(graph, schedule, state);
// NOTE(review): both Boolean values below are auto-unboxed; a predicate absent from
// isRelatedToPredicates would raise NullPointerException - confirm every key/value here has been classified.
253 for(Map.Entry<Resource, Resource> entry : newPredicates.entrySet()) {
254 // Inverse is strong => this has strong inverse
255 Boolean isRelatedToValue = isRelatedToPredicates.get(entry.getValue());
256 if(isRelatedToValue) strongInverseSet.add(entry.getKey());
257 Boolean isRelatedToKey = isRelatedToPredicates.get(entry.getKey());
258 if(isRelatedToKey) strongInverseSet.add(entry.getValue());
262 classifyPredicateTime += System.nanoTime();
267 * Composed objects are internal. Mark them for expansion.
/**
 * Reads the Datatype attached to {@code subject} through L0.HasDataType.
 *
 * @param graph   read access
 * @param subject resource whose data type is wanted
 * @return the Datatype value of the L0.HasDataType object, read with datatypeBinding
 * @throws DatabaseException if the single-object lookup or the value request fails
 */
270 private Datatype getDatatype(ReadGraph graph, Resource subject) throws DatabaseException {
271 Resource object = graph.getSingleObject(subject, L0.HasDataType);
// The request is issued with a TransientCacheListener instance.
272 return graph.syncRequest(new Value<Datatype>(object, datatypeBinding), TransientCacheListener.<Datatype>instance());
/**
 * Writes the literal value of {@code subject}, if it has one, to
 * {@code state.valueOutput}: subject id, a tag byte, an optional 4-byte size
 * slot, the serialized datatype and then the value bytes.
 *
 * @param graph   read access
 * @param subject resource whose value is exported
 * @param sId     transient id of {@code subject}, written in front of the value
 * @param state   processor state providing valueOutput and valueModifier
 * @throws DatabaseException if the value stream or datatype cannot be read
 * @throws IOException       if writing to the value output fails
 */
275 public void processValue(ReadGraph graph, Resource subject, int sId, final DomainProcessorState state) throws DatabaseException, IOException {
276 final InputStream valueStream = tgs.getValueStream(graph, subject);
// A null stream means there is nothing to write for this subject.
277 if (valueStream != null) {
278 if(ModelTransferableGraphSourceRequest.LOG) ModelTransferableGraphSourceRequest.log("[VALUE] " + NameUtils.getSafeName(graph, subject, true));
279 state.valueOutput.writeInt(sId);
282 Datatype dt = getDatatype(graph, subject);
// Raw copy is chosen only when the value modifier reports that values of this datatype need no modification.
284 boolean canWriteRawVariant = !state.valueModifier.mayNeedModification(dt);
285 long rawVariantSizePos = 0;
286 state.valueOutput.writeByte(canWriteRawVariant
287 ? TransferableGraphSource.TAG_RAW_COPY_VARIANT_VALUE
288 : TransferableGraphSource.TAG_POTENTIALLY_MODIFIED_VARIANT_VALUE);
289 if (canWriteRawVariant) {
290 // Add space for raw variant byte size before the data
291 rawVariantSizePos = state.valueOutput.position();
292 state.valueOutput.writeInt(0);
// Serialized datatype bytes are cached per Datatype in the bindings map.
295 byte[] typeBytes = bindings.get(dt);
296 if (typeBytes == null) {
297 typeBytes = datatypeSerializer.serialize(dt);
298 bindings.put(dt, typeBytes);
301 state.valueOutput.write(typeBytes);
// The value is copied by asking the datatype's serializer to skip over it while
// reading through a wrapper stream whose read() also writes each byte to valueOutput.
302 Serializer s = Bindings.getSerializerUnchecked(Bindings.getBinding(dt));
303 s.skip(new InputStream() {
305 public int read() throws IOException {
306 int value = valueStream.read();
307 state.valueOutput.write(value);
// Back-patch the size slot: bytes written after the 4-byte slot, then restore the write position.
312 if (canWriteRawVariant) {
313 long currentPos = state.valueOutput.position();
314 int variantSize = (int)(currentPos - rawVariantSizePos - 4);
315 state.valueOutput.position(rawVariantSizePos);
316 state.valueOutput.writeInt(variantSize);
317 state.valueOutput.position(currentPos);
// Pending statements of the subject currently being processed, stored as a flat
// sequence of (predicate id, object id) pairs; written out by flushStatementStream.
325 private TIntArrayList stream = new TIntArrayList();
/**
 * Appends one statement to the pending stream as two transient ids.
 *
 * @param predicate statement predicate
 * @param object    statement object
 * @throws DatabaseException if a transient id cannot be resolved
 */
327 public void addToStream(Resource predicate, Resource object) throws DatabaseException {
328 stream.add(support.getTransientId(predicate));
329 stream.add(support.getTransientId(object));
/**
 * Decides how one statement of an internal subject is exported, based on the
 * object's extent status and the predicate's classification; exported
 * statements are queued with addToStream.
 *
 * @param graph   read access (used for logging names)
 * @param subject the internal resource owning the statement
 * @param stm     the statement to classify
 * @throws DatabaseException if a graph or id lookup fails
 * @throws IOException       declared by the signature
 */
332 public void processStatement(ReadGraph graph, Resource subject, Statement stm) throws DatabaseException, IOException {
334 Resource predicate = stm.getPredicate();
336 Resource object = stm.getObject();
338 ExtentStatus objectStatus = status.get(object);
// NOTE(review): isRelatedTo is auto-unboxed in the condition below; a predicate that was never
// classified would raise NullPointerException - presumably classifyPredicates always runs first (see process()); confirm.
341 Boolean isRelatedTo = isRelatedToPredicates.get(predicate);
// Case 1: strong (isRelatedTo) statement whose object is not excluded - always exported.
342 if ((objectStatus != ExtentStatus.EXCLUDED) && isRelatedTo) {
344 if(ModelTransferableGraphSourceRequest.LOG) logStatementWithExtent(graph, "related", objectStatus, subject, predicate, object);
346 addToStream(predicate, object);
// Object without a final status yet: the log message reports it as added to the fringe.
348 if(objectStatus == null || objectStatus == ExtentStatus.PENDING) {
349 if(ModelTransferableGraphSourceRequest.LOG) ModelTransferableGraphSourceRequest.log("[ADDED TO FRINGE] " + NameUtils.getSafeName(graph, object));
// Case 2: predicate registered as dead by classifyPredicates.
356 if (deadPredicates.contains(predicate)) {
357 if(ModelTransferableGraphSourceRequest.LOG) logStatementWithExtent(graph, "excluding statement with dead predicate ", objectStatus, subject, predicate, object);
// Case 3: weak statement pointing at an excluded object.
362 if(objectStatus == ExtentStatus.EXCLUDED) {
364 if(ModelTransferableGraphSourceRequest.LOG) logStatementWithExtent(graph, "weak reference to excluded object ", objectStatus, subject, predicate, object);
368 // The inverse is also weak (or there is no inverse)
369 if(!strongInverseSet.contains(predicate)) {
371 addToStream(predicate, object);
// First time this object is seen through a fully weak statement: remember it as PENDING.
373 if(objectStatus == null) {
374 status.put(object, ExtentStatus.PENDING);
377 if(ModelTransferableGraphSourceRequest.LOG) logStatementWithExtent(graph, "fully weak internal", objectStatus, subject, predicate, object);
// Otherwise the predicate has a strong inverse; only a log line is visible for this case.
381 if(ModelTransferableGraphSourceRequest.LOG) logStatementWithExtent(graph, "strong inverse internal ", objectStatus, subject, predicate, object);
/**
 * Writes the pending (predicate, object) pairs of one subject to
 * {@code state.statementsOutput} as: subject id, pair count, then all ids.
 * Does nothing when no statements are pending.
 *
 * @param sId   transient id of the subject the pending statements belong to
 * @param state processor state providing statementsOutput and statementCount
 * @throws IOException if writing to the statements output fails
 */
391 public void flushStatementStream(int sId, DomainProcessorState state) throws IOException {
392 if(!stream.isEmpty()) {
393 state.statementsOutput.writeInt(sId);
394 int streamSize = stream.size();
// Two ints (predicate, object) are stored per statement.
395 int statementCount = stream.size() / 2;
396 state.statementsOutput.writeInt(statementCount);
397 for (int i = 0; i < streamSize; i++)
398 state.statementsOutput.writeInt(stream.getQuick(i));
// NOTE(review): adds 2*streamSize, i.e. four units per statement - presumably state.statementCount
// counts ints of a 4-int-per-statement representation rather than statements; confirm.
399 state.statementCount += 2*streamSize;
// NOTE(review): the stream must be emptied before the next subject; presumably done right after this - confirm.
404 // For progress monitor book-keeping
405 private long internalResourceNumber = 0;
406 private long startTime = 0;
407 private long lastUpdateTime = 0;
/**
 * Exports one internal resource: marks it INTERNAL, writes its value with
 * processValue, runs processStatement on each of its statements and flushes
 * the resulting statement stream. Also updates the progress monitor.
 *
 * @param graph   read access
 * @param subject the resource being exported
 * @param stms    the direct statements of {@code subject}
 * @param state   processor state (outputs and progress monitor)
 * @throws DatabaseException if a graph access fails
 * @throws IOException       if writing to an output fails
 */
409 public void processInternal(ReadGraph graph, Resource subject, DirectStatements stms, DomainProcessorState state) throws DatabaseException, IOException {
411 internalResourceNumber++;
413 // Update progress monitor with controlled frequency
414 long t = System.nanoTime();
415 long dt = t - lastUpdateTime;
// 200_000_000 ns = 200 ms between sub-task updates.
416 if (dt > 200_000_000L) {
// NOTE(review): the updates of startTime/lastUpdateTime are not visible here - confirm they are
// maintained, otherwise totalTime is measured from 0 and the threshold always passes.
420 double totalTime = (t - startTime) * 1e-9;
422 long speed = Math.round((double)internalResourceNumber / totalTime);
423 state.monitor.subTask("Included " + internalResourceNumber + " resources (" + speed + " resources/s)");
427 status.put(subject, ExtentStatus.INTERNAL);
428 if(ModelTransferableGraphSourceRequest.LOG) ModelTransferableGraphSourceRequest.log("[INTERNAL] " + NameUtils.getSafeName(graph, subject, true));
430 int sId = support.getTransientId(subject);
// Value output, timed into valueOutputTime.
433 valueOutputTime -= System.nanoTime();
435 processValue(graph, subject, sId, state);
438 valueOutputTime += System.nanoTime();
// Statement output, timed into statementOutputTime.
441 statementOutputTime -= System.nanoTime();
443 for(Statement stm : stms) {
444 processStatement(graph, subject, stm);
447 flushStatementStream(sId, state);
450 statementOutputTime += System.nanoTime();
452 // Logarithmic progress monitor for unknown amount of work.
453 state.monitor.setWorkRemaining(100000);
454 state.monitor.worked(1);
/**
 * Handles one expansion batch: for each expanded subject, decides from its
 * recorded status and its statements whether it is marked EXTERNAL or
 * exported through processInternal. Elapsed time is accumulated in
 * processFringeTime.
 *
 * @param graph     read access
 * @param expansion direct statements of the resources expanded in this round
 * @param state     processor state passed on to processInternal
 * @throws DatabaseException if a graph access fails
 * @throws IOException       if writing to an output fails
 */
457 public void processFringe(ReadGraph graph, Collection<DirectStatements> expansion, final DomainProcessorState state) throws DatabaseException, IOException {
460 processFringeTime -= System.nanoTime();
462 for (DirectStatements stms : expansion) {
464 Resource subject = stms.getSubject();
// Look for an L0.PartOf statement among the subject's statements.
// NOTE(review): presumably sets partOf = true on a match - the assignment is not visible; confirm.
466 boolean partOf = false;
467 for(Statement stm : stms) {
468 Resource predicate = stm.getPredicate();
469 if(L0.PartOf.equals(predicate)) {
475 ExtentStatus subjectStatus = status.get(subject);
476 if(ModelTransferableGraphSourceRequest.LOG && subjectStatus != null) ModelTransferableGraphSourceRequest.log("EXISTING STATUS FOR " + graph.getPossibleURI(subject) + " - " + subjectStatus);
// Subjects already decided as EXTERNAL or EXCLUDED are not exported.
477 if(subjectStatus == ExtentStatus.EXTERNAL || subjectStatus == ExtentStatus.EXCLUDED) continue;
// A subject with a PartOf statement and a URI, but no final status, is marked EXTERNAL.
478 if(partOf && (subjectStatus == null || ExtentStatus.PENDING == subjectStatus) && graph.getPossibleURI(subject) != null) {
480 status.put(subject, ExtentStatus.EXTERNAL);
481 if(ModelTransferableGraphSourceRequest.LOG) {
482 String uri = graph.getPossibleURI(subject);
483 if(uri == null) ModelTransferableGraphSourceRequest.log("[EXTERNAL]: No URI for " + subject);
484 else ModelTransferableGraphSourceRequest.log("[EXTERNAL] " + uri);
// NOTE(review): presumably the alternative branch of the EXTERNAL decision above - confirm.
489 processInternal(graph, subject, stms, state);
496 processFringeTime += System.nanoTime();
/**
 * Main entry point: initialises the serializers, runs ConsistsOfProcess.walk
 * to find the initial internal resources, then repeatedly expands, classifies
 * and processes the fringe until it is empty.
 *
 * @param graph read access
 * @param state processor state; receives internalEntries and is consulted for cancellation
 * @throws DatabaseException on graph failures; an IOException from output is wrapped in one, and
 *                           CancelTransactionException is thrown when the monitor is cancelled
 */
500 public void process(ReadGraph graph, DomainProcessorState state) throws DatabaseException {
504 this.variantSerializer = graph.getService(Databoard.class).getSerializerUnchecked(Bindings.VARIANT);
505 this.datatypeBinding = Bindings.getBindingUnchecked(Datatype.class);
506 this.datatypeSerializer = graph.getService(Databoard.class).getSerializerUnchecked(this.datatypeBinding);
// pair.first: internal entries found by the walk; pair.second: resources treated below as unnamed children.
508 Pair<List<InternalEntry>,Set<Resource>> pair = ConsistsOfProcess.walk(graph, status, fringe, exclusions, ignoreVirtual);
509 state.internalEntries = pair.first;
// Mark every walked entry INTERNAL; status.put returning null means it had no previous status.
511 for(InternalEntry entry : state.internalEntries) {
512 Resource r = entry.resource;
513 if (status.put(r, ExtentStatus.INTERNAL) == null) {
514 if(ModelTransferableGraphSourceRequest.LOG) {
515 String URI = graph.getPossibleURI(r);
516 if(URI != null) ModelTransferableGraphSourceRequest.log("URI INTERNAL " + URI);
517 else ModelTransferableGraphSourceRequest.log("URI has no URI for " + r);
// Unnamed children that had no previous status are queued for expansion.
523 for(Resource unnamedChild : pair.second) {
524 if (status.put(unnamedChild, ExtentStatus.INTERNAL) == null) {
525 fringe.add(unnamedChild);
529 if (state.monitor.isCanceled())
530 throw new CancelTransactionException();
// Work loop: one batch per iteration, with a cancellation check after each batch.
532 while(!fringe.isEmpty()) {
534 Collection<DirectStatements> expansion = expand(graph);
535 classifyPredicates(graph, state, expansion);
536 processFringe(graph, expansion, state);
538 if (state.monitor.isCanceled())
539 throw new CancelTransactionException();
542 if (ModelTransferableGraphSourceRequest.PROFILE) {
543 System.out.println(composedObjectCounter + " " + fastInternalCounter
544 + " " + parentExternalCounter + " "
545 + fullExternalCounter + " " + fullInternalCounter);
// Output failures are reported to callers as DatabaseException with the IOException as cause.
548 } catch (IOException e) {
549 throw new DatabaseException(e);
/**
 * Logs a statement given as transient ids together with an extent status.
 * Does nothing unless ModelTransferableGraphSourceRequest.LOG is enabled.
 *
 * @param graph  read access used to resolve ids and names
 * @param header text placed in front of the log line
 * @param status extent status printed in brackets
 * @param sId    transient id of the subject
 * @param pId    transient id of the predicate
 * @param oId    transient id of the object
 * @throws DatabaseException if an id or name cannot be resolved
 */
554 void logStatementWithExtent(ReadGraph graph, String header, ExtentStatus status, int sId, int pId, int oId) throws DatabaseException {
555 if(ModelTransferableGraphSourceRequest.LOG) {
// Local service lookup; shadows the support field of this class.
556 SerialisationSupport support = graph.getService(SerialisationSupport.class);
557 String s = NameUtils.getURIOrSafeNameInternal(graph, support.getResource(sId));
558 String p = NameUtils.getURIOrSafeNameInternal(graph, support.getResource(pId));
559 String o = NameUtils.getURIOrSafeNameInternal(graph, support.getResource(oId));
560 ModelTransferableGraphSourceRequest.log(header + " [" + status + "] " + s + " - " + p + " - " + o);
/**
 * Logs a statement given as resources together with an extent status.
 * Does nothing unless ModelTransferableGraphSourceRequest.LOG is enabled.
 *
 * @param graph  read access used to resolve names
 * @param header text placed in front of the log line
 * @param status extent status printed in brackets
 * @param sId    subject resource
 * @param pId    predicate resource
 * @param oId    object resource
 * @throws DatabaseException if a name cannot be resolved
 */
564 void logStatementWithExtent(ReadGraph graph, String header, ExtentStatus status, Resource sId, Resource pId, Resource oId) throws DatabaseException {
565 if(ModelTransferableGraphSourceRequest.LOG) {
566 String s = NameUtils.getURIOrSafeNameInternal(graph, sId);
567 String p = NameUtils.getURIOrSafeNameInternal(graph, pId);
568 String o = NameUtils.getURIOrSafeNameInternal(graph, oId);
569 ModelTransferableGraphSourceRequest.log(header + " [" + status + "] " + s + " - " + p + " - " + o);