1 package org.simantics.db.impl.query;
3 import java.util.ArrayList;
4 import java.util.HashMap;
6 import java.util.concurrent.Semaphore;
8 class Scheduler extends ThreadLocal<TaskList> {
10 private final QueryProcessor processor;
12 private static final int MAX_TASKLIST_SIZE = 1<<8;
15 * New TaskList objects shall be assigned this size.
16 * This shall grow exponentially upon task list creation to max size and shall reset to 1 when a write synchronizes the tasks.
17 * The intention is to make sure that upon frequent syncs the tasks shall be scheduled
19 private int currentTaskListSize = 1;
22 * Anyone wishing to execute any tasks needs to hold this monitor
24 private Object taskExecutionMonitor = new Object();
27 * Anyone wishing to manipulate the list of dispatched tasks needs to hold this monitor
29 private Object taskDispatchMonitor = new Object();
32 * When a thread-specific TaskList becomes full it is added into this list for execution.
33 * The executor thread takes TaskLists from this list.
34 * The sync operation can also execute remaining tasks.
36 private ArrayList<TaskList> dispatchedTaskLists;
39 * This replicates the ThreadLocal map of TaskList objects to enable listing of tasks active queues
41 private final Map<Thread,TaskList> activeTaskLists = new HashMap<>();
44 * This semaphore is incremented each time s task list gets dispatched. The execution thread waits for the semaphore.
46 private final Semaphore taskDispatchNotifications = new Semaphore(0);
48 Scheduler(QueryProcessor processor) {
49 this.processor = processor;
53 * This gets called by multiple query threads
55 void accept(Runnable task) {
58 if(l.shouldSchedule()) {
65 * A task list has reached full size and is moved into execution by the execution thread
67 void sendToExecution() {
68 synchronized(activeTaskLists) {
69 // This is dispatched so it should not anymore be in list of active TaskList objects.
70 TaskList l = activeTaskLists.remove(Thread.currentThread());
71 synchronized(taskDispatchMonitor) {
72 if(dispatchedTaskLists == null)
73 dispatchedTaskLists = new ArrayList<TaskList>();
74 dispatchedTaskLists.add(l);
76 if(currentTaskListSize < MAX_TASKLIST_SIZE)
77 currentTaskListSize = currentTaskListSize<<1;
79 // Release the execution thread
80 taskDispatchNotifications.release();
84 * Gets dispatched TaskList objects for execution.
85 * Returns null when there's nothing to execute (and thread shall go to sleep).
87 ArrayList<TaskList> getDispatchedQueues() {
88 synchronized(taskDispatchMonitor) {
89 if(dispatchedTaskLists == null)
91 ArrayList<TaskList> result = dispatchedTaskLists;
92 dispatchedTaskLists = null;
99 Thread thread = new Thread() {
103 // We execute tasks as long as the query processor is alive
104 while(!processor.isDisposed()) {
108 // This is a mutex between this place and sync()
109 synchronized(taskExecutionMonitor) {
111 // Execute busily while there is something to execute
114 ArrayList<TaskList> qs = getDispatchedQueues();
116 // No tasks - go to sleep
120 // Execute everything we got
121 for(TaskList queue : qs) {
122 for(Runnable r : queue) {
131 // sendToExecution shall release this
132 taskDispatchNotifications.acquire();
134 } catch (InterruptedException e) {
135 QueryListening.LOGGER.error("Unexpected interrupt", e);
144 thread.setName("QueryListening");
150 * ThreadLocal calls this to create a TaskList.
151 * Called with first list per thread and also when accepting next task after sendToExecution.
154 protected TaskList initialValue() {
155 TaskList result = new TaskList(currentTaskListSize);
156 synchronized(activeTaskLists) {
157 activeTaskLists.put(Thread.currentThread(), result);
163 * This makes sure that all tasks have been executed upon return.
164 * It takes the taskExecutionMonitor from the thread and executes possible dispatched work and
165 * also executes anything in active task lists. It resets the dispatch sizes of current and
166 * subsequently created task lists to 1.
170 // This is a mutex between this place and execution thread
171 synchronized(taskExecutionMonitor) {
173 currentTaskListSize = 1;
175 // Perform all dispatched work
176 ArrayList<TaskList> qs = getDispatchedQueues();
178 for(TaskList queue : qs) {
179 for(Runnable r : queue) {
185 // Also perform any tasks in active queues.
186 // This assumes that all query threads that send new tasks are now parked and shall not interfere.
187 synchronized(activeTaskLists) {
188 for(TaskList queue : activeTaskLists.values()) {
190 for(Runnable r : queue) {
194 // Remove the tasks from list and reset the max size to 1.