]> gerrit.simantics Code Review - simantics/platform.git/commitdiff
Fix livelock situation in QueryProcessor 74/3274/5
authorAntti Villberg <antti.villberg@semantum.fi>
Tue, 24 Sep 2019 08:00:28 +0000 (11:00 +0300)
committerTuukka Lehtonen <tuukka.lehtonen@semantum.fi>
Wed, 16 Oct 2019 23:05:55 +0000 (02:05 +0300)
gitlab #386

Change-Id: I91bb154e6410b125eb13171f9152996286a6d54d

bundles/org.simantics.browsing.ui.common/src/org/simantics/browsing/ui/common/internal/GECache.java
bundles/org.simantics.db.impl/src/org/simantics/db/impl/graph/ReadGraphImpl.java
bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/AsyncReadEntry.java
bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryCache.java
bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryListening.java
bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryProcessor.java
bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryThread.java
bundles/org.simantics.db.layer0/src/org/simantics/db/layer0/adapter/impl/EntityInstances.java
bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/SessionRequestManager.java

index b5545df0065264423b0bf9d240679a94063d05aa..014a5c0d54b37c0fbd06810137ca04b83a9f88f9 100644 (file)
@@ -174,13 +174,13 @@ public class GECache implements IGECache {
     private TObjectIntHashMap<NodeContext> references = new TObjectIntHashMap<NodeContext>();
     
     @Override
-    public void incRef(NodeContext context) {
+    synchronized public void incRef(NodeContext context) {
        int exist = references.get(context);
        references.put(context, exist+1);
     }
     
     @Override
-    public void decRef(NodeContext context) {
+    synchronized public void decRef(NodeContext context) {
        int exist = references.get(context);
        references.put(context, exist-1);
        if(exist == 1) {
index cffbba51c75f852e44b215747e94efafa099e296..3e955febc54743e25134184a2d7c3d9d85b9135c 100644 (file)
@@ -5152,7 +5152,7 @@ public class ReadGraphImpl implements AsyncReadGraph {
                assert (request != null);
                assert (procedure != null);
        
-               processor.schedule(new SessionTask(this) {
+               processor.scheduleNow(new SessionTask(this) {
 
                        @Override
                        public void run0(int thread) {
@@ -5232,7 +5232,7 @@ public class ReadGraphImpl implements AsyncReadGraph {
                assert (request != null);
                assert (procedure != null);
 
-               processor.schedule(new SessionTask(this) {
+               processor.scheduleNow(new SessionTask(this) {
 
                        @Override
                        public void run0(int thread) {
@@ -6312,7 +6312,7 @@ public class ReadGraphImpl implements AsyncReadGraph {
     public boolean performPending() {
         return processor.performPending(this);
     }
-
+    
     public Set<ReadGraphImpl> ancestorSet() {
         HashSet<ReadGraphImpl> result = new HashSet<>();
         ReadGraphImpl g = this;
@@ -6322,5 +6322,23 @@ public class ReadGraphImpl implements AsyncReadGraph {
         }
         return result;
     }
+    
+    public int getLevel() {
+        return getLevelStatic(this);
+    }
+    
+    private static int getLevelStatic(ReadGraphImpl impl) {
+        if(impl == null) return 0;
+        else return 1 + getLevelStatic(impl.parentGraph);
+    }
+    
+    public ReadGraphImpl getTopLevelGraph() {
+        return getTopLevelGraphStatic(this);
+    }
+
+    private static ReadGraphImpl getTopLevelGraphStatic(ReadGraphImpl impl) {
+        if(impl.parentGraph == null) return impl;
+        else return getTopLevelGraphStatic(impl.parentGraph);
+    }
 
 }
index 72582ee605802bfe32fac38d10119f6200602586..e13ecab72cd0819f71c3c337af62ad242db709c2 100644 (file)
@@ -169,7 +169,14 @@ final public class AsyncReadEntry<T> extends CacheEntryBase<AsyncProcedure<T>> i
             DatabaseException exception;
             
             public AsyncTask(ReadGraphImpl graph) {
+                this(graph, 1);
+            }
+
+            public AsyncTask(ReadGraphImpl graph, int pos) {
                 super(graph);
+                this.position = pos;
+                if(this.position < 1024)
+                    this.position *= 2;
             }
 
             @Override
@@ -213,10 +220,15 @@ final public class AsyncReadEntry<T> extends CacheEntryBase<AsyncProcedure<T>> i
                         }
                         throw new IllegalStateException("Eternal loop in queries.");
                     }
-                    graph.processor.schedule(new AsyncTask(graph));
+                    graph.processor.scheduleLater(new AsyncTask(graph, position));
                 }
             }
             
+            @Override
+            public boolean maybeReady() {
+               return proc.isDone();
+            }
+            
         }
 
         try {
@@ -230,7 +242,7 @@ final public class AsyncReadEntry<T> extends CacheEntryBase<AsyncProcedure<T>> i
         if(needsToBlock) task.run(0);
         else if (proc.isDone()) task.run(0);
         else  {
-            graph.processor.schedule(task);
+            graph.processor.scheduleLater(task);
             return null;
         }
 
index 5c18d051089ef6026ce58048947bd82ae3e4f9b2..cc0ca919b584942a4d49a1a4c61b97b741e631f2 100644 (file)
@@ -670,7 +670,7 @@ public class QueryCache extends QueryCacheBase {
         }
         ReadEntry entry = (ReadEntry)cache.getOrCreateReadEntry(graph, r, needsToBlock);
         if(entry == null) {
-          graph.processor.schedule(new SessionTask(graph) {
+          graph.processor.scheduleNow(new SessionTask(graph) {
             @Override
             public void run0(int thread) {
               try {
@@ -749,7 +749,7 @@ public class QueryCache extends QueryCacheBase {
         }
         AsyncReadEntry entry = (AsyncReadEntry)cache.getOrCreateAsyncReadEntry(graph, r, needsToBlock);
         if(entry == null) {
-          graph.processor.schedule(new SessionTask(graph) {
+          graph.processor.scheduleNow(new SessionTask(graph) {
             @Override
             public void run0(int thread) {
               try {
index e524a080a99a2940bbc5da9a5564202d115bf931..9daab950e3964a8681a12fabc5ef0b2cfaf74ceb 100644 (file)
@@ -14,8 +14,8 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.Semaphore;
 
 import org.simantics.databoard.Bindings;
@@ -41,7 +41,7 @@ public class QueryListening {
        private THashSet<ListenerEntry>                         scheduledListeners    = new THashSet<ListenerEntry>();
        private boolean                                         firingListeners       = false;
        final THashMap<CacheEntry, ArrayList<ListenerEntry>>    listeners = new THashMap<CacheEntry, ArrayList<ListenerEntry>>(10, 0.75f);
-       private BlockingQueue<Runnable>                         tasks = new LinkedBlockingQueue<Runnable>();
+       private BlockingQueue<Runnable>                         tasks = new ArrayBlockingQueue<Runnable>(2048);
        private Map<ListenerBase,ListenerEntry>                 addedEntries = new HashMap<>();
        
        QueryListening(QueryProcessor processor) {
@@ -51,10 +51,10 @@ public class QueryListening {
        
        public void sync() {
                Semaphore s = new Semaphore(0);
-               tasks.add(() -> {
-                       s.release();
-               });
                try {
+                       tasks.put(() -> {
+                               s.release();
+                       });
                        s.acquire();
                } catch (Throwable t) {
                        LOGGER.error("Error while waiting for query dependency management", t);
@@ -93,26 +93,30 @@ public class QueryListening {
 
        void registerDependencies(ReadGraphImpl graph, CacheEntry child, CacheEntry parent, ListenerBase listener, Object procedure, boolean inferred) {
                
-               tasks.offer(() -> {
+               try {
+                       tasks.put(() -> {
 
-                       if (parent != null && !inferred) {
-                               try {
-                                       if(!child.isImmutable(graph))
-                                               child.addParent(parent);
-                               } catch (DatabaseException e) {
-                                       LOGGER.error("Error while registering query dependencies", e);
-                               }
-                               if (Development.DEVELOPMENT) {
-                                       if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_DEPENDENCIES, Bindings.BOOLEAN)) {
-                                               System.out.println(child + " -> " + parent);
+                               if (parent != null && !inferred) {
+                                       try {
+                                               if(!child.isImmutable(graph))
+                                                       child.addParent(parent);
+                                       } catch (DatabaseException e) {
+                                               LOGGER.error("Error while registering query dependencies", e);
+                                       }
+                                       if (Development.DEVELOPMENT) {
+                                               if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_DEPENDENCIES, Bindings.BOOLEAN)) {
+                                                       System.out.println(child + " -> " + parent);
+                                               }
                                        }
                                }
-                       }
 
-                       if (listener != null)
-                               registerListener(child, listener, procedure);
+                               if (listener != null)
+                                       registerListener(child, listener, procedure);
 
-               });
+                       });
+               } catch (InterruptedException e) {
+                       LOGGER.error("Error while registering dependencies", e);
+               }
                
        }
 
index 7bfa0f4c6195659c21efdb0e6bcf99fed0dfb7bb..ee4b11175e4d2767cee0058d3938d8cd47125997 100644 (file)
@@ -136,11 +136,11 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap
        final public Session                                    session;
        final public ResourceSupport                            resourceSupport;
        
+       final public Semaphore                                  requests = new Semaphore(1);
+       
        final public QueryListening                            listening = new QueryListening(this);
 
        QueryThread[]                                   executors;
-
-//     public ArrayList<SessionTask>[]                           queues;
        
        public LinkedList<SessionTask>                           freeScheduling = new LinkedList<SessionTask>();
 
@@ -151,14 +151,6 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap
        }
 
        public ThreadState[]                                                                    threadStates;
-//     public ReentrantLock[]                                                                  threadLocks;
-//     public Condition[]                                                                          threadConditions;
-
-       //public ArrayList<SessionTask>[]                           ownTasks;
-
-       //public ArrayList<SessionTask>[]                           ownSyncTasks;
-
-       //ArrayList<SessionTask>[]                           delayQueues;
        
        final Object querySupportLock;
        
@@ -167,28 +159,13 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap
        public void close() {
        }
 
-       public SessionTask getOwnTask(ReadGraphImpl impl) {
-               Set<ReadGraphImpl> ancestors = impl.ancestorSet();
-               synchronized(querySupportLock) {
-                       int index = 0;
-                       while(index < freeScheduling.size()) {
-                               SessionTask task = freeScheduling.get(index);
-                               if(task.hasCommonParent(ancestors)) {
-                                       return freeScheduling.remove(index);
-                               }
-                               index++;
-                       }
-               }
-               return null;
-       }
-
     public SessionTask getSubTask(ReadGraphImpl impl) {
-        Set<ReadGraphImpl> onlyThis = Collections.singleton(impl);
         synchronized(querySupportLock) {
             int index = 0;
             while(index < freeScheduling.size()) {
                 SessionTask task = freeScheduling.get(index);
-                if(task.hasCommonParent(onlyThis)) {
+                if(task.hasCommonParent(task.graph, impl) && task.maybeReady()) {
+                    queueLength.decrementAndGet();
                     return freeScheduling.remove(index);
                 }
                 index++;
@@ -197,68 +174,84 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap
         return null;
     }
 
-       public boolean performPending(ReadGraphImpl graph) {
-               SessionTask task = getOwnTask(graph);
+    /*
+     * We are running errands while waiting for requests to complete.
+     * We can only run work that is part of the current root request to avoid any deadlocks
+     */
+    public boolean performPending(ReadGraphImpl graph) {
+        SessionTask task = getSubTask(graph);
                if(task != null) {
                        task.run(QueryProcessor.thread.get());
                        return true;
-               } else {
-                       return false;
                }
+               return false;
        }
 
-//     final public void scheduleOwn(int caller, SessionTask request) {
-//             ownTasks[caller].add(request);
-//     }
-
-       final public void schedule(SessionTask request) {
-           
-               //int performer = request.thread;
-
-//             if(DebugPolicy.SCHEDULE)
-//                     System.out.println("schedule " + request + " " + " -> " + performer);
+    final public void scheduleNow(SessionTask request) {
+        schedule(request, false);
+    }
 
-               //assert(performer >= 0);
+    final public void scheduleLater(SessionTask request) {
+        schedule(request, true);
+    }
 
+    AtomicInteger queueLength = new AtomicInteger(0);
+    
+    final public void schedule(SessionTask request, boolean late) {
+        
+        int queueLengthEstimate = queueLength.get();
+        if(!late && queueLengthEstimate > 80) {
+            request.run(thread.get());
+            return;
+        }
+        
                assert(request != null);
+               
+               synchronized(querySupportLock) {
 
-//             if(caller == performer) {
-//                     request.run(caller);
-//             } else {
-                       
-//                     if(performer == THREADS) {
-                               
-                               synchronized(querySupportLock) {
-
-                                       if(BarrierTracing.BOOKKEEPING) {
-                                               Exception current = new Exception();
-                                               Exception previous = BarrierTracing.tasks.put(request, current);
-                                               if(previous != null) {
-                                                       previous.printStackTrace();
-                                                       current.printStackTrace();
-                                               }
-                                       }
-                                   
-                                       freeScheduling.add(request);
-                                       
-                                       querySupportLock.notifyAll();
+                   if(BarrierTracing.BOOKKEEPING) {
+                       Exception current = new Exception();
+                       Exception previous = BarrierTracing.tasks.put(request, current);
+                       if(previous != null) {
+                           previous.printStackTrace();
+                           current.printStackTrace();
+                       }
+                   }
 
-                               }
+                   if(late) {
+                       int pos = request.position - 1;
+                       if(pos < freeScheduling.size()) { 
+                           freeScheduling.add(pos, request);
+                           queueLength.incrementAndGet();
+                           requests.release();
+                       } else {
+                           freeScheduling.addLast(request);
+                    queueLength.incrementAndGet();
+                    requests.release();
+                       }
+                   }
+                   else {
+                       if(request.getLevel() < 4) {
+                           if(freeScheduling.size() < 100) {
+                               freeScheduling.addFirst(request);
+                           queueLength.incrementAndGet();
+                           requests.release();
+                           } else {
+                               request.run(thread.get());
+                           }
+                       } else {
+                           if(freeScheduling.size() < 20) {
+                               freeScheduling.addFirst(request);
+                           queueLength.incrementAndGet();
+                           requests.release();
+                           } else {
+                               request.run(thread.get());
+                           }
+                       }
+                   }
 
-                               return;
-                               
-//                     }
-//                     
-//                     ReentrantLock queueLock = threadLocks[performer];
-//                     queueLock.lock();
-//                     queues[performer].add(request);
-//                     // This thread could have been sleeping
-//                     if(queues[performer].size() == 1) {
-//                             //if(ThreadState.SLEEP == threadStates[performer]) sleepers.decrementAndGet();
-//                             threadConditions[performer].signalAll();
-//                     }
-//                     queueLock.unlock();
-//             }
+
+               }
 
        }
 
@@ -271,8 +264,8 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap
        public static abstract class SessionTask {
 
                public final ReadGraphImpl graph;
-               private Set<ReadGraphImpl> ancestors;
                private int counter = 0;
+               protected int position = 1;
                private Exception trace;
 
                public SessionTask(ReadGraphImpl graph) {
@@ -280,13 +273,12 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap
                        if(graph != null) graph.asyncBarrier.inc();
                }
 
-               public boolean hasCommonParent(Set<ReadGraphImpl> otherAncestors) {
-                       if(graph == null) return false;
-                       if(ancestors == null) ancestors = graph.ancestorSet();
-                       return !Collections.disjoint(ancestors, otherAncestors);
-               }
+        public static boolean hasCommonParent(ReadGraphImpl r1, ReadGraphImpl r2) {
+            if(r1 == null || r2 == null) return false;
+            return r1.getTopLevelGraph() == r2.getTopLevelGraph();
+        } 
 
-               public abstract void run0(int thread);
+        public abstract void run0(int thread);
 
                public final void run(int thread) {
                    if(counter++ > 0) {
@@ -302,10 +294,22 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap
                    run0(thread);
                    if(graph != null) graph.asyncBarrier.dec();
                }
+               
+               public boolean maybeReady() {
+                       return true;
+               }
 
                @Override
                public String toString() {
-                       return "SessionTask[" + graph.parent + "]";
+                       if(graph == null)
+                               return "SessionTask[no graph]";
+                       else
+                               return "SessionTask[" + graph.parent + "]";
+               }
+               
+               public int getLevel() {
+                   if(graph == null) return 0;
+                   else return graph.getLevel();
                }
 
        }
@@ -1793,6 +1797,8 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap
        @Override
        protected void doDispose() {
 
+               requests.release(Integer.MAX_VALUE / 2);
+               
                for(int index = 0; index < THREADS; index++) { 
                        executors[index].dispose();
                }
index ab10efa0baab252a1ff5ef8b8485c313b2b32db1..5510944dc319265e7e033e38a5462e62b280ee16 100644 (file)
@@ -22,6 +22,7 @@ class QueryThread extends Thread implements SessionThread {
        private Session session;
        private QuerySupport querySupport;
        private final QueryProcessor processor;
+       private final Semaphore requests;
 
        final private ArrayList<SessionTask> tasks = new ArrayList<SessionTask>();
 //     final private ArrayList<SessionTask> own;
@@ -54,6 +55,7 @@ class QueryThread extends Thread implements SessionThread {
                sleepers = processor.sleepers;
                querySupport = processor.querySupport;
                threadStates = processor.threadStates;
+               requests = processor.requests;
 //             delayQueues = processor.delayQueues;
 //             executors = processor.executors;
 //             threadLocks = processor.threadLocks;
@@ -98,6 +100,7 @@ class QueryThread extends Thread implements SessionThread {
        private boolean pumpTask() {
                if(!processor.freeScheduling.isEmpty()) {
                        tasks.add(processor.freeScheduling.removeFirst());
+                       processor.queueLength.decrementAndGet();
                        return true;
                }
                return false;
@@ -155,18 +158,7 @@ class QueryThread extends Thread implements SessionThread {
                                
                                threadStates[index] = ThreadState.SLEEP;
                                
-                               synchronized (querySupportLock) {
-                                       querySupportLock.wait(100);
-                                       
-                               }
-                               
-//                             boolean woken = condition.await(10, TimeUnit.MILLISECONDS);
-//                             if(!woken) {
-//                                     synchronized (querySupportLock) {
-//                                             if(!processor.freeScheduling.isEmpty())
-//                                                     System.err.println("some tasks are available!");
-//                                     }
-//                             }
+                               requests.acquire();
 
                                sleepers.decrementAndGet();
 
index 93903fb9b2d4371e01f521eca67c55cc925d33bf..ffbec2f5e01ca88222926bb1c03848ed940d692a 100644 (file)
@@ -185,15 +185,17 @@ public class EntityInstances implements Instances {
        CollectionSupport coll = graph.getService(CollectionSupport.class);
        
        THashSet<Resource> visited = new THashSet<>();
-       List<Resource> rec = findRec(graph, index, filter, visited);
+       List<Resource> rec_ = findRec(graph, index, filter, visited);
+       // We must not modify rec_!
+       List<Resource> rec = rec_;
        for(Resource global : Layer0Utils.listGlobalOntologies(graph)) {
                if(!visited.add(global)) continue;
                List<Resource> rs = graph.syncRequest(new QueryIndex(global, type, filter), TransientCacheListener.<List<Resource>>instance());
-               if(rec.isEmpty() && !rs.isEmpty()) {
-                       // TODO: rec could be an immutable empty list
-                       rec = new ArrayList<Resource>();
+               if(!rs.isEmpty()) {
+                   if(rec == rec_)
+                       rec = new ArrayList<>(rec);
+                rec.addAll(rs);
                }
-               rec.addAll(rs);
        }
        Collection<Resource> result = coll.asSortedList(rec);
        return result; 
index a93476355abebb960eaa5b7f26582d9e89b066dc..fb421b6105ba1e526f6bfcd883b01b1fe41e4287 100644 (file)
@@ -118,7 +118,7 @@ public class SessionRequestManager {
        
        public synchronized void startRead(int thread, final SessionRead task) {
                
-               session.queryProvider2.schedule(new SessionTask(null) {
+               session.queryProvider2.scheduleNow(new SessionTask(null) {
 
                        @Override
             public void run0(int thread) {
@@ -142,7 +142,7 @@ public class SessionRequestManager {
        
        public synchronized void startReadUpdate(int thread) {
                
-               session.queryProvider2.schedule(new SessionTask(null) {
+               session.queryProvider2.scheduleNow(new SessionTask(null) {
 
                        @Override
                        public void run0(int thread) {
@@ -163,7 +163,7 @@ public class SessionRequestManager {
 
        public synchronized void startWrite(int thread, final SessionTask task) {
                
-               session.queryProvider2.schedule(new SessionTask(null) {
+               session.queryProvider2.scheduleNow(new SessionTask(null) {
 
                        @Override
                        public void run0(int thread) {
@@ -184,7 +184,7 @@ public class SessionRequestManager {
 
        public synchronized void startWriteUpdate(int thread) {
                
-               session.queryProvider2.schedule(new SessionTask(null) {
+               session.queryProvider2.scheduleNow(new SessionTask(null) {
 
                        @Override
                        public void run0(int thread) {
@@ -271,7 +271,7 @@ public class SessionRequestManager {
                        if (!reads.isEmpty()) {
 
                                final SessionRead read = reads.poll();
-                               session.queryProvider2.schedule(new SessionTask(null) {
+                               session.queryProvider2.scheduleNow(new SessionTask(null) {
 
                                        @Override
                                        public void run0(int thread) {
@@ -312,7 +312,7 @@ public class SessionRequestManager {
                assert(State.INIT != state);
                
                if(State.READ == state) {
-                       session.queryProvider2.schedule(new SessionTask(null) {
+                       session.queryProvider2.scheduleNow(new SessionTask(null) {
 
                                @Override
                                public void run0(int thread) {