From: Marko Luukkainen Date: Tue, 24 Nov 2020 16:47:09 +0000 (+0200) Subject: Temporary solution to concurrent modification exception. X-Git-Url: https://gerrit.simantics.org/r/gitweb?a=commitdiff_plain;h=e296c0b7f165d0a0a11900124eaba36b04b167c0;p=simantics%2Fplatform.git Temporary solution to concurrent modification exception. gitlab #645 Change-Id: Ia67129d5dad5ad7fe571157bbf9be99513832e72 --- diff --git a/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryListening.java b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryListening.java index a26460a47..075281035 100644 --- a/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryListening.java +++ b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryListening.java @@ -46,27 +46,27 @@ public class QueryListening { final THashMap> listeners = new THashMap>(10, 0.75f); - private static class ThreadQueue extends ThreadLocal> { + private static class ThreadQueue extends ThreadLocal> { - private final Map> allQueues = new HashMap<>(); + private final Map> allQueues = new HashMap<>(); - private ArrayList> dispatchedQueues = new ArrayList<>(); + private ArrayList> dispatchedQueues = new ArrayList<>(); @Override - protected synchronized ArrayList initialValue() { - ArrayList result = new ArrayList<>(); + protected synchronized List initialValue() { + List result = Collections.synchronizedList(new ArrayList<>()); allQueues.put(Thread.currentThread(), result); return result; } synchronized void sendToExecution() { - ArrayList rs = allQueues.remove(Thread.currentThread()); + List rs = allQueues.remove(Thread.currentThread()); dispatchedQueues.add(rs); notify(); } - synchronized ArrayList> getDispatchedQueues() { - ArrayList> result = dispatchedQueues; + synchronized ArrayList> getDispatchedQueues() { + ArrayList> result = dispatchedQueues; dispatchedQueues = new ArrayList<>(); return result; } @@ -99,7 +99,7 @@ public class QueryListening { Consumer newConsumer() { return (task) -> { - ArrayList l = queues.get(); + List l = queues.get(); l.add(task); if(l.size() == BUFFER_SIZE) { queues.remove(); @@ -118,17 +118,15 @@ public class QueryListening { synchronized(queues) { while(!isTerminated()) { try { - ArrayList> qs = queues.getDispatchedQueues(); - for(ArrayList queue : qs) { - for(Runnable r : queue) - r.run(); + ArrayList> qs = queues.getDispatchedQueues(); + for(List queue : qs) { + queue.forEach(r -> r.run()); } if(flush != null) { - for(ArrayList queue : queues.allQueues.values()) { - for(Runnable r : queue) { - r.run(); - } - queue.clear(); + for(List queue : queues.allQueues.values()) { + // FIXME : using synchronized list is not enough, the list may change between forEach and clear! + queue.forEach(r -> r.run()); + queue.clear(); } Semaphore s = flush; flush = null;