QueryListening sync is slow 36/4036/5
authorAntti Villberg <antti.villberg@semantum.fi>
Sat, 21 Mar 2020 18:56:50 +0000 (20:56 +0200)
committerTuukka Lehtonen <tuukka.lehtonen@semantum.fi>
Mon, 6 Apr 2020 10:14:54 +0000 (10:14 +0000)
gitlab #503

Change-Id: Ied3aaf386094c3d52b3840d000fb9de01c9ebd03

bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryListening.java
bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/Scheduler.java [new file with mode: 0644]
bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/TaskList.java [new file with mode: 0644]

index e3bd437416af310936e66533746b270d8228aba8..91200f1fc3ad037393bc5e3794d6acc4c29a4e7b 100644 (file)
@@ -14,15 +14,12 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.Semaphore;
-import java.util.function.Consumer;
 
 import org.simantics.databoard.Bindings;
 import org.simantics.db.DevelopmentKeys;
 import org.simantics.db.debug.ListenerReport;
 import org.simantics.db.exception.DatabaseException;
 import org.simantics.db.impl.graph.ReadGraphImpl;
-import org.simantics.db.impl.graph.WriteGraphImpl;
 import org.simantics.db.procedure.ListenerBase;
 import org.simantics.utils.Development;
 import org.simantics.utils.datastructures.Pair;
@@ -34,11 +31,10 @@ import gnu.trove.set.hash.THashSet;
 
 public class QueryListening {
 
-    private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(QueryListening.class);
+    static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(QueryListening.class);
 
     private final QueryProcessor                                    processor;
     private final Scheduler                                         scheduler;
-    private final Consumer<Runnable>                                consumer;
     private final Map<ListenerBase,ListenerEntry>                   addedEntries = new HashMap<>();
 
     private THashSet<ListenerEntry>                                 scheduledListeners    = new THashSet<ListenerEntry>();
@@ -46,210 +42,11 @@ public class QueryListening {
 
     final THashMap<CacheEntry, ArrayList<ListenerEntry>>    listeners = new THashMap<CacheEntry, ArrayList<ListenerEntry>>(10, 0.75f);
 
-    private static class ThreadQueue extends ThreadLocal<ArrayList<Runnable>> {
-
-        private final Map<Thread,ArrayList<Runnable>> allQueues = new HashMap<>();
-        
-        private ArrayList<ArrayList<Runnable>> dispatchedQueues = new ArrayList<>();
-
-        @Override
-        protected synchronized ArrayList<Runnable> initialValue() {
-            ArrayList<Runnable> result = new ArrayList<>();
-            allQueues.put(Thread.currentThread(), result);
-            return result;
-        }
-
-        synchronized void sendToExecution() {
-            ArrayList<Runnable> rs = allQueues.remove(Thread.currentThread());
-            dispatchedQueues.add(rs);
-            notify();
-        }
-
-        synchronized ArrayList<ArrayList<Runnable>> getDispatchedQueues() {
-            ArrayList<ArrayList<Runnable>> result = dispatchedQueues;
-            dispatchedQueues = new ArrayList<>();
-            return result;
-        }
-
-    }
-
-    private static class Scheduler {
-
-        private static final int BUFFER_SIZE = 100;
-
-        private final QueryProcessor processor;
-        
-        private final ThreadQueue queues = new ThreadQueue();
-
-        /*
-         * Access to this field is synchronized using the monitor of 'queues'
-         * The method flush() shall be called by the single writing thread and the field has been nulled before the
-         * method can exit. Thus it is not possible for multiple instances of flush Semaphore to exist at the same time.
-         */
-        private Semaphore flush = null;
-
-        Scheduler(QueryProcessor processor) {
-            this.processor = processor;
-        }
-        
-        private boolean isTerminated() {
-            return processor.isDisposed();
-        }
-
-        Consumer<Runnable> newConsumer() {
-            return (task) -> {
-
-                ArrayList<Runnable> l = queues.get();
-                l.add(task);
-                if(l.size() == BUFFER_SIZE) {
-                    queues.remove();
-                    queues.sendToExecution();
-                }
-                
-            };
-        }
-
-        void start() {
-            
-            Thread thread = new Thread() {
-                
-                public void run() {
-
-                    synchronized(queues) {
-                        while(!isTerminated()) {
-                            try {
-                                ArrayList<ArrayList<Runnable>> qs = queues.getDispatchedQueues();
-                                for(ArrayList<Runnable> queue : qs) {
-                                    for(Runnable r : queue)
-                                        r.run();
-                                }
-                                if(flush != null) {
-                                    for(ArrayList<Runnable> queue : queues.allQueues.values()) {
-                                        for(Runnable r : queue) {
-                                            r.run();   
-                                        }
-                                        queue.clear();
-                                    }
-                                    Semaphore s = flush;
-                                    flush = null;
-                                    s.release();
-                                }
-                                queues.wait(1000);
-                            } catch (InterruptedException e) {
-                                LOGGER.error("Unexpected interrupt", e);
-                            }
-                        }
-                    }
-
-                };
-                
-            };
-            
-            thread.setName("QueryListening");
-            thread.start();
-            
-        }
-
-        private Semaphore createFlush() {
-            synchronized(queues) {
-                flush = new Semaphore(0);
-                queues.notify();
-                return flush;
-            }
-        }
-        
-        void flush() {
-            try {
-                createFlush().acquire();
-            } catch (InterruptedException e) {
-                LOGGER.error("Unexpected interrupt", e);
-            }
-        }
-
-    }
-
-    private static class RegisterParentRunnable implements Runnable {
-        
-        private final CacheEntry parent;
-        private final CacheEntry child;
-        
-        public RegisterParentRunnable(CacheEntry parent, CacheEntry child) {
-            this.parent = parent;
-            this.child = child;
-        }
-
-        @Override
-        public void run() {
-            child.addParent(parent);
-            if (Development.DEVELOPMENT) {
-                if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_DEPENDENCIES, Bindings.BOOLEAN)) {
-                    System.out.println(child + " -> " + parent);
-                }
-            }
-        }
-        
-    }
-    
-    private static class RegisterListenerRunnable implements Runnable {
-
-        private final QueryListening queryListening;
-        private final ListenerBase base;
-        private final Object procedure;
-        private final CacheEntry parent;
-        private final CacheEntry entry;
-
-        public RegisterListenerRunnable(QueryListening queryListening, ListenerBase base, Object procedure, CacheEntry parent, CacheEntry entry) {
-            this.queryListening = queryListening;
-            this.base = base;
-            this.procedure = procedure;
-            this.parent = parent;
-            this.entry = entry;
-        }
-
-        @Override
-        public void run() {
-
-            assert (entry != null);
-            assert (procedure != null);
-
-            ArrayList<ListenerEntry> list = queryListening.listeners.get(entry);
-            if (list == null) {
-                list = new ArrayList<>(1);
-                queryListening.listeners.put(entry, list);
-            }
-
-            ListenerEntry result = new ListenerEntry(entry, base, procedure);
-            // Equals is here based on base
-            int currentIndex = list.indexOf(result);
-            // There was already a listener
-            if(currentIndex > -1) {
-                ListenerEntry current = list.get(currentIndex);
-                if(!current.base.isDisposed())
-                    return;
-                list.set(currentIndex, result);
-            } else {
-                list.add(result);
-            }
-
-            if (Development.DEVELOPMENT) {
-                if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) {
-                    new Exception().printStackTrace();
-                    System.err.println("addListener -> " + list.size() + " " + entry + " " + base + " " + procedure);
-                }
-            }
-
-            queryListening.addedEntries.put(base, result);
-
-        }
-
-        
-    }
     
     QueryListening(QueryProcessor processor) {
         
         this.processor = processor;
         scheduler = new Scheduler(processor);
-        consumer = scheduler.newConsumer();
         scheduler.start();
         
     }
@@ -260,7 +57,7 @@ public class QueryListening {
     
     void sync() {
         try {
-            scheduler.flush();
+            scheduler.sync();
         } catch (Throwable t) {
             LOGGER.error("Error while waiting for query dependency management", t);
         }
@@ -276,7 +73,7 @@ public class QueryListening {
         if(parent != null) {
             try {
                 if(!child.isImmutable(graph))
-                    consumer.accept(new RegisterParentRunnable(parent, child));
+                    scheduler.accept(new RegisterParentRunnable(parent, child));
             } catch (DatabaseException e) {
                 LOGGER.error("Error while registering query dependencies", e);
             }
@@ -284,7 +81,7 @@ public class QueryListening {
 
         if(listener != null)
             if(!listener.isDisposed())
-                consumer.accept(new RegisterListenerRunnable(this, listener, procedure, parent, child));
+               scheduler.accept(new RegisterListenerRunnable(this, listener, procedure, parent, child));
 
     }
 
@@ -292,10 +89,7 @@ public class QueryListening {
         
         if(base == null) return;
 
-        consumer.accept(() -> {
-            ListenerEntry entry = addedEntries.get(base);
-            if(entry != null) entry.setLastKnown(result);
-        });
+        scheduler.accept(new RegisterFirstKnownRunnable(addedEntries, base, result));
 
     }
 
@@ -529,5 +323,102 @@ public class QueryListening {
         else 
             return Collections.emptyList();
     }
-    
+
+    private static class RegisterParentRunnable implements Runnable {
+
+        private final CacheEntry parent;
+        private final CacheEntry child;
+
+        public RegisterParentRunnable(CacheEntry parent, CacheEntry child) {
+            this.parent = parent;
+            this.child = child;
+        }
+
+        @Override
+        public void run() {
+            child.addParent(parent);
+            if (Development.DEVELOPMENT) {
+                if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_DEPENDENCIES, Bindings.BOOLEAN)) {
+                    System.out.println(child + " -> " + parent);
+                }
+            }
+        }
+
+    }
+
+    private static class RegisterListenerRunnable implements Runnable {
+
+        private final QueryListening queryListening;
+        private final ListenerBase base;
+        private final Object procedure;
+        private final CacheEntry parent;
+        private final CacheEntry entry;
+
+        public RegisterListenerRunnable(QueryListening queryListening, ListenerBase base, Object procedure, CacheEntry parent, CacheEntry entry) {
+            this.queryListening = queryListening;
+            this.base = base;
+            this.procedure = procedure;
+            this.parent = parent;
+            this.entry = entry;
+        }
+
+        @Override
+        public void run() {
+
+            assert (entry != null);
+            assert (procedure != null);
+
+            ArrayList<ListenerEntry> list = queryListening.listeners.get(entry);
+            if (list == null) {
+                list = new ArrayList<>(1);
+                queryListening.listeners.put(entry, list);
+            }
+
+            ListenerEntry result = new ListenerEntry(entry, base, procedure);
+            // Equals is here based on base
+            int currentIndex = list.indexOf(result);
+            // There was already a listener
+            if(currentIndex > -1) {
+                ListenerEntry current = list.get(currentIndex);
+                if(!current.base.isDisposed())
+                    return;
+                list.set(currentIndex, result);
+            } else {
+                list.add(result);
+            }
+
+            if (Development.DEVELOPMENT) {
+                if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) {
+                    new Exception().printStackTrace();
+                    System.err.println("addListener -> " + list.size() + " " + entry + " " + base + " " + procedure);
+                }
+            }
+
+            queryListening.addedEntries.put(base, result);
+
+        }
+
+
+    }
+
+    private static class RegisterFirstKnownRunnable implements Runnable {
+
+        private final Map<ListenerBase,ListenerEntry> addedEntries;
+        private final ListenerBase base;
+        private final Object result;
+
+        public RegisterFirstKnownRunnable(Map<ListenerBase,ListenerEntry> addedEntries, ListenerBase base, Object result) {
+            this.addedEntries = addedEntries;
+            this.base = base;
+            this.result = result;
+        }
+
+        @Override
+        public void run() {
+            ListenerEntry entry = addedEntries.get(base);
+            if(entry != null) entry.setLastKnown(result);
+        }
+
+    }
+
 }
diff --git a/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/Scheduler.java b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/Scheduler.java
new file mode 100644 (file)
index 0000000..2c9c475
--- /dev/null
@@ -0,0 +1,203 @@
+package org.simantics.db.impl.query;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Semaphore;
+
+class Scheduler extends ThreadLocal<TaskList> {
+
+    private final QueryProcessor processor;
+
+    private static final int MAX_TASKLIST_SIZE = 1<<8;
+
+    /*
+     * New TaskList objects shall be assigned this size.
+     * This shall grow exponentially upon task list creation to max size and shall reset to 1 when a write synchronizes the tasks.
+     * The intention is to make sure that upon frequent syncs the tasks shall be scheduled
+     */
+    private int currentTaskListSize = 1;
+
+    /*
+     * Anyone wishing to execute any tasks needs to hold this monitor
+     */
+    private Object taskExecutionMonitor = new Object();
+
+    /*
+     * Anyone wishing to manipulate the list of dispatched tasks needs to hold this monitor
+     */
+    private Object taskDispatchMonitor = new Object();
+
+    /*
+     * When a thread-specific TaskList becomes full it is added into this list for execution.
+     * The executor thread takes TaskLists from this list.
+     * The sync operation can also execute remaining tasks.
+     */
+    private ArrayList<TaskList> dispatchedTaskLists;
+
+    /*
+     * This replicates the ThreadLocal map of TaskList objects to enable listing of tasks active queues
+     */
+    private final Map<Thread,TaskList> activeTaskLists = new HashMap<>();
+
+    /*
+     * This semaphore is incremented each time s task list gets dispatched. The execution thread waits for the semaphore.
+     */
+    private final Semaphore taskDispatchNotifications = new Semaphore(0);
+
+    Scheduler(QueryProcessor processor) {
+        this.processor = processor;
+    }
+
+    /*
+     * This gets called by multiple query threads
+     */
+    void accept(Runnable task) {
+        TaskList l = get();
+        l.add(task);
+        if(l.shouldSchedule()) {
+            remove();
+            sendToExecution();
+        }
+    }
+
+    /*
+     * A task list has reached full size and is moved into execution by the execution thread
+     */
+    void sendToExecution() {
+        synchronized(activeTaskLists) {
+            // This is dispatched so it should not anymore be in list of active TaskList objects.
+            TaskList l = activeTaskLists.remove(Thread.currentThread());
+            synchronized(taskDispatchMonitor) {
+                if(dispatchedTaskLists == null)
+                    dispatchedTaskLists = new ArrayList<TaskList>();
+                dispatchedTaskLists.add(l);
+            }
+            if(currentTaskListSize < MAX_TASKLIST_SIZE)
+                currentTaskListSize = currentTaskListSize<<1;
+        }
+        // Release the execution thread
+        taskDispatchNotifications.release();
+    }
+
+    /*
+     * Gets dispatched TaskList objects for execution.
+     * Returns null when there's nothing to execute (and thread shall go to sleep).
+     */
+    ArrayList<TaskList> getDispatchedQueues() {
+        synchronized(taskDispatchMonitor) {
+            if(dispatchedTaskLists == null)
+                return null;
+            ArrayList<TaskList> result = dispatchedTaskLists;
+            dispatchedTaskLists = null;
+            return result;
+        }
+    }
+
+    void start() {
+
+        Thread thread = new Thread() {
+
+            public void run() {
+
+                // We execute tasks as long as the query processor is alive
+                while(!processor.isDisposed()) {
+
+                    try {
+
+                        // This is a mutex between this place and sync()
+                        synchronized(taskExecutionMonitor) {
+
+                            // Execute busily while there is something to execute
+                            while(true) {
+
+                                ArrayList<TaskList> qs = getDispatchedQueues();
+                                if(qs == null) {
+                                    // No tasks - go to sleep
+                                    break;
+                                }
+
+                                // Execute everything we got
+                                for(TaskList queue : qs) {
+                                    for(Runnable r : queue) {
+                                        r.run();
+                                    }
+                                }
+
+                            }
+
+                        }
+
+                        // sendToExecution shall release this 
+                        taskDispatchNotifications.acquire();
+
+                    } catch (InterruptedException e) {
+                        QueryListening.LOGGER.error("Unexpected interrupt", e);
+                    }
+
+                }
+
+            };
+
+        };
+
+        thread.setName("QueryListening");
+        thread.start();
+
+    }
+
+    /*
+     * ThreadLocal calls this to create a TaskList.
+     * Called with first list per thread and also when accepting next task after sendToExecution. 
+     */
+    @Override
+    protected TaskList initialValue() {
+        TaskList result = new TaskList(currentTaskListSize);
+        synchronized(activeTaskLists) {
+            activeTaskLists.put(Thread.currentThread(), result);
+        }
+        return result;
+    }
+
+    /*
+     * This makes sure that all tasks have been executed upon return.
+     * It takes the taskExecutionMonitor from the thread and executes possible dispatched work and
+     * also executes anything in active task lists. It resets the dispatch sizes of current and
+     * subsequently created task lists to 1.
+     */
+    void sync() {
+
+        // This is a mutex between this place and execution thread
+        synchronized(taskExecutionMonitor) {
+
+            currentTaskListSize = 1;
+
+            // Perform all dispatched work
+            ArrayList<TaskList> qs = getDispatchedQueues();
+            if(qs != null) {
+                for(TaskList queue : qs) {
+                    for(Runnable r : queue) {
+                        r.run();
+                    }
+                }
+            }
+
+            // Also perform any tasks in active queues.
+            // This assumes that all query threads that send new tasks are now parked and shall not interfere.
+            synchronized(activeTaskLists) {
+                for(TaskList queue : activeTaskLists.values()) {
+                    // Run all tasks
+                    for(Runnable r : queue) {
+                        if(r != null)
+                            r.run();
+                    }
+                    // Remove the tasks from list and reset the max size to 1.
+                    queue.reset();
+                }
+            }
+
+        }
+
+    }
+
+}
\ No newline at end of file
diff --git a/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/TaskList.java b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/TaskList.java
new file mode 100644 (file)
index 0000000..7734bae
--- /dev/null
@@ -0,0 +1,24 @@
+package org.simantics.db.impl.query;
+
+import java.util.ArrayList;
+
+class TaskList extends ArrayList<Runnable> {
+
+       private static final long serialVersionUID = -8047504882681478972L;
+
+       private int maxSize;
+
+       TaskList(int maxSize) {
+               this.maxSize = maxSize;
+       }
+
+       boolean shouldSchedule() {
+               return size() == maxSize;
+       }
+
+       void reset() {
+               clear();
+               maxSize = 1;
+       }
+
+}
\ No newline at end of file