]> gerrit.simantics Code Review - simantics/platform.git/blobdiff - bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryProcessor.java
DB query swapping to file system
[simantics/platform.git] / bundles / org.simantics.db.impl / src / org / simantics / db / impl / query / QueryProcessor.java
index eaaa9b3730976265c0673dc4547bbe35b3d4904c..f61c661d19cbd7eb47fbf15de27ef4bb9035627c 100644 (file)
@@ -32,6 +32,7 @@ import java.util.concurrent.Semaphore;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.eclipse.core.runtime.Platform;
 import org.simantics.databoard.Bindings;
 import org.simantics.db.AsyncReadGraph;
 import org.simantics.db.DevelopmentKeys;
@@ -42,6 +43,7 @@ import org.simantics.db.Resource;
 import org.simantics.db.Session;
 import org.simantics.db.Statement;
 import org.simantics.db.VirtualGraph;
+import org.simantics.db.common.ByteFileReader;
 import org.simantics.db.common.procedure.adapter.AsyncMultiProcedureAdapter;
 import org.simantics.db.common.utils.Logger;
 import org.simantics.db.exception.DatabaseException;
@@ -68,9 +70,11 @@ import org.simantics.db.request.AsyncMultiRead;
 import org.simantics.db.request.ExternalRead;
 import org.simantics.db.request.MultiRead;
 import org.simantics.db.request.RequestFlags;
+import org.simantics.db.service.Bytes;
 import org.simantics.layer0.Layer0;
 import org.simantics.utils.DataContainer;
 import org.simantics.utils.Development;
+import org.simantics.utils.FileUtils;
 import org.simantics.utils.datastructures.Pair;
 import org.simantics.utils.datastructures.collections.CollectionUtils;
 import org.simantics.utils.datastructures.disposable.AbstractDisposable;
@@ -141,10 +145,6 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap
        final public QueryListening                            listening = new QueryListening(this);
 
        QueryThread[]                                   executors;
-       
-       public LinkedList<SessionTask>                           freeScheduling = new LinkedList<SessionTask>();
-    
-    public LinkedList<SessionTask>                           topLevelTasks = new LinkedList<SessionTask>();
 
        enum ThreadState {
 
@@ -152,6 +152,8 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap
 
        }
 
+       final Scheduling scheduling;
+       
        public ThreadState[]                                                                    threadStates;
        
        final Object querySupportLock;
@@ -161,26 +163,13 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap
        public void close() {
        }
 
-    public SessionTask getSubTask(ReadGraphImpl parent) {
-        synchronized(querySupportLock) {
-            int index = 0;
-            while(index < freeScheduling.size()) {
-                SessionTask task = freeScheduling.get(index);
-                if(task.isSubtask(parent) && task.maybeReady()) {
-                    return freeScheduling.remove(index);
-                }
-                index++;
-            }
-        }
-        return null;
-    }
 
     /*
      * We are running errands while waiting for requests to complete.
      * We can only run work that is part of the current root request to avoid any deadlocks
      */
     public boolean performPending(ReadGraphImpl under) {
-        SessionTask task = getSubTask(under);
+        SessionTask task = scheduling.getSubTask(under);
                if(task != null) {
                        task.run(thread.get());
                        return true;
@@ -195,28 +184,8 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap
     }
 
     final public SessionTask scheduleOrReturnForExecution(SessionTask request) {
-
-        assert(request != null);
-
-        synchronized(querySupportLock) {
-
-            LinkedList<SessionTask> queue = request.rootGraph != null ? freeScheduling : topLevelTasks;
-            
-            if(BarrierTracing.BOOKKEEPING) {
-                Exception current = new Exception();
-                Exception previous = BarrierTracing.tasks.put(request, current);
-                if(previous != null) {
-                    previous.printStackTrace();
-                    current.printStackTrace();
-                }
-            }
-
-            queue.addFirst(request);
-            requests.release();
-
-        }
-
-        return null;
+        
+        return scheduling.scheduleOrReturnForExecution(request);
 
     }
 
@@ -298,6 +267,8 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap
                THREADS = threads;
                THREAD_MASK = threads - 1;
 
+               scheduling = new Scheduling(requests);
+               
                querySupport = core;
                cache = new QueryCache(core, threads);
                session = querySupport.getSession();
@@ -1102,6 +1073,82 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap
 
        }
 
