X-Git-Url: https://gerrit.simantics.org/r/gitweb?a=blobdiff_plain;f=bundles%2Forg.simantics.db.impl%2Fsrc%2Forg%2Fsimantics%2Fdb%2Fimpl%2Fquery%2FQueryProcessor.java;h=5945589a331c2460f83e0479f4f56f2b7425ebf2;hb=d0e8f57df83c175bac2098412ee71646fb9ff0e9;hp=a6c987a9d0e10e0b2beda36f7a571a8633527063;hpb=061c58a485fbfda0732a8dc597582762a97012e4;p=simantics%2Fplatform.git diff --git a/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryProcessor.java b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryProcessor.java index a6c987a9d..5945589a3 100644 --- a/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryProcessor.java +++ b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryProcessor.java @@ -16,6 +16,7 @@ import java.io.File; import java.io.FileOutputStream; import java.io.IOException; import java.io.PrintStream; +import java.lang.ref.WeakReference; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -150,7 +151,9 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap QueryThread[] executors; - public ArrayList[] queues; +// public ArrayList[] queues; + + public LinkedList freeScheduling = new LinkedList(); enum ThreadState { @@ -159,17 +162,15 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap } public ThreadState[] threadStates; - public ReentrantLock[] threadLocks; - public Condition[] threadConditions; +// public ReentrantLock[] threadLocks; +// public Condition[] threadConditions; - public ArrayList[] ownTasks; + //public ArrayList[] ownTasks; - public ArrayList[] ownSyncTasks; + //public ArrayList[] ownSyncTasks; - ArrayList[] delayQueues; + //ArrayList[] delayQueues; - public boolean synch = true; - final Object querySupportLock; public Long modificationCounter = 0L; @@ -177,45 +178,85 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap public void close() { } - final public void scheduleOwn(int caller, SessionTask request) { - ownTasks[caller].add(request); + SessionTask getOwnTask(int thread) { + synchronized(querySupportLock) { + int index = 0; + while(index < freeScheduling.size()) { + SessionTask task = freeScheduling.get(index); + if(task.thread == thread && !task.systemCall) + return freeScheduling.remove(index); + index++; + } + } + return null; } - - final public void scheduleAlways(int caller, SessionTask request) { - - int performer = request.thread; - if(caller == performer) { - ownTasks[caller].add(request); + + public boolean performPending(int thread) { + SessionTask task = getOwnTask(thread); + if(task != null) { + task.run(thread); + return true; } else { - schedule(caller, request); + return false; } - } - final public void schedule(int caller, SessionTask request) { +// final public void scheduleOwn(int caller, SessionTask request) { +// ownTasks[caller].add(request); +// } + + final public void schedule(SessionTask request) { int performer = request.thread; if(DebugPolicy.SCHEDULE) - System.out.println("schedule " + request + " " + caller + " -> " + performer); + System.out.println("schedule " + request + " " + " -> " + performer); - assert(performer >= 0); + //assert(performer >= 0); assert(request != null); - if(caller == performer) { - request.run(caller); - } else { - ReentrantLock queueLock = threadLocks[performer]; - queueLock.lock(); - queues[performer].add(request); - // This thread could have been sleeping - if(queues[performer].size() == 1) { - if(ThreadState.SLEEP == threadStates[performer]) sleepers.decrementAndGet(); - threadConditions[performer].signalAll(); - } - queueLock.unlock(); - } +// if(caller == performer) { +// request.run(caller); +// } else { + +// if(performer == THREADS) { + + synchronized(querySupportLock) { + + //new Exception().printStackTrace(); + + freeScheduling.add(request); + + querySupportLock.notifyAll(); + + //System.err.println("schedule free task " + request + " => " + freeScheduling.size()); + +// for(int i=0;i throwable; - public SessionRead(Object object, DataContainer throwable, Semaphore notify, int thread) { - super(object, thread, thread); - this.throwable = throwable; - this.notify = notify; - } - - public SessionRead(Object object, DataContainer throwable, Semaphore notify, int thread, int syncThread) { - super(object, thread, syncThread); + public SessionRead(DataContainer throwable, Semaphore notify) { + super(true); this.throwable = throwable; this.notify = notify; } @@ -279,10 +316,29 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap public boolean resume(ReadGraphImpl graph) { return executors[0].runSynchronized(); } + + //private WeakReference garbageTracker; + + private class GarbageTracker { + + @Override + protected void finalize() throws Throwable { + +// System.err.println("GarbageTracker"); +// +// garbageTracker = new WeakReference(new GarbageTracker()); + + super.finalize(); + + } + + } public QueryProcessor(final int threads, QuerySupport core, Set threadSet) throws DatabaseException { + //garbageTracker = new WeakReference(new GarbageTracker()); + THREADS = threads; THREAD_MASK = threads - 1; @@ -293,28 +349,28 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap querySupportLock = core.getLock(); executors = new QueryThread[THREADS]; - queues = new ArrayList[THREADS]; - threadLocks = new ReentrantLock[THREADS]; - threadConditions = new Condition[THREADS]; +// queues = new ArrayList[THREADS]; +// threadLocks = new ReentrantLock[THREADS]; +// threadConditions = new Condition[THREADS]; threadStates = new ThreadState[THREADS]; - ownTasks = new ArrayList[THREADS]; - ownSyncTasks = new ArrayList[THREADS]; - delayQueues = new ArrayList[THREADS * THREADS]; +// ownTasks = new ArrayList[THREADS]; +// ownSyncTasks = new ArrayList[THREADS]; +// delayQueues = new ArrayList[THREADS * THREADS]; // freeSchedule = new AtomicInteger(0); - for (int i = 0; i < THREADS * THREADS; i++) { - delayQueues[i] = new ArrayList(); - } +// for (int i = 0; i < THREADS * THREADS; i++) { +// delayQueues[i] = new ArrayList(); +// } for (int i = 0; i < THREADS; i++) { // tasks[i] = new ArrayList(); - ownTasks[i] = new ArrayList(); - ownSyncTasks[i] = new ArrayList(); - queues[i] = new ArrayList(); - threadLocks[i] = new ReentrantLock(); - threadConditions[i] = threadLocks[i].newCondition(); +// ownTasks[i] = new ArrayList(); +// ownSyncTasks[i] = new ArrayList(); +// queues[i] = new ArrayList(); +// threadLocks[i] = new ReentrantLock(); +// threadConditions[i] = threadLocks[i].newCondition(); // limits[i] = false; threadStates[i] = ThreadState.INIT; @@ -552,7 +608,17 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap }); } catch (DatabaseException e) { - Logger.defaultLogError(e); + + try { + + procedure.exception(graph, e); + + } catch (DatabaseException e1) { + + Logger.defaultLogError(e1); + + } + } } @@ -1340,7 +1406,7 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap CacheEntry entry = e.entry; - System.err.println("updateQuery " + entry); + //System.err.println("updateQuery " + entry); /* * If the dependency graph forms a DAG, some entries are inserted in the @@ -2324,11 +2390,8 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap @Override final public IntSet getPredicates(final ReadGraphImpl impl, final Resource subject) throws Throwable { - - return QueryCacheBase.resultPredicates(impl, querySupport.getId(subject), impl.parent, null, null); - + return QueryCacheBase.resultPredicates(impl, querySupport.getId(subject), impl.parent, null); } - @Override final public void forEachStatement(final ReadGraphImpl impl, final Resource subject, @@ -3424,7 +3487,7 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap assert(subject != null); - return QueryCacheBase.resultTypes(impl, querySupport.getId(subject), impl.parent, null, null); + return QueryCacheBase.resultTypes(impl, querySupport.getId(subject), impl.parent, null); } @@ -3647,17 +3710,18 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap } final public byte[] getValue(final ReadGraphImpl impl, final int subject) throws DatabaseException { - return QueryCache.resultValueQuery(impl, subject, impl.parent, null, null); + return QueryCache.resultValueQuery(impl, subject, impl.parent, null); } @Override - final public byte[] forValue(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure procedure) { + final public void forValue(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure procedure) { assert(subject != null); + assert(procedure != null); int sId = querySupport.getId(subject); - if(procedure != null) { +// if(procedure != null) { final ListenerBase listener = getListenerBase(procedure); @@ -3699,23 +3763,18 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap // else impl.state.barrier.inc(null, null); try { - return QueryCacheBase.resultValueQuery(impl, sId, impl.parent, listener, ip); + QueryCache.runnerValueQuery(impl, sId, impl.parent, listener, ip); } catch (DatabaseException e) { - Logger.defaultLogError(e); - } - - - } else { + throw new IllegalStateException("Internal error"); + } - try { - return QueryCacheBase.resultValueQuery(impl, sId, impl.parent, null, null); - } catch (DatabaseException e) { - Logger.defaultLogError(e); - } - - } - - throw new IllegalStateException("Internal error"); +// } else { +// +// return QueryCacheBase.runnerValueQuery(impl, sId, impl.parent, null, null); +// +// } +// +// throw new IllegalStateException("Internal error"); } @@ -3973,7 +4032,7 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap final ListenerBase listener = getListenerBase(procedure); try { - IntSet result = QueryCache.resultDirectPredicates(impl, querySupport.getId(subject), impl.parent, listener,QueryCache.emptyProcedureDirectPredicates); + IntSet result = QueryCache.resultDirectPredicates(impl, querySupport.getId(subject), impl.parent, listener); procedure.execute(impl, !result.isEmpty()); } catch (DatabaseException e) { procedure.exception(impl, e); @@ -4323,48 +4382,48 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap } - @Override - final public void query(final ReadGraphImpl impl, final ExternalRead request, final CacheEntry parent, final Procedure procedure, ListenerBase listener) { - - assert(request != null); - assert(procedure != null); - - try { - - queryPrimitiveRead(impl, request, parent, listener, new AsyncProcedure() { - - @Override - public String toString() { - return procedure.toString(); - } - - @Override - public void execute(AsyncReadGraph graph, T result) { - try { - procedure.execute(result); - } catch (Throwable t2) { - Logger.defaultLogError(t2); - } - } - - @Override - public void exception(AsyncReadGraph graph, Throwable throwable) { - try { - procedure.exception(throwable); - } catch (Throwable t2) { - Logger.defaultLogError(t2); - } - } - - }); - - } catch (DatabaseException e) { - - throw new IllegalStateException(e); - - } - - } +// @Override +// final public void query(final ReadGraphImpl impl, final ExternalRead request, final CacheEntry parent, final Procedure procedure, ListenerBase listener) throws DatabaseException { +// +// assert(request != null); +// assert(procedure != null); +// +// try { +// +// queryPrimitiveRead(impl, request, parent, listener, new AsyncProcedure() { +// +// @Override +// public String toString() { +// return procedure.toString(); +// } +// +// @Override +// public void execute(AsyncReadGraph graph, T result) { +// try { +// procedure.execute(result); +// } catch (Throwable t2) { +// Logger.defaultLogError(t2); +// } +// } +// +// @Override +// public void exception(AsyncReadGraph graph, Throwable throwable) { +// try { +// procedure.exception(throwable); +// } catch (Throwable t2) { +// Logger.defaultLogError(t2); +// } +// } +// +// }); +// +// } catch (DatabaseException e) { +// +// throw new IllegalStateException(e); +// +// } +// +// } @Override public VirtualGraph getProvider(Resource subject, Resource predicate, Resource object) { @@ -4410,5 +4469,11 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap } return L0; } + + public static ThreadLocal thread = new ThreadLocal() { + protected Integer initialValue() { + return -1; + } + }; }