]> gerrit.simantics Code Review - simantics/platform.git/commitdiff
Add locking for IndexUtils.flushIndexCaches 01/4901/1
authorMarko Luukkainen <marko.luukkainen@semantum.fi>
Mon, 28 Feb 2022 08:00:55 +0000 (10:00 +0200)
committerMarko Luukkainen <marko.luukkainen@semantum.fi>
Mon, 28 Feb 2022 08:00:55 +0000 (10:00 +0200)
To gain access to index locks, the flush implementation needed to be
moved from MemoryIndexing into IndexedRelationsImpl.
IndexedRelations.flush was added to the interface as well.

Merged master/85ee3813 to release/1.43.1

gitlab #807

Change-Id: I66b149dfda378b33ff7574036cabbe86f96beb78

bundles/org.simantics.db.indexing/src/org/simantics/db/indexing/IndexUtils.java
bundles/org.simantics.db.indexing/src/org/simantics/db/indexing/IndexedRelationsImpl.java
bundles/org.simantics.db.indexing/src/org/simantics/db/indexing/IndexedRelationsSearcherBase.java
bundles/org.simantics.db.indexing/src/org/simantics/db/indexing/MemoryIndexing.java
bundles/org.simantics.db.layer0/src/org/simantics/db/layer0/genericrelation/IndexedRelations.java

index 760acd68c465578b269cdf12e164dafbb65a2516..0cb5d9c4d9f5a78fedb3d8038bc1785bcbc6bb88 100644 (file)
@@ -119,23 +119,20 @@ public class IndexUtils {
         }
         return results;
     }
