X-Git-Url: https://gerrit.simantics.org/r/gitweb?a=blobdiff_plain;f=bundles%2Forg.simantics.db.impl%2Fsrc%2Forg%2Fsimantics%2Fdb%2Fimpl%2Fquery%2FQueryProcessor.java;h=f61c661d19cbd7eb47fbf15de27ef4bb9035627c;hb=ae2e31aa5eb35410e5b2ce222d42421154f3fecc;hp=eaaa9b3730976265c0673dc4547bbe35b3d4904c;hpb=785f638bab44e70ec6103c3891daea95bcda9a07;p=simantics%2Fplatform.git diff --git a/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryProcessor.java b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryProcessor.java index eaaa9b373..f61c661d1 100644 --- a/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryProcessor.java +++ b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryProcessor.java @@ -32,6 +32,7 @@ import java.util.concurrent.Semaphore; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import org.eclipse.core.runtime.Platform; import org.simantics.databoard.Bindings; import org.simantics.db.AsyncReadGraph; import org.simantics.db.DevelopmentKeys; @@ -42,6 +43,7 @@ import org.simantics.db.Resource; import org.simantics.db.Session; import org.simantics.db.Statement; import org.simantics.db.VirtualGraph; +import org.simantics.db.common.ByteFileReader; import org.simantics.db.common.procedure.adapter.AsyncMultiProcedureAdapter; import org.simantics.db.common.utils.Logger; import org.simantics.db.exception.DatabaseException; @@ -68,9 +70,11 @@ import org.simantics.db.request.AsyncMultiRead; import org.simantics.db.request.ExternalRead; import org.simantics.db.request.MultiRead; import org.simantics.db.request.RequestFlags; +import org.simantics.db.service.Bytes; import org.simantics.layer0.Layer0; import org.simantics.utils.DataContainer; import org.simantics.utils.Development; +import org.simantics.utils.FileUtils; import org.simantics.utils.datastructures.Pair; import org.simantics.utils.datastructures.collections.CollectionUtils; import org.simantics.utils.datastructures.disposable.AbstractDisposable; @@ -141,10 +145,6 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap final public QueryListening listening = new QueryListening(this); QueryThread[] executors; - - public LinkedList freeScheduling = new LinkedList(); - - public LinkedList topLevelTasks = new LinkedList(); enum ThreadState { @@ -152,6 +152,8 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap } + final Scheduling scheduling; + public ThreadState[] threadStates; final Object querySupportLock; @@ -161,26 +163,13 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap public void close() { } - public SessionTask getSubTask(ReadGraphImpl parent) { - synchronized(querySupportLock) { - int index = 0; - while(index < freeScheduling.size()) { - SessionTask task = freeScheduling.get(index); - if(task.isSubtask(parent) && task.maybeReady()) { - return freeScheduling.remove(index); - } - index++; - } - } - return null; - } /* * We are running errands while waiting for requests to complete. * We can only run work that is part of the current root request to avoid any deadlocks */ public boolean performPending(ReadGraphImpl under) { - SessionTask task = getSubTask(under); + SessionTask task = scheduling.getSubTask(under); if(task != null) { task.run(thread.get()); return true; @@ -195,28 +184,8 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap } final public SessionTask scheduleOrReturnForExecution(SessionTask request) { - - assert(request != null); - - synchronized(querySupportLock) { - - LinkedList queue = request.rootGraph != null ? freeScheduling : topLevelTasks; - - if(BarrierTracing.BOOKKEEPING) { - Exception current = new Exception(); - Exception previous = BarrierTracing.tasks.put(request, current); - if(previous != null) { - previous.printStackTrace(); - current.printStackTrace(); - } - } - - queue.addFirst(request); - requests.release(); - - } - - return null; + + return scheduling.scheduleOrReturnForExecution(request); } @@ -298,6 +267,8 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap THREADS = threads; THREAD_MASK = threads - 1; + scheduling = new Scheduling(requests); + querySupport = core; cache = new QueryCache(core, threads); session = querySupport.getSession(); @@ -1102,6 +1073,82 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap } + public synchronized void save() throws IOException { + + long start = System.nanoTime(); + + Collection caches = allCaches(new CacheCollectionResult()).toCollection(); + Map> cachesByCluster = new HashMap<>(); + for(CacheEntryBase entry : caches) { + String clazz = entry.classId(); + if(clazz == null) + continue; + long cluster = entry.cluster(this); + List queries = cachesByCluster.get(cluster); + if(queries == null) { + queries = new ArrayList<>(); + cachesByCluster.put(cluster, queries); + } + queries.add(entry); + } + + File workspace = Platform.getLocation().toFile(); + File dir = new File(workspace, "queryData"); + FileUtils.deleteAll(dir); + + dir.mkdir(); + + for(Long cluster : cachesByCluster.keySet()) { + + List queries = cachesByCluster.get(cluster); + QuerySerializer serializer = new QuerySerializer(this); + int count = 0; + int pos = serializer.writeUnknownSize(); + for(CacheEntryBase entry : queries) { + String clazz = entry.classId(); + if(clazz == null) + continue; + try { + entry.serialize(serializer); + count++; + } catch (IllegalStateException e) { + System.err.println(e.getMessage()); + } + } + serializer.setUnknownSize(pos, count); + + System.err.println(serializer.bytes().length + " bytes for cluster " + cluster); + FileUtils.writeFile(new File(dir, "" + cluster + ".queryData"), serializer.bytes()); + + } + + long end = System.nanoTime(); + + System.err.println("saved queries in " + 1e-6*(end-start) + "ms."); + + } + + public void restore() throws IOException { + + long start = System.nanoTime(); + + File workspace = Platform.getLocation().toFile(); + File dir = new File(workspace, "queryData"); + dir.mkdir(); + + for(File f : FileUtils.listFilesByExtension(dir, "queryData")) { + byte[] bytes = FileUtils.readFile(f); + QueryDeserializer qd = new QueryDeserializer(this, bytes); + qd.readHeaders(); + qd.readQueries(); + } + + long end = System.nanoTime(); + + System.err.println("restored queries in " + 1e-6*(end-start) + "ms."); + + } + boolean removeQuery(CacheEntry entry) { // This entry has been removed before. No need to do anything here. @@ -1312,8 +1359,9 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap entry.prepareRecompute(querySupport); ReadGraphImpl parentGraph = graph.forRecompute(entry); - + parentGraph.asyncBarrier.inc(); query.recompute(parentGraph); + parentGraph.asyncBarrier.dec(); if(entry.isExcepted()) return ListenerEntry.NO_VALUE; @@ -1792,6 +1840,12 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap } return result; } + + public long cluster(int resource) { + if(resource <= 0) + return 0; + return querySupport.getClusterId(resource); + } public void assertDone() { } @@ -1888,12 +1942,12 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap Exception callerException = null; - public interface AsyncBarrier { - public void inc(); - public void dec(); - // public void inc(String debug); - // public void dec(String debug); - } + public interface AsyncBarrier { + public void inc(); + public void dec(); + public void waitBarrier(Object request, ReadGraphImpl impl); + public boolean isBlocking(); + } // final public QueryProcessor processor; // final public QuerySupport support; @@ -4086,6 +4140,10 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap return L0; } + public Layer0 getL0() { + return L0; + } + public static ThreadLocal thread = new ThreadLocal() { protected Integer initialValue() { return -1;