]> gerrit.simantics Code Review - simantics/platform.git/blobdiff - bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/Scheduling.java
DB request scheduling scheme fails with district diagrams
[simantics/platform.git] / bundles / org.simantics.db.impl / src / org / simantics / db / impl / query / Scheduling.java
diff --git a/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/Scheduling.java b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/Scheduling.java
new file mode 100644 (file)
index 0000000..061435f
--- /dev/null
@@ -0,0 +1,102 @@
+package org.simantics.db.impl.query;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.concurrent.Semaphore;
+
+import org.simantics.db.impl.graph.BarrierTracing;
+import org.simantics.db.impl.graph.ReadGraphImpl;
+import org.simantics.db.impl.query.QueryProcessor.AsyncBarrier;
+import org.simantics.db.impl.query.QueryProcessor.SessionTask;
+
+public class Scheduling {
+    
+    private final Semaphore                                  requests;
+
+    private Map<AsyncBarrier, LinkedList<SessionTask>>        freeScheduling = new HashMap<>();
+    
+    private LinkedList<SessionTask>                          topLevelTasks = new LinkedList<SessionTask>();
+
+    public Scheduling(Semaphore requests) {
+        this.requests = requests;
+    }
+    
+    public SessionTask getSubTask(ReadGraphImpl parent) {
+        synchronized(this) {
+            assert(parent.asyncBarrier.isBlocking());
+            LinkedList<SessionTask> tasks = freeScheduling.get(parent.asyncBarrier);
+            if(tasks == null)
+                return null;
+            SessionTask task = tasks.removeLast();
+            if(tasks.isEmpty())
+                freeScheduling.remove(parent.asyncBarrier);
+            return task;
+        }
+    }
+
+    public boolean pumpTask(ArrayList<SessionTask> tasks) {
+        
+        synchronized(this) {
+
+            // First finish existing executions
+            if(!freeScheduling.isEmpty()) {
+                Map.Entry<AsyncBarrier, LinkedList<SessionTask>> ls =  freeScheduling.entrySet().iterator().next();
+                assert(ls.getKey().isBlocking());
+                tasks.add(ls.getValue().removeLast());
+                if(ls.getValue().isEmpty())
+                    freeScheduling.remove(ls.getKey());
+                return true;
+            }
+            // Check for new tasks
+            if(!topLevelTasks.isEmpty()) {
+                tasks.add(topLevelTasks.removeLast());
+                return true;
+            }
+
+            return false;
+
+        }
+        
+    }
+
+    final public SessionTask scheduleOrReturnForExecution(SessionTask request) {
+        
+        assert(request != null);
+
+        synchronized(this) {
+
+            if(BarrierTracing.BOOKKEEPING) {
+                Exception current = new Exception();
+                Exception previous = BarrierTracing.tasks.put(request, current);
+                if(previous != null) {
+                    previous.printStackTrace();
+                    current.printStackTrace();
+                }
+            }
+
+            requests.release();
+
+            if(request.rootGraph != null) {
+                AsyncBarrier sb = request.rootGraph.asyncBarrier.getBlockingBarrier();
+                if(sb != null) {
+                    LinkedList<SessionTask> ls = freeScheduling.get(sb);
+                    if(ls == null) {
+                        ls = new LinkedList<SessionTask>();
+                        freeScheduling.put(sb, ls);
+                    }
+                    ls.addFirst(request);
+                    return null;
+                }
+            }
+            
+            topLevelTasks.addFirst(request);
+        }
+
+        return null;
+
+    }
+
+}