bd67680db6704f73bbe3a73d69aaa7774f8d3b5f
[simantics/platform.git] / bundles / org.simantics.db.layer0 / src / org / simantics / db / layer0 / util / DomainProcessor3.java
1 package org.simantics.db.layer0.util;
2
3 import java.io.IOException;
4 import java.io.InputStream;
5 import java.util.Collection;
6 import java.util.HashMap;
7 import java.util.Iterator;
8 import java.util.List;
9 import java.util.Map;
10 import java.util.Set;
11 import java.util.TreeSet;
12
13 import org.simantics.databoard.Bindings;
14 import org.simantics.databoard.Databoard;
15 import org.simantics.databoard.binding.Binding;
16 import org.simantics.databoard.serialization.Serializer;
17 import org.simantics.databoard.type.Datatype;
18 import org.simantics.db.DirectStatements;
19 import org.simantics.db.ReadGraph;
20 import org.simantics.db.Resource;
21 import org.simantics.db.ResourceMap;
22 import org.simantics.db.Statement;
23 import org.simantics.db.common.primitiverequest.Value;
24 import org.simantics.db.common.procedure.adapter.TransientCacheListener;
25 import org.simantics.db.common.utils.NameUtils;
26 import org.simantics.db.exception.CancelTransactionException;
27 import org.simantics.db.exception.DatabaseException;
28 import org.simantics.db.layer0.adapter.SubgraphExtent.ExtentStatus;
29 import org.simantics.db.layer0.util.ConsistsOfProcess.InternalEntry;
30 import org.simantics.db.layer0.util.ModelTransferableGraphSourceRequest.Expansion3;
31 import org.simantics.db.service.CollectionSupport;
32 import org.simantics.db.service.SerialisationSupport;
33 import org.simantics.db.service.TransferableGraphSupport;
34 import org.simantics.graph.db.TransferableGraphSource;
35 import org.simantics.layer0.Layer0;
36 import org.simantics.scl.runtime.function.Function1;
37
38 import gnu.trove.list.array.TIntArrayList;
39 import gnu.trove.map.hash.TIntIntHashMap;
40
41 public class DomainProcessor3 {
42         
43         public enum ExclusionDecision {
44                 INCLUDE, EXCLUDE_OBJECT
45         }
46
47         final static private boolean PROFILE = false;
48
49     Serializer variantSerializer;
50     Serializer datatypeSerializer;
51     Binding datatypeBinding;
52     boolean ignoreVirtual;
53
54     int id = 0;
55
56     Set<Resource> fringe = null;
57     Set<Resource> exclusions = null;
58     Function1<Statement,ExclusionDecision> exclusionFunction = null;
59     Set<Resource> predicates = null;
60     Map<Resource,Boolean> isRelatedToPredicates = null;
61     Set<Resource> deadPredicates = null;
62     Set<Resource> strongInverseSet = null;
63
64     TIntIntHashMap ids = null;
65     ResourceMap<ExtentStatus> status = null;
66     Map<Datatype, byte[]> bindings = new HashMap<Datatype, byte[]>();
67     final SerialisationSupport support;
68     final TransferableGraphConfiguration2 conf;
69     final TransferableGraphSupport tgs;
70
71     private Layer0 L0;
72
73     private long composedObjectCounter = 0;
74     private long fastInternalCounter = 0;
75     private long parentExternalCounter = 0;
76     private long fullInternalCounter = 0;
77     private long fullExternalCounter = 0;
78
79     long startupTime = 0;
80     long expandTime = 0;
81     long fullResolveTime = 0;
82     long fastResolveTime = 0;
83     long otherStatementTime = 0;
84     long parentResolveTime = 0;
85     long extentSeedTime = 0;
86     long classifyPredicateTime = 0;
87     long processFringeTime = 0;
88     long valueOutputTime = 0;
89     long statementOutputTime = 0;
90
91     public DomainProcessor3(ReadGraph graph, TransferableGraphConfiguration2 conf, DomainProcessorState state, boolean ignoreVirtual) throws DatabaseException {
92
93         this.L0 = Layer0.getInstance(graph);
94         this.tgs = graph.getService(TransferableGraphSupport.class);
95
96         this.support = graph.getService(SerialisationSupport.class);
97         this.ignoreVirtual = ignoreVirtual;
98         this.conf = conf;
99
100         if(PROFILE)
101                 startupTime -= System.nanoTime();
102
103         CollectionSupport cs = graph.getService(CollectionSupport.class);
104
105         ids = state.ids;
106         status = cs.createMap(ExtentStatus.class);
107         predicates = cs.createSet();
108         exclusions = cs.createSet();
109         exclusionFunction = conf.exclusionFunction;
110         fringe = new TreeSet<Resource>();
111         isRelatedToPredicates = cs.createMap(Boolean.class);
112         deadPredicates = cs.createSet();
113         strongInverseSet = cs.createSet();
114
115         for(Map.Entry<Resource, ExtentStatus> entry : conf.preStatus.entrySet()) {
116             status.put(entry.getKey(), entry.getValue());
117             if(ExtentStatus.EXCLUDED.equals(entry.getValue())) exclusions.add(entry.getKey());
118             if(ExtentStatus.INTERNAL.equals(entry.getValue())) fringe.add(entry.getKey());
119         }
120
121         if(PROFILE)
122                 startupTime += System.nanoTime();
123         
124 //        for(RootSpec p : conf.roots) {
125 //              if(p.internal)
126 //                      fringe.add(p.resource);
127 //        }
128
129     }
130     
131     public ResourceMap<ExtentStatus> getStatus() {
132         return status;
133     }
134
135     public Collection<DirectStatements> extractFromFringe(ReadGraph graph, int maxAmount) throws DatabaseException {
136
137         CollectionSupport cs = graph.getService(CollectionSupport.class);
138         Collection<Resource> list = cs.createList();
139         Iterator<Resource> it = fringe.iterator();
140         for(int i=0;i<maxAmount;i++) {
141             if(!it.hasNext()) break;
142             list.add(it.next());
143             it.remove();
144         }
145
146         return graph.syncRequest(new Expansion3(list, ignoreVirtual));
147
148     }
149
150     public Collection<DirectStatements> expand(ReadGraph graph) throws DatabaseException {
151
152         if(PROFILE)
153                 expandTime -= System.nanoTime();
154
155         Collection<DirectStatements> result = extractFromFringe(graph, 2<<12);
156
157         if(PROFILE)
158                 expandTime += System.nanoTime();
159
160         return result;
161
162     }
163
164     public void classifyPredicates(ReadGraph graph, final Set<Resource> schedule, DomainProcessorState state) throws DatabaseException {
165
166         for(Resource predicate : schedule) {
167             
168             Boolean isRelatedTo = Boolean.FALSE;
169             
170             Resource single = graph.getPossibleSuperrelation(predicate);
171             if(single != null) {
172                 
173                 Boolean singleIsRelatedTo = isRelatedToPredicates.get(single);
174                 if(singleIsRelatedTo == null) {
175                         singleIsRelatedTo = graph.isSubrelationOf(single, L0.IsRelatedTo);
176                         isRelatedToPredicates.put(single, singleIsRelatedTo);
177                 }
178
179                 isRelatedTo = singleIsRelatedTo;
180                 
181             } else {
182                 
183                 if(graph.isSubrelationOf(predicate, L0.IsRelatedTo)) {
184                         isRelatedTo = Boolean.TRUE;
185                     if(ModelTransferableGraphSourceRequest.LOG) ModelTransferableGraphSourceRequest.log("isRelatedToPredicates4 => " + NameUtils.getSafeName(graph, predicate));
186                 } else {
187                     if (!graph.hasStatement(predicate)) {
188                         if(ModelTransferableGraphSourceRequest.LOG) ModelTransferableGraphSourceRequest.log("FOUND DEAD PREDICATE (no statements): " + predicate);
189                         deadPredicates.add(predicate);
190                         // Prevents ModelTransferableGraphSource from
191                         // trying to export these statements.
192                         state.inverses.remove(support.getTransientId(predicate));
193                     }
194                 }
195                 
196             }
197             
198                 isRelatedToPredicates.put(predicate, isRelatedTo);
199
200         }
201
202     }
203
204     public void classifyPredicates(ReadGraph graph, DomainProcessorState state, final Collection<DirectStatements> expansion) throws DatabaseException {
205
206         if(PROFILE)
207                 classifyPredicateTime -= System.nanoTime();
208
209         CollectionSupport cs = graph.getService(CollectionSupport.class);
210         final Set<Resource> schedule = cs.createSet();
211         final Map<Resource, Resource> newPredicates = cs.createMap(Resource.class);
212
213         for (DirectStatements stms : expansion) {
214             for(Statement stm : stms) {
215
216                 Resource predicate = stm.getPredicate();
217                 Resource object = stm.getObject();
218
219                 if (exclusions.contains(object) || exclusions.contains(predicate))
220                     continue;
221                 
222                 if (exclusionFunction != null) {
223                         ExclusionDecision decision = exclusionFunction.apply(stm);
224                         if(ExclusionDecision.EXCLUDE_OBJECT.equals(decision)) {
225                                 status.put(object, ExtentStatus.EXCLUDED);
226                                 exclusions.add(object);
227                                 continue;
228                         }
229                 }
230
231                 if(predicates.add(predicate)) {
232                     Resource inverse = graph.getPossibleInverse(predicate);
233                     schedule.add(predicate);
234                     if(inverse != null) {
235                         newPredicates.put(predicate, inverse);
236                         if(predicates.add(inverse)) schedule.add(inverse);
237                         state.inverses.put(support.getTransientId(predicate), support.getTransientId(inverse));
238                         state.inverses.put(support.getTransientId(inverse), support.getTransientId(predicate));
239                         if(ModelTransferableGraphSourceRequest.LOG) ModelTransferableGraphSourceRequest.log("INVERSE FOR " + graph.getPossibleURI(predicate) + " => " + graph.getPossibleURI(inverse));
240                     } else {
241                         state.inverses.put(support.getTransientId(predicate), 0);
242                         if(ModelTransferableGraphSourceRequest.LOG) ModelTransferableGraphSourceRequest.log("NO INVERSE FOR " + graph.getPossibleURI(predicate));
243                     }
244
245                 }
246
247             }
248         }
249
250         classifyPredicates(graph, schedule, state);
251
252         for(Map.Entry<Resource, Resource> entry : newPredicates.entrySet()) {
253             // Inverse is strong => this has strong inverse
254                 Boolean isRelatedToValue = isRelatedToPredicates.get(entry.getValue());
255                 if(isRelatedToValue) strongInverseSet.add(entry.getKey());
256                 Boolean isRelatedToKey = isRelatedToPredicates.get(entry.getKey());
257                 if(isRelatedToKey) strongInverseSet.add(entry.getValue());
258         }
259
260         if(PROFILE)
261                 classifyPredicateTime += System.nanoTime();
262
263     }
264
265     /*
266      * Composed objects are internal. Mark them for expansion.
267      */
268
269     private Datatype getDatatype(ReadGraph graph, Resource subject) throws DatabaseException {
270         Resource object = graph.getSingleObject(subject, L0.HasDataType);
271         return graph.syncRequest(new Value<Datatype>(object, datatypeBinding), TransientCacheListener.<Datatype>instance());
272     }
273
274     public void processValue(ReadGraph graph, Resource subject, int sId, final DomainProcessorState state) throws DatabaseException, IOException {
275         final InputStream valueStream = tgs.getValueStream(graph, subject);
276         if (valueStream != null) {
277             if(ModelTransferableGraphSourceRequest.LOG) ModelTransferableGraphSourceRequest.log("[VALUE] " + NameUtils.getSafeName(graph, subject, true));
278             state.valueOutput.writeInt(sId);
279
280             if (conf.values) {
281                 Datatype dt = getDatatype(graph, subject);
282
283                 boolean canWriteRawVariant = !state.valueModifier.mayNeedModification(dt);
284                 long rawVariantSizePos = 0;
285                 state.valueOutput.writeByte(canWriteRawVariant
286                         ? TransferableGraphSource.TAG_RAW_COPY_VARIANT_VALUE
287                         : TransferableGraphSource.TAG_POTENTIALLY_MODIFIED_VARIANT_VALUE);
288                 if (canWriteRawVariant) {
289                     // Add space for raw variant byte size before the data
290                     rawVariantSizePos = state.valueOutput.position();
291                     state.valueOutput.writeInt(0);
292                 }
293
294                 byte[] typeBytes = bindings.get(dt);
295                 if (typeBytes == null) {
296                     typeBytes = datatypeSerializer.serialize(dt);
297                     bindings.put(dt, typeBytes);
298                 }
299
300                 state.valueOutput.write(typeBytes);
301                 Serializer s = Bindings.getSerializerUnchecked(Bindings.getBinding(dt));
302                 s.skip(new InputStream() {
303                     @Override
304                     public int read() throws IOException {
305                         int value = valueStream.read();
306                         state.valueOutput.write(value);
307                         return value;
308                     }
309                 });
310
311                 if (canWriteRawVariant) {
312                     long currentPos = state.valueOutput.position();
313                     int variantSize = (int)(currentPos - rawVariantSizePos - 4);
314                     state.valueOutput.position(rawVariantSizePos);
315                     state.valueOutput.writeInt(variantSize);
316                     state.valueOutput.position(currentPos);
317                 }
318             }
319
320             state.valueCount++;
321         }
322     }
323
324     private TIntArrayList stream = new TIntArrayList();
325
326     public void addToStream(Resource predicate, Resource object) throws DatabaseException {
327         stream.add(support.getTransientId(predicate));
328         stream.add(support.getTransientId(object));
329     }
330
331     public void processStatement(ReadGraph graph, Resource subject, Statement stm) throws DatabaseException, IOException {
332
333         Resource predicate = stm.getPredicate();
334
335         Resource object = stm.getObject();
336
337         ExtentStatus objectStatus = status.get(object);
338
339         // Strong predicate
340         Boolean isRelatedTo = isRelatedToPredicates.get(predicate);
341         if ((objectStatus !=  ExtentStatus.EXCLUDED) && isRelatedTo) {
342
343             if(ModelTransferableGraphSourceRequest.LOG) logStatementWithExtent(graph, "related", objectStatus, subject, predicate, object);
344
345             addToStream(predicate, object);
346
347             if(objectStatus == null || objectStatus == ExtentStatus.PENDING) {
348                 if(ModelTransferableGraphSourceRequest.LOG) ModelTransferableGraphSourceRequest.log("[ADDED TO FRINGE] " + NameUtils.getSafeName(graph, object));
349                 fringe.add(object);
350             }
351
352         } else {
353
354             // Dead predicate
355             if (deadPredicates.contains(predicate)) {
356                 if(ModelTransferableGraphSourceRequest.LOG) logStatementWithExtent(graph, "excluding statement with dead predicate ", objectStatus, subject, predicate, object);
357                 return;
358             }
359
360             // Weak predicate
361             if(objectStatus == ExtentStatus.EXCLUDED) {
362
363                 if(ModelTransferableGraphSourceRequest.LOG) logStatementWithExtent(graph, "weak reference to excluded object ", objectStatus, subject, predicate, object);
364
365             } else {
366
367                 // The inverse is also weak (or there is no inverse)
368                 if(!strongInverseSet.contains(predicate)) {
369
370                     addToStream(predicate, object);
371
372                     if(objectStatus == null) {
373                         status.put(object, ExtentStatus.PENDING);
374                     }
375
376                     if(ModelTransferableGraphSourceRequest.LOG) logStatementWithExtent(graph, "fully weak internal", objectStatus, subject, predicate, object);
377
378                 } else {
379
380                     if(ModelTransferableGraphSourceRequest.LOG) logStatementWithExtent(graph, "strong inverse internal ", objectStatus, subject, predicate, object);
381
382                 }
383
384             }
385
386         }
387         
388     }
389     
390     public void flushStatementStream(int sId, DomainProcessorState state) throws IOException {
391         if(!stream.isEmpty()) {
392             state.statementsOutput.writeInt(sId);
393             int streamSize = stream.size();
394             int statementCount = stream.size() / 2;
395             state.statementsOutput.writeInt(statementCount);
396             for (int i = 0; i < streamSize; i++)
397                 state.statementsOutput.writeInt(stream.getQuick(i));
398             state.statementCount += 2*streamSize;
399             stream.resetQuick();
400         }
401     }
402
403     // For progress monitor book-keeping
404     private long internalResourceNumber = 0;
405     private long startTime = 0;
406     private long lastUpdateTime = 0;
407
408     public void processInternal(ReadGraph graph, Resource subject, DirectStatements stms, DomainProcessorState state) throws DatabaseException, IOException {
409
410         internalResourceNumber++;
411
412         // Update progress monitor with controlled frequency
413         long t = System.nanoTime();
414         long dt = t - lastUpdateTime;
415         if (dt > 200_000_000L) {
416             if (startTime == 0)
417                 startTime = t;
418             lastUpdateTime = t;
419             double totalTime = (t - startTime) * 1e-9;
420             if (totalTime > 0) {
421                 long speed = Math.round((double)internalResourceNumber / totalTime);
422                 state.monitor.subTask("Included " + internalResourceNumber + " resources (" + speed + " resources/s)");
423             }
424         }
425
426         status.put(subject, ExtentStatus.INTERNAL);
427         if(ModelTransferableGraphSourceRequest.LOG) ModelTransferableGraphSourceRequest.log("[INTERNAL] " + NameUtils.getSafeName(graph, subject, true));
428
429         int sId = support.getTransientId(subject);
430
431         if(PROFILE)
432                 valueOutputTime -= System.nanoTime();
433
434         processValue(graph, subject, sId, state);
435
436         if(PROFILE)
437                 valueOutputTime += System.nanoTime();
438
439         if(PROFILE)
440                 statementOutputTime -= System.nanoTime();
441
442         for(Statement stm : stms) {
443                 processStatement(graph, subject, stm);
444         }
445
446         flushStatementStream(sId, state);
447
448         if(PROFILE)
449                 statementOutputTime += System.nanoTime();
450
451         // Logarithmic progress monitor for unknown amount of work.
452         state.monitor.setWorkRemaining(100000);
453         state.monitor.worked(1);
454     }
455
456     public void processFringe(ReadGraph graph, Collection<DirectStatements> expansion, final DomainProcessorState state) throws DatabaseException, IOException {
457
458         if(PROFILE)
459                 processFringeTime -= System.nanoTime();
460
461         for (DirectStatements stms : expansion) {
462
463             Resource subject = stms.getSubject();
464
465             boolean partOf = false;
466             for(Statement stm : stms) {
467                 Resource predicate = stm.getPredicate();
468                 if(L0.PartOf.equals(predicate)) {
469                     partOf = true;
470                     break;
471                 }
472             }
473
474             ExtentStatus subjectStatus = status.get(subject);
475             if(ModelTransferableGraphSourceRequest.LOG && subjectStatus != null) ModelTransferableGraphSourceRequest.log("EXISTING STATUS FOR " + graph.getPossibleURI(subject) + " - " + subjectStatus);
476             if(subjectStatus == ExtentStatus.EXTERNAL || subjectStatus == ExtentStatus.EXCLUDED) continue;
477             if(partOf && (subjectStatus == null || ExtentStatus.PENDING == subjectStatus) && graph.getPossibleURI(subject) != null) {
478
479                 status.put(subject, ExtentStatus.EXTERNAL);
480                 if(ModelTransferableGraphSourceRequest.LOG) {
481                     String uri = graph.getPossibleURI(subject);
482                     if(uri == null) ModelTransferableGraphSourceRequest.log("[EXTERNAL]: No URI for " + subject);
483                     else ModelTransferableGraphSourceRequest.log("[EXTERNAL] " + uri);
484                 }
485
486             } else {
487
488                 processInternal(graph, subject, stms, state);
489                 
490             }
491
492         }
493
494         if(PROFILE)
495                 processFringeTime += System.nanoTime();
496
497     }
498
499     public void process(ReadGraph graph, DomainProcessorState state) throws DatabaseException {
500
501         try {
502
503             this.variantSerializer = graph.getService(Databoard.class).getSerializerUnchecked(Bindings.VARIANT);
504             this.datatypeBinding = Bindings.getBindingUnchecked(Datatype.class);
505             this.datatypeSerializer = graph.getService(Databoard.class).getSerializerUnchecked(this.datatypeBinding);
506
507             state.internalEntries = ConsistsOfProcess.walk(graph, status, fringe, exclusions, ignoreVirtual); 
508             
509             for(InternalEntry entry : state.internalEntries) {
510                 Resource r = entry.resource;
511                 if (status.put(r, ExtentStatus.INTERNAL) == null) {
512                     if(ModelTransferableGraphSourceRequest.LOG) {
513                         String URI = graph.getPossibleURI(r);
514                         if(URI != null) ModelTransferableGraphSourceRequest.log("URI INTERNAL " + URI);
515                         else ModelTransferableGraphSourceRequest.log("URI has no URI for " + r);
516                     }
517                     fringe.add(r);
518                 }
519             }
520
521             if (state.monitor.isCanceled())
522                 throw new CancelTransactionException();
523
524             while(!fringe.isEmpty()) {
525
526                 Collection<DirectStatements> expansion = expand(graph);
527                 classifyPredicates(graph, state, expansion);
528                 processFringe(graph, expansion, state);
529
530                 if (state.monitor.isCanceled())
531                     throw new CancelTransactionException();
532             }
533
534             if (ModelTransferableGraphSourceRequest.PROFILE) {
535                 System.out.println(composedObjectCounter + " " + fastInternalCounter
536                         + " " + parentExternalCounter + " "
537                         + fullExternalCounter + " " + fullInternalCounter);
538             }
539
540         } catch (IOException e) {
541             throw new DatabaseException(e);
542         }
543
544     }
545
546     void logStatementWithExtent(ReadGraph graph, String header, ExtentStatus status, int sId, int pId, int oId) throws DatabaseException {
547         if(ModelTransferableGraphSourceRequest.LOG) {
548             SerialisationSupport support = graph.getService(SerialisationSupport.class);
549             String s = NameUtils.getURIOrSafeNameInternal(graph, support.getResource(sId));
550             String p = NameUtils.getURIOrSafeNameInternal(graph, support.getResource(pId));
551             String o = NameUtils.getURIOrSafeNameInternal(graph, support.getResource(oId));
552             ModelTransferableGraphSourceRequest.log(header + " [" + status + "] " + s + " - " + p + " - " + o);
553         }
554     }
555
556     void logStatementWithExtent(ReadGraph graph, String header, ExtentStatus status, Resource sId, Resource pId, Resource oId) throws DatabaseException {
557         if(ModelTransferableGraphSourceRequest.LOG) {
558             String s = NameUtils.getURIOrSafeNameInternal(graph, sId);
559             String p = NameUtils.getURIOrSafeNameInternal(graph, pId);
560             String o = NameUtils.getURIOrSafeNameInternal(graph, oId);
561             ModelTransferableGraphSourceRequest.log(header + " [" + status + "] " + s + " - " + p + " - " + o);
562         }
563     }
564
565 }