-package org.simantics.db.indexing;\r
-\r
-import java.io.File;\r
-import java.io.IOException;\r
-import java.util.HashMap;\r
-import java.util.List;\r
-import java.util.Map;\r
-import java.util.Set;\r
-\r
-import org.apache.lucene.analysis.Analyzer;\r
-import org.apache.lucene.index.IndexWriter;\r
-import org.apache.lucene.index.IndexWriterConfig;\r
-import org.apache.lucene.index.IndexWriterConfig.OpenMode;\r
-import org.apache.lucene.store.Directory;\r
-import org.apache.lucene.store.RAMDirectory;\r
-import org.apache.lucene.util.Version;\r
-import org.eclipse.core.runtime.IProgressMonitor;\r
-import org.eclipse.core.runtime.SubMonitor;\r
-import org.simantics.db.RequestProcessor;\r
-import org.simantics.db.Resource;\r
-import org.simantics.db.Session;\r
-import org.simantics.db.common.request.Adapt;\r
-import org.simantics.db.common.utils.Logger;\r
-import org.simantics.db.indexing.IndexedRelationsSearcherBase.State;\r
-import org.simantics.db.layer0.adapter.GenericRelation;\r
-\r
-/**\r
- * @author Tuukka Lehtonen\r
- * @author Antti Villberg\r
- */\r
-public class MemoryIndexing {\r
-\r
- final private Session session;\r
- \r
- final Map<String,Map<String,List<Map<String, Object>>>> persistentCache = new HashMap<String,Map<String,List<Map<String, Object>>>>();\r
- final Map<String,Map<String,List<Resource>>> persistentCacheResources = new HashMap<String,Map<String,List<Resource>>>();\r
- \r
- final private Map<String,RAMDirectory> directories = new HashMap<String,RAMDirectory>();\r
- \r
- final private Map<String,IndexedRelationsSearcherBase> immutableSearchers = new HashMap<String,IndexedRelationsSearcherBase>();\r
- final private Map<String,IndexedRelationsSearcher> searchers = new HashMap<String,IndexedRelationsSearcher>();\r
-\r
- public MemoryIndexing(Session session) {\r
- this.session = session;\r
- }\r
- \r
- protected File getIndexDirectory(Resource relation, Resource input) {\r
- return DatabaseIndexing.getIndexLocation(session, relation, input);\r
- }\r
- \r
- public IndexedRelationsSearcher get(RequestProcessor processor, Resource relation, Resource input) {\r
- try {\r
- File location = getIndexDirectory(relation, input);\r
- String key = location.getAbsolutePath();\r
- IndexedRelationsSearcher searcher = searchers.get(key);\r
- if (searcher == null) {\r
- GenericRelation r = processor.sync(new Adapt<GenericRelation>(relation, GenericRelation.class));\r
- searcher = new IndexedRelationsSearcher(processor, relation, input, r);\r
- searchers.put(key, searcher);\r
- }\r
- return searcher;\r
- } catch (Exception e) {\r
- Logger.defaultLogError(e);\r
- return null;\r
- }\r
- }\r
-\r
- public IndexedRelationsSearcherBase getImmutable(RequestProcessor processor, Resource relation, Resource input) {\r
- try {\r
- File location = getIndexDirectory(relation, input);\r
- String key = location.getAbsolutePath();\r
- IndexedRelationsSearcherBase searcher = immutableSearchers.get(key);\r
- if (searcher == null) {\r
- searcher = new ImmutableIndexedRelationsSearcher(processor, relation, input);\r
- immutableSearchers.put(key, searcher);\r
- }\r
- return searcher;\r
- } catch (Exception e) {\r
- Logger.defaultLogError(e);\r
- return null;\r
- }\r
- }\r
- \r
- public static MemoryIndexing getInstance(Session session) {\r
- MemoryIndexing ret = session.peekService(MemoryIndexing.class);\r
- if(ret == null) {\r
- ret = new MemoryIndexing(session);\r
- session.registerService(MemoryIndexing.class, ret);\r
- }\r
- return ret;\r
- }\r
- \r
- public synchronized Directory getDirectory(String path, Analyzer analyzer) throws IOException {\r
- RAMDirectory directory = directories.get(path);\r
- if (directory == null) {\r
- synchronized (directories) {\r
- directory = directories.get(path);\r
- if (directory == null) {\r
- directory = new RAMDirectory();\r
- IndexWriterConfig config = new IndexWriterConfig(Version.LUCENE_4_9, analyzer);\r
- new IndexWriter(directory, config.setOpenMode(OpenMode.CREATE)).close();\r
- directories.put(path, directory);\r
- }\r
- }\r
- }\r
- return directory;\r
-\r
- }\r
- \r
- public void remove(String path) {\r
- directories.remove(path);\r
- }\r
- \r
- public void flush(IProgressMonitor progress) throws Exception {\r
- \r
- SubMonitor monitor = SubMonitor.convert(progress);\r
- \r
- Set<Map.Entry<String, IndexedRelationsSearcher>> set = searchers.entrySet();\r
- Set<Map.Entry<String, IndexedRelationsSearcherBase>> iset = immutableSearchers.entrySet();\r
- \r
- monitor.setWorkRemaining(set.size()+iset.size());\r
-\r
- for(Map.Entry<String, IndexedRelationsSearcher> entry : set) {\r
-\r
- IndexedRelationsSearcher persistent = entry.getValue();\r
- IndexedRelationsMemorySearcher searcher = persistent.cache;\r
-\r
- if(persistent.isIndexAvailable()) {\r
- List<Object[]> os = searcher.allDocs(monitor, session);\r
- persistent.applyChanges(monitor, session, searcher.r, os);\r
- }\r
- \r
- monitor.worked(1);\r
- entry.getValue().changeState(monitor, session, State.READY);\r
-\r
- }\r
-\r
- for(Map.Entry<String, IndexedRelationsSearcherBase> entry : iset) {\r
- \r
- entry.getValue().changeState(monitor, session, State.READY);\r
- monitor.worked(1);\r
- \r
- }\r
- \r
- }\r
- \r
-}\r
+package org.simantics.db.indexing;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Semaphore;
+import java.util.stream.Stream;
+
+import org.apache.lucene.analysis.Analyzer;
+import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.index.IndexWriterConfig;
+import org.apache.lucene.index.IndexWriterConfig.OpenMode;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.RAMDirectory;
+import org.apache.lucene.util.Version;
+import org.eclipse.core.runtime.IProgressMonitor;
+import org.eclipse.core.runtime.SubMonitor;
+import org.simantics.db.RequestProcessor;
+import org.simantics.db.Resource;
+import org.simantics.db.Session;
+import org.simantics.db.common.request.Adapt;
+import org.simantics.db.indexing.IndexedRelationsSearcherBase.State;
+import org.simantics.db.layer0.adapter.GenericRelation;
+import org.slf4j.LoggerFactory;
+
+/**
+ * @author Tuukka Lehtonen
+ * @author Antti Villberg
+ */
+public class MemoryIndexing {
+
+ private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(MemoryIndexing.class);
+
+ final private Session session;
+
+ final Map<String,Map<String,List<Map<String, Object>>>> persistentCache = new ConcurrentHashMap<>();
+ final Map<String,Map<String,List<Resource>>> persistentCacheResources = new ConcurrentHashMap<>();
+
+ final private ConcurrentHashMap<String,RAMDirectory> directories = new ConcurrentHashMap<>();
+
+ final private ConcurrentHashMap<String,IndexedRelationsSearcherBase> immutableSearchers = new ConcurrentHashMap<>();
+ final private ConcurrentHashMap<String,IndexedRelationsSearcher> searchers = new ConcurrentHashMap<>();
+
+ public MemoryIndexing(Session session) {
+ this.session = session;
+ }
+
+ protected Path getIndexDirectory(Resource relation, Resource input) {
+ return DatabaseIndexing.getIndexLocation(session, relation, input);
+ }
+
+ public IndexedRelationsSearcher get(RequestProcessor processor, Resource relation, Resource input) {
+ Path location = getIndexDirectory(relation, input);
+ String key = location.toAbsolutePath().toString();
+ return searchers.computeIfAbsent(key, t -> {
+ try {
+ GenericRelation r = processor.sync(new Adapt<GenericRelation>(relation, GenericRelation.class));
+ return new IndexedRelationsSearcher(processor, relation, input, r);
+ } catch (Exception e) {
+ LOGGER.error("Could not get searcher for relation {} and input {} in location {}", relation, input, location, e);
+ return null;
+ }
+ });
+ }
+
+ public IndexedRelationsSearcherBase getImmutable(RequestProcessor processor, Resource relation, Resource input) {
+ Path location = getIndexDirectory(relation, input);
+ String key = location.toAbsolutePath().toString();
+ return immutableSearchers.computeIfAbsent(key, t -> {
+ try {
+ return new ImmutableIndexedRelationsSearcher(processor, relation, input);
+ } catch (Exception e) {
+ LOGGER.error("Could not get searcher base for relation {} and input {} in location {}", relation, input, location, e);
+ return null;
+ }
+ });
+ }
+
+ public static MemoryIndexing getInstance(Session session) {
+ MemoryIndexing ret = session.peekService(MemoryIndexing.class);
+ if(ret == null) {
+ ret = new MemoryIndexing(session);
+ session.registerService(MemoryIndexing.class, ret);
+ }
+ return ret;
+ }
+
+ public Directory getDirectory(String path, Analyzer analyzer) throws IOException {
+ try {
+ return directories.computeIfAbsent(path, t -> {
+ try {
+ RAMDirectory directory = new RAMDirectory();
+ IndexWriterConfig config = new IndexWriterConfig(Version.LUCENE_4_9, analyzer);
+ new IndexWriter(directory, config.setOpenMode(OpenMode.CREATE)).close();
+ return directory;
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ });
+ } catch (RuntimeException e) {
+ throw (IOException) e.getCause();
+ }
+ }
+
+ public void remove(String path) {
+ directories.remove(path);
+ }
+
+ public void flush(IProgressMonitor progress) throws Exception {
+ long startTime = System.currentTimeMillis();
+ SubMonitor monitor = SubMonitor.convert(progress);
+ Collection<IndexedRelationsSearcher> searcherEntries = searchers.values();
+ Collection<IndexedRelationsSearcherBase> immutableSearcherEntries = immutableSearchers.values();
+ int count = searcherEntries.size() + immutableSearcherEntries.size();
+ Semaphore sema = new Semaphore(0);
+ Stream.concat(searcherEntries.stream(), immutableSearcherEntries.stream()).parallel().forEach(base -> {
+ try {
+ if (base.isIndexAvailable()) {
+ if (base instanceof IndexedRelationsSearcher) {
+ IndexedRelationsMemorySearcher searcher = ((IndexedRelationsSearcher) base).cache;
+ try {
+ List<Object[]> os = searcher.allDocs(monitor, session);
+ ((IndexedRelationsSearcher) base).applyChanges(monitor, session, searcher.r, os);
+ } catch (Exception e) {
+ LOGGER.error("Could not flush", e);
+ }
+ }
+ }
+ monitor.worked(1);
+ base.changeState(monitor, session, State.READY);
+ } finally {
+ sema.release();
+ }
+ });
+ sema.acquire(count);
+ long totalTime = System.currentTimeMillis() - startTime;
+ LOGGER.info("index flush " + totalTime);
+ }
+
+}