gerrit.simantics Code Review - simantics/platform.git/blob - bundles/org.simantics.db.layer0/src/org/simantics/db/layer0/util/DomainProcessor3.java
Fix TG export regression caused by commit b9b77f42
[simantics/platform.git] / bundles / org.simantics.db.layer0 / src / org / simantics / db / layer0 / util / DomainProcessor3.java
1 package org.simantics.db.layer0.util;
2
3 import java.io.IOException;
4 import java.io.InputStream;
5 import java.util.ArrayList;
6 import java.util.Collection;
7 import java.util.HashMap;
8 import java.util.Iterator;
9 import java.util.List;
10 import java.util.Map;
11 import java.util.Set;
12 import java.util.TreeSet;
13
14 import org.simantics.databoard.Bindings;
15 import org.simantics.databoard.Databoard;
16 import org.simantics.databoard.binding.Binding;
17 import org.simantics.databoard.serialization.Serializer;
18 import org.simantics.databoard.type.Datatype;
19 import org.simantics.db.DirectStatements;
20 import org.simantics.db.ReadGraph;
21 import org.simantics.db.Resource;
22 import org.simantics.db.ResourceMap;
23 import org.simantics.db.Statement;
24 import org.simantics.db.common.StandardStatement;
25 import org.simantics.db.common.primitiverequest.Value;
26 import org.simantics.db.common.procedure.adapter.TransientCacheListener;
27 import org.simantics.db.common.utils.NameUtils;
28 import org.simantics.db.exception.CancelTransactionException;
29 import org.simantics.db.exception.DatabaseException;
30 import org.simantics.db.layer0.adapter.SubgraphExtent.ExtentStatus;
31 import org.simantics.db.layer0.util.ConsistsOfProcess.ConsistsOfProcessEntry;
32 import org.simantics.db.layer0.util.ModelTransferableGraphSourceRequest.Expansion3;
33 import org.simantics.db.layer0.util.TransferableGraphConfiguration2.SeedSpec;
34 import org.simantics.db.layer0.util.TransferableGraphConfiguration2.SeedSpec.SeedSpecType;
35 import org.simantics.db.service.CollectionSupport;
36 import org.simantics.db.service.SerialisationSupport;
37 import org.simantics.db.service.TransferableGraphSupport;
38 import org.simantics.graph.db.TransferableGraphSource;
39 import org.simantics.layer0.Layer0;
40 import org.simantics.scl.runtime.function.Function1;
41 import org.simantics.utils.datastructures.Pair;
42
43 import gnu.trove.list.array.TIntArrayList;
44 import gnu.trove.map.hash.TIntIntHashMap;
45
46 public class DomainProcessor3 {
47         
/**
 * Verdict produced by the configured exclusion function for a single statement.
 */
public enum ExclusionDecision {
    /** The statement is not singled out for exclusion. */
    INCLUDE,
    /** The object of the statement shall be marked as excluded. */
    EXCLUDE_OBJECT
}
51
52         final static private boolean PROFILE = false;
53
54     Serializer variantSerializer;
55     Serializer datatypeSerializer;
56     Binding datatypeBinding;
57     boolean ignoreVirtual;
58
59     int id = 0;
60
61     Set<Resource> fringe = null;
62     Function1<Statement,ExclusionDecision> exclusionFunction = null;
63     Set<Resource> predicates = null;
64     Map<Resource,Boolean> isRelatedToPredicates = null;
65     Set<Resource> deadPredicates = null;
66     Set<Resource> strongInverseSet = null;
67     List<Statement> unresolvedWeakLinks = new ArrayList<>();
68     TIntIntHashMap ids = null;
69     ResourceMap<ExtentStatus> status = null;
70     Map<Datatype, byte[]> bindings = new HashMap<Datatype, byte[]>();
71     final SerialisationSupport support;
72     final TransferableGraphConfiguration2 conf;
73     final TransferableGraphSupport tgs;
74
75     private Layer0 L0;
76
77     private long composedObjectCounter = 0;
78     private long fastInternalCounter = 0;
79     private long parentExternalCounter = 0;
80     private long fullInternalCounter = 0;
81     private long fullExternalCounter = 0;
82
83     long startupTime = 0;
84     long expandTime = 0;
85     long fullResolveTime = 0;
86     long fastResolveTime = 0;
87     long otherStatementTime = 0;
88     long parentResolveTime = 0;
89     long extentSeedTime = 0;
90     long classifyPredicateTime = 0;
91     long processFringeTime = 0;
92     long valueOutputTime = 0;
93     long statementOutputTime = 0;
94
/**
 * Sets up the processor: looks up the required graph services, creates the
 * working collections and seeds the status map and the fringe from the
 * configuration.
 *
 * @param graph         graph used for service lookup
 * @param conf          export configuration (pre-assigned statuses, seeds, exclusion function)
 * @param state         shared processing state; its {@code ids} map is reused
 * @param ignoreVirtual forwarded to the expansion and consists-of requests
 * @throws DatabaseException declared for the graph access performed here
 */
public DomainProcessor3(ReadGraph graph, TransferableGraphConfiguration2 conf, DomainProcessorState state, boolean ignoreVirtual) throws DatabaseException {

    this.L0 = Layer0.getInstance(graph);
    this.tgs = graph.getService(TransferableGraphSupport.class);

    this.support = graph.getService(SerialisationSupport.class);
    this.ignoreVirtual = ignoreVirtual;
    this.conf = conf;

    if (PROFILE) {
        startupTime -= System.nanoTime();
    }

    CollectionSupport collections = graph.getService(CollectionSupport.class);

    this.ids = state.ids;
    this.status = collections.createMap(ExtentStatus.class);
    this.predicates = collections.createSet();
    this.exclusionFunction = conf.exclusionFunction;
    this.fringe = new TreeSet<Resource>();
    this.isRelatedToPredicates = collections.createMap(Boolean.class);
    this.deadPredicates = collections.createSet();
    this.strongInverseSet = collections.createSet();

    // Copy the pre-assigned statuses, except INTERNAL ones: internal status
    // is derived from the seeds by the domain processor itself.
    for (Map.Entry<Resource, ExtentStatus> preset : conf.preStatus.entrySet()) {
        if (!preset.getValue().equals(ExtentStatus.INTERNAL)) {
            status.put(preset.getKey(), preset.getValue());
        }
    }

    for (SeedSpec seed : conf.seeds) {
        if (!SeedSpecType.INTERNAL.equals(seed.specType)) {
            // ConsistsOfProcess does not report non-internal seeds as
            // internals, so they are queued for expansion by hand ...
            fringe.add(seed.resource);
            // ... and the root itself is classified as INTERNAL.
            status.put(seed.resource, ExtentStatus.INTERNAL);
        }
    }

    if (PROFILE) {
        startupTime += System.nanoTime();
    }

}
136     
137     public ResourceMap<ExtentStatus> getStatus() {
138         return status;
139     }
140
141     public Collection<DirectStatements> extractFromFringe(ReadGraph graph, int maxAmount) throws DatabaseException {
142
143         CollectionSupport cs = graph.getService(CollectionSupport.class);
144         Collection<Resource> list = cs.createList();
145         Iterator<Resource> it = fringe.iterator();
146         for(int i=0;i<maxAmount;i++) {
147             if(!it.hasNext()) break;
148             list.add(it.next());
149             it.remove();
150         }
151
152         return graph.syncRequest(new Expansion3(list, ignoreVirtual));
153
154     }
155
156     public Collection<DirectStatements> expand(ReadGraph graph) throws DatabaseException {
157
158         if(PROFILE)
159                 expandTime -= System.nanoTime();
160
161         Collection<DirectStatements> result = extractFromFringe(graph, 2<<12);
162
163         if(PROFILE)
164                 expandTime += System.nanoTime();
165
166         return result;
167
168     }
169
170     public void classifyPredicates(ReadGraph graph, final Set<Resource> schedule, DomainProcessorState state) throws DatabaseException {
171
172         for(Resource predicate : schedule) {
173             
174             Boolean isRelatedTo = Boolean.FALSE;
175             
176             Resource single = graph.getPossibleSuperrelation(predicate);
177             if(single != null) {
178                 
179                 Boolean singleIsRelatedTo = isRelatedToPredicates.get(single);
180                 if(singleIsRelatedTo == null) {
181                         singleIsRelatedTo = graph.isSubrelationOf(single, L0.IsRelatedTo);
182                         isRelatedToPredicates.put(single, singleIsRelatedTo);
183                 }
184
185                 isRelatedTo = singleIsRelatedTo;
186                 
187             } else {
188                 
189                 if(graph.isSubrelationOf(predicate, L0.IsRelatedTo)) {
190                         isRelatedTo = Boolean.TRUE;
191                     if(ModelTransferableGraphSourceRequest.DEBUG) ModelTransferableGraphSourceRequest.log("isRelatedToPredicates4 => " + NameUtils.getSafeName(graph, predicate));
192                 } else {
193                     if (!graph.hasStatement(predicate)) {
194                         if(ModelTransferableGraphSourceRequest.DEBUG) ModelTransferableGraphSourceRequest.log("FOUND DEAD PREDICATE (no statements): " + predicate);
195                         deadPredicates.add(predicate);
196                         // Prevents ModelTransferableGraphSource from
197                         // trying to export these statements.
198                         state.inverses.remove(support.getTransientId(predicate));
199                     }
200                 }
201                 
202             }
203             
204                 isRelatedToPredicates.put(predicate, isRelatedTo);
205
206         }
207
208     }
209
210     public void classifyPredicates(ReadGraph graph, DomainProcessorState state, final Collection<DirectStatements> expansion) throws DatabaseException {
211
212         if(PROFILE)
213                 classifyPredicateTime -= System.nanoTime();
214
215         CollectionSupport cs = graph.getService(CollectionSupport.class);
216         final Set<Resource> schedule = cs.createSet();
217         final Map<Resource, Resource> newPredicates = cs.createMap(Resource.class);
218
219         for (DirectStatements stms : expansion) {
220             for(Statement stm : stms) {
221
222                 Resource predicate = stm.getPredicate();
223                 Resource object = stm.getObject();
224
225                 if(ExtentStatus.EXCLUDED.equals(status.get(predicate))) continue;
226                 if(ExtentStatus.EXCLUDED.equals(status.get(object))) continue;
227                 
228                 if (exclusionFunction != null) {
229                         ExclusionDecision decision = exclusionFunction.apply(stm);
230                         if(ExclusionDecision.EXCLUDE_OBJECT.equals(decision)) {
231                                 status.put(object, ExtentStatus.EXCLUDED);
232                                 continue;
233                         }
234                 }
235
236                 if(predicates.add(predicate)) {
237                     Resource inverse = graph.getPossibleInverse(predicate);
238                     schedule.add(predicate);
239                     if(inverse != null) {
240                         newPredicates.put(predicate, inverse);
241                         if(predicates.add(inverse)) schedule.add(inverse);
242                         state.inverses.put(support.getTransientId(predicate), support.getTransientId(inverse));
243                         state.inverses.put(support.getTransientId(inverse), support.getTransientId(predicate));
244                         if(ModelTransferableGraphSourceRequest.DEBUG) ModelTransferableGraphSourceRequest.log("INVERSE FOR " + graph.getPossibleURI(predicate) + " => " + graph.getPossibleURI(inverse));
245                     } else {
246                         state.inverses.put(support.getTransientId(predicate), 0);
247                         if(ModelTransferableGraphSourceRequest.DEBUG) ModelTransferableGraphSourceRequest.log("NO INVERSE FOR " + graph.getPossibleURI(predicate));
248                     }
249
250                 }
251
252             }
253         }
254
255         classifyPredicates(graph, schedule, state);
256
257         for(Map.Entry<Resource, Resource> entry : newPredicates.entrySet()) {
258             // Inverse is strong => this has strong inverse
259                 Boolean isRelatedToValue = isRelatedToPredicates.get(entry.getValue());
260                 if(isRelatedToValue) strongInverseSet.add(entry.getKey());
261                 Boolean isRelatedToKey = isRelatedToPredicates.get(entry.getKey());
262                 if(isRelatedToKey) strongInverseSet.add(entry.getValue());
263         }
264
265         if(PROFILE)
266                 classifyPredicateTime += System.nanoTime();
267
268     }
269
270     /*
271      * Composed objects are internal. Mark them for expansion.
272      */
273
274     private Datatype getDatatype(ReadGraph graph, Resource subject) throws DatabaseException {
275         Resource object = graph.getSingleObject(subject, L0.HasDataType);
276         return graph.syncRequest(new Value<Datatype>(object, datatypeBinding), TransientCacheListener.<Datatype>instance());
277     }
278
279     public void processValue(ReadGraph graph, Resource subject, int sId, final DomainProcessorState state) throws DatabaseException, IOException {
280         final InputStream valueStream = tgs.getValueStream(graph, subject);
281         if (valueStream != null) {
282             if(ModelTransferableGraphSourceRequest.DEBUG) ModelTransferableGraphSourceRequest.log("[VALUE] " + NameUtils.getSafeName(graph, subject, true));
283             state.valueOutput.writeInt(sId);
284
285             if (conf.values) {
286                 Datatype dt = getDatatype(graph, subject);
287
288                 boolean canWriteRawVariant = !state.valueModifier.mayNeedModification(dt);
289                 long rawVariantSizePos = 0;
290                 state.valueOutput.writeByte(canWriteRawVariant
291                         ? TransferableGraphSource.TAG_RAW_COPY_VARIANT_VALUE
292                         : TransferableGraphSource.TAG_POTENTIALLY_MODIFIED_VARIANT_VALUE);
293                 if (canWriteRawVariant) {
294                     // Add space for raw variant byte size before the data
295                     rawVariantSizePos = state.valueOutput.position();
296                     state.valueOutput.writeInt(0);
297                 }
298
299                 byte[] typeBytes = bindings.get(dt);
300                 if (typeBytes == null) {
301                     typeBytes = datatypeSerializer.serialize(dt);
302                     bindings.put(dt, typeBytes);
303                 }
304
305                 state.valueOutput.write(typeBytes);
306                 Serializer s = Bindings.getSerializerUnchecked(Bindings.getBinding(dt));
307                 s.skip(new InputStream() {
308                     @Override
309                     public int read() throws IOException {
310                         int value = valueStream.read();
311                         state.valueOutput.write(value);
312                         return value;
313                     }
314                 });
315
316                 if (canWriteRawVariant) {
317                     long currentPos = state.valueOutput.position();
318                     int variantSize = (int)(currentPos - rawVariantSizePos - 4);
319                     state.valueOutput.position(rawVariantSizePos);
320                     state.valueOutput.writeInt(variantSize);
321                     state.valueOutput.position(currentPos);
322                 }
323             }
324
325             state.valueCount++;
326         }
327     }
328
329     private TIntArrayList stream = new TIntArrayList();
330
331     public void addToStream(Resource predicate, Resource object) throws DatabaseException {
332         stream.add(support.getTransientId(predicate));
333         stream.add(support.getTransientId(object));
334     }
335
336     public void processStatement(ReadGraph graph, Resource subject, Statement stm) throws DatabaseException, IOException {
337
338         Resource predicate = stm.getPredicate();
339
340         Resource object = stm.getObject();
341
342         ExtentStatus objectStatus = status.get(object);
343
344         // Strong predicate
345         Boolean isRelatedTo = isRelatedToPredicates.get(predicate);
346         if ((objectStatus !=  ExtentStatus.EXCLUDED) && isRelatedTo) {
347
348             if(ModelTransferableGraphSourceRequest.DEBUG) logStatementWithExtent(graph, "related", objectStatus, subject, predicate, object);
349
350             addToStream(predicate, object);
351
352             if(objectStatus == null || objectStatus == ExtentStatus.PENDING) {
353                 if(ModelTransferableGraphSourceRequest.DEBUG) ModelTransferableGraphSourceRequest.log("[ADDED TO FRINGE] " + NameUtils.getSafeName(graph, object));
354                 fringe.add(object);
355             }
356
357         } else {
358
359             // Dead predicate
360             if (deadPredicates.contains(predicate)) {
361                 if(ModelTransferableGraphSourceRequest.DEBUG) logStatementWithExtent(graph, "excluding statement with dead predicate ", objectStatus, subject, predicate, object);
362                 return;
363             }
364
365             // Weak predicate
366             if(objectStatus == ExtentStatus.EXCLUDED) {
367
368                 if(ModelTransferableGraphSourceRequest.DEBUG) logStatementWithExtent(graph, "weak reference to excluded object ", objectStatus, subject, predicate, object);
369
370             } else {
371
372                 // The inverse is also weak (or there is no inverse)
373                 if(!strongInverseSet.contains(predicate)) {
374                         
375                         unresolvedWeakLinks.add(new StandardStatement(subject, predicate, object));
376                     //addToStream(predicate, object);
377
378                     if(objectStatus == null) {
379                         status.put(object, ExtentStatus.PENDING);
380                     }
381
382                     if(ModelTransferableGraphSourceRequest.DEBUG) logStatementWithExtent(graph, "fully weak internal", objectStatus, subject, predicate, object);
383
384                 } else {
385
386                     if(ModelTransferableGraphSourceRequest.DEBUG) logStatementWithExtent(graph, "strong inverse internal ", objectStatus, subject, predicate, object);
387
388                 }
389
390             }
391
392         }
393         
394     }
395     
396     public void flushStatementStream(int sId, DomainProcessorState state) throws IOException {
397         if(!stream.isEmpty()) {
398             state.statementsOutput.writeInt(sId);
399             int streamSize = stream.size();
400             int statementCount = stream.size() / 2;
401             state.statementsOutput.writeInt(statementCount);
402             for (int i = 0; i < streamSize; i++)
403                 state.statementsOutput.writeInt(stream.getQuick(i));
404             state.statementCount += 2*streamSize;
405             stream.resetQuick();
406         }
407     }
408
409     // For progress monitor book-keeping
410     private long internalResourceNumber = 0;
411     private long startTime = 0;
412     private long lastUpdateTime = 0;
413
414     public void processInternal(ReadGraph graph, Resource subject, DirectStatements stms, DomainProcessorState state) throws DatabaseException, IOException {
415
416         internalResourceNumber++;
417
418         // Update progress monitor with controlled frequency
419         long t = System.nanoTime();
420         long dt = t - lastUpdateTime;
421         if (dt > 200_000_000L) {
422             if (startTime == 0)
423                 startTime = t;
424             lastUpdateTime = t;
425             double totalTime = (t - startTime) * 1e-9;
426             if (totalTime > 0) {
427                 long speed = Math.round((double)internalResourceNumber / totalTime);
428                 state.monitor.subTask("Included " + internalResourceNumber + " resources (" + speed + " resources/s)");
429             }
430         }
431
432         status.put(subject, ExtentStatus.INTERNAL);
433         if(ModelTransferableGraphSourceRequest.DEBUG) ModelTransferableGraphSourceRequest.log("[INTERNAL] " + NameUtils.getSafeName(graph, subject, true));
434
435         int sId = support.getTransientId(subject);
436
437         if(PROFILE)
438                 valueOutputTime -= System.nanoTime();
439
440         processValue(graph, subject, sId, state);
441
442         if(PROFILE)
443                 valueOutputTime += System.nanoTime();
444
445         if(PROFILE)
446                 statementOutputTime -= System.nanoTime();
447
448         for(Statement stm : stms) {
449                 processStatement(graph, subject, stm);
450         }
451
452         flushStatementStream(sId, state);
453
454         if(PROFILE)
455                 statementOutputTime += System.nanoTime();
456
457         // Logarithmic progress monitor for unknown amount of work.
458         state.monitor.setWorkRemaining(100000);
459         state.monitor.worked(1);
460     }
461
462     public void processFringe(ReadGraph graph, Collection<DirectStatements> expansion, final DomainProcessorState state) throws DatabaseException, IOException {
463
464         if(PROFILE)
465                 processFringeTime -= System.nanoTime();
466
467         for (DirectStatements stms : expansion) {
468
469             Resource subject = stms.getSubject();
470
471             boolean partOf = false;
472             for(Statement stm : stms) {
473                 Resource predicate = stm.getPredicate();
474                 if(L0.PartOf.equals(predicate)) {
475                     partOf = true;
476                     break;
477                 }
478             }
479
480             ExtentStatus subjectStatus = status.get(subject);
481             if(ModelTransferableGraphSourceRequest.DEBUG && subjectStatus != null) ModelTransferableGraphSourceRequest.log("EXISTING STATUS FOR " + graph.getPossibleURI(subject) + " - " + subjectStatus);
482             if(subjectStatus == ExtentStatus.EXTERNAL || subjectStatus == ExtentStatus.EXCLUDED) continue;
483             if(partOf && (subjectStatus == null || ExtentStatus.PENDING == subjectStatus) && graph.getPossibleURI(subject) != null) {
484
485                 status.put(subject, ExtentStatus.EXTERNAL);
486                 if(ModelTransferableGraphSourceRequest.DEBUG) {
487                     String uri = graph.getPossibleURI(subject);
488                     if(uri == null) ModelTransferableGraphSourceRequest.log("[EXTERNAL]: No URI for " + subject);
489                     else ModelTransferableGraphSourceRequest.log("[EXTERNAL] " + uri);
490                 }
491
492             } else {
493
494                 processInternal(graph, subject, stms, state);
495                 
496             }
497
498         }
499
500         if(PROFILE)
501                 processFringeTime += System.nanoTime();
502
503     }
504
505     public void process(ReadGraph graph, DomainProcessorState state) throws DatabaseException {
506
507         try {
508
509             this.variantSerializer = graph.getService(Databoard.class).getSerializerUnchecked(Bindings.VARIANT);
510             this.datatypeBinding = Bindings.getBindingUnchecked(Datatype.class);
511             this.datatypeSerializer = graph.getService(Databoard.class).getSerializerUnchecked(this.datatypeBinding);
512
513             Pair<List<ConsistsOfProcessEntry>,Set<Resource>> pair = ConsistsOfProcess.walk(graph, status, conf.seeds, ignoreVirtual); 
514             state.internalEntries = pair.first;
515             
516             for(ConsistsOfProcessEntry entry : state.internalEntries) {
517                 Resource r = entry.resource;
518                 if (status.put(r, ExtentStatus.INTERNAL) == null) {
519                     if(ModelTransferableGraphSourceRequest.DEBUG) {
520                         String URI = graph.getPossibleURI(r);
521                         if(URI != null) ModelTransferableGraphSourceRequest.log("URI INTERNAL " + URI);
522                         else ModelTransferableGraphSourceRequest.log("URI has no URI for " + r);
523                     }
524                     fringe.add(r);
525                 }
526             }
527
528             for(Resource unnamedChild : pair.second) {
529                 if (status.put(unnamedChild, ExtentStatus.INTERNAL) == null) {
530                     fringe.add(unnamedChild);
531                 }
532             }
533             
534             if (state.monitor.isCanceled())
535                 throw new CancelTransactionException();
536
537             while(!fringe.isEmpty()) {
538
539                 Collection<DirectStatements> expansion = expand(graph);
540                 classifyPredicates(graph, state, expansion);
541                 processFringe(graph, expansion, state);
542
543                 if (state.monitor.isCanceled())
544                     throw new CancelTransactionException();
545             }
546
547             if (ModelTransferableGraphSourceRequest.PROFILE) {
548                 System.out.println(composedObjectCounter + " " + fastInternalCounter
549                         + " " + parentExternalCounter + " "
550                         + fullExternalCounter + " " + fullInternalCounter);
551             }
552
553         } catch (IOException e) {
554             throw new DatabaseException(e);
555         }
556
557     }
558
559     void logStatementWithExtent(ReadGraph graph, String header, ExtentStatus status, int sId, int pId, int oId) throws DatabaseException {
560         if(ModelTransferableGraphSourceRequest.DEBUG) {
561             SerialisationSupport support = graph.getService(SerialisationSupport.class);
562             String s = NameUtils.getURIOrSafeNameInternal(graph, support.getResource(sId));
563             String p = NameUtils.getURIOrSafeNameInternal(graph, support.getResource(pId));
564             String o = NameUtils.getURIOrSafeNameInternal(graph, support.getResource(oId));
565             ModelTransferableGraphSourceRequest.log(header + " [" + status + "] " + s + " - " + p + " - " + o);
566         }
567     }
568
569     void logStatementWithExtent(ReadGraph graph, String header, ExtentStatus status, Resource sId, Resource pId, Resource oId) throws DatabaseException {
570         if(ModelTransferableGraphSourceRequest.DEBUG) {
571             String s = NameUtils.getURIOrSafeNameInternal(graph, sId);
572             String p = NameUtils.getURIOrSafeNameInternal(graph, pId);
573             String o = NameUtils.getURIOrSafeNameInternal(graph, oId);
574             ModelTransferableGraphSourceRequest.log(header + " [" + status + "] " + s + " - " + p + " - " + o);
575         }
576     }
577
578 }