+    public synchronized void save() throws IOException {
+
+        long start = System.nanoTime();
+
+        Collection<CacheEntryBase> caches = allCaches(new CacheCollectionResult()).toCollection();
+        Map<Long,List<CacheEntryBase>> cachesByCluster = new HashMap<>();
+        for(CacheEntryBase entry : caches) {
+            String clazz = entry.classId();
+            if(clazz == null)
+                continue;
+            long cluster = entry.cluster(this);
+            List<CacheEntryBase> queries = cachesByCluster.get(cluster);
+            if(queries == null) {
+                queries = new ArrayList<>();
+                cachesByCluster.put(cluster, queries);
+            }
+            queries.add(entry);
+        }
+
+        File workspace = Platform.getLocation().toFile();
+        File dir = new File(workspace, "queryData");
+        FileUtils.deleteAll(dir);
+
+        dir.mkdir();
+
+        for(Long cluster : cachesByCluster.keySet()) {
+
+            List<CacheEntryBase> queries = cachesByCluster.get(cluster);
+            QuerySerializer serializer = new QuerySerializer(this);
+            int count = 0;
+            int pos = serializer.writeUnknownSize();
+            for(CacheEntryBase entry : queries) {
+                String clazz = entry.classId();
+                if(clazz == null)
+                    continue;
+                try {
+                    entry.serialize(serializer);
+                    count++;
+                } catch (IllegalStateException e) {
+                    System.err.println(e.getMessage());
+                }
+            }
+            serializer.setUnknownSize(pos, count);
+
+            System.err.println(serializer.bytes().length + " bytes for cluster " + cluster);
+            FileUtils.writeFile(new File(dir, "" + cluster + ".queryData"), serializer.bytes());
+
+        }
+
+        long end = System.nanoTime();
+
+        System.err.println("saved queries in " + 1e-6*(end-start) + "ms.");
+
+    }
+
+    public void restore() throws IOException {
+
+        long start = System.nanoTime();
+
+        File workspace = Platform.getLocation().toFile();
+        File dir = new File(workspace, "queryData");
+        dir.mkdir();
+
+        for(File f : FileUtils.listFilesByExtension(dir, "queryData")) {
+            byte[] bytes = FileUtils.readFile(f);
+            QueryDeserializer qd = new QueryDeserializer(this, bytes);
+            qd.readHeaders();
+            qd.readQueries();
+        }
+
+        long end = System.nanoTime();
+
+        System.err.println("restored queries in " + 1e-6*(end-start) + "ms.");
+
+    }
+
        boolean removeQuery(CacheEntry entry) {
 
                // This entry has been removed before. No need to do anything here.
@@ -1312,8 +1359,9 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap
                        entry.prepareRecompute(querySupport);
                        
                        ReadGraphImpl parentGraph = graph.forRecompute(entry);
-
+                       parentGraph.asyncBarrier.inc();
                        query.recompute(parentGraph);
+                       parentGraph.asyncBarrier.dec();
 
                        if(entry.isExcepted()) return ListenerEntry.NO_VALUE;
 
@@ -1792,6 +1840,12 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap
                }
                return result;
        }
+       
+       public long cluster(int resource) {
+           if(resource <= 0)
+               return 0;
+           return querySupport.getClusterId(resource);
+       }
 
        public void assertDone() {
        }
@@ -1888,12 +1942,12 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap
 
        Exception callerException = null;
 
-       public interface AsyncBarrier {
-               public void inc(); 
-               public void dec();
-               //        public void inc(String debug); 
-               //        public void dec(String debug);
-       }
+    public interface AsyncBarrier {
+        public void inc(); 
+        public void dec();
+        public void waitBarrier(Object request, ReadGraphImpl impl);
+        public boolean isBlocking();
+    }
 
 //     final public QueryProcessor processor;
 //     final public QuerySupport support;
@@ -4086,6 +4140,10 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap
                return L0;
        }
 
+       public Layer0 getL0() {
+               return L0;
+       }
+
     public static ThreadLocal<Integer> thread = new ThreadLocal<Integer>() {
         protected Integer initialValue() {
             return -1;