X-Git-Url: https://gerrit.simantics.org/r/gitweb?p=simantics%2Fplatform.git;a=blobdiff_plain;f=bundles%2Forg.simantics.db.impl%2Fsrc%2Forg%2Fsimantics%2Fdb%2Fimpl%2Fquery%2FScheduler.java;fp=bundles%2Forg.simantics.db.impl%2Fsrc%2Forg%2Fsimantics%2Fdb%2Fimpl%2Fquery%2FScheduler.java;h=2c9c475748194501b715106918b25dd4ae21a0c7;hp=0000000000000000000000000000000000000000;hb=c4d9561b1b35a0e8e594158fbb01a9c632997808;hpb=066c758d34f60a9da4e7b2387689475de1a9cf84 diff --git a/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/Scheduler.java b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/Scheduler.java new file mode 100644 index 000000000..2c9c47574 --- /dev/null +++ b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/Scheduler.java @@ -0,0 +1,203 @@ +package org.simantics.db.impl.query; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.Semaphore; + +class Scheduler extends ThreadLocal { + + private final QueryProcessor processor; + + private static final int MAX_TASKLIST_SIZE = 1<<8; + + /* + * New TaskList objects shall be assigned this size. + * This shall grow exponentially upon task list creation to max size and shall reset to 1 when a write synchronizes the tasks. + * The intention is to make sure that upon frequent syncs the tasks shall be scheduled + */ + private int currentTaskListSize = 1; + + /* + * Anyone wishing to execute any tasks needs to hold this monitor + */ + private Object taskExecutionMonitor = new Object(); + + /* + * Anyone wishing to manipulate the list of dispatched tasks needs to hold this monitor + */ + private Object taskDispatchMonitor = new Object(); + + /* + * When a thread-specific TaskList becomes full it is added into this list for execution. + * The executor thread takes TaskLists from this list. + * The sync operation can also execute remaining tasks. + */ + private ArrayList dispatchedTaskLists; + + /* + * This replicates the ThreadLocal map of TaskList objects to enable listing of tasks active queues + */ + private final Map activeTaskLists = new HashMap<>(); + + /* + * This semaphore is incremented each time s task list gets dispatched. The execution thread waits for the semaphore. + */ + private final Semaphore taskDispatchNotifications = new Semaphore(0); + + Scheduler(QueryProcessor processor) { + this.processor = processor; + } + + /* + * This gets called by multiple query threads + */ + void accept(Runnable task) { + TaskList l = get(); + l.add(task); + if(l.shouldSchedule()) { + remove(); + sendToExecution(); + } + } + + /* + * A task list has reached full size and is moved into execution by the execution thread + */ + void sendToExecution() { + synchronized(activeTaskLists) { + // This is dispatched so it should not anymore be in list of active TaskList objects. + TaskList l = activeTaskLists.remove(Thread.currentThread()); + synchronized(taskDispatchMonitor) { + if(dispatchedTaskLists == null) + dispatchedTaskLists = new ArrayList(); + dispatchedTaskLists.add(l); + } + if(currentTaskListSize < MAX_TASKLIST_SIZE) + currentTaskListSize = currentTaskListSize<<1; + } + // Release the execution thread + taskDispatchNotifications.release(); + } + + /* + * Gets dispatched TaskList objects for execution. + * Returns null when there's nothing to execute (and thread shall go to sleep). + */ + ArrayList getDispatchedQueues() { + synchronized(taskDispatchMonitor) { + if(dispatchedTaskLists == null) + return null; + ArrayList result = dispatchedTaskLists; + dispatchedTaskLists = null; + return result; + } + } + + void start() { + + Thread thread = new Thread() { + + public void run() { + + // We execute tasks as long as the query processor is alive + while(!processor.isDisposed()) { + + try { + + // This is a mutex between this place and sync() + synchronized(taskExecutionMonitor) { + + // Execute busily while there is something to execute + while(true) { + + ArrayList qs = getDispatchedQueues(); + if(qs == null) { + // No tasks - go to sleep + break; + } + + // Execute everything we got + for(TaskList queue : qs) { + for(Runnable r : queue) { + r.run(); + } + } + + } + + } + + // sendToExecution shall release this + taskDispatchNotifications.acquire(); + + } catch (InterruptedException e) { + QueryListening.LOGGER.error("Unexpected interrupt", e); + } + + } + + }; + + }; + + thread.setName("QueryListening"); + thread.start(); + + } + + /* + * ThreadLocal calls this to create a TaskList. + * Called with first list per thread and also when accepting next task after sendToExecution. + */ + @Override + protected TaskList initialValue() { + TaskList result = new TaskList(currentTaskListSize); + synchronized(activeTaskLists) { + activeTaskLists.put(Thread.currentThread(), result); + } + return result; + } + + /* + * This makes sure that all tasks have been executed upon return. + * It takes the taskExecutionMonitor from the thread and executes possible dispatched work and + * also executes anything in active task lists. It resets the dispatch sizes of current and + * subsequently created task lists to 1. + */ + void sync() { + + // This is a mutex between this place and execution thread + synchronized(taskExecutionMonitor) { + + currentTaskListSize = 1; + + // Perform all dispatched work + ArrayList qs = getDispatchedQueues(); + if(qs != null) { + for(TaskList queue : qs) { + for(Runnable r : queue) { + r.run(); + } + } + } + + // Also perform any tasks in active queues. + // This assumes that all query threads that send new tasks are now parked and shall not interfere. + synchronized(activeTaskLists) { + for(TaskList queue : activeTaskLists.values()) { + // Run all tasks + for(Runnable r : queue) { + if(r != null) + r.run(); + } + // Remove the tasks from list and reset the max size to 1. + queue.reset(); + } + } + + } + + } + +} \ No newline at end of file