Multiple readers in db client 11/3811/4
authorAntti Villberg <antti.villberg@semantum.fi>
Thu, 23 Jan 2020 12:17:13 +0000 (14:17 +0200)
committerTuukka Lehtonen <tuukka.lehtonen@semantum.fi>
Thu, 30 Jan 2020 10:21:44 +0000 (10:21 +0000)
gitlab #5

Change-Id: I5a44b7d19f6529f1ceceab1875e227db326e1c24

21 files changed:
bundles/org.simantics.db.impl/src/org/simantics/db/impl/BlockingAsyncProcedure.java
bundles/org.simantics.db.impl/src/org/simantics/db/impl/graph/AsyncBarrierImpl.java
bundles/org.simantics.db.impl/src/org/simantics/db/impl/graph/BarrierTracing.java
bundles/org.simantics.db.impl/src/org/simantics/db/impl/graph/ReadGraphImpl.java
bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/AsyncReadEntry.java
bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/IPending.java [new file with mode: 0644]
bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/PendingTaskSupport.java [new file with mode: 0644]
bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryCache.java
bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryCacheBase.java
bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryProcessor.java
bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryThread.java
bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/ReadEntry.java
bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/SessionImplSocket.java
bundles/org.simantics.diagram/src/org/simantics/diagram/adapter/ConnectionRequest2.java [new file with mode: 0644]
bundles/org.simantics.diagram/src/org/simantics/diagram/adapter/GraphToDiagramSynchronizer.java
bundles/org.simantics.diagram/src/org/simantics/diagram/adapter/NodeRequest2.java [new file with mode: 0644]
bundles/org.simantics.document.server/src/org/simantics/document/server/request/DocumentRequest.java
bundles/org.simantics.document.server/src/org/simantics/document/server/request/NodesRequest.java
bundles/org.simantics.issues.common/src/org/simantics/issues/common/ChildMaxIssueSeverity.java
bundles/org.simantics.issues.common/src/org/simantics/issues/common/MaxIssueSeverityRecursive.java
bundles/org.simantics.modeling.ui/src/org/simantics/modeling/ui/diagram/monitor/MonitorClassFactory2.java

index 2d3e2804c6ebf4d630984892195c6fe2e2e4b0fa..c491fb377464e366f5b34e8e8eac2e0e205840c5 100644 (file)
 package org.simantics.db.impl;
 
 import org.simantics.db.AsyncReadGraph;
-import org.simantics.db.common.utils.Logger;
 import org.simantics.db.exception.DatabaseException;
-import org.simantics.db.impl.graph.AsyncBarrierImpl;
+import org.simantics.db.impl.graph.BarrierTracing;
 import org.simantics.db.impl.graph.ReadGraphImpl;
+import org.simantics.db.impl.query.AsyncReadEntry;
+import org.simantics.db.impl.query.PendingTaskSupport;
 import org.simantics.db.procedure.AsyncProcedure;
 
 public class BlockingAsyncProcedure<Result> implements AsyncProcedure<Result> {
 
     private static final Object NO_RESULT = new Object();
 
-    private final Object key;
-    private final AsyncBarrierImpl barrier;
-    private final ReadGraphImpl procedureGraph;
-    private final AsyncProcedure<Result> procedure;
+    public final Object key;
+    public final ReadGraphImpl queryGraph;
+    public final ReadGraphImpl callerGraph;
+    public final AsyncProcedure<Result> procedure;
+    public PendingTaskSupport pendingTaskSupport;
+    public Object result = NO_RESULT;
+    public Throwable exception = null;
 
-    private Object result = NO_RESULT;
-    private Throwable exception = null;
+    private ReadGraphImpl queryGraph() {
+        return queryGraph;
+    }
+    
+    public BlockingAsyncProcedure(ReadGraphImpl callerGraph, AsyncReadEntry<Result> entry, AsyncProcedure<Result> procedure, Object key, boolean needsToBlock) {
+
+        // A new graph for evaluating the query with correct parent and asyncBarrier
+        queryGraph = callerGraph.withParent(entry, () -> {
+
+            dispatchProcedure(queryGraph(), callerGraph, entry, procedure, needsToBlock);
+            
+        });
+        
+        queryGraph.asyncBarrier.inc();
 
-    public BlockingAsyncProcedure(AsyncBarrierImpl barrier, ReadGraphImpl procedureGraph, AsyncProcedure<Result> procedure, Object key) {
         this.procedure = procedure;
         this.key = key;
-        this.barrier = barrier;
-        this.barrier.inc();
-        this.procedureGraph = procedureGraph;
+        this.queryGraph.asyncBarrier.inc();
+        this.callerGraph = callerGraph;
+        if (BarrierTracing.BOOKKEEPING) {
+            BarrierTracing.registerBAP(this);
+        }
     }
 
     @Override
     public void execute(AsyncReadGraph graph_, Result result) {
+        
         this.result = result;
-        try {
-            if(procedure != null) procedure.execute(procedureGraph, result);
-        } catch (Throwable throwable) {
-            Logger.defaultLogError("AsyncProcedure.execute threw for " + procedure, throwable);
-        } finally {
-            barrier.dec();
-        }
+        queryGraph.asyncBarrier.dec();
+        
     }
 
     @Override
     public void exception(AsyncReadGraph graph_, Throwable t) {
+
         this.exception = t;
-        try {
-            if(procedure != null) procedure.exception(procedureGraph, t);
-        } catch (Throwable throwable) {
-            Logger.defaultLogError("AsyncProcedure.exception threw for " + procedure, throwable);
-        } finally {
-            barrier.dec();
-        }
+        queryGraph.asyncBarrier.dec();
+        
     }
-
+    
     public void waitBarrier() {
-        barrier.waitBarrier(key, procedureGraph);
+        queryGraph.asyncBarrier.waitBarrier(key, queryGraph);
+    }
+    
+    public void dec() {
+
+        queryGraph.asyncBarrier.dec();
+        
     }
 
     @SuppressWarnings("unchecked")
     public Result get() throws DatabaseException {
 
-        barrier.waitBarrier(key, procedureGraph);
+        queryGraph.asyncBarrier.waitBarrier(key, queryGraph);
 
         if(exception != null) {
             if(exception instanceof DatabaseException) throw (DatabaseException)exception;
@@ -80,10 +95,6 @@ public class BlockingAsyncProcedure<Result> implements AsyncProcedure<Result> {
 
     }
 
-    public boolean isDone() {
-        return barrier.get() == 0;
-    }
-
     @SuppressWarnings("unchecked")
     public Result getResult() {
         return (Result)result;
@@ -97,5 +108,41 @@ public class BlockingAsyncProcedure<Result> implements AsyncProcedure<Result> {
     public String toString() {
         return "." + procedure; 
     }
+    
+    private void dispatchProcedure(ReadGraphImpl queryGraph, ReadGraphImpl parentGraph, AsyncReadEntry<Result> entry, AsyncProcedure<Result> procedure_, boolean needsToBlock) {
+        
+        AsyncProcedure<Result> procedure = entry != null ? entry : procedure_;
+
+        ReadGraphImpl executeGraph = parentGraph.withParent(parentGraph.parent);
+        executeGraph.asyncBarrier.inc();
+        try {
+            if(procedure != null) {
+                procedure.execute(executeGraph, get());
+            }
+        } catch (DatabaseException e) {
+            if(procedure != null) procedure.exception(executeGraph, e);
+            exception = e;
+        } catch (Throwable t) {
+            DatabaseException dbe = new DatabaseException(t);
+            if(procedure != null) procedure.exception(executeGraph, dbe);
+            exception = dbe;
+        } finally {
+
+            if (entry != null) {
+                assert(entry.isReady());
+                // This does not throw
+                entry.performFromCache(executeGraph, procedure_);
+            }
+
+            executeGraph.asyncBarrier.dec();
+            if(needsToBlock)
+                executeGraph.asyncBarrier.waitBarrier(procedure, executeGraph);
+        }
+
+        if (BarrierTracing.BOOKKEEPING) {
+            BarrierTracing.unregisterBAP(this);
+        }
+
+    }
 
 }
index b94ec6f6857cb0e0ca913ef920a5231c9ecc3577..060fc22aefb866d15a17bb0f70e7cd4692846d90 100644 (file)
@@ -18,20 +18,24 @@ import org.simantics.db.common.utils.Logger;
 import org.simantics.db.exception.RuntimeDatabaseException;
 import org.simantics.db.impl.query.CacheEntry;
 import org.simantics.db.impl.query.QueryProcessor.AsyncBarrier;
+import org.simantics.db.impl.query.QueryProcessor.SessionTask;
 
 final public class AsyncBarrierImpl extends AtomicInteger implements AsyncBarrier {
 
        private static final long serialVersionUID = 4724463372850048672L;
 
-       static final int WAIT_TIME = 600;
+       static final int WAIT_TIME = 60000;
 
        public static final boolean PRINT = false;
 
        final public AsyncBarrierImpl caller;
+       
+       final public Runnable callback;
 
-       public AsyncBarrierImpl(AsyncBarrierImpl caller, CacheEntry<?> entry) {
+       public AsyncBarrierImpl(AsyncBarrierImpl caller, CacheEntry<?> entry, Runnable callback) {
                super(0);
                this.caller = caller;
+               this.callback = callback;
         if (BarrierTracing.BOOKKEEPING) {
             BarrierTracing.trace(this, entry);
         }
@@ -92,6 +96,10 @@ final public class AsyncBarrierImpl extends AtomicInteger implements AsyncBarrie
                                                new Exception());
                        }
                        assert (count >= 0);
+                       
+                       if(callback != null)
+                           callback.run();
+                       
                }
 
        }
@@ -142,7 +150,7 @@ final public class AsyncBarrierImpl extends AtomicInteger implements AsyncBarrie
                                                e.printStackTrace();
                                        }
                                }
-                               if(waitCount > WAIT_TIME*1000) {
+                               if(waitCount > WAIT_TIME) {
 
                                        System.err.println("AsyncBarrierImpl.waitBarrier("
                                                        + request
@@ -153,6 +161,11 @@ final public class AsyncBarrierImpl extends AtomicInteger implements AsyncBarrie
                                                synchronized (BarrierTracing.reverseLookup) {
                                                        printReverse(this, 0);
                                                }
+                                               BarrierTracing.printBAPS();
+                                       }
+                                       
+                                       for(SessionTask t : impl.processor.freeScheduling) {
+                                           System.err.println("Pending task:" + t);
                                        }
 
 //                                     if(Development.DEVELOPMENT) {
index 3ff77f6bc108c18b59535a3c02f3f1b51f78fac0..2433b897f888d15044e135433aa444fb48f498d0 100644 (file)
@@ -5,6 +5,7 @@ import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
 
+import org.simantics.db.impl.BlockingAsyncProcedure;
 import org.simantics.db.impl.query.CacheEntry;
 import org.simantics.db.impl.query.QueryProcessor.SessionTask;
 
@@ -19,7 +20,31 @@ public class BarrierTracing {
     public static final HashMap<AsyncBarrierImpl, CacheEntry<?>> entryMap = new HashMap<>();
     public static final HashMap<AsyncBarrierImpl, Throwable> restartMap = new HashMap<>();
     public static final HashMap<AsyncBarrierImpl, Throwable> startMap = new HashMap<>();
+    public static final HashMap<BlockingAsyncProcedure, Throwable> baps = new HashMap<>();
 
+    synchronized public static void registerBAP(BlockingAsyncProcedure bap) {
+        baps.put(bap, new Exception());
+    }
+    
+    synchronized public static void unregisterBAP(BlockingAsyncProcedure bap) {
+        baps.remove(bap);
+    }
+
+    synchronized public static void printBAPS() {
+        for(BlockingAsyncProcedure bap : baps.keySet()) {
+            Throwable e = baps.get(bap);
+            System.err.println("BlockingAsyncProcedure");
+            System.err.println("-key: " + bap.key);
+            System.err.println("-queryGraph: " + bap.queryGraph);
+            System.err.println("-callerGraph: " + bap.callerGraph);
+            System.err.println("-procedure: " + bap.procedure);
+            System.err.println("-pendingTaskSupport: " + bap.pendingTaskSupport);
+            System.err.println("-result: " + bap.result);
+            System.err.println("-exception: " + bap.exception);
+            e.printStackTrace();
+        }
+    }
+    
     public static void trace(AsyncBarrierImpl barrier, CacheEntry<?> entry) {
 
         if (RESTART_GUARD) {
index a693b29228e026c1e1f6a5c7f3652c8d4320eb4c..2eced2b7c046ebef86a64760cee27106667c706c 100644 (file)
@@ -1926,9 +1926,7 @@ public class ReadGraphImpl implements AsyncReadGraph {
        public <T> T syncRequest(final Read<T> request) throws DatabaseException {
                assert (request != null);
 
-               ITask task = ThreadLogger.task(request);
                T result = (T)QueryCache.runnerReadEntry(this, request, parent, null, null, true);
-               task.finish();
                return result;
                
        }
@@ -1950,10 +1948,8 @@ public class ReadGraphImpl implements AsyncReadGraph {
 
                assert (request != null);
 
-               ITask task = ThreadLogger.task(request);
                ListenerBase listener = procedure != null ? getListenerBase(procedure) : null;
-               T result = QueryCache.resultReadEntry(this, request, parent, listener, procedure);
-               task.finish();
+               T result = (T)QueryCache.runnerReadEntry(this, request, parent, listener, procedure, true);
                return result;
 
        }
@@ -2038,10 +2034,8 @@ public class ReadGraphImpl implements AsyncReadGraph {
 
                assert (request != null);
 
-               ITask task = ThreadLogger.task(request);
                ListenerBase listener = getListenerBase(procedure);
                T result = (T)QueryCache.runnerAsyncReadEntry(this, request, parent, listener, procedure, true); 
-               task.finish();
                return result;
 
        }
@@ -5160,7 +5154,11 @@ public class ReadGraphImpl implements AsyncReadGraph {
 
                assert (request != null);
                assert (procedure != null);
-       
+               
+               AsyncBarrierImpl barrier = asyncBarrier;
+               if(barrier != null)
+                   barrier.inc();
+               
                processor.scheduleNow(new SessionTask(this) {
 
                        @Override
@@ -5170,6 +5168,9 @@ public class ReadGraphImpl implements AsyncReadGraph {
                                        QueryCache.runnerReadEntry(ReadGraphImpl.this, request, parent, listener, procedure, false);
                                } catch (DatabaseException e) {
                                        Logger.defaultLogError(e);
+                               } finally {
+                               if(barrier != null)
+                                   barrier.dec();
                                }
                        }
                        
@@ -5241,29 +5242,39 @@ public class ReadGraphImpl implements AsyncReadGraph {
                assert (request != null);
                assert (procedure != null);
 
-               ITask task = ThreadLogger.task(request);
+        AsyncBarrierImpl barrier = asyncBarrier;
+        if(barrier != null)
+            barrier.inc();
 
                processor.scheduleNow(new SessionTask(this) {
 
                        @Override
                        public void run0(int thread) {
-                               try {
+
+                           if(barrier != null)
+                           barrier.inc();
+
+                           try {
                                        final ListenerBase listener = getListenerBase(procedure);
                                        QueryCache.runnerAsyncReadEntry(ReadGraphImpl.this, request, parent, listener, new AsyncProcedure<T>() {
 
                                                @Override
                                                public void execute(AsyncReadGraph graph, T result) {
-                                                       task.finish();
                                                        procedure.execute(graph, result);
+                                               if(barrier != null)
+                                                   barrier.dec();
                                                }
 
                                                @Override
                                                public void exception(AsyncReadGraph graph, Throwable throwable) {
-                                                       task.finish();
                                                        procedure.exception(graph, throwable);
+                            if(barrier != null)
+                                barrier.dec();
                                                }
                                                
                                        }, false);
+                               if(barrier != null)
+                                   barrier.dec();
                                } catch (DatabaseException e) {
                                        Logger.defaultLogError(e);
                                }
@@ -5651,27 +5662,47 @@ public class ReadGraphImpl implements AsyncReadGraph {
                this.parentGraph = parentGraph;
                this.parent = parent;
                this.processor = support;
-               this.asyncBarrier = new AsyncBarrierImpl(parentGraph != null ? parentGraph.asyncBarrier : null, parent);
+               this.asyncBarrier = new AsyncBarrierImpl(parentGraph != null ? parentGraph.asyncBarrier : null, parent, null);
        }
 
+    ReadGraphImpl(ReadGraphImpl parentGraph, CacheEntry parent, QueryProcessor support, AsyncBarrierImpl asyncBarrier) {
+        this.parentGraph = parentGraph;
+        this.parent = parent;
+        this.processor = support;
+        this.asyncBarrier = asyncBarrier;
+    }
+
        ReadGraphImpl(ReadGraphImpl graph, CacheEntry parent) {
                this(graph, parent, graph.processor);
        }
 
+    ReadGraphImpl(ReadGraphImpl parentGraph, CacheEntry parent, Runnable callback) {
+        this(parentGraph, parent, parentGraph.processor, new AsyncBarrierImpl(parentGraph != null ? parentGraph.asyncBarrier : null, parent, callback));
+    }
+
        ReadGraphImpl(ReadGraphImpl graph) {
                this(graph, graph.parent);
        }
 
-       public ReadGraphImpl withParent(CacheEntry parent) {
-               return new ReadGraphImpl(this, parent);
+       public ReadGraphImpl withParent(CacheEntry parent, Runnable callback) {
+               return new ReadGraphImpl(this, parent, callback);
        }
 
+    public ReadGraphImpl withParent(CacheEntry parent) {
+        return withParent(parent, null);
+    }
+
+       public ReadGraphImpl syncWithParent(CacheEntry parent) {
+        return new ReadGraphImpl(this, parent, processor, null);
+    }
+
        public ReadGraphImpl forRecompute(CacheEntry parent) {
                return new ReadGraphImpl(null, parent, processor);
        }
 
        public static ReadGraphImpl create(QueryProcessor support) {
-               return new ReadGraphImpl(null, null, support);
+           ReadGraphImpl result = new ReadGraphImpl(null, null, support);
+           return result;
        }
 
        public ReadGraphImpl newRestart(ReadGraphImpl impl) {
@@ -6357,6 +6388,12 @@ public class ReadGraphImpl implements AsyncReadGraph {
         else return 1 + getLevelStatic(impl.parentGraph);
     }
     
+    public boolean isParent(ReadGraphImpl impl) {
+        if(impl == null) return false;
+        if(this == impl) return true;
+        return isParent(impl.parentGraph);
+    }
+    
     public ReadGraphImpl getTopLevelGraph() {
         return getTopLevelGraphStatic(this);
     }
index e13ecab72cd0819f71c3c337af62ad242db709c2..0dd5730a5f3a1f5c6c232063741bf4343303feb4 100644 (file)
  *******************************************************************************/
 package org.simantics.db.impl.query;
 
+import java.util.Collection;
+
 import org.simantics.databoard.Bindings;
 import org.simantics.db.AsyncReadGraph;
 import org.simantics.db.DevelopmentKeys;
 import org.simantics.db.exception.DatabaseException;
 import org.simantics.db.impl.BlockingAsyncProcedure;
-import org.simantics.db.impl.graph.AsyncBarrierImpl;
-import org.simantics.db.impl.graph.BarrierTracing;
 import org.simantics.db.impl.graph.ReadGraphImpl;
 import org.simantics.db.impl.query.QueryProcessor.SessionTask;
 import org.simantics.db.procedure.AsyncProcedure;
@@ -26,11 +26,12 @@ import org.simantics.utils.Development;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-final public class AsyncReadEntry<T> extends CacheEntryBase<AsyncProcedure<T>> implements AsyncProcedure<T> {
+final public class AsyncReadEntry<T> extends CacheEntryBase<AsyncProcedure<T>> implements AsyncProcedure<T>, IPending {
 
     private static final Logger LOGGER = LoggerFactory.getLogger(AsyncReadEntry.class);
 
     protected AsyncRead<T> id;
+    protected PendingTaskSupport pendingTaskSupport;
 
     AsyncReadEntry(AsyncRead<T> request) {
         this.id = request;
@@ -77,7 +78,7 @@ final public class AsyncReadEntry<T> extends CacheEntryBase<AsyncProcedure<T>> i
 
                 try {
 
-                    BlockingAsyncProcedure<T> proc = new BlockingAsyncProcedure<>(graph.asyncBarrier, graph, new AsyncProcedure<T>() {
+                    BlockingAsyncProcedure<T> proc = new BlockingAsyncProcedure(graph, AsyncReadEntry.this, new AsyncProcedure<T>() {
 
                         @Override
                         public void execute(AsyncReadGraph graph, T result) {
@@ -90,10 +91,11 @@ final public class AsyncReadEntry<T> extends CacheEntryBase<AsyncProcedure<T>> i
                             except(t);
                         }
 
-                    }, id);
-
-                    id.perform(graph, proc);
+                    }, id, true);
 
+                    id.perform(proc.queryGraph, proc);
+                    
+                    proc.dec();
                     proc.get();
 
                 } catch (Throwable t) {
@@ -152,103 +154,24 @@ final public class AsyncReadEntry<T> extends CacheEntryBase<AsyncProcedure<T>> i
 
     }
 
-    public static <T> T computeForEach(ReadGraphImpl graph, AsyncRead<T> request, AsyncReadEntry<T> entry,
+    public static <T> T computeForEach(ReadGraphImpl callerGraph, AsyncRead<T> request, AsyncReadEntry<T> entry,
             AsyncProcedure<T> procedure_, boolean needsToBlock) throws DatabaseException {
 
-        AsyncProcedure<T> procedure = entry != null ? entry : procedure_;
-
-        ReadGraphImpl queryGraph = graph.withParent(entry);
-        queryGraph.asyncBarrier.inc();
-        
-        BlockingAsyncProcedure<T> proc = new BlockingAsyncProcedure<>(queryGraph.asyncBarrier, graph, null, request);
-        
-        class AsyncTask extends SessionTask {
-
-            int counter = 0;
-            T result;
-            DatabaseException exception;
-            
-            public AsyncTask(ReadGraphImpl graph) {
-                this(graph, 1);
-            }
-
-            public AsyncTask(ReadGraphImpl graph, int pos) {
-                super(graph);
-                this.position = pos;
-                if(this.position < 1024)
-                    this.position *= 2;
-            }
-
-            @Override
-            public void run0(int thread) {
-                if(needsToBlock) proc.waitBarrier();
-                if(proc.isDone()) {
-                    ReadGraphImpl executeGraph = graph.withParent(graph.parent);
-                    executeGraph.asyncBarrier.inc();
-                    try {
-                        result = (T)proc.get();
-                        if(procedure != null) {
-                            procedure.execute(executeGraph, result);
-                        }
-                    } catch (DatabaseException e) {
-                        if(procedure != null) procedure.exception(executeGraph, e);
-                        exception = e;
-                    } catch (Throwable t) {
-                        DatabaseException dbe = new DatabaseException(t);
-                        if(procedure != null) procedure.exception(executeGraph, dbe);
-                        exception = dbe;
-                    } finally {
-                        if (entry != null) {
-                            // This does not throw
-                            entry.performFromCache(executeGraph, procedure_);
-                        }
-                        executeGraph.asyncBarrier.dec();
-                        executeGraph.asyncBarrier.waitBarrier(procedure, executeGraph);
-                    }
-                } else {
-                    if(counter++ > 10000) {
-                        if(BarrierTracing.BOOKKEEPING) {
-                            AsyncBarrierImpl.printReverse(queryGraph.asyncBarrier, 2);
-                            AsyncBarrierImpl caller = queryGraph.asyncBarrier.caller;
-                            while(caller != null) {
-                                System.err.println("called by " + AsyncBarrierImpl.report(caller));
-                                caller = caller.caller;
-                            }
-                            for(AsyncBarrierImpl ab : BarrierTracing.debuggerMap.keySet()) {
-                                AsyncBarrierImpl.printReverse(ab, 2);
-                            }
-                        }
-                        throw new IllegalStateException("Eternal loop in queries.");
-                    }
-                    graph.processor.scheduleLater(new AsyncTask(graph, position));
-                }
-            }
-            
-            @Override
-            public boolean maybeReady() {
-               return proc.isDone();
-            }
-            
-        }
+        BlockingAsyncProcedure<T> proc = new BlockingAsyncProcedure(callerGraph, entry, procedure_, request, needsToBlock);
 
         try {
-            request.perform(queryGraph, proc);
+            request.perform(proc.queryGraph, proc);
         } finally {
-            queryGraph.asyncBarrier.dec();
+            proc.queryGraph.asyncBarrier.dec();
         }
 
-        AsyncTask task = new AsyncTask(graph);
-
-        if(needsToBlock) task.run(0);
-        else if (proc.isDone()) task.run(0);
-        else  {
-            graph.processor.scheduleLater(task);
+        if(needsToBlock) {
+            proc.waitBarrier();
+            return proc.get();
+        } else {
             return null;
         }
 
-        if(task.exception != null) throw task.exception;
-        else return task.result;
-
     }
 
     @Override
@@ -263,13 +186,41 @@ final public class AsyncReadEntry<T> extends CacheEntryBase<AsyncProcedure<T>> i
 
     @Override
     public void execute(AsyncReadGraph graph, T result) {
-        setResult(result);
-        setReady();
+        Collection<SessionTask> tasks = null;
+        synchronized(this) {
+            setResult(result);
+            setReady();
+            if(pendingTaskSupport != null)
+                tasks = pendingTaskSupport.executePending();
+        }
+        if(tasks != null)
+            for(SessionTask task : tasks)
+                ((ReadGraphImpl)graph).processor.scheduleNow(task);
     }
 
     @Override
-    public void exception(AsyncReadGraph graph, Throwable throwable) {
-        except(throwable);
+    public synchronized void exception(AsyncReadGraph graph, Throwable throwable) {
+        Collection<SessionTask> tasks = null;
+        synchronized(this) {
+            except(throwable);
+            if(pendingTaskSupport != null)
+                tasks = pendingTaskSupport.executePending();
+        }
+        if(tasks != null)
+            for(SessionTask task : tasks)
+                ((ReadGraphImpl)graph).processor.scheduleNow(task);
+    }
+
+    public void executeWhenResultIsAvailable(QueryProcessor processor, SessionTask task) {
+        boolean ready = false;
+        synchronized(this) {
+            if(pendingTaskSupport == null)
+                pendingTaskSupport = new PendingTaskSupport(this);
+            ready = pendingTaskSupport.executeWhenResultIsAvailable(task);
+        }
+        if(ready) {
+            processor.scheduleNow(task);
+        }
     }
 
 }
diff --git a/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/IPending.java b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/IPending.java
new file mode 100644 (file)
index 0000000..c738135
--- /dev/null
@@ -0,0 +1,7 @@
+package org.simantics.db.impl.query;
+
+public interface IPending {
+    
+    boolean isPending();
+
+}
diff --git a/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/PendingTaskSupport.java b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/PendingTaskSupport.java
new file mode 100644 (file)
index 0000000..f63bad5
--- /dev/null
@@ -0,0 +1,45 @@
+package org.simantics.db.impl.query;
+
+import java.util.ArrayList;
+import java.util.Collection;
+
+import org.simantics.db.impl.query.QueryProcessor.SessionTask;
+
+/*
+ * Support class for queuing pending tasks to be executed when result gets ready
+ */
+public class PendingTaskSupport {
+
+    private ArrayList<SessionTask> pendingTasks;
+    private IPending pending;
+    
+    public PendingTaskSupport(IPending pending) {
+        this.pending = pending;
+    }
+    
+    /*
+     * We assume here that the associated IPending performs this atomically
+     * The caller is responsible for execution of the returned task after the critical section
+     */
+    public boolean executeWhenResultIsAvailable(SessionTask task) {
+        if(pending.isPending()) {
+            if(pendingTasks == null)
+                pendingTasks = new ArrayList<SessionTask>();
+            pendingTasks.add(task);
+            return false;
+        } else {
+            return true;
+        }
+    }
+
+    /*
+     * We assume here that the associated IPending performs this atomically after changing the pending result
+     * The caller is responsible for execution of the returned task after the critical section
+     */
+    public Collection<SessionTask> executePending() {
+        ArrayList<SessionTask> ret = pendingTasks;
+        pendingTasks = null;
+        return ret;
+    }
+
+}
index cc0ca919b584942a4d49a1a4c61b97b741e631f2..35ebdbc65ba4ce82e5e7e7dced301310c30f8b17 100644 (file)
@@ -625,7 +625,7 @@ public class QueryCache extends QueryCacheBase {
         }
     }
     
-    ReadEntry getOrCreateReadEntry(ReadGraphImpl graph, Read<?> r, boolean needsToBlock) throws DatabaseException {
+    private final ReadEntry getOrCreateReadEntry(ReadGraphImpl graph, Read<?> r, boolean needsToBlock) throws DatabaseException {
         ReadEntry existing = null;
         synchronized(readEntryMap) {
             existing = (ReadEntry)readEntryMap.get(r);
@@ -670,11 +670,13 @@ public class QueryCache extends QueryCacheBase {
         }
         ReadEntry entry = (ReadEntry)cache.getOrCreateReadEntry(graph, r, needsToBlock);
         if(entry == null) {
-          graph.processor.scheduleNow(new SessionTask(graph) {
+          graph.asyncBarrier.inc();  
+          graph.processor.scheduleNow(new SessionTask() {
             @Override
             public void run0(int thread) {
               try {
                 runnerReadEntry(graph, r, parent, listener, procedure, needsToBlock);
+                graph.asyncBarrier.dec();  
               } catch (DatabaseException e) {
                 Logger.defaultLogError(e);
               }
@@ -704,7 +706,7 @@ public class QueryCache extends QueryCacheBase {
         }
     }
     
-    AsyncReadEntry getOrCreateAsyncReadEntry(ReadGraphImpl graph, AsyncRead<?> r, boolean needsToBlock) throws DatabaseException {
+    AsyncReadEntry getOrCreateAsyncReadEntry(ReadGraphImpl graph, AsyncRead<?> r, CacheEntry parent, ListenerBase listener, final AsyncProcedure procedure, boolean needsToBlock) throws DatabaseException {
         AsyncReadEntry existing = null;
         synchronized(asyncReadEntryMap) {
             existing = (AsyncReadEntry)asyncReadEntryMap.get(r);
@@ -721,11 +723,21 @@ public class QueryCache extends QueryCacheBase {
             }
         }
         if(existing.isPending()) {
-          if(needsToBlock)
-            waitPending(graph, existing);
-          else {
-            return null;
-          }
+            if(needsToBlock)
+                waitPending(graph, existing);
+            else {
+                existing.executeWhenResultIsAvailable(graph.processor, new SessionTask(graph) {
+                    @Override
+                    public void run0(int thread) {
+                        try {
+                            runnerAsyncReadEntry(graph, r, parent, listener, procedure, needsToBlock);
+                        } catch (DatabaseException e) {
+                            Logger.defaultLogError(e);
+                        }
+                    }
+                });
+                return null;
+            }
         }
         return existing;
     }
@@ -747,19 +759,10 @@ public class QueryCache extends QueryCacheBase {
             }
             return AsyncReadEntry.computeForEach(graph, r, null, procedure, needsToBlock);
         }
-        AsyncReadEntry entry = (AsyncReadEntry)cache.getOrCreateAsyncReadEntry(graph, r, needsToBlock);
+        AsyncReadEntry entry = (AsyncReadEntry)cache.getOrCreateAsyncReadEntry(graph, r, parent, listener, procedure, needsToBlock);
         if(entry == null) {
-          graph.processor.scheduleNow(new SessionTask(graph) {
-            @Override
-            public void run0(int thread) {
-              try {
-                runnerAsyncReadEntry(graph, r, parent, listener, procedure, needsToBlock);
-              } catch (DatabaseException e) {
-                Logger.defaultLogError(e);
-              }
-            }
-          });
-          return null;
+            // Entry was pending and this request has been queued  
+            return null;
         }
         AsyncProcedure procedure_ = procedure != null ? procedure : emptyProcedureAsyncReadEntry;
         if(entry.isReady()) {
index 54efda83976d8255f549ec0cd1cf41df8fc07e6c..deeb65fddb92a3c161e6315d6ce75ef1d00282aa 100644 (file)
@@ -421,7 +421,7 @@ public class QueryCacheBase {
                int counter = 0;
                while(entry.isPending()) {
                        try {
-                           boolean performed = graph.performPending();
+                           boolean performed = false;//graph.performPending();
                            if(!performed) {
                                        Thread.sleep(1);
                                        counter++;
@@ -905,10 +905,6 @@ public class QueryCacheBase {
                return wrap.get();
        }
 
-       public static <T> T resultReadEntry(ReadGraphImpl graph, Read r, CacheEntry parent, ListenerBase listener, AsyncProcedure<T> procedure) throws DatabaseException {
-               return (T)QueryCache.runnerReadEntry(graph, r, parent, listener, procedure, true);
-       }
-
        public static <T> T resultAsyncReadEntry(ReadGraphImpl graph, AsyncRead r, CacheEntry parent, ListenerBase listener, AsyncProcedure<T> procedure) throws DatabaseException {
                return (T)QueryCache.runnerAsyncReadEntry(graph, r, parent, listener, procedure, true);
        }
index ee4b11175e4d2767cee0058d3938d8cd47125997..eaaa9b3730976265c0673dc4547bbe35b3d4904c 100644 (file)
@@ -143,6 +143,8 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap
        QueryThread[]                                   executors;
        
        public LinkedList<SessionTask>                           freeScheduling = new LinkedList<SessionTask>();
+    
+    public LinkedList<SessionTask>                           topLevelTasks = new LinkedList<SessionTask>();
 
        enum ThreadState {
 
@@ -159,13 +161,12 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap
        public void close() {
        }
 
-    public SessionTask getSubTask(ReadGraphImpl impl) {
+    public SessionTask getSubTask(ReadGraphImpl parent) {
         synchronized(querySupportLock) {
             int index = 0;
             while(index < freeScheduling.size()) {
                 SessionTask task = freeScheduling.get(index);
-                if(task.hasCommonParent(task.graph, impl) && task.maybeReady()) {
-                    queueLength.decrementAndGet();
+                if(task.isSubtask(parent) && task.maybeReady()) {
                     return freeScheduling.remove(index);
                 }
                 index++;
@@ -178,82 +179,46 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap
      * We are running errands while waiting for requests to complete.
      * We can only run work that is part of the current root request to avoid any deadlocks
      */
-    public boolean performPending(ReadGraphImpl graph) {
-        SessionTask task = getSubTask(graph);
+    public boolean performPending(ReadGraphImpl under) {
+        SessionTask task = getSubTask(under);
                if(task != null) {
-                       task.run(QueryProcessor.thread.get());
+                       task.run(thread.get());
                        return true;
                }
                return false;
        }
-
+    
     final public void scheduleNow(SessionTask request) {
-        schedule(request, false);
+        SessionTask toExecute = scheduleOrReturnForExecution(request);
+        if(toExecute != null)
+            toExecute.run(thread.get());
     }
 
-    final public void scheduleLater(SessionTask request) {
-        schedule(request, true);
-    }
+    final public SessionTask scheduleOrReturnForExecution(SessionTask request) {
 
-    AtomicInteger queueLength = new AtomicInteger(0);
-    
-    final public void schedule(SessionTask request, boolean late) {
-        
-        int queueLengthEstimate = queueLength.get();
-        if(!late && queueLengthEstimate > 80) {
-            request.run(thread.get());
-            return;
-        }
-        
-               assert(request != null);
-               
-               synchronized(querySupportLock) {
+        assert(request != null);
 
-                   if(BarrierTracing.BOOKKEEPING) {
-                       Exception current = new Exception();
-                       Exception previous = BarrierTracing.tasks.put(request, current);
-                       if(previous != null) {
-                           previous.printStackTrace();
-                           current.printStackTrace();
-                       }
-                   }
+        synchronized(querySupportLock) {
 
-                   if(late) {
-                       int pos = request.position - 1;
-                       if(pos < freeScheduling.size()) { 
-                           freeScheduling.add(pos, request);
-                           queueLength.incrementAndGet();
-                           requests.release();
-                       } else {
-                           freeScheduling.addLast(request);
-                    queueLength.incrementAndGet();
-                    requests.release();
-                       }
-                   }
-                   else {
-                       if(request.getLevel() < 4) {
-                           if(freeScheduling.size() < 100) {
-                               freeScheduling.addFirst(request);
-                           queueLength.incrementAndGet();
-                           requests.release();
-                           } else {
-                               request.run(thread.get());
-                           }
-                       } else {
-                           if(freeScheduling.size() < 20) {
-                               freeScheduling.addFirst(request);
-                           queueLength.incrementAndGet();
-                           requests.release();
-                           } else {
-                               request.run(thread.get());
-                           }
-                       }
-                   }
+            LinkedList<SessionTask> queue = request.rootGraph != null ? freeScheduling : topLevelTasks;
+            
+            if(BarrierTracing.BOOKKEEPING) {
+                Exception current = new Exception();
+                Exception previous = BarrierTracing.tasks.put(request, current);
+                if(previous != null) {
+                    previous.printStackTrace();
+                    current.printStackTrace();
+                }
+            }
 
+            queue.addFirst(request);
+            requests.release();
 
-               }
+        }
 
-       }
+        return null;
+
+    }
 
 
        final int THREADS;
@@ -263,20 +228,22 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap
 
        public static abstract class SessionTask {
 
-               public final ReadGraphImpl graph;
+           final protected ReadGraphImpl rootGraph;
                private int counter = 0;
                protected int position = 1;
                private Exception trace;
 
-               public SessionTask(ReadGraphImpl graph) {
-                       this.graph = graph;
-                       if(graph != null) graph.asyncBarrier.inc();
+               public SessionTask() {
+                   this(null);
                }
-
-        public static boolean hasCommonParent(ReadGraphImpl r1, ReadGraphImpl r2) {
-            if(r1 == null || r2 == null) return false;
-            return r1.getTopLevelGraph() == r2.getTopLevelGraph();
-        } 
+               
+        public SessionTask(ReadGraphImpl rootGraph) {
+            this.rootGraph = rootGraph;
+        }
+        
+        public boolean isSubtask(ReadGraphImpl graph) {
+            return graph.isParent(rootGraph);
+        }
 
         public abstract void run0(int thread);
 
@@ -292,7 +259,6 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap
                        trace = new Exception();
                    }
                    run0(thread);
-                   if(graph != null) graph.asyncBarrier.dec();
                }
                
                public boolean maybeReady() {
@@ -301,15 +267,10 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap
 
                @Override
                public String toString() {
-                       if(graph == null)
+                       if(rootGraph == null)
                                return "SessionTask[no graph]";
                        else
-                               return "SessionTask[" + graph.parent + "]";
-               }
-               
-               public int getLevel() {
-                   if(graph == null) return 0;
-                   else return graph.getLevel();
+                               return "SessionTask[" + rootGraph.parent + "]";
                }
 
        }
@@ -327,37 +288,13 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap
 
        }
 
-       long waitingTime = 0;
-
-       static int koss = 0;
-       static int koss2 = 0;
-
        public boolean resume(ReadGraphImpl graph) {
                return executors[0].runSynchronized();
        }
-       
-       //private WeakReference<GarbageTracker> garbageTracker;
-       
-       private class GarbageTracker    {
-               
-               @Override
-               protected void finalize() throws Throwable {
-                       
-//                     System.err.println("GarbageTracker");
-//                     
-//                     garbageTracker = new WeakReference<GarbageTracker>(new GarbageTracker());
-                       
-                       super.finalize();
-                       
-               }
-               
-       }
 
        public QueryProcessor(final int threads, QuerySupport core, Set<Thread> threadSet)
                        throws DatabaseException {
 
-               //garbageTracker = new WeakReference<GarbageTracker>(new GarbageTracker());
-               
                THREADS = threads;
                THREAD_MASK = threads - 1;
 
@@ -368,31 +305,10 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap
                querySupportLock = core.getLock();
 
                executors = new QueryThread[THREADS];
-//             queues = new ArrayList[THREADS];
-//             threadLocks = new ReentrantLock[THREADS];
-//             threadConditions = new Condition[THREADS];
                threadStates = new ThreadState[THREADS];
-//             ownTasks = new ArrayList[THREADS];
-//             ownSyncTasks = new ArrayList[THREADS];
-//             delayQueues = new ArrayList[THREADS * THREADS];
-
-               //        freeSchedule = new AtomicInteger(0);
-
-//             for (int i = 0; i < THREADS * THREADS; i++) {
-//                     delayQueues[i] = new ArrayList<SessionTask>();
-//             }
 
                for (int i = 0; i < THREADS; i++) {
-
-                       //            tasks[i] = new ArrayList<Runnable>();
-//                     ownTasks[i] = new ArrayList<SessionTask>();
-//                     ownSyncTasks[i] = new ArrayList<SessionTask>();
-//                     queues[i] = new ArrayList<SessionTask>();
-//                     threadLocks[i] = new ReentrantLock();
-//                     threadConditions[i] = threadLocks[i].newCondition();
-                       //            limits[i] = false;
                        threadStates[i] = ThreadState.INIT;
-
                }
 
                for (int i = 0; i < THREADS; i++) {
index 5510944dc319265e7e033e38a5462e62b280ee16..cfa088a2dbfd7a966e0af3db7f4d9a5aa5d1ad4c 100644 (file)
@@ -98,11 +98,16 @@ class QueryThread extends Thread implements SessionThread {
        }
 
        private boolean pumpTask() {
-               if(!processor.freeScheduling.isEmpty()) {
-                       tasks.add(processor.freeScheduling.removeFirst());
-                       processor.queueLength.decrementAndGet();
-                       return true;
-               }
+           // First finish existing executions
+        if(!processor.freeScheduling.isEmpty()) {
+            tasks.add(processor.freeScheduling.removeFirst());
+            return true;
+        }
+        // Check for new tasks
+        if(!processor.topLevelTasks.isEmpty()) {
+            tasks.add(processor.topLevelTasks.removeFirst());
+            return true;
+        }
                return false;
        }
        
@@ -226,8 +231,6 @@ class QueryThread extends Thread implements SessionThread {
        @Override
        public void run() {
 
-               processor.thread.set(index);
-               
                QuerySupport support = this.querySupport;
 
                try {
@@ -242,23 +245,10 @@ class QueryThread extends Thread implements SessionThread {
                                while(!tasks.isEmpty()) {
 
                                        SessionTask task = tasks.remove(tasks.size()-1);
-//                                     System.err.println("QT " + index + " runs " + task);
-                                       task.run(index);
+                                       task.run(0);
 
                                }
 
-//                             for(int performer=0;performer<THREADS;performer++) {
-//                                     if(!delayQueues[index * THREADS + performer].isEmpty()) {
-//                                             synchronized(executors[performer]) {
-//                                                     threadLocks[performer].lock();
-//                                                     queues[performer].addAll(delayQueues[index * THREADS + performer]);
-//                                                     delayQueues[index * THREADS + performer].clear();
-//                                                     executors[performer].notify();
-//                                                     threadLocks[performer].unlock();
-//                                             }
-//                                     }
-//                             }
-
                        }
 
                } catch (Throwable t) {
index 72f132288bf79990c5bf56ce591463993ccd8acd..9edcbb997b97554946ea510728fe9395259d5741 100644 (file)
@@ -139,7 +139,8 @@ public final class ReadEntry<T> extends CacheEntryBase<AsyncProcedure<T>> implem
             } finally {
  
                 executeGraph.asyncBarrier.dec();
-                executeGraph.asyncBarrier.waitBarrier(procedure, executeGraph);
+                if(needsToBlock)
+                    executeGraph.asyncBarrier.waitBarrier(procedure, executeGraph);
 
             }
                 
index 8220ba74385221cfc2064d6ff368c023c72f42b0..030d512eb736e2f6303f0d68c1d0b0e3d2e57b97 100644 (file)
@@ -1639,7 +1639,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule
 
                     } else {
 
-                       BlockingAsyncProcedure<T> wrap = new BlockingAsyncProcedure<T>(newGraph.asyncBarrier, newGraph, procedure, request) {
+                       BlockingAsyncProcedure<T> wrap = new BlockingAsyncProcedure<T>(newGraph, null, procedure, request, true) {
                                
                                public void execute(AsyncReadGraph graph_, T result) {
                                        task.finish();
@@ -1656,6 +1656,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule
                        try {
 
                             request.perform(newGraph, wrap);
+                            wrap.dec();
                             wrap.get();
 
                         } catch (DatabaseException e) {
diff --git a/bundles/org.simantics.diagram/src/org/simantics/diagram/adapter/ConnectionRequest2.java b/bundles/org.simantics.diagram/src/org/simantics/diagram/adapter/ConnectionRequest2.java
new file mode 100644 (file)
index 0000000..36a01b5
--- /dev/null
@@ -0,0 +1,101 @@
+/*******************************************************************************
+ * Copyright (c) 2007, 2010 Association for Decentralized Information Management
+ * in Industry THTH ry.
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * which accompanies this distribution, and is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Contributors:
+ *     VTT Technical Research Centre of Finland - initial API and implementation
+ *******************************************************************************/
+package org.simantics.diagram.adapter;
+
+import org.simantics.db.AsyncReadGraph;
+import org.simantics.db.Resource;
+import org.simantics.db.common.primitiverequest.Adapter;
+import org.simantics.db.common.procedure.adapter.TransientCacheAsyncListener;
+import org.simantics.db.procedure.AsyncProcedure;
+import org.simantics.diagram.synchronization.ErrorHandler;
+import org.simantics.g2d.canvas.ICanvasContext;
+import org.simantics.g2d.diagram.IDiagram;
+import org.simantics.g2d.element.ElementClass;
+import org.simantics.g2d.element.IElement;
+import org.simantics.scl.runtime.tuple.Tuple3;
+
+/**
+ * @author Antti Villberg
+ */
+public class ConnectionRequest2 extends BaseRequest2<Resource, Tuple3> {
+
+    final IDiagram diagram;
+    final ErrorHandler errorHandler;
+
+    public ConnectionRequest2(ICanvasContext canvas, IDiagram diagram, Resource resource, ErrorHandler errorHandler) {
+        super(canvas, resource);
+        this.diagram = diagram;
+        this.errorHandler = errorHandler;
+    }
+
+    @Override
+    public void perform(AsyncReadGraph graph, final AsyncProcedure<Tuple3> procedure) {
+
+        graph.forHasStatement(data, new AsyncProcedure<Boolean>() {
+
+            @Override
+            public void exception(AsyncReadGraph graph, Throwable throwable) {
+                procedure.exception(graph, throwable);
+            }
+
+            @Override
+            public void execute(AsyncReadGraph graph, Boolean result) {
+
+                if (!result) {
+                    procedure.execute(graph, null);
+                    return;
+                }
+
+                graph.asyncRequest(new Adapter<ElementFactory>(data, ElementFactory.class), new TransientCacheAsyncListener<ElementFactory>() {
+
+                    @Override
+                    public void exception(AsyncReadGraph graph, Throwable throwable) {
+                        errorHandler.error("Unexpected ElementFactory adaption failure", throwable);
+                        procedure.execute(graph, null);
+                    }
+
+                    @Override
+                    public void execute(AsyncReadGraph graph, final ElementFactory factory) {
+
+                        graph.asyncRequest(new GetElementClassRequest(factory, data, canvas, diagram), new TransientCacheAsyncListener<ElementClass>() {
+
+                            @Override
+                            public void exception(AsyncReadGraph graph, Throwable throwable) {
+                                errorHandler.error("Unexpected ElementClass creation failure", throwable);
+                                procedure.execute(graph, null);
+                            }
+
+                            @Override
+                            public void execute(AsyncReadGraph graph, final ElementClass ec) {
+
+                                graph.asyncRequest(new SpawnRequest(canvas, ec, data), new TransientCacheAsyncListener<IElement>() {
+
+                                    @Override
+                                    public void exception(AsyncReadGraph graph, Throwable throwable) {
+                                        errorHandler.error("Unexpected SpawnRequest failure", throwable);
+                                        procedure.execute(graph, null);
+                                    }
+
+                                    @Override
+                                    public void execute(AsyncReadGraph graph, IElement element) {
+                                        procedure.execute(graph, new Tuple3(element, ec, factory));
+                                    }
+                                });
+                            }
+                        });
+                    }
+                });
+            }
+        });
+    }
+
+}
index 226f47daa76b62ea90f572d561aa75e6bc830295..37471eb3a1186fa8dc1c993e7d0f267f178a5f91 100644 (file)
@@ -138,6 +138,7 @@ import org.simantics.scenegraph.INode;
 import org.simantics.scenegraph.profile.DataNodeConstants;
 import org.simantics.scenegraph.profile.DataNodeMap;
 import org.simantics.scenegraph.profile.common.ProfileObserver;
+import org.simantics.scl.runtime.tuple.Tuple3;
 import org.simantics.structural2.modelingRules.IModelingRules;
 import org.simantics.utils.datastructures.ArrayMap;
 import org.simantics.utils.datastructures.MapSet;
@@ -1649,6 +1650,90 @@ public class GraphToDiagramSynchronizer extends AbstractDisposable implements ID
             this.removedRouteGraphConnections.clear();
         }
 
+        class LoadNodeListener extends DisposableListener<IElement> {
+
+            final Resource element;
+            public IElement lastLoaded;
+
+            public LoadNodeListener(ListenerSupport support, Resource element) {
+                super(support);
+                this.element = element;
+            }
+
+            @Override
+            public String toString() {
+                return "Node load listener for " + element;
+            }
+
+            public void applyFirst(IElement loaded) {
+
+                Object data = loaded.getHint(ElementHints.KEY_OBJECT);
+
+                if (addedElementMap.containsKey(data)) {
+                    // This element was just loaded, in
+                    // which case its hints need to
+                    // uploaded to the real mapped
+                    // element immediately.
+                    IElement mappedElement = getMappedElement(data);
+                    if (DebugPolicy.DEBUG_NODE_LISTENER)
+                        System.out.println("LOADED ADDED ELEMENT, currently mapped element: " + mappedElement);
+                    if (mappedElement != null && (mappedElement instanceof Element)) {
+                        if (DebugPolicy.DEBUG_NODE_LISTENER) {
+                            System.out.println("  mapped hints: " + mappedElement.getHints());
+                            System.out.println("  loaded hints: " + loaded.getHints());
+                        }
+                        updateMappedElement((Element) mappedElement, loaded);
+                    }
+                }
+
+            }
+
+            @Override
+            public void execute(IElement loaded) {
+
+                // Invoked when the element has been loaded.
+                if (DebugPolicy.DEBUG_NODE_LISTENER)
+                    System.out.println("NODE LoadListener for " + loaded);
+
+                if (loaded == null) {
+                    disposeListener();
+                    return;
+                }
+
+
+                boolean first = lastLoaded == null;
+
+                lastLoaded = loaded;
+
+                /*
+                 * The first invocation is postponed
+                 */
+                if(first) {
+                    applyFirst(loaded);
+                    return;
+                }
+
+                Object data = loaded.getHint(ElementHints.KEY_OBJECT);
+
+                // Logic for disposing listener
+                if (!previousContent.nodeSet.contains(data)) {
+                    if (DebugPolicy.DEBUG_NODE_LISTENER)
+                        System.out.println("NODE LoadListener, node not in current content: " + data + ". Disposing.");
+                    disposeListener();
+                    return;
+                }
+
+                // This element was already loaded.
+                // Just schedule an update some time
+                // in the future.
+                if (DebugPolicy.DEBUG_NODE_LISTENER)
+                    System.out.println("PREVIOUSLY LOADED NODE UPDATED, scheduling update into the future");
+                offerGraphUpdate( nodeUpdater(element, loaded) );
+
+            }
+
+        }
+
         void processNodes(AsyncReadGraph graph) throws DatabaseException {
 
             for (Map.Entry<Resource, Change> entry : changes.elements.entrySet()) {
@@ -1745,6 +1830,13 @@ public class GraphToDiagramSynchronizer extends AbstractDisposable implements ID
                                     @Override
                                     public void execute(AsyncReadGraph graph, final IElement e) {
 
+                                        mapElement(element, e);
+                                        synchronized (GraphToDiagramUpdater.this) {
+                                            addedElements.add(e);
+                                            addedElementMap.put(element, e);
+                                            addedConnectionMap.put(element, e);
+                                        }
+
                                         // Read connection type
                                         graph.forSingleType(element, br.DIA.Connection, new Procedure<Resource>() {
                                             @Override
@@ -1778,87 +1870,33 @@ public class GraphToDiagramSynchronizer extends AbstractDisposable implements ID
                                 });
                             } else if (content.nodeSet.contains(element)) {
 
-                                Listener<IElement> loadListener = new DisposableListener<IElement>(canvasListenerSupport) {
-
-                                    boolean firstTime = true;
+                                graph.asyncRequest(new ReadRequest() {
 
                                     @Override
-                                    public String toString() {
-                                        return "Node load listener for " + element;
-                                    }
-                                    @Override
-                                    public void execute(IElement loaded) {
-                                        // Invoked when the element has been loaded.
-                                        if (DebugPolicy.DEBUG_NODE_LISTENER)
-                                            System.out.println("NODE LoadListener for " + loaded);
-
-                                        if (loaded == null) {
-                                            disposeListener();
-                                            return;
-                                        }
-
-                                        if (firstTime) {
-
-                                            // This is invoked before the element is actually loaded.
-                                            //System.out.println("NodeRequestProcedure " + e);
-                                            if (DebugPolicy.DEBUG_NODE_LOAD)
-                                                System.out.println("MAPPING ADDED NODE: " + element + " -> " + loaded);
-                                            mapElement(element, loaded);
-                                            synchronized (GraphToDiagramUpdater.this) {
-                                                addedElements.add(loaded);
-                                                addedElementMap.put(element, loaded);
-                                            }
-
-                                            firstTime = false;
-
-                                        }
-
-                                        Object data = loaded.getHint(ElementHints.KEY_OBJECT);
+                                    public void run(ReadGraph graph) throws DatabaseException {
 
-                                        // Logic for disposing listener
-                                        if (!previousContent.nodeSet.contains(data)) {
-                                            if (DebugPolicy.DEBUG_NODE_LISTENER)
-                                                System.out.println("NODE LoadListener, node not in current content: " + data + ". Disposing.");
-                                            disposeListener();
+                                        LoadNodeListener loadListener = new LoadNodeListener(canvasListenerSupport, element);
+                                        Tuple3 t = graph.syncRequest(new NodeRequest2(canvas, diagram, element));
+                                        IElement e = (IElement)t.c0;
+                                        ElementClass ec = (ElementClass)t.c1;
+                                        org.simantics.diagram.adapter.ElementFactory ef = (org.simantics.diagram.adapter.ElementFactory)t.c2;
+                                        if (e == null)
                                             return;
+                                        
+                                        // This is invoked before the element is actually loaded.
+                                        //System.out.println("NodeRequestProcedure " + e);
+                                        if (DebugPolicy.DEBUG_NODE_LOAD)
+                                            System.out.println("MAPPING ADDED NODE: " + element + " -> " + e);
+                                        mapElement(element, e);
+                                        synchronized (GraphToDiagramUpdater.this) {
+                                            addedElements.add(e);
+                                            addedElementMap.put(element, e);
                                         }
 
-                                        if (addedElementMap.containsKey(data)) {
-                                            // This element was just loaded, in
-                                            // which case its hints need to
-                                            // uploaded to the real mapped
-                                            // element immediately.
-                                            IElement mappedElement = getMappedElement(data);
-                                            if (DebugPolicy.DEBUG_NODE_LISTENER)
-                                                System.out.println("LOADED ADDED ELEMENT, currently mapped element: " + mappedElement);
-                                            if (mappedElement != null && (mappedElement instanceof Element)) {
-                                                if (DebugPolicy.DEBUG_NODE_LISTENER) {
-                                                    System.out.println("  mapped hints: " + mappedElement.getHints());
-                                                    System.out.println("  loaded hints: " + loaded.getHints());
-                                                }
-                                                updateMappedElement((Element) mappedElement, loaded);
-                                            }
-                                        } else {
-                                            // This element was already loaded.
-                                            // Just schedule an update some time
-                                            // in the future.
-                                            if (DebugPolicy.DEBUG_NODE_LISTENER)
-                                                System.out.println("PREVIOUSLY LOADED NODE UPDATED, scheduling update into the future");
-                                            offerGraphUpdate( nodeUpdater(element, loaded) );
-                                        }
-                                    }
-                                };
-
-                                //System.out.println("NODE REQUEST: " + element);
-                                graph.asyncRequest(new NodeRequest(canvas, diagram, element, loadListener), new AsyncProcedure<IElement>() {
-                                    @Override
-                                    public void execute(AsyncReadGraph graph, IElement e) {
-                                    }
-
-                                    @Override
-                                    public void exception(AsyncReadGraph graph, Throwable throwable) {
-                                        error(throwable);
+                                        graph.syncRequest(new LoadRequest(canvas, diagram, ef, ec, element), loadListener);
+                                        
                                     }
+                                    
                                 });
 
                             } else {
@@ -1980,8 +2018,121 @@ public class GraphToDiagramSynchronizer extends AbstractDisposable implements ID
                 }
             }
         }
+        
+        class LoadRouteGraphConnectionListener extends DisposableListener<IElement> {
+            
+            final Resource connection;
+            public IElement lastLoaded;
+
+            public LoadRouteGraphConnectionListener(ListenerSupport support, Resource connection) {
+                super(support);
+                this.connection = connection;
+            }
+
+            @Override
+            public String toString() {
+                return "processRouteGraphConnections " + connection;
+            }
+            
+            public void applyFirst(IElement loaded) {
+                
+                Object data = loaded.getHint(ElementHints.KEY_OBJECT);
+                if (addedElementMap.containsKey(data)) {
+                    // This element was just loaded, in
+                    // which case its hints need to
+                    // uploaded to the real mapped
+                    // element immediately.
+                    IElement mappedElement = getMappedElement(data);
+                    if (DebugPolicy.DEBUG_CONNECTION_LISTENER)
+                        System.out.println("LOADED ADDED ROUTE GRAPH CONNECTION, currently mapped connection: " + mappedElement);
+                    if (mappedElement instanceof Element) {
+                        if (DebugPolicy.DEBUG_CONNECTION_LISTENER) {
+                            System.out.println("  mapped hints: " + mappedElement.getHints());
+                            System.out.println("  loaded hints: " + loaded.getHints());
+                        }
+                        updateMappedElement((Element) mappedElement, loaded);
+                    }
+                }
+                
+            }
+            
+            @Override
+            public void execute(IElement loaded) {
+                
+                // Invoked when the element has been loaded.
+                if (DebugPolicy.DEBUG_CONNECTION_LISTENER)
+                    System.out.println("ROUTE GRAPH CONNECTION LoadListener for " + loaded);
+
+                if (loaded == null) {
+                    disposeListener();
+                    return;
+                }
+
+                boolean first = lastLoaded == null;
+
+                lastLoaded = loaded;
+                
+                /*
+                 * The first invocation is postponed
+                 */
+                if(first) {
+                    applyFirst(loaded);
+                    return;
+                }
+
+                Object data = loaded.getHint(ElementHints.KEY_OBJECT);
+
+                // Logic for disposing listener
+                if (!previousContent.routeGraphConnectionSet.contains(data)) {
+                    if (DebugPolicy.DEBUG_CONNECTION_LISTENER)
+                        System.out.println("ROUTE GRAPH CONNECTION LoadListener, connection not in current content: " + data + ". Disposing.");
+                    disposeListener();
+                    return;
+                }
+
+                if (addedElementMap.containsKey(data)) {
+                    // This element was just loaded, in
+                    // which case its hints need to
+                    // uploaded to the real mapped
+                    // element immediately.
+                    IElement mappedElement = getMappedElement(data);
+                    if (DebugPolicy.DEBUG_CONNECTION_LISTENER)
+                        System.out.println("LOADED ADDED ROUTE GRAPH CONNECTION, currently mapped connection: " + mappedElement);
+                    if (mappedElement instanceof Element) {
+                        if (DebugPolicy.DEBUG_CONNECTION_LISTENER) {
+                            System.out.println("  mapped hints: " + mappedElement.getHints());
+                            System.out.println("  loaded hints: " + loaded.getHints());
+                        }
+                        updateMappedElement((Element) mappedElement, loaded);
+                    }
+                } else {
+                    // This element was already loaded.
+                    // Just schedule an update some time
+                    // in the future.
+                    if (DebugPolicy.DEBUG_CONNECTION_LISTENER)
+                        System.out.println("PREVIOUSLY LOADED ROUTE GRAPH CONNECTION UPDATED, scheduling update into the future: " + connection);
+
+                    Set<Object> dirtyNodes = new THashSet<Object>(4);
+                    IElement mappedElement = getMappedElement(connection);
+                    ConnectionEntity ce = mappedElement.getHint(ElementHints.KEY_CONNECTION_ENTITY);
+                    if (ce != null) {
+                        for (Connection conn : ce.getTerminalConnections(null)) {
+                            Object o = conn.node.getHint(ElementHints.KEY_OBJECT);
+                            if (o != null) {
+                                dirtyNodes.add(o);
+                                if (DebugPolicy.DEBUG_CONNECTION_LISTENER)
+                                    System.out.println("Marked connectivity dirty for node: " + conn.node);
+                            }
+                        }
+                    }
+
+                    offerGraphUpdate( routeGraphConnectionUpdater(connection, loaded, dirtyNodes) );
+                }
+            }
+        };
 
-        void processRouteGraphConnections(ReadGraph graph) throws DatabaseException {
+
+        void processRouteGraphConnections(AsyncReadGraph graph) throws DatabaseException {
             for (Map.Entry<Resource, Change> entry : changes.routeGraphConnections.entrySet()) {
                 final Resource connection = entry.getKey();
 
@@ -1992,97 +2143,39 @@ public class GraphToDiagramSynchronizer extends AbstractDisposable implements ID
                         if (mappedElement != null)
                             continue;
 
-                        Listener<IElement> loadListener = new DisposableListener<IElement>(canvasListenerSupport) {
-
-                            boolean firstTime = true;
+                        
+                        graph.asyncRequest(new ReadRequest() {
 
                             @Override
-                            public String toString() {
-                                return "processRouteGraphConnections " + connection;
-                            }
-                            @Override
-                            public void execute(IElement loaded) {
-                                // Invoked when the element has been loaded.
-                                if (DebugPolicy.DEBUG_CONNECTION_LISTENER)
-                                    System.out.println("ROUTE GRAPH CONNECTION LoadListener for " + loaded);
+                            public void run(ReadGraph graph) throws DatabaseException {
 
-                                if (loaded == null) {
-                                    disposeListener();
-                                    return;
-                                }
-
-                                if(firstTime) {
-                                    if (DebugPolicy.DEBUG_NODE_LOAD)
-                                        System.out.println("MAPPING ADDED ROUTE GRAPH CONNECTION: " + connection + " -> " + loaded);
-                                    mapElement(connection, loaded);
-                                    synchronized (GraphToDiagramUpdater.this) {
-                                        addedElements.add(loaded);
-                                        addedElementMap.put(connection, loaded);
-                                        addedRouteGraphConnectionMap.put(connection, loaded);
-                                    }
-                                    firstTime = false;
-                                }
+                                LoadRouteGraphConnectionListener loadListener = new LoadRouteGraphConnectionListener(canvasListenerSupport, connection);
 
-                                Object data = loaded.getHint(ElementHints.KEY_OBJECT);
+                                Tuple3 t = graph.syncRequest(new ConnectionRequest2(canvas, diagram, connection, errorHandler));
+                                IElement e = (IElement)t.c0;
+                                ElementClass ec = (ElementClass)t.c1;
+                                org.simantics.diagram.adapter.ElementFactory ef = (org.simantics.diagram.adapter.ElementFactory)t.c2;
 
-                                // Logic for disposing listener
-                                if (!previousContent.routeGraphConnectionSet.contains(data)) {
-                                    if (DebugPolicy.DEBUG_CONNECTION_LISTENER)
-                                        System.out.println("ROUTE GRAPH CONNECTION LoadListener, connection not in current content: " + data + ". Disposing.");
-                                    disposeListener();
+                                if (e == null)
                                     return;
+
+                                //System.out.println("ConnectionRequestProcedure " + e);
+                                if (DebugPolicy.DEBUG_NODE_LOAD)
+                                    System.out.println("MAPPING ADDED ROUTE GRAPH CONNECTION: " + connection + " -> " + e);
+                                mapElement(connection, e);
+                                synchronized (GraphToDiagramUpdater.this) {
+                                    addedElements.add(e);
+                                    addedElementMap.put(connection, e);
+                                    addedRouteGraphConnectionMap.put(connection, e);
                                 }
 
-                                if (addedElementMap.containsKey(data)) {
-                                    // This element was just loaded, in
-                                    // which case its hints need to
-                                    // uploaded to the real mapped
-                                    // element immediately.
-                                    IElement mappedElement = getMappedElement(data);
-                                    if (DebugPolicy.DEBUG_CONNECTION_LISTENER)
-                                        System.out.println("LOADED ADDED ROUTE GRAPH CONNECTION, currently mapped connection: " + mappedElement);
-                                    if (mappedElement instanceof Element) {
-                                        if (DebugPolicy.DEBUG_CONNECTION_LISTENER) {
-                                            System.out.println("  mapped hints: " + mappedElement.getHints());
-                                            System.out.println("  loaded hints: " + loaded.getHints());
-                                        }
-                                        updateMappedElement((Element) mappedElement, loaded);
-                                    }
-                                } else {
-                                    // This element was already loaded.
-                                    // Just schedule an update some time
-                                    // in the future.
-                                    if (DebugPolicy.DEBUG_CONNECTION_LISTENER)
-                                        System.out.println("PREVIOUSLY LOADED ROUTE GRAPH CONNECTION UPDATED, scheduling update into the future: " + connection);
-
-                                    Set<Object> dirtyNodes = new THashSet<Object>(4);
-                                    IElement mappedElement = getMappedElement(connection);
-                                    ConnectionEntity ce = mappedElement.getHint(ElementHints.KEY_CONNECTION_ENTITY);
-                                    if (ce != null) {
-                                        for (Connection conn : ce.getTerminalConnections(null)) {
-                                            Object o = conn.node.getHint(ElementHints.KEY_OBJECT);
-                                            if (o != null) {
-                                                dirtyNodes.add(o);
-                                                if (DebugPolicy.DEBUG_CONNECTION_LISTENER)
-                                                    System.out.println("Marked connectivity dirty for node: " + conn.node);
-                                            }
-                                        }
-                                    }
+                                graph.syncRequest(new LoadRequest(canvas, diagram, ef, ec, connection), loadListener);
 
-                                    offerGraphUpdate( routeGraphConnectionUpdater(connection, loaded, dirtyNodes) );
-                                }
                             }
-                        };
 
-                        graph.syncRequest(new ConnectionRequest(canvas, diagram, connection, errorHandler, loadListener), new Procedure<IElement>() {
-                            @Override
-                            public void execute(final IElement e) {
-                            }
-                            @Override
-                            public void exception(Throwable throwable) {
-                                error(throwable);
-                            }
                         });
+
+
                         break;
                     }
                     case REMOVED: {
@@ -2127,8 +2220,6 @@ public class GraphToDiagramSynchronizer extends AbstractDisposable implements ID
 
                             Listener<IElement> loadListener = new DisposableListener<IElement>(canvasListenerSupport) {
 
-                                boolean firstTime = true;
-
                                 @Override
                                 public String toString() {
                                     return "processBranchPoints for " + element;
@@ -2144,21 +2235,6 @@ public class GraphToDiagramSynchronizer extends AbstractDisposable implements ID
                                         return;
                                     }
 
-                                    if (firstTime) {
-
-                                        mapElement(element, loaded);
-                                        synchronized (GraphToDiagramUpdater.this) {
-                                            addedBranchPoints.add(loaded);
-                                            addedElementMap.put(element, loaded);
-                                            ConnectionEntityImpl ce = getConnectionEntity(element);
-                                            loaded.setHint(ElementHints.KEY_CONNECTION_ENTITY, ce);
-                                            loaded.setHint(ElementHints.KEY_PARENT_ELEMENT, ce.getConnectionElement());
-                                        }
-
-                                        firstTime = false;
-
-                                    }
-
                                     Object data = loaded.getHint(ElementHints.KEY_OBJECT);
                                     if (addedElementMap.containsKey(data)) {
                                         // This element was just loaded, in
@@ -2360,6 +2436,7 @@ public class GraphToDiagramSynchronizer extends AbstractDisposable implements ID
                         return "processBranchPoints";
                     }
                 });
+
             }
             //System.out.println("---- PROCESS BRANCH POINTS END");
 
@@ -2394,9 +2471,9 @@ public class GraphToDiagramSynchronizer extends AbstractDisposable implements ID
 
             task = Timing.BEGIN("processRouteGraphConnections");
             if (!changes.routeGraphConnections.isEmpty()) {
-                graph.syncRequest(new ReadRequest() {
+                graph.syncRequest(new AsyncReadRequest() {
                     @Override
-                    public void run(ReadGraph graph) throws DatabaseException {
+                    public void run(AsyncReadGraph graph) throws DatabaseException {
                         processRouteGraphConnections(graph);
                     }
                     @Override
@@ -2724,8 +2801,11 @@ public class GraphToDiagramSynchronizer extends AbstractDisposable implements ID
 
                 diagramUpdateLock.lock();
                 try {
-                    if (DebugPolicy.DEBUG_DIAGRAM_UPDATE)
+                    if (DebugPolicy.DEBUG_DIAGRAM_UPDATE) {
                         System.out.println("In diagramGraphUpdater:");
+                        System.out.println("-content = " + content);
+                        System.out.println("-previousContent = " + previousContent);
+                    }
 
                     // Find out what has changed since the last query.
                     Object task = Timing.BEGIN("diagramContentDifference");
diff --git a/bundles/org.simantics.diagram/src/org/simantics/diagram/adapter/NodeRequest2.java b/bundles/org.simantics.diagram/src/org/simantics/diagram/adapter/NodeRequest2.java
new file mode 100644 (file)
index 0000000..ddc2cc2
--- /dev/null
@@ -0,0 +1,128 @@
+/*******************************************************************************
+ * Copyright (c) 2007, 2010 Association for Decentralized Information Management
+ * in Industry THTH ry.
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * which accompanies this distribution, and is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Contributors:
+ *     VTT Technical Research Centre of Finland - initial API and implementation
+ *******************************************************************************/
+package org.simantics.diagram.adapter;
+
+import java.util.List;
+
+import org.simantics.db.AsyncReadGraph;
+import org.simantics.db.Resource;
+import org.simantics.db.common.primitiverequest.Adapter;
+import org.simantics.db.common.procedure.adapter.TransientCacheAsyncListener;
+import org.simantics.db.procedure.AsyncProcedure;
+import org.simantics.diagram.synchronization.ErrorHandler;
+import org.simantics.g2d.canvas.ICanvasContext;
+import org.simantics.g2d.diagram.IDiagram;
+import org.simantics.g2d.diagram.handler.SubstituteElementClass;
+import org.simantics.g2d.element.ElementClass;
+import org.simantics.g2d.element.IElement;
+import org.simantics.scl.runtime.tuple.Tuple3;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * @author Antti Villberg
+ */
+public class NodeRequest2 extends BaseRequest2<Resource, Tuple3> {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(NodeRequest2.class);
+
+    final IDiagram diagram;
+
+    public NodeRequest2(ICanvasContext canvas, IDiagram diagram, Resource resource) {
+        super(canvas, resource);
+        this.diagram = diagram;
+    }
+
+    @Override
+    public void perform(AsyncReadGraph graph, final AsyncProcedure<Tuple3> procedure) {
+        // Keep this code from crashing the whole DB client by unexpected
+        // throwing of NPE's somewhere in the following code that leads to
+        // procedure not getting called properly.
+        if (diagram == null) {
+            procedure.exception(graph, new NullPointerException("null diagram specified for resource " + data));
+            return;
+        }
+
+        final ErrorHandler eh = ElementFactoryUtil.getErrorHandler(diagram);
+
+        graph.forHasStatement(data, new AsyncProcedure<Boolean>() {
+
+            @Override
+            public void exception(AsyncReadGraph graph, Throwable throwable) {
+                eh.error("NodeRequest.forHasStatement failed", throwable);
+                procedure.execute(graph, null);
+            }
+
+            @Override
+            public void execute(AsyncReadGraph graph, Boolean result) {
+
+                if(!result) {
+                    procedure.execute(graph, null);
+                    return;
+                }
+
+                graph.asyncRequest(new Adapter<ElementFactory>(data, ElementFactory.class), new TransientCacheAsyncListener<ElementFactory>() {
+
+                    @Override
+                    public void exception(AsyncReadGraph graph, Throwable throwable) {
+                        eh.error("NodeRequest.asyncRequest(Adapter<ElementFactory>) failed", throwable);
+                        procedure.execute(graph, null);
+                    }
+
+                    @Override
+                    public void execute(AsyncReadGraph graph, final ElementFactory factory) {
+
+                        graph.asyncRequest(new GetElementClassRequest(factory, data, canvas, diagram), new TransientCacheAsyncListener<ElementClass>() {
+
+                            @Override
+                            public void exception(AsyncReadGraph graph, Throwable throwable) {
+                                LOGGER.error("Unexpected error in GetElementClassRequest", throwable);
+                                procedure.execute(graph, null);
+                            }
+
+                            @Override
+                            public void execute(AsyncReadGraph graph, ElementClass mutableClazz) {
+                                List<SubstituteElementClass> substitutes = diagram.getDiagramClass().getItemsByClass(SubstituteElementClass.class);
+                                for (SubstituteElementClass subs : substitutes) {
+                                    mutableClazz = subs.substitute(diagram, mutableClazz);
+                                }
+                                final ElementClass clazz = mutableClazz;
+                                graph.asyncRequest(new SpawnRequest(canvas, clazz, data), new TransientCacheAsyncListener<IElement>() {
+
+                                    @Override
+                                    public void exception(AsyncReadGraph graph, Throwable throwable) {
+                                        LOGGER.error("Unexpected error in SpawnRequest", throwable);
+                                        procedure.execute(graph, null);
+                                    }
+
+                                    @Override
+                                    public void execute(AsyncReadGraph graph, IElement element) {
+                                        procedure.execute(graph, new Tuple3(element, clazz, factory));
+                                    }
+
+                                });
+
+                            }
+
+                        });
+
+                    }
+
+                });
+
+            }
+
+        });
+
+    }
+
+}
index 9a53060bef2c0b246a21e465da1f0e53756d2f17..8acc2f0d192899316c870cfba8ce27366b93d699 100644 (file)
@@ -7,11 +7,14 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 
+import org.simantics.db.AsyncReadGraph;
 import org.simantics.db.ReadGraph;
 import org.simantics.db.common.procedure.adapter.TransientCacheAsyncListener;
+import org.simantics.db.common.request.AsyncReadRequest;
 import org.simantics.db.exception.DatabaseException;
 import org.simantics.db.layer0.request.VariableRead;
 import org.simantics.db.layer0.variable.Variable;
+import org.simantics.db.procedure.AsyncProcedure;
 import org.simantics.document.server.JSONObject;
 
 public class DocumentRequest extends VariableRead<List<JSONObject>> {
@@ -35,20 +38,43 @@ public class DocumentRequest extends VariableRead<List<JSONObject>> {
         if(nodes.isEmpty()) {
             return Collections.emptyList();
         }
-        
-        
-        /*TreeMap<String, Variable> nodeMap = new TreeMap<String, Variable>();
-        
-        for (Variable node : nodes) {
-               nodeMap.put(node.getURI(graph), node);
+
+        if(PROFILE) {
+            long dura = System.nanoTime()-s;
+            System.err.println("DocumentRequest1 " + System.identityHashCode(this) + " in " + 1e-6*dura + "ms. " + variable.getURI(graph));
         }
-        System.out.println("*************************************************************************");
-        for (Variable node : nodeMap.values()) {
-               System.out.println("               " + node.getURI(graph));
-        }*/
-        
-        for(Variable node : nodes) {
-            rs.add(graph.syncRequest(new NodeRequest(node), TransientCacheAsyncListener.<JSONObject>instance()));
+
+        graph.syncRequest(new AsyncReadRequest() {
+
+            @Override
+            public void run(AsyncReadGraph graph) throws DatabaseException {
+
+                for(Variable node : nodes) {
+                    graph.asyncRequest(new NodeRequest(node), new AsyncProcedure<JSONObject> () {
+
+                        @Override
+                        public void execute(AsyncReadGraph graph, JSONObject result) {
+                            synchronized (rs) {
+                                rs.add(result);
+                            }
+                        }
+
+                        @Override
+                        public void exception(AsyncReadGraph graph, Throwable throwable) {
+                        }
+
+                    });
+                    
+                }
+                
+            }
+            
+        });
+
+
+        if(PROFILE) {
+            long dura = System.nanoTime()-s;
+            System.err.println("DocumentRequest2 " + System.identityHashCode(this) + " in " + 1e-6*dura + "ms. " + variable.getURI(graph));
         }
 
                ArrayList<JSONObject> result = new ArrayList<JSONObject>(rs);
@@ -63,7 +89,7 @@ public class DocumentRequest extends VariableRead<List<JSONObject>> {
         
         if(PROFILE) {
                long dura = System.nanoTime()-s;
-               System.err.println("DocumentRequest " + System.identityHashCode(this) + " in " + 1e-6*dura + "ms. " + variable.getURI(graph));
+               System.err.println("DocumentRequest3 " + System.identityHashCode(this) + " in " + 1e-6*dura + "ms. " + variable.getURI(graph));
         }
 
                return result;
index f3f1177fc4d1f613950bc7b4f7d56e2ec010105f..99526368e8cc3e21e1b92451f4dd915db45d40ba 100644 (file)
@@ -1,47 +1,72 @@
 package org.simantics.document.server.request;
 
-import gnu.trove.set.hash.THashSet;
-
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Set;
 
+import org.simantics.db.AsyncReadGraph;
 import org.simantics.db.ReadGraph;
+import org.simantics.db.common.request.AsyncReadRequest;
 import org.simantics.db.exception.DatabaseException;
 import org.simantics.db.layer0.request.VariableChildren;
 import org.simantics.db.layer0.request.VariableRead;
 import org.simantics.db.layer0.variable.Variable;
+import org.simantics.db.procedure.AsyncProcedure;
 import org.simantics.structural.stubs.StructuralResource2;
+import org.simantics.utils.threads.logger.ITask;
+import org.simantics.utils.threads.logger.ThreadLogger;
+
+import gnu.trove.set.hash.THashSet;
 
 public class NodesRequest extends VariableRead<Set<Variable>> {
 
     public NodesRequest(Variable var) {
         super(var);
-       }
-
-       @Override
-       public Set<Variable> perform(ReadGraph graph) throws DatabaseException {
-
-               long s = System.nanoTime();
-               
-               StructuralResource2.getInstance(graph);
-               if(variable == null)
-                       return Collections.emptySet();
-               
-               Set<Variable> nodes = new THashSet<Variable>();
-               Collection<Variable> children = graph.syncRequest(new VariableChildren(variable));
-               for(Variable child : children) {
-                       Set<Variable> childNodes = graph.syncRequest(new NodesRequest2(child));
-                       nodes.addAll(childNodes);
-               }
-               
-        if(DocumentRequest.PROFILE) {
-               long dura = System.nanoTime()-s;
-               System.err.println("NodesRequest " + System.identityHashCode(this) + " in " + 1e-6*dura + "ms. " + variable.getURI(graph));
-        }
-               
-               return nodes;
-
-       }
+    }
+
+    @Override
+    public Set<Variable> perform(ReadGraph graph) throws DatabaseException {
+
+        ITask task = DocumentRequest.PROFILE ? ThreadLogger.task(this) : null;
+
+        StructuralResource2.getInstance(graph);
+        if(variable == null)
+            return Collections.emptySet();
+
+        Set<Variable> nodes = new THashSet<Variable>();
+
+        Collection<Variable> children = graph.syncRequest(new VariableChildren(variable));
+
+        graph.syncRequest(new AsyncReadRequest() {
+
+            @Override
+            public void run(AsyncReadGraph graph) throws DatabaseException {
+
+                for(Variable child : children) {
+                    graph.asyncRequest(new NodesRequest2(child), new AsyncProcedure<Set<Variable>>() {
+
+                        @Override
+                        public void execute(AsyncReadGraph graph, Set<Variable> result) {
+                            synchronized(nodes) {
+                                nodes.addAll(result);
+                            }
+                        }
+
+                        @Override
+                        public void exception(AsyncReadGraph graph, Throwable throwable) {
+                        }
+                        
+                    });
+                }
+
+            }
+
+        });
+
+        if(DocumentRequest.PROFILE) task.finish();
+
+        return nodes;
+
+    }
 
 }
\ No newline at end of file
index 1103b5f262af788774072ef3d400ada1800fea51..80f22b7852e85f1ad968f10aec0ebc1666b51cc6 100644 (file)
  *******************************************************************************/
 package org.simantics.issues.common;
 
+import java.util.Collection;
 import java.util.Collections;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 
 import org.simantics.db.AsyncReadGraph;
 import org.simantics.db.Resource;
 import org.simantics.db.common.request.TernaryAsyncRead;
-import org.simantics.db.procedure.AsyncMultiProcedure;
+import org.simantics.db.exception.DatabaseException;
 import org.simantics.db.procedure.AsyncProcedure;
 import org.simantics.issues.Severity;
 
 /**
  * @author Tuukka Lehtonen
  */
-public class ChildMaxIssueSeverity extends TernaryAsyncRead<Resource, Resource, Set<Resource>, Severity>{
+public class ChildMaxIssueSeverity extends TernaryAsyncRead<Resource, Resource, Set<Resource>, Severity> {
 
+    static class AsyncReadResult<T> {
+        private AtomicReference<T> resultRef;
+        private Throwable throwable;
+        private AtomicInteger counter = new AtomicInteger(1);
+        private AsyncProcedure<T> procedure;
+        AsyncReadResult(AsyncProcedure<T> procedure, AtomicReference<T> resultRef) {
+            this.procedure = procedure;
+            this.resultRef = resultRef;
+        }
+        void except(AsyncReadGraph graph, Throwable throwable) {
+            this.throwable = throwable;
+            dec(graph);
+        }
+        void set(AsyncReadGraph graph, T result) {
+            resultRef.set(result);
+            dec(graph);
+        }
+        void inc() {
+            counter.incrementAndGet();
+        }
+        void dec(AsyncReadGraph graph) {
+            if(counter.decrementAndGet() == 0) {
+                if(throwable != null)
+                    procedure.exception(graph, throwable);
+                else
+                    procedure.execute(graph, resultRef.get());
+            }
+        }
+        
+    }
+    
     public ChildMaxIssueSeverity(Resource resource, Resource childRelation, Set<Resource> typesToRecurse) {
         super(resource, childRelation, typesToRecurse);
     }
 
-//    @Override
-//    public Severity perform(ReadGraph graph) throws DatabaseException {
-//        Severity maxSeverity = null;
-//        //System.out.println("severityForChildren: " + NameUtils.getSafeName(graph, resource));
-//        for (Resource child : graph.getObjects(resource, resource2)) {
-//            Severity s = graph.syncRequest(new MaxIssueSeverityRecursive(child));
-//            maxSeverity = Severity.moreSevere(maxSeverity, s);
-//        }
-//        //System.out.println("severityForChildren: " + NameUtils.getSafeName(graph, resource) + " : " + maxSeverity);
-//        return maxSeverity;
-//    }
-
     @Override
     public void perform(AsyncReadGraph graph, final AsyncProcedure<Severity> procedure) {
-        //System.out.println(getClass().getSimpleName() + ": " + parameter);
         
-        graph.forTypes(parameter, new AsyncProcedure<Set<Resource>>() {
-            @Override
-            public void execute(AsyncReadGraph graph, Set<Resource> result) {
-                if (!Collections.disjoint(parameter3, result)) {
-                    checkChildren(graph, procedure);
-                } else {
-                    procedure.execute(graph, null);
-                }
-            }
-            @Override
-            public void exception(AsyncReadGraph graph, Throwable throwable) {
-                procedure.exception(graph, throwable);
+        try {
+            Set<Resource> types = graph.getTypes(parameter);
+            if (!Collections.disjoint(parameter3, types)) {
+                checkChildren(graph, procedure);
+            } else {
+                procedure.execute(graph, null);
             }
-        });
+        } catch (DatabaseException e) {
+            procedure.exception(graph, e);
+        }
+        
     }
 
     protected void checkChildren(AsyncReadGraph graph, final AsyncProcedure<Severity> procedure) {
-        graph.forEachObject(parameter, parameter2, new AsyncMultiProcedure<Resource>() {
-            AtomicReference<Severity> maxSeverity = new AtomicReference<Severity>();
-            @Override
-            public void execute(AsyncReadGraph graph, Resource child) {
+        
+        AsyncReadResult<Severity> maxSeverity = new AsyncReadResult<Severity>(procedure, new AtomicReference<Severity>());
+        
+        try {
+            Collection<Resource> children = graph.getObjects(parameter, parameter2);
+            for(Resource child : children) {
+                maxSeverity.inc();
                 graph.asyncRequest(new MaxIssueSeverityRecursive(child, parameter2, parameter3), new AsyncProcedure<Severity>() {
                     @Override
                     public void execute(AsyncReadGraph graph, Severity severity) {
                         if (severity != null) {
                             synchronized (maxSeverity) {
-                                maxSeverity.set(Severity.moreSevere(maxSeverity.get(), severity));
+                                maxSeverity.set(graph, Severity.moreSevere(maxSeverity.resultRef.get(), severity));
                             }
+                        } else {
+                            maxSeverity.dec(graph);
                         }
                     }
                     @Override
                     public void exception(AsyncReadGraph graph, Throwable throwable) {
-                        procedure.exception(graph, throwable);
+                        maxSeverity.except(graph, throwable);
                     }
                 });
             }
-            @Override
-            public void finished(AsyncReadGraph graph) {
-                procedure.execute(graph, maxSeverity.get());
-            }
-            @Override
-            public void exception(AsyncReadGraph graph, Throwable throwable) {
-                procedure.exception(graph, throwable);
-            }
-        });
+            maxSeverity.dec(graph);
+        } catch (DatabaseException e) {
+            maxSeverity.except(graph, e);
+            return;
+        }
+        
     }
 
 }
index 7646271e3d91443564d9d51fd47af401d2b9ca68..ed3d6c2b97f66c97d7e40a050c0699e9152b2f84 100644 (file)
  *******************************************************************************/
 package org.simantics.issues.common;
 
+import java.util.Collection;
 import java.util.Set;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
 
-import org.simantics.db.AsyncReadGraph;
+import org.simantics.db.ReadGraph;
 import org.simantics.db.Resource;
-import org.simantics.db.common.request.TernaryAsyncRead;
-import org.simantics.db.procedure.AsyncMultiProcedure;
-import org.simantics.db.procedure.AsyncProcedure;
+import org.simantics.db.common.request.TernaryRead;
+import org.simantics.db.exception.DatabaseException;
 import org.simantics.issues.Severity;
 import org.simantics.issues.ontology.IssueResource;
 
 /**
  * @author Tuukka Lehtonen
  */
-public class MaxIssueSeverityRecursive extends TernaryAsyncRead<Resource, Resource, Set<Resource>, Severity> {
+public class MaxIssueSeverityRecursive extends TernaryRead<Resource, Resource, Set<Resource>, Severity> {
 
     public MaxIssueSeverityRecursive(Resource resource, Resource childRelation, Set<Resource> typesToRecurse) {
         super(resource, childRelation, typesToRecurse);
     }
     
     @Override
-    public void perform(AsyncReadGraph graph, final AsyncProcedure<Severity> procedure) {
+    public Severity perform(ReadGraph graph) throws DatabaseException {
 
         IssueResource ISSUE = graph.getService(IssueResource.class);
 
-        AtomicInteger issues = new AtomicInteger();
-        AtomicBoolean excepted = new AtomicBoolean(false);
-
-        graph.forEachObject(parameter, ISSUE.Issue_HasContext_Inverse, new AsyncMultiProcedure<Resource>() {
-            @Override
-            public void execute(AsyncReadGraph graph, Resource result) {
-                issues.incrementAndGet();
-            }
-            @Override
-            public void finished(AsyncReadGraph graph) {
-            
-            }
-            @Override
-            public void exception(AsyncReadGraph graph, Throwable throwable) {
-                if(excepted.compareAndSet(false, true))
-                    procedure.exception(graph, throwable);
-            }
-        });
-
-        graph.forEachObject(parameter, ISSUE.Issue_ContextList_Element_Inverse, new AsyncMultiProcedure<Resource>() {
-            @Override
-            public void execute(AsyncReadGraph graph, Resource result) {
-                issues.incrementAndGet();
-            }
-            @Override
-            public void finished(AsyncReadGraph graph) {
-            
-            }
-            @Override
-            public void exception(AsyncReadGraph graph, Throwable throwable) {
-                if(excepted.compareAndSet(false, true))
-                  procedure.exception(graph, throwable);
-            }
-        });
-        
-        if(excepted.get()) return;
-
-        if (issues.get() == 0) {
-            // This content does not have directly attached issues, try to look
-            // for some in the child components.
-            graph.asyncRequest(new ChildMaxIssueSeverity(parameter, parameter2, parameter3), procedure);
-        } else {
-            // Try local issues first
-            graph.asyncRequest(new MaxIssueSeveritySingle(parameter), new AsyncProcedure<Severity>() {
-                @Override
-                public void execute(AsyncReadGraph graph, Severity maxSeverity) {
-                    if (maxSeverity == null)
-                        // No severity for local issues, try children next.
-                        graph.asyncRequest(new ChildMaxIssueSeverity(parameter, parameter2, parameter3), procedure);
-                    else
-                        procedure.execute(graph, maxSeverity);
-                }
-                @Override
-                public void exception(AsyncReadGraph graph, Throwable throwable) {
-                    if(excepted.compareAndSet(false, true))
-                      procedure.exception(graph, throwable);
-                }
-            });
+        Collection<Resource> inv = graph.getObjects(parameter, ISSUE.Issue_HasContext_Inverse);
+        Collection<Resource> inv2 = graph.getObjects(parameter, ISSUE.Issue_ContextList_Element_Inverse);
+        if(inv.isEmpty() && inv2.isEmpty()) {
+            return graph.syncRequest(new ChildMaxIssueSeverity(parameter, parameter2, parameter3));
         }
+        
+        Severity max = graph.syncRequest(new MaxIssueSeveritySingle(parameter));
+        if(max != null)
+            return max;
+        else
+            return graph.syncRequest(new ChildMaxIssueSeverity(parameter, parameter2, parameter3));
+            
     }
 
 }
index c7a2b6385a276e20e1aecd90ea876e5af243ee41..75d3050b9cd91689c5d136ed82868bd20808fd5f 100644 (file)
@@ -209,7 +209,7 @@ public class MonitorClassFactory2 extends SyncElementFactory {
                 });
             }
 
-            IElement mappedElement = diagram.getDiagramClass().getSingleItem(DataElementMap.class).getElement(diagram, element);
+            IElement mappedElement = e;//diagram.getDiagramClass().getSingleItem(DataElementMap.class).getElement(diagram, element);
             MonitorListener monitorListener = new MonitorListener(element, canvas, diagram, substitutions);
             if (mappedElement != null) {
                 MonitorListener oldListener = mappedElement.getHint(KEY_VARIABLE_LISTENER);