1 package org.simantics.db.indexing;
3 import java.io.IOException;
4 import java.nio.file.Path;
5 import java.util.Collection;
8 import java.util.concurrent.ConcurrentHashMap;
9 import java.util.concurrent.Semaphore;
10 import java.util.stream.Stream;
12 import org.apache.lucene.analysis.Analyzer;
13 import org.apache.lucene.index.IndexWriter;
14 import org.apache.lucene.index.IndexWriterConfig;
15 import org.apache.lucene.index.IndexWriterConfig.OpenMode;
16 import org.apache.lucene.store.Directory;
17 import org.apache.lucene.store.RAMDirectory;
18 import org.apache.lucene.util.Version;
19 import org.eclipse.core.runtime.IProgressMonitor;
20 import org.eclipse.core.runtime.SubMonitor;
21 import org.simantics.db.RequestProcessor;
22 import org.simantics.db.Resource;
23 import org.simantics.db.Session;
24 import org.simantics.db.common.request.Adapt;
25 import org.simantics.db.indexing.IndexedRelationsSearcherBase.State;
26 import org.simantics.db.layer0.adapter.GenericRelation;
27 import org.slf4j.LoggerFactory;
30 * @author Tuukka Lehtonen
31 * @author Antti Villberg
33 public class MemoryIndexing {
35 private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(MemoryIndexing.class);
37 final private Session session;
39 final Map<String,Map<String,List<Map<String, Object>>>> persistentCache = new ConcurrentHashMap<>();
40 final Map<String,Map<String,List<Resource>>> persistentCacheResources = new ConcurrentHashMap<>();
42 final private ConcurrentHashMap<String,RAMDirectory> directories = new ConcurrentHashMap<>();
44 final private ConcurrentHashMap<String,IndexedRelationsSearcherBase> immutableSearchers = new ConcurrentHashMap<>();
45 final private ConcurrentHashMap<String,IndexedRelationsSearcher> searchers = new ConcurrentHashMap<>();
/**
 * Creates an indexing cache bound to the given session.
 *
 * @param session the session stored in this instance and used for index
 *        location lookup and searcher operations
 */
public MemoryIndexing(Session session) {
    this.session = session;
}
51 protected Path getIndexDirectory(Resource relation, Resource input) {
52 return DatabaseIndexing.getIndexLocation(session, relation, input);
55 public IndexedRelationsSearcher get(RequestProcessor processor, Resource relation, Resource input) {
56 Path location = getIndexDirectory(relation, input);
57 String key = location.toAbsolutePath().toString();
58 return searchers.computeIfAbsent(key, t -> {
60 GenericRelation r = processor.sync(new Adapt<GenericRelation>(relation, GenericRelation.class));
61 return new IndexedRelationsSearcher(processor, relation, input, r);
62 } catch (Exception e) {
63 LOGGER.error("Could not get searcher for relation {} and input {} in location {}", relation, input, location, e);
69 public IndexedRelationsSearcherBase getImmutable(RequestProcessor processor, Resource relation, Resource input) {
70 Path location = getIndexDirectory(relation, input);
71 String key = location.toAbsolutePath().toString();
72 return immutableSearchers.computeIfAbsent(key, t -> {
74 return new ImmutableIndexedRelationsSearcher(processor, relation, input);
75 } catch (Exception e) {
76 LOGGER.error("Could not get searcher base for relation {} and input {} in location {}", relation, input, location, e);
82 public static MemoryIndexing getInstance(Session session) {
83 MemoryIndexing ret = session.peekService(MemoryIndexing.class);
85 ret = new MemoryIndexing(session);
86 session.registerService(MemoryIndexing.class, ret);
91 public Directory getDirectory(String path, Analyzer analyzer) throws IOException {
93 return directories.computeIfAbsent(path, t -> {
95 RAMDirectory directory = new RAMDirectory();
96 IndexWriterConfig config = new IndexWriterConfig(Version.LUCENE_4_9, analyzer);
97 new IndexWriter(directory, config.setOpenMode(OpenMode.CREATE)).close();
99 } catch (IOException e) {
100 throw new RuntimeException(e);
103 } catch (RuntimeException e) {
104 throw (IOException) e.getCause();
108 public void remove(String path) {
109 directories.remove(path);
112 public void flush(IProgressMonitor progress) throws Exception {
113 long startTime = System.currentTimeMillis();
114 SubMonitor monitor = SubMonitor.convert(progress);
115 Collection<IndexedRelationsSearcher> searcherEntries = searchers.values();
116 Collection<IndexedRelationsSearcherBase> immutableSearcherEntries = immutableSearchers.values();
117 int count = searcherEntries.size() + immutableSearcherEntries.size();
118 Semaphore sema = new Semaphore(0);
119 Stream.concat(searcherEntries.stream(), immutableSearcherEntries.stream()).parallel().forEach(base -> {
121 if (base.isIndexAvailable()) {
122 if (base instanceof IndexedRelationsSearcher) {
123 IndexedRelationsMemorySearcher searcher = ((IndexedRelationsSearcher) base).cache;
125 List<Object[]> os = searcher.allDocs(monitor, session);
126 ((IndexedRelationsSearcher) base).applyChanges(monitor, session, searcher.r, os);
127 } catch (Exception e) {
128 LOGGER.error("Could not flush", e);
133 base.changeState(monitor, session, State.READY);
139 long totalTime = System.currentTimeMillis() - startTime;
140 LOGGER.info("index flush " + totalTime);