X-Git-Url: https://gerrit.simantics.org/r/gitweb?a=blobdiff_plain;f=bundles%2Forg.simantics.db.impl%2Fsrc%2Forg%2Fsimantics%2Fdb%2Fimpl%2Fquery%2FQueryProcessor.java;h=9c60691fad7a0a850228997e85ffb47168ca1aae;hb=8b6de499c9c1bad0e72631719fd165ce39d6ad56;hp=2908bd43a83680fc7f24e3fe2faafb1b25bc1b53;hpb=5bc1c2b950d374ecfcc206981724c3b9286dcbd6;p=simantics%2Fplatform.git diff --git a/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryProcessor.java b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryProcessor.java index 2908bd43a..9c60691fa 100644 --- a/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryProcessor.java +++ b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryProcessor.java @@ -16,7 +16,6 @@ import java.io.File; import java.io.FileOutputStream; import java.io.IOException; import java.io.PrintStream; -import java.lang.ref.WeakReference; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -32,8 +31,6 @@ import java.util.Set; import java.util.concurrent.Semaphore; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.locks.Condition; -import java.util.concurrent.locks.ReentrantLock; import org.simantics.databoard.Bindings; import org.simantics.db.AsyncReadGraph; @@ -54,7 +51,6 @@ import org.simantics.db.exception.NoInverseException; import org.simantics.db.exception.ResourceNotFoundException; import org.simantics.db.impl.DebugPolicy; import org.simantics.db.impl.ResourceImpl; -import org.simantics.db.impl.graph.MultiIntProcedure; import org.simantics.db.impl.graph.ReadGraphImpl; import org.simantics.db.impl.graph.ReadGraphSupport; import org.simantics.db.impl.graph.WriteGraphImpl; @@ -68,15 +64,12 @@ import org.simantics.db.procedure.AsyncProcedure; import org.simantics.db.procedure.AsyncSetListener; import org.simantics.db.procedure.ListenerBase; import org.simantics.db.procedure.MultiProcedure; -import org.simantics.db.procedure.Procedure; import org.simantics.db.procedure.StatementProcedure; +import org.simantics.db.procedure.SyncMultiProcedure; import org.simantics.db.request.AsyncMultiRead; -import org.simantics.db.request.AsyncRead; import org.simantics.db.request.ExternalRead; import org.simantics.db.request.MultiRead; -import org.simantics.db.request.Read; import org.simantics.db.request.RequestFlags; -import org.simantics.db.request.WriteTraits; import org.simantics.layer0.Layer0; import org.simantics.utils.DataContainer; import org.simantics.utils.Development; @@ -162,8 +155,8 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap } public ThreadState[] threadStates; - public ReentrantLock[] threadLocks; - public Condition[] threadConditions; +// public ReentrantLock[] threadLocks; +// public Condition[] threadConditions; //public ArrayList[] ownTasks; @@ -178,31 +171,41 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap public void close() { } + SessionTask getOwnTask(int thread) { + synchronized(querySupportLock) { + int index = 0; + while(index < freeScheduling.size()) { + SessionTask task = freeScheduling.get(index); + if(task.thread == thread && !task.systemCall) + return freeScheduling.remove(index); + index++; + } + } + return null; + } + + public boolean performPending(int thread) { + SessionTask task = getOwnTask(thread); + if(task != null) { + task.run(thread); + return true; + } else { + return false; + } + } + // final public void scheduleOwn(int caller, SessionTask request) { // ownTasks[caller].add(request); // } - final public void scheduleAlways(int caller, SessionTask request) { - -// int performer = request.thread; -// if(caller == performer) { -// ownTasks[caller].add(request); -// } else { -// schedule(caller, request); -// } - - schedule(caller, request); - - } - - final public void schedule(int caller, SessionTask request) { + final public void schedule(SessionTask request) { int performer = request.thread; if(DebugPolicy.SCHEDULE) - System.out.println("schedule " + request + " " + caller + " -> " + performer); + System.out.println("schedule " + request + " " + " -> " + performer); - assert(performer >= 0); + //assert(performer >= 0); assert(request != null); @@ -214,18 +217,22 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap synchronized(querySupportLock) { + //new Exception().printStackTrace(); + freeScheduling.add(request); + + querySupportLock.notifyAll(); //System.err.println("schedule free task " + request + " => " + freeScheduling.size()); - for(int i=0;i throwable; - public SessionRead(Object object, DataContainer throwable, Semaphore notify, int thread) { - super(object, thread, thread); - this.throwable = throwable; - this.notify = notify; - } - - public SessionRead(Object object, DataContainer throwable, Semaphore notify, int thread, int syncThread) { - super(object, thread, syncThread); + public SessionRead(DataContainer throwable, Semaphore notify) { + super(true); this.throwable = throwable; this.notify = notify; } @@ -340,8 +343,8 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap executors = new QueryThread[THREADS]; // queues = new ArrayList[THREADS]; - threadLocks = new ReentrantLock[THREADS]; - threadConditions = new Condition[THREADS]; +// threadLocks = new ReentrantLock[THREADS]; +// threadConditions = new Condition[THREADS]; threadStates = new ThreadState[THREADS]; // ownTasks = new ArrayList[THREADS]; // ownSyncTasks = new ArrayList[THREADS]; @@ -359,8 +362,8 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap // ownTasks[i] = new ArrayList(); // ownSyncTasks[i] = new ArrayList(); // queues[i] = new ArrayList(); - threadLocks[i] = new ReentrantLock(); - threadConditions[i] = threadLocks[i].newCondition(); +// threadLocks[i] = new ReentrantLock(); +// threadConditions[i] = threadLocks[i].newCondition(); // limits[i] = false; threadStates[i] = ThreadState.INIT; @@ -624,7 +627,7 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap } - final void runMultiRead(final ReadGraphImpl graph, MultiReadEntry cached, final MultiRead query, final CacheEntry parent, final QueryProcessor provider, final ListenerBase listener, final AsyncMultiProcedure procedure) { + final void runMultiRead(final ReadGraphImpl graph, MultiReadEntry cached, final MultiRead query, final CacheEntry parent, final QueryProcessor provider, final ListenerBase listener, final SyncMultiProcedure procedure) { try { QueryCache.runnerMultiReadEntry(graph, query, parent, listener, procedure); @@ -656,7 +659,7 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap // // } - public void queryMultiRead(final ReadGraphImpl graph, final MultiRead query, final CacheEntry parent, final ListenerBase listener, final AsyncMultiProcedure procedure) throws DatabaseException { + public void queryMultiRead(final ReadGraphImpl graph, final MultiRead query, final CacheEntry parent, final ListenerBase listener, final SyncMultiProcedure procedure) throws DatabaseException { QueryCache.runnerMultiReadEntry(graph, query, parent, listener, procedure); @@ -2752,51 +2755,60 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap } @Override - final public void forEachDirectPredicate(final ReadGraphImpl impl, final Resource subject, final AsyncMultiProcedure procedure) { - - throw new UnsupportedOperationException(); - -// assert(subject != null); -// assert(procedure != null); -// -// final ListenerBase listener = getListenerBase(procedure); -// -// MultiIntProcedure proc = new MultiIntProcedure(procedure, impl, querySupport); -// -// int sId = querySupport.getId(subject); -// -// try { -// QueryCache.runnerDirectPredicates(impl, sId, impl.parent, listener, proc); -// } catch (DatabaseException e) { -// Logger.defaultLogError(e); -// } - - } - - @Override - final public void forEachDirectStatement(final ReadGraphImpl impl, final Resource subject, final Procedure procedure) { + final public void forEachDirectPredicate(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure> procedure) { assert(subject != null); assert(procedure != null); final ListenerBase listener = getListenerBase(procedure); - org.simantics.db.impl.query.DirectStatements.queryEach(impl, querySupport.getId(subject), this, impl.parent, listener, procedure); + int sId = querySupport.getId(subject); - } + try { + QueryCache.runnerDirectPredicates(impl, sId, impl.parent, listener, new InternalProcedure() { - @Override - final public void forEachDirectStatement(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure procedure, boolean ignoreVirtual) { + @Override + public void execute(ReadGraphImpl graph, IntSet result) throws DatabaseException { + procedure.execute(graph, result); + } - assert(subject != null); - assert(procedure != null); + @Override + public void exception(ReadGraphImpl graph, Throwable throwable) throws DatabaseException { + procedure.exception(graph, throwable); + } + + }); + } catch (DatabaseException e) { + Logger.defaultLogError(e); + } - final ListenerBase listener = getListenerBase(procedure); + } - org.simantics.db.impl.query.DirectStatements.queryEach(impl, querySupport.getId(subject), this, impl.parent, listener, procedure, ignoreVirtual); + final public DirectStatements getDirectStatements(final ReadGraphImpl impl, final Resource subject, final boolean ignoreVirtual) { + +// assert(subject != null); +// assert(procedure != null); +// +// final ListenerBase listener = getListenerBase(procedure); +// +// org.simantics.db.impl.query.DirectStatements.queryEach(impl, querySupport.getId(subject), this, impl.parent, listener, procedure); + + return querySupport.getStatements(impl, querySupport.getId(subject), this, ignoreVirtual); } +// @Override +// final public void forEachDirectStatement(final ReadGraphImpl impl, final Resource subject, final SyncProcedure procedure, boolean ignoreVirtual) { +// +// assert(subject != null); +// assert(procedure != null); +// +// final ListenerBase listener = getListenerBase(procedure); +// +// org.simantics.db.impl.query.DirectStatements.queryEach(impl, querySupport.getId(subject), this, impl.parent, listener, procedure, ignoreVirtual); +// +// } + private static final Resource INVALID_RESOURCE = new ResourceImpl(null, Integer.MIN_VALUE); @Override @@ -3482,32 +3494,11 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap } @Override - final public void forRelationInfo(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure procedure) { + final public RelationInfo getRelationInfo(final ReadGraphImpl impl, final Resource subject) throws DatabaseException { assert(subject != null); - assert(procedure != null); - final ListenerBase listener = getListenerBase(procedure); - assert(listener == null); - - try { - - QueryCache.runnerRelationInfoQuery(impl, querySupport.getId(subject), impl.parent, listener, new InternalProcedure() { - - @Override - public void execute(final ReadGraphImpl graph, RelationInfo set) { - procedure.execute(graph, set); - } - - @Override - public void exception(ReadGraphImpl graph, Throwable t) { - procedure.exception(graph, t); - } - - }); - } catch (DatabaseException e) { - Logger.defaultLogError(e); - } + return QueryCache.resultRelationInfoQuery(impl, querySupport.getId(subject), impl.parent, null); } @@ -4301,7 +4292,7 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap // } @Override - final public void query(final ReadGraphImpl impl, final MultiRead request, final CacheEntry parent, final AsyncMultiProcedure procedure, ListenerBase listener) { + final public void query(final ReadGraphImpl impl, final MultiRead request, final CacheEntry parent, final SyncMultiProcedure procedure, ListenerBase listener) { assert(request != null); assert(procedure != null); @@ -4459,5 +4450,11 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap } return L0; } + + public static ThreadLocal thread = new ThreadLocal() { + protected Integer initialValue() { + return -1; + } + }; }