}
public ThreadState[] threadStates;
- public ReentrantLock[] threadLocks;
- public Condition[] threadConditions;
+// public ReentrantLock[] threadLocks;
+// public Condition[] threadConditions;
//public ArrayList<SessionTask>[] ownTasks;
public void close() {
}
+ SessionTask getOwnTask(int thread) {
+ synchronized(querySupportLock) {
+ int index = 0;
+ while(index < freeScheduling.size()) {
+ SessionTask task = freeScheduling.get(index);
+ if(task.thread == thread && !task.systemCall)
+ return freeScheduling.remove(index);
+ index++;
+ }
+ }
+ return null;
+ }
+
+ public boolean performPending(int thread) {
+ SessionTask task = getOwnTask(thread);
+ if(task != null) {
+ task.run(thread);
+ return true;
+ } else {
+ return false;
+ }
+ }
+
// final public void scheduleOwn(int caller, SessionTask request) {
// ownTasks[caller].add(request);
// }
- final public void scheduleAlways(int caller, SessionTask request) {
-
-// int performer = request.thread;
-// if(caller == performer) {
-// ownTasks[caller].add(request);
-// } else {
-// schedule(caller, request);
-// }
-
- schedule(caller, request);
-
- }
-
- final public void schedule(int caller, SessionTask request) {
+ final public void schedule(SessionTask request) {
int performer = request.thread;
if(DebugPolicy.SCHEDULE)
- System.out.println("schedule " + request + " " + caller + " -> " + performer);
+ System.out.println("schedule " + request + " " + " -> " + performer);
- assert(performer >= 0);
+ //assert(performer >= 0);
assert(request != null);
synchronized(querySupportLock) {
+ //new Exception().printStackTrace();
+
freeScheduling.add(request);
+
+ querySupportLock.notifyAll();
//System.err.println("schedule free task " + request + " => " + freeScheduling.size());
- for(int i=0;i<THREADS;i++) {
- ReentrantLock queueLock = threadLocks[i];
- queueLock.lock();
- //queues[performer].add(request);
- //if(ThreadState.SLEEP == threadStates[i]) sleepers.decrementAndGet();
- threadConditions[i].signalAll();
- queueLock.unlock();
- }
+// for(int i=0;i<THREADS;i++) {
+// ReentrantLock queueLock = threadLocks[i];
+// queueLock.lock();
+// //queues[performer].add(request);
+// //if(ThreadState.SLEEP == threadStates[i]) sleepers.decrementAndGet();
+// threadConditions[i].signalAll();
+// queueLock.unlock();
+// }
}
public static abstract class SessionTask {
final public int thread;
- final public int syncCaller;
- final public Object object;
-
- public SessionTask(WriteTraits object, int thread) {
- this.thread = thread;
- this.syncCaller = -1;
- this.object = object;
+ final public boolean systemCall;
+// final public int syncCaller;
+ //final public Object object;
+
+ public SessionTask(boolean systemCall) {
+ this.thread = QueryProcessor.thread.get();
+ this.systemCall = systemCall;
+// this.syncCaller = -1;
+ //this.object = object;
}
- public SessionTask(Object object, int thread, int syncCaller) {
- this.thread = thread;
- this.syncCaller = syncCaller;
- this.object = object;
- }
+// public SessionTask(Object object, int syncCaller) {
+// this.thread = QueryProcessor.thread.get();
+// this.syncCaller = syncCaller;
+// this.object = object;
+// }
public abstract void run(int thread);
@Override
public String toString() {
- return "SessionTask[" + object + "]";
+ return "SessionTask[" + super.toString() + "]";
}
}
final public Semaphore notify;
final public DataContainer<Throwable> throwable;
- public SessionRead(Object object, DataContainer<Throwable> throwable, Semaphore notify, int thread) {
- super(object, thread, thread);
- this.throwable = throwable;
- this.notify = notify;
- }
-
- public SessionRead(Object object, DataContainer<Throwable> throwable, Semaphore notify, int thread, int syncThread) {
- super(object, thread, syncThread);
+ public SessionRead(DataContainer<Throwable> throwable, Semaphore notify) {
+ super(true);
this.throwable = throwable;
this.notify = notify;
}
executors = new QueryThread[THREADS];
// queues = new ArrayList[THREADS];
- threadLocks = new ReentrantLock[THREADS];
- threadConditions = new Condition[THREADS];
+// threadLocks = new ReentrantLock[THREADS];
+// threadConditions = new Condition[THREADS];
threadStates = new ThreadState[THREADS];
// ownTasks = new ArrayList[THREADS];
// ownSyncTasks = new ArrayList[THREADS];
// ownTasks[i] = new ArrayList<SessionTask>();
// ownSyncTasks[i] = new ArrayList<SessionTask>();
// queues[i] = new ArrayList<SessionTask>();
- threadLocks[i] = new ReentrantLock();
- threadConditions[i] = threadLocks[i].newCondition();
+// threadLocks[i] = new ReentrantLock();
+// threadConditions[i] = threadLocks[i].newCondition();
// limits[i] = false;
threadStates[i] = ThreadState.INIT;
}
return L0;
}
+
+ public static ThreadLocal<Integer> thread = new ThreadLocal<Integer>() {
+ protected Integer initialValue() {
+ return -1;
+ }
+ };
}