]> gerrit.simantics Code Review - simantics/platform.git/commitdiff
Merge changes I3a38eed6,I3f8b3504
authorTuukka Lehtonen <tuukka.lehtonen@semantum.fi>
Tue, 4 Feb 2020 15:53:13 +0000 (15:53 +0000)
committerGerrit Code Review <gerrit2@simantics>
Tue, 4 Feb 2020 15:53:13 +0000 (15:53 +0000)
* changes:
  Wrong graph was used when performing async query from session
  DB request scheduling scheme fails with district diagrams

14 files changed:
bundles/org.simantics.db.impl/src/org/simantics/db/impl/BlockingAsyncProcedure.java
bundles/org.simantics.db.impl/src/org/simantics/db/impl/graph/AsyncBarrierImpl.java
bundles/org.simantics.db.impl/src/org/simantics/db/impl/graph/BarrierTracing.java
bundles/org.simantics.db.impl/src/org/simantics/db/impl/graph/ReadGraphImpl.java
bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/AsyncReadEntry.java
bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/ExternalReadEntry.java
bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryCacheBase.java
bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryProcessor.java
bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryThread.java
bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/ReadEntry.java
bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/Scheduling.java [new file with mode: 0644]
bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/QueryControlImpl.java
bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/SessionImplSocket.java
bundles/org.simantics.diagram/src/org/simantics/diagram/adapter/NodeClassRequest.java

index c491fb377464e366f5b34e8e8eac2e0e205840c5..8f96bb9e6d3a51a48c7b98baa40f962fc83f5617 100644 (file)
@@ -18,31 +18,32 @@ import org.simantics.db.impl.graph.ReadGraphImpl;
 import org.simantics.db.impl.query.AsyncReadEntry;
 import org.simantics.db.impl.query.PendingTaskSupport;
 import org.simantics.db.procedure.AsyncProcedure;
-
+import org.simantics.db.request.AsyncRead;
 public class BlockingAsyncProcedure<Result> implements AsyncProcedure<Result> {
 
     private static final Object NO_RESULT = new Object();
 
-    public final Object key;
-    public final ReadGraphImpl queryGraph;
-    public final ReadGraphImpl callerGraph;
-    public final AsyncProcedure<Result> procedure;
-    public PendingTaskSupport pendingTaskSupport;
-    public Object result = NO_RESULT;
-    public Throwable exception = null;
+    private final Object key;
+    private final ReadGraphImpl queryGraph;
+    private final ReadGraphImpl callerGraph;
+    private final AsyncProcedure<Result> procedure;
+    private PendingTaskSupport pendingTaskSupport;
+    private final boolean needsToBlock;
+    private Object result = NO_RESULT;
+    private Throwable exception = null;
 
     private ReadGraphImpl queryGraph() {
         return queryGraph;
     }
     
     public BlockingAsyncProcedure(ReadGraphImpl callerGraph, AsyncReadEntry<Result> entry, AsyncProcedure<Result> procedure, Object key, boolean needsToBlock) {
-
+        
         // A new graph for evaluating the query with correct parent and asyncBarrier
         queryGraph = callerGraph.withParent(entry, () -> {
 
             dispatchProcedure(queryGraph(), callerGraph, entry, procedure, needsToBlock);
             
-        });
+        }, needsToBlock);
         
         queryGraph.asyncBarrier.inc();
 
