final THashMap<CacheEntry, ArrayList<ListenerEntry>> listeners = new THashMap<CacheEntry, ArrayList<ListenerEntry>>(10, 0.75f);
- private static class ThreadQueue extends ThreadLocal<ArrayList<Runnable>> {
+ private static class ThreadQueue extends ThreadLocal<List<Runnable>> {
- private final Map<Thread,ArrayList<Runnable>> allQueues = new HashMap<>();
+ private final Map<Thread,List<Runnable>> allQueues = new HashMap<>();
- private ArrayList<ArrayList<Runnable>> dispatchedQueues = new ArrayList<>();
+ private ArrayList<List<Runnable>> dispatchedQueues = new ArrayList<>();
@Override
- protected synchronized ArrayList<Runnable> initialValue() {
- ArrayList<Runnable> result = new ArrayList<>();
+ protected synchronized List<Runnable> initialValue() {
+ List<Runnable> result = Collections.synchronizedList(new ArrayList<>());
allQueues.put(Thread.currentThread(), result);
return result;
}
synchronized void sendToExecution() {
- ArrayList<Runnable> rs = allQueues.remove(Thread.currentThread());
+ List<Runnable> rs = allQueues.remove(Thread.currentThread());
dispatchedQueues.add(rs);
notify();
}
- synchronized ArrayList<ArrayList<Runnable>> getDispatchedQueues() {
- ArrayList<ArrayList<Runnable>> result = dispatchedQueues;
+ synchronized ArrayList<List<Runnable>> getDispatchedQueues() {
+ ArrayList<List<Runnable>> result = dispatchedQueues;
dispatchedQueues = new ArrayList<>();
return result;
}
Consumer<Runnable> newConsumer() {
return (task) -> {
- ArrayList<Runnable> l = queues.get();
+ List<Runnable> l = queues.get();
l.add(task);
if(l.size() == BUFFER_SIZE) {
queues.remove();
synchronized(queues) {
while(!isTerminated()) {
try {
- ArrayList<ArrayList<Runnable>> qs = queues.getDispatchedQueues();
- for(ArrayList<Runnable> queue : qs) {
- for(Runnable r : queue)
- r.run();
+ ArrayList<List<Runnable>> qs = queues.getDispatchedQueues();
+ for(List<Runnable> queue : qs) {
+ queue.forEach(r -> r.run());
}
if(flush != null) {
- for(ArrayList<Runnable> queue : queues.allQueues.values()) {
- for(Runnable r : queue) {
- r.run();
- }
- queue.clear();
+ for(List<Runnable> queue : queues.allQueues.values()) {
+ // FIXME : using synchronized list is not enough, the list may change between forEach and clear!
+ queue.forEach(r -> r.run());
+ queue.clear();
}
Semaphore s = flush;
flush = null;