package org.simantics.db.indexing; import java.io.IOException; import java.nio.file.Path; import java.util.Collection; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Semaphore; import java.util.stream.Stream; import org.apache.lucene.analysis.Analyzer; import org.apache.lucene.index.IndexWriter; import org.apache.lucene.index.IndexWriterConfig; import org.apache.lucene.index.IndexWriterConfig.OpenMode; import org.apache.lucene.store.Directory; import org.apache.lucene.store.RAMDirectory; import org.apache.lucene.util.Version; import org.eclipse.core.runtime.IProgressMonitor; import org.eclipse.core.runtime.SubMonitor; import org.simantics.db.RequestProcessor; import org.simantics.db.Resource; import org.simantics.db.Session; import org.simantics.db.common.request.Adapt; import org.simantics.db.indexing.IndexedRelationsSearcherBase.State; import org.simantics.db.layer0.adapter.GenericRelation; import org.slf4j.LoggerFactory; /** * @author Tuukka Lehtonen * @author Antti Villberg */ public class MemoryIndexing { private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(MemoryIndexing.class); final private Session session; final Map>>> persistentCache = new ConcurrentHashMap<>(); final Map>> persistentCacheResources = new ConcurrentHashMap<>(); final private ConcurrentHashMap directories = new ConcurrentHashMap<>(); final private ConcurrentHashMap immutableSearchers = new ConcurrentHashMap<>(); final private ConcurrentHashMap searchers = new ConcurrentHashMap<>(); public MemoryIndexing(Session session) { this.session = session; } protected Path getIndexDirectory(Resource relation, Resource input) { return DatabaseIndexing.getIndexLocation(session, relation, input); } public IndexedRelationsSearcher get(RequestProcessor processor, Resource relation, Resource input) { Path location = getIndexDirectory(relation, input); String key = location.toAbsolutePath().toString(); return searchers.computeIfAbsent(key, t -> { try { GenericRelation r = processor.sync(new Adapt(relation, GenericRelation.class)); return new IndexedRelationsSearcher(processor, relation, input, r); } catch (Exception e) { LOGGER.error("Could not get searcher for relation {} and input {} in location {}", relation, input, location, e); return null; } }); } public IndexedRelationsSearcherBase getImmutable(RequestProcessor processor, Resource relation, Resource input) { Path location = getIndexDirectory(relation, input); String key = location.toAbsolutePath().toString(); return immutableSearchers.computeIfAbsent(key, t -> { try { return new ImmutableIndexedRelationsSearcher(processor, relation, input); } catch (Exception e) { LOGGER.error("Could not get searcher base for relation {} and input {} in location {}", relation, input, location, e); return null; } }); } public static MemoryIndexing getInstance(Session session) { MemoryIndexing ret = session.peekService(MemoryIndexing.class); if(ret == null) { ret = new MemoryIndexing(session); session.registerService(MemoryIndexing.class, ret); } return ret; } public Directory getDirectory(String path, Analyzer analyzer) throws IOException { try { return directories.computeIfAbsent(path, t -> { try { RAMDirectory directory = new RAMDirectory(); IndexWriterConfig config = new IndexWriterConfig(Version.LUCENE_4_9, analyzer); new IndexWriter(directory, config.setOpenMode(OpenMode.CREATE)).close(); return directory; } catch (IOException e) { throw new RuntimeException(e); } }); } catch (RuntimeException e) { throw (IOException) e.getCause(); } } public void remove(String path) { directories.remove(path); } public void flush(IProgressMonitor progress) throws Exception { long startTime = System.currentTimeMillis(); SubMonitor monitor = SubMonitor.convert(progress); Collection searcherEntries = searchers.values(); Collection immutableSearcherEntries = immutableSearchers.values(); int count = searcherEntries.size() + immutableSearcherEntries.size(); Semaphore sema = new Semaphore(0); Stream.concat(searcherEntries.stream(), immutableSearcherEntries.stream()).parallel().forEach(base -> { try { if (base.isIndexAvailable()) { if (base instanceof IndexedRelationsSearcher) { IndexedRelationsMemorySearcher searcher = ((IndexedRelationsSearcher) base).cache; try { List os = searcher.allDocs(monitor, session); ((IndexedRelationsSearcher) base).applyChanges(monitor, session, searcher.r, os); } catch (Exception e) { LOGGER.error("Could not flush", e); } } } monitor.worked(1); base.changeState(monitor, session, State.READY); } finally { sema.release(); } }); sema.acquire(count); long totalTime = System.currentTimeMillis() - startTime; LOGGER.info("index flush " + totalTime); } }