@@ -50,6 +51,7 @@ public class BlockingAsyncProcedure<Result> implements AsyncProcedure<Result> {
         this.key = key;
         this.queryGraph.asyncBarrier.inc();
         this.callerGraph = callerGraph;
+        this.needsToBlock = needsToBlock;
         if (BarrierTracing.BOOKKEEPING) {
             BarrierTracing.registerBAP(this);
         }
@@ -84,7 +86,8 @@ public class BlockingAsyncProcedure<Result> implements AsyncProcedure<Result> {
     @SuppressWarnings("unchecked")
     public Result get() throws DatabaseException {
 
-        queryGraph.asyncBarrier.waitBarrier(key, queryGraph);
+        if(needsToBlock)
+            queryGraph.asyncBarrier.waitBarrier(key, queryGraph);
 
         if(exception != null) {
             if(exception instanceof DatabaseException) throw (DatabaseException)exception;
@@ -113,7 +116,7 @@ public class BlockingAsyncProcedure<Result> implements AsyncProcedure<Result> {
         
         AsyncProcedure<Result> procedure = entry != null ? entry : procedure_;
 
-        ReadGraphImpl executeGraph = parentGraph.withParent(parentGraph.parent);
+        ReadGraphImpl executeGraph = parentGraph.withParent(parentGraph.parent, null, needsToBlock);
         executeGraph.asyncBarrier.inc();
         try {
             if(procedure != null) {
@@ -144,5 +147,33 @@ public class BlockingAsyncProcedure<Result> implements AsyncProcedure<Result> {
         }
 
     }
+    
+    public void print() {
+        System.err.println("BlockingAsyncProcedure");
+        System.err.println("-key: " + key);
+        System.err.println("-queryGraph: " + queryGraph);
+        System.err.println("-callerGraph: " + callerGraph);
+        System.err.println("-procedure: " + procedure);
+        System.err.println("-pendingTaskSupport: " + pendingTaskSupport);
+        System.err.println("-result: " + result);
+        System.err.println("-exception: " + exception);
+    }
+    
+    public Result performSync(AsyncRead<Result> request) throws DatabaseException {
+        try {
+            request.perform(queryGraph, this);
+        } finally {
+            dec();
+        }
+        return get();
+    }
+
+    public void performAsync(AsyncRead<Result> request) throws DatabaseException {
+        try {
+            request.perform(queryGraph, this);
+        } finally {
+            dec();
+        }
+    }
 
 }
index 060fc22aefb866d15a17bb0f70e7cd4692846d90..637db3f26f8e119173fbf29cbe54b46ed9b17002 100644 (file)
@@ -18,209 +18,206 @@ import org.simantics.db.common.utils.Logger;
 import org.simantics.db.exception.RuntimeDatabaseException;
 import org.simantics.db.impl.query.CacheEntry;
 import org.simantics.db.impl.query.QueryProcessor.AsyncBarrier;
-import org.simantics.db.impl.query.QueryProcessor.SessionTask;
 
-final public class AsyncBarrierImpl extends AtomicInteger implements AsyncBarrier {
+public class AsyncBarrierImpl extends AtomicInteger implements AsyncBarrier {
 
-       private static final long serialVersionUID = 4724463372850048672L;
+    private static final long serialVersionUID = 4724463372850048672L;
 
-       static final int WAIT_TIME = 60000;
+    static final int WAIT_TIME = 60000;
 
-       public static final boolean PRINT = false;
+    public static final boolean PRINT = false;
 
-       final public AsyncBarrierImpl caller;
-       
-       final public Runnable callback;
+    final AsyncBarrierImpl caller;
 
-       public AsyncBarrierImpl(AsyncBarrierImpl caller, CacheEntry<?> entry, Runnable callback) {
-               super(0);
-               this.caller = caller;
-               this.callback = callback;
+    private final Runnable callback;
+
+    private final boolean needsToBlock;
+
+    public AsyncBarrierImpl(AsyncBarrierImpl caller, CacheEntry<?> entry, Runnable callback, boolean needsToBlock) {
+        super(0);
+        this.caller = caller;
+        this.callback = callback;
+        this.needsToBlock = needsToBlock;
         if (BarrierTracing.BOOKKEEPING) {
             BarrierTracing.trace(this, entry);
         }
-       }
-
-
-       public void inc() {
+    }
+
+    public AsyncBarrier getBlockingBarrier() {
+        if(needsToBlock)
+            return this;
+        if(caller == null)
+            return null;
+        else return caller.getBlockingBarrier();
+    }
+
+    @Override
+    public boolean isBlocking() {
+        return needsToBlock;
+    }
+
+    @Override
+    public void inc() {
+
+        if(BarrierTracing.BOOKKEEPING) {
+            BarrierTracing.inc(this);
+        } else {
+            inc(null, null);
+        }
 
-           if(BarrierTracing.BOOKKEEPING) {
-               BarrierTracing.inc(this);
-           } else {
-               inc(null, null);
-           }
-           
-       }
+    }
 
-       void inc(Object id, String info) {
+    void inc(Object id, String info) {
 
-               if(PRINT) {
-                       System.err.println("inc barrier[" + get() + "] " + this);
-                       StackTraceElement[] elems = new Exception().getStackTrace();
-                       for(int i=0;i<4;i++) System.err.println(elems[i]);
-               }
+        if(PRINT) {
+            System.err.println("inc barrier[" + get() + "] " + this);
+            StackTraceElement[] elems = new Exception().getStackTrace();
+            for(int i=0;i<4;i++) System.err.println(elems[i]);
+        }
 
-               if (incrementAndGet() == 1) {
-                       if (caller != null) {
-                       if(BarrierTracing.BOOKKEEPING) {
+        if (incrementAndGet() == 1) {
+            if (caller != null) {
+                if(BarrierTracing.BOOKKEEPING) {
                     caller.inc(this, "Child");
-                       } else {
+                } else {
                     caller.inc(null, null);
-                       }
-                       }
-               }
+                }
+            }
+        }
 
-       }
+    }
 
-       public void dec() {
+    @Override
+    public void dec() {
 
-               if(PRINT) {
-                       System.err.println("dec barrier[" + get() + "] " + this);
-                       StackTraceElement[] elems = new Exception().getStackTrace();
-                       for(int i=0;i<3;i++) System.err.println(elems[i]);
-               }
+        if(PRINT) {
+            System.err.println("dec barrier[" + get() + "] " + this);
+            StackTraceElement[] elems = new Exception().getStackTrace();
+            for(int i=0;i<3;i++) System.err.println(elems[i]);
+        }
 
-               int count = decrementAndGet();
-               if (count < 1) {
+        int count = decrementAndGet();
+        if (count < 1) {
             if(BarrierTracing.BOOKKEEPING) {
                 BarrierTracing.dec(this, count);
             }
-                       if (count == 0) {
-                               if (caller != null) {
-                                       caller.dec();
-                               }
-                       }
-                       if (count < 0) {
-                               Logger.defaultLogError(
-                                               "Database request processing error. The application code has performed illegal actions (probably called multiple times the execute or exception method of a single result request.",
-                                               new Exception());
-                       }
-                       assert (count >= 0);
-                       
-                       if(callback != null)
-                           callback.run();
-                       
-               }
-
-       }
-
-       public static String report(AsyncBarrierImpl barrier) {
-               CacheEntry<?> e = BarrierTracing.entryMap.get(barrier);
-               if(e != null) return e.toString();
-               else return "Barrier@" + System.identityHashCode(barrier);
-       }
-       
-       public static void printReverse(AsyncBarrierImpl barrier, int indent) {
-
-               if (barrier.get() == 0)
-                       return;
-               for (int i = 0; i < indent; i++)
-                       System.err.print(" ");
-               System.err.println("[" + barrier.get() + " requests]: " + report(barrier));
-//             if (BOOKKEEPING) {
-//                     Debugger debugger = debuggerMap.get(barrier);
-//                     debugger.toErr(indent + 2);
-//             }
-
-               Collection<AsyncBarrierImpl> children = BarrierTracing.reverseLookup.get(barrier);
-               if (children != null) {
-                       for (AsyncBarrierImpl child : children)
-                               printReverse(child, indent + 2);
-               }
-
-       }
-
-       public void waitBarrier(Object request, ReadGraphImpl impl) {
-
-               if (get() > 0) {
-
-                       long waitCount = 0;
-
-                       while (get() != 0) {
-
-                               boolean executed = impl.performPending();
-                               if(executed) waitCount = 0;
-                               
-                               ++waitCount;
-                               if(waitCount > 100) Thread.yield();
-                               if(waitCount > 1000) {
-                                       try {
-                                               Thread.sleep(1);
-                                       } catch (InterruptedException e) {
-                                               e.printStackTrace();
-                                       }
-                               }
-                               if(waitCount > WAIT_TIME) {
-
-                                       System.err.println("AsyncBarrierImpl.waitBarrier("
-                                                       + request
-                                                       + ") is taking long to execute, so far "
-                                                       + (waitCount / 1000) + " s.");
-
-                                       if (BarrierTracing.BOOKKEEPING) {
-                                               synchronized (BarrierTracing.reverseLookup) {
-                                                       printReverse(this, 0);
-                                               }
-                                               BarrierTracing.printBAPS();
-                                       }
-                                       
-                                       for(SessionTask t : impl.processor.freeScheduling) {
-                                           System.err.println("Pending task:" + t);
-                                       }
-
-//                                     if(Development.DEVELOPMENT) {
-//                                             impl.processor.threadLocks[0].lock();
-//                                             System.err.println("-queues=" + impl.processor.queues[0].size());
-//                                             impl.processor.threadLocks[0].unlock();
-//                                             System.err.println("-own=" + impl.processor.ownTasks[0].size());
-//                                             System.err.println("-ownSync=" + impl.processor.ownSyncTasks[0].size());
-//                                             for(SessionTask task : impl.processor.ownSyncTasks[0]) {
-//                                                     System.err.println("--" + task);
-//                                             }
-//                                     }
-
-                                       throw new RuntimeDatabaseException("Request timed out.");
-                                       //waitCount = 0;
-
-                               }
-
-                       }
-
-               }
-
-       }
-
-       public void restart() {
-               assertReady();
-               if(BarrierTracing.BOOKKEEPING) {
-                   BarrierTracing.restart(this);
-               }
-       }
-
-       public void assertReady() {
-               int current = get();
-               if (current != 0)
-                       throw new AssertionError("Barrier was not finished (pending="
-                                       + current + ").");
-       }
-
-       public void report() {
-               // System.out.println("Barrier log:");
-               // for(Map.Entry<String, Integer> entry : sources.entrySet()) {
-               // System.out.println(entry.getKey() + " " + entry.getValue());
-               // }
-               // System.out.println("SyncIntProcedure log:");
-               // for(Map.Entry<String, Integer> entry :
-               // SyncIntProcedure.counters.entrySet()) {
-               // System.out.println(entry.getKey() + " " + entry.getValue());
-               // }
-       }
-
-       @Override
-       public String toString() {
-               return report(this);
-//             return "AsyncBarrierImpl@" + System.identityHashCode(this)
-//                             + " - counter = " + get() + " - caller = " + caller;
-       }
+            if (count == 0) {
+                if (caller != null) {
+                    caller.dec();
+                }
+            }
+            if (count < 0) {
+                Logger.defaultLogError(
+                        "Database request processing error. The application code has performed illegal actions (probably called multiple times the execute or exception method of a single result request.",
+                        new Exception());
+            }
+            assert (count >= 0);
+
+            if(callback != null)
+                callback.run();
+
+        }
+
+    }
+
+    public static String report(AsyncBarrierImpl barrier) {
+        CacheEntry<?> e = BarrierTracing.entryMap.get(barrier);
+        if(e != null) return e.toString();
+        else return "Barrier@" + System.identityHashCode(barrier);
+    }
+
+    public static void printReverse(AsyncBarrierImpl barrier, int indent) {
+
+        if (barrier.get() == 0)
+            return;
+        for (int i = 0; i < indent; i++)
+            System.err.print(" ");
+        System.err.println("[" + barrier.get() + " requests]: " + report(barrier));
+
+        Collection<AsyncBarrierImpl> children = BarrierTracing.reverseLookup.get(barrier);
+        if (children != null) {
+            for (AsyncBarrierImpl child : children)
+                printReverse(child, indent + 2);
+        }
+        
+    }
+
+    public void waitBarrier(Object request, ReadGraphImpl impl) {
+
+        if (get() > 0) {
+
+            long waitCount = 0;
+
+            while (get() != 0) {
+
+                boolean executed = impl.performPending();
+                if(executed) waitCount = 0;
+
+                ++waitCount;
+                if(waitCount > 100) Thread.yield();
+                if(waitCount > 1000) {
+                    try {
+                        Thread.sleep(1);
+                    } catch (InterruptedException e) {
+                        e.printStackTrace();
+                    }
+                }
+                if(waitCount > WAIT_TIME) {
+
+                    System.err.println("AsyncBarrierImpl.waitBarrier("
+                            + request
+                            + ") is taking long to execute, so far "
+                            + (waitCount / 1000) + " s.");
+
+                    if (BarrierTracing.BOOKKEEPING) {
+                        synchronized (BarrierTracing.reverseLookup) {
+                            printReverse(this, 0);
+                        }
+                        BarrierTracing.printBAPS();
+                    }
+
+                    throw new RuntimeDatabaseException("Request timed out.");
+
+                }
+
+            }
+
+        }
+
+    }
+
+    public void restart() {
+        assertReady();
+        if(BarrierTracing.BOOKKEEPING) {
+            BarrierTracing.restart(this);
+        }
+    }
+
+    public void assertReady() {
+        int current = get();
+        if (current != 0)
+            throw new AssertionError("Barrier was not finished (pending="
+                    + current + ").");
+    }
+
+    public void report() {
+        // System.out.println("Barrier log:");
+        // for(Map.Entry<String, Integer> entry : sources.entrySet()) {
+        // System.out.println(entry.getKey() + " " + entry.getValue());
+        // }
+        // System.out.println("SyncIntProcedure log:");
+        // for(Map.Entry<String, Integer> entry :
+        // SyncIntProcedure.counters.entrySet()) {
+        // System.out.println(entry.getKey() + " " + entry.getValue());
+        // }
+    }
+
+    @Override
+    public String toString() {
+        return report(this);
+        //             return "AsyncBarrierImpl@" + System.identityHashCode(this)
+        //                             + " - counter = " + get() + " - caller = " + caller;
+    }
+
 
 }
index 2433b897f888d15044e135433aa444fb48f498d0..68fdd824ed29f95eee0fa8f1cb0a49eb9dcebf92 100644 (file)
@@ -32,16 +32,10 @@ public class BarrierTracing {
 
     synchronized public static void printBAPS() {
         for(BlockingAsyncProcedure bap : baps.keySet()) {
-            Throwable e = baps.get(bap);
-            System.err.println("BlockingAsyncProcedure");
-            System.err.println("-key: " + bap.key);
-            System.err.println("-queryGraph: " + bap.queryGraph);
-            System.err.println("-callerGraph: " + bap.callerGraph);
-            System.err.println("-procedure: " + bap.procedure);
-            System.err.println("-pendingTaskSupport: " + bap.pendingTaskSupport);
-            System.err.println("-result: " + bap.result);
-            System.err.println("-exception: " + bap.exception);
-            e.printStackTrace();
+            bap.print();
+            Throwable t = baps.get(bap);
+            if(t != null)
+                t.printStackTrace();
         }
     }
     
index 2eced2b7c046ebef86a64760cee27106667c706c..48662d1903a08267a13dded57e04e04f8b85db6d 100644 (file)
@@ -5662,7 +5662,7 @@ public class ReadGraphImpl implements AsyncReadGraph {
                this.parentGraph = parentGraph;
                this.parent = parent;
                this.processor = support;
-               this.asyncBarrier = new AsyncBarrierImpl(parentGraph != null ? parentGraph.asyncBarrier : null, parent, null);
+               this.asyncBarrier = prepareBarrier(parentGraph, parent, null, false);
        }
 
     ReadGraphImpl(ReadGraphImpl parentGraph, CacheEntry parent, QueryProcessor support, AsyncBarrierImpl asyncBarrier) {
@@ -5676,34 +5676,30 @@ public class ReadGraphImpl implements AsyncReadGraph {
                this(graph, parent, graph.processor);
        }
 
-    ReadGraphImpl(ReadGraphImpl parentGraph, CacheEntry parent, Runnable callback) {
-        this(parentGraph, parent, parentGraph.processor, new AsyncBarrierImpl(parentGraph != null ? parentGraph.asyncBarrier : null, parent, callback));
+    ReadGraphImpl(ReadGraphImpl parentGraph, CacheEntry parent, Runnable callback, boolean needsToBlock) {
+        this(parentGraph, parent, parentGraph.processor, prepareBarrier(parentGraph, parent, callback, needsToBlock));
     }
 
+    static AsyncBarrierImpl prepareBarrier(ReadGraphImpl parentGraph, CacheEntry parent, Runnable callback, boolean needsToBlock) {
+        return new AsyncBarrierImpl(parentGraph != null ? parentGraph.asyncBarrier : null, parent, callback, needsToBlock);
+    }
+    
        ReadGraphImpl(ReadGraphImpl graph) {
                this(graph, graph.parent);
        }
 
-       public ReadGraphImpl withParent(CacheEntry parent, Runnable callback) {
-               return new ReadGraphImpl(this, parent, callback);
+       public ReadGraphImpl withParent(CacheEntry parent, Runnable callback, boolean needsToBlock) {
+               return new ReadGraphImpl(this, parent, callback, needsToBlock);
        }
 
-    public ReadGraphImpl withParent(CacheEntry parent) {
-        return withParent(parent, null);
-    }
-
-       public ReadGraphImpl syncWithParent(CacheEntry parent) {
-        return new ReadGraphImpl(this, parent, processor, null);
-    }
-
        public ReadGraphImpl forRecompute(CacheEntry parent) {
                return new ReadGraphImpl(null, parent, processor);
        }
 
-       public static ReadGraphImpl create(QueryProcessor support) {
-           ReadGraphImpl result = new ReadGraphImpl(null, null, support);
-           return result;
-       }
+    public static ReadGraphImpl create(QueryProcessor support) {
+        ReadGraphImpl result = new ReadGraphImpl(null, null, support);
+        return result;
+    }
 
        public ReadGraphImpl newRestart(ReadGraphImpl impl) {
 
index 0dd5730a5f3a1f5c6c232063741bf4343303feb4..5017151b10b9f8478bf1df7319326d020b0fb583 100644 (file)
@@ -92,11 +92,8 @@ final public class AsyncReadEntry<T> extends CacheEntryBase<AsyncProcedure<T>> i
                         }
 
                     }, id, true);
-
-                    id.perform(proc.queryGraph, proc);
                     
-                    proc.dec();
-                    proc.get();
+                    proc.performSync(id);
 
                 } catch (Throwable t) {
                     except(t);
@@ -158,17 +155,10 @@ final public class AsyncReadEntry<T> extends CacheEntryBase<AsyncProcedure<T>> i
             AsyncProcedure<T> procedure_, boolean needsToBlock) throws DatabaseException {
 
         BlockingAsyncProcedure<T> proc = new BlockingAsyncProcedure(callerGraph, entry, procedure_, request, needsToBlock);
-
-        try {
-            request.perform(proc.queryGraph, proc);
-        } finally {
-            proc.queryGraph.asyncBarrier.dec();
-        }
-
         if(needsToBlock) {
-            proc.waitBarrier();
-            return proc.get();
+            return proc.performSync(request);
         } else {
+            proc.performAsync(request);
             return null;
         }
 
index 556d5fc3e12b8d2df0d5e42ea2efb2221a8fb65f..d044043394ef771f7e7e0cfab3e5257f77883db6 100644 (file)
@@ -180,7 +180,7 @@ final public class ExternalReadEntry<T> extends CacheEntryBase<AsyncProcedure<T>
        
                try {
                        
-                       ReadGraphImpl queryGraph = graph.withParent(this);
+                       ReadGraphImpl queryGraph = graph.withParent(this, null, true);
 
                        if(!registered) {
                                id.register(graph, this);
index deeb65fddb92a3c161e6315d6ce75ef1d00282aa..f80fb5098b847d9264768be6c833c39d38a7ac24 100644 (file)
@@ -106,7 +106,7 @@ public class QueryCacheBase {
 
        public <T> Object performQuery(ReadGraphImpl parentGraph, final AsyncMultiRead<T> query, final CacheEntryBase entry_, Object procedure_) throws DatabaseException {
 
-               ReadGraphImpl queryGraph = parentGraph.withParent(entry_);
+               ReadGraphImpl queryGraph = parentGraph.withParent(entry_, null, false);
 
                AsyncMultiReadEntry entry = (AsyncMultiReadEntry)entry_;
                AsyncMultiProcedure<T> procedure = (AsyncMultiProcedure<T>)procedure_;
@@ -169,7 +169,7 @@ public class QueryCacheBase {
 
        public <T> Object performQuery(ReadGraphImpl parentGraph, final MultiRead<T> query, final CacheEntryBase entry_, Object procedure_) throws DatabaseException {
 
-               ReadGraphImpl queryGraph = parentGraph.withParent(entry_);
+               ReadGraphImpl queryGraph = parentGraph.withParent(entry_, null, true);
 
                MultiReadEntry entry = (MultiReadEntry)entry_;
                SyncMultiProcedure<T> procedure = (SyncMultiProcedure<T>)procedure_;
index eaaa9b3730976265c0673dc4547bbe35b3d4904c..a46bc0e8dd4e1b024dea3b0dd8d7269b10a19192 100644 (file)
@@ -141,10 +141,6 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap
        final public QueryListening                            listening = new QueryListening(this);
 
        QueryThread[]                                   executors;
-       
-       public LinkedList<SessionTask>                           freeScheduling = new LinkedList<SessionTask>();
-    
-    public LinkedList<SessionTask>                           topLevelTasks = new LinkedList<SessionTask>();
 
        enum ThreadState {
 
@@ -152,6 +148,8 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap
 
        }
 
+       final Scheduling scheduling;
+       
        public ThreadState[]                                                                    threadStates;
        
        final Object querySupportLock;
@@ -161,26 +159,13 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap
        public void close() {
        }
 
-    public SessionTask getSubTask(ReadGraphImpl parent) {
-        synchronized(querySupportLock) {
-            int index = 0;
-            while(index < freeScheduling.size()) {
-                SessionTask task = freeScheduling.get(index);
-                if(task.isSubtask(parent) && task.maybeReady()) {
-                    return freeScheduling.remove(index);
-                }
-                index++;
-            }
-        }
-        return null;
-    }
 
     /*
      * We are running errands while waiting for requests to complete.
      * We can only run work that is part of the current root request to avoid any deadlocks
      */
     public boolean performPending(ReadGraphImpl under) {
-        SessionTask task = getSubTask(under);
+        SessionTask task = scheduling.getSubTask(under);
                if(task != null) {
                        task.run(thread.get());
                        return true;
@@ -195,28 +180,8 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap
     }
 
     final public SessionTask scheduleOrReturnForExecution(SessionTask request) {
-
-        assert(request != null);
-
-        synchronized(querySupportLock) {
-
-            LinkedList<SessionTask> queue = request.rootGraph != null ? freeScheduling : topLevelTasks;
-            
-            if(BarrierTracing.BOOKKEEPING) {
-                Exception current = new Exception();
-                Exception previous = BarrierTracing.tasks.put(request, current);
-                if(previous != null) {
-                    previous.printStackTrace();
-                    current.printStackTrace();
-                }
-            }
-
-            queue.addFirst(request);
-            requests.release();
-
-        }
-
-        return null;
+        
+        return scheduling.scheduleOrReturnForExecution(request);
 
     }
 
@@ -298,6 +263,8 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap
                THREADS = threads;
                THREAD_MASK = threads - 1;
 
+               scheduling = new Scheduling(requests);
+               
                querySupport = core;
                cache = new QueryCache(core, threads);
                session = querySupport.getSession();
@@ -1888,12 +1855,12 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap
 
        Exception callerException = null;
 
-       public interface AsyncBarrier {
-               public void inc(); 
-               public void dec();
-               //        public void inc(String debug); 
-               //        public void dec(String debug);
-       }
+    public interface AsyncBarrier {
+        public void inc(); 
+        public void dec();
+        public void waitBarrier(Object request, ReadGraphImpl impl);
+        public boolean isBlocking();
+    }
 
 //     final public QueryProcessor processor;
 //     final public QuerySupport support;
index cfa088a2dbfd7a966e0af3db7f4d9a5aa5d1ad4c..7aa89b3f032ea68f08e75f1d65bd0c0e87bdc030 100644 (file)
@@ -98,17 +98,7 @@ class QueryThread extends Thread implements SessionThread {
        }
 
        private boolean pumpTask() {
-           // First finish existing executions
-        if(!processor.freeScheduling.isEmpty()) {
-            tasks.add(processor.freeScheduling.removeFirst());
-            return true;
-        }
-        // Check for new tasks
-        if(!processor.topLevelTasks.isEmpty()) {
-            tasks.add(processor.topLevelTasks.removeFirst());
-            return true;
-        }
-               return false;
+           return processor.scheduling.pumpTask(tasks);
        }
        
        ArrayList<SessionTask> newTasks(boolean doWait, ArrayList<SessionTask> tasks) {
index 9edcbb997b97554946ea510728fe9395259d5741..8f547d22b3e950a9c3b87e035fd9a0e07a4aef08 100644 (file)
@@ -100,10 +100,10 @@ public final class ReadEntry<T> extends CacheEntryBase<AsyncProcedure<T>> implem
 
         AsyncProcedure<T> procedure = entry != null ? entry : procedure_;
 
-        ReadGraphImpl queryGraph = graph.withParent(entry);
+        ReadGraphImpl queryGraph = graph.withParent(entry, null, needsToBlock);
         queryGraph.asyncBarrier.inc();
 
-        ReadGraphImpl executeGraph = graph.withParent(graph.parent);
+        ReadGraphImpl executeGraph = graph.withParent(graph.parent, null, needsToBlock);
         executeGraph.asyncBarrier.inc();
         
         try {
diff --git a/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/Scheduling.java b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/Scheduling.java
new file mode 100644 (file)
index 0000000..061435f
--- /dev/null
@@ -0,0 +1,102 @@
+package org.simantics.db.impl.query;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.concurrent.Semaphore;
+
+import org.simantics.db.impl.graph.BarrierTracing;
+import org.simantics.db.impl.graph.ReadGraphImpl;
+import org.simantics.db.impl.query.QueryProcessor.AsyncBarrier;
+import org.simantics.db.impl.query.QueryProcessor.SessionTask;
+
+public class Scheduling {
+    
+    private final Semaphore                                  requests;
+
+    private Map<AsyncBarrier, LinkedList<SessionTask>>        freeScheduling = new HashMap<>();
+    
+    private LinkedList<SessionTask>                          topLevelTasks = new LinkedList<SessionTask>();
+
+    public Scheduling(Semaphore requests) {
+        this.requests = requests;
+    }
+    
+    public SessionTask getSubTask(ReadGraphImpl parent) {
+        synchronized(this) {
+            assert(parent.asyncBarrier.isBlocking());
+            LinkedList<SessionTask> tasks = freeScheduling.get(parent.asyncBarrier);
+            if(tasks == null)
+                return null;
+            SessionTask task = tasks.removeLast();
+            if(tasks.isEmpty())
+                freeScheduling.remove(parent.asyncBarrier);
+            return task;
+        }
+    }
+
+    public boolean pumpTask(ArrayList<SessionTask> tasks) {
+        
+        synchronized(this) {
+
+            // First finish existing executions
+            if(!freeScheduling.isEmpty()) {
+                Map.Entry<AsyncBarrier, LinkedList<SessionTask>> ls =  freeScheduling.entrySet().iterator().next();
+                assert(ls.getKey().isBlocking());
+                tasks.add(ls.getValue().removeLast());
+                if(ls.getValue().isEmpty())
+                    freeScheduling.remove(ls.getKey());
+                return true;
+            }
+            // Check for new tasks
+            if(!topLevelTasks.isEmpty()) {
+                tasks.add(topLevelTasks.removeLast());
+                return true;
+            }
+
+            return false;
+
+        }
+        
+    }
+
+    final public SessionTask scheduleOrReturnForExecution(SessionTask request) {
+        
+        assert(request != null);
+
+        synchronized(this) {
+
+            if(BarrierTracing.BOOKKEEPING) {
+                Exception current = new Exception();
+                Exception previous = BarrierTracing.tasks.put(request, current);
+                if(previous != null) {
+                    previous.printStackTrace();
+                    current.printStackTrace();
+                }
+            }
+
+            requests.release();
+
+            if(request.rootGraph != null) {
+                AsyncBarrier sb = request.rootGraph.asyncBarrier.getBlockingBarrier();
+                if(sb != null) {
+                    LinkedList<SessionTask> ls = freeScheduling.get(sb);
+                    if(ls == null) {
+                        ls = new LinkedList<SessionTask>();
+                        freeScheduling.put(sb, ls);
+                    }
+                    ls.addFirst(request);
+                    return null;
+                }
+            }
+            
+            topLevelTasks.addFirst(request);
+        }
+
+        return null;
+
+    }
+
+}
index 7165abef3547478f88dde81ba84c9cf15e7d90ef..237e8b50b96f2f88a006662f14a1571f5770a07b 100644 (file)
@@ -139,7 +139,7 @@ public class QueryControlImpl implements QueryControl {
        @Override
        public ReadGraph getIndependentGraph(ReadGraph graph) {
                ReadGraphImpl impl = (ReadGraphImpl)graph;
-               return impl.withParent(null);
+               return impl.withParent(null, null, false);
        }
 
        @Override
index 030d512eb736e2f6303f0d68c1d0b0e3d2e57b97..22d0f316545762d4937b384bea9fd648e8bdd441 100644 (file)
@@ -1632,37 +1632,31 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule
                     if (listener != null) {
 
                         try {
-                               QueryCacheBase.resultAsyncReadEntry(newGraph, request, null, listener, procedure);
-                                               } catch (DatabaseException e) {
-                                                       Logger.defaultLogError(e);
-                                               }
+                            QueryCacheBase.resultAsyncReadEntry(newGraph, request, null, listener, procedure);
+                        } catch (DatabaseException e) {
+                            LOGGER.error("Unhandled query exception", e);
+                        }
 
                     } else {
 
-                       BlockingAsyncProcedure<T> wrap = new BlockingAsyncProcedure<T>(newGraph, null, procedure, request, true) {
-                               
-                               public void execute(AsyncReadGraph graph_, T result) {
-                                       task.finish();
-                                       super.execute(graph_, result);
-                               }
-                               
-                               public void exception(AsyncReadGraph graph_, Throwable t) {
-                                       task.finish();
-                                       super.exception(graph_, t);
-                               }
-                               
-                       };
-
-                       try {
+                        BlockingAsyncProcedure<T> wrap = new BlockingAsyncProcedure<T>(newGraph, null, procedure, request, true) {
 
-                            request.perform(newGraph, wrap);
-                            wrap.dec();
-                            wrap.get();
+                            public void execute(AsyncReadGraph graph_, T result) {
+                                task.finish();
+                                super.execute(graph_, result);
+                            }
 
-                        } catch (DatabaseException e) {
+                            public void exception(AsyncReadGraph graph_, Throwable t) {
+                                task.finish();
+                                super.exception(graph_, t);
+                            }
 
-                                                       Logger.defaultLogError(e);
+                        };
 
+                        try {
+                            wrap.performSync(request);
+                        } catch (DatabaseException e) {
+                            LOGGER.error("Unhandled query exception", e);
                         }
 
                     }
index d99d2963ba12f79b55ad586113683a8b53b0a4ff..8ab24b71ab083012ca97fd695e90592b0304d929 100644 (file)
@@ -36,18 +36,6 @@ public class NodeClassRequest extends BaseRequest2<Resource, ElementClass> {
     @Override
     public void perform(AsyncReadGraph graph, final AsyncProcedure<ElementClass> procedure) {
 
-//        System.out.println("NodeClassRequest " + data.getResourceId());
-//        graph.asyncRequest(new SafeName(data), new Procedure<String>() {
-//            @Override
-//            public void exception(Throwable t) {
-//            }
-//            @Override
-//            public void execute(String result) {
-//                System.out.println("NodeClassRequest "  + result);
-//                debug = result;
-//            }
-//        });
-
         graph.forAdapted(data, ElementFactory.class, new AsyncProcedure<ElementFactory>() {
 
             @Override
@@ -61,18 +49,6 @@ public class NodeClassRequest extends BaseRequest2<Resource, ElementClass> {
             @Override
             public void execute(AsyncReadGraph graph, final ElementFactory factory) {
 
-//                System.out.println("NodeClassRequest factory " + data.getResourceId() + " = " + factory.getClass().getName());
-//                graph.asyncRequest(new SafeName(data), new Procedure<String>() {
-//                    @Override
-//                    public void exception(Throwable t) {
-//                    }
-//                    @Override
-//                    public void execute(String result) {
-//                        System.out.println("NodeClassRequest "  + result + " " + factory.getClass().getName());
-//                        debug = result;
-//                    }
-//                });
-
                 if (factory == null) {
                     // Should never happen since we are using forAdapted
                     System.out.println("No factory in NodeClassRequest! " + this);