From: Marko Luukkainen Date: Thu, 26 Nov 2020 10:31:52 +0000 (+0000) Subject: Merge "Temporary solution to concurrent modification exception." into release/1.43.1 X-Git-Url: https://gerrit.simantics.org/r/gitweb?a=commitdiff_plain;h=f154f15aa05eec3b92e477aedd46b9cba09f6c92;hp=75eeab9e5966da1373b5f5ca151f76662796bf11;p=simantics%2Fplatform.git Merge "Temporary solution to concurrent modification exception." into release/1.43.1 --- diff --git a/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryListening.java b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryListening.java index a26460a47..075281035 100644 --- a/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryListening.java +++ b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryListening.java @@ -46,27 +46,27 @@ public class QueryListening { final THashMap> listeners = new THashMap>(10, 0.75f); - private static class ThreadQueue extends ThreadLocal> { + private static class ThreadQueue extends ThreadLocal> { - private final Map> allQueues = new HashMap<>(); + private final Map> allQueues = new HashMap<>(); - private ArrayList> dispatchedQueues = new ArrayList<>(); + private ArrayList> dispatchedQueues = new ArrayList<>(); @Override - protected synchronized ArrayList initialValue() { - ArrayList result = new ArrayList<>(); + protected synchronized List initialValue() { + List result = Collections.synchronizedList(new ArrayList<>()); allQueues.put(Thread.currentThread(), result); return result; } synchronized void sendToExecution() { - ArrayList rs = allQueues.remove(Thread.currentThread()); + List rs = allQueues.remove(Thread.currentThread()); dispatchedQueues.add(rs); notify(); } - synchronized ArrayList> getDispatchedQueues() { - ArrayList> result = dispatchedQueues; + synchronized ArrayList> getDispatchedQueues() { + ArrayList> result = dispatchedQueues; dispatchedQueues = new ArrayList<>(); return result; } @@ -99,7 +99,7 @@ public class QueryListening { Consumer newConsumer() { return (task) -> { - ArrayList l = queues.get(); + List l = queues.get(); l.add(task); if(l.size() == BUFFER_SIZE) { queues.remove(); @@ -118,17 +118,15 @@ public class QueryListening { synchronized(queues) { while(!isTerminated()) { try { - ArrayList> qs = queues.getDispatchedQueues(); - for(ArrayList queue : qs) { - for(Runnable r : queue) - r.run(); + ArrayList> qs = queues.getDispatchedQueues(); + for(List queue : qs) { + queue.forEach(r -> r.run()); } if(flush != null) { - for(ArrayList queue : queues.allQueues.values()) { - for(Runnable r : queue) { - r.run(); - } - queue.clear(); + for(List queue : queues.allQueues.values()) { + // FIXME : using synchronized list is not enough, the list may change between forEach and clear! + queue.forEach(r -> r.run()); + queue.clear(); } Semaphore s = flush; flush = null;