-    
-    public static void flushIndexCaches(IProgressMonitor progress, Session session) throws Exception {
 
-       MemoryIndexing mem = MemoryIndexing.getInstance(session);
-       mem.flush(progress);
-       
+    public static void flushIndexCaches(IProgressMonitor monitor, Session session) throws Exception {
+        session.getService(IndexedRelations.class).flush(monitor, session);
     }
-    
+
     public static List<Object> list(IProgressMonitor progress, Session session, Resource indexRoot) throws Exception {
-       
+
        if(progress == null) progress = new NullProgressMonitor();
 
        MemoryIndexing mem = MemoryIndexing.getInstance(session);
        Layer0X L0X = Layer0X.getInstance(session);
 
-       mem.flush(progress);
-               
+       session.getService(IndexedRelations.class).flush(progress, session);
+
        IndexedRelationsSearcher searcher = mem.get(session, L0X.DependenciesRelation, indexRoot);
         List<Object> results;
         try {
index b2b65a67f74a3e4725fc51a5d29257e6849e123a..4caa102529d4adb5851fc799377a42637719abf5 100644 (file)
@@ -23,6 +23,7 @@ import java.util.WeakHashMap;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.Semaphore;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
@@ -32,6 +33,7 @@ import org.eclipse.core.runtime.SubMonitor;
 import org.simantics.db.ReadGraph;
 import org.simantics.db.RequestProcessor;
 import org.simantics.db.Resource;
+import org.simantics.db.Session;
 import org.simantics.db.common.request.ReadRequest;
 import org.simantics.db.common.request.UniqueRead;
 import org.simantics.db.common.utils.NameUtils;
@@ -627,4 +629,41 @@ public class IndexedRelationsImpl implements IndexedRelations {
         }
     }
 
+    @Override
+    public void flush(IProgressMonitor progress, Session session) throws IndexException {
+        long startTime = System.currentTimeMillis();
+        SubMonitor monitor = SubMonitor.convert(progress);
+        MemoryIndexing mem = MemoryIndexing.getInstance(session);
+
+        try {
+            List<IndexedRelationsSearcherBase> searchers = mem.getAllSearchers();
+            int count = searchers.size();
+            Semaphore sema = new Semaphore(0);
+            searchers.stream().parallel().forEach(s -> {
+                LockHandle handle = lock(session, Pair.make(s.getRelation(), s.getInput()), true);
+                try {
+                    if (s.isIndexAvailable() && s instanceof IndexedRelationsSearcher) {
+                        IndexedRelationsSearcher searcher = (IndexedRelationsSearcher) s;
+                        try {
+                            List<Object[]> os = searcher.cache.allDocs(monitor, session);
+                            searcher.applyChanges(monitor, session, searcher.cache.r, os);
+                        } catch (Exception e) {
+                            LOGGER.error("Could not flush in-memory changes to on-disk index", e);
+                        }
+                    }
+                    monitor.worked(1);
+                    s.changeState(monitor, session, State.READY);
+                } finally {
+                    handle.unlock();
+                    sema.release();
+                }
+            });
+            sema.acquire(count);
+            long totalTime = System.currentTimeMillis() - startTime;
+            LOGGER.info("Database index cache flush done in {} ms", totalTime);
+        } catch (InterruptedException e) {
+            LOGGER.error("Index searcher flush interrupted", e);
+        }
+    }
+
 }
index 5df5d68c114d37dc98de59bd6a45100e81dfda3d..dad5ad07b13ea49011ea03d0d70b5a254c62d32c 100644 (file)
@@ -505,7 +505,7 @@ abstract public class IndexedRelationsSearcherBase {
      */
     final IndexSchema      schema;
 
-    Resource         input;
+    final Resource         input;
 
     Path             indexPath;
 
@@ -530,6 +530,14 @@ abstract public class IndexedRelationsSearcherBase {
         this.schema = IndexSchema.readFromRelation(session, relation);
     }
 
+    public Resource getRelation() {
+        return relation;
+    }
+
+    public Resource getInput() {
+        return input;
+    }
+
     Directory getDirectory(Session session) throws IOException {
         return FSDirectory.open(indexPath.toFile());
     }
index cee5eb26bfb616799c522baca31cf186b2a46a17..7f86d2bc1635fe762d40c4321632448ab916ff87 100644 (file)
@@ -2,12 +2,10 @@ package org.simantics.db.indexing;
 
 import java.io.IOException;
 import java.nio.file.Path;
-import java.util.Collection;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Semaphore;
-import java.util.stream.Stream;
 
 import org.apache.lucene.analysis.Analyzer;
 import org.apache.lucene.index.IndexWriter;
@@ -17,13 +15,13 @@ import org.apache.lucene.store.Directory;
 import org.apache.lucene.store.RAMDirectory;
 import org.apache.lucene.util.Version;
 import org.eclipse.core.runtime.IProgressMonitor;
-import org.eclipse.core.runtime.SubMonitor;
 import org.simantics.db.RequestProcessor;
 import org.simantics.db.Resource;
 import org.simantics.db.Session;
+import org.simantics.db.common.SimanticsInternal;
 import org.simantics.db.common.request.Adapt;
-import org.simantics.db.indexing.IndexedRelationsSearcherBase.State;
 import org.simantics.db.layer0.adapter.GenericRelation;
+import org.simantics.db.layer0.genericrelation.IndexedRelations;
 import org.slf4j.LoggerFactory;
 
 /**
@@ -52,6 +50,13 @@ public class MemoryIndexing {
         return DatabaseIndexing.getIndexLocation(session, relation, input);
     }
 
+    List<IndexedRelationsSearcherBase> getAllSearchers() {
+        List<IndexedRelationsSearcherBase> r = new ArrayList<>();
+        r.addAll(searchers.values());
+        r.addAll(immutableSearchers.values());
+        return r;
+    }
+
     public IndexedRelationsSearcher get(RequestProcessor processor, Resource relation, Resource input) {
         Path location = getIndexDirectory(relation, input);
         String key = location.toAbsolutePath().toString();
@@ -109,35 +114,15 @@ public class MemoryIndexing {
         directories.remove(path);
     }
 
+    /**
+     * @param progress
+     * @throws Exception
+     * @deprecated Use {@link IndexUtils#flushIndexCaches(IProgressMonitor, Session)} instead
+     */
+    @Deprecated
     public void flush(IProgressMonitor progress) throws Exception {
-        long startTime = System.currentTimeMillis();
-        SubMonitor monitor = SubMonitor.convert(progress);
-        Collection<IndexedRelationsSearcher> searcherEntries = searchers.values();
-        Collection<IndexedRelationsSearcherBase> immutableSearcherEntries = immutableSearchers.values();
-        int count = searcherEntries.size() + immutableSearcherEntries.size();
-        Semaphore sema = new Semaphore(0);
-        Stream.concat(searcherEntries.stream(), immutableSearcherEntries.stream()).parallel().forEach(base -> {
-            try {
-                if (base.isIndexAvailable()) {
-                    if (base instanceof IndexedRelationsSearcher) {
-                        IndexedRelationsMemorySearcher searcher = ((IndexedRelationsSearcher) base).cache;
-                        try {
-                            List<Object[]> os = searcher.allDocs(monitor, session);
-                            ((IndexedRelationsSearcher) base).applyChanges(monitor, session, searcher.r, os);
-                        } catch (Exception e) {
-                            LOGGER.error("Could not flush", e);
-                        }
-                    }
-                }
-                monitor.worked(1);
-                base.changeState(monitor, session, State.READY);
-            } finally {
-                sema.release();
-            }
-        });
-        sema.acquire(count);
-        long totalTime = System.currentTimeMillis() - startTime;
-        LOGGER.info("index flush " + totalTime);
+        Session s = SimanticsInternal.getSession();
+        s.getService(IndexedRelations.class).flush(progress, s);
     }
-    
+
 }
index c334691d34130696df26c16f6dbfc18349031090..a7b08deb3290775e723d830f8a6f36588ee579e2 100644 (file)
@@ -18,6 +18,7 @@ import java.util.Map;
 import org.eclipse.core.runtime.IProgressMonitor;
 import org.simantics.db.RequestProcessor;
 import org.simantics.db.Resource;
+import org.simantics.db.Session;
 import org.simantics.db.layer0.adapter.GenericRelation;
 
 /**
@@ -98,4 +99,16 @@ public interface IndexedRelations {
 
     public void fullRebuild(IProgressMonitor monitor, RequestProcessor processor) throws IndexException;
 
+    /**
+     * Flush any memory-based index caches to disk and close down any resources
+     * related to each separate index.
+     * 
+     * Must be invoked outside of any database transactions.
+     * 
+     * @param monitor
+     * @param processor
+     * @throws IndexException
+     */
+    public void flush(IProgressMonitor monitor, Session session) throws IndexException;
+
 }