1 package org.simantics.db.layer0.util;
3 import java.io.IOException;
4 import java.io.InputStream;
5 import java.util.Collection;
6 import java.util.HashMap;
7 import java.util.Iterator;
11 import java.util.TreeSet;
13 import org.simantics.databoard.Bindings;
14 import org.simantics.databoard.Databoard;
15 import org.simantics.databoard.binding.Binding;
16 import org.simantics.databoard.serialization.Serializer;
17 import org.simantics.databoard.type.Datatype;
18 import org.simantics.db.DirectStatements;
19 import org.simantics.db.ReadGraph;
20 import org.simantics.db.Resource;
21 import org.simantics.db.ResourceMap;
22 import org.simantics.db.Statement;
23 import org.simantics.db.common.primitiverequest.Value;
24 import org.simantics.db.common.procedure.adapter.TransientCacheListener;
25 import org.simantics.db.common.utils.NameUtils;
26 import org.simantics.db.exception.CancelTransactionException;
27 import org.simantics.db.exception.DatabaseException;
28 import org.simantics.db.layer0.adapter.SubgraphExtent.ExtentStatus;
29 import org.simantics.db.layer0.util.ConsistsOfProcess.InternalEntry;
30 import org.simantics.db.layer0.util.ModelTransferableGraphSourceRequest.Expansion3;
31 import org.simantics.db.service.CollectionSupport;
32 import org.simantics.db.service.SerialisationSupport;
33 import org.simantics.db.service.TransferableGraphSupport;
34 import org.simantics.graph.db.TransferableGraphSource;
35 import org.simantics.layer0.Layer0;
36 import org.simantics.scl.runtime.function.Function1;
38 import gnu.trove.list.array.TIntArrayList;
39 import gnu.trove.map.hash.TIntIntHashMap;
41 public class DomainProcessor3 {
43 public enum ExclusionDecision {
44 INCLUDE, EXCLUDE_OBJECT
47 final static private boolean PROFILE = false;
49 Serializer variantSerializer;
50 Serializer datatypeSerializer;
51 Binding datatypeBinding;
52 boolean ignoreVirtual;
56 Set<Resource> fringe = null;
57 Set<Resource> exclusions = null;
58 Function1<Statement,ExclusionDecision> exclusionFunction = null;
59 Set<Resource> predicates = null;
60 Map<Resource,Boolean> isRelatedToPredicates = null;
61 Set<Resource> deadPredicates = null;
62 Set<Resource> strongInverseSet = null;
64 TIntIntHashMap ids = null;
65 ResourceMap<ExtentStatus> status = null;
66 Map<Datatype, byte[]> bindings = new HashMap<Datatype, byte[]>();
67 final SerialisationSupport support;
68 final TransferableGraphConfiguration2 conf;
69 final TransferableGraphSupport tgs;
73 private long composedObjectCounter = 0;
74 private long fastInternalCounter = 0;
75 private long parentExternalCounter = 0;
76 private long fullInternalCounter = 0;
77 private long fullExternalCounter = 0;
81 long fullResolveTime = 0;
82 long fastResolveTime = 0;
83 long otherStatementTime = 0;
84 long parentResolveTime = 0;
85 long extentSeedTime = 0;
86 long classifyPredicateTime = 0;
87 long processFringeTime = 0;
88 long valueOutputTime = 0;
89 long statementOutputTime = 0;
91 public DomainProcessor3(ReadGraph graph, TransferableGraphConfiguration2 conf, DomainProcessorState state, boolean ignoreVirtual) throws DatabaseException {
93 this.L0 = Layer0.getInstance(graph);
94 this.tgs = graph.getService(TransferableGraphSupport.class);
96 this.support = graph.getService(SerialisationSupport.class);
97 this.ignoreVirtual = ignoreVirtual;
101 startupTime -= System.nanoTime();
103 CollectionSupport cs = graph.getService(CollectionSupport.class);
106 status = cs.createMap(ExtentStatus.class);
107 predicates = cs.createSet();
108 exclusions = cs.createSet();
109 exclusionFunction = conf.exclusionFunction;
110 fringe = new TreeSet<Resource>();
111 isRelatedToPredicates = cs.createMap(Boolean.class);
112 deadPredicates = cs.createSet();
113 strongInverseSet = cs.createSet();
115 for(Map.Entry<Resource, ExtentStatus> entry : conf.preStatus.entrySet()) {
116 status.put(entry.getKey(), entry.getValue());
117 if(ExtentStatus.EXCLUDED.equals(entry.getValue())) exclusions.add(entry.getKey());
118 if(ExtentStatus.INTERNAL.equals(entry.getValue())) fringe.add(entry.getKey());
122 startupTime += System.nanoTime();
124 // for(RootSpec p : conf.roots) {
126 // fringe.add(p.resource);
131 public ResourceMap<ExtentStatus> getStatus() {
135 public Collection<DirectStatements> extractFromFringe(ReadGraph graph, int maxAmount) throws DatabaseException {
137 CollectionSupport cs = graph.getService(CollectionSupport.class);
138 Collection<Resource> list = cs.createList();
139 Iterator<Resource> it = fringe.iterator();
140 for(int i=0;i<maxAmount;i++) {
141 if(!it.hasNext()) break;
146 return graph.syncRequest(new Expansion3(list, ignoreVirtual));
150 public Collection<DirectStatements> expand(ReadGraph graph) throws DatabaseException {
153 expandTime -= System.nanoTime();
155 Collection<DirectStatements> result = extractFromFringe(graph, 2<<12);
158 expandTime += System.nanoTime();
164 public void classifyPredicates(ReadGraph graph, final Set<Resource> schedule, DomainProcessorState state) throws DatabaseException {
166 for(Resource predicate : schedule) {
168 Boolean isRelatedTo = Boolean.FALSE;
170 Resource single = graph.getPossibleSuperrelation(predicate);
173 Boolean singleIsRelatedTo = isRelatedToPredicates.get(single);
174 if(singleIsRelatedTo == null) {
175 singleIsRelatedTo = graph.isSubrelationOf(single, L0.IsRelatedTo);
176 isRelatedToPredicates.put(single, singleIsRelatedTo);
179 isRelatedTo = singleIsRelatedTo;
183 if(graph.isSubrelationOf(predicate, L0.IsRelatedTo)) {
184 isRelatedTo = Boolean.TRUE;
185 if(ModelTransferableGraphSourceRequest.LOG) ModelTransferableGraphSourceRequest.log("isRelatedToPredicates4 => " + NameUtils.getSafeName(graph, predicate));
187 if (!graph.hasStatement(predicate)) {
188 if(ModelTransferableGraphSourceRequest.LOG) ModelTransferableGraphSourceRequest.log("FOUND DEAD PREDICATE (no statements): " + predicate);
189 deadPredicates.add(predicate);
190 // Prevents ModelTransferableGraphSource from
191 // trying to export these statements.
192 state.inverses.remove(support.getTransientId(predicate));
198 isRelatedToPredicates.put(predicate, isRelatedTo);
204 public void classifyPredicates(ReadGraph graph, DomainProcessorState state, final Collection<DirectStatements> expansion) throws DatabaseException {
207 classifyPredicateTime -= System.nanoTime();
209 CollectionSupport cs = graph.getService(CollectionSupport.class);
210 final Set<Resource> schedule = cs.createSet();
211 final Map<Resource, Resource> newPredicates = cs.createMap(Resource.class);
213 for (DirectStatements stms : expansion) {
214 for(Statement stm : stms) {
216 Resource predicate = stm.getPredicate();
217 Resource object = stm.getObject();
219 if (exclusions.contains(object) || exclusions.contains(predicate))
222 if (exclusionFunction != null) {
223 ExclusionDecision decision = exclusionFunction.apply(stm);
224 if(ExclusionDecision.EXCLUDE_OBJECT.equals(decision)) {
225 status.put(object, ExtentStatus.EXCLUDED);
226 exclusions.add(object);
231 if(predicates.add(predicate)) {
232 Resource inverse = graph.getPossibleInverse(predicate);
233 schedule.add(predicate);
234 if(inverse != null) {
235 newPredicates.put(predicate, inverse);
236 if(predicates.add(inverse)) schedule.add(inverse);
237 state.inverses.put(support.getTransientId(predicate), support.getTransientId(inverse));
238 state.inverses.put(support.getTransientId(inverse), support.getTransientId(predicate));
239 if(ModelTransferableGraphSourceRequest.LOG) ModelTransferableGraphSourceRequest.log("INVERSE FOR " + graph.getPossibleURI(predicate) + " => " + graph.getPossibleURI(inverse));
241 state.inverses.put(support.getTransientId(predicate), 0);
242 if(ModelTransferableGraphSourceRequest.LOG) ModelTransferableGraphSourceRequest.log("NO INVERSE FOR " + graph.getPossibleURI(predicate));
250 classifyPredicates(graph, schedule, state);
252 for(Map.Entry<Resource, Resource> entry : newPredicates.entrySet()) {
253 // Inverse is strong => this has strong inverse
254 Boolean isRelatedToValue = isRelatedToPredicates.get(entry.getValue());
255 if(isRelatedToValue) strongInverseSet.add(entry.getKey());
256 Boolean isRelatedToKey = isRelatedToPredicates.get(entry.getKey());
257 if(isRelatedToKey) strongInverseSet.add(entry.getValue());
261 classifyPredicateTime += System.nanoTime();
266 * Composed objects are internal. Mark them for expansion.
269 private Datatype getDatatype(ReadGraph graph, Resource subject) throws DatabaseException {
270 Resource object = graph.getSingleObject(subject, L0.HasDataType);
271 return graph.syncRequest(new Value<Datatype>(object, datatypeBinding), TransientCacheListener.<Datatype>instance());
274 public void processValue(ReadGraph graph, Resource subject, int sId, final DomainProcessorState state) throws DatabaseException, IOException {
275 final InputStream valueStream = tgs.getValueStream(graph, subject);
276 if (valueStream != null) {
277 if(ModelTransferableGraphSourceRequest.LOG) ModelTransferableGraphSourceRequest.log("[VALUE] " + NameUtils.getSafeName(graph, subject, true));
278 state.valueOutput.writeInt(sId);
281 Datatype dt = getDatatype(graph, subject);
283 boolean canWriteRawVariant = !state.valueModifier.mayNeedModification(dt);
284 long rawVariantSizePos = 0;
285 state.valueOutput.writeByte(canWriteRawVariant
286 ? TransferableGraphSource.TAG_RAW_COPY_VARIANT_VALUE
287 : TransferableGraphSource.TAG_POTENTIALLY_MODIFIED_VARIANT_VALUE);
288 if (canWriteRawVariant) {
289 // Add space for raw variant byte size before the data
290 rawVariantSizePos = state.valueOutput.position();
291 state.valueOutput.writeInt(0);
294 byte[] typeBytes = bindings.get(dt);
295 if (typeBytes == null) {
296 typeBytes = datatypeSerializer.serialize(dt);
297 bindings.put(dt, typeBytes);
300 state.valueOutput.write(typeBytes);
301 Serializer s = Bindings.getSerializerUnchecked(Bindings.getBinding(dt));
302 s.skip(new InputStream() {
304 public int read() throws IOException {
305 int value = valueStream.read();
306 state.valueOutput.write(value);
311 if (canWriteRawVariant) {
312 long currentPos = state.valueOutput.position();
313 int variantSize = (int)(currentPos - rawVariantSizePos - 4);
314 state.valueOutput.position(rawVariantSizePos);
315 state.valueOutput.writeInt(variantSize);
316 state.valueOutput.position(currentPos);
324 private TIntArrayList stream = new TIntArrayList();
326 public void addToStream(Resource predicate, Resource object) throws DatabaseException {
327 stream.add(support.getTransientId(predicate));
328 stream.add(support.getTransientId(object));
331 public void processStatement(ReadGraph graph, Resource subject, Statement stm) throws DatabaseException, IOException {
333 Resource predicate = stm.getPredicate();
335 Resource object = stm.getObject();
337 ExtentStatus objectStatus = status.get(object);
340 Boolean isRelatedTo = isRelatedToPredicates.get(predicate);
341 if ((objectStatus != ExtentStatus.EXCLUDED) && isRelatedTo) {
343 if(ModelTransferableGraphSourceRequest.LOG) logStatementWithExtent(graph, "related", objectStatus, subject, predicate, object);
345 addToStream(predicate, object);
347 if(objectStatus == null || objectStatus == ExtentStatus.PENDING) {
348 if(ModelTransferableGraphSourceRequest.LOG) ModelTransferableGraphSourceRequest.log("[ADDED TO FRINGE] " + NameUtils.getSafeName(graph, object));
355 if (deadPredicates.contains(predicate)) {
356 if(ModelTransferableGraphSourceRequest.LOG) logStatementWithExtent(graph, "excluding statement with dead predicate ", objectStatus, subject, predicate, object);
361 if(objectStatus == ExtentStatus.EXCLUDED) {
363 if(ModelTransferableGraphSourceRequest.LOG) logStatementWithExtent(graph, "weak reference to excluded object ", objectStatus, subject, predicate, object);
367 // The inverse is also weak (or there is no inverse)
368 if(!strongInverseSet.contains(predicate)) {
370 addToStream(predicate, object);
372 if(objectStatus == null) {
373 status.put(object, ExtentStatus.PENDING);
376 if(ModelTransferableGraphSourceRequest.LOG) logStatementWithExtent(graph, "fully weak internal", objectStatus, subject, predicate, object);
380 if(ModelTransferableGraphSourceRequest.LOG) logStatementWithExtent(graph, "strong inverse internal ", objectStatus, subject, predicate, object);
390 public void flushStatementStream(int sId, DomainProcessorState state) throws IOException {
391 if(!stream.isEmpty()) {
392 state.statementsOutput.writeInt(sId);
393 int streamSize = stream.size();
394 int statementCount = stream.size() / 2;
395 state.statementsOutput.writeInt(statementCount);
396 for (int i = 0; i < streamSize; i++)
397 state.statementsOutput.writeInt(stream.getQuick(i));
398 state.statementCount += 2*streamSize;
403 // For progress monitor book-keeping
404 private long internalResourceNumber = 0;
405 private long startTime = 0;
406 private long lastUpdateTime = 0;
408 public void processInternal(ReadGraph graph, Resource subject, DirectStatements stms, DomainProcessorState state) throws DatabaseException, IOException {
410 internalResourceNumber++;
412 // Update progress monitor with controlled frequency
413 long t = System.nanoTime();
414 long dt = t - lastUpdateTime;
415 if (dt > 200_000_000L) {
419 double totalTime = (t - startTime) * 1e-9;
421 long speed = Math.round((double)internalResourceNumber / totalTime);
422 state.monitor.subTask("Included " + internalResourceNumber + " resources (" + speed + " resources/s)");
426 status.put(subject, ExtentStatus.INTERNAL);
427 if(ModelTransferableGraphSourceRequest.LOG) ModelTransferableGraphSourceRequest.log("[INTERNAL] " + NameUtils.getSafeName(graph, subject, true));
429 int sId = support.getTransientId(subject);
432 valueOutputTime -= System.nanoTime();
434 processValue(graph, subject, sId, state);
437 valueOutputTime += System.nanoTime();
440 statementOutputTime -= System.nanoTime();
442 for(Statement stm : stms) {
443 processStatement(graph, subject, stm);
446 flushStatementStream(sId, state);
449 statementOutputTime += System.nanoTime();
451 // Logarithmic progress monitor for unknown amount of work.
452 state.monitor.setWorkRemaining(100000);
453 state.monitor.worked(1);
456 public void processFringe(ReadGraph graph, Collection<DirectStatements> expansion, final DomainProcessorState state) throws DatabaseException, IOException {
459 processFringeTime -= System.nanoTime();
461 for (DirectStatements stms : expansion) {
463 Resource subject = stms.getSubject();
465 boolean partOf = false;
466 for(Statement stm : stms) {
467 Resource predicate = stm.getPredicate();
468 if(L0.PartOf.equals(predicate)) {
474 ExtentStatus subjectStatus = status.get(subject);
475 if(ModelTransferableGraphSourceRequest.LOG && subjectStatus != null) ModelTransferableGraphSourceRequest.log("EXISTING STATUS FOR " + graph.getPossibleURI(subject) + " - " + subjectStatus);
476 if(subjectStatus == ExtentStatus.EXTERNAL || subjectStatus == ExtentStatus.EXCLUDED) continue;
477 if(partOf && (subjectStatus == null || ExtentStatus.PENDING == subjectStatus) && graph.getPossibleURI(subject) != null) {
479 status.put(subject, ExtentStatus.EXTERNAL);
480 if(ModelTransferableGraphSourceRequest.LOG) {
481 String uri = graph.getPossibleURI(subject);
482 if(uri == null) ModelTransferableGraphSourceRequest.log("[EXTERNAL]: No URI for " + subject);
483 else ModelTransferableGraphSourceRequest.log("[EXTERNAL] " + uri);
488 processInternal(graph, subject, stms, state);
495 processFringeTime += System.nanoTime();
499 public void process(ReadGraph graph, DomainProcessorState state) throws DatabaseException {
503 this.variantSerializer = graph.getService(Databoard.class).getSerializerUnchecked(Bindings.VARIANT);
504 this.datatypeBinding = Bindings.getBindingUnchecked(Datatype.class);
505 this.datatypeSerializer = graph.getService(Databoard.class).getSerializerUnchecked(this.datatypeBinding);
507 state.internalEntries = ConsistsOfProcess.walk(graph, status, fringe, exclusions, ignoreVirtual);
509 for(InternalEntry entry : state.internalEntries) {
510 Resource r = entry.resource;
511 if (status.put(r, ExtentStatus.INTERNAL) == null) {
512 if(ModelTransferableGraphSourceRequest.LOG) {
513 String URI = graph.getPossibleURI(r);
514 if(URI != null) ModelTransferableGraphSourceRequest.log("URI INTERNAL " + URI);
515 else ModelTransferableGraphSourceRequest.log("URI has no URI for " + r);
521 if (state.monitor.isCanceled())
522 throw new CancelTransactionException();
524 while(!fringe.isEmpty()) {
526 Collection<DirectStatements> expansion = expand(graph);
527 classifyPredicates(graph, state, expansion);
528 processFringe(graph, expansion, state);
530 if (state.monitor.isCanceled())
531 throw new CancelTransactionException();
534 if (ModelTransferableGraphSourceRequest.PROFILE) {
535 System.out.println(composedObjectCounter + " " + fastInternalCounter
536 + " " + parentExternalCounter + " "
537 + fullExternalCounter + " " + fullInternalCounter);
540 } catch (IOException e) {
541 throw new DatabaseException(e);
546 void logStatementWithExtent(ReadGraph graph, String header, ExtentStatus status, int sId, int pId, int oId) throws DatabaseException {
547 if(ModelTransferableGraphSourceRequest.LOG) {
548 SerialisationSupport support = graph.getService(SerialisationSupport.class);
549 String s = NameUtils.getURIOrSafeNameInternal(graph, support.getResource(sId));
550 String p = NameUtils.getURIOrSafeNameInternal(graph, support.getResource(pId));
551 String o = NameUtils.getURIOrSafeNameInternal(graph, support.getResource(oId));
552 ModelTransferableGraphSourceRequest.log(header + " [" + status + "] " + s + " - " + p + " - " + o);
556 void logStatementWithExtent(ReadGraph graph, String header, ExtentStatus status, Resource sId, Resource pId, Resource oId) throws DatabaseException {
557 if(ModelTransferableGraphSourceRequest.LOG) {
558 String s = NameUtils.getURIOrSafeNameInternal(graph, sId);
559 String p = NameUtils.getURIOrSafeNameInternal(graph, pId);
560 String o = NameUtils.getURIOrSafeNameInternal(graph, oId);
561 ModelTransferableGraphSourceRequest.log(header + " [" + status + "] " + s + " - " + p + " - " + o);