]> gerrit.simantics Code Review - simantics/platform.git/blobdiff - bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryListening.java
Fix livelock situation in QueryProcessor
[simantics/platform.git] / bundles / org.simantics.db.impl / src / org / simantics / db / impl / query / QueryListening.java
index e524a080a99a2940bbc5da9a5564202d115bf931..9daab950e3964a8681a12fabc5ef0b2cfaf74ceb 100644 (file)
@@ -14,8 +14,8 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.Semaphore;
 
 import org.simantics.databoard.Bindings;
@@ -41,7 +41,7 @@ public class QueryListening {
        private THashSet<ListenerEntry>                         scheduledListeners    = new THashSet<ListenerEntry>();
        private boolean                                         firingListeners       = false;
        final THashMap<CacheEntry, ArrayList<ListenerEntry>>    listeners = new THashMap<CacheEntry, ArrayList<ListenerEntry>>(10, 0.75f);
-       private BlockingQueue<Runnable>                         tasks = new LinkedBlockingQueue<Runnable>();
+       private BlockingQueue<Runnable>                         tasks = new ArrayBlockingQueue<Runnable>(2048);
        private Map<ListenerBase,ListenerEntry>                 addedEntries = new HashMap<>();
        
        QueryListening(QueryProcessor processor) {
@@ -51,10 +51,10 @@ public class QueryListening {
        
        public void sync() {
                Semaphore s = new Semaphore(0);
-               tasks.add(() -> {
-                       s.release();
-               });
                try {
+                       tasks.put(() -> {
+                               s.release();
+                       });
                        s.acquire();
                } catch (Throwable t) {
                        LOGGER.error("Error while waiting for query dependency management", t);
@@ -93,26 +93,30 @@ public class QueryListening {
 
        void registerDependencies(ReadGraphImpl graph, CacheEntry child, CacheEntry parent, ListenerBase listener, Object procedure, boolean inferred) {
                
-               tasks.offer(() -> {
+               try {
+                       tasks.put(() -> {
 
-                       if (parent != null && !inferred) {
-                               try {
-                                       if(!child.isImmutable(graph))
-                                               child.addParent(parent);
-                               } catch (DatabaseException e) {
-                                       LOGGER.error("Error while registering query dependencies", e);
-                               }
-                               if (Development.DEVELOPMENT) {
-                                       if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_DEPENDENCIES, Bindings.BOOLEAN)) {
-                                               System.out.println(child + " -> " + parent);
+                               if (parent != null && !inferred) {
+                                       try {
+                                               if(!child.isImmutable(graph))
+                                                       child.addParent(parent);
+                                       } catch (DatabaseException e) {
+                                               LOGGER.error("Error while registering query dependencies", e);
+                                       }
+                                       if (Development.DEVELOPMENT) {
+                                               if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_DEPENDENCIES, Bindings.BOOLEAN)) {
+                                                       System.out.println(child + " -> " + parent);
+                                               }
                                        }
                                }
-                       }
 
-                       if (listener != null)
-                               registerListener(child, listener, procedure);
+                               if (listener != null)
+                                       registerListener(child, listener, procedure);
 
-               });
+                       });
+               } catch (InterruptedException e) {
+                       LOGGER.error("Error while registering dependencies", e);
+               }
                
        }