+package org.simantics.db.impl.query;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Semaphore;
+
+class Scheduler extends ThreadLocal<TaskList> {
+
+ private final QueryProcessor processor;
+
+ private static final int MAX_TASKLIST_SIZE = 1<<8;
+
+ /*
+ * New TaskList objects shall be assigned this size.
+ * This shall grow exponentially upon task list creation to max size and shall reset to 1 when a write synchronizes the tasks.
+ * The intention is to make sure that upon frequent syncs the tasks shall be scheduled
+ */
+ private int currentTaskListSize = 1;
+
+ /*
+ * Anyone wishing to execute any tasks needs to hold this monitor
+ */
+ private Object taskExecutionMonitor = new Object();
+
+ /*
+ * Anyone wishing to manipulate the list of dispatched tasks needs to hold this monitor
+ */
+ private Object taskDispatchMonitor = new Object();
+
+ /*
+ * When a thread-specific TaskList becomes full it is added into this list for execution.
+ * The executor thread takes TaskLists from this list.
+ * The sync operation can also execute remaining tasks.
+ */
+ private ArrayList<TaskList> dispatchedTaskLists;
+
+ /*
+ * This replicates the ThreadLocal map of TaskList objects to enable listing of tasks active queues
+ */
+ private final Map<Thread,TaskList> activeTaskLists = new HashMap<>();
+
+ /*
+ * This semaphore is incremented each time s task list gets dispatched. The execution thread waits for the semaphore.
+ */
+ private final Semaphore taskDispatchNotifications = new Semaphore(0);
+
+ Scheduler(QueryProcessor processor) {
+ this.processor = processor;
+ }
+
+ /*
+ * This gets called by multiple query threads
+ */
+ void accept(Runnable task) {
+ TaskList l = get();
+ l.add(task);
+ if(l.shouldSchedule()) {
+ remove();
+ sendToExecution();
+ }
+ }
+
+ /*
+ * A task list has reached full size and is moved into execution by the execution thread
+ */
+ void sendToExecution() {
+ synchronized(activeTaskLists) {
+ // This is dispatched so it should not anymore be in list of active TaskList objects.
+ TaskList l = activeTaskLists.remove(Thread.currentThread());
+ synchronized(taskDispatchMonitor) {
+ if(dispatchedTaskLists == null)
+ dispatchedTaskLists = new ArrayList<TaskList>();
+ dispatchedTaskLists.add(l);
+ }
+ if(currentTaskListSize < MAX_TASKLIST_SIZE)
+ currentTaskListSize = currentTaskListSize<<1;
+ }
+ // Release the execution thread
+ taskDispatchNotifications.release();
+ }
+
+ /*
+ * Gets dispatched TaskList objects for execution.
+ * Returns null when there's nothing to execute (and thread shall go to sleep).
+ */
+ ArrayList<TaskList> getDispatchedQueues() {
+ synchronized(taskDispatchMonitor) {
+ if(dispatchedTaskLists == null)
+ return null;
+ ArrayList<TaskList> result = dispatchedTaskLists;
+ dispatchedTaskLists = null;
+ return result;
+ }
+ }
+
+ void start() {
+
+ Thread thread = new Thread() {
+
+ public void run() {
+
+ // We execute tasks as long as the query processor is alive
+ while(!processor.isDisposed()) {
+
+ try {
+
+ // This is a mutex between this place and sync()
+ synchronized(taskExecutionMonitor) {
+
+ // Execute busily while there is something to execute
+ while(true) {
+
+ ArrayList<TaskList> qs = getDispatchedQueues();
+ if(qs == null) {
+ // No tasks - go to sleep
+ break;
+ }
+
+ // Execute everything we got
+ for(TaskList queue : qs) {
+ for(Runnable r : queue) {
+ r.run();
+ }
+ }
+
+ }
+
+ }
+
+ // sendToExecution shall release this
+ taskDispatchNotifications.acquire();
+
+ } catch (InterruptedException e) {
+ QueryListening.LOGGER.error("Unexpected interrupt", e);
+ }
+
+ }
+
+ };
+
+ };
+
+ thread.setName("QueryListening");
+ thread.start();
+
+ }
+
+ /*
+ * ThreadLocal calls this to create a TaskList.
+ * Called with first list per thread and also when accepting next task after sendToExecution.
+ */
+ @Override
+ protected TaskList initialValue() {
+ TaskList result = new TaskList(currentTaskListSize);
+ synchronized(activeTaskLists) {
+ activeTaskLists.put(Thread.currentThread(), result);
+ }
+ return result;
+ }
+
+ /*
+ * This makes sure that all tasks have been executed upon return.
+ * It takes the taskExecutionMonitor from the thread and executes possible dispatched work and
+ * also executes anything in active task lists. It resets the dispatch sizes of current and
+ * subsequently created task lists to 1.
+ */
+ void sync() {
+
+ // This is a mutex between this place and execution thread
+ synchronized(taskExecutionMonitor) {
+
+ currentTaskListSize = 1;
+
+ // Perform all dispatched work
+ ArrayList<TaskList> qs = getDispatchedQueues();
+ if(qs != null) {
+ for(TaskList queue : qs) {
+ for(Runnable r : queue) {
+ r.run();
+ }
+ }
+ }
+
+ // Also perform any tasks in active queues.
+ // This assumes that all query threads that send new tasks are now parked and shall not interfere.
+ synchronized(activeTaskLists) {
+ for(TaskList queue : activeTaskLists.values()) {
+ // Run all tasks
+ for(Runnable r : queue) {
+ if(r != null)
+ r.run();
+ }
+ // Remove the tasks from list and reset the max size to 1.
+ queue.reset();
+ }
+ }
+
+ }
+
+ }
+
+}
\ No newline at end of file