]> gerrit.simantics Code Review - simantics/platform.git/blob - bundles/org.simantics.db.layer0/src/org/simantics/db/layer0/util/DomainProcessor3.java
7a0a5f06bee35eb5c4eebf87691dd8e8bdf97c3a
[simantics/platform.git] / bundles / org.simantics.db.layer0 / src / org / simantics / db / layer0 / util / DomainProcessor3.java
1 package org.simantics.db.layer0.util;
2
3 import java.io.IOException;
4 import java.io.InputStream;
5 import java.util.Collection;
6 import java.util.HashMap;
7 import java.util.Iterator;
8 import java.util.List;
9 import java.util.Map;
10 import java.util.Set;
11 import java.util.TreeSet;
12
13 import org.simantics.databoard.Bindings;
14 import org.simantics.databoard.Databoard;
15 import org.simantics.databoard.binding.Binding;
16 import org.simantics.databoard.serialization.Serializer;
17 import org.simantics.databoard.type.Datatype;
18 import org.simantics.db.DirectStatements;
19 import org.simantics.db.ReadGraph;
20 import org.simantics.db.Resource;
21 import org.simantics.db.ResourceMap;
22 import org.simantics.db.Statement;
23 import org.simantics.db.common.primitiverequest.Value;
24 import org.simantics.db.common.procedure.adapter.TransientCacheListener;
25 import org.simantics.db.common.utils.NameUtils;
26 import org.simantics.db.exception.CancelTransactionException;
27 import org.simantics.db.exception.DatabaseException;
28 import org.simantics.db.layer0.adapter.SubgraphExtent.ExtentStatus;
29 import org.simantics.db.layer0.util.ConsistsOfProcess.InternalEntry;
30 import org.simantics.db.layer0.util.ModelTransferableGraphSourceRequest.Expansion3;
31 import org.simantics.db.service.CollectionSupport;
32 import org.simantics.db.service.SerialisationSupport;
33 import org.simantics.db.service.TransferableGraphSupport;
34 import org.simantics.graph.db.TransferableGraphSource;
35 import org.simantics.layer0.Layer0;
36 import org.simantics.scl.runtime.function.Function1;
37 import org.simantics.utils.datastructures.Pair;
38
39 import gnu.trove.list.array.TIntArrayList;
40 import gnu.trove.map.hash.TIntIntHashMap;
41
42 public class DomainProcessor3 {
43         
44         public enum ExclusionDecision {
45                 INCLUDE, EXCLUDE_OBJECT
46         }
47
48         final static private boolean PROFILE = false;
49
50     Serializer variantSerializer;
51     Serializer datatypeSerializer;
52     Binding datatypeBinding;
53     boolean ignoreVirtual;
54
55     int id = 0;
56
57     Set<Resource> fringe = null;
58     Set<Resource> exclusions = null;
59     Function1<Statement,ExclusionDecision> exclusionFunction = null;
60     Set<Resource> predicates = null;
61     Map<Resource,Boolean> isRelatedToPredicates = null;
62     Set<Resource> deadPredicates = null;
63     Set<Resource> strongInverseSet = null;
64
65     TIntIntHashMap ids = null;
66     ResourceMap<ExtentStatus> status = null;
67     Map<Datatype, byte[]> bindings = new HashMap<Datatype, byte[]>();
68     final SerialisationSupport support;
69     final TransferableGraphConfiguration2 conf;
70     final TransferableGraphSupport tgs;
71
72     private Layer0 L0;
73
74     private long composedObjectCounter = 0;
75     private long fastInternalCounter = 0;
76     private long parentExternalCounter = 0;
77     private long fullInternalCounter = 0;
78     private long fullExternalCounter = 0;
79
80     long startupTime = 0;
81     long expandTime = 0;
82     long fullResolveTime = 0;
83     long fastResolveTime = 0;
84     long otherStatementTime = 0;
85     long parentResolveTime = 0;
86     long extentSeedTime = 0;
87     long classifyPredicateTime = 0;
88     long processFringeTime = 0;
89     long valueOutputTime = 0;
90     long statementOutputTime = 0;
91
92     public DomainProcessor3(ReadGraph graph, TransferableGraphConfiguration2 conf, DomainProcessorState state, boolean ignoreVirtual) throws DatabaseException {
93
94         this.L0 = Layer0.getInstance(graph);
95         this.tgs = graph.getService(TransferableGraphSupport.class);
96
97         this.support = graph.getService(SerialisationSupport.class);
98         this.ignoreVirtual = ignoreVirtual;
99         this.conf = conf;
100
101         if(PROFILE)
102                 startupTime -= System.nanoTime();
103
104         CollectionSupport cs = graph.getService(CollectionSupport.class);
105
106         ids = state.ids;
107         status = cs.createMap(ExtentStatus.class);
108         predicates = cs.createSet();
109         exclusions = cs.createSet();
110         exclusionFunction = conf.exclusionFunction;
111         fringe = new TreeSet<Resource>();
112         isRelatedToPredicates = cs.createMap(Boolean.class);
113         deadPredicates = cs.createSet();
114         strongInverseSet = cs.createSet();
115
116         for(Map.Entry<Resource, ExtentStatus> entry : conf.preStatus.entrySet()) {
117             status.put(entry.getKey(), entry.getValue());
118             if(ExtentStatus.EXCLUDED.equals(entry.getValue())) exclusions.add(entry.getKey());
119             if(ExtentStatus.INTERNAL.equals(entry.getValue())) fringe.add(entry.getKey());
120         }
121
122         if(PROFILE)
123                 startupTime += System.nanoTime();
124         
125 //        for(RootSpec p : conf.roots) {
126 //              if(p.internal)
127 //                      fringe.add(p.resource);
128 //        }
129
130     }
131     
132     public ResourceMap<ExtentStatus> getStatus() {
133         return status;
134     }
135
136     public Collection<DirectStatements> extractFromFringe(ReadGraph graph, int maxAmount) throws DatabaseException {
137
138         CollectionSupport cs = graph.getService(CollectionSupport.class);
139         Collection<Resource> list = cs.createList();
140         Iterator<Resource> it = fringe.iterator();
141         for(int i=0;i<maxAmount;i++) {
142             if(!it.hasNext()) break;
143             list.add(it.next());
144             it.remove();
145         }
146
147         return graph.syncRequest(new Expansion3(list, ignoreVirtual));
148
149     }
150
151     public Collection<DirectStatements> expand(ReadGraph graph) throws DatabaseException {
152
153         if(PROFILE)
154                 expandTime -= System.nanoTime();
155
156         Collection<DirectStatements> result = extractFromFringe(graph, 2<<12);
157
158         if(PROFILE)
159                 expandTime += System.nanoTime();
160
161         return result;
162
163     }
164
165     public void classifyPredicates(ReadGraph graph, final Set<Resource> schedule, DomainProcessorState state) throws DatabaseException {
166
167         for(Resource predicate : schedule) {
168             
169             Boolean isRelatedTo = Boolean.FALSE;
170             
171             Resource single = graph.getPossibleSuperrelation(predicate);
172             if(single != null) {
173                 
174                 Boolean singleIsRelatedTo = isRelatedToPredicates.get(single);
175                 if(singleIsRelatedTo == null) {
176                         singleIsRelatedTo = graph.isSubrelationOf(single, L0.IsRelatedTo);
177                         isRelatedToPredicates.put(single, singleIsRelatedTo);
178                 }
179
180                 isRelatedTo = singleIsRelatedTo;
181                 
182             } else {
183                 
184                 if(graph.isSubrelationOf(predicate, L0.IsRelatedTo)) {
185                         isRelatedTo = Boolean.TRUE;
186                     if(ModelTransferableGraphSourceRequest.LOG) ModelTransferableGraphSourceRequest.log("isRelatedToPredicates4 => " + NameUtils.getSafeName(graph, predicate));
187                 } else {
188                     if (!graph.hasStatement(predicate)) {
189                         if(ModelTransferableGraphSourceRequest.LOG) ModelTransferableGraphSourceRequest.log("FOUND DEAD PREDICATE (no statements): " + predicate);
190                         deadPredicates.add(predicate);
191                         // Prevents ModelTransferableGraphSource from
192                         // trying to export these statements.
193                         state.inverses.remove(support.getTransientId(predicate));
194                     }
195                 }
196                 
197             }
198             
199                 isRelatedToPredicates.put(predicate, isRelatedTo);
200
201         }
202
203     }
204
205     public void classifyPredicates(ReadGraph graph, DomainProcessorState state, final Collection<DirectStatements> expansion) throws DatabaseException {
206
207         if(PROFILE)
208                 classifyPredicateTime -= System.nanoTime();
209
210         CollectionSupport cs = graph.getService(CollectionSupport.class);
211         final Set<Resource> schedule = cs.createSet();
212         final Map<Resource, Resource> newPredicates = cs.createMap(Resource.class);
213
214         for (DirectStatements stms : expansion) {
215             for(Statement stm : stms) {
216
217                 Resource predicate = stm.getPredicate();
218                 Resource object = stm.getObject();
219
220                 if (exclusions.contains(object) || exclusions.contains(predicate))
221                     continue;
222                 
223                 if (exclusionFunction != null) {
224                         ExclusionDecision decision = exclusionFunction.apply(stm);
225                         if(ExclusionDecision.EXCLUDE_OBJECT.equals(decision)) {
226                                 status.put(object, ExtentStatus.EXCLUDED);
227                                 exclusions.add(object);
228                                 continue;
229                         }
230                 }
231
232                 if(predicates.add(predicate)) {
233                     Resource inverse = graph.getPossibleInverse(predicate);
234                     schedule.add(predicate);
235                     if(inverse != null) {
236                         newPredicates.put(predicate, inverse);
237                         if(predicates.add(inverse)) schedule.add(inverse);
238                         state.inverses.put(support.getTransientId(predicate), support.getTransientId(inverse));
239                         state.inverses.put(support.getTransientId(inverse), support.getTransientId(predicate));
240                         if(ModelTransferableGraphSourceRequest.LOG) ModelTransferableGraphSourceRequest.log("INVERSE FOR " + graph.getPossibleURI(predicate) + " => " + graph.getPossibleURI(inverse));
241                     } else {
242                         state.inverses.put(support.getTransientId(predicate), 0);
243                         if(ModelTransferableGraphSourceRequest.LOG) ModelTransferableGraphSourceRequest.log("NO INVERSE FOR " + graph.getPossibleURI(predicate));
244                     }
245
246                 }
247
248             }
249         }
250
251         classifyPredicates(graph, schedule, state);
252
253         for(Map.Entry<Resource, Resource> entry : newPredicates.entrySet()) {
254             // Inverse is strong => this has strong inverse
255                 Boolean isRelatedToValue = isRelatedToPredicates.get(entry.getValue());
256                 if(isRelatedToValue) strongInverseSet.add(entry.getKey());
257                 Boolean isRelatedToKey = isRelatedToPredicates.get(entry.getKey());
258                 if(isRelatedToKey) strongInverseSet.add(entry.getValue());
259         }
260
261         if(PROFILE)
262                 classifyPredicateTime += System.nanoTime();
263
264     }
265
266     /*
267      * Composed objects are internal. Mark them for expansion.
268      */
269
270     private Datatype getDatatype(ReadGraph graph, Resource subject) throws DatabaseException {
271         Resource object = graph.getSingleObject(subject, L0.HasDataType);
272         return graph.syncRequest(new Value<Datatype>(object, datatypeBinding), TransientCacheListener.<Datatype>instance());
273     }
274
275     public void processValue(ReadGraph graph, Resource subject, int sId, final DomainProcessorState state) throws DatabaseException, IOException {
276         final InputStream valueStream = tgs.getValueStream(graph, subject);
277         if (valueStream != null) {
278             if(ModelTransferableGraphSourceRequest.LOG) ModelTransferableGraphSourceRequest.log("[VALUE] " + NameUtils.getSafeName(graph, subject, true));
279             state.valueOutput.writeInt(sId);
280
281             if (conf.values) {
282                 Datatype dt = getDatatype(graph, subject);
283
284                 boolean canWriteRawVariant = !state.valueModifier.mayNeedModification(dt);
285                 long rawVariantSizePos = 0;
286                 state.valueOutput.writeByte(canWriteRawVariant
287                         ? TransferableGraphSource.TAG_RAW_COPY_VARIANT_VALUE
288                         : TransferableGraphSource.TAG_POTENTIALLY_MODIFIED_VARIANT_VALUE);
289                 if (canWriteRawVariant) {
290                     // Add space for raw variant byte size before the data
291                     rawVariantSizePos = state.valueOutput.position();
292                     state.valueOutput.writeInt(0);
293                 }
294
295                 byte[] typeBytes = bindings.get(dt);
296                 if (typeBytes == null) {
297                     typeBytes = datatypeSerializer.serialize(dt);
298                     bindings.put(dt, typeBytes);
299                 }
300
301                 state.valueOutput.write(typeBytes);
302                 Serializer s = Bindings.getSerializerUnchecked(Bindings.getBinding(dt));
303                 s.skip(new InputStream() {
304                     @Override
305                     public int read() throws IOException {
306                         int value = valueStream.read();
307                         state.valueOutput.write(value);
308                         return value;
309                     }
310                 });
311
312                 if (canWriteRawVariant) {
313                     long currentPos = state.valueOutput.position();
314                     int variantSize = (int)(currentPos - rawVariantSizePos - 4);
315                     state.valueOutput.position(rawVariantSizePos);
316                     state.valueOutput.writeInt(variantSize);
317                     state.valueOutput.position(currentPos);
318                 }
319             }
320
321             state.valueCount++;
322         }
323     }
324
325     private TIntArrayList stream = new TIntArrayList();
326
327     public void addToStream(Resource predicate, Resource object) throws DatabaseException {
328         stream.add(support.getTransientId(predicate));
329         stream.add(support.getTransientId(object));
330     }
331
332     public void processStatement(ReadGraph graph, Resource subject, Statement stm) throws DatabaseException, IOException {
333
334         Resource predicate = stm.getPredicate();
335
336         Resource object = stm.getObject();
337
338         ExtentStatus objectStatus = status.get(object);
339
340         // Strong predicate
341         Boolean isRelatedTo = isRelatedToPredicates.get(predicate);
342         if ((objectStatus !=  ExtentStatus.EXCLUDED) && isRelatedTo) {
343
344             if(ModelTransferableGraphSourceRequest.LOG) logStatementWithExtent(graph, "related", objectStatus, subject, predicate, object);
345
346             addToStream(predicate, object);
347
348             if(objectStatus == null || objectStatus == ExtentStatus.PENDING) {
349                 if(ModelTransferableGraphSourceRequest.LOG) ModelTransferableGraphSourceRequest.log("[ADDED TO FRINGE] " + NameUtils.getSafeName(graph, object));
350                 fringe.add(object);
351             }
352
353         } else {
354
355             // Dead predicate
356             if (deadPredicates.contains(predicate)) {
357                 if(ModelTransferableGraphSourceRequest.LOG) logStatementWithExtent(graph, "excluding statement with dead predicate ", objectStatus, subject, predicate, object);
358                 return;
359             }
360
361             // Weak predicate
362             if(objectStatus == ExtentStatus.EXCLUDED) {
363
364                 if(ModelTransferableGraphSourceRequest.LOG) logStatementWithExtent(graph, "weak reference to excluded object ", objectStatus, subject, predicate, object);
365
366             } else {
367
368                 // The inverse is also weak (or there is no inverse)
369                 if(!strongInverseSet.contains(predicate)) {
370
371                     addToStream(predicate, object);
372
373                     if(objectStatus == null) {
374                         status.put(object, ExtentStatus.PENDING);
375                     }
376
377                     if(ModelTransferableGraphSourceRequest.LOG) logStatementWithExtent(graph, "fully weak internal", objectStatus, subject, predicate, object);
378
379                 } else {
380
381                     if(ModelTransferableGraphSourceRequest.LOG) logStatementWithExtent(graph, "strong inverse internal ", objectStatus, subject, predicate, object);
382
383                 }
384
385             }
386
387         }
388         
389     }
390     
391     public void flushStatementStream(int sId, DomainProcessorState state) throws IOException {
392         if(!stream.isEmpty()) {
393             state.statementsOutput.writeInt(sId);
394             int streamSize = stream.size();
395             int statementCount = stream.size() / 2;
396             state.statementsOutput.writeInt(statementCount);
397             for (int i = 0; i < streamSize; i++)
398                 state.statementsOutput.writeInt(stream.getQuick(i));
399             state.statementCount += 2*streamSize;
400             stream.resetQuick();
401         }
402     }
403
404     // For progress monitor book-keeping
405     private long internalResourceNumber = 0;
406     private long startTime = 0;
407     private long lastUpdateTime = 0;
408
409     public void processInternal(ReadGraph graph, Resource subject, DirectStatements stms, DomainProcessorState state) throws DatabaseException, IOException {
410
411         internalResourceNumber++;
412
413         // Update progress monitor with controlled frequency
414         long t = System.nanoTime();
415         long dt = t - lastUpdateTime;
416         if (dt > 200_000_000L) {
417             if (startTime == 0)
418                 startTime = t;
419             lastUpdateTime = t;
420             double totalTime = (t - startTime) * 1e-9;
421             if (totalTime > 0) {
422                 long speed = Math.round((double)internalResourceNumber / totalTime);
423                 state.monitor.subTask("Included " + internalResourceNumber + " resources (" + speed + " resources/s)");
424             }
425         }
426
427         status.put(subject, ExtentStatus.INTERNAL);
428         if(ModelTransferableGraphSourceRequest.LOG) ModelTransferableGraphSourceRequest.log("[INTERNAL] " + NameUtils.getSafeName(graph, subject, true));
429
430         int sId = support.getTransientId(subject);
431
432         if(PROFILE)
433                 valueOutputTime -= System.nanoTime();
434
435         processValue(graph, subject, sId, state);
436
437         if(PROFILE)
438                 valueOutputTime += System.nanoTime();
439
440         if(PROFILE)
441                 statementOutputTime -= System.nanoTime();
442
443         for(Statement stm : stms) {
444                 processStatement(graph, subject, stm);
445         }
446
447         flushStatementStream(sId, state);
448
449         if(PROFILE)
450                 statementOutputTime += System.nanoTime();
451
452         // Logarithmic progress monitor for unknown amount of work.
453         state.monitor.setWorkRemaining(100000);
454         state.monitor.worked(1);
455     }
456
457     public void processFringe(ReadGraph graph, Collection<DirectStatements> expansion, final DomainProcessorState state) throws DatabaseException, IOException {
458
459         if(PROFILE)
460                 processFringeTime -= System.nanoTime();
461
462         for (DirectStatements stms : expansion) {
463
464             Resource subject = stms.getSubject();
465
466             boolean partOf = false;
467             for(Statement stm : stms) {
468                 Resource predicate = stm.getPredicate();
469                 if(L0.PartOf.equals(predicate)) {
470                     partOf = true;
471                     break;
472                 }
473             }
474
475             ExtentStatus subjectStatus = status.get(subject);
476             if(ModelTransferableGraphSourceRequest.LOG && subjectStatus != null) ModelTransferableGraphSourceRequest.log("EXISTING STATUS FOR " + graph.getPossibleURI(subject) + " - " + subjectStatus);
477             if(subjectStatus == ExtentStatus.EXTERNAL || subjectStatus == ExtentStatus.EXCLUDED) continue;
478             if(partOf && (subjectStatus == null || ExtentStatus.PENDING == subjectStatus) && graph.getPossibleURI(subject) != null) {
479
480                 status.put(subject, ExtentStatus.EXTERNAL);
481                 if(ModelTransferableGraphSourceRequest.LOG) {
482                     String uri = graph.getPossibleURI(subject);
483                     if(uri == null) ModelTransferableGraphSourceRequest.log("[EXTERNAL]: No URI for " + subject);
484                     else ModelTransferableGraphSourceRequest.log("[EXTERNAL] " + uri);
485                 }
486
487             } else {
488
489                 processInternal(graph, subject, stms, state);
490                 
491             }
492
493         }
494
495         if(PROFILE)
496                 processFringeTime += System.nanoTime();
497
498     }
499
500     public void process(ReadGraph graph, DomainProcessorState state) throws DatabaseException {
501
502         try {
503
504             this.variantSerializer = graph.getService(Databoard.class).getSerializerUnchecked(Bindings.VARIANT);
505             this.datatypeBinding = Bindings.getBindingUnchecked(Datatype.class);
506             this.datatypeSerializer = graph.getService(Databoard.class).getSerializerUnchecked(this.datatypeBinding);
507
508             Pair<List<InternalEntry>,Set<Resource>> pair = ConsistsOfProcess.walk(graph, status, fringe, exclusions, ignoreVirtual); 
509             state.internalEntries = pair.first;
510             
511             for(InternalEntry entry : state.internalEntries) {
512                 Resource r = entry.resource;
513                 if (status.put(r, ExtentStatus.INTERNAL) == null) {
514                     if(ModelTransferableGraphSourceRequest.LOG) {
515                         String URI = graph.getPossibleURI(r);
516                         if(URI != null) ModelTransferableGraphSourceRequest.log("URI INTERNAL " + URI);
517                         else ModelTransferableGraphSourceRequest.log("URI has no URI for " + r);
518                     }
519                     fringe.add(r);
520                 }
521             }
522
523             for(Resource unnamedChild : pair.second) {
524                 if (status.put(unnamedChild, ExtentStatus.INTERNAL) == null) {
525                     fringe.add(unnamedChild);
526                 }
527             }
528             
529             if (state.monitor.isCanceled())
530                 throw new CancelTransactionException();
531
532             while(!fringe.isEmpty()) {
533
534                 Collection<DirectStatements> expansion = expand(graph);
535                 classifyPredicates(graph, state, expansion);
536                 processFringe(graph, expansion, state);
537
538                 if (state.monitor.isCanceled())
539                     throw new CancelTransactionException();
540             }
541
542             if (ModelTransferableGraphSourceRequest.PROFILE) {
543                 System.out.println(composedObjectCounter + " " + fastInternalCounter
544                         + " " + parentExternalCounter + " "
545                         + fullExternalCounter + " " + fullInternalCounter);
546             }
547
548         } catch (IOException e) {
549             throw new DatabaseException(e);
550         }
551
552     }
553
554     void logStatementWithExtent(ReadGraph graph, String header, ExtentStatus status, int sId, int pId, int oId) throws DatabaseException {
555         if(ModelTransferableGraphSourceRequest.LOG) {
556             SerialisationSupport support = graph.getService(SerialisationSupport.class);
557             String s = NameUtils.getURIOrSafeNameInternal(graph, support.getResource(sId));
558             String p = NameUtils.getURIOrSafeNameInternal(graph, support.getResource(pId));
559             String o = NameUtils.getURIOrSafeNameInternal(graph, support.getResource(oId));
560             ModelTransferableGraphSourceRequest.log(header + " [" + status + "] " + s + " - " + p + " - " + o);
561         }
562     }
563
564     void logStatementWithExtent(ReadGraph graph, String header, ExtentStatus status, Resource sId, Resource pId, Resource oId) throws DatabaseException {
565         if(ModelTransferableGraphSourceRequest.LOG) {
566             String s = NameUtils.getURIOrSafeNameInternal(graph, sId);
567             String p = NameUtils.getURIOrSafeNameInternal(graph, pId);
568             String o = NameUtils.getURIOrSafeNameInternal(graph, oId);
569             ModelTransferableGraphSourceRequest.log(header + " [" + status + "] " + s + " - " + p + " - " + o);
570         }
571     }
572
573 }