]> gerrit.simantics Code Review - simantics/platform.git/blobdiff - bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryProcessor.java
Diagram loading concurrency problem with Sysdyn
[simantics/platform.git] / bundles / org.simantics.db.impl / src / org / simantics / db / impl / query / QueryProcessor.java
index 9c60691fad7a0a850228997e85ffb47168ca1aae..5a37257e2531c051c39de0bea04af271e1adccce 100644 (file)
@@ -51,6 +51,7 @@ import org.simantics.db.exception.NoInverseException;
 import org.simantics.db.exception.ResourceNotFoundException;
 import org.simantics.db.impl.DebugPolicy;
 import org.simantics.db.impl.ResourceImpl;
+import org.simantics.db.impl.graph.BarrierTracing;
 import org.simantics.db.impl.graph.ReadGraphImpl;
 import org.simantics.db.impl.graph.ReadGraphSupport;
 import org.simantics.db.impl.graph.WriteGraphImpl;
@@ -171,23 +172,40 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap
        public void close() {
        }
 
-       SessionTask getOwnTask(int thread) {
+       public SessionTask getOwnTask(ReadGraphImpl impl) {
+               Set<ReadGraphImpl> ancestors = impl.ancestorSet();
                synchronized(querySupportLock) {
                        int index = 0;
                        while(index < freeScheduling.size()) {
                                SessionTask task = freeScheduling.get(index);
-                               if(task.thread == thread && !task.systemCall)
+                               if(task.hasCommonParent(ancestors)) {
                                        return freeScheduling.remove(index);
+                               }
                                index++;
                        }
                }
                return null;
        }
-       
-       public boolean performPending(int thread) {
-               SessionTask task = getOwnTask(thread);
+
+    public SessionTask getSubTask(ReadGraphImpl impl) {
+        Set<ReadGraphImpl> onlyThis = Collections.singleton(impl);
+        synchronized(querySupportLock) {
+            int index = 0;
+            while(index < freeScheduling.size()) {
+                SessionTask task = freeScheduling.get(index);
+                if(task.hasCommonParent(onlyThis)) {
+                    return freeScheduling.remove(index);
+                }
+                index++;
+            }
+        }
+        return null;
+    }
+
+       public boolean performPending(ReadGraphImpl graph) {
+               SessionTask task = getOwnTask(graph);
                if(task != null) {
-                       task.run(thread);
+                       task.run(QueryProcessor.thread.get());
                        return true;
                } else {
                        return false;
@@ -199,11 +217,11 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap
 //     }
 
        final public void schedule(SessionTask request) {
+           
+               //int performer = request.thread;
 
-               int performer = request.thread;
-
-               if(DebugPolicy.SCHEDULE)
-                       System.out.println("schedule " + request + " " + " -> " + performer);
+//             if(DebugPolicy.SCHEDULE)
+//                     System.out.println("schedule " + request + " " + " -> " + performer);
 
                //assert(performer >= 0);
 
@@ -216,24 +234,20 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap
 //                     if(performer == THREADS) {
                                
                                synchronized(querySupportLock) {
-                                       
-                                       //new Exception().printStackTrace();
-                                       
+
+                                       if(BarrierTracing.BOOKKEEPING) {
+                                               Exception current = new Exception();
+                                               Exception previous = BarrierTracing.tasks.put(request, current);
+                                               if(previous != null) {
+                                                       previous.printStackTrace();
+                                                       current.printStackTrace();
+                                               }
+                                       }
+                                   
                                        freeScheduling.add(request);
                                        
                                        querySupportLock.notifyAll();
 
-                                       //System.err.println("schedule free task " + request + " => " + freeScheduling.size());
-
-//                                     for(int i=0;i<THREADS;i++) {
-//                                             ReentrantLock queueLock = threadLocks[i];
-//                                             queueLock.lock();
-//                                             //queues[performer].add(request);
-//                                             //if(ThreadState.SLEEP == threadStates[i]) sleepers.decrementAndGet();
-//                                             threadConditions[i].signalAll();
-//                                             queueLock.unlock();
-//                                     }
-
                                }
 
                                return;
@@ -256,34 +270,47 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap
 
        final int THREADS;
        final public int  THREAD_MASK;
-       
-       final public static ThreadGroup QueryThreadGroup = new ThreadGroup("Query Thread Group"); 
+
+       final public static ThreadGroup QueryThreadGroup = new ThreadGroup("Query Thread Group");
 
        public static abstract class SessionTask {
 
-               final public int thread;
-               final public boolean systemCall;
-//             final public int syncCaller;
-               //final public Object object;
+               public final ReadGraphImpl graph;
+               private Set<ReadGraphImpl> ancestors;
+               private int counter = 0;
+               private Exception trace;
 
-               public SessionTask(boolean systemCall) {
-                       this.thread = QueryProcessor.thread.get();
-                       this.systemCall = systemCall;
-//                     this.syncCaller = -1;
-                       //this.object = object;
+               public SessionTask(ReadGraphImpl graph) {
+                       this.graph = graph;
+                       if(graph != null) graph.asyncBarrier.inc();
                }
 
-//             public SessionTask(Object object, int syncCaller) {
-//                     this.thread = QueryProcessor.thread.get();
-//                     this.syncCaller = syncCaller;
-//                     this.object = object;
-//             }
+               public boolean hasCommonParent(Set<ReadGraphImpl> otherAncestors) {
+                       if(graph == null) return false;
+                       if(ancestors == null) ancestors = graph.ancestorSet();
+                       return !Collections.disjoint(ancestors, otherAncestors);
+               }
 
-               public abstract void run(int thread);
+               public abstract void run0(int thread);
+
+               public final void run(int thread) {
+                   if(counter++ > 0) {
+                       if(BarrierTracing.BOOKKEEPING) {
+                           trace.printStackTrace();
+                           new Exception().printStackTrace();
+                       }
+                       throw new IllegalStateException("Multiple invocations of SessionTask!");
+                   }
+                   if(BarrierTracing.BOOKKEEPING) {
+                       trace = new Exception();
+                   }
+                   run0(thread);
+                   if(graph != null) graph.asyncBarrier.dec();
+               }
 
                @Override
                public String toString() {
-                       return "SessionTask[" + super.toString() + "]";
+                       return "SessionTask[" + graph.parent + "]";
                }
 
        }
@@ -294,7 +321,7 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap
                final public DataContainer<Throwable> throwable; 
 
                public SessionRead(DataContainer<Throwable> throwable, Semaphore notify) {
-                       super(true);
+                       super(null);
                        this.throwable = throwable;
                        this.notify = notify;
                }
@@ -1593,7 +1620,7 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap
 
                        entry.prepareRecompute(querySupport);
                        
-                       ReadGraphImpl parentGraph = graph.withParent(entry);
+                       ReadGraphImpl parentGraph = graph.forRecompute(entry);
 
                        query.recompute(parentGraph);
 
@@ -1765,6 +1792,7 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap
                                                } else {
                                                        // If not changed, keep the old value
                                                        immediate.setResult(oldValue);
+                                                       immediate.setReady();
                                                        listenersUnknown = true;
                                                }
 
@@ -2268,67 +2296,16 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap
        @Override
        final public void forEachPredicate(final ReadGraphImpl impl, final Resource subject, final AsyncMultiProcedure<Resource> procedure) {
 
-               throw new UnsupportedOperationException();
+        try {
 
-//             assert(subject != null);
-//             assert(procedure != null);
-//
-//             final ListenerBase listener = getListenerBase(procedure);
-//
-//             IntProcedure ip = new IntProcedure() {
-//
-//                     AtomicBoolean first = new AtomicBoolean(true);
-//
-//                     @Override
-//                     public void execute(ReadGraphImpl graph, int i) {
-//                             try {
-//                                     if(first.get()) {
-//                                             procedure.execute(graph, querySupport.getResource(i));
-//                                     } else {
-//                                             procedure.execute(impl.newRestart(graph), querySupport.getResource(i));
-//                                     }
-//                             } catch (Throwable t2) {
-//                                     Logger.defaultLogError(t2);
-//                             }
-//                     }
-//
-//                     @Override
-//                     public void finished(ReadGraphImpl graph) {
-//                             try {
-//                                     if(first.compareAndSet(true, false)) {
-//                                             procedure.finished(graph);
-////                                           impl.state.barrier.dec(this);
-//                                     } else {
-//                                             procedure.finished(impl.newRestart(graph));
-//                                     }
-//
-//                             } catch (Throwable t2) {
-//                                     Logger.defaultLogError(t2);
-//                             }
-//                     }
-//
-//                     @Override
-//                     public void exception(ReadGraphImpl graph, Throwable t) {
-//                             try {
-//                                     if(first.compareAndSet(true, false)) {
-//                                             procedure.exception(graph, t);
-//                                     } else {
-//                                             procedure.exception(impl.newRestart(graph), t);
-//                                     }
-//                             } catch (Throwable t2) {
-//                                     Logger.defaultLogError(t2);
-//                             }
-//                     }
-//
-//             };
-//
-//             int sId = querySupport.getId(subject);
-//
-//             try {
-//                     QueryCache.runnerPredicates(impl, sId, impl.parent, listener, ip);
-//             } catch (DatabaseException e) {
-//                     Logger.defaultLogError(e);
-//             }
+               for(Resource predicate : getPredicates(impl, subject))
+                   procedure.execute(impl, predicate);
+
+               procedure.finished(impl);
+
+           } catch (Throwable e) {
+               procedure.exception(impl, e);
+           }
 
        }