- private static class ThreadQueue extends ThreadLocal<ArrayList<Runnable>> {
-
- private final Map<Thread,ArrayList<Runnable>> allQueues = new HashMap<>();
-
- private ArrayList<ArrayList<Runnable>> dispatchedQueues = new ArrayList<>();
-
- @Override
- protected synchronized ArrayList<Runnable> initialValue() {
- ArrayList<Runnable> result = new ArrayList<>();
- allQueues.put(Thread.currentThread(), result);
- return result;
- }
-
- synchronized void sendToExecution() {
- ArrayList<Runnable> rs = allQueues.remove(Thread.currentThread());
- dispatchedQueues.add(rs);
- notify();
- }
-
- synchronized ArrayList<ArrayList<Runnable>> getDispatchedQueues() {
- ArrayList<ArrayList<Runnable>> result = dispatchedQueues;
- dispatchedQueues = new ArrayList<>();
- return result;
- }
-
- }
-
- private static class Scheduler {
-
- private static final int BUFFER_SIZE = 100;
-
- private final QueryProcessor processor;
-
- private final ThreadQueue queues = new ThreadQueue();
-
- /*
- * Access to this field is synchronized using the monitor of 'queues'
- * The method flush() shall be called by the single writing thread and the field has been nulled before the
- * method can exit. Thus it is not possible for multiple instances of flush Semaphore to exist at the same time.
- */
- private Semaphore flush = null;
-
- Scheduler(QueryProcessor processor) {
- this.processor = processor;
- }
-
- private boolean isTerminated() {
- return processor.isDisposed();
- }
-
- Consumer<Runnable> newConsumer() {
- return (task) -> {
-
- ArrayList<Runnable> l = queues.get();
- l.add(task);
- if(l.size() == BUFFER_SIZE) {
- queues.remove();
- queues.sendToExecution();
- }
-
- };
- }
-
- void start() {
-
- Thread thread = new Thread() {
-
- public void run() {
-
- synchronized(queues) {
- while(!isTerminated()) {
- try {
- ArrayList<ArrayList<Runnable>> qs = queues.getDispatchedQueues();
- for(ArrayList<Runnable> queue : qs) {
- for(Runnable r : queue)
- r.run();
- }
- if(flush != null) {
- for(ArrayList<Runnable> queue : queues.allQueues.values()) {
- for(Runnable r : queue) {
- r.run();
- }
- queue.clear();
- }
- Semaphore s = flush;
- flush = null;
- s.release();
- }
- queues.wait(1000);
- } catch (InterruptedException e) {
- LOGGER.error("Unexpected interrupt", e);
- }
- }
- }
-
- };
-
- };
-
- thread.setName("QueryListening");
- thread.start();
-
- }
-
- private Semaphore createFlush() {
- synchronized(queues) {
- flush = new Semaphore(0);
- queues.notify();
- return flush;
- }
- }
-
- void flush() {
- try {
- createFlush().acquire();
- } catch (InterruptedException e) {
- LOGGER.error("Unexpected interrupt", e);
- }
- }
-
- }
-
- private static class RegisterParentRunnable implements Runnable {
-
- private final CacheEntry parent;
- private final CacheEntry child;
-
- public RegisterParentRunnable(CacheEntry parent, CacheEntry child) {
- this.parent = parent;
- this.child = child;
- }
-
- @Override
- public void run() {
- child.addParent(parent);
- if (Development.DEVELOPMENT) {
- if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_DEPENDENCIES, Bindings.BOOLEAN)) {
- System.out.println(child + " -> " + parent);
- }
- }
- }
-
- }
-
- private static class RegisterListenerRunnable implements Runnable {
-
- private final QueryListening queryListening;
- private final ListenerBase base;
- private final Object procedure;
- private final CacheEntry parent;
- private final CacheEntry entry;
-
- public RegisterListenerRunnable(QueryListening queryListening, ListenerBase base, Object procedure, CacheEntry parent, CacheEntry entry) {
- this.queryListening = queryListening;
- this.base = base;
- this.procedure = procedure;
- this.parent = parent;
- this.entry = entry;
- }
-
- @Override
- public void run() {
-
- assert (entry != null);
- assert (procedure != null);
-
- ArrayList<ListenerEntry> list = queryListening.listeners.get(entry);
- if (list == null) {
- list = new ArrayList<>(1);
- queryListening.listeners.put(entry, list);
- }
-
- ListenerEntry result = new ListenerEntry(entry, base, procedure);
- // Equals is here based on base
- int currentIndex = list.indexOf(result);
- // There was already a listener
- if(currentIndex > -1) {
- ListenerEntry current = list.get(currentIndex);
- if(!current.base.isDisposed())
- return;
- list.set(currentIndex, result);
- } else {
- list.add(result);
- }
-
- if (Development.DEVELOPMENT) {
- if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) {
- new Exception().printStackTrace();
- System.err.println("addListener -> " + list.size() + " " + entry + " " + base + " " + procedure);
- }
- }
-
- queryListening.addedEntries.put(base, result);
-
- }
-
-
- }