]> gerrit.simantics Code Review - simantics/platform.git/blobdiff - bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryThread.java
Work in progress
[simantics/platform.git] / bundles / org.simantics.db.impl / src / org / simantics / db / impl / query / QueryThread.java
index 07a390a51eafa254965f919fb8f03839b56576ff..f7c56a213d83ba92844fae162b002d713fe8931d 100644 (file)
@@ -2,6 +2,7 @@ package org.simantics.db.impl.query;
 
 import java.util.ArrayList;
 import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.ReentrantLock;
@@ -27,15 +28,15 @@ class QueryThread extends Thread implements SessionThread {
 //     final private ArrayList<SessionTask> own;
 //     final private ArrayList<SessionTask> ownSync;
 //     final private ArrayList<SessionTask> queue;
-       final private ReentrantLock lock;
-       final private Condition condition;
+//     final private ReentrantLock lock;
+//     final private Condition condition;
        final private Object querySupportLock;
        final private int THREADS;
        final private AtomicInteger sleepers;
        final private ThreadState[] threadStates;
 //     final private ArrayList<SessionTask>[] delayQueues;
-       final private QueryThread[] executors;
-       final private ReentrantLock[] threadLocks;
+//     final private QueryThread[] executors;
+//     final private ReentrantLock[] threadLocks;
 //     final private ArrayList<SessionTask>[] queues;
 //     final private ArrayList<SessionTask>[] ownSyncTasks;
 
@@ -47,16 +48,16 @@ class QueryThread extends Thread implements SessionThread {
 //             own = processor.ownTasks[index];
 //             ownSync = processor.ownSyncTasks[index];
 //             queue = processor.queues[index];
-               lock = processor.threadLocks[index];
-               condition = processor.threadConditions[index];
+//             lock = processor.threadLocks[index];
+//             condition = processor.threadConditions[index];
                querySupportLock = processor.querySupportLock;
                THREADS = processor.THREADS;
                sleepers = processor.sleepers;
                querySupport = processor.querySupport;
                threadStates = processor.threadStates;
 //             delayQueues = processor.delayQueues;
-               executors = processor.executors;
-               threadLocks = processor.threadLocks;
+//             executors = processor.executors;
+//             threadLocks = processor.threadLocks;
 //             queues = processor.queues;
 //             ownSyncTasks = processor.ownSyncTasks;
        }
@@ -66,9 +67,9 @@ class QueryThread extends Thread implements SessionThread {
 //             System.err.println("qt dispose");
 
                disposed = true;
-               lock.lock();
-               condition.signalAll();
-               lock.unlock();
+//             lock.lock();
+//             condition.signalAll();
+//             lock.unlock();
                
                try {
                        exited.acquire();
@@ -96,29 +97,20 @@ class QueryThread extends Thread implements SessionThread {
 
        }
 
+       private boolean pumpTask() {
+               if(!processor.freeScheduling.isEmpty()) {
+                       tasks.add(processor.freeScheduling.removeFirst());
+                       return true;
+               }
+               return false;
+       }
+       
        ArrayList<SessionTask> newTasks(boolean doWait, ArrayList<SessionTask> tasks) {
 
                try {
 
                        while(true) {
 
-                               // Perform own tasks first
-//                             if(tasks.addAll(own)) {
-//                                     own.clear();
-//                             } else if (doWait && !ownSync.isEmpty()) {
-//                                     tasks.add(ownSync.remove(ownSync.size()-1));
-//                             }
-
-                               // Try some queued tasks
-//                             lock.lock();
-//                             if(tasks.addAll(queue)) {
-//                                     queue.clear();
-//                                     lock.unlock();
-//                                     return tasks;
-//                             } else {
-//                                     lock.unlock();
-//                             }
-
                                // Return tasks if some were found
                                if(!tasks.isEmpty()) return tasks;
                                
@@ -126,23 +118,10 @@ class QueryThread extends Thread implements SessionThread {
 
                                synchronized (querySupportLock) {
 
-//                                     System.err.println("check free tasks for QT " + index + " (" + processor.freeScheduling + ")");
-                                       
-                                       if(!processor.freeScheduling.isEmpty()) {
-                                               tasks.add(processor.freeScheduling.removeFirst());
+                                       if(pumpTask())
                                                return tasks;
-                                       }
 
-                                       lock.lock();
-                                       
-                                       // Just maybe someone inserted tasks and notified just before synchronized block
-//                                     if(tasks.addAll(queue)) {
-//                                             queue.clear();
-//                                             lock.unlock();
-//                                             return tasks;
-//                                     }
-                                       
-//                                     System.err.println("QT " + index + ", sleepers = " + sleepers);
+//                                     lock.lock();
 
                                        // We are the last one awake
                                        if(sleepers.incrementAndGet() == THREADS) {
@@ -153,12 +132,8 @@ class QueryThread extends Thread implements SessionThread {
                                                if(querySupport == null) System.err.println("null qs");
                                                querySupport.ceased(index);
 
-//                                             if(tasks.addAll(own)) {
-//                                                     own.clear();
-//                                             }
-                                               //                                      System.err.println("tasks after ceased: " + tasks.size());
-                                               if(!tasks.isEmpty()) {
-                                                       lock.unlock();
+                                               if(pumpTask()) {
+//                                                     lock.unlock();
                                                        return tasks;
                                                }
 
@@ -175,25 +150,38 @@ class QueryThread extends Thread implements SessionThread {
                                // We are done
                                if(isDisposed()) {
                                        threadStates[index] = ThreadState.DISPOSED;
-                                       lock.unlock();
+//                                     lock.unlock();
                                        return null;
                                }
 
+                               
                                threadStates[index] = ThreadState.SLEEP;
-                               condition.await();
+                               
+                               synchronized (querySupportLock) {
+                                       querySupportLock.wait(100);
+                                       
+                               }
+                               
+//                             boolean woken = condition.await(10, TimeUnit.MILLISECONDS);
+//                             if(!woken) {
+//                                     synchronized (querySupportLock) {
+//                                             if(!processor.freeScheduling.isEmpty())
+//                                                     System.err.println("some tasks are available!");
+//                                     }
+//                             }
 
                                sleepers.decrementAndGet();
 
                                // We are done
                                if(isDisposed()) {
                                        threadStates[index] = ThreadState.DISPOSED;
-                                       lock.unlock();
+                                       //lock.unlock();
                                        return null;
                                }
 
                                threadStates[index] = ThreadState.RUN;
 
-                               lock.unlock();
+                               //lock.unlock();
 
                        }
 
@@ -249,6 +237,8 @@ class QueryThread extends Thread implements SessionThread {
        @Override
        public void run() {
 
+               processor.thread.set(index);
+               
                QuerySupport support = this.querySupport;
 
                try {