1 package org.simantics.db.layer0.util;
\r
3 import java.io.IOException;
\r
4 import java.io.InputStream;
\r
5 import java.util.Collection;
\r
6 import java.util.HashMap;
\r
7 import java.util.Iterator;
\r
8 import java.util.Map;
\r
9 import java.util.Set;
\r
10 import java.util.TreeSet;
\r
12 import org.simantics.databoard.Bindings;
\r
13 import org.simantics.databoard.Databoard;
\r
14 import org.simantics.databoard.binding.Binding;
\r
15 import org.simantics.databoard.serialization.Serializer;
\r
16 import org.simantics.databoard.type.Datatype;
\r
17 import org.simantics.db.DirectStatements;
\r
18 import org.simantics.db.ReadGraph;
\r
19 import org.simantics.db.Resource;
\r
20 import org.simantics.db.ResourceMap;
\r
21 import org.simantics.db.Statement;
\r
22 import org.simantics.db.common.primitiverequest.Value;
\r
23 import org.simantics.db.common.procedure.adapter.TransientCacheListener;
\r
24 import org.simantics.db.common.utils.NameUtils;
\r
25 import org.simantics.db.exception.CancelTransactionException;
\r
26 import org.simantics.db.exception.DatabaseException;
\r
27 import org.simantics.db.layer0.adapter.SubgraphExtent.ExtentStatus;
\r
28 import org.simantics.db.layer0.util.ModelTransferableGraphSourceRequest.Expansion3;
\r
29 import org.simantics.db.service.CollectionSupport;
\r
30 import org.simantics.db.service.SerialisationSupport;
\r
31 import org.simantics.db.service.TransferableGraphSupport;
\r
32 import org.simantics.graph.db.TransferableGraphSource;
\r
33 import org.simantics.layer0.Layer0;
\r
34 import org.simantics.scl.runtime.function.Function1;
\r
36 import gnu.trove.list.array.TIntArrayList;
\r
37 import gnu.trove.map.hash.TIntIntHashMap;
\r
39 public class DomainProcessor3 {
\r
/**
 * Verdict returned by the optional statement exclusion function that is
 * consulted while predicates are classified.
 */
public enum ExclusionDecision {

    /** No exclusion is applied; the statement is handled normally. */
    INCLUDE,

    /** The object of the statement is marked EXCLUDED and added to the exclusion set. */
    EXCLUDE_OBJECT

}
\r
45 final static private boolean PROFILE = false;
\r
47 Serializer variantSerializer;
\r
48 Serializer datatypeSerializer;
\r
49 Binding datatypeBinding;
\r
50 boolean ignoreVirtual;
\r
54 Set<Resource> fringe = null;
\r
55 Set<Resource> exclusions = null;
\r
56 Function1<Statement,ExclusionDecision> exclusionFunction = null;
\r
57 Set<Resource> predicates = null;
\r
58 Map<Resource,Boolean> isRelatedToPredicates = null;
\r
59 Set<Resource> deadPredicates = null;
\r
60 Set<Resource> strongInverseSet = null;
\r
62 TIntIntHashMap ids = null;
\r
63 ResourceMap<ExtentStatus> status = null;
\r
64 Map<Datatype, byte[]> bindings = new HashMap<Datatype, byte[]>();
\r
65 final SerialisationSupport support;
\r
66 final TransferableGraphConfiguration2 conf;
\r
67 final TransferableGraphSupport tgs;
\r
71 private long composedObjectCounter = 0;
\r
72 private long fastInternalCounter = 0;
\r
73 private long parentExternalCounter = 0;
\r
74 private long fullInternalCounter = 0;
\r
75 private long fullExternalCounter = 0;
\r
77 long startupTime = 0;
\r
78 long expandTime = 0;
\r
79 long fullResolveTime = 0;
\r
80 long fastResolveTime = 0;
\r
81 long otherStatementTime = 0;
\r
82 long parentResolveTime = 0;
\r
83 long extentSeedTime = 0;
\r
84 long classifyPredicateTime = 0;
\r
85 long processFringeTime = 0;
\r
86 long valueOutputTime = 0;
\r
87 long statementOutputTime = 0;
\r
89 public DomainProcessor3(ReadGraph graph, TransferableGraphConfiguration2 conf, DomainProcessorState state, boolean ignoreVirtual) throws DatabaseException {
\r
91 this.L0 = Layer0.getInstance(graph);
\r
92 this.tgs = graph.getService(TransferableGraphSupport.class);
\r
94 this.support = graph.getService(SerialisationSupport.class);
\r
95 this.ignoreVirtual = ignoreVirtual;
\r
99 startupTime -= System.nanoTime();
\r
101 CollectionSupport cs = graph.getService(CollectionSupport.class);
\r
104 status = cs.createMap(ExtentStatus.class);
\r
105 predicates = cs.createSet();
\r
106 exclusions = cs.createSet();
\r
107 exclusionFunction = conf.exclusionFunction;
\r
108 fringe = new TreeSet<Resource>();
\r
109 isRelatedToPredicates = cs.createMap(Boolean.class);
\r
110 deadPredicates = cs.createSet();
\r
111 strongInverseSet = cs.createSet();
\r
113 for(Map.Entry<Resource, ExtentStatus> entry : conf.preStatus.entrySet()) {
\r
114 status.put(entry.getKey(), entry.getValue());
\r
115 if(ExtentStatus.EXCLUDED.equals(entry.getValue())) exclusions.add(entry.getKey());
\r
116 if(ExtentStatus.INTERNAL.equals(entry.getValue())) fringe.add(entry.getKey());
\r
120 startupTime += System.nanoTime();
\r
122 // for(RootSpec p : conf.roots) {
\r
124 // fringe.add(p.resource);
\r
129 public ResourceMap<ExtentStatus> getStatus() {
\r
133 public Collection<DirectStatements> extractFromFringe(ReadGraph graph, int maxAmount) throws DatabaseException {
\r
135 CollectionSupport cs = graph.getService(CollectionSupport.class);
\r
136 Collection<Resource> list = cs.createList();
\r
137 Iterator<Resource> it = fringe.iterator();
\r
138 for(int i=0;i<maxAmount;i++) {
\r
139 if(!it.hasNext()) break;
\r
140 list.add(it.next());
\r
144 return graph.syncRequest(new Expansion3(list, ignoreVirtual));
\r
148 public Collection<DirectStatements> expand(ReadGraph graph) throws DatabaseException {
\r
151 expandTime -= System.nanoTime();
\r
153 Collection<DirectStatements> result = extractFromFringe(graph, 2<<12);
\r
156 expandTime += System.nanoTime();
\r
162 public void classifyPredicates(ReadGraph graph, final Set<Resource> schedule, DomainProcessorState state) throws DatabaseException {
\r
164 for(Resource predicate : schedule) {
\r
166 Boolean isRelatedTo = Boolean.FALSE;
\r
168 Resource single = graph.getPossibleSuperrelation(predicate);
\r
169 if(single != null) {
\r
171 Boolean singleIsRelatedTo = isRelatedToPredicates.get(single);
\r
172 if(singleIsRelatedTo == null) {
\r
173 singleIsRelatedTo = graph.isSubrelationOf(single, L0.IsRelatedTo);
\r
174 isRelatedToPredicates.put(single, singleIsRelatedTo);
\r
177 isRelatedTo = singleIsRelatedTo;
\r
181 if(graph.isSubrelationOf(predicate, L0.IsRelatedTo)) {
\r
182 isRelatedTo = Boolean.TRUE;
\r
183 if(ModelTransferableGraphSourceRequest.LOG) ModelTransferableGraphSourceRequest.log("isRelatedToPredicates4 => " + NameUtils.getSafeName(graph, predicate));
\r
185 if (!graph.hasStatement(predicate)) {
\r
186 if(ModelTransferableGraphSourceRequest.LOG) ModelTransferableGraphSourceRequest.log("FOUND DEAD PREDICATE (no statements): " + predicate);
\r
187 deadPredicates.add(predicate);
\r
188 // Prevents ModelTransferableGraphSource from
\r
189 // trying to export these statements.
\r
190 state.inverses.remove(support.getTransientId(predicate));
\r
196 isRelatedToPredicates.put(predicate, isRelatedTo);
\r
202 public void classifyPredicates(ReadGraph graph, DomainProcessorState state, final Collection<DirectStatements> expansion) throws DatabaseException {
\r
205 classifyPredicateTime -= System.nanoTime();
\r
207 CollectionSupport cs = graph.getService(CollectionSupport.class);
\r
208 final Set<Resource> schedule = cs.createSet();
\r
209 final Map<Resource, Resource> newPredicates = cs.createMap(Resource.class);
\r
211 for (DirectStatements stms : expansion) {
\r
212 for(Statement stm : stms) {
\r
214 Resource predicate = stm.getPredicate();
\r
215 Resource object = stm.getObject();
\r
217 if (exclusions.contains(object) || exclusions.contains(predicate))
\r
220 if (exclusionFunction != null) {
\r
221 ExclusionDecision decision = exclusionFunction.apply(stm);
\r
222 if(ExclusionDecision.EXCLUDE_OBJECT.equals(decision)) {
\r
223 status.put(object, ExtentStatus.EXCLUDED);
\r
224 exclusions.add(object);
\r
229 if(predicates.add(predicate)) {
\r
230 Resource inverse = graph.getPossibleInverse(predicate);
\r
231 schedule.add(predicate);
\r
232 if(inverse != null) {
\r
233 newPredicates.put(predicate, inverse);
\r
234 if(predicates.add(inverse)) schedule.add(inverse);
\r
235 state.inverses.put(support.getTransientId(predicate), support.getTransientId(inverse));
\r
236 state.inverses.put(support.getTransientId(inverse), support.getTransientId(predicate));
\r
237 if(ModelTransferableGraphSourceRequest.LOG) ModelTransferableGraphSourceRequest.log("INVERSE FOR " + graph.getPossibleURI(predicate) + " => " + graph.getPossibleURI(inverse));
\r
239 state.inverses.put(support.getTransientId(predicate), 0);
\r
240 if(ModelTransferableGraphSourceRequest.LOG) ModelTransferableGraphSourceRequest.log("NO INVERSE FOR " + graph.getPossibleURI(predicate));
\r
248 classifyPredicates(graph, schedule, state);
\r
250 for(Map.Entry<Resource, Resource> entry : newPredicates.entrySet()) {
\r
251 // Inverse is strong => this has strong inverse
\r
252 Boolean isRelatedToValue = isRelatedToPredicates.get(entry.getValue());
\r
253 if(isRelatedToValue) strongInverseSet.add(entry.getKey());
\r
254 Boolean isRelatedToKey = isRelatedToPredicates.get(entry.getKey());
\r
255 if(isRelatedToKey) strongInverseSet.add(entry.getValue());
\r
259 classifyPredicateTime += System.nanoTime();
\r
// NOTE(review): orphaned comment that does not describe the method that follows:
// "Composed objects are internal. Mark them for expansion."
\r
267 private Datatype getDatatype(ReadGraph graph, Resource subject) throws DatabaseException {
\r
268 Resource object = graph.getSingleObject(subject, L0.HasDataType);
\r
269 return graph.syncRequest(new Value<Datatype>(object, datatypeBinding), TransientCacheListener.<Datatype>instance());
\r
272 public void processValue(ReadGraph graph, Resource subject, int sId, final DomainProcessorState state) throws DatabaseException, IOException {
\r
273 final InputStream valueStream = tgs.getValueStream(graph, subject);
\r
274 if (valueStream != null) {
\r
275 if(ModelTransferableGraphSourceRequest.LOG) ModelTransferableGraphSourceRequest.log("[VALUE] " + NameUtils.getSafeName(graph, subject, true));
\r
276 state.valueOutput.writeInt(sId);
\r
279 Datatype dt = getDatatype(graph, subject);
\r
281 boolean canWriteRawVariant = !state.valueModifier.mayNeedModification(dt);
\r
282 long rawVariantSizePos = 0;
\r
283 state.valueOutput.writeByte(canWriteRawVariant
\r
284 ? TransferableGraphSource.TAG_RAW_COPY_VARIANT_VALUE
\r
285 : TransferableGraphSource.TAG_POTENTIALLY_MODIFIED_VARIANT_VALUE);
\r
286 if (canWriteRawVariant) {
\r
287 // Add space for raw variant byte size before the data
\r
288 rawVariantSizePos = state.valueOutput.position();
\r
289 state.valueOutput.writeInt(0);
\r
292 byte[] typeBytes = bindings.get(dt);
\r
293 if (typeBytes == null) {
\r
294 typeBytes = datatypeSerializer.serialize(dt);
\r
295 bindings.put(dt, typeBytes);
\r
298 state.valueOutput.write(typeBytes);
\r
299 Serializer s = Bindings.getSerializerUnchecked(Bindings.getBinding(dt));
\r
300 s.skip(new InputStream() {
\r
302 public int read() throws IOException {
\r
303 int value = valueStream.read();
\r
304 state.valueOutput.write(value);
\r
309 if (canWriteRawVariant) {
\r
310 long currentPos = state.valueOutput.position();
\r
311 int variantSize = (int)(currentPos - rawVariantSizePos - 4);
\r
312 state.valueOutput.position(rawVariantSizePos);
\r
313 state.valueOutput.writeInt(variantSize);
\r
314 state.valueOutput.position(currentPos);
\r
318 state.valueCount++;
\r
322 private TIntArrayList stream = new TIntArrayList();
\r
324 public void addToStream(Resource predicate, Resource object) throws DatabaseException {
\r
325 stream.add(support.getTransientId(predicate));
\r
326 stream.add(support.getTransientId(object));
\r
329 public void processStatement(ReadGraph graph, Resource subject, Statement stm) throws DatabaseException, IOException {
\r
331 Resource predicate = stm.getPredicate();
\r
333 Resource object = stm.getObject();
\r
335 ExtentStatus objectStatus = status.get(object);
\r
337 // Strong predicate
\r
338 Boolean isRelatedTo = isRelatedToPredicates.get(predicate);
\r
339 if ((objectStatus != ExtentStatus.EXCLUDED) && isRelatedTo) {
\r
341 if(ModelTransferableGraphSourceRequest.LOG) logStatementWithExtent(graph, "related", objectStatus, subject, predicate, object);
\r
343 addToStream(predicate, object);
\r
345 if(objectStatus == null || objectStatus == ExtentStatus.PENDING) {
\r
346 if(ModelTransferableGraphSourceRequest.LOG) ModelTransferableGraphSourceRequest.log("[ADDED TO FRINGE] " + NameUtils.getSafeName(graph, object));
\r
347 fringe.add(object);
\r
353 if (deadPredicates.contains(predicate)) {
\r
354 if(ModelTransferableGraphSourceRequest.LOG) logStatementWithExtent(graph, "excluding statement with dead predicate ", objectStatus, subject, predicate, object);
\r
359 if(objectStatus == ExtentStatus.EXCLUDED) {
\r
361 if(ModelTransferableGraphSourceRequest.LOG) logStatementWithExtent(graph, "weak reference to excluded object ", objectStatus, subject, predicate, object);
\r
365 // The inverse is also weak (or there is no inverse)
\r
366 if(!strongInverseSet.contains(predicate)) {
\r
368 addToStream(predicate, object);
\r
370 if(objectStatus == null) {
\r
371 status.put(object, ExtentStatus.PENDING);
\r
374 if(ModelTransferableGraphSourceRequest.LOG) logStatementWithExtent(graph, "fully weak internal", objectStatus, subject, predicate, object);
\r
378 if(ModelTransferableGraphSourceRequest.LOG) logStatementWithExtent(graph, "strong inverse internal ", objectStatus, subject, predicate, object);
\r
388 public void flushStatementStream(int sId, DomainProcessorState state) throws IOException {
\r
389 if(!stream.isEmpty()) {
\r
390 state.statementsOutput.writeInt(sId);
\r
391 int streamSize = stream.size();
\r
392 int statementCount = stream.size() / 2;
\r
393 state.statementsOutput.writeInt(statementCount);
\r
394 for (int i = 0; i < streamSize; i++)
\r
395 state.statementsOutput.writeInt(stream.getQuick(i));
\r
396 state.statementCount += 2*streamSize;
\r
397 stream.resetQuick();
\r
401 // For progress monitor book-keeping
\r
402 private long internalResourceNumber = 0;
\r
403 private long startTime = 0;
\r
404 private long lastUpdateTime = 0;
\r
406 public void processInternal(ReadGraph graph, Resource subject, DirectStatements stms, DomainProcessorState state) throws DatabaseException, IOException {
\r
408 internalResourceNumber++;
\r
410 // Update progress monitor with controlled frequency
\r
411 long t = System.nanoTime();
\r
412 long dt = t - lastUpdateTime;
\r
413 if (dt > 200_000_000L) {
\r
414 if (startTime == 0)
\r
416 lastUpdateTime = t;
\r
417 double totalTime = (t - startTime) * 1e-9;
\r
418 if (totalTime > 0) {
\r
419 long speed = Math.round((double)internalResourceNumber / totalTime);
\r
420 state.monitor.subTask("Included " + internalResourceNumber + " resources (" + speed + " resources/s)");
\r
424 status.put(subject, ExtentStatus.INTERNAL);
\r
425 if(ModelTransferableGraphSourceRequest.LOG) ModelTransferableGraphSourceRequest.log("[INTERNAL] " + NameUtils.getSafeName(graph, subject, true));
\r
427 int sId = support.getTransientId(subject);
\r
430 valueOutputTime -= System.nanoTime();
\r
432 processValue(graph, subject, sId, state);
\r
435 valueOutputTime += System.nanoTime();
\r
438 statementOutputTime -= System.nanoTime();
\r
440 for(Statement stm : stms) {
\r
441 processStatement(graph, subject, stm);
\r
444 flushStatementStream(sId, state);
\r
447 statementOutputTime += System.nanoTime();
\r
449 // Logarithmic progress monitor for unknown amount of work.
\r
450 state.monitor.setWorkRemaining(100000);
\r
451 state.monitor.worked(1);
\r
454 public void processFringe(ReadGraph graph, Collection<DirectStatements> expansion, final DomainProcessorState state) throws DatabaseException, IOException {
\r
457 processFringeTime -= System.nanoTime();
\r
459 for (DirectStatements stms : expansion) {
\r
461 Resource subject = stms.getSubject();
\r
463 boolean partOf = false;
\r
464 for(Statement stm : stms) {
\r
465 Resource predicate = stm.getPredicate();
\r
466 if(L0.PartOf.equals(predicate)) {
\r
472 ExtentStatus subjectStatus = status.get(subject);
\r
473 if(ModelTransferableGraphSourceRequest.LOG && subjectStatus != null) ModelTransferableGraphSourceRequest.log("EXISTING STATUS FOR " + graph.getPossibleURI(subject) + " - " + subjectStatus);
\r
474 if(subjectStatus == ExtentStatus.EXTERNAL || subjectStatus == ExtentStatus.EXCLUDED) continue;
\r
475 if(partOf && (subjectStatus == null || ExtentStatus.PENDING == subjectStatus) && graph.getPossibleURI(subject) != null) {
\r
477 status.put(subject, ExtentStatus.EXTERNAL);
\r
478 if(ModelTransferableGraphSourceRequest.LOG) {
\r
479 String uri = graph.getPossibleURI(subject);
\r
480 if(uri == null) ModelTransferableGraphSourceRequest.log("[EXTERNAL]: No URI for " + subject);
\r
481 else ModelTransferableGraphSourceRequest.log("[EXTERNAL] " + uri);
\r
486 processInternal(graph, subject, stms, state);
\r
493 processFringeTime += System.nanoTime();
\r
497 public void process(ReadGraph graph, DomainProcessorState state) throws DatabaseException {
\r
501 this.variantSerializer = graph.getService(Databoard.class).getSerializerUnchecked(Bindings.VARIANT);
\r
502 this.datatypeBinding = Bindings.getBindingUnchecked(Datatype.class);
\r
503 this.datatypeSerializer = graph.getService(Databoard.class).getSerializerUnchecked(this.datatypeBinding);
\r
505 for(Resource r : ConsistsOfProcess.walk(graph, fringe, exclusions, ignoreVirtual)) {
\r
506 if (status.put(r, ExtentStatus.INTERNAL) == null) {
\r
507 if(ModelTransferableGraphSourceRequest.LOG) {
\r
508 String URI = graph.getPossibleURI(r);
\r
509 if(URI != null) ModelTransferableGraphSourceRequest.log("URI INTERNAL " + URI);
\r
510 else ModelTransferableGraphSourceRequest.log("URI has no URI for " + r);
\r
516 if (state.monitor.isCanceled())
\r
517 throw new CancelTransactionException();
\r
519 while(!fringe.isEmpty()) {
\r
521 Collection<DirectStatements> expansion = expand(graph);
\r
522 classifyPredicates(graph, state, expansion);
\r
523 processFringe(graph, expansion, state);
\r
525 if (state.monitor.isCanceled())
\r
526 throw new CancelTransactionException();
\r
529 if (ModelTransferableGraphSourceRequest.PROFILE) {
\r
530 System.out.println(composedObjectCounter + " " + fastInternalCounter
\r
531 + " " + parentExternalCounter + " "
\r
532 + fullExternalCounter + " " + fullInternalCounter);
\r
535 } catch (IOException e) {
\r
536 throw new DatabaseException(e);
\r
541 void logStatementWithExtent(ReadGraph graph, String header, ExtentStatus status, int sId, int pId, int oId) throws DatabaseException {
\r
542 if(ModelTransferableGraphSourceRequest.LOG) {
\r
543 SerialisationSupport support = graph.getService(SerialisationSupport.class);
\r
544 String s = NameUtils.getURIOrSafeNameInternal(graph, support.getResource(sId));
\r
545 String p = NameUtils.getURIOrSafeNameInternal(graph, support.getResource(pId));
\r
546 String o = NameUtils.getURIOrSafeNameInternal(graph, support.getResource(oId));
\r
547 ModelTransferableGraphSourceRequest.log(header + " [" + status + "] " + s + " - " + p + " - " + o);
\r
551 void logStatementWithExtent(ReadGraph graph, String header, ExtentStatus status, Resource sId, Resource pId, Resource oId) throws DatabaseException {
\r
552 if(ModelTransferableGraphSourceRequest.LOG) {
\r
553 String s = NameUtils.getURIOrSafeNameInternal(graph, sId);
\r
554 String p = NameUtils.getURIOrSafeNameInternal(graph, pId);
\r
555 String o = NameUtils.getURIOrSafeNameInternal(graph, oId);
\r
556 ModelTransferableGraphSourceRequest.log(header + " [" + status + "] " + s + " - " + p + " - " + o);
\r