gerrit.simantics Code Review - simantics/platform.git/blob - bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/Scheduling.java
DB request scheduling scheme fails with district diagrams
[simantics/platform.git] / bundles / org.simantics.db.impl / src / org / simantics / db / impl / query / Scheduling.java
1 package org.simantics.db.impl.query;
2
3 import java.util.ArrayList;
4 import java.util.HashMap;
5 import java.util.LinkedList;
6 import java.util.Map;
7 import java.util.concurrent.Semaphore;
8
9 import org.simantics.db.impl.graph.BarrierTracing;
10 import org.simantics.db.impl.graph.ReadGraphImpl;
11 import org.simantics.db.impl.query.QueryProcessor.AsyncBarrier;
12 import org.simantics.db.impl.query.QueryProcessor.SessionTask;
13
14 public class Scheduling {
15     
16     private final Semaphore                                  requests;
17
18     private Map<AsyncBarrier, LinkedList<SessionTask>>        freeScheduling = new HashMap<>();
19     
20     private LinkedList<SessionTask>                          topLevelTasks = new LinkedList<SessionTask>();
21
22     public Scheduling(Semaphore requests) {
23         this.requests = requests;
24     }
25     
26     public SessionTask getSubTask(ReadGraphImpl parent) {
27         synchronized(this) {
28             assert(parent.asyncBarrier.isBlocking());
29             LinkedList<SessionTask> tasks = freeScheduling.get(parent.asyncBarrier);
30             if(tasks == null)
31                 return null;
32             SessionTask task = tasks.removeLast();
33             if(tasks.isEmpty())
34                 freeScheduling.remove(parent.asyncBarrier);
35             return task;
36         }
37     }
38
39     public boolean pumpTask(ArrayList<SessionTask> tasks) {
40         
41         synchronized(this) {
42
43             // First finish existing executions
44             if(!freeScheduling.isEmpty()) {
45                 Map.Entry<AsyncBarrier, LinkedList<SessionTask>> ls =  freeScheduling.entrySet().iterator().next();
46                 assert(ls.getKey().isBlocking());
47                 tasks.add(ls.getValue().removeLast());
48                 if(ls.getValue().isEmpty())
49                     freeScheduling.remove(ls.getKey());
50                 return true;
51             }
52             // Check for new tasks
53             if(!topLevelTasks.isEmpty()) {
54                 tasks.add(topLevelTasks.removeLast());
55                 return true;
56             }
57
58             return false;
59
60         }
61         
62     }
63
64     final public SessionTask scheduleOrReturnForExecution(SessionTask request) {
65         
66         assert(request != null);
67
68         synchronized(this) {
69
70             if(BarrierTracing.BOOKKEEPING) {
71                 Exception current = new Exception();
72                 Exception previous = BarrierTracing.tasks.put(request, current);
73                 if(previous != null) {
74                     previous.printStackTrace();
75                     current.printStackTrace();
76                 }
77             }
78
79             requests.release();
80
81             if(request.rootGraph != null) {
82                 AsyncBarrier sb = request.rootGraph.asyncBarrier.getBlockingBarrier();
83                 if(sb != null) {
84                     LinkedList<SessionTask> ls = freeScheduling.get(sb);
85                     if(ls == null) {
86                         ls = new LinkedList<SessionTask>();
87                         freeScheduling.put(sb, ls);
88                     }
89                     ls.addFirst(request);
90                     return null;
91                 }
92             }
93             
94             topLevelTasks.addFirst(request);
95  
96         }
97
98         return null;
99
100     }
101
102 }