From: Antti Villberg Date: Tue, 24 Sep 2019 08:00:28 +0000 (+0300) Subject: Fix livelock situation in QueryProcessor X-Git-Tag: v1.43.0~136^2~52^2~1 X-Git-Url: https://gerrit.simantics.org/r/gitweb?a=commitdiff_plain;h=refs%2Fchanges%2F74%2F3274%2F5;p=simantics%2Fplatform.git Fix livelock situation in QueryProcessor gitlab #386 Change-Id: I91bb154e6410b125eb13171f9152996286a6d54d --- diff --git a/bundles/org.simantics.browsing.ui.common/src/org/simantics/browsing/ui/common/internal/GECache.java b/bundles/org.simantics.browsing.ui.common/src/org/simantics/browsing/ui/common/internal/GECache.java index b5545df00..014a5c0d5 100644 --- a/bundles/org.simantics.browsing.ui.common/src/org/simantics/browsing/ui/common/internal/GECache.java +++ b/bundles/org.simantics.browsing.ui.common/src/org/simantics/browsing/ui/common/internal/GECache.java @@ -174,13 +174,13 @@ public class GECache implements IGECache { private TObjectIntHashMap references = new TObjectIntHashMap(); @Override - public void incRef(NodeContext context) { + synchronized public void incRef(NodeContext context) { int exist = references.get(context); references.put(context, exist+1); } @Override - public void decRef(NodeContext context) { + synchronized public void decRef(NodeContext context) { int exist = references.get(context); references.put(context, exist-1); if(exist == 1) { diff --git a/bundles/org.simantics.db.impl/src/org/simantics/db/impl/graph/ReadGraphImpl.java b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/graph/ReadGraphImpl.java index cffbba51c..3e955febc 100644 --- a/bundles/org.simantics.db.impl/src/org/simantics/db/impl/graph/ReadGraphImpl.java +++ b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/graph/ReadGraphImpl.java @@ -5152,7 +5152,7 @@ public class ReadGraphImpl implements AsyncReadGraph { assert (request != null); assert (procedure != null); - processor.schedule(new SessionTask(this) { + processor.scheduleNow(new SessionTask(this) { @Override public void run0(int thread) { @@ -5232,7 +5232,7 @@ public class ReadGraphImpl implements AsyncReadGraph { assert (request != null); assert (procedure != null); - processor.schedule(new SessionTask(this) { + processor.scheduleNow(new SessionTask(this) { @Override public void run0(int thread) { @@ -6312,7 +6312,7 @@ public class ReadGraphImpl implements AsyncReadGraph { public boolean performPending() { return processor.performPending(this); } - + public Set ancestorSet() { HashSet result = new HashSet<>(); ReadGraphImpl g = this; @@ -6322,5 +6322,23 @@ public class ReadGraphImpl implements AsyncReadGraph { } return result; } + + public int getLevel() { + return getLevelStatic(this); + } + + private static int getLevelStatic(ReadGraphImpl impl) { + if(impl == null) return 0; + else return 1 + getLevelStatic(impl.parentGraph); + } + + public ReadGraphImpl getTopLevelGraph() { + return getTopLevelGraphStatic(this); + } + + private static ReadGraphImpl getTopLevelGraphStatic(ReadGraphImpl impl) { + if(impl.parentGraph == null) return impl; + else return getTopLevelGraphStatic(impl.parentGraph); + } } diff --git a/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/AsyncReadEntry.java b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/AsyncReadEntry.java index 72582ee60..e13ecab72 100644 --- a/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/AsyncReadEntry.java +++ b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/AsyncReadEntry.java @@ -169,7 +169,14 @@ final public class AsyncReadEntry extends CacheEntryBase> i DatabaseException exception; public AsyncTask(ReadGraphImpl graph) { + this(graph, 1); + } + + public AsyncTask(ReadGraphImpl graph, int pos) { super(graph); + this.position = pos; + if(this.position < 1024) + this.position *= 2; } @Override @@ -213,10 +220,15 @@ final public class AsyncReadEntry extends CacheEntryBase> i } throw new IllegalStateException("Eternal loop in queries."); } - graph.processor.schedule(new AsyncTask(graph)); + graph.processor.scheduleLater(new AsyncTask(graph, position)); } } + @Override + public boolean maybeReady() { + return proc.isDone(); + } + } try { @@ -230,7 +242,7 @@ final public class AsyncReadEntry extends CacheEntryBase> i if(needsToBlock) task.run(0); else if (proc.isDone()) task.run(0); else { - graph.processor.schedule(task); + graph.processor.scheduleLater(task); return null; } diff --git a/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryCache.java b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryCache.java index 5c18d0510..cc0ca919b 100644 --- a/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryCache.java +++ b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryCache.java @@ -670,7 +670,7 @@ public class QueryCache extends QueryCacheBase { } ReadEntry entry = (ReadEntry)cache.getOrCreateReadEntry(graph, r, needsToBlock); if(entry == null) { - graph.processor.schedule(new SessionTask(graph) { + graph.processor.scheduleNow(new SessionTask(graph) { @Override public void run0(int thread) { try { @@ -749,7 +749,7 @@ public class QueryCache extends QueryCacheBase { } AsyncReadEntry entry = (AsyncReadEntry)cache.getOrCreateAsyncReadEntry(graph, r, needsToBlock); if(entry == null) { - graph.processor.schedule(new SessionTask(graph) { + graph.processor.scheduleNow(new SessionTask(graph) { @Override public void run0(int thread) { try { diff --git a/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryListening.java b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryListening.java index e524a080a..9daab950e 100644 --- a/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryListening.java +++ b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryListening.java @@ -14,8 +14,8 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.Semaphore; import org.simantics.databoard.Bindings; @@ -41,7 +41,7 @@ public class QueryListening { private THashSet scheduledListeners = new THashSet(); private boolean firingListeners = false; final THashMap> listeners = new THashMap>(10, 0.75f); - private BlockingQueue tasks = new LinkedBlockingQueue(); + private BlockingQueue tasks = new ArrayBlockingQueue(2048); private Map addedEntries = new HashMap<>(); QueryListening(QueryProcessor processor) { @@ -51,10 +51,10 @@ public class QueryListening { public void sync() { Semaphore s = new Semaphore(0); - tasks.add(() -> { - s.release(); - }); try { + tasks.put(() -> { + s.release(); + }); s.acquire(); } catch (Throwable t) { LOGGER.error("Error while waiting for query dependency management", t); @@ -93,26 +93,30 @@ public class QueryListening { void registerDependencies(ReadGraphImpl graph, CacheEntry child, CacheEntry parent, ListenerBase listener, Object procedure, boolean inferred) { - tasks.offer(() -> { + try { + tasks.put(() -> { - if (parent != null && !inferred) { - try { - if(!child.isImmutable(graph)) - child.addParent(parent); - } catch (DatabaseException e) { - LOGGER.error("Error while registering query dependencies", e); - } - if (Development.DEVELOPMENT) { - if(Development.getProperty(DevelopmentKeys.QUERYPROCESSOR_DEPENDENCIES, Bindings.BOOLEAN)) { - System.out.println(child + " -> " + parent); + if (parent != null && !inferred) { + try { + if(!child.isImmutable(graph)) + child.addParent(parent); + } catch (DatabaseException e) { + LOGGER.error("Error while registering query dependencies", e); + } + if (Development.DEVELOPMENT) { + if(Development.getProperty(DevelopmentKeys.QUERYPROCESSOR_DEPENDENCIES, Bindings.BOOLEAN)) { + System.out.println(child + " -> " + parent); + } } } - } - if (listener != null) - registerListener(child, listener, procedure); + if (listener != null) + registerListener(child, listener, procedure); - }); + }); + } catch (InterruptedException e) { + LOGGER.error("Error while registering dependencies", e); + } } diff --git a/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryProcessor.java b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryProcessor.java index 7bfa0f4c6..ee4b11175 100644 --- a/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryProcessor.java +++ b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryProcessor.java @@ -136,11 +136,11 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap final public Session session; final public ResourceSupport resourceSupport; + final public Semaphore requests = new Semaphore(1); + final public QueryListening listening = new QueryListening(this); QueryThread[] executors; - -// public ArrayList[] queues; public LinkedList freeScheduling = new LinkedList(); @@ -151,14 +151,6 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap } public ThreadState[] threadStates; -// public ReentrantLock[] threadLocks; -// public Condition[] threadConditions; - - //public ArrayList[] ownTasks; - - //public ArrayList[] ownSyncTasks; - - //ArrayList[] delayQueues; final Object querySupportLock; @@ -167,28 +159,13 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap public void close() { } - public SessionTask getOwnTask(ReadGraphImpl impl) { - Set ancestors = impl.ancestorSet(); - synchronized(querySupportLock) { - int index = 0; - while(index < freeScheduling.size()) { - SessionTask task = freeScheduling.get(index); - if(task.hasCommonParent(ancestors)) { - return freeScheduling.remove(index); - } - index++; - } - } - return null; - } - public SessionTask getSubTask(ReadGraphImpl impl) { - Set onlyThis = Collections.singleton(impl); synchronized(querySupportLock) { int index = 0; while(index < freeScheduling.size()) { SessionTask task = freeScheduling.get(index); - if(task.hasCommonParent(onlyThis)) { + if(task.hasCommonParent(task.graph, impl) && task.maybeReady()) { + queueLength.decrementAndGet(); return freeScheduling.remove(index); } index++; @@ -197,68 +174,84 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap return null; } - public boolean performPending(ReadGraphImpl graph) { - SessionTask task = getOwnTask(graph); + /* + * We are running errands while waiting for requests to complete. + * We can only run work that is part of the current root request to avoid any deadlocks + */ + public boolean performPending(ReadGraphImpl graph) { + SessionTask task = getSubTask(graph); if(task != null) { task.run(QueryProcessor.thread.get()); return true; - } else { - return false; } + return false; } -// final public void scheduleOwn(int caller, SessionTask request) { -// ownTasks[caller].add(request); -// } - - final public void schedule(SessionTask request) { - - //int performer = request.thread; - -// if(DebugPolicy.SCHEDULE) -// System.out.println("schedule " + request + " " + " -> " + performer); + final public void scheduleNow(SessionTask request) { + schedule(request, false); + } - //assert(performer >= 0); + final public void scheduleLater(SessionTask request) { + schedule(request, true); + } + AtomicInteger queueLength = new AtomicInteger(0); + + final public void schedule(SessionTask request, boolean late) { + + int queueLengthEstimate = queueLength.get(); + if(!late && queueLengthEstimate > 80) { + request.run(thread.get()); + return; + } + assert(request != null); + + synchronized(querySupportLock) { -// if(caller == performer) { -// request.run(caller); -// } else { - -// if(performer == THREADS) { - - synchronized(querySupportLock) { - - if(BarrierTracing.BOOKKEEPING) { - Exception current = new Exception(); - Exception previous = BarrierTracing.tasks.put(request, current); - if(previous != null) { - previous.printStackTrace(); - current.printStackTrace(); - } - } - - freeScheduling.add(request); - - querySupportLock.notifyAll(); + if(BarrierTracing.BOOKKEEPING) { + Exception current = new Exception(); + Exception previous = BarrierTracing.tasks.put(request, current); + if(previous != null) { + previous.printStackTrace(); + current.printStackTrace(); + } + } - } + if(late) { + int pos = request.position - 1; + if(pos < freeScheduling.size()) { + freeScheduling.add(pos, request); + queueLength.incrementAndGet(); + requests.release(); + } else { + freeScheduling.addLast(request); + queueLength.incrementAndGet(); + requests.release(); + } + } + else { + if(request.getLevel() < 4) { + if(freeScheduling.size() < 100) { + freeScheduling.addFirst(request); + queueLength.incrementAndGet(); + requests.release(); + } else { + request.run(thread.get()); + } + } else { + if(freeScheduling.size() < 20) { + freeScheduling.addFirst(request); + queueLength.incrementAndGet(); + requests.release(); + } else { + request.run(thread.get()); + } + } + } - return; - -// } -// -// ReentrantLock queueLock = threadLocks[performer]; -// queueLock.lock(); -// queues[performer].add(request); -// // This thread could have been sleeping -// if(queues[performer].size() == 1) { -// //if(ThreadState.SLEEP == threadStates[performer]) sleepers.decrementAndGet(); -// threadConditions[performer].signalAll(); -// } -// queueLock.unlock(); -// } + + } } @@ -271,8 +264,8 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap public static abstract class SessionTask { public final ReadGraphImpl graph; - private Set ancestors; private int counter = 0; + protected int position = 1; private Exception trace; public SessionTask(ReadGraphImpl graph) { @@ -280,13 +273,12 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap if(graph != null) graph.asyncBarrier.inc(); } - public boolean hasCommonParent(Set otherAncestors) { - if(graph == null) return false; - if(ancestors == null) ancestors = graph.ancestorSet(); - return !Collections.disjoint(ancestors, otherAncestors); - } + public static boolean hasCommonParent(ReadGraphImpl r1, ReadGraphImpl r2) { + if(r1 == null || r2 == null) return false; + return r1.getTopLevelGraph() == r2.getTopLevelGraph(); + } - public abstract void run0(int thread); + public abstract void run0(int thread); public final void run(int thread) { if(counter++ > 0) { @@ -302,10 +294,22 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap run0(thread); if(graph != null) graph.asyncBarrier.dec(); } + + public boolean maybeReady() { + return true; + } @Override public String toString() { - return "SessionTask[" + graph.parent + "]"; + if(graph == null) + return "SessionTask[no graph]"; + else + return "SessionTask[" + graph.parent + "]"; + } + + public int getLevel() { + if(graph == null) return 0; + else return graph.getLevel(); } } @@ -1793,6 +1797,8 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap @Override protected void doDispose() { + requests.release(Integer.MAX_VALUE / 2); + for(int index = 0; index < THREADS; index++) { executors[index].dispose(); } diff --git a/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryThread.java b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryThread.java index ab10efa0b..5510944dc 100644 --- a/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryThread.java +++ b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryThread.java @@ -22,6 +22,7 @@ class QueryThread extends Thread implements SessionThread { private Session session; private QuerySupport querySupport; private final QueryProcessor processor; + private final Semaphore requests; final private ArrayList tasks = new ArrayList(); // final private ArrayList own; @@ -54,6 +55,7 @@ class QueryThread extends Thread implements SessionThread { sleepers = processor.sleepers; querySupport = processor.querySupport; threadStates = processor.threadStates; + requests = processor.requests; // delayQueues = processor.delayQueues; // executors = processor.executors; // threadLocks = processor.threadLocks; @@ -98,6 +100,7 @@ class QueryThread extends Thread implements SessionThread { private boolean pumpTask() { if(!processor.freeScheduling.isEmpty()) { tasks.add(processor.freeScheduling.removeFirst()); + processor.queueLength.decrementAndGet(); return true; } return false; @@ -155,18 +158,7 @@ class QueryThread extends Thread implements SessionThread { threadStates[index] = ThreadState.SLEEP; - synchronized (querySupportLock) { - querySupportLock.wait(100); - - } - -// boolean woken = condition.await(10, TimeUnit.MILLISECONDS); -// if(!woken) { -// synchronized (querySupportLock) { -// if(!processor.freeScheduling.isEmpty()) -// System.err.println("some tasks are available!"); -// } -// } + requests.acquire(); sleepers.decrementAndGet(); diff --git a/bundles/org.simantics.db.layer0/src/org/simantics/db/layer0/adapter/impl/EntityInstances.java b/bundles/org.simantics.db.layer0/src/org/simantics/db/layer0/adapter/impl/EntityInstances.java index 93903fb9b..ffbec2f5e 100644 --- a/bundles/org.simantics.db.layer0/src/org/simantics/db/layer0/adapter/impl/EntityInstances.java +++ b/bundles/org.simantics.db.layer0/src/org/simantics/db/layer0/adapter/impl/EntityInstances.java @@ -185,15 +185,17 @@ public class EntityInstances implements Instances { CollectionSupport coll = graph.getService(CollectionSupport.class); THashSet visited = new THashSet<>(); - List rec = findRec(graph, index, filter, visited); + List rec_ = findRec(graph, index, filter, visited); + // We must not modify rec_! + List rec = rec_; for(Resource global : Layer0Utils.listGlobalOntologies(graph)) { if(!visited.add(global)) continue; List rs = graph.syncRequest(new QueryIndex(global, type, filter), TransientCacheListener.>instance()); - if(rec.isEmpty() && !rs.isEmpty()) { - // TODO: rec could be an immutable empty list - rec = new ArrayList(); + if(!rs.isEmpty()) { + if(rec == rec_) + rec = new ArrayList<>(rec); + rec.addAll(rs); } - rec.addAll(rs); } Collection result = coll.asSortedList(rec); return result; diff --git a/bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/SessionRequestManager.java b/bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/SessionRequestManager.java index a93476355..fb421b610 100644 --- a/bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/SessionRequestManager.java +++ b/bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/SessionRequestManager.java @@ -118,7 +118,7 @@ public class SessionRequestManager { public synchronized void startRead(int thread, final SessionRead task) { - session.queryProvider2.schedule(new SessionTask(null) { + session.queryProvider2.scheduleNow(new SessionTask(null) { @Override public void run0(int thread) { @@ -142,7 +142,7 @@ public class SessionRequestManager { public synchronized void startReadUpdate(int thread) { - session.queryProvider2.schedule(new SessionTask(null) { + session.queryProvider2.scheduleNow(new SessionTask(null) { @Override public void run0(int thread) { @@ -163,7 +163,7 @@ public class SessionRequestManager { public synchronized void startWrite(int thread, final SessionTask task) { - session.queryProvider2.schedule(new SessionTask(null) { + session.queryProvider2.scheduleNow(new SessionTask(null) { @Override public void run0(int thread) { @@ -184,7 +184,7 @@ public class SessionRequestManager { public synchronized void startWriteUpdate(int thread) { - session.queryProvider2.schedule(new SessionTask(null) { + session.queryProvider2.scheduleNow(new SessionTask(null) { @Override public void run0(int thread) { @@ -271,7 +271,7 @@ public class SessionRequestManager { if (!reads.isEmpty()) { final SessionRead read = reads.poll(); - session.queryProvider2.schedule(new SessionTask(null) { + session.queryProvider2.scheduleNow(new SessionTask(null) { @Override public void run0(int thread) { @@ -312,7 +312,7 @@ public class SessionRequestManager { assert(State.INIT != state); if(State.READ == state) { - session.queryProvider2.schedule(new SessionTask(null) { + session.queryProvider2.scheduleNow(new SessionTask(null) { @Override public void run0(int thread) {