From da749c56c5caad4252cd27276bf19b08f4dcb778 Mon Sep 17 00:00:00 2001 From: Marko Luukkainen Date: Mon, 28 Feb 2022 10:00:55 +0200 Subject: [PATCH] Add locking for IndexUtils.flushIndexCaches To gain access to index locks, the flush implementation needed to be moved from MemoryIndexing into IndexedRelationsImpl. IndexedRelations.flush was added to the interface as well. Merged master/85ee3813 to release/1.43.1 gitlab #807 Change-Id: I66b149dfda378b33ff7574036cabbe86f96beb78 --- .../org/simantics/db/indexing/IndexUtils.java | 15 +++--- .../db/indexing/IndexedRelationsImpl.java | 39 ++++++++++++++ .../IndexedRelationsSearcherBase.java | 10 +++- .../simantics/db/indexing/MemoryIndexing.java | 53 +++++++------------ .../genericrelation/IndexedRelations.java | 13 +++++ 5 files changed, 86 insertions(+), 44 deletions(-) diff --git a/bundles/org.simantics.db.indexing/src/org/simantics/db/indexing/IndexUtils.java b/bundles/org.simantics.db.indexing/src/org/simantics/db/indexing/IndexUtils.java index 760acd68c..0cb5d9c4d 100644 --- a/bundles/org.simantics.db.indexing/src/org/simantics/db/indexing/IndexUtils.java +++ b/bundles/org.simantics.db.indexing/src/org/simantics/db/indexing/IndexUtils.java @@ -119,23 +119,20 @@ public class IndexUtils { } return results; } - - public static void flushIndexCaches(IProgressMonitor progress, Session session) throws Exception { - MemoryIndexing mem = MemoryIndexing.getInstance(session); - mem.flush(progress); - + public static void flushIndexCaches(IProgressMonitor monitor, Session session) throws Exception { + session.getService(IndexedRelations.class).flush(monitor, session); } - + public static List list(IProgressMonitor progress, Session session, Resource indexRoot) throws Exception { - + if(progress == null) progress = new NullProgressMonitor(); MemoryIndexing mem = MemoryIndexing.getInstance(session); Layer0X L0X = Layer0X.getInstance(session); - mem.flush(progress); - + session.getService(IndexedRelations.class).flush(progress, session); + IndexedRelationsSearcher searcher = mem.get(session, L0X.DependenciesRelation, indexRoot); List results; try { diff --git a/bundles/org.simantics.db.indexing/src/org/simantics/db/indexing/IndexedRelationsImpl.java b/bundles/org.simantics.db.indexing/src/org/simantics/db/indexing/IndexedRelationsImpl.java index b2b65a67f..4caa10252 100644 --- a/bundles/org.simantics.db.indexing/src/org/simantics/db/indexing/IndexedRelationsImpl.java +++ b/bundles/org.simantics.db.indexing/src/org/simantics/db/indexing/IndexedRelationsImpl.java @@ -23,6 +23,7 @@ import java.util.WeakHashMap; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.ForkJoinPool; +import java.util.concurrent.Semaphore; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -32,6 +33,7 @@ import org.eclipse.core.runtime.SubMonitor; import org.simantics.db.ReadGraph; import org.simantics.db.RequestProcessor; import org.simantics.db.Resource; +import org.simantics.db.Session; import org.simantics.db.common.request.ReadRequest; import org.simantics.db.common.request.UniqueRead; import org.simantics.db.common.utils.NameUtils; @@ -627,4 +629,41 @@ public class IndexedRelationsImpl implements IndexedRelations { } } + @Override + public void flush(IProgressMonitor progress, Session session) throws IndexException { + long startTime = System.currentTimeMillis(); + SubMonitor monitor = SubMonitor.convert(progress); + MemoryIndexing mem = MemoryIndexing.getInstance(session); + + try { + List searchers = mem.getAllSearchers(); + int count = searchers.size(); + Semaphore sema = new Semaphore(0); + searchers.stream().parallel().forEach(s -> { + LockHandle handle = lock(session, Pair.make(s.getRelation(), s.getInput()), true); + try { + if (s.isIndexAvailable() && s instanceof IndexedRelationsSearcher) { + IndexedRelationsSearcher searcher = (IndexedRelationsSearcher) s; + try { + List os = searcher.cache.allDocs(monitor, session); + searcher.applyChanges(monitor, session, searcher.cache.r, os); + } catch (Exception e) { + LOGGER.error("Could not flush in-memory changes to on-disk index", e); + } + } + monitor.worked(1); + s.changeState(monitor, session, State.READY); + } finally { + handle.unlock(); + sema.release(); + } + }); + sema.acquire(count); + long totalTime = System.currentTimeMillis() - startTime; + LOGGER.info("Database index cache flush done in {} ms", totalTime); + } catch (InterruptedException e) { + LOGGER.error("Index searcher flush interrupted", e); + } + } + } diff --git a/bundles/org.simantics.db.indexing/src/org/simantics/db/indexing/IndexedRelationsSearcherBase.java b/bundles/org.simantics.db.indexing/src/org/simantics/db/indexing/IndexedRelationsSearcherBase.java index 5df5d68c1..dad5ad07b 100644 --- a/bundles/org.simantics.db.indexing/src/org/simantics/db/indexing/IndexedRelationsSearcherBase.java +++ b/bundles/org.simantics.db.indexing/src/org/simantics/db/indexing/IndexedRelationsSearcherBase.java @@ -505,7 +505,7 @@ abstract public class IndexedRelationsSearcherBase { */ final IndexSchema schema; - Resource input; + final Resource input; Path indexPath; @@ -530,6 +530,14 @@ abstract public class IndexedRelationsSearcherBase { this.schema = IndexSchema.readFromRelation(session, relation); } + public Resource getRelation() { + return relation; + } + + public Resource getInput() { + return input; + } + Directory getDirectory(Session session) throws IOException { return FSDirectory.open(indexPath.toFile()); } diff --git a/bundles/org.simantics.db.indexing/src/org/simantics/db/indexing/MemoryIndexing.java b/bundles/org.simantics.db.indexing/src/org/simantics/db/indexing/MemoryIndexing.java index cee5eb26b..7f86d2bc1 100644 --- a/bundles/org.simantics.db.indexing/src/org/simantics/db/indexing/MemoryIndexing.java +++ b/bundles/org.simantics.db.indexing/src/org/simantics/db/indexing/MemoryIndexing.java @@ -2,12 +2,10 @@ package org.simantics.db.indexing; import java.io.IOException; import java.nio.file.Path; -import java.util.Collection; +import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.Semaphore; -import java.util.stream.Stream; import org.apache.lucene.analysis.Analyzer; import org.apache.lucene.index.IndexWriter; @@ -17,13 +15,13 @@ import org.apache.lucene.store.Directory; import org.apache.lucene.store.RAMDirectory; import org.apache.lucene.util.Version; import org.eclipse.core.runtime.IProgressMonitor; -import org.eclipse.core.runtime.SubMonitor; import org.simantics.db.RequestProcessor; import org.simantics.db.Resource; import org.simantics.db.Session; +import org.simantics.db.common.SimanticsInternal; import org.simantics.db.common.request.Adapt; -import org.simantics.db.indexing.IndexedRelationsSearcherBase.State; import org.simantics.db.layer0.adapter.GenericRelation; +import org.simantics.db.layer0.genericrelation.IndexedRelations; import org.slf4j.LoggerFactory; /** @@ -52,6 +50,13 @@ public class MemoryIndexing { return DatabaseIndexing.getIndexLocation(session, relation, input); } + List getAllSearchers() { + List r = new ArrayList<>(); + r.addAll(searchers.values()); + r.addAll(immutableSearchers.values()); + return r; + } + public IndexedRelationsSearcher get(RequestProcessor processor, Resource relation, Resource input) { Path location = getIndexDirectory(relation, input); String key = location.toAbsolutePath().toString(); @@ -109,35 +114,15 @@ public class MemoryIndexing { directories.remove(path); } + /** + * @param progress + * @throws Exception + * @deprecated Use {@link IndexUtils#flushIndexCaches(IProgressMonitor, Session)} instead + */ + @Deprecated public void flush(IProgressMonitor progress) throws Exception { - long startTime = System.currentTimeMillis(); - SubMonitor monitor = SubMonitor.convert(progress); - Collection searcherEntries = searchers.values(); - Collection immutableSearcherEntries = immutableSearchers.values(); - int count = searcherEntries.size() + immutableSearcherEntries.size(); - Semaphore sema = new Semaphore(0); - Stream.concat(searcherEntries.stream(), immutableSearcherEntries.stream()).parallel().forEach(base -> { - try { - if (base.isIndexAvailable()) { - if (base instanceof IndexedRelationsSearcher) { - IndexedRelationsMemorySearcher searcher = ((IndexedRelationsSearcher) base).cache; - try { - List os = searcher.allDocs(monitor, session); - ((IndexedRelationsSearcher) base).applyChanges(monitor, session, searcher.r, os); - } catch (Exception e) { - LOGGER.error("Could not flush", e); - } - } - } - monitor.worked(1); - base.changeState(monitor, session, State.READY); - } finally { - sema.release(); - } - }); - sema.acquire(count); - long totalTime = System.currentTimeMillis() - startTime; - LOGGER.info("index flush " + totalTime); + Session s = SimanticsInternal.getSession(); + s.getService(IndexedRelations.class).flush(progress, s); } - + } diff --git a/bundles/org.simantics.db.layer0/src/org/simantics/db/layer0/genericrelation/IndexedRelations.java b/bundles/org.simantics.db.layer0/src/org/simantics/db/layer0/genericrelation/IndexedRelations.java index c334691d3..a7b08deb3 100644 --- a/bundles/org.simantics.db.layer0/src/org/simantics/db/layer0/genericrelation/IndexedRelations.java +++ b/bundles/org.simantics.db.layer0/src/org/simantics/db/layer0/genericrelation/IndexedRelations.java @@ -18,6 +18,7 @@ import java.util.Map; import org.eclipse.core.runtime.IProgressMonitor; import org.simantics.db.RequestProcessor; import org.simantics.db.Resource; +import org.simantics.db.Session; import org.simantics.db.layer0.adapter.GenericRelation; /** @@ -98,4 +99,16 @@ public interface IndexedRelations { public void fullRebuild(IProgressMonitor monitor, RequestProcessor processor) throws IndexException; + /** + * Flush any memory-based index caches to disk and close down any resources + * related to each separate index. + * + * Must be invoked outside of any database transactions. + * + * @param monitor + * @param processor + * @throws IndexException + */ + public void flush(IProgressMonitor monitor, Session session) throws IndexException; + } -- 2.47.1