X-Git-Url: https://gerrit.simantics.org/r/gitweb?a=blobdiff_plain;f=bundles%2Forg.simantics.db.impl%2Fsrc%2Forg%2Fsimantics%2Fdb%2Fimpl%2Fquery%2FQueryProcessor.java;h=f61c661d19cbd7eb47fbf15de27ef4bb9035627c;hb=86617be247efb3b904b3180d0569049aff232d75;hp=7bfa0f4c6195659c21efdb0e6bcf99fed0dfb7bb;hpb=0db1af75cd14f99f16c42c920562708abf37be93;p=simantics%2Fplatform.git diff --git a/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryProcessor.java b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryProcessor.java index 7bfa0f4c6..f61c661d1 100644 --- a/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryProcessor.java +++ b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryProcessor.java @@ -32,6 +32,7 @@ import java.util.concurrent.Semaphore; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import org.eclipse.core.runtime.Platform; import org.simantics.databoard.Bindings; import org.simantics.db.AsyncReadGraph; import org.simantics.db.DevelopmentKeys; @@ -42,6 +43,7 @@ import org.simantics.db.Resource; import org.simantics.db.Session; import org.simantics.db.Statement; import org.simantics.db.VirtualGraph; +import org.simantics.db.common.ByteFileReader; import org.simantics.db.common.procedure.adapter.AsyncMultiProcedureAdapter; import org.simantics.db.common.utils.Logger; import org.simantics.db.exception.DatabaseException; @@ -68,9 +70,11 @@ import org.simantics.db.request.AsyncMultiRead; import org.simantics.db.request.ExternalRead; import org.simantics.db.request.MultiRead; import org.simantics.db.request.RequestFlags; +import org.simantics.db.service.Bytes; import org.simantics.layer0.Layer0; import org.simantics.utils.DataContainer; import org.simantics.utils.Development; +import org.simantics.utils.FileUtils; import org.simantics.utils.datastructures.Pair; import org.simantics.utils.datastructures.collections.CollectionUtils; import org.simantics.utils.datastructures.disposable.AbstractDisposable; @@ -136,29 +140,21 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap final public Session session; final public ResourceSupport resourceSupport; + final public Semaphore requests = new Semaphore(1); + final public QueryListening listening = new QueryListening(this); QueryThread[] executors; -// public ArrayList[] queues; - - public LinkedList freeScheduling = new LinkedList(); - enum ThreadState { INIT, RUN, SLEEP, DISPOSED } + final Scheduling scheduling; + public ThreadState[] threadStates; -// public ReentrantLock[] threadLocks; -// public Condition[] threadConditions; - - //public ArrayList[] ownTasks; - - //public ArrayList[] ownSyncTasks; - - //ArrayList[] delayQueues; final Object querySupportLock; @@ -167,100 +163,31 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap public void close() { } - public SessionTask getOwnTask(ReadGraphImpl impl) { - Set ancestors = impl.ancestorSet(); - synchronized(querySupportLock) { - int index = 0; - while(index < freeScheduling.size()) { - SessionTask task = freeScheduling.get(index); - if(task.hasCommonParent(ancestors)) { - return freeScheduling.remove(index); - } - index++; - } - } - return null; - } - - public SessionTask getSubTask(ReadGraphImpl impl) { - Set onlyThis = Collections.singleton(impl); - synchronized(querySupportLock) { - int index = 0; - while(index < freeScheduling.size()) { - SessionTask task = freeScheduling.get(index); - if(task.hasCommonParent(onlyThis)) { - return freeScheduling.remove(index); - } - index++; - } - } - return null; - } - public boolean performPending(ReadGraphImpl graph) { - SessionTask task = getOwnTask(graph); + /* + * We are running errands while waiting for requests to complete. + * We can only run work that is part of the current root request to avoid any deadlocks + */ + public boolean performPending(ReadGraphImpl under) { + SessionTask task = scheduling.getSubTask(under); if(task != null) { - task.run(QueryProcessor.thread.get()); + task.run(thread.get()); return true; - } else { - return false; } + return false; } + + final public void scheduleNow(SessionTask request) { + SessionTask toExecute = scheduleOrReturnForExecution(request); + if(toExecute != null) + toExecute.run(thread.get()); + } -// final public void scheduleOwn(int caller, SessionTask request) { -// ownTasks[caller].add(request); -// } - - final public void schedule(SessionTask request) { - - //int performer = request.thread; - -// if(DebugPolicy.SCHEDULE) -// System.out.println("schedule " + request + " " + " -> " + performer); - - //assert(performer >= 0); - - assert(request != null); - -// if(caller == performer) { -// request.run(caller); -// } else { - -// if(performer == THREADS) { - - synchronized(querySupportLock) { - - if(BarrierTracing.BOOKKEEPING) { - Exception current = new Exception(); - Exception previous = BarrierTracing.tasks.put(request, current); - if(previous != null) { - previous.printStackTrace(); - current.printStackTrace(); - } - } - - freeScheduling.add(request); - - querySupportLock.notifyAll(); - - } - - return; - -// } -// -// ReentrantLock queueLock = threadLocks[performer]; -// queueLock.lock(); -// queues[performer].add(request); -// // This thread could have been sleeping -// if(queues[performer].size() == 1) { -// //if(ThreadState.SLEEP == threadStates[performer]) sleepers.decrementAndGet(); -// threadConditions[performer].signalAll(); -// } -// queueLock.unlock(); -// } + final public SessionTask scheduleOrReturnForExecution(SessionTask request) { + + return scheduling.scheduleOrReturnForExecution(request); - } + } final int THREADS; @@ -270,23 +197,24 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap public static abstract class SessionTask { - public final ReadGraphImpl graph; - private Set ancestors; + final protected ReadGraphImpl rootGraph; private int counter = 0; + protected int position = 1; private Exception trace; - public SessionTask(ReadGraphImpl graph) { - this.graph = graph; - if(graph != null) graph.asyncBarrier.inc(); - } - - public boolean hasCommonParent(Set otherAncestors) { - if(graph == null) return false; - if(ancestors == null) ancestors = graph.ancestorSet(); - return !Collections.disjoint(ancestors, otherAncestors); + public SessionTask() { + this(null); } + + public SessionTask(ReadGraphImpl rootGraph) { + this.rootGraph = rootGraph; + } + + public boolean isSubtask(ReadGraphImpl graph) { + return graph.isParent(rootGraph); + } - public abstract void run0(int thread); + public abstract void run0(int thread); public final void run(int thread) { if(counter++ > 0) { @@ -300,12 +228,18 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap trace = new Exception(); } run0(thread); - if(graph != null) graph.asyncBarrier.dec(); + } + + public boolean maybeReady() { + return true; } @Override public String toString() { - return "SessionTask[" + graph.parent + "]"; + if(rootGraph == null) + return "SessionTask[no graph]"; + else + return "SessionTask[" + rootGraph.parent + "]"; } } @@ -323,40 +257,18 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap } - long waitingTime = 0; - - static int koss = 0; - static int koss2 = 0; - public boolean resume(ReadGraphImpl graph) { return executors[0].runSynchronized(); } - - //private WeakReference garbageTracker; - - private class GarbageTracker { - - @Override - protected void finalize() throws Throwable { - -// System.err.println("GarbageTracker"); -// -// garbageTracker = new WeakReference(new GarbageTracker()); - - super.finalize(); - - } - - } public QueryProcessor(final int threads, QuerySupport core, Set threadSet) throws DatabaseException { - //garbageTracker = new WeakReference(new GarbageTracker()); - THREADS = threads; THREAD_MASK = threads - 1; + scheduling = new Scheduling(requests); + querySupport = core; cache = new QueryCache(core, threads); session = querySupport.getSession(); @@ -364,31 +276,10 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap querySupportLock = core.getLock(); executors = new QueryThread[THREADS]; -// queues = new ArrayList[THREADS]; -// threadLocks = new ReentrantLock[THREADS]; -// threadConditions = new Condition[THREADS]; threadStates = new ThreadState[THREADS]; -// ownTasks = new ArrayList[THREADS]; -// ownSyncTasks = new ArrayList[THREADS]; -// delayQueues = new ArrayList[THREADS * THREADS]; - - // freeSchedule = new AtomicInteger(0); - -// for (int i = 0; i < THREADS * THREADS; i++) { -// delayQueues[i] = new ArrayList(); -// } for (int i = 0; i < THREADS; i++) { - - // tasks[i] = new ArrayList(); -// ownTasks[i] = new ArrayList(); -// ownSyncTasks[i] = new ArrayList(); -// queues[i] = new ArrayList(); -// threadLocks[i] = new ReentrantLock(); -// threadConditions[i] = threadLocks[i].newCondition(); - // limits[i] = false; threadStates[i] = ThreadState.INIT; - } for (int i = 0; i < THREADS; i++) { @@ -1182,6 +1073,82 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap } + public synchronized void save() throws IOException { + + long start = System.nanoTime(); + + Collection caches = allCaches(new CacheCollectionResult()).toCollection(); + Map> cachesByCluster = new HashMap<>(); + for(CacheEntryBase entry : caches) { + String clazz = entry.classId(); + if(clazz == null) + continue; + long cluster = entry.cluster(this); + List queries = cachesByCluster.get(cluster); + if(queries == null) { + queries = new ArrayList<>(); + cachesByCluster.put(cluster, queries); + } + queries.add(entry); + } + + File workspace = Platform.getLocation().toFile(); + File dir = new File(workspace, "queryData"); + FileUtils.deleteAll(dir); + + dir.mkdir(); + + for(Long cluster : cachesByCluster.keySet()) { + + List queries = cachesByCluster.get(cluster); + QuerySerializer serializer = new QuerySerializer(this); + int count = 0; + int pos = serializer.writeUnknownSize(); + for(CacheEntryBase entry : queries) { + String clazz = entry.classId(); + if(clazz == null) + continue; + try { + entry.serialize(serializer); + count++; + } catch (IllegalStateException e) { + System.err.println(e.getMessage()); + } + } + serializer.setUnknownSize(pos, count); + + System.err.println(serializer.bytes().length + " bytes for cluster " + cluster); + FileUtils.writeFile(new File(dir, "" + cluster + ".queryData"), serializer.bytes()); + + } + + long end = System.nanoTime(); + + System.err.println("saved queries in " + 1e-6*(end-start) + "ms."); + + } + + public void restore() throws IOException { + + long start = System.nanoTime(); + + File workspace = Platform.getLocation().toFile(); + File dir = new File(workspace, "queryData"); + dir.mkdir(); + + for(File f : FileUtils.listFilesByExtension(dir, "queryData")) { + byte[] bytes = FileUtils.readFile(f); + QueryDeserializer qd = new QueryDeserializer(this, bytes); + qd.readHeaders(); + qd.readQueries(); + } + + long end = System.nanoTime(); + + System.err.println("restored queries in " + 1e-6*(end-start) + "ms."); + + } + boolean removeQuery(CacheEntry entry) { // This entry has been removed before. No need to do anything here. @@ -1392,8 +1359,9 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap entry.prepareRecompute(querySupport); ReadGraphImpl parentGraph = graph.forRecompute(entry); - + parentGraph.asyncBarrier.inc(); query.recompute(parentGraph); + parentGraph.asyncBarrier.dec(); if(entry.isExcepted()) return ListenerEntry.NO_VALUE; @@ -1793,6 +1761,8 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap @Override protected void doDispose() { + requests.release(Integer.MAX_VALUE / 2); + for(int index = 0; index < THREADS; index++) { executors[index].dispose(); } @@ -1870,6 +1840,12 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap } return result; } + + public long cluster(int resource) { + if(resource <= 0) + return 0; + return querySupport.getClusterId(resource); + } public void assertDone() { } @@ -1966,12 +1942,12 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap Exception callerException = null; - public interface AsyncBarrier { - public void inc(); - public void dec(); - // public void inc(String debug); - // public void dec(String debug); - } + public interface AsyncBarrier { + public void inc(); + public void dec(); + public void waitBarrier(Object request, ReadGraphImpl impl); + public boolean isBlocking(); + } // final public QueryProcessor processor; // final public QuerySupport support; @@ -4164,6 +4140,10 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap return L0; } + public Layer0 getL0() { + return L0; + } + public static ThreadLocal thread = new ThreadLocal() { protected Integer initialValue() { return -1;