import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.ReentrantLock;
import org.simantics.databoard.Bindings;
import org.simantics.db.AsyncReadGraph;
import org.simantics.db.exception.ResourceNotFoundException;
import org.simantics.db.impl.DebugPolicy;
import org.simantics.db.impl.ResourceImpl;
-import org.simantics.db.impl.graph.MultiIntProcedure;
import org.simantics.db.impl.graph.ReadGraphImpl;
import org.simantics.db.impl.graph.ReadGraphSupport;
import org.simantics.db.impl.graph.WriteGraphImpl;
import org.simantics.db.procedure.AsyncSetListener;
import org.simantics.db.procedure.ListenerBase;
import org.simantics.db.procedure.MultiProcedure;
-import org.simantics.db.procedure.Procedure;
import org.simantics.db.procedure.StatementProcedure;
+import org.simantics.db.procedure.SyncMultiProcedure;
import org.simantics.db.request.AsyncMultiRead;
-import org.simantics.db.request.AsyncRead;
import org.simantics.db.request.ExternalRead;
import org.simantics.db.request.MultiRead;
-import org.simantics.db.request.Read;
import org.simantics.db.request.RequestFlags;
-import org.simantics.db.request.WriteTraits;
import org.simantics.layer0.Layer0;
import org.simantics.utils.DataContainer;
import org.simantics.utils.Development;
QueryThread[] executors;
- public ArrayList<SessionTask>[] queues;
+// public ArrayList<SessionTask>[] queues;
+
+ public LinkedList<SessionTask> freeScheduling = new LinkedList<SessionTask>();
enum ThreadState {
}
public ThreadState[] threadStates;
- public ReentrantLock[] threadLocks;
- public Condition[] threadConditions;
+// public ReentrantLock[] threadLocks;
+// public Condition[] threadConditions;
- public ArrayList<SessionTask>[] ownTasks;
+ //public ArrayList<SessionTask>[] ownTasks;
- public ArrayList<SessionTask>[] ownSyncTasks;
+ //public ArrayList<SessionTask>[] ownSyncTasks;
- ArrayList<SessionTask>[] delayQueues;
+ //ArrayList<SessionTask>[] delayQueues;
- public boolean synch = true;
-
final Object querySupportLock;
public Long modificationCounter = 0L;
public void close() {
}
- final public void scheduleOwn(int caller, SessionTask request) {
- ownTasks[caller].add(request);
+ SessionTask getOwnTask(int thread) {
+ synchronized(querySupportLock) {
+ int index = 0;
+ while(index < freeScheduling.size()) {
+ SessionTask task = freeScheduling.get(index);
+ if(task.thread == thread && !task.systemCall)
+ return freeScheduling.remove(index);
+ index++;
+ }
+ }
+ return null;
}
-
- final public void scheduleAlways(int caller, SessionTask request) {
-
- int performer = request.thread;
- if(caller == performer) {
- ownTasks[caller].add(request);
+
+ public boolean performPending(int thread) {
+ SessionTask task = getOwnTask(thread);
+ if(task != null) {
+ task.run(thread);
+ return true;
} else {
- schedule(caller, request);
+ return false;
}
-
}
- final public void schedule(int caller, SessionTask request) {
+// final public void scheduleOwn(int caller, SessionTask request) {
+// ownTasks[caller].add(request);
+// }
+
+ final public void schedule(SessionTask request) {
int performer = request.thread;
if(DebugPolicy.SCHEDULE)
- System.out.println("schedule " + request + " " + caller + " -> " + performer);
+ System.out.println("schedule " + request + " " + " -> " + performer);
- assert(performer >= 0);
+ //assert(performer >= 0);
assert(request != null);
- if(caller == performer) {
- request.run(caller);
- } else {
- ReentrantLock queueLock = threadLocks[performer];
- queueLock.lock();
- queues[performer].add(request);
- // This thread could have been sleeping
- if(queues[performer].size() == 1) {
- if(ThreadState.SLEEP == threadStates[performer]) sleepers.decrementAndGet();
- threadConditions[performer].signalAll();
- }
- queueLock.unlock();
- }
+// if(caller == performer) {
+// request.run(caller);
+// } else {
+
+// if(performer == THREADS) {
+
+ synchronized(querySupportLock) {
+
+ //new Exception().printStackTrace();
+
+ freeScheduling.add(request);
+
+ querySupportLock.notifyAll();
+
+ //System.err.println("schedule free task " + request + " => " + freeScheduling.size());
+
+// for(int i=0;i<THREADS;i++) {
+// ReentrantLock queueLock = threadLocks[i];
+// queueLock.lock();
+// //queues[performer].add(request);
+// //if(ThreadState.SLEEP == threadStates[i]) sleepers.decrementAndGet();
+// threadConditions[i].signalAll();
+// queueLock.unlock();
+// }
+
+ }
+
+ return;
+
+// }
+//
+// ReentrantLock queueLock = threadLocks[performer];
+// queueLock.lock();
+// queues[performer].add(request);
+// // This thread could have been sleeping
+// if(queues[performer].size() == 1) {
+// //if(ThreadState.SLEEP == threadStates[performer]) sleepers.decrementAndGet();
+// threadConditions[performer].signalAll();
+// }
+// queueLock.unlock();
+// }
}
public static abstract class SessionTask {
final public int thread;
- final public int syncCaller;
- final public Object object;
-
- public SessionTask(WriteTraits object, int thread) {
- this.thread = thread;
- this.syncCaller = -1;
- this.object = object;
+ final public boolean systemCall;
+// final public int syncCaller;
+ //final public Object object;
+
+ public SessionTask(boolean systemCall) {
+ this.thread = QueryProcessor.thread.get();
+ this.systemCall = systemCall;
+// this.syncCaller = -1;
+ //this.object = object;
}
- public SessionTask(Object object, int thread, int syncCaller) {
- this.thread = thread;
- this.syncCaller = syncCaller;
- this.object = object;
- }
+// public SessionTask(Object object, int syncCaller) {
+// this.thread = QueryProcessor.thread.get();
+// this.syncCaller = syncCaller;
+// this.object = object;
+// }
public abstract void run(int thread);
@Override
public String toString() {
- return "SessionTask[" + object + "]";
+ return "SessionTask[" + super.toString() + "]";
}
}
final public Semaphore notify;
final public DataContainer<Throwable> throwable;
- public SessionRead(Object object, DataContainer<Throwable> throwable, Semaphore notify, int thread) {
- super(object, thread, thread);
- this.throwable = throwable;
- this.notify = notify;
- }
-
- public SessionRead(Object object, DataContainer<Throwable> throwable, Semaphore notify, int thread, int syncThread) {
- super(object, thread, syncThread);
+ public SessionRead(DataContainer<Throwable> throwable, Semaphore notify) {
+ super(true);
this.throwable = throwable;
this.notify = notify;
}
public boolean resume(ReadGraphImpl graph) {
return executors[0].runSynchronized();
}
+
+ //private WeakReference<GarbageTracker> garbageTracker;
+
+ private class GarbageTracker {
+
+ @Override
+ protected void finalize() throws Throwable {
+
+// System.err.println("GarbageTracker");
+//
+// garbageTracker = new WeakReference<GarbageTracker>(new GarbageTracker());
+
+ super.finalize();
+
+ }
+
+ }
public QueryProcessor(final int threads, QuerySupport core, Set<Thread> threadSet)
throws DatabaseException {
+ //garbageTracker = new WeakReference<GarbageTracker>(new GarbageTracker());
+
THREADS = threads;
THREAD_MASK = threads - 1;
querySupportLock = core.getLock();
executors = new QueryThread[THREADS];
- queues = new ArrayList[THREADS];
- threadLocks = new ReentrantLock[THREADS];
- threadConditions = new Condition[THREADS];
+// queues = new ArrayList[THREADS];
+// threadLocks = new ReentrantLock[THREADS];
+// threadConditions = new Condition[THREADS];
threadStates = new ThreadState[THREADS];
- ownTasks = new ArrayList[THREADS];
- ownSyncTasks = new ArrayList[THREADS];
- delayQueues = new ArrayList[THREADS * THREADS];
+// ownTasks = new ArrayList[THREADS];
+// ownSyncTasks = new ArrayList[THREADS];
+// delayQueues = new ArrayList[THREADS * THREADS];
// freeSchedule = new AtomicInteger(0);
- for (int i = 0; i < THREADS * THREADS; i++) {
- delayQueues[i] = new ArrayList<SessionTask>();
- }
+// for (int i = 0; i < THREADS * THREADS; i++) {
+// delayQueues[i] = new ArrayList<SessionTask>();
+// }
for (int i = 0; i < THREADS; i++) {
// tasks[i] = new ArrayList<Runnable>();
- ownTasks[i] = new ArrayList<SessionTask>();
- ownSyncTasks[i] = new ArrayList<SessionTask>();
- queues[i] = new ArrayList<SessionTask>();
- threadLocks[i] = new ReentrantLock();
- threadConditions[i] = threadLocks[i].newCondition();
+// ownTasks[i] = new ArrayList<SessionTask>();
+// ownSyncTasks[i] = new ArrayList<SessionTask>();
+// queues[i] = new ArrayList<SessionTask>();
+// threadLocks[i] = new ReentrantLock();
+// threadConditions[i] = threadLocks[i].newCondition();
// limits[i] = false;
threadStates[i] = ThreadState.INIT;
});
} catch (DatabaseException e) {
- Logger.defaultLogError(e);
+
+ try {
+
+ procedure.exception(graph, e);
+
+ } catch (DatabaseException e1) {
+
+ Logger.defaultLogError(e1);
+
+ }
+
}
}
}
- final <T> void runMultiRead(final ReadGraphImpl graph, MultiReadEntry cached, final MultiRead<T> query, final CacheEntry parent, final QueryProcessor provider, final ListenerBase listener, final AsyncMultiProcedure<T> procedure) {
+ final <T> void runMultiRead(final ReadGraphImpl graph, MultiReadEntry cached, final MultiRead<T> query, final CacheEntry parent, final QueryProcessor provider, final ListenerBase listener, final SyncMultiProcedure<T> procedure) {
try {
QueryCache.runnerMultiReadEntry(graph, query, parent, listener, procedure);
//
// }
- public <T> void queryMultiRead(final ReadGraphImpl graph, final MultiRead<T> query, final CacheEntry parent, final ListenerBase listener, final AsyncMultiProcedure<T> procedure) throws DatabaseException {
+ public <T> void queryMultiRead(final ReadGraphImpl graph, final MultiRead<T> query, final CacheEntry parent, final ListenerBase listener, final SyncMultiProcedure<T> procedure) throws DatabaseException {
QueryCache.runnerMultiReadEntry(graph, query, parent, listener, procedure);
CacheEntry entry = e.entry;
- System.err.println("updateQuery " + entry);
+ //System.err.println("updateQuery " + entry);
/*
* If the dependency graph forms a DAG, some entries are inserted in the
@Override
final public IntSet getPredicates(final ReadGraphImpl impl, final Resource subject) throws Throwable {
-
- return QueryCacheBase.resultPredicates(impl, querySupport.getId(subject), impl.parent, null, null);
-
+ return QueryCacheBase.resultPredicates(impl, querySupport.getId(subject), impl.parent, null);
}
-
@Override
final public void forEachStatement(final ReadGraphImpl impl, final Resource subject,
}
@Override
- final public void forEachDirectPredicate(final ReadGraphImpl impl, final Resource subject, final AsyncMultiProcedure<Resource> procedure) {
-
- throw new UnsupportedOperationException();
-
-// assert(subject != null);
-// assert(procedure != null);
-//
-// final ListenerBase listener = getListenerBase(procedure);
-//
-// MultiIntProcedure proc = new MultiIntProcedure(procedure, impl, querySupport);
-//
-// int sId = querySupport.getId(subject);
-//
-// try {
-// QueryCache.runnerDirectPredicates(impl, sId, impl.parent, listener, proc);
-// } catch (DatabaseException e) {
-// Logger.defaultLogError(e);
-// }
-
- }
-
- @Override
- final public void forEachDirectStatement(final ReadGraphImpl impl, final Resource subject, final Procedure<DirectStatements> procedure) {
+ final public void forEachDirectPredicate(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure<Set<Resource>> procedure) {
assert(subject != null);
assert(procedure != null);
final ListenerBase listener = getListenerBase(procedure);
- org.simantics.db.impl.query.DirectStatements.queryEach(impl, querySupport.getId(subject), this, impl.parent, listener, procedure);
+ int sId = querySupport.getId(subject);
- }
+ try {
+ QueryCache.runnerDirectPredicates(impl, sId, impl.parent, listener, new InternalProcedure<IntSet>() {
- @Override
- final public void forEachDirectStatement(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure<DirectStatements> procedure, boolean ignoreVirtual) {
+ @Override
+ public void execute(ReadGraphImpl graph, IntSet result) throws DatabaseException {
+ procedure.execute(graph, result);
+ }
- assert(subject != null);
- assert(procedure != null);
+ @Override
+ public void exception(ReadGraphImpl graph, Throwable throwable) throws DatabaseException {
+ procedure.exception(graph, throwable);
+ }
+
+ });
+ } catch (DatabaseException e) {
+ Logger.defaultLogError(e);
+ }
- final ListenerBase listener = getListenerBase(procedure);
+ }
+
+ final public DirectStatements getDirectStatements(final ReadGraphImpl impl, final Resource subject, final boolean ignoreVirtual) {
- org.simantics.db.impl.query.DirectStatements.queryEach(impl, querySupport.getId(subject), this, impl.parent, listener, procedure, ignoreVirtual);
+// assert(subject != null);
+// assert(procedure != null);
+//
+// final ListenerBase listener = getListenerBase(procedure);
+//
+// org.simantics.db.impl.query.DirectStatements.queryEach(impl, querySupport.getId(subject), this, impl.parent, listener, procedure);
+
+ return querySupport.getStatements(impl, querySupport.getId(subject), this, ignoreVirtual);
}
+// @Override
+// final public void forEachDirectStatement(final ReadGraphImpl impl, final Resource subject, final SyncProcedure<DirectStatements> procedure, boolean ignoreVirtual) {
+//
+// assert(subject != null);
+// assert(procedure != null);
+//
+// final ListenerBase listener = getListenerBase(procedure);
+//
+// org.simantics.db.impl.query.DirectStatements.queryEach(impl, querySupport.getId(subject), this, impl.parent, listener, procedure, ignoreVirtual);
+//
+// }
+
private static final Resource INVALID_RESOURCE = new ResourceImpl(null, Integer.MIN_VALUE);
@Override
assert(subject != null);
- return QueryCacheBase.resultTypes(impl, querySupport.getId(subject), impl.parent, null, null);
+ return QueryCacheBase.resultTypes(impl, querySupport.getId(subject), impl.parent, null);
}
@Override
- final public void forRelationInfo(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure<RelationInfo> procedure) {
+ final public RelationInfo getRelationInfo(final ReadGraphImpl impl, final Resource subject) throws DatabaseException {
assert(subject != null);
- assert(procedure != null);
- final ListenerBase listener = getListenerBase(procedure);
- assert(listener == null);
-
- try {
-
- QueryCache.runnerRelationInfoQuery(impl, querySupport.getId(subject), impl.parent, listener, new InternalProcedure<RelationInfo>() {
-
- @Override
- public void execute(final ReadGraphImpl graph, RelationInfo set) {
- procedure.execute(graph, set);
- }
-
- @Override
- public void exception(ReadGraphImpl graph, Throwable t) {
- procedure.exception(graph, t);
- }
-
- });
- } catch (DatabaseException e) {
- Logger.defaultLogError(e);
- }
+ return QueryCache.resultRelationInfoQuery(impl, querySupport.getId(subject), impl.parent, null);
}
}
final public byte[] getValue(final ReadGraphImpl impl, final int subject) throws DatabaseException {
- return QueryCache.resultValueQuery(impl, subject, impl.parent, null, null);
+ return QueryCache.resultValueQuery(impl, subject, impl.parent, null);
}
@Override
- final public byte[] forValue(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure<byte[]> procedure) {
+ final public void forValue(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure<byte[]> procedure) {
assert(subject != null);
+ assert(procedure != null);
int sId = querySupport.getId(subject);
- if(procedure != null) {
+// if(procedure != null) {
final ListenerBase listener = getListenerBase(procedure);
// else impl.state.barrier.inc(null, null);
try {
- return QueryCacheBase.resultValueQuery(impl, sId, impl.parent, listener, ip);
+ QueryCache.runnerValueQuery(impl, sId, impl.parent, listener, ip);
} catch (DatabaseException e) {
- Logger.defaultLogError(e);
- }
-
-
- } else {
+ throw new IllegalStateException("Internal error");
+ }
- try {
- return QueryCacheBase.resultValueQuery(impl, sId, impl.parent, null, null);
- } catch (DatabaseException e) {
- Logger.defaultLogError(e);
- }
-
- }
-
- throw new IllegalStateException("Internal error");
+// } else {
+//
+// return QueryCacheBase.runnerValueQuery(impl, sId, impl.parent, null, null);
+//
+// }
+//
+// throw new IllegalStateException("Internal error");
}
final ListenerBase listener = getListenerBase(procedure);
try {
- IntSet result = QueryCache.resultDirectPredicates(impl, querySupport.getId(subject), impl.parent, listener,QueryCache.emptyProcedureDirectPredicates);
+ IntSet result = QueryCache.resultDirectPredicates(impl, querySupport.getId(subject), impl.parent, listener);
procedure.execute(impl, !result.isEmpty());
} catch (DatabaseException e) {
procedure.exception(impl, e);
// }
@Override
- final public <T> void query(final ReadGraphImpl impl, final MultiRead<T> request, final CacheEntry parent, final AsyncMultiProcedure<T> procedure, ListenerBase listener) {
+ final public <T> void query(final ReadGraphImpl impl, final MultiRead<T> request, final CacheEntry parent, final SyncMultiProcedure<T> procedure, ListenerBase listener) {
assert(request != null);
assert(procedure != null);
}
- @Override
- final public <T> void query(final ReadGraphImpl impl, final ExternalRead<T> request, final CacheEntry parent, final Procedure<T> procedure, ListenerBase listener) {
-
- assert(request != null);
- assert(procedure != null);
-
- try {
-
- queryPrimitiveRead(impl, request, parent, listener, new AsyncProcedure<T>() {
-
- @Override
- public String toString() {
- return procedure.toString();
- }
-
- @Override
- public void execute(AsyncReadGraph graph, T result) {
- try {
- procedure.execute(result);
- } catch (Throwable t2) {
- Logger.defaultLogError(t2);
- }
- }
-
- @Override
- public void exception(AsyncReadGraph graph, Throwable throwable) {
- try {
- procedure.exception(throwable);
- } catch (Throwable t2) {
- Logger.defaultLogError(t2);
- }
- }
-
- });
-
- } catch (DatabaseException e) {
-
- throw new IllegalStateException(e);
-
- }
-
- }
+// @Override
+// final public <T> void query(final ReadGraphImpl impl, final ExternalRead<T> request, final CacheEntry parent, final Procedure<T> procedure, ListenerBase listener) throws DatabaseException {
+//
+// assert(request != null);
+// assert(procedure != null);
+//
+// try {
+//
+// queryPrimitiveRead(impl, request, parent, listener, new AsyncProcedure<T>() {
+//
+// @Override
+// public String toString() {
+// return procedure.toString();
+// }
+//
+// @Override
+// public void execute(AsyncReadGraph graph, T result) {
+// try {
+// procedure.execute(result);
+// } catch (Throwable t2) {
+// Logger.defaultLogError(t2);
+// }
+// }
+//
+// @Override
+// public void exception(AsyncReadGraph graph, Throwable throwable) {
+// try {
+// procedure.exception(throwable);
+// } catch (Throwable t2) {
+// Logger.defaultLogError(t2);
+// }
+// }
+//
+// });
+//
+// } catch (DatabaseException e) {
+//
+// throw new IllegalStateException(e);
+//
+// }
+//
+// }
@Override
public VirtualGraph getProvider(Resource subject, Resource predicate, Resource object) {
}
return L0;
}
+
+ public static ThreadLocal<Integer> thread = new ThreadLocal<Integer>() {
+ protected Integer initialValue() {
+ return -1;
+ }
+ };
}