]> gerrit.simantics Code Review - simantics/platform.git/blob - bundles/org.simantics.db.layer0/src/org/simantics/db/layer0/util/DomainProcessor3.java
Clean up and support internal seed resources in tg export
[simantics/platform.git] / bundles / org.simantics.db.layer0 / src / org / simantics / db / layer0 / util / DomainProcessor3.java
1 package org.simantics.db.layer0.util;
2
3 import java.io.IOException;
4 import java.io.InputStream;
5 import java.util.ArrayList;
6 import java.util.Collection;
7 import java.util.HashMap;
8 import java.util.Iterator;
9 import java.util.List;
10 import java.util.Map;
11 import java.util.Set;
12 import java.util.TreeSet;
13
14 import org.simantics.databoard.Bindings;
15 import org.simantics.databoard.Databoard;
16 import org.simantics.databoard.binding.Binding;
17 import org.simantics.databoard.serialization.Serializer;
18 import org.simantics.databoard.type.Datatype;
19 import org.simantics.db.DirectStatements;
20 import org.simantics.db.ReadGraph;
21 import org.simantics.db.Resource;
22 import org.simantics.db.ResourceMap;
23 import org.simantics.db.Statement;
24 import org.simantics.db.common.StandardStatement;
25 import org.simantics.db.common.primitiverequest.Value;
26 import org.simantics.db.common.procedure.adapter.TransientCacheListener;
27 import org.simantics.db.common.utils.NameUtils;
28 import org.simantics.db.exception.CancelTransactionException;
29 import org.simantics.db.exception.DatabaseException;
30 import org.simantics.db.layer0.adapter.SubgraphExtent.ExtentStatus;
31 import org.simantics.db.layer0.util.ConsistsOfProcess.ConsistsOfProcessEntry;
32 import org.simantics.db.layer0.util.ModelTransferableGraphSourceRequest.Expansion3;
33 import org.simantics.db.service.CollectionSupport;
34 import org.simantics.db.service.SerialisationSupport;
35 import org.simantics.db.service.TransferableGraphSupport;
36 import org.simantics.graph.db.TransferableGraphSource;
37 import org.simantics.layer0.Layer0;
38 import org.simantics.scl.runtime.function.Function1;
39 import org.simantics.utils.datastructures.Pair;
40
41 import gnu.trove.list.array.TIntArrayList;
42 import gnu.trove.map.hash.TIntIntHashMap;
43
44 public class DomainProcessor3 {
45         
        /**
         * Result of the optional per-statement exclusion function
         * ({@code conf.exclusionFunction}).
         *
         * In this class only {@link #EXCLUDE_OBJECT} is tested explicitly: it marks
         * the object of the statement as excluded and skips the statement. Any other
         * result lets the statement continue through normal processing.
         */
        public enum ExclusionDecision {
                INCLUDE, EXCLUDE_OBJECT
        }
49
        // Compile-time switch: when true the methods below accumulate
        // System.nanoTime() deltas into the *Time fields.
        final static private boolean PROFILE = false;

    // Serializers and binding are assigned at the start of process().
    // variantSerializer is assigned there but not read in this class.
    Serializer variantSerializer;
    // Used by processValue() to serialize a Datatype into bytes.
    Serializer datatypeSerializer;
    // Binding for Datatype, used by getDatatype() when reading the data type value.
    Binding datatypeBinding;
    // Passed on to Expansion3 and ConsistsOfProcess.walk.
    boolean ignoreVirtual;

    // Not referenced by the methods of this class.
    int id = 0;

    // Resources waiting to be expanded; drained in batches by extractFromFringe().
    Set<Resource> fringe = null;
    // Optional statement filter copied from conf.exclusionFunction; may be null.
    Function1<Statement,ExclusionDecision> exclusionFunction = null;
    // Every predicate (and inverse) that has already been scheduled for classification.
    Set<Resource> predicates = null;
    // Predicate -> whether it was classified as a subrelation of L0.IsRelatedTo.
    Map<Resource,Boolean> isRelatedToPredicates = null;
    // Predicates for which graph.hasStatement(predicate) returned false.
    Set<Resource> deadPredicates = null;
    // Predicates whose inverse was classified as IsRelatedTo.
    Set<Resource> strongInverseSet = null;
    // Weak statements collected by processStatement(); not consumed in this class.
    List<Statement> unresolvedWeakLinks = new ArrayList<>();
    // Copied from state.ids in the constructor; not read in this class.
    TIntIntHashMap ids = null;
    // Extent status per resource, pre-filled from conf.preStatus.
    ResourceMap<ExtentStatus> status = null;
    // Cache of serialized Datatype bytes, filled lazily by processValue().
    Map<Datatype, byte[]> bindings = new HashMap<Datatype, byte[]>();
    final SerialisationSupport support;
    final TransferableGraphConfiguration2 conf;
    final TransferableGraphSupport tgs;

    private Layer0 L0;

    // Counters printed by process() when ModelTransferableGraphSourceRequest.PROFILE
    // is set; nothing in this class increments them.
    private long composedObjectCounter = 0;
    private long fastInternalCounter = 0;
    private long parentExternalCounter = 0;
    private long fullInternalCounter = 0;
    private long fullExternalCounter = 0;

    // Profiling accumulators (nanoseconds). Within this class only startupTime,
    // expandTime, classifyPredicateTime, processFringeTime, valueOutputTime and
    // statementOutputTime are updated, and only when PROFILE is true.
    long startupTime = 0;
    long expandTime = 0;
    long fullResolveTime = 0;
    long fastResolveTime = 0;
    long otherStatementTime = 0;
    long parentResolveTime = 0;
    long extentSeedTime = 0;
    long classifyPredicateTime = 0;
    long processFringeTime = 0;
    long valueOutputTime = 0;
    long statementOutputTime = 0;
92
93     public DomainProcessor3(ReadGraph graph, TransferableGraphConfiguration2 conf, DomainProcessorState state, boolean ignoreVirtual) throws DatabaseException {
94
95         this.L0 = Layer0.getInstance(graph);
96         this.tgs = graph.getService(TransferableGraphSupport.class);
97
98         this.support = graph.getService(SerialisationSupport.class);
99         this.ignoreVirtual = ignoreVirtual;
100         this.conf = conf;
101
102         if(PROFILE)
103                 startupTime -= System.nanoTime();
104
105         CollectionSupport cs = graph.getService(CollectionSupport.class);
106
107         ids = state.ids;
108         status = cs.createMap(ExtentStatus.class);
109         predicates = cs.createSet();
110         exclusionFunction = conf.exclusionFunction;
111         fringe = new TreeSet<Resource>();
112         isRelatedToPredicates = cs.createMap(Boolean.class);
113         deadPredicates = cs.createSet();
114         strongInverseSet = cs.createSet();
115
116         for(Map.Entry<Resource, ExtentStatus> entry : conf.preStatus.entrySet()) {
117             status.put(entry.getKey(), entry.getValue());
118         }
119
120         if(PROFILE)
121                 startupTime += System.nanoTime();
122
123     }
124     
125     public ResourceMap<ExtentStatus> getStatus() {
126         return status;
127     }
128
129     public Collection<DirectStatements> extractFromFringe(ReadGraph graph, int maxAmount) throws DatabaseException {
130
131         CollectionSupport cs = graph.getService(CollectionSupport.class);
132         Collection<Resource> list = cs.createList();
133         Iterator<Resource> it = fringe.iterator();
134         for(int i=0;i<maxAmount;i++) {
135             if(!it.hasNext()) break;
136             list.add(it.next());
137             it.remove();
138         }
139
140         return graph.syncRequest(new Expansion3(list, ignoreVirtual));
141
142     }
143
144     public Collection<DirectStatements> expand(ReadGraph graph) throws DatabaseException {
145
146         if(PROFILE)
147                 expandTime -= System.nanoTime();
148
149         Collection<DirectStatements> result = extractFromFringe(graph, 2<<12);
150
151         if(PROFILE)
152                 expandTime += System.nanoTime();
153
154         return result;
155
156     }
157
158     public void classifyPredicates(ReadGraph graph, final Set<Resource> schedule, DomainProcessorState state) throws DatabaseException {
159
160         for(Resource predicate : schedule) {
161             
162             Boolean isRelatedTo = Boolean.FALSE;
163             
164             Resource single = graph.getPossibleSuperrelation(predicate);
165             if(single != null) {
166                 
167                 Boolean singleIsRelatedTo = isRelatedToPredicates.get(single);
168                 if(singleIsRelatedTo == null) {
169                         singleIsRelatedTo = graph.isSubrelationOf(single, L0.IsRelatedTo);
170                         isRelatedToPredicates.put(single, singleIsRelatedTo);
171                 }
172
173                 isRelatedTo = singleIsRelatedTo;
174                 
175             } else {
176                 
177                 if(graph.isSubrelationOf(predicate, L0.IsRelatedTo)) {
178                         isRelatedTo = Boolean.TRUE;
179                     if(ModelTransferableGraphSourceRequest.DEBUG) ModelTransferableGraphSourceRequest.log("isRelatedToPredicates4 => " + NameUtils.getSafeName(graph, predicate));
180                 } else {
181                     if (!graph.hasStatement(predicate)) {
182                         if(ModelTransferableGraphSourceRequest.DEBUG) ModelTransferableGraphSourceRequest.log("FOUND DEAD PREDICATE (no statements): " + predicate);
183                         deadPredicates.add(predicate);
184                         // Prevents ModelTransferableGraphSource from
185                         // trying to export these statements.
186                         state.inverses.remove(support.getTransientId(predicate));
187                     }
188                 }
189                 
190             }
191             
192                 isRelatedToPredicates.put(predicate, isRelatedTo);
193
194         }
195
196     }
197
198     public void classifyPredicates(ReadGraph graph, DomainProcessorState state, final Collection<DirectStatements> expansion) throws DatabaseException {
199
200         if(PROFILE)
201                 classifyPredicateTime -= System.nanoTime();
202
203         CollectionSupport cs = graph.getService(CollectionSupport.class);
204         final Set<Resource> schedule = cs.createSet();
205         final Map<Resource, Resource> newPredicates = cs.createMap(Resource.class);
206
207         for (DirectStatements stms : expansion) {
208             for(Statement stm : stms) {
209
210                 Resource predicate = stm.getPredicate();
211                 Resource object = stm.getObject();
212
213                 if(ExtentStatus.EXCLUDED.equals(status.get(predicate))) continue;
214                 if(ExtentStatus.EXCLUDED.equals(status.get(object))) continue;
215                 
216                 if (exclusionFunction != null) {
217                         ExclusionDecision decision = exclusionFunction.apply(stm);
218                         if(ExclusionDecision.EXCLUDE_OBJECT.equals(decision)) {
219                                 status.put(object, ExtentStatus.EXCLUDED);
220                                 continue;
221                         }
222                 }
223
224                 if(predicates.add(predicate)) {
225                     Resource inverse = graph.getPossibleInverse(predicate);
226                     schedule.add(predicate);
227                     if(inverse != null) {
228                         newPredicates.put(predicate, inverse);
229                         if(predicates.add(inverse)) schedule.add(inverse);
230                         state.inverses.put(support.getTransientId(predicate), support.getTransientId(inverse));
231                         state.inverses.put(support.getTransientId(inverse), support.getTransientId(predicate));
232                         if(ModelTransferableGraphSourceRequest.DEBUG) ModelTransferableGraphSourceRequest.log("INVERSE FOR " + graph.getPossibleURI(predicate) + " => " + graph.getPossibleURI(inverse));
233                     } else {
234                         state.inverses.put(support.getTransientId(predicate), 0);
235                         if(ModelTransferableGraphSourceRequest.DEBUG) ModelTransferableGraphSourceRequest.log("NO INVERSE FOR " + graph.getPossibleURI(predicate));
236                     }
237
238                 }
239
240             }
241         }
242
243         classifyPredicates(graph, schedule, state);
244
245         for(Map.Entry<Resource, Resource> entry : newPredicates.entrySet()) {
246             // Inverse is strong => this has strong inverse
247                 Boolean isRelatedToValue = isRelatedToPredicates.get(entry.getValue());
248                 if(isRelatedToValue) strongInverseSet.add(entry.getKey());
249                 Boolean isRelatedToKey = isRelatedToPredicates.get(entry.getKey());
250                 if(isRelatedToKey) strongInverseSet.add(entry.getValue());
251         }
252
253         if(PROFILE)
254                 classifyPredicateTime += System.nanoTime();
255
256     }
257
258     /*
259      * Composed objects are internal. Mark them for expansion.
260      */
261
262     private Datatype getDatatype(ReadGraph graph, Resource subject) throws DatabaseException {
263         Resource object = graph.getSingleObject(subject, L0.HasDataType);
264         return graph.syncRequest(new Value<Datatype>(object, datatypeBinding), TransientCacheListener.<Datatype>instance());
265     }
266
267     public void processValue(ReadGraph graph, Resource subject, int sId, final DomainProcessorState state) throws DatabaseException, IOException {
268         final InputStream valueStream = tgs.getValueStream(graph, subject);
269         if (valueStream != null) {
270             if(ModelTransferableGraphSourceRequest.DEBUG) ModelTransferableGraphSourceRequest.log("[VALUE] " + NameUtils.getSafeName(graph, subject, true));
271             state.valueOutput.writeInt(sId);
272
273             if (conf.values) {
274                 Datatype dt = getDatatype(graph, subject);
275
276                 boolean canWriteRawVariant = !state.valueModifier.mayNeedModification(dt);
277                 long rawVariantSizePos = 0;
278                 state.valueOutput.writeByte(canWriteRawVariant
279                         ? TransferableGraphSource.TAG_RAW_COPY_VARIANT_VALUE
280                         : TransferableGraphSource.TAG_POTENTIALLY_MODIFIED_VARIANT_VALUE);
281                 if (canWriteRawVariant) {
282                     // Add space for raw variant byte size before the data
283                     rawVariantSizePos = state.valueOutput.position();
284                     state.valueOutput.writeInt(0);
285                 }
286
287                 byte[] typeBytes = bindings.get(dt);
288                 if (typeBytes == null) {
289                     typeBytes = datatypeSerializer.serialize(dt);
290                     bindings.put(dt, typeBytes);
291                 }
292
293                 state.valueOutput.write(typeBytes);
294                 Serializer s = Bindings.getSerializerUnchecked(Bindings.getBinding(dt));
295                 s.skip(new InputStream() {
296                     @Override
297                     public int read() throws IOException {
298                         int value = valueStream.read();
299                         state.valueOutput.write(value);
300                         return value;
301                     }
302                 });
303
304                 if (canWriteRawVariant) {
305                     long currentPos = state.valueOutput.position();
306                     int variantSize = (int)(currentPos - rawVariantSizePos - 4);
307                     state.valueOutput.position(rawVariantSizePos);
308                     state.valueOutput.writeInt(variantSize);
309                     state.valueOutput.position(currentPos);
310                 }
311             }
312
313             state.valueCount++;
314         }
315     }
316
317     private TIntArrayList stream = new TIntArrayList();
318
319     public void addToStream(Resource predicate, Resource object) throws DatabaseException {
320         stream.add(support.getTransientId(predicate));
321         stream.add(support.getTransientId(object));
322     }
323
324     public void processStatement(ReadGraph graph, Resource subject, Statement stm) throws DatabaseException, IOException {
325
326         Resource predicate = stm.getPredicate();
327
328         Resource object = stm.getObject();
329
330         ExtentStatus objectStatus = status.get(object);
331
332         // Strong predicate
333         Boolean isRelatedTo = isRelatedToPredicates.get(predicate);
334         if ((objectStatus !=  ExtentStatus.EXCLUDED) && isRelatedTo) {
335
336             if(ModelTransferableGraphSourceRequest.DEBUG) logStatementWithExtent(graph, "related", objectStatus, subject, predicate, object);
337
338             addToStream(predicate, object);
339
340             if(objectStatus == null || objectStatus == ExtentStatus.PENDING) {
341                 if(ModelTransferableGraphSourceRequest.DEBUG) ModelTransferableGraphSourceRequest.log("[ADDED TO FRINGE] " + NameUtils.getSafeName(graph, object));
342                 fringe.add(object);
343             }
344
345         } else {
346
347             // Dead predicate
348             if (deadPredicates.contains(predicate)) {
349                 if(ModelTransferableGraphSourceRequest.DEBUG) logStatementWithExtent(graph, "excluding statement with dead predicate ", objectStatus, subject, predicate, object);
350                 return;
351             }
352
353             // Weak predicate
354             if(objectStatus == ExtentStatus.EXCLUDED) {
355
356                 if(ModelTransferableGraphSourceRequest.DEBUG) logStatementWithExtent(graph, "weak reference to excluded object ", objectStatus, subject, predicate, object);
357
358             } else {
359
360                 // The inverse is also weak (or there is no inverse)
361                 if(!strongInverseSet.contains(predicate)) {
362                         
363                         unresolvedWeakLinks.add(new StandardStatement(subject, predicate, object));
364                     //addToStream(predicate, object);
365
366                     if(objectStatus == null) {
367                         status.put(object, ExtentStatus.PENDING);
368                     }
369
370                     if(ModelTransferableGraphSourceRequest.DEBUG) logStatementWithExtent(graph, "fully weak internal", objectStatus, subject, predicate, object);
371
372                 } else {
373
374                     if(ModelTransferableGraphSourceRequest.DEBUG) logStatementWithExtent(graph, "strong inverse internal ", objectStatus, subject, predicate, object);
375
376                 }
377
378             }
379
380         }
381         
382     }
383     
384     public void flushStatementStream(int sId, DomainProcessorState state) throws IOException {
385         if(!stream.isEmpty()) {
386             state.statementsOutput.writeInt(sId);
387             int streamSize = stream.size();
388             int statementCount = stream.size() / 2;
389             state.statementsOutput.writeInt(statementCount);
390             for (int i = 0; i < streamSize; i++)
391                 state.statementsOutput.writeInt(stream.getQuick(i));
392             state.statementCount += 2*streamSize;
393             stream.resetQuick();
394         }
395     }
396
397     // For progress monitor book-keeping
398     private long internalResourceNumber = 0;
399     private long startTime = 0;
400     private long lastUpdateTime = 0;
401
402     public void processInternal(ReadGraph graph, Resource subject, DirectStatements stms, DomainProcessorState state) throws DatabaseException, IOException {
403
404         internalResourceNumber++;
405
406         // Update progress monitor with controlled frequency
407         long t = System.nanoTime();
408         long dt = t - lastUpdateTime;
409         if (dt > 200_000_000L) {
410             if (startTime == 0)
411                 startTime = t;
412             lastUpdateTime = t;
413             double totalTime = (t - startTime) * 1e-9;
414             if (totalTime > 0) {
415                 long speed = Math.round((double)internalResourceNumber / totalTime);
416                 state.monitor.subTask("Included " + internalResourceNumber + " resources (" + speed + " resources/s)");
417             }
418         }
419
420         status.put(subject, ExtentStatus.INTERNAL);
421         if(ModelTransferableGraphSourceRequest.DEBUG) ModelTransferableGraphSourceRequest.log("[INTERNAL] " + NameUtils.getSafeName(graph, subject, true));
422
423         int sId = support.getTransientId(subject);
424
425         if(PROFILE)
426                 valueOutputTime -= System.nanoTime();
427
428         processValue(graph, subject, sId, state);
429
430         if(PROFILE)
431                 valueOutputTime += System.nanoTime();
432
433         if(PROFILE)
434                 statementOutputTime -= System.nanoTime();
435
436         for(Statement stm : stms) {
437                 processStatement(graph, subject, stm);
438         }
439
440         flushStatementStream(sId, state);
441
442         if(PROFILE)
443                 statementOutputTime += System.nanoTime();
444
445         // Logarithmic progress monitor for unknown amount of work.
446         state.monitor.setWorkRemaining(100000);
447         state.monitor.worked(1);
448     }
449
450     public void processFringe(ReadGraph graph, Collection<DirectStatements> expansion, final DomainProcessorState state) throws DatabaseException, IOException {
451
452         if(PROFILE)
453                 processFringeTime -= System.nanoTime();
454
455         for (DirectStatements stms : expansion) {
456
457             Resource subject = stms.getSubject();
458
459             boolean partOf = false;
460             for(Statement stm : stms) {
461                 Resource predicate = stm.getPredicate();
462                 if(L0.PartOf.equals(predicate)) {
463                     partOf = true;
464                     break;
465                 }
466             }
467
468             ExtentStatus subjectStatus = status.get(subject);
469             if(ModelTransferableGraphSourceRequest.DEBUG && subjectStatus != null) ModelTransferableGraphSourceRequest.log("EXISTING STATUS FOR " + graph.getPossibleURI(subject) + " - " + subjectStatus);
470             if(subjectStatus == ExtentStatus.EXTERNAL || subjectStatus == ExtentStatus.EXCLUDED) continue;
471             if(partOf && (subjectStatus == null || ExtentStatus.PENDING == subjectStatus) && graph.getPossibleURI(subject) != null) {
472
473                 status.put(subject, ExtentStatus.EXTERNAL);
474                 if(ModelTransferableGraphSourceRequest.DEBUG) {
475                     String uri = graph.getPossibleURI(subject);
476                     if(uri == null) ModelTransferableGraphSourceRequest.log("[EXTERNAL]: No URI for " + subject);
477                     else ModelTransferableGraphSourceRequest.log("[EXTERNAL] " + uri);
478                 }
479
480             } else {
481
482                 processInternal(graph, subject, stms, state);
483                 
484             }
485
486         }
487
488         if(PROFILE)
489                 processFringeTime += System.nanoTime();
490
491     }
492
493     public void process(ReadGraph graph, DomainProcessorState state) throws DatabaseException {
494
495         try {
496
497             this.variantSerializer = graph.getService(Databoard.class).getSerializerUnchecked(Bindings.VARIANT);
498             this.datatypeBinding = Bindings.getBindingUnchecked(Datatype.class);
499             this.datatypeSerializer = graph.getService(Databoard.class).getSerializerUnchecked(this.datatypeBinding);
500
501             Pair<List<ConsistsOfProcessEntry>,Set<Resource>> pair = ConsistsOfProcess.walk(graph, status, conf.seeds, ignoreVirtual); 
502             state.internalEntries = pair.first;
503             
504             for(ConsistsOfProcessEntry entry : state.internalEntries) {
505                 Resource r = entry.resource;
506                 if (status.put(r, ExtentStatus.INTERNAL) == null) {
507                     if(ModelTransferableGraphSourceRequest.DEBUG) {
508                         String URI = graph.getPossibleURI(r);
509                         if(URI != null) ModelTransferableGraphSourceRequest.log("URI INTERNAL " + URI);
510                         else ModelTransferableGraphSourceRequest.log("URI has no URI for " + r);
511                     }
512                     fringe.add(r);
513                 }
514             }
515
516             for(Resource unnamedChild : pair.second) {
517                 if (status.put(unnamedChild, ExtentStatus.INTERNAL) == null) {
518                     fringe.add(unnamedChild);
519                 }
520             }
521             
522             if (state.monitor.isCanceled())
523                 throw new CancelTransactionException();
524
525             while(!fringe.isEmpty()) {
526
527                 Collection<DirectStatements> expansion = expand(graph);
528                 classifyPredicates(graph, state, expansion);
529                 processFringe(graph, expansion, state);
530
531                 if (state.monitor.isCanceled())
532                     throw new CancelTransactionException();
533             }
534
535             if (ModelTransferableGraphSourceRequest.PROFILE) {
536                 System.out.println(composedObjectCounter + " " + fastInternalCounter
537                         + " " + parentExternalCounter + " "
538                         + fullExternalCounter + " " + fullInternalCounter);
539             }
540
541         } catch (IOException e) {
542             throw new DatabaseException(e);
543         }
544
545     }
546
547     void logStatementWithExtent(ReadGraph graph, String header, ExtentStatus status, int sId, int pId, int oId) throws DatabaseException {
548         if(ModelTransferableGraphSourceRequest.DEBUG) {
549             SerialisationSupport support = graph.getService(SerialisationSupport.class);
550             String s = NameUtils.getURIOrSafeNameInternal(graph, support.getResource(sId));
551             String p = NameUtils.getURIOrSafeNameInternal(graph, support.getResource(pId));
552             String o = NameUtils.getURIOrSafeNameInternal(graph, support.getResource(oId));
553             ModelTransferableGraphSourceRequest.log(header + " [" + status + "] " + s + " - " + p + " - " + o);
554         }
555     }
556
557     void logStatementWithExtent(ReadGraph graph, String header, ExtentStatus status, Resource sId, Resource pId, Resource oId) throws DatabaseException {
558         if(ModelTransferableGraphSourceRequest.DEBUG) {
559             String s = NameUtils.getURIOrSafeNameInternal(graph, sId);
560             String p = NameUtils.getURIOrSafeNameInternal(graph, pId);
561             String o = NameUtils.getURIOrSafeNameInternal(graph, oId);
562             ModelTransferableGraphSourceRequest.log(header + " [" + status + "] " + s + " - " + p + " - " + o);
563         }
564     }
565
566 }