]> gerrit.simantics Code Review - simantics/platform.git/blob - bundles/org.simantics.db.layer0/src/org/simantics/db/layer0/util/DomainProcessor3.java
20856d4ced9de92d68b013cb6c545bfc95bdfdff
[simantics/platform.git] / bundles / org.simantics.db.layer0 / src / org / simantics / db / layer0 / util / DomainProcessor3.java
1 package org.simantics.db.layer0.util;
2
3 import java.io.IOException;
4 import java.io.InputStream;
5 import java.util.ArrayList;
6 import java.util.Collection;
7 import java.util.HashMap;
8 import java.util.Iterator;
9 import java.util.List;
10 import java.util.Map;
11 import java.util.Set;
12 import java.util.TreeSet;
13
14 import org.simantics.databoard.Bindings;
15 import org.simantics.databoard.Databoard;
16 import org.simantics.databoard.binding.Binding;
17 import org.simantics.databoard.serialization.Serializer;
18 import org.simantics.databoard.type.Datatype;
19 import org.simantics.db.DirectStatements;
20 import org.simantics.db.ReadGraph;
21 import org.simantics.db.Resource;
22 import org.simantics.db.ResourceMap;
23 import org.simantics.db.Statement;
24 import org.simantics.db.common.StandardStatement;
25 import org.simantics.db.common.primitiverequest.Value;
26 import org.simantics.db.common.procedure.adapter.TransientCacheListener;
27 import org.simantics.db.common.utils.NameUtils;
28 import org.simantics.db.exception.CancelTransactionException;
29 import org.simantics.db.exception.DatabaseException;
30 import org.simantics.db.layer0.adapter.SubgraphExtent.ExtentStatus;
31 import org.simantics.db.layer0.util.ConsistsOfProcess.InternalEntry;
32 import org.simantics.db.layer0.util.ModelTransferableGraphSourceRequest.Expansion3;
33 import org.simantics.db.service.CollectionSupport;
34 import org.simantics.db.service.SerialisationSupport;
35 import org.simantics.db.service.TransferableGraphSupport;
36 import org.simantics.graph.db.TransferableGraphSource;
37 import org.simantics.layer0.Layer0;
38 import org.simantics.scl.runtime.function.Function1;
39 import org.simantics.utils.datastructures.Pair;
40
41 import gnu.trove.list.array.TIntArrayList;
42 import gnu.trove.map.hash.TIntIntHashMap;
43
44 public class DomainProcessor3 {
45         
46         public enum ExclusionDecision {
47                 INCLUDE, EXCLUDE_OBJECT
48         }
49
50         final static private boolean PROFILE = false;
51
52     Serializer variantSerializer;
53     Serializer datatypeSerializer;
54     Binding datatypeBinding;
55     boolean ignoreVirtual;
56
57     int id = 0;
58
59     Set<Resource> fringe = null;
60     Set<Resource> exclusions = null;
61     Function1<Statement,ExclusionDecision> exclusionFunction = null;
62     Set<Resource> predicates = null;
63     Map<Resource,Boolean> isRelatedToPredicates = null;
64     Set<Resource> deadPredicates = null;
65     Set<Resource> strongInverseSet = null;
66     List<Statement> unresolvedWeakLinks = new ArrayList<>();
67     TIntIntHashMap ids = null;
68     ResourceMap<ExtentStatus> status = null;
69     Map<Datatype, byte[]> bindings = new HashMap<Datatype, byte[]>();
70     final SerialisationSupport support;
71     final TransferableGraphConfiguration2 conf;
72     final TransferableGraphSupport tgs;
73
74     private Layer0 L0;
75
76     private long composedObjectCounter = 0;
77     private long fastInternalCounter = 0;
78     private long parentExternalCounter = 0;
79     private long fullInternalCounter = 0;
80     private long fullExternalCounter = 0;
81
82     long startupTime = 0;
83     long expandTime = 0;
84     long fullResolveTime = 0;
85     long fastResolveTime = 0;
86     long otherStatementTime = 0;
87     long parentResolveTime = 0;
88     long extentSeedTime = 0;
89     long classifyPredicateTime = 0;
90     long processFringeTime = 0;
91     long valueOutputTime = 0;
92     long statementOutputTime = 0;
93
94     public DomainProcessor3(ReadGraph graph, TransferableGraphConfiguration2 conf, DomainProcessorState state, boolean ignoreVirtual) throws DatabaseException {
95
96         this.L0 = Layer0.getInstance(graph);
97         this.tgs = graph.getService(TransferableGraphSupport.class);
98
99         this.support = graph.getService(SerialisationSupport.class);
100         this.ignoreVirtual = ignoreVirtual;
101         this.conf = conf;
102
103         if(PROFILE)
104                 startupTime -= System.nanoTime();
105
106         CollectionSupport cs = graph.getService(CollectionSupport.class);
107
108         ids = state.ids;
109         status = cs.createMap(ExtentStatus.class);
110         predicates = cs.createSet();
111         exclusions = cs.createSet();
112         exclusionFunction = conf.exclusionFunction;
113         fringe = new TreeSet<Resource>();
114         isRelatedToPredicates = cs.createMap(Boolean.class);
115         deadPredicates = cs.createSet();
116         strongInverseSet = cs.createSet();
117
118         for(Map.Entry<Resource, ExtentStatus> entry : conf.preStatus.entrySet()) {
119             status.put(entry.getKey(), entry.getValue());
120             if(ExtentStatus.EXCLUDED.equals(entry.getValue())) exclusions.add(entry.getKey());
121             if(ExtentStatus.INTERNAL.equals(entry.getValue())) fringe.add(entry.getKey());
122         }
123
124         if(PROFILE)
125                 startupTime += System.nanoTime();
126         
127 //        for(RootSpec p : conf.roots) {
128 //              if(p.internal)
129 //                      fringe.add(p.resource);
130 //        }
131
132     }
133     
134     public ResourceMap<ExtentStatus> getStatus() {
135         return status;
136     }
137
138     public Collection<DirectStatements> extractFromFringe(ReadGraph graph, int maxAmount) throws DatabaseException {
139
140         CollectionSupport cs = graph.getService(CollectionSupport.class);
141         Collection<Resource> list = cs.createList();
142         Iterator<Resource> it = fringe.iterator();
143         for(int i=0;i<maxAmount;i++) {
144             if(!it.hasNext()) break;
145             list.add(it.next());
146             it.remove();
147         }
148
149         return graph.syncRequest(new Expansion3(list, ignoreVirtual));
150
151     }
152
153     public Collection<DirectStatements> expand(ReadGraph graph) throws DatabaseException {
154
155         if(PROFILE)
156                 expandTime -= System.nanoTime();
157
158         Collection<DirectStatements> result = extractFromFringe(graph, 2<<12);
159
160         if(PROFILE)
161                 expandTime += System.nanoTime();
162
163         return result;
164
165     }
166
167     public void classifyPredicates(ReadGraph graph, final Set<Resource> schedule, DomainProcessorState state) throws DatabaseException {
168
169         for(Resource predicate : schedule) {
170             
171             Boolean isRelatedTo = Boolean.FALSE;
172             
173             Resource single = graph.getPossibleSuperrelation(predicate);
174             if(single != null) {
175                 
176                 Boolean singleIsRelatedTo = isRelatedToPredicates.get(single);
177                 if(singleIsRelatedTo == null) {
178                         singleIsRelatedTo = graph.isSubrelationOf(single, L0.IsRelatedTo);
179                         isRelatedToPredicates.put(single, singleIsRelatedTo);
180                 }
181
182                 isRelatedTo = singleIsRelatedTo;
183                 
184             } else {
185                 
186                 if(graph.isSubrelationOf(predicate, L0.IsRelatedTo)) {
187                         isRelatedTo = Boolean.TRUE;
188                     if(ModelTransferableGraphSourceRequest.LOG) ModelTransferableGraphSourceRequest.log("isRelatedToPredicates4 => " + NameUtils.getSafeName(graph, predicate));
189                 } else {
190                     if (!graph.hasStatement(predicate)) {
191                         if(ModelTransferableGraphSourceRequest.LOG) ModelTransferableGraphSourceRequest.log("FOUND DEAD PREDICATE (no statements): " + predicate);
192                         deadPredicates.add(predicate);
193                         // Prevents ModelTransferableGraphSource from
194                         // trying to export these statements.
195                         state.inverses.remove(support.getTransientId(predicate));
196                     }
197                 }
198                 
199             }
200             
201                 isRelatedToPredicates.put(predicate, isRelatedTo);
202
203         }
204
205     }
206
207     public void classifyPredicates(ReadGraph graph, DomainProcessorState state, final Collection<DirectStatements> expansion) throws DatabaseException {
208
209         if(PROFILE)
210                 classifyPredicateTime -= System.nanoTime();
211
212         CollectionSupport cs = graph.getService(CollectionSupport.class);
213         final Set<Resource> schedule = cs.createSet();
214         final Map<Resource, Resource> newPredicates = cs.createMap(Resource.class);
215
216         for (DirectStatements stms : expansion) {
217             for(Statement stm : stms) {
218
219                 Resource predicate = stm.getPredicate();
220                 Resource object = stm.getObject();
221
222                 if (exclusions.contains(object) || exclusions.contains(predicate))
223                     continue;
224                 
225                 if (exclusionFunction != null) {
226                         ExclusionDecision decision = exclusionFunction.apply(stm);
227                         if(ExclusionDecision.EXCLUDE_OBJECT.equals(decision)) {
228                                 status.put(object, ExtentStatus.EXCLUDED);
229                                 exclusions.add(object);
230                                 continue;
231                         }
232                 }
233
234                 if(predicates.add(predicate)) {
235                     Resource inverse = graph.getPossibleInverse(predicate);
236                     schedule.add(predicate);
237                     if(inverse != null) {
238                         newPredicates.put(predicate, inverse);
239                         if(predicates.add(inverse)) schedule.add(inverse);
240                         state.inverses.put(support.getTransientId(predicate), support.getTransientId(inverse));
241                         state.inverses.put(support.getTransientId(inverse), support.getTransientId(predicate));
242                         if(ModelTransferableGraphSourceRequest.LOG) ModelTransferableGraphSourceRequest.log("INVERSE FOR " + graph.getPossibleURI(predicate) + " => " + graph.getPossibleURI(inverse));
243                     } else {
244                         state.inverses.put(support.getTransientId(predicate), 0);
245                         if(ModelTransferableGraphSourceRequest.LOG) ModelTransferableGraphSourceRequest.log("NO INVERSE FOR " + graph.getPossibleURI(predicate));
246                     }
247
248                 }
249
250             }
251         }
252
253         classifyPredicates(graph, schedule, state);
254
255         for(Map.Entry<Resource, Resource> entry : newPredicates.entrySet()) {
256             // Inverse is strong => this has strong inverse
257                 Boolean isRelatedToValue = isRelatedToPredicates.get(entry.getValue());
258                 if(isRelatedToValue) strongInverseSet.add(entry.getKey());
259                 Boolean isRelatedToKey = isRelatedToPredicates.get(entry.getKey());
260                 if(isRelatedToKey) strongInverseSet.add(entry.getValue());
261         }
262
263         if(PROFILE)
264                 classifyPredicateTime += System.nanoTime();
265
266     }
267
268     /*
269      * Composed objects are internal. Mark them for expansion.
270      */
271
272     private Datatype getDatatype(ReadGraph graph, Resource subject) throws DatabaseException {
273         Resource object = graph.getSingleObject(subject, L0.HasDataType);
274         return graph.syncRequest(new Value<Datatype>(object, datatypeBinding), TransientCacheListener.<Datatype>instance());
275     }
276
277     public void processValue(ReadGraph graph, Resource subject, int sId, final DomainProcessorState state) throws DatabaseException, IOException {
278         final InputStream valueStream = tgs.getValueStream(graph, subject);
279         if (valueStream != null) {
280             if(ModelTransferableGraphSourceRequest.LOG) ModelTransferableGraphSourceRequest.log("[VALUE] " + NameUtils.getSafeName(graph, subject, true));
281             state.valueOutput.writeInt(sId);
282
283             if (conf.values) {
284                 Datatype dt = getDatatype(graph, subject);
285
286                 boolean canWriteRawVariant = !state.valueModifier.mayNeedModification(dt);
287                 long rawVariantSizePos = 0;
288                 state.valueOutput.writeByte(canWriteRawVariant
289                         ? TransferableGraphSource.TAG_RAW_COPY_VARIANT_VALUE
290                         : TransferableGraphSource.TAG_POTENTIALLY_MODIFIED_VARIANT_VALUE);
291                 if (canWriteRawVariant) {
292                     // Add space for raw variant byte size before the data
293                     rawVariantSizePos = state.valueOutput.position();
294                     state.valueOutput.writeInt(0);
295                 }
296
297                 byte[] typeBytes = bindings.get(dt);
298                 if (typeBytes == null) {
299                     typeBytes = datatypeSerializer.serialize(dt);
300                     bindings.put(dt, typeBytes);
301                 }
302
303                 state.valueOutput.write(typeBytes);
304                 Serializer s = Bindings.getSerializerUnchecked(Bindings.getBinding(dt));
305                 s.skip(new InputStream() {
306                     @Override
307                     public int read() throws IOException {
308                         int value = valueStream.read();
309                         state.valueOutput.write(value);
310                         return value;
311                     }
312                 });
313
314                 if (canWriteRawVariant) {
315                     long currentPos = state.valueOutput.position();
316                     int variantSize = (int)(currentPos - rawVariantSizePos - 4);
317                     state.valueOutput.position(rawVariantSizePos);
318                     state.valueOutput.writeInt(variantSize);
319                     state.valueOutput.position(currentPos);
320                 }
321             }
322
323             state.valueCount++;
324         }
325     }
326
327     private TIntArrayList stream = new TIntArrayList();
328
329     public void addToStream(Resource predicate, Resource object) throws DatabaseException {
330         stream.add(support.getTransientId(predicate));
331         stream.add(support.getTransientId(object));
332     }
333
334     public void processStatement(ReadGraph graph, Resource subject, Statement stm) throws DatabaseException, IOException {
335
336         Resource predicate = stm.getPredicate();
337
338         Resource object = stm.getObject();
339
340         ExtentStatus objectStatus = status.get(object);
341
342         // Strong predicate
343         Boolean isRelatedTo = isRelatedToPredicates.get(predicate);
344         if ((objectStatus !=  ExtentStatus.EXCLUDED) && isRelatedTo) {
345
346             if(ModelTransferableGraphSourceRequest.LOG) logStatementWithExtent(graph, "related", objectStatus, subject, predicate, object);
347
348             addToStream(predicate, object);
349
350             if(objectStatus == null || objectStatus == ExtentStatus.PENDING) {
351                 if(ModelTransferableGraphSourceRequest.LOG) ModelTransferableGraphSourceRequest.log("[ADDED TO FRINGE] " + NameUtils.getSafeName(graph, object));
352                 fringe.add(object);
353             }
354
355         } else {
356
357             // Dead predicate
358             if (deadPredicates.contains(predicate)) {
359                 if(ModelTransferableGraphSourceRequest.LOG) logStatementWithExtent(graph, "excluding statement with dead predicate ", objectStatus, subject, predicate, object);
360                 return;
361             }
362
363             // Weak predicate
364             if(objectStatus == ExtentStatus.EXCLUDED) {
365
366                 if(ModelTransferableGraphSourceRequest.LOG) logStatementWithExtent(graph, "weak reference to excluded object ", objectStatus, subject, predicate, object);
367
368             } else {
369
370                 // The inverse is also weak (or there is no inverse)
371                 if(!strongInverseSet.contains(predicate)) {
372                         
373                         unresolvedWeakLinks.add(new StandardStatement(subject, predicate, object));
374                     //addToStream(predicate, object);
375
376                     if(objectStatus == null) {
377                         status.put(object, ExtentStatus.PENDING);
378                     }
379
380                     if(ModelTransferableGraphSourceRequest.LOG) logStatementWithExtent(graph, "fully weak internal", objectStatus, subject, predicate, object);
381
382                 } else {
383
384                     if(ModelTransferableGraphSourceRequest.LOG) logStatementWithExtent(graph, "strong inverse internal ", objectStatus, subject, predicate, object);
385
386                 }
387
388             }
389
390         }
391         
392     }
393     
394     public void flushStatementStream(int sId, DomainProcessorState state) throws IOException {
395         if(!stream.isEmpty()) {
396             state.statementsOutput.writeInt(sId);
397             int streamSize = stream.size();
398             int statementCount = stream.size() / 2;
399             state.statementsOutput.writeInt(statementCount);
400             for (int i = 0; i < streamSize; i++)
401                 state.statementsOutput.writeInt(stream.getQuick(i));
402             state.statementCount += 2*streamSize;
403             stream.resetQuick();
404         }
405     }
406
407     // For progress monitor book-keeping
408     private long internalResourceNumber = 0;
409     private long startTime = 0;
410     private long lastUpdateTime = 0;
411
412     public void processInternal(ReadGraph graph, Resource subject, DirectStatements stms, DomainProcessorState state) throws DatabaseException, IOException {
413
414         internalResourceNumber++;
415
416         // Update progress monitor with controlled frequency
417         long t = System.nanoTime();
418         long dt = t - lastUpdateTime;
419         if (dt > 200_000_000L) {
420             if (startTime == 0)
421                 startTime = t;
422             lastUpdateTime = t;
423             double totalTime = (t - startTime) * 1e-9;
424             if (totalTime > 0) {
425                 long speed = Math.round((double)internalResourceNumber / totalTime);
426                 state.monitor.subTask("Included " + internalResourceNumber + " resources (" + speed + " resources/s)");
427             }
428         }
429
430         status.put(subject, ExtentStatus.INTERNAL);
431         if(ModelTransferableGraphSourceRequest.LOG) ModelTransferableGraphSourceRequest.log("[INTERNAL] " + NameUtils.getSafeName(graph, subject, true));
432
433         int sId = support.getTransientId(subject);
434
435         if(PROFILE)
436                 valueOutputTime -= System.nanoTime();
437
438         processValue(graph, subject, sId, state);
439
440         if(PROFILE)
441                 valueOutputTime += System.nanoTime();
442
443         if(PROFILE)
444                 statementOutputTime -= System.nanoTime();
445
446         for(Statement stm : stms) {
447                 processStatement(graph, subject, stm);
448         }
449
450         flushStatementStream(sId, state);
451
452         if(PROFILE)
453                 statementOutputTime += System.nanoTime();
454
455         // Logarithmic progress monitor for unknown amount of work.
456         state.monitor.setWorkRemaining(100000);
457         state.monitor.worked(1);
458     }
459
460     public void processFringe(ReadGraph graph, Collection<DirectStatements> expansion, final DomainProcessorState state) throws DatabaseException, IOException {
461
462         if(PROFILE)
463                 processFringeTime -= System.nanoTime();
464
465         for (DirectStatements stms : expansion) {
466
467             Resource subject = stms.getSubject();
468
469             boolean partOf = false;
470             for(Statement stm : stms) {
471                 Resource predicate = stm.getPredicate();
472                 if(L0.PartOf.equals(predicate)) {
473                     partOf = true;
474                     break;
475                 }
476             }
477
478             ExtentStatus subjectStatus = status.get(subject);
479             if(ModelTransferableGraphSourceRequest.LOG && subjectStatus != null) ModelTransferableGraphSourceRequest.log("EXISTING STATUS FOR " + graph.getPossibleURI(subject) + " - " + subjectStatus);
480             if(subjectStatus == ExtentStatus.EXTERNAL || subjectStatus == ExtentStatus.EXCLUDED) continue;
481             if(partOf && (subjectStatus == null || ExtentStatus.PENDING == subjectStatus) && graph.getPossibleURI(subject) != null) {
482
483                 status.put(subject, ExtentStatus.EXTERNAL);
484                 if(ModelTransferableGraphSourceRequest.LOG) {
485                     String uri = graph.getPossibleURI(subject);
486                     if(uri == null) ModelTransferableGraphSourceRequest.log("[EXTERNAL]: No URI for " + subject);
487                     else ModelTransferableGraphSourceRequest.log("[EXTERNAL] " + uri);
488                 }
489
490             } else {
491
492                 processInternal(graph, subject, stms, state);
493                 
494             }
495
496         }
497
498         if(PROFILE)
499                 processFringeTime += System.nanoTime();
500
501     }
502
503     public void process(ReadGraph graph, DomainProcessorState state) throws DatabaseException {
504
505         try {
506
507             this.variantSerializer = graph.getService(Databoard.class).getSerializerUnchecked(Bindings.VARIANT);
508             this.datatypeBinding = Bindings.getBindingUnchecked(Datatype.class);
509             this.datatypeSerializer = graph.getService(Databoard.class).getSerializerUnchecked(this.datatypeBinding);
510
511             Pair<List<InternalEntry>,Set<Resource>> pair = ConsistsOfProcess.walk(graph, status, fringe, exclusions, ignoreVirtual); 
512             state.internalEntries = pair.first;
513             
514             for(InternalEntry entry : state.internalEntries) {
515                 Resource r = entry.resource;
516                 if (status.put(r, ExtentStatus.INTERNAL) == null) {
517                     if(ModelTransferableGraphSourceRequest.LOG) {
518                         String URI = graph.getPossibleURI(r);
519                         if(URI != null) ModelTransferableGraphSourceRequest.log("URI INTERNAL " + URI);
520                         else ModelTransferableGraphSourceRequest.log("URI has no URI for " + r);
521                     }
522                     fringe.add(r);
523                 }
524             }
525
526             for(Resource unnamedChild : pair.second) {
527                 if (status.put(unnamedChild, ExtentStatus.INTERNAL) == null) {
528                     fringe.add(unnamedChild);
529                 }
530             }
531             
532             if (state.monitor.isCanceled())
533                 throw new CancelTransactionException();
534
535             while(!fringe.isEmpty()) {
536
537                 Collection<DirectStatements> expansion = expand(graph);
538                 classifyPredicates(graph, state, expansion);
539                 processFringe(graph, expansion, state);
540
541                 if (state.monitor.isCanceled())
542                     throw new CancelTransactionException();
543             }
544
545             if (ModelTransferableGraphSourceRequest.PROFILE) {
546                 System.out.println(composedObjectCounter + " " + fastInternalCounter
547                         + " " + parentExternalCounter + " "
548                         + fullExternalCounter + " " + fullInternalCounter);
549             }
550
551         } catch (IOException e) {
552             throw new DatabaseException(e);
553         }
554
555     }
556
557     void logStatementWithExtent(ReadGraph graph, String header, ExtentStatus status, int sId, int pId, int oId) throws DatabaseException {
558         if(ModelTransferableGraphSourceRequest.LOG) {
559             SerialisationSupport support = graph.getService(SerialisationSupport.class);
560             String s = NameUtils.getURIOrSafeNameInternal(graph, support.getResource(sId));
561             String p = NameUtils.getURIOrSafeNameInternal(graph, support.getResource(pId));
562             String o = NameUtils.getURIOrSafeNameInternal(graph, support.getResource(oId));
563             ModelTransferableGraphSourceRequest.log(header + " [" + status + "] " + s + " - " + p + " - " + o);
564         }
565     }
566
567     void logStatementWithExtent(ReadGraph graph, String header, ExtentStatus status, Resource sId, Resource pId, Resource oId) throws DatabaseException {
568         if(ModelTransferableGraphSourceRequest.LOG) {
569             String s = NameUtils.getURIOrSafeNameInternal(graph, sId);
570             String p = NameUtils.getURIOrSafeNameInternal(graph, pId);
571             String o = NameUtils.getURIOrSafeNameInternal(graph, oId);
572             ModelTransferableGraphSourceRequest.log(header + " [" + status + "] " + s + " - " + p + " - " + o);
573         }
574     }
575
576 }