1 package org.simantics.db.layer0.util;
3 import java.io.IOException;
4 import java.io.InputStream;
5 import java.util.Collection;
6 import java.util.HashMap;
7 import java.util.Iterator;
10 import java.util.TreeSet;
12 import org.simantics.databoard.Bindings;
13 import org.simantics.databoard.Databoard;
14 import org.simantics.databoard.binding.Binding;
15 import org.simantics.databoard.serialization.Serializer;
16 import org.simantics.databoard.type.Datatype;
17 import org.simantics.db.DirectStatements;
18 import org.simantics.db.ReadGraph;
19 import org.simantics.db.Resource;
20 import org.simantics.db.ResourceMap;
21 import org.simantics.db.Statement;
22 import org.simantics.db.common.primitiverequest.Value;
23 import org.simantics.db.common.procedure.adapter.TransientCacheListener;
24 import org.simantics.db.common.utils.NameUtils;
25 import org.simantics.db.exception.CancelTransactionException;
26 import org.simantics.db.exception.DatabaseException;
27 import org.simantics.db.layer0.adapter.SubgraphExtent.ExtentStatus;
28 import org.simantics.db.layer0.util.ModelTransferableGraphSourceRequest.Expansion3;
29 import org.simantics.db.service.CollectionSupport;
30 import org.simantics.db.service.SerialisationSupport;
31 import org.simantics.db.service.TransferableGraphSupport;
32 import org.simantics.graph.db.TransferableGraphSource;
33 import org.simantics.layer0.Layer0;
34 import org.simantics.scl.runtime.function.Function1;
36 import gnu.trove.list.array.TIntArrayList;
37 import gnu.trove.map.hash.TIntIntHashMap;
39 public class DomainProcessor3 {
41 public enum ExclusionDecision {
42 INCLUDE, EXCLUDE_OBJECT
45 final static private boolean PROFILE = false;
47 Serializer variantSerializer;
48 Serializer datatypeSerializer;
49 Binding datatypeBinding;
50 boolean ignoreVirtual;
54 Set<Resource> fringe = null;
55 Set<Resource> exclusions = null;
56 Function1<Statement,ExclusionDecision> exclusionFunction = null;
57 Set<Resource> predicates = null;
58 Map<Resource,Boolean> isRelatedToPredicates = null;
59 Set<Resource> deadPredicates = null;
60 Set<Resource> strongInverseSet = null;
62 TIntIntHashMap ids = null;
63 ResourceMap<ExtentStatus> status = null;
64 Map<Datatype, byte[]> bindings = new HashMap<Datatype, byte[]>();
65 final SerialisationSupport support;
66 final TransferableGraphConfiguration2 conf;
67 final TransferableGraphSupport tgs;
71 private long composedObjectCounter = 0;
72 private long fastInternalCounter = 0;
73 private long parentExternalCounter = 0;
74 private long fullInternalCounter = 0;
75 private long fullExternalCounter = 0;
79 long fullResolveTime = 0;
80 long fastResolveTime = 0;
81 long otherStatementTime = 0;
82 long parentResolveTime = 0;
83 long extentSeedTime = 0;
84 long classifyPredicateTime = 0;
85 long processFringeTime = 0;
86 long valueOutputTime = 0;
87 long statementOutputTime = 0;
89 public DomainProcessor3(ReadGraph graph, TransferableGraphConfiguration2 conf, DomainProcessorState state, boolean ignoreVirtual) throws DatabaseException {
91 this.L0 = Layer0.getInstance(graph);
92 this.tgs = graph.getService(TransferableGraphSupport.class);
94 this.support = graph.getService(SerialisationSupport.class);
95 this.ignoreVirtual = ignoreVirtual;
99 startupTime -= System.nanoTime();
101 CollectionSupport cs = graph.getService(CollectionSupport.class);
104 status = cs.createMap(ExtentStatus.class);
105 predicates = cs.createSet();
106 exclusions = cs.createSet();
107 exclusionFunction = conf.exclusionFunction;
108 fringe = new TreeSet<Resource>();
109 isRelatedToPredicates = cs.createMap(Boolean.class);
110 deadPredicates = cs.createSet();
111 strongInverseSet = cs.createSet();
113 for(Map.Entry<Resource, ExtentStatus> entry : conf.preStatus.entrySet()) {
114 status.put(entry.getKey(), entry.getValue());
115 if(ExtentStatus.EXCLUDED.equals(entry.getValue())) exclusions.add(entry.getKey());
116 if(ExtentStatus.INTERNAL.equals(entry.getValue())) fringe.add(entry.getKey());
120 startupTime += System.nanoTime();
122 // for(RootSpec p : conf.roots) {
124 // fringe.add(p.resource);
129 public ResourceMap<ExtentStatus> getStatus() {
133 public Collection<DirectStatements> extractFromFringe(ReadGraph graph, int maxAmount) throws DatabaseException {
135 CollectionSupport cs = graph.getService(CollectionSupport.class);
136 Collection<Resource> list = cs.createList();
137 Iterator<Resource> it = fringe.iterator();
138 for(int i=0;i<maxAmount;i++) {
139 if(!it.hasNext()) break;
144 return graph.syncRequest(new Expansion3(list, ignoreVirtual));
148 public Collection<DirectStatements> expand(ReadGraph graph) throws DatabaseException {
151 expandTime -= System.nanoTime();
153 Collection<DirectStatements> result = extractFromFringe(graph, 2<<12);
156 expandTime += System.nanoTime();
162 public void classifyPredicates(ReadGraph graph, final Set<Resource> schedule, DomainProcessorState state) throws DatabaseException {
164 for(Resource predicate : schedule) {
166 Boolean isRelatedTo = Boolean.FALSE;
168 Resource single = graph.getPossibleSuperrelation(predicate);
171 Boolean singleIsRelatedTo = isRelatedToPredicates.get(single);
172 if(singleIsRelatedTo == null) {
173 singleIsRelatedTo = graph.isSubrelationOf(single, L0.IsRelatedTo);
174 isRelatedToPredicates.put(single, singleIsRelatedTo);
177 isRelatedTo = singleIsRelatedTo;
181 if(graph.isSubrelationOf(predicate, L0.IsRelatedTo)) {
182 isRelatedTo = Boolean.TRUE;
183 if(ModelTransferableGraphSourceRequest.LOG) ModelTransferableGraphSourceRequest.log("isRelatedToPredicates4 => " + NameUtils.getSafeName(graph, predicate));
185 if (!graph.hasStatement(predicate)) {
186 if(ModelTransferableGraphSourceRequest.LOG) ModelTransferableGraphSourceRequest.log("FOUND DEAD PREDICATE (no statements): " + predicate);
187 deadPredicates.add(predicate);
188 // Prevents ModelTransferableGraphSource from
189 // trying to export these statements.
190 state.inverses.remove(support.getTransientId(predicate));
196 isRelatedToPredicates.put(predicate, isRelatedTo);
202 public void classifyPredicates(ReadGraph graph, DomainProcessorState state, final Collection<DirectStatements> expansion) throws DatabaseException {
205 classifyPredicateTime -= System.nanoTime();
207 CollectionSupport cs = graph.getService(CollectionSupport.class);
208 final Set<Resource> schedule = cs.createSet();
209 final Map<Resource, Resource> newPredicates = cs.createMap(Resource.class);
211 for (DirectStatements stms : expansion) {
212 for(Statement stm : stms) {
214 Resource predicate = stm.getPredicate();
215 Resource object = stm.getObject();
217 if (exclusions.contains(object) || exclusions.contains(predicate))
220 if (exclusionFunction != null) {
221 ExclusionDecision decision = exclusionFunction.apply(stm);
222 if(ExclusionDecision.EXCLUDE_OBJECT.equals(decision)) {
223 status.put(object, ExtentStatus.EXCLUDED);
224 exclusions.add(object);
229 if(predicates.add(predicate)) {
230 Resource inverse = graph.getPossibleInverse(predicate);
231 schedule.add(predicate);
232 if(inverse != null) {
233 newPredicates.put(predicate, inverse);
234 if(predicates.add(inverse)) schedule.add(inverse);
235 state.inverses.put(support.getTransientId(predicate), support.getTransientId(inverse));
236 state.inverses.put(support.getTransientId(inverse), support.getTransientId(predicate));
237 if(ModelTransferableGraphSourceRequest.LOG) ModelTransferableGraphSourceRequest.log("INVERSE FOR " + graph.getPossibleURI(predicate) + " => " + graph.getPossibleURI(inverse));
239 state.inverses.put(support.getTransientId(predicate), 0);
240 if(ModelTransferableGraphSourceRequest.LOG) ModelTransferableGraphSourceRequest.log("NO INVERSE FOR " + graph.getPossibleURI(predicate));
248 classifyPredicates(graph, schedule, state);
250 for(Map.Entry<Resource, Resource> entry : newPredicates.entrySet()) {
251 // Inverse is strong => this has strong inverse
252 Boolean isRelatedToValue = isRelatedToPredicates.get(entry.getValue());
253 if(isRelatedToValue) strongInverseSet.add(entry.getKey());
254 Boolean isRelatedToKey = isRelatedToPredicates.get(entry.getKey());
255 if(isRelatedToKey) strongInverseSet.add(entry.getValue());
259 classifyPredicateTime += System.nanoTime();
264 * Composed objects are internal. Mark them for expansion.
267 private Datatype getDatatype(ReadGraph graph, Resource subject) throws DatabaseException {
268 Resource object = graph.getSingleObject(subject, L0.HasDataType);
269 return graph.syncRequest(new Value<Datatype>(object, datatypeBinding), TransientCacheListener.<Datatype>instance());
272 public void processValue(ReadGraph graph, Resource subject, int sId, final DomainProcessorState state) throws DatabaseException, IOException {
273 final InputStream valueStream = tgs.getValueStream(graph, subject);
274 if (valueStream != null) {
275 if(ModelTransferableGraphSourceRequest.LOG) ModelTransferableGraphSourceRequest.log("[VALUE] " + NameUtils.getSafeName(graph, subject, true));
276 state.valueOutput.writeInt(sId);
279 Datatype dt = getDatatype(graph, subject);
281 boolean canWriteRawVariant = !state.valueModifier.mayNeedModification(dt);
282 long rawVariantSizePos = 0;
283 state.valueOutput.writeByte(canWriteRawVariant
284 ? TransferableGraphSource.TAG_RAW_COPY_VARIANT_VALUE
285 : TransferableGraphSource.TAG_POTENTIALLY_MODIFIED_VARIANT_VALUE);
286 if (canWriteRawVariant) {
287 // Add space for raw variant byte size before the data
288 rawVariantSizePos = state.valueOutput.position();
289 state.valueOutput.writeInt(0);
292 byte[] typeBytes = bindings.get(dt);
293 if (typeBytes == null) {
294 typeBytes = datatypeSerializer.serialize(dt);
295 bindings.put(dt, typeBytes);
298 state.valueOutput.write(typeBytes);
299 Serializer s = Bindings.getSerializerUnchecked(Bindings.getBinding(dt));
300 s.skip(new InputStream() {
302 public int read() throws IOException {
303 int value = valueStream.read();
304 state.valueOutput.write(value);
309 if (canWriteRawVariant) {
310 long currentPos = state.valueOutput.position();
311 int variantSize = (int)(currentPos - rawVariantSizePos - 4);
312 state.valueOutput.position(rawVariantSizePos);
313 state.valueOutput.writeInt(variantSize);
314 state.valueOutput.position(currentPos);
322 private TIntArrayList stream = new TIntArrayList();
324 public void addToStream(Resource predicate, Resource object) throws DatabaseException {
325 stream.add(support.getTransientId(predicate));
326 stream.add(support.getTransientId(object));
329 public void processStatement(ReadGraph graph, Resource subject, Statement stm) throws DatabaseException, IOException {
331 Resource predicate = stm.getPredicate();
333 Resource object = stm.getObject();
335 ExtentStatus objectStatus = status.get(object);
338 Boolean isRelatedTo = isRelatedToPredicates.get(predicate);
339 if ((objectStatus != ExtentStatus.EXCLUDED) && isRelatedTo) {
341 if(ModelTransferableGraphSourceRequest.LOG) logStatementWithExtent(graph, "related", objectStatus, subject, predicate, object);
343 addToStream(predicate, object);
345 if(objectStatus == null || objectStatus == ExtentStatus.PENDING) {
346 if(ModelTransferableGraphSourceRequest.LOG) ModelTransferableGraphSourceRequest.log("[ADDED TO FRINGE] " + NameUtils.getSafeName(graph, object));
353 if (deadPredicates.contains(predicate)) {
354 if(ModelTransferableGraphSourceRequest.LOG) logStatementWithExtent(graph, "excluding statement with dead predicate ", objectStatus, subject, predicate, object);
359 if(objectStatus == ExtentStatus.EXCLUDED) {
361 if(ModelTransferableGraphSourceRequest.LOG) logStatementWithExtent(graph, "weak reference to excluded object ", objectStatus, subject, predicate, object);
365 // The inverse is also weak (or there is no inverse)
366 if(!strongInverseSet.contains(predicate)) {
368 addToStream(predicate, object);
370 if(objectStatus == null) {
371 status.put(object, ExtentStatus.PENDING);
374 if(ModelTransferableGraphSourceRequest.LOG) logStatementWithExtent(graph, "fully weak internal", objectStatus, subject, predicate, object);
378 if(ModelTransferableGraphSourceRequest.LOG) logStatementWithExtent(graph, "strong inverse internal ", objectStatus, subject, predicate, object);
388 public void flushStatementStream(int sId, DomainProcessorState state) throws IOException {
389 if(!stream.isEmpty()) {
390 state.statementsOutput.writeInt(sId);
391 int streamSize = stream.size();
392 int statementCount = stream.size() / 2;
393 state.statementsOutput.writeInt(statementCount);
394 for (int i = 0; i < streamSize; i++)
395 state.statementsOutput.writeInt(stream.getQuick(i));
396 state.statementCount += 2*streamSize;
401 // For progress monitor book-keeping
402 private long internalResourceNumber = 0;
403 private long startTime = 0;
404 private long lastUpdateTime = 0;
406 public void processInternal(ReadGraph graph, Resource subject, DirectStatements stms, DomainProcessorState state) throws DatabaseException, IOException {
408 internalResourceNumber++;
410 // Update progress monitor with controlled frequency
411 long t = System.nanoTime();
412 long dt = t - lastUpdateTime;
413 if (dt > 200_000_000L) {
417 double totalTime = (t - startTime) * 1e-9;
419 long speed = Math.round((double)internalResourceNumber / totalTime);
420 state.monitor.subTask("Included " + internalResourceNumber + " resources (" + speed + " resources/s)");
424 status.put(subject, ExtentStatus.INTERNAL);
425 if(ModelTransferableGraphSourceRequest.LOG) ModelTransferableGraphSourceRequest.log("[INTERNAL] " + NameUtils.getSafeName(graph, subject, true));
427 int sId = support.getTransientId(subject);
430 valueOutputTime -= System.nanoTime();
432 processValue(graph, subject, sId, state);
435 valueOutputTime += System.nanoTime();
438 statementOutputTime -= System.nanoTime();
440 for(Statement stm : stms) {
441 processStatement(graph, subject, stm);
444 flushStatementStream(sId, state);
447 statementOutputTime += System.nanoTime();
449 // Logarithmic progress monitor for unknown amount of work.
450 state.monitor.setWorkRemaining(100000);
451 state.monitor.worked(1);
454 public void processFringe(ReadGraph graph, Collection<DirectStatements> expansion, final DomainProcessorState state) throws DatabaseException, IOException {
457 processFringeTime -= System.nanoTime();
459 for (DirectStatements stms : expansion) {
461 Resource subject = stms.getSubject();
463 boolean partOf = false;
464 for(Statement stm : stms) {
465 Resource predicate = stm.getPredicate();
466 if(L0.PartOf.equals(predicate)) {
472 ExtentStatus subjectStatus = status.get(subject);
473 if(ModelTransferableGraphSourceRequest.LOG && subjectStatus != null) ModelTransferableGraphSourceRequest.log("EXISTING STATUS FOR " + graph.getPossibleURI(subject) + " - " + subjectStatus);
474 if(subjectStatus == ExtentStatus.EXTERNAL || subjectStatus == ExtentStatus.EXCLUDED) continue;
475 if(partOf && (subjectStatus == null || ExtentStatus.PENDING == subjectStatus) && graph.getPossibleURI(subject) != null) {
477 status.put(subject, ExtentStatus.EXTERNAL);
478 if(ModelTransferableGraphSourceRequest.LOG) {
479 String uri = graph.getPossibleURI(subject);
480 if(uri == null) ModelTransferableGraphSourceRequest.log("[EXTERNAL]: No URI for " + subject);
481 else ModelTransferableGraphSourceRequest.log("[EXTERNAL] " + uri);
486 processInternal(graph, subject, stms, state);
493 processFringeTime += System.nanoTime();
497 public void process(ReadGraph graph, DomainProcessorState state) throws DatabaseException {
501 this.variantSerializer = graph.getService(Databoard.class).getSerializerUnchecked(Bindings.VARIANT);
502 this.datatypeBinding = Bindings.getBindingUnchecked(Datatype.class);
503 this.datatypeSerializer = graph.getService(Databoard.class).getSerializerUnchecked(this.datatypeBinding);
505 for(Resource r : ConsistsOfProcess.walk(graph, fringe, exclusions, ignoreVirtual)) {
506 if (status.put(r, ExtentStatus.INTERNAL) == null) {
507 if(ModelTransferableGraphSourceRequest.LOG) {
508 String URI = graph.getPossibleURI(r);
509 if(URI != null) ModelTransferableGraphSourceRequest.log("URI INTERNAL " + URI);
510 else ModelTransferableGraphSourceRequest.log("URI has no URI for " + r);
516 if (state.monitor.isCanceled())
517 throw new CancelTransactionException();
519 while(!fringe.isEmpty()) {
521 Collection<DirectStatements> expansion = expand(graph);
522 classifyPredicates(graph, state, expansion);
523 processFringe(graph, expansion, state);
525 if (state.monitor.isCanceled())
526 throw new CancelTransactionException();
529 if (ModelTransferableGraphSourceRequest.PROFILE) {
530 System.out.println(composedObjectCounter + " " + fastInternalCounter
531 + " " + parentExternalCounter + " "
532 + fullExternalCounter + " " + fullInternalCounter);
535 } catch (IOException e) {
536 throw new DatabaseException(e);
541 void logStatementWithExtent(ReadGraph graph, String header, ExtentStatus status, int sId, int pId, int oId) throws DatabaseException {
542 if(ModelTransferableGraphSourceRequest.LOG) {
543 SerialisationSupport support = graph.getService(SerialisationSupport.class);
544 String s = NameUtils.getURIOrSafeNameInternal(graph, support.getResource(sId));
545 String p = NameUtils.getURIOrSafeNameInternal(graph, support.getResource(pId));
546 String o = NameUtils.getURIOrSafeNameInternal(graph, support.getResource(oId));
547 ModelTransferableGraphSourceRequest.log(header + " [" + status + "] " + s + " - " + p + " - " + o);
551 void logStatementWithExtent(ReadGraph graph, String header, ExtentStatus status, Resource sId, Resource pId, Resource oId) throws DatabaseException {
552 if(ModelTransferableGraphSourceRequest.LOG) {
553 String s = NameUtils.getURIOrSafeNameInternal(graph, sId);
554 String p = NameUtils.getURIOrSafeNameInternal(graph, pId);
555 String o = NameUtils.getURIOrSafeNameInternal(graph, oId);
556 ModelTransferableGraphSourceRequest.log(header + " [" + status + "] " + s + " - " + p + " - " + o);