]> gerrit.simantics Code Review - simantics/platform.git/blob - bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/Scheduler.java
Fixed multiple issues causing dangling references to discarded queries
[simantics/platform.git] / bundles / org.simantics.db.impl / src / org / simantics / db / impl / query / Scheduler.java
1 package org.simantics.db.impl.query;
2
3 import java.util.ArrayList;
4 import java.util.HashMap;
5 import java.util.Map;
6 import java.util.concurrent.Semaphore;
7
8 class Scheduler extends ThreadLocal<TaskList> {
9
10     private final QueryProcessor processor;
11
12     private static final int MAX_TASKLIST_SIZE = 1<<8;
13
14     /*
15      * New TaskList objects shall be assigned this size.
16      * This shall grow exponentially upon task list creation to max size and shall reset to 1 when a write synchronizes the tasks.
17      * The intention is to make sure that upon frequent syncs the tasks shall be scheduled
18      */
19     private int currentTaskListSize = 1;
20
21     /*
22      * Anyone wishing to execute any tasks needs to hold this monitor
23      */
24     private Object taskExecutionMonitor = new Object();
25
26     /*
27      * Anyone wishing to manipulate the list of dispatched tasks needs to hold this monitor
28      */
29     private Object taskDispatchMonitor = new Object();
30
31     /*
32      * When a thread-specific TaskList becomes full it is added into this list for execution.
33      * The executor thread takes TaskLists from this list.
34      * The sync operation can also execute remaining tasks.
35      */
36     private ArrayList<TaskList> dispatchedTaskLists;
37
38     /*
39      * This replicates the ThreadLocal map of TaskList objects to enable listing of tasks active queues
40      */
41     private final Map<Thread,TaskList> activeTaskLists = new HashMap<>();
42
43     /*
44      * This semaphore is incremented each time s task list gets dispatched. The execution thread waits for the semaphore.
45      */
46     private final Semaphore taskDispatchNotifications = new Semaphore(0);
47
48     Scheduler(QueryProcessor processor) {
49         this.processor = processor;
50     }
51
52     /*
53      * This gets called by multiple query threads
54      */
55     void accept(Runnable task) {
56         TaskList l = get();
57         l.add(task);
58         if(l.shouldSchedule()) {
59             remove();
60             sendToExecution();
61         }
62     }
63
64     /*
65      * A task list has reached full size and is moved into execution by the execution thread
66      */
67     void sendToExecution() {
68         synchronized(activeTaskLists) {
69             // This is dispatched so it should not anymore be in list of active TaskList objects.
70             TaskList l = activeTaskLists.remove(Thread.currentThread());
71             synchronized(taskDispatchMonitor) {
72                 if(dispatchedTaskLists == null)
73                     dispatchedTaskLists = new ArrayList<TaskList>();
74                 dispatchedTaskLists.add(l);
75             }
76             if(currentTaskListSize < MAX_TASKLIST_SIZE)
77                 currentTaskListSize = currentTaskListSize<<1;
78         }
79         // Release the execution thread
80         taskDispatchNotifications.release();
81     }
82
83     /*
84      * Gets dispatched TaskList objects for execution.
85      * Returns null when there's nothing to execute (and thread shall go to sleep).
86      */
87     ArrayList<TaskList> getDispatchedQueues() {
88         synchronized(taskDispatchMonitor) {
89             if(dispatchedTaskLists == null)
90                 return null;
91             ArrayList<TaskList> result = dispatchedTaskLists;
92             dispatchedTaskLists = null;
93             return result;
94         }
95     }
96
97     void start() {
98
99         Thread thread = new Thread() {
100
101             public void run() {
102
103                 // We execute tasks as long as the query processor is alive
104                 while(!processor.isDisposed()) {
105
106                     try {
107
108                         // This is a mutex between this place and sync()
109                         synchronized(taskExecutionMonitor) {
110
111                             // Execute busily while there is something to execute
112                             while(true) {
113
114                                 ArrayList<TaskList> qs = getDispatchedQueues();
115                                 if(qs == null) {
116                                     // No tasks - go to sleep
117                                     break;
118                                 }
119
120                                 // Execute everything we got
121                                 for(TaskList queue : qs) {
122                                     for(Runnable r : queue) {
123                                         r.run();
124                                     }
125                                 }
126
127                             }
128
129                         }
130
131                         // sendToExecution shall release this 
132                         taskDispatchNotifications.acquire();
133
134                     } catch (InterruptedException e) {
135                         QueryListening.LOGGER.error("Unexpected interrupt", e);
136                     }
137
138                 }
139
140             };
141
142         };
143
144         thread.setName("QueryListening");
145         thread.start();
146
147     }
148
149     /*
150      * ThreadLocal calls this to create a TaskList.
151      * Called with first list per thread and also when accepting next task after sendToExecution. 
152      */
153     @Override
154     protected TaskList initialValue() {
155         TaskList result = new TaskList(currentTaskListSize);
156         synchronized(activeTaskLists) {
157             activeTaskLists.put(Thread.currentThread(), result);
158         }
159         return result;
160     }
161
162     /*
163      * This makes sure that all tasks have been executed upon return.
164      * It takes the taskExecutionMonitor from the thread and executes possible dispatched work and
165      * also executes anything in active task lists. It resets the dispatch sizes of current and
166      * subsequently created task lists to 1.
167      */
168     void sync() {
169
170         // This is a mutex between this place and execution thread
171         synchronized(taskExecutionMonitor) {
172
173             currentTaskListSize = 1;
174
175             // Perform all dispatched work
176             ArrayList<TaskList> qs = getDispatchedQueues();
177             if(qs != null) {
178                 for(TaskList queue : qs) {
179                     for(Runnable r : queue) {
180                         r.run();
181                     }
182                 }
183             }
184
185             // Also perform any tasks in active queues.
186             // This assumes that all query threads that send new tasks are now parked and shall not interfere.
187             synchronized(activeTaskLists) {
188                 for(TaskList queue : activeTaskLists.values()) {
189                     // Run all tasks
190                     for(Runnable r : queue) {
191                         if(r != null)
192                             r.run();
193                     }
194                     // Remove the tasks from list and reset the max size to 1.
195                     queue.reset();
196                 }
197             }
198
199         }
200
201     }
202
203 }