import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.Semaphore;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.simantics.db.ReadGraph;
import org.simantics.db.RequestProcessor;
import org.simantics.db.Resource;
+import org.simantics.db.Session;
import org.simantics.db.common.request.ReadRequest;
import org.simantics.db.common.request.UniqueRead;
import org.simantics.db.common.utils.NameUtils;
}
}
+ @Override
+ public void flush(IProgressMonitor progress, Session session) throws IndexException {
+ long startTime = System.currentTimeMillis();
+ SubMonitor monitor = SubMonitor.convert(progress);
+ MemoryIndexing mem = MemoryIndexing.getInstance(session);
+
+ try {
+ List<IndexedRelationsSearcherBase> searchers = mem.getAllSearchers();
+ int count = searchers.size();
+ Semaphore sema = new Semaphore(0);
+ searchers.stream().parallel().forEach(s -> {
+ LockHandle handle = lock(session, Pair.make(s.getRelation(), s.getInput()), true);
+ try {
+ if (s.isIndexAvailable() && s instanceof IndexedRelationsSearcher) {
+ IndexedRelationsSearcher searcher = (IndexedRelationsSearcher) s;
+ try {
+ List<Object[]> os = searcher.cache.allDocs(monitor, session);
+ searcher.applyChanges(monitor, session, searcher.cache.r, os);
+ } catch (Exception e) {
+ LOGGER.error("Could not flush in-memory changes to on-disk index", e);
+ }
+ }
+ monitor.worked(1);
+ s.changeState(monitor, session, State.READY);
+ } finally {
+ handle.unlock();
+ sema.release();
+ }
+ });
+ sema.acquire(count);
+ long totalTime = System.currentTimeMillis() - startTime;
+ LOGGER.info("Database index cache flush done in {} ms", totalTime);
+ } catch (InterruptedException e) {
+ LOGGER.error("Index searcher flush interrupted", e);
+ }
+ }
+
}