]> gerrit.simantics Code Review - simantics/platform.git/blobdiff - bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryProcessor.java
Yet another fixing commit
[simantics/platform.git] / bundles / org.simantics.db.impl / src / org / simantics / db / impl / query / QueryProcessor.java
index 2908bd43a83680fc7f24e3fe2faafb1b25bc1b53..9c60691fad7a0a850228997e85ffb47168ca1aae 100644 (file)
@@ -16,7 +16,6 @@ import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.PrintStream;
-import java.lang.ref.WeakReference;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -32,8 +31,6 @@ import java.util.Set;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.ReentrantLock;
 
 import org.simantics.databoard.Bindings;
 import org.simantics.db.AsyncReadGraph;
@@ -54,7 +51,6 @@ import org.simantics.db.exception.NoInverseException;
 import org.simantics.db.exception.ResourceNotFoundException;
 import org.simantics.db.impl.DebugPolicy;
 import org.simantics.db.impl.ResourceImpl;
-import org.simantics.db.impl.graph.MultiIntProcedure;
 import org.simantics.db.impl.graph.ReadGraphImpl;
 import org.simantics.db.impl.graph.ReadGraphSupport;
 import org.simantics.db.impl.graph.WriteGraphImpl;
@@ -68,15 +64,12 @@ import org.simantics.db.procedure.AsyncProcedure;
 import org.simantics.db.procedure.AsyncSetListener;
 import org.simantics.db.procedure.ListenerBase;
 import org.simantics.db.procedure.MultiProcedure;
-import org.simantics.db.procedure.Procedure;
 import org.simantics.db.procedure.StatementProcedure;
+import org.simantics.db.procedure.SyncMultiProcedure;
 import org.simantics.db.request.AsyncMultiRead;
-import org.simantics.db.request.AsyncRead;
 import org.simantics.db.request.ExternalRead;
 import org.simantics.db.request.MultiRead;
-import org.simantics.db.request.Read;
 import org.simantics.db.request.RequestFlags;
-import org.simantics.db.request.WriteTraits;
 import org.simantics.layer0.Layer0;
 import org.simantics.utils.DataContainer;
 import org.simantics.utils.Development;
@@ -162,8 +155,8 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap
        }
 
        public ThreadState[]                                                                    threadStates;
-       public ReentrantLock[]                                                                  threadLocks;
-       public Condition[]                                                                          threadConditions;
+//     public ReentrantLock[]                                                                  threadLocks;
+//     public Condition[]                                                                          threadConditions;
 
        //public ArrayList<SessionTask>[]                           ownTasks;
 
@@ -178,31 +171,41 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap
        public void close() {
        }
 
+       SessionTask getOwnTask(int thread) {
+               synchronized(querySupportLock) {
+                       int index = 0;
+                       while(index < freeScheduling.size()) {
+                               SessionTask task = freeScheduling.get(index);
+                               if(task.thread == thread && !task.systemCall)
+                                       return freeScheduling.remove(index);
+                               index++;
+                       }
+               }
+               return null;
+       }
+       
+       public boolean performPending(int thread) {
+               SessionTask task = getOwnTask(thread);
+               if(task != null) {
+                       task.run(thread);
+                       return true;
+               } else {
+                       return false;
+               }
+       }
+
 //     final public void scheduleOwn(int caller, SessionTask request) {
 //             ownTasks[caller].add(request);
 //     }
 
-       final public void scheduleAlways(int caller, SessionTask request) {
-
-//             int performer = request.thread;
-//             if(caller == performer) {
-//                     ownTasks[caller].add(request);
-//             } else {
-//                     schedule(caller, request);
-//             }
-
-               schedule(caller, request);
-               
-       }
-
-       final public void schedule(int caller, SessionTask request) {
+       final public void schedule(SessionTask request) {
 
                int performer = request.thread;
 
                if(DebugPolicy.SCHEDULE)
-                       System.out.println("schedule " + request + " " + caller + " -> " + performer);
+                       System.out.println("schedule " + request + " " + " -> " + performer);
 
-               assert(performer >= 0);
+               //assert(performer >= 0);
 
                assert(request != null);
 
@@ -214,18 +217,22 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap
                                
                                synchronized(querySupportLock) {
                                        
+                                       //new Exception().printStackTrace();
+                                       
                                        freeScheduling.add(request);
+                                       
+                                       querySupportLock.notifyAll();
 
                                        //System.err.println("schedule free task " + request + " => " + freeScheduling.size());
 
-                                       for(int i=0;i<THREADS;i++) {
-                                               ReentrantLock queueLock = threadLocks[i];
-                                               queueLock.lock();
-                                               //queues[performer].add(request);
-                                               //if(ThreadState.SLEEP == threadStates[i]) sleepers.decrementAndGet();
-                                               threadConditions[i].signalAll();
-                                               queueLock.unlock();
-                                       }
+//                                     for(int i=0;i<THREADS;i++) {
+//                                             ReentrantLock queueLock = threadLocks[i];
+//                                             queueLock.lock();
+//                                             //queues[performer].add(request);
+//                                             //if(ThreadState.SLEEP == threadStates[i]) sleepers.decrementAndGet();
+//                                             threadConditions[i].signalAll();
+//                                             queueLock.unlock();
+//                                     }
 
                                }
 
@@ -255,26 +262,28 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap
        public static abstract class SessionTask {
 
                final public int thread;
-               final public int syncCaller;
-               final public Object object;
-
-               public SessionTask(WriteTraits object, int thread) {
-                       this.thread = thread;
-                       this.syncCaller = -1;
-                       this.object = object;
+               final public boolean systemCall;
+//             final public int syncCaller;
+               //final public Object object;
+
+               public SessionTask(boolean systemCall) {
+                       this.thread = QueryProcessor.thread.get();
+                       this.systemCall = systemCall;
+//                     this.syncCaller = -1;
+                       //this.object = object;
                }
 
-               public SessionTask(Object object, int thread, int syncCaller) {
-                       this.thread = thread;
-                       this.syncCaller = syncCaller;
-                       this.object = object;
-               }
+//             public SessionTask(Object object, int syncCaller) {
+//                     this.thread = QueryProcessor.thread.get();
+//                     this.syncCaller = syncCaller;
+//                     this.object = object;
+//             }
 
                public abstract void run(int thread);
 
                @Override
                public String toString() {
-                       return "SessionTask[" + object + "]";
+                       return "SessionTask[" + super.toString() + "]";
                }
 
        }
@@ -284,14 +293,8 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap
                final public Semaphore notify;
                final public DataContainer<Throwable> throwable; 
 
-               public SessionRead(Object object, DataContainer<Throwable> throwable, Semaphore notify, int thread) {
-                       super(object, thread, thread);
-                       this.throwable = throwable;
-                       this.notify = notify;
-               }
-
-               public SessionRead(Object object, DataContainer<Throwable> throwable, Semaphore notify, int thread, int syncThread) {
-                       super(object, thread, syncThread);
+               public SessionRead(DataContainer<Throwable> throwable, Semaphore notify) {
+                       super(true);
                        this.throwable = throwable;
                        this.notify = notify;
                }
@@ -340,8 +343,8 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap
 
                executors = new QueryThread[THREADS];
 //             queues = new ArrayList[THREADS];
-               threadLocks = new ReentrantLock[THREADS];
-               threadConditions = new Condition[THREADS];
+//             threadLocks = new ReentrantLock[THREADS];
+//             threadConditions = new Condition[THREADS];
                threadStates = new ThreadState[THREADS];
 //             ownTasks = new ArrayList[THREADS];
 //             ownSyncTasks = new ArrayList[THREADS];
@@ -359,8 +362,8 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap
 //                     ownTasks[i] = new ArrayList<SessionTask>();
 //                     ownSyncTasks[i] = new ArrayList<SessionTask>();
 //                     queues[i] = new ArrayList<SessionTask>();
-                       threadLocks[i] = new ReentrantLock();
-                       threadConditions[i] = threadLocks[i].newCondition();
+//                     threadLocks[i] = new ReentrantLock();
+//                     threadConditions[i] = threadLocks[i].newCondition();
                        //            limits[i] = false;
                        threadStates[i] = ThreadState.INIT;
 
@@ -624,7 +627,7 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap
 
        }
 
-       final <T> void runMultiRead(final ReadGraphImpl graph, MultiReadEntry cached, final MultiRead<T> query, final CacheEntry parent, final QueryProcessor provider, final ListenerBase listener, final AsyncMultiProcedure<T> procedure) {
+       final <T> void runMultiRead(final ReadGraphImpl graph, MultiReadEntry cached, final MultiRead<T> query, final CacheEntry parent, final QueryProcessor provider, final ListenerBase listener, final SyncMultiProcedure<T> procedure) {
 
                try {
                        QueryCache.runnerMultiReadEntry(graph, query, parent, listener, procedure);
@@ -656,7 +659,7 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap
 //
 //     }
 
-       public <T> void queryMultiRead(final ReadGraphImpl graph, final MultiRead<T> query, final CacheEntry parent, final ListenerBase listener, final AsyncMultiProcedure<T> procedure) throws DatabaseException {
+       public <T> void queryMultiRead(final ReadGraphImpl graph, final MultiRead<T> query, final CacheEntry parent, final ListenerBase listener, final SyncMultiProcedure<T> procedure) throws DatabaseException {
 
                QueryCache.runnerMultiReadEntry(graph, query, parent, listener, procedure);
 
@@ -2752,51 +2755,60 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap
        }
 
        @Override
-       final public void forEachDirectPredicate(final ReadGraphImpl impl, final Resource subject, final AsyncMultiProcedure<Resource> procedure) {
-
-               throw new UnsupportedOperationException();
-               
-//             assert(subject != null);
-//             assert(procedure != null);
-//
-//             final ListenerBase listener = getListenerBase(procedure);
-//
-//             MultiIntProcedure proc = new MultiIntProcedure(procedure, impl, querySupport);
-//
-//             int sId = querySupport.getId(subject);
-//
-//             try {
-//                     QueryCache.runnerDirectPredicates(impl, sId, impl.parent, listener, proc);
-//             } catch (DatabaseException e) {
-//                     Logger.defaultLogError(e);
-//             }
-
-       }
-
-       @Override
-       final public void forEachDirectStatement(final ReadGraphImpl impl, final Resource subject, final Procedure<DirectStatements> procedure) {
+       final public void forEachDirectPredicate(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure<Set<Resource>> procedure) {
 
                assert(subject != null);
                assert(procedure != null);
 
                final ListenerBase listener = getListenerBase(procedure);
 
-               org.simantics.db.impl.query.DirectStatements.queryEach(impl, querySupport.getId(subject), this, impl.parent, listener, procedure);
+               int sId = querySupport.getId(subject);
 
-       }
+               try {
+                       QueryCache.runnerDirectPredicates(impl, sId, impl.parent, listener, new InternalProcedure<IntSet>() {
 
-       @Override
-       final public void forEachDirectStatement(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure<DirectStatements> procedure, boolean ignoreVirtual) {
+                               @Override
+                               public void execute(ReadGraphImpl graph, IntSet result) throws DatabaseException {
+                                       procedure.execute(graph, result);
+                               }
 
-               assert(subject != null);
-               assert(procedure != null);
+                               @Override
+                               public void exception(ReadGraphImpl graph, Throwable throwable) throws DatabaseException {
+                                       procedure.exception(graph, throwable);
+                               }
+                               
+                       });
+               } catch (DatabaseException e) {
+                       Logger.defaultLogError(e);
+               }
 
-               final ListenerBase listener = getListenerBase(procedure);
+       }
 
-               org.simantics.db.impl.query.DirectStatements.queryEach(impl, querySupport.getId(subject), this, impl.parent, listener, procedure, ignoreVirtual);
+       final public DirectStatements getDirectStatements(final ReadGraphImpl impl, final Resource subject, final boolean ignoreVirtual) {
+
+//             assert(subject != null);
+//             assert(procedure != null);
+//
+//             final ListenerBase listener = getListenerBase(procedure);
+//
+//             org.simantics.db.impl.query.DirectStatements.queryEach(impl, querySupport.getId(subject), this, impl.parent, listener, procedure);
+               
+               return querySupport.getStatements(impl, querySupport.getId(subject), this, ignoreVirtual);
 
        }
 
+//     @Override
+//     final public void forEachDirectStatement(final ReadGraphImpl impl, final Resource subject, final SyncProcedure<DirectStatements> procedure, boolean ignoreVirtual) {
+//
+//             assert(subject != null);
+//             assert(procedure != null);
+//
+//             final ListenerBase listener = getListenerBase(procedure);
+//
+//             org.simantics.db.impl.query.DirectStatements.queryEach(impl, querySupport.getId(subject), this, impl.parent, listener, procedure, ignoreVirtual);
+//
+//     }
+       
        private static final Resource INVALID_RESOURCE = new ResourceImpl(null, Integer.MIN_VALUE);
 
        @Override
@@ -3482,32 +3494,11 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap
        }
 
        @Override
-       final public void forRelationInfo(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure<RelationInfo> procedure) {
+       final public RelationInfo getRelationInfo(final ReadGraphImpl impl, final Resource subject) throws DatabaseException {
                
                assert(subject != null);
-               assert(procedure != null);
 
-               final ListenerBase listener = getListenerBase(procedure);
-               assert(listener == null);
-
-               try {
-                       
-                       QueryCache.runnerRelationInfoQuery(impl, querySupport.getId(subject), impl.parent, listener, new InternalProcedure<RelationInfo>() {
-
-                               @Override
-                               public void execute(final ReadGraphImpl graph, RelationInfo set) {
-                                       procedure.execute(graph, set);                                  
-                               }
-
-                               @Override
-                               public void exception(ReadGraphImpl graph, Throwable t) {
-                                       procedure.exception(graph, t);
-                               }
-
-                       });
-               } catch (DatabaseException e) {
-                       Logger.defaultLogError(e);
-               }
+               return QueryCache.resultRelationInfoQuery(impl, querySupport.getId(subject), impl.parent, null);
 
        }
 
@@ -4301,7 +4292,7 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap
 //     }
 
        @Override
-       final public <T> void query(final ReadGraphImpl impl, final MultiRead<T> request, final CacheEntry parent, final AsyncMultiProcedure<T> procedure, ListenerBase listener) {
+       final public <T> void query(final ReadGraphImpl impl, final MultiRead<T> request, final CacheEntry parent, final SyncMultiProcedure<T> procedure, ListenerBase listener) {
 
                assert(request != null);
                assert(procedure != null);
@@ -4459,5 +4450,11 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap
                }
                return L0;
        }
+
+    public static ThreadLocal<Integer> thread = new ThreadLocal<Integer>() {
+        protected Integer initialValue() {
+            return -1;
+        }
+    };
        
 }