]> gerrit.simantics Code Review - simantics/platform.git/commitdiff
Temporary solution to concurrent modification exception. 19/4619/1
authorMarko Luukkainen <marko.luukkainen@semantum.fi>
Tue, 24 Nov 2020 16:47:09 +0000 (18:47 +0200)
committerMarko Luukkainen <marko.luukkainen@semantum.fi>
Tue, 24 Nov 2020 16:47:09 +0000 (18:47 +0200)
gitlab #645

Change-Id: Ia67129d5dad5ad7fe571157bbf9be99513832e72

bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryListening.java

index a26460a47d733538aa0e3ec287541be76eee6c4e..075281035a7802842a2b6ab285700b2a22f29d6b 100644 (file)
@@ -46,27 +46,27 @@ public class QueryListening {
 
     final THashMap<CacheEntry, ArrayList<ListenerEntry>>    listeners = new THashMap<CacheEntry, ArrayList<ListenerEntry>>(10, 0.75f);
 
-    private static class ThreadQueue extends ThreadLocal<ArrayList<Runnable>> {
+    private static class ThreadQueue extends ThreadLocal<List<Runnable>> {
 
-        private final Map<Thread,ArrayList<Runnable>> allQueues = new HashMap<>();
+        private final Map<Thread,List<Runnable>> allQueues = new HashMap<>();
         
-        private ArrayList<ArrayList<Runnable>> dispatchedQueues = new ArrayList<>();
+        private ArrayList<List<Runnable>> dispatchedQueues = new ArrayList<>();
 
         @Override
-        protected synchronized ArrayList<Runnable> initialValue() {
-            ArrayList<Runnable> result = new ArrayList<>();
+        protected synchronized List<Runnable> initialValue() {
+            List<Runnable> result = Collections.synchronizedList(new ArrayList<>());
             allQueues.put(Thread.currentThread(), result);
             return result;
         }
 
         synchronized void sendToExecution() {
-            ArrayList<Runnable> rs = allQueues.remove(Thread.currentThread());
+            List<Runnable> rs = allQueues.remove(Thread.currentThread());
             dispatchedQueues.add(rs);
             notify();
         }
 
-        synchronized ArrayList<ArrayList<Runnable>> getDispatchedQueues() {
-            ArrayList<ArrayList<Runnable>> result = dispatchedQueues;
+        synchronized ArrayList<List<Runnable>> getDispatchedQueues() {
+            ArrayList<List<Runnable>> result = dispatchedQueues;
             dispatchedQueues = new ArrayList<>();
             return result;
         }
@@ -99,7 +99,7 @@ public class QueryListening {
         Consumer<Runnable> newConsumer() {
             return (task) -> {
 
-                ArrayList<Runnable> l = queues.get();
+                List<Runnable> l = queues.get();
                 l.add(task);
                 if(l.size() == BUFFER_SIZE) {
                     queues.remove();
@@ -118,17 +118,15 @@ public class QueryListening {
                     synchronized(queues) {
                         while(!isTerminated()) {
                             try {
-                                ArrayList<ArrayList<Runnable>> qs = queues.getDispatchedQueues();
-                                for(ArrayList<Runnable> queue : qs) {
-                                    for(Runnable r : queue)
-                                        r.run();
+                                ArrayList<List<Runnable>> qs = queues.getDispatchedQueues();
+                                for(List<Runnable> queue : qs) {
+                                       queue.forEach(r -> r.run());
                                 }
                                 if(flush != null) {
-                                    for(ArrayList<Runnable> queue : queues.allQueues.values()) {
-                                        for(Runnable r : queue) {
-                                            r.run();   
-                                        }
-                                        queue.clear();
+                                    for(List<Runnable> queue : queues.allQueues.values()) {
+                                       // FIXME : using synchronized list is not enough, the list may change between forEach and clear!
+                                       queue.forEach(r -> r.run());
+                                       queue.clear();
                                     }
                                     Semaphore s = flush;
                                     flush = null;