]> gerrit.simantics Code Review - simantics/platform.git/blobdiff - bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryProcessor.java
Merge branch 'private/db-threads' into private/threads-testing
[simantics/platform.git] / bundles / org.simantics.db.impl / src / org / simantics / db / impl / query / QueryProcessor.java
index 2908bd43a83680fc7f24e3fe2faafb1b25bc1b53..5945589a331c2460f83e0479f4f56f2b7425ebf2 100644 (file)
@@ -162,8 +162,8 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap
        }
 
        public ThreadState[]                                                                    threadStates;
-       public ReentrantLock[]                                                                  threadLocks;
-       public Condition[]                                                                          threadConditions;
+//     public ReentrantLock[]                                                                  threadLocks;
+//     public Condition[]                                                                          threadConditions;
 
        //public ArrayList<SessionTask>[]                           ownTasks;
 
@@ -178,31 +178,41 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap
        public void close() {
        }
 
+       SessionTask getOwnTask(int thread) {
+               synchronized(querySupportLock) {
+                       int index = 0;
+                       while(index < freeScheduling.size()) {
+                               SessionTask task = freeScheduling.get(index);
+                               if(task.thread == thread && !task.systemCall)
+                                       return freeScheduling.remove(index);
+                               index++;
+                       }
+               }
+               return null;
+       }
+       
+       public boolean performPending(int thread) {
+               SessionTask task = getOwnTask(thread);
+               if(task != null) {
+                       task.run(thread);
+                       return true;
+               } else {
+                       return false;
+               }
+       }
+
 //     final public void scheduleOwn(int caller, SessionTask request) {
 //             ownTasks[caller].add(request);
 //     }
 
-       final public void scheduleAlways(int caller, SessionTask request) {
-
-//             int performer = request.thread;
-//             if(caller == performer) {
-//                     ownTasks[caller].add(request);
-//             } else {
-//                     schedule(caller, request);
-//             }
-
-               schedule(caller, request);
-               
-       }
-
-       final public void schedule(int caller, SessionTask request) {
+       final public void schedule(SessionTask request) {
 
                int performer = request.thread;
 
                if(DebugPolicy.SCHEDULE)
-                       System.out.println("schedule " + request + " " + caller + " -> " + performer);
+                       System.out.println("schedule " + request + " " + " -> " + performer);
 
-               assert(performer >= 0);
+               //assert(performer >= 0);
 
                assert(request != null);
 
@@ -214,18 +224,22 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap
                                
                                synchronized(querySupportLock) {
                                        
+                                       //new Exception().printStackTrace();
+                                       
                                        freeScheduling.add(request);
+                                       
+                                       querySupportLock.notifyAll();
 
                                        //System.err.println("schedule free task " + request + " => " + freeScheduling.size());
 
-                                       for(int i=0;i<THREADS;i++) {
-                                               ReentrantLock queueLock = threadLocks[i];
-                                               queueLock.lock();
-                                               //queues[performer].add(request);
-                                               //if(ThreadState.SLEEP == threadStates[i]) sleepers.decrementAndGet();
-                                               threadConditions[i].signalAll();
-                                               queueLock.unlock();
-                                       }
+//                                     for(int i=0;i<THREADS;i++) {
+//                                             ReentrantLock queueLock = threadLocks[i];
+//                                             queueLock.lock();
+//                                             //queues[performer].add(request);
+//                                             //if(ThreadState.SLEEP == threadStates[i]) sleepers.decrementAndGet();
+//                                             threadConditions[i].signalAll();
+//                                             queueLock.unlock();
+//                                     }
 
                                }
 
@@ -255,26 +269,28 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap
        public static abstract class SessionTask {
 
                final public int thread;
-               final public int syncCaller;
-               final public Object object;
-
-               public SessionTask(WriteTraits object, int thread) {
-                       this.thread = thread;
-                       this.syncCaller = -1;
-                       this.object = object;
+               final public boolean systemCall;
+//             final public int syncCaller;
+               //final public Object object;
+
+               public SessionTask(boolean systemCall) {
+                       this.thread = QueryProcessor.thread.get();
+                       this.systemCall = systemCall;
+//                     this.syncCaller = -1;
+                       //this.object = object;
                }
 
-               public SessionTask(Object object, int thread, int syncCaller) {
-                       this.thread = thread;
-                       this.syncCaller = syncCaller;
-                       this.object = object;
-               }
+//             public SessionTask(Object object, int syncCaller) {
+//                     this.thread = QueryProcessor.thread.get();
+//                     this.syncCaller = syncCaller;
+//                     this.object = object;
+//             }
 
                public abstract void run(int thread);
 
                @Override
                public String toString() {
-                       return "SessionTask[" + object + "]";
+                       return "SessionTask[" + super.toString() + "]";
                }
 
        }
@@ -284,14 +300,8 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap
                final public Semaphore notify;
                final public DataContainer<Throwable> throwable; 
 
-               public SessionRead(Object object, DataContainer<Throwable> throwable, Semaphore notify, int thread) {
-                       super(object, thread, thread);
-                       this.throwable = throwable;
-                       this.notify = notify;
-               }
-
-               public SessionRead(Object object, DataContainer<Throwable> throwable, Semaphore notify, int thread, int syncThread) {
-                       super(object, thread, syncThread);
+               public SessionRead(DataContainer<Throwable> throwable, Semaphore notify) {
+                       super(true);
                        this.throwable = throwable;
                        this.notify = notify;
                }
@@ -340,8 +350,8 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap
 
                executors = new QueryThread[THREADS];
 //             queues = new ArrayList[THREADS];
-               threadLocks = new ReentrantLock[THREADS];
-               threadConditions = new Condition[THREADS];
+//             threadLocks = new ReentrantLock[THREADS];
+//             threadConditions = new Condition[THREADS];
                threadStates = new ThreadState[THREADS];
 //             ownTasks = new ArrayList[THREADS];
 //             ownSyncTasks = new ArrayList[THREADS];
@@ -359,8 +369,8 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap
 //                     ownTasks[i] = new ArrayList<SessionTask>();
 //                     ownSyncTasks[i] = new ArrayList<SessionTask>();
 //                     queues[i] = new ArrayList<SessionTask>();
-                       threadLocks[i] = new ReentrantLock();
-                       threadConditions[i] = threadLocks[i].newCondition();
+//                     threadLocks[i] = new ReentrantLock();
+//                     threadConditions[i] = threadLocks[i].newCondition();
                        //            limits[i] = false;
                        threadStates[i] = ThreadState.INIT;
 
@@ -4459,5 +4469,11 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap
                }
                return L0;
        }
+
+    public static ThreadLocal<Integer> thread = new ThreadLocal<Integer>() {
+        protected Integer initialValue() {
+            return -1;
+        }
+    };
        
 }