QueryThread[] executors;
- public ArrayList<SessionTask>[] queues;
+// public ArrayList<SessionTask>[] queues;
+
+ public LinkedList<SessionTask> freeScheduling = new LinkedList<SessionTask>();
enum ThreadState {
public ReentrantLock[] threadLocks;
public Condition[] threadConditions;
- public ArrayList<SessionTask>[] ownTasks;
+ //public ArrayList<SessionTask>[] ownTasks;
- public ArrayList<SessionTask>[] ownSyncTasks;
+ //public ArrayList<SessionTask>[] ownSyncTasks;
- ArrayList<SessionTask>[] delayQueues;
+ //ArrayList<SessionTask>[] delayQueues;
- public boolean synch = true;
-
final Object querySupportLock;
public Long modificationCounter = 0L;
public void close() {
}
- final public void scheduleOwn(int caller, SessionTask request) {
- ownTasks[caller].add(request);
- }
+// final public void scheduleOwn(int caller, SessionTask request) {
+// ownTasks[caller].add(request);
+// }
final public void scheduleAlways(int caller, SessionTask request) {
- int performer = request.thread;
- if(caller == performer) {
- ownTasks[caller].add(request);
- } else {
- schedule(caller, request);
- }
+// int performer = request.thread;
+// if(caller == performer) {
+// ownTasks[caller].add(request);
+// } else {
+// schedule(caller, request);
+// }
+ schedule(caller, request);
+
}
final public void schedule(int caller, SessionTask request) {
assert(request != null);
- if(caller == performer) {
- request.run(caller);
- } else {
- ReentrantLock queueLock = threadLocks[performer];
- queueLock.lock();
- queues[performer].add(request);
- // This thread could have been sleeping
- if(queues[performer].size() == 1) {
- if(ThreadState.SLEEP == threadStates[performer]) sleepers.decrementAndGet();
- threadConditions[performer].signalAll();
- }
- queueLock.unlock();
- }
+// if(caller == performer) {
+// request.run(caller);
+// } else {
+
+// if(performer == THREADS) {
+
+ synchronized(querySupportLock) {
+
+ freeScheduling.add(request);
+
+ //System.err.println("schedule free task " + request + " => " + freeScheduling.size());
+
+ for(int i=0;i<THREADS;i++) {
+ ReentrantLock queueLock = threadLocks[i];
+ queueLock.lock();
+ //queues[performer].add(request);
+ //if(ThreadState.SLEEP == threadStates[i]) sleepers.decrementAndGet();
+ threadConditions[i].signalAll();
+ queueLock.unlock();
+ }
+
+ }
+
+ return;
+
+// }
+//
+// ReentrantLock queueLock = threadLocks[performer];
+// queueLock.lock();
+// queues[performer].add(request);
+// // This thread could have been sleeping
+// if(queues[performer].size() == 1) {
+// //if(ThreadState.SLEEP == threadStates[performer]) sleepers.decrementAndGet();
+// threadConditions[performer].signalAll();
+// }
+// queueLock.unlock();
+// }
}
querySupportLock = core.getLock();
executors = new QueryThread[THREADS];
- queues = new ArrayList[THREADS];
+// queues = new ArrayList[THREADS];
threadLocks = new ReentrantLock[THREADS];
threadConditions = new Condition[THREADS];
threadStates = new ThreadState[THREADS];
- ownTasks = new ArrayList[THREADS];
- ownSyncTasks = new ArrayList[THREADS];
- delayQueues = new ArrayList[THREADS * THREADS];
+// ownTasks = new ArrayList[THREADS];
+// ownSyncTasks = new ArrayList[THREADS];
+// delayQueues = new ArrayList[THREADS * THREADS];
// freeSchedule = new AtomicInteger(0);
- for (int i = 0; i < THREADS * THREADS; i++) {
- delayQueues[i] = new ArrayList<SessionTask>();
- }
+// for (int i = 0; i < THREADS * THREADS; i++) {
+// delayQueues[i] = new ArrayList<SessionTask>();
+// }
for (int i = 0; i < THREADS; i++) {
// tasks[i] = new ArrayList<Runnable>();
- ownTasks[i] = new ArrayList<SessionTask>();
- ownSyncTasks[i] = new ArrayList<SessionTask>();
- queues[i] = new ArrayList<SessionTask>();
+// ownTasks[i] = new ArrayList<SessionTask>();
+// ownSyncTasks[i] = new ArrayList<SessionTask>();
+// queues[i] = new ArrayList<SessionTask>();
threadLocks[i] = new ReentrantLock();
threadConditions[i] = threadLocks[i].newCondition();
// limits[i] = false;
});
} catch (DatabaseException e) {
- Logger.defaultLogError(e);
+
+ try {
+
+ procedure.exception(graph, e);
+
+ } catch (DatabaseException e1) {
+
+ Logger.defaultLogError(e1);
+
+ }
+
}
}
CacheEntry entry = e.entry;
- System.err.println("updateQuery " + entry);
+ //System.err.println("updateQuery " + entry);
/*
* If the dependency graph forms a DAG, some entries are inserted in the
}
- @Override
- final public <T> void query(final ReadGraphImpl impl, final ExternalRead<T> request, final CacheEntry parent, final Procedure<T> procedure, ListenerBase listener) {
-
- assert(request != null);
- assert(procedure != null);
-
- try {
-
- queryPrimitiveRead(impl, request, parent, listener, new AsyncProcedure<T>() {
-
- @Override
- public String toString() {
- return procedure.toString();
- }
-
- @Override
- public void execute(AsyncReadGraph graph, T result) {
- try {
- procedure.execute(result);
- } catch (Throwable t2) {
- Logger.defaultLogError(t2);
- }
- }
-
- @Override
- public void exception(AsyncReadGraph graph, Throwable throwable) {
- try {
- procedure.exception(throwable);
- } catch (Throwable t2) {
- Logger.defaultLogError(t2);
- }
- }
-
- });
-
- } catch (DatabaseException e) {
-
- throw new IllegalStateException(e);
-
- }
-
- }
+// @Override
+// final public <T> void query(final ReadGraphImpl impl, final ExternalRead<T> request, final CacheEntry parent, final Procedure<T> procedure, ListenerBase listener) throws DatabaseException {
+//
+// assert(request != null);
+// assert(procedure != null);
+//
+// try {
+//
+// queryPrimitiveRead(impl, request, parent, listener, new AsyncProcedure<T>() {
+//
+// @Override
+// public String toString() {
+// return procedure.toString();
+// }
+//
+// @Override
+// public void execute(AsyncReadGraph graph, T result) {
+// try {
+// procedure.execute(result);
+// } catch (Throwable t2) {
+// Logger.defaultLogError(t2);
+// }
+// }
+//
+// @Override
+// public void exception(AsyncReadGraph graph, Throwable throwable) {
+// try {
+// procedure.exception(throwable);
+// } catch (Throwable t2) {
+// Logger.defaultLogError(t2);
+// }
+// }
+//
+// });
+//
+// } catch (DatabaseException e) {
+//
+// throw new IllegalStateException(e);
+//
+// }
+//
+// }
@Override
public VirtualGraph getProvider(Resource subject, Resource predicate, Resource object) {