]> gerrit.simantics Code Review - simantics/platform.git/blobdiff - bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/Scheduler.java
QueryListening sync is slow
[simantics/platform.git] / bundles / org.simantics.db.impl / src / org / simantics / db / impl / query / Scheduler.java
diff --git a/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/Scheduler.java b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/Scheduler.java
new file mode 100644 (file)
index 0000000..2c9c475
--- /dev/null
@@ -0,0 +1,203 @@
+package org.simantics.db.impl.query;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Semaphore;
+
+class Scheduler extends ThreadLocal<TaskList> {
+
+    private final QueryProcessor processor;
+
+    private static final int MAX_TASKLIST_SIZE = 1<<8;
+
+    /*
+     * New TaskList objects shall be assigned this size.
+     * This shall grow exponentially upon task list creation to max size and shall reset to 1 when a write synchronizes the tasks.
+     * The intention is to make sure that upon frequent syncs the tasks shall be scheduled
+     */
+    private int currentTaskListSize = 1;
+
+    /*
+     * Anyone wishing to execute any tasks needs to hold this monitor
+     */
+    private Object taskExecutionMonitor = new Object();
+
+    /*
+     * Anyone wishing to manipulate the list of dispatched tasks needs to hold this monitor
+     */
+    private Object taskDispatchMonitor = new Object();
+
+    /*
+     * When a thread-specific TaskList becomes full it is added into this list for execution.
+     * The executor thread takes TaskLists from this list.
+     * The sync operation can also execute remaining tasks.
+     */
+    private ArrayList<TaskList> dispatchedTaskLists;
+
+    /*
+     * This replicates the ThreadLocal map of TaskList objects to enable listing of tasks active queues
+     */
+    private final Map<Thread,TaskList> activeTaskLists = new HashMap<>();
+
+    /*
+     * This semaphore is incremented each time s task list gets dispatched. The execution thread waits for the semaphore.
+     */
+    private final Semaphore taskDispatchNotifications = new Semaphore(0);
+
+    Scheduler(QueryProcessor processor) {
+        this.processor = processor;
+    }
+
+    /*
+     * This gets called by multiple query threads
+     */
+    void accept(Runnable task) {
+        TaskList l = get();
+        l.add(task);
+        if(l.shouldSchedule()) {
+            remove();
+            sendToExecution();
+        }
+    }
+
+    /*
+     * A task list has reached full size and is moved into execution by the execution thread
+     */
+    void sendToExecution() {
+        synchronized(activeTaskLists) {
+            // This is dispatched so it should not anymore be in list of active TaskList objects.
+            TaskList l = activeTaskLists.remove(Thread.currentThread());
+            synchronized(taskDispatchMonitor) {
+                if(dispatchedTaskLists == null)
+                    dispatchedTaskLists = new ArrayList<TaskList>();
+                dispatchedTaskLists.add(l);
+            }
+            if(currentTaskListSize < MAX_TASKLIST_SIZE)
+                currentTaskListSize = currentTaskListSize<<1;
+        }
+        // Release the execution thread
+        taskDispatchNotifications.release();
+    }
+
+    /*
+     * Gets dispatched TaskList objects for execution.
+     * Returns null when there's nothing to execute (and thread shall go to sleep).
+     */
+    ArrayList<TaskList> getDispatchedQueues() {
+        synchronized(taskDispatchMonitor) {
+            if(dispatchedTaskLists == null)
+                return null;
+            ArrayList<TaskList> result = dispatchedTaskLists;
+            dispatchedTaskLists = null;
+            return result;
+        }
+    }
+
+    void start() {
+
+        Thread thread = new Thread() {
+
+            public void run() {
+
+                // We execute tasks as long as the query processor is alive
+                while(!processor.isDisposed()) {
+
+                    try {
+
+                        // This is a mutex between this place and sync()
+                        synchronized(taskExecutionMonitor) {
+
+                            // Execute busily while there is something to execute
+                            while(true) {
+
+                                ArrayList<TaskList> qs = getDispatchedQueues();
+                                if(qs == null) {
+                                    // No tasks - go to sleep
+                                    break;
+                                }
+
+                                // Execute everything we got
+                                for(TaskList queue : qs) {
+                                    for(Runnable r : queue) {
+                                        r.run();
+                                    }
+                                }
+
+                            }
+
+                        }
+
+                        // sendToExecution shall release this 
+                        taskDispatchNotifications.acquire();
+
+                    } catch (InterruptedException e) {
+                        QueryListening.LOGGER.error("Unexpected interrupt", e);
+                    }
+
+                }
+
+            };
+
+        };
+
+        thread.setName("QueryListening");
+        thread.start();
+
+    }
+
+    /*
+     * ThreadLocal calls this to create a TaskList.
+     * Called with first list per thread and also when accepting next task after sendToExecution. 
+     */
+    @Override
+    protected TaskList initialValue() {
+        TaskList result = new TaskList(currentTaskListSize);
+        synchronized(activeTaskLists) {
+            activeTaskLists.put(Thread.currentThread(), result);
+        }
+        return result;
+    }
+
+    /*
+     * This makes sure that all tasks have been executed upon return.
+     * It takes the taskExecutionMonitor from the thread and executes possible dispatched work and
+     * also executes anything in active task lists. It resets the dispatch sizes of current and
+     * subsequently created task lists to 1.
+     */
+    void sync() {
+
+        // This is a mutex between this place and execution thread
+        synchronized(taskExecutionMonitor) {
+
+            currentTaskListSize = 1;
+
+            // Perform all dispatched work
+            ArrayList<TaskList> qs = getDispatchedQueues();
+            if(qs != null) {
+                for(TaskList queue : qs) {
+                    for(Runnable r : queue) {
+                        r.run();
+                    }
+                }
+            }
+
+            // Also perform any tasks in active queues.
+            // This assumes that all query threads that send new tasks are now parked and shall not interfere.
+            synchronized(activeTaskLists) {
+                for(TaskList queue : activeTaskLists.values()) {
+                    // Run all tasks
+                    for(Runnable r : queue) {
+                        if(r != null)
+                            r.run();
+                    }
+                    // Remove the tasks from list and reset the max size to 1.
+                    queue.reset();
+                }
+            }
+
+        }
+
+    }
+
+}
\ No newline at end of file