package org.simantics.db.impl.query; import java.util.ArrayList; import java.util.HashMap; import java.util.Map; import java.util.concurrent.Semaphore; class Scheduler extends ThreadLocal { private final QueryProcessor processor; private static final int MAX_TASKLIST_SIZE = 1<<8; /* * New TaskList objects shall be assigned this size. * This shall grow exponentially upon task list creation to max size and shall reset to 1 when a write synchronizes the tasks. * The intention is to make sure that upon frequent syncs the tasks shall be scheduled */ private int currentTaskListSize = 1; /* * Anyone wishing to execute any tasks needs to hold this monitor */ private Object taskExecutionMonitor = new Object(); /* * Anyone wishing to manipulate the list of dispatched tasks needs to hold this monitor */ private Object taskDispatchMonitor = new Object(); /* * When a thread-specific TaskList becomes full it is added into this list for execution. * The executor thread takes TaskLists from this list. * The sync operation can also execute remaining tasks. */ private ArrayList dispatchedTaskLists; /* * This replicates the ThreadLocal map of TaskList objects to enable listing of tasks active queues */ private final Map activeTaskLists = new HashMap<>(); /* * This semaphore is incremented each time s task list gets dispatched. The execution thread waits for the semaphore. */ private final Semaphore taskDispatchNotifications = new Semaphore(0); Scheduler(QueryProcessor processor) { this.processor = processor; } /* * This gets called by multiple query threads */ void accept(Runnable task) { TaskList l = get(); l.add(task); if(l.shouldSchedule()) { remove(); sendToExecution(); } } /* * A task list has reached full size and is moved into execution by the execution thread */ void sendToExecution() { synchronized(activeTaskLists) { // This is dispatched so it should not anymore be in list of active TaskList objects. TaskList l = activeTaskLists.remove(Thread.currentThread()); synchronized(taskDispatchMonitor) { if(dispatchedTaskLists == null) dispatchedTaskLists = new ArrayList(); dispatchedTaskLists.add(l); } if(currentTaskListSize < MAX_TASKLIST_SIZE) currentTaskListSize = currentTaskListSize<<1; } // Release the execution thread taskDispatchNotifications.release(); } /* * Gets dispatched TaskList objects for execution. * Returns null when there's nothing to execute (and thread shall go to sleep). */ ArrayList getDispatchedQueues() { synchronized(taskDispatchMonitor) { if(dispatchedTaskLists == null) return null; ArrayList result = dispatchedTaskLists; dispatchedTaskLists = null; return result; } } void start() { Thread thread = new Thread() { public void run() { // We execute tasks as long as the query processor is alive while(!processor.isDisposed()) { try { // This is a mutex between this place and sync() synchronized(taskExecutionMonitor) { // Execute busily while there is something to execute while(true) { ArrayList qs = getDispatchedQueues(); if(qs == null) { // No tasks - go to sleep break; } // Execute everything we got for(TaskList queue : qs) { for(Runnable r : queue) { r.run(); } } } } // sendToExecution shall release this taskDispatchNotifications.acquire(); } catch (InterruptedException e) { QueryListening.LOGGER.error("Unexpected interrupt", e); } } }; }; thread.setName("QueryListening"); thread.start(); } /* * ThreadLocal calls this to create a TaskList. * Called with first list per thread and also when accepting next task after sendToExecution. */ @Override protected TaskList initialValue() { TaskList result = new TaskList(currentTaskListSize); synchronized(activeTaskLists) { activeTaskLists.put(Thread.currentThread(), result); } return result; } /* * This makes sure that all tasks have been executed upon return. * It takes the taskExecutionMonitor from the thread and executes possible dispatched work and * also executes anything in active task lists. It resets the dispatch sizes of current and * subsequently created task lists to 1. */ void sync() { // This is a mutex between this place and execution thread synchronized(taskExecutionMonitor) { currentTaskListSize = 1; // Perform all dispatched work ArrayList qs = getDispatchedQueues(); if(qs != null) { for(TaskList queue : qs) { for(Runnable r : queue) { r.run(); } } } // Also perform any tasks in active queues. // This assumes that all query threads that send new tasks are now parked and shall not interfere. synchronized(activeTaskLists) { for(TaskList queue : activeTaskLists.values()) { // Run all tasks for(Runnable r : queue) { if(r != null) r.run(); } // Remove the tasks from list and reset the max size to 1. queue.reset(); } } } } }