gerrit.simantics Code Review - simantics/platform.git/blob - bundles/org.simantics.db.layer0/src/org/simantics/db/layer0/util/DomainProcessor3.java
b55f0a3d2d0b9c1ebc4e7b2627b8793f375fafd5
[simantics/platform.git] / bundles / org.simantics.db.layer0 / src / org / simantics / db / layer0 / util / DomainProcessor3.java
1 package org.simantics.db.layer0.util;
2
3 import java.io.IOException;
4 import java.io.InputStream;
5 import java.util.Collection;
6 import java.util.HashMap;
7 import java.util.Iterator;
8 import java.util.Map;
9 import java.util.Set;
10 import java.util.TreeSet;
11
12 import org.simantics.databoard.Bindings;
13 import org.simantics.databoard.Databoard;
14 import org.simantics.databoard.binding.Binding;
15 import org.simantics.databoard.serialization.Serializer;
16 import org.simantics.databoard.type.Datatype;
17 import org.simantics.db.DirectStatements;
18 import org.simantics.db.ReadGraph;
19 import org.simantics.db.Resource;
20 import org.simantics.db.ResourceMap;
21 import org.simantics.db.Statement;
22 import org.simantics.db.common.primitiverequest.Value;
23 import org.simantics.db.common.procedure.adapter.TransientCacheListener;
24 import org.simantics.db.common.utils.NameUtils;
25 import org.simantics.db.exception.CancelTransactionException;
26 import org.simantics.db.exception.DatabaseException;
27 import org.simantics.db.layer0.adapter.SubgraphExtent.ExtentStatus;
28 import org.simantics.db.layer0.util.ModelTransferableGraphSourceRequest.Expansion3;
29 import org.simantics.db.service.CollectionSupport;
30 import org.simantics.db.service.SerialisationSupport;
31 import org.simantics.db.service.TransferableGraphSupport;
32 import org.simantics.graph.db.TransferableGraphSource;
33 import org.simantics.layer0.Layer0;
34 import org.simantics.scl.runtime.function.Function1;
35
36 import gnu.trove.list.array.TIntArrayList;
37 import gnu.trove.map.hash.TIntIntHashMap;
38
39 public class DomainProcessor3 {
40         
41         public enum ExclusionDecision {
42                 INCLUDE, EXCLUDE_OBJECT
43         }
44
45         final static private boolean PROFILE = false;
46
47     Serializer variantSerializer;
48     Serializer datatypeSerializer;
49     Binding datatypeBinding;
50     boolean ignoreVirtual;
51
52     int id = 0;
53
54     Set<Resource> fringe = null;
55     Set<Resource> exclusions = null;
56     Function1<Statement,ExclusionDecision> exclusionFunction = null;
57     Set<Resource> predicates = null;
58     Map<Resource,Boolean> isRelatedToPredicates = null;
59     Set<Resource> deadPredicates = null;
60     Set<Resource> strongInverseSet = null;
61
62     TIntIntHashMap ids = null;
63     ResourceMap<ExtentStatus> status = null;
64     Map<Datatype, byte[]> bindings = new HashMap<Datatype, byte[]>();
65     final SerialisationSupport support;
66     final TransferableGraphConfiguration2 conf;
67     final TransferableGraphSupport tgs;
68
69     private Layer0 L0;
70
71     private long composedObjectCounter = 0;
72     private long fastInternalCounter = 0;
73     private long parentExternalCounter = 0;
74     private long fullInternalCounter = 0;
75     private long fullExternalCounter = 0;
76
77     long startupTime = 0;
78     long expandTime = 0;
79     long fullResolveTime = 0;
80     long fastResolveTime = 0;
81     long otherStatementTime = 0;
82     long parentResolveTime = 0;
83     long extentSeedTime = 0;
84     long classifyPredicateTime = 0;
85     long processFringeTime = 0;
86     long valueOutputTime = 0;
87     long statementOutputTime = 0;
88
89     public DomainProcessor3(ReadGraph graph, TransferableGraphConfiguration2 conf, DomainProcessorState state, boolean ignoreVirtual) throws DatabaseException {
90
91         this.L0 = Layer0.getInstance(graph);
92         this.tgs = graph.getService(TransferableGraphSupport.class);
93
94         this.support = graph.getService(SerialisationSupport.class);
95         this.ignoreVirtual = ignoreVirtual;
96         this.conf = conf;
97
98         if(PROFILE)
99                 startupTime -= System.nanoTime();
100
101         CollectionSupport cs = graph.getService(CollectionSupport.class);
102
103         ids = state.ids;
104         status = cs.createMap(ExtentStatus.class);
105         predicates = cs.createSet();
106         exclusions = cs.createSet();
107         exclusionFunction = conf.exclusionFunction;
108         fringe = new TreeSet<Resource>();
109         isRelatedToPredicates = cs.createMap(Boolean.class);
110         deadPredicates = cs.createSet();
111         strongInverseSet = cs.createSet();
112
113         for(Map.Entry<Resource, ExtentStatus> entry : conf.preStatus.entrySet()) {
114             status.put(entry.getKey(), entry.getValue());
115             if(ExtentStatus.EXCLUDED.equals(entry.getValue())) exclusions.add(entry.getKey());
116             if(ExtentStatus.INTERNAL.equals(entry.getValue())) fringe.add(entry.getKey());
117         }
118
119         if(PROFILE)
120                 startupTime += System.nanoTime();
121         
122 //        for(RootSpec p : conf.roots) {
123 //              if(p.internal)
124 //                      fringe.add(p.resource);
125 //        }
126
127     }
128     
129     public ResourceMap<ExtentStatus> getStatus() {
130         return status;
131     }
132
133     public Collection<DirectStatements> extractFromFringe(ReadGraph graph, int maxAmount) throws DatabaseException {
134
135         CollectionSupport cs = graph.getService(CollectionSupport.class);
136         Collection<Resource> list = cs.createList();
137         Iterator<Resource> it = fringe.iterator();
138         for(int i=0;i<maxAmount;i++) {
139             if(!it.hasNext()) break;
140             list.add(it.next());
141             it.remove();
142         }
143
144         return graph.syncRequest(new Expansion3(list, ignoreVirtual));
145
146     }
147
148     public Collection<DirectStatements> expand(ReadGraph graph) throws DatabaseException {
149
150         if(PROFILE)
151                 expandTime -= System.nanoTime();
152
153         Collection<DirectStatements> result = extractFromFringe(graph, 2<<12);
154
155         if(PROFILE)
156                 expandTime += System.nanoTime();
157
158         return result;
159
160     }
161
162     public void classifyPredicates(ReadGraph graph, final Set<Resource> schedule, DomainProcessorState state) throws DatabaseException {
163
164         for(Resource predicate : schedule) {
165             
166             Boolean isRelatedTo = Boolean.FALSE;
167             
168             Resource single = graph.getPossibleSuperrelation(predicate);
169             if(single != null) {
170                 
171                 Boolean singleIsRelatedTo = isRelatedToPredicates.get(single);
172                 if(singleIsRelatedTo == null) {
173                         singleIsRelatedTo = graph.isSubrelationOf(single, L0.IsRelatedTo);
174                         isRelatedToPredicates.put(single, singleIsRelatedTo);
175                 }
176
177                 isRelatedTo = singleIsRelatedTo;
178                 
179             } else {
180                 
181                 if(graph.isSubrelationOf(predicate, L0.IsRelatedTo)) {
182                         isRelatedTo = Boolean.TRUE;
183                     if(ModelTransferableGraphSourceRequest.LOG) ModelTransferableGraphSourceRequest.log("isRelatedToPredicates4 => " + NameUtils.getSafeName(graph, predicate));
184                 } else {
185                     if (!graph.hasStatement(predicate)) {
186                         if(ModelTransferableGraphSourceRequest.LOG) ModelTransferableGraphSourceRequest.log("FOUND DEAD PREDICATE (no statements): " + predicate);
187                         deadPredicates.add(predicate);
188                         // Prevents ModelTransferableGraphSource from
189                         // trying to export these statements.
190                         state.inverses.remove(support.getTransientId(predicate));
191                     }
192                 }
193                 
194             }
195             
196                 isRelatedToPredicates.put(predicate, isRelatedTo);
197
198         }
199
200     }
201
202     public void classifyPredicates(ReadGraph graph, DomainProcessorState state, final Collection<DirectStatements> expansion) throws DatabaseException {
203
204         if(PROFILE)
205                 classifyPredicateTime -= System.nanoTime();
206
207         CollectionSupport cs = graph.getService(CollectionSupport.class);
208         final Set<Resource> schedule = cs.createSet();
209         final Map<Resource, Resource> newPredicates = cs.createMap(Resource.class);
210
211         for (DirectStatements stms : expansion) {
212             for(Statement stm : stms) {
213
214                 Resource predicate = stm.getPredicate();
215                 Resource object = stm.getObject();
216
217                 if (exclusions.contains(object) || exclusions.contains(predicate))
218                     continue;
219                 
220                 if (exclusionFunction != null) {
221                         ExclusionDecision decision = exclusionFunction.apply(stm);
222                         if(ExclusionDecision.EXCLUDE_OBJECT.equals(decision)) {
223                                 status.put(object, ExtentStatus.EXCLUDED);
224                                 exclusions.add(object);
225                                 continue;
226                         }
227                 }
228
229                 if(predicates.add(predicate)) {
230                     Resource inverse = graph.getPossibleInverse(predicate);
231                     schedule.add(predicate);
232                     if(inverse != null) {
233                         newPredicates.put(predicate, inverse);
234                         if(predicates.add(inverse)) schedule.add(inverse);
235                         state.inverses.put(support.getTransientId(predicate), support.getTransientId(inverse));
236                         state.inverses.put(support.getTransientId(inverse), support.getTransientId(predicate));
237                         if(ModelTransferableGraphSourceRequest.LOG) ModelTransferableGraphSourceRequest.log("INVERSE FOR " + graph.getPossibleURI(predicate) + " => " + graph.getPossibleURI(inverse));
238                     } else {
239                         state.inverses.put(support.getTransientId(predicate), 0);
240                         if(ModelTransferableGraphSourceRequest.LOG) ModelTransferableGraphSourceRequest.log("NO INVERSE FOR " + graph.getPossibleURI(predicate));
241                     }
242
243                 }
244
245             }
246         }
247
248         classifyPredicates(graph, schedule, state);
249
250         for(Map.Entry<Resource, Resource> entry : newPredicates.entrySet()) {
251             // Inverse is strong => this has strong inverse
252                 Boolean isRelatedToValue = isRelatedToPredicates.get(entry.getValue());
253                 if(isRelatedToValue) strongInverseSet.add(entry.getKey());
254                 Boolean isRelatedToKey = isRelatedToPredicates.get(entry.getKey());
255                 if(isRelatedToKey) strongInverseSet.add(entry.getValue());
256         }
257
258         if(PROFILE)
259                 classifyPredicateTime += System.nanoTime();
260
261     }
262
263     /*
264      * Composed objects are internal. Mark them for expansion.
265      */
266
267     private Datatype getDatatype(ReadGraph graph, Resource subject) throws DatabaseException {
268         Resource object = graph.getSingleObject(subject, L0.HasDataType);
269         return graph.syncRequest(new Value<Datatype>(object, datatypeBinding), TransientCacheListener.<Datatype>instance());
270     }
271
272     public void processValue(ReadGraph graph, Resource subject, int sId, final DomainProcessorState state) throws DatabaseException, IOException {
273         final InputStream valueStream = tgs.getValueStream(graph, subject);
274         if (valueStream != null) {
275             if(ModelTransferableGraphSourceRequest.LOG) ModelTransferableGraphSourceRequest.log("[VALUE] " + NameUtils.getSafeName(graph, subject, true));
276             state.valueOutput.writeInt(sId);
277
278             if (conf.values) {
279                 Datatype dt = getDatatype(graph, subject);
280
281                 boolean canWriteRawVariant = !state.valueModifier.mayNeedModification(dt);
282                 long rawVariantSizePos = 0;
283                 state.valueOutput.writeByte(canWriteRawVariant
284                         ? TransferableGraphSource.TAG_RAW_COPY_VARIANT_VALUE
285                         : TransferableGraphSource.TAG_POTENTIALLY_MODIFIED_VARIANT_VALUE);
286                 if (canWriteRawVariant) {
287                     // Add space for raw variant byte size before the data
288                     rawVariantSizePos = state.valueOutput.position();
289                     state.valueOutput.writeInt(0);
290                 }
291
292                 byte[] typeBytes = bindings.get(dt);
293                 if (typeBytes == null) {
294                     typeBytes = datatypeSerializer.serialize(dt);
295                     bindings.put(dt, typeBytes);
296                 }
297
298                 state.valueOutput.write(typeBytes);
299                 Serializer s = Bindings.getSerializerUnchecked(Bindings.getBinding(dt));
300                 s.skip(new InputStream() {
301                     @Override
302                     public int read() throws IOException {
303                         int value = valueStream.read();
304                         state.valueOutput.write(value);
305                         return value;
306                     }
307                 });
308
309                 if (canWriteRawVariant) {
310                     long currentPos = state.valueOutput.position();
311                     int variantSize = (int)(currentPos - rawVariantSizePos - 4);
312                     state.valueOutput.position(rawVariantSizePos);
313                     state.valueOutput.writeInt(variantSize);
314                     state.valueOutput.position(currentPos);
315                 }
316             }
317
318             state.valueCount++;
319         }
320     }
321
322     private TIntArrayList stream = new TIntArrayList();
323
324     public void addToStream(Resource predicate, Resource object) throws DatabaseException {
325         stream.add(support.getTransientId(predicate));
326         stream.add(support.getTransientId(object));
327     }
328
329     public void processStatement(ReadGraph graph, Resource subject, Statement stm) throws DatabaseException, IOException {
330
331         Resource predicate = stm.getPredicate();
332
333         Resource object = stm.getObject();
334
335         ExtentStatus objectStatus = status.get(object);
336
337         // Strong predicate
338         Boolean isRelatedTo = isRelatedToPredicates.get(predicate);
339         if ((objectStatus !=  ExtentStatus.EXCLUDED) && isRelatedTo) {
340
341             if(ModelTransferableGraphSourceRequest.LOG) logStatementWithExtent(graph, "related", objectStatus, subject, predicate, object);
342
343             addToStream(predicate, object);
344
345             if(objectStatus == null || objectStatus == ExtentStatus.PENDING) {
346                 if(ModelTransferableGraphSourceRequest.LOG) ModelTransferableGraphSourceRequest.log("[ADDED TO FRINGE] " + NameUtils.getSafeName(graph, object));
347                 fringe.add(object);
348             }
349
350         } else {
351
352             // Dead predicate
353             if (deadPredicates.contains(predicate)) {
354                 if(ModelTransferableGraphSourceRequest.LOG) logStatementWithExtent(graph, "excluding statement with dead predicate ", objectStatus, subject, predicate, object);
355                 return;
356             }
357
358             // Weak predicate
359             if(objectStatus == ExtentStatus.EXCLUDED) {
360
361                 if(ModelTransferableGraphSourceRequest.LOG) logStatementWithExtent(graph, "weak reference to excluded object ", objectStatus, subject, predicate, object);
362
363             } else {
364
365                 // The inverse is also weak (or there is no inverse)
366                 if(!strongInverseSet.contains(predicate)) {
367
368                     addToStream(predicate, object);
369
370                     if(objectStatus == null) {
371                         status.put(object, ExtentStatus.PENDING);
372                     }
373
374                     if(ModelTransferableGraphSourceRequest.LOG) logStatementWithExtent(graph, "fully weak internal", objectStatus, subject, predicate, object);
375
376                 } else {
377
378                     if(ModelTransferableGraphSourceRequest.LOG) logStatementWithExtent(graph, "strong inverse internal ", objectStatus, subject, predicate, object);
379
380                 }
381
382             }
383
384         }
385         
386     }
387     
388     public void flushStatementStream(int sId, DomainProcessorState state) throws IOException {
389         if(!stream.isEmpty()) {
390             state.statementsOutput.writeInt(sId);
391             int streamSize = stream.size();
392             int statementCount = stream.size() / 2;
393             state.statementsOutput.writeInt(statementCount);
394             for (int i = 0; i < streamSize; i++)
395                 state.statementsOutput.writeInt(stream.getQuick(i));
396             state.statementCount += 2*streamSize;
397             stream.resetQuick();
398         }
399     }
400
401     // For progress monitor book-keeping
402     private long internalResourceNumber = 0;
403     private long startTime = 0;
404     private long lastUpdateTime = 0;
405
406     public void processInternal(ReadGraph graph, Resource subject, DirectStatements stms, DomainProcessorState state) throws DatabaseException, IOException {
407
408         internalResourceNumber++;
409
410         // Update progress monitor with controlled frequency
411         long t = System.nanoTime();
412         long dt = t - lastUpdateTime;
413         if (dt > 200_000_000L) {
414             if (startTime == 0)
415                 startTime = t;
416             lastUpdateTime = t;
417             double totalTime = (t - startTime) * 1e-9;
418             if (totalTime > 0) {
419                 long speed = Math.round((double)internalResourceNumber / totalTime);
420                 state.monitor.subTask("Included " + internalResourceNumber + " resources (" + speed + " resources/s)");
421             }
422         }
423
424         status.put(subject, ExtentStatus.INTERNAL);
425         if(ModelTransferableGraphSourceRequest.LOG) ModelTransferableGraphSourceRequest.log("[INTERNAL] " + NameUtils.getSafeName(graph, subject, true));
426
427         int sId = support.getTransientId(subject);
428
429         if(PROFILE)
430                 valueOutputTime -= System.nanoTime();
431
432         processValue(graph, subject, sId, state);
433
434         if(PROFILE)
435                 valueOutputTime += System.nanoTime();
436
437         if(PROFILE)
438                 statementOutputTime -= System.nanoTime();
439
440         for(Statement stm : stms) {
441                 processStatement(graph, subject, stm);
442         }
443
444         flushStatementStream(sId, state);
445
446         if(PROFILE)
447                 statementOutputTime += System.nanoTime();
448
449         // Logarithmic progress monitor for unknown amount of work.
450         state.monitor.setWorkRemaining(100000);
451         state.monitor.worked(1);
452     }
453
454     public void processFringe(ReadGraph graph, Collection<DirectStatements> expansion, final DomainProcessorState state) throws DatabaseException, IOException {
455
456         if(PROFILE)
457                 processFringeTime -= System.nanoTime();
458
459         for (DirectStatements stms : expansion) {
460
461             Resource subject = stms.getSubject();
462
463             boolean partOf = false;
464             for(Statement stm : stms) {
465                 Resource predicate = stm.getPredicate();
466                 if(L0.PartOf.equals(predicate)) {
467                     partOf = true;
468                     break;
469                 }
470             }
471
472             ExtentStatus subjectStatus = status.get(subject);
473             if(ModelTransferableGraphSourceRequest.LOG && subjectStatus != null) ModelTransferableGraphSourceRequest.log("EXISTING STATUS FOR " + graph.getPossibleURI(subject) + " - " + subjectStatus);
474             if(subjectStatus == ExtentStatus.EXTERNAL || subjectStatus == ExtentStatus.EXCLUDED) continue;
475             if(partOf && (subjectStatus == null || ExtentStatus.PENDING == subjectStatus) && graph.getPossibleURI(subject) != null) {
476
477                 status.put(subject, ExtentStatus.EXTERNAL);
478                 if(ModelTransferableGraphSourceRequest.LOG) {
479                     String uri = graph.getPossibleURI(subject);
480                     if(uri == null) ModelTransferableGraphSourceRequest.log("[EXTERNAL]: No URI for " + subject);
481                     else ModelTransferableGraphSourceRequest.log("[EXTERNAL] " + uri);
482                 }
483
484             } else {
485
486                 processInternal(graph, subject, stms, state);
487                 
488             }
489
490         }
491
492         if(PROFILE)
493                 processFringeTime += System.nanoTime();
494
495     }
496
497     public void process(ReadGraph graph, DomainProcessorState state) throws DatabaseException {
498
499         try {
500
501             this.variantSerializer = graph.getService(Databoard.class).getSerializerUnchecked(Bindings.VARIANT);
502             this.datatypeBinding = Bindings.getBindingUnchecked(Datatype.class);
503             this.datatypeSerializer = graph.getService(Databoard.class).getSerializerUnchecked(this.datatypeBinding);
504
505             for(Resource r : ConsistsOfProcess.walk(graph, fringe, exclusions, ignoreVirtual)) {
506                 if (status.put(r, ExtentStatus.INTERNAL) == null) {
507                     if(ModelTransferableGraphSourceRequest.LOG) {
508                         String URI = graph.getPossibleURI(r);
509                         if(URI != null) ModelTransferableGraphSourceRequest.log("URI INTERNAL " + URI);
510                         else ModelTransferableGraphSourceRequest.log("URI has no URI for " + r);
511                     }
512                     fringe.add(r);
513                 }
514             }
515
516             if (state.monitor.isCanceled())
517                 throw new CancelTransactionException();
518
519             while(!fringe.isEmpty()) {
520
521                 Collection<DirectStatements> expansion = expand(graph);
522                 classifyPredicates(graph, state, expansion);
523                 processFringe(graph, expansion, state);
524
525                 if (state.monitor.isCanceled())
526                     throw new CancelTransactionException();
527             }
528
529             if (ModelTransferableGraphSourceRequest.PROFILE) {
530                 System.out.println(composedObjectCounter + " " + fastInternalCounter
531                         + " " + parentExternalCounter + " "
532                         + fullExternalCounter + " " + fullInternalCounter);
533             }
534
535         } catch (IOException e) {
536             throw new DatabaseException(e);
537         }
538
539     }
540
541     void logStatementWithExtent(ReadGraph graph, String header, ExtentStatus status, int sId, int pId, int oId) throws DatabaseException {
542         if(ModelTransferableGraphSourceRequest.LOG) {
543             SerialisationSupport support = graph.getService(SerialisationSupport.class);
544             String s = NameUtils.getURIOrSafeNameInternal(graph, support.getResource(sId));
545             String p = NameUtils.getURIOrSafeNameInternal(graph, support.getResource(pId));
546             String o = NameUtils.getURIOrSafeNameInternal(graph, support.getResource(oId));
547             ModelTransferableGraphSourceRequest.log(header + " [" + status + "] " + s + " - " + p + " - " + o);
548         }
549     }
550
551     void logStatementWithExtent(ReadGraph graph, String header, ExtentStatus status, Resource sId, Resource pId, Resource oId) throws DatabaseException {
552         if(ModelTransferableGraphSourceRequest.LOG) {
553             String s = NameUtils.getURIOrSafeNameInternal(graph, sId);
554             String p = NameUtils.getURIOrSafeNameInternal(graph, pId);
555             String o = NameUtils.getURIOrSafeNameInternal(graph, oId);
556             ModelTransferableGraphSourceRequest.log(header + " [" + status + "] " + s + " - " + p + " - " + o);
557         }
558     }
559
560 }