import java.io.IOException;
import java.nio.file.Path;
-import java.util.HashMap;
+import java.util.Collection;
import java.util.List;
import java.util.Map;
-import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Semaphore;
+import java.util.stream.Stream;
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.index.IndexWriter;
private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(MemoryIndexing.class);
- final private Session session;
-
- final Map<String,Map<String,List<Map<String, Object>>>> persistentCache = new HashMap<String,Map<String,List<Map<String, Object>>>>();
- final Map<String,Map<String,List<Resource>>> persistentCacheResources = new HashMap<String,Map<String,List<Resource>>>();
-
- final private Map<String,RAMDirectory> directories = new HashMap<String,RAMDirectory>();
-
- final private Map<String,IndexedRelationsSearcherBase> immutableSearchers = new HashMap<String,IndexedRelationsSearcherBase>();
- final private Map<String,IndexedRelationsSearcher> searchers = new HashMap<String,IndexedRelationsSearcher>();
+ final private Session session;
+
+ final Map<String,Map<String,List<Map<String, Object>>>> persistentCache = new ConcurrentHashMap<>();
+ final Map<String,Map<String,List<Resource>>> persistentCacheResources = new ConcurrentHashMap<>();
+
+ final private ConcurrentHashMap<String,RAMDirectory> directories = new ConcurrentHashMap<>();
+
+ final private ConcurrentHashMap<String,IndexedRelationsSearcherBase> immutableSearchers = new ConcurrentHashMap<>();
+ final private ConcurrentHashMap<String,IndexedRelationsSearcher> searchers = new ConcurrentHashMap<>();
public MemoryIndexing(Session session) {
this.session = session;
}
-
+
protected Path getIndexDirectory(Resource relation, Resource input) {
return DatabaseIndexing.getIndexLocation(session, relation, input);
}
-
+
public IndexedRelationsSearcher get(RequestProcessor processor, Resource relation, Resource input) {
Path location = getIndexDirectory(relation, input);
- try {
- String key = location.toAbsolutePath().toString();
- IndexedRelationsSearcher searcher = searchers.get(key);
- if (searcher == null) {
+ String key = location.toAbsolutePath().toString();
+ return searchers.computeIfAbsent(key, t -> {
+ try {
GenericRelation r = processor.sync(new Adapt<GenericRelation>(relation, GenericRelation.class));
- searcher = new IndexedRelationsSearcher(processor, relation, input, r);
- searchers.put(key, searcher);
+ return new IndexedRelationsSearcher(processor, relation, input, r);
+ } catch (Exception e) {
+ LOGGER.error("Could not get searcher for relation {} and input {} in location {}", relation, input, location, e);
+ return null;
}
- return searcher;
- } catch (Exception e) {
- LOGGER.error("Could not get searcher for relation {} and input {} in location {}", relation, input, location, e);
- return null;
- }
+ });
}
public IndexedRelationsSearcherBase getImmutable(RequestProcessor processor, Resource relation, Resource input) {
Path location = getIndexDirectory(relation, input);
- try {
- String key = location.toAbsolutePath().toString();
- IndexedRelationsSearcherBase searcher = immutableSearchers.get(key);
- if (searcher == null) {
- searcher = new ImmutableIndexedRelationsSearcher(processor, relation, input);
- immutableSearchers.put(key, searcher);
+ String key = location.toAbsolutePath().toString();
+ return immutableSearchers.computeIfAbsent(key, t -> {
+ try {
+ return new ImmutableIndexedRelationsSearcher(processor, relation, input);
+ } catch (Exception e) {
+ LOGGER.error("Could not get searcher base for relation {} and input {} in location {}", relation, input, location, e);
+ return null;
}
- return searcher;
- } catch (Exception e) {
- LOGGER.error("Could not get searcher base for relation {} and input {} in location {}", relation, input, location, e);
- return null;
- }
+ });
}
public static MemoryIndexing getInstance(Session session) {
- MemoryIndexing ret = session.peekService(MemoryIndexing.class);
+ MemoryIndexing ret = session.peekService(MemoryIndexing.class);
if(ret == null) {
- ret = new MemoryIndexing(session);
+ ret = new MemoryIndexing(session);
session.registerService(MemoryIndexing.class, ret);
}
return ret;
}
- public synchronized Directory getDirectory(String path, Analyzer analyzer) throws IOException {
- RAMDirectory directory = directories.get(path);
- if (directory == null) {
- synchronized (directories) {
- directory = directories.get(path);
- if (directory == null) {
- directory = new RAMDirectory();
+ public Directory getDirectory(String path, Analyzer analyzer) throws IOException {
+ try {
+ return directories.computeIfAbsent(path, t -> {
+ try {
+ RAMDirectory directory = new RAMDirectory();
IndexWriterConfig config = new IndexWriterConfig(Version.LUCENE_4_9, analyzer);
new IndexWriter(directory, config.setOpenMode(OpenMode.CREATE)).close();
- directories.put(path, directory);
+ return directory;
+ } catch (IOException e) {
+ throw new RuntimeException(e);
}
- }
+ });
+ } catch (RuntimeException e) {
+ throw (IOException) e.getCause();
}
- return directory;
-
}
-
+
public void remove(String path) {
directories.remove(path);
}
-
+
public void flush(IProgressMonitor progress) throws Exception {
-
+ long startTime = System.currentTimeMillis();
SubMonitor monitor = SubMonitor.convert(progress);
-
- Set<Map.Entry<String, IndexedRelationsSearcher>> set = searchers.entrySet();
- Set<Map.Entry<String, IndexedRelationsSearcherBase>> iset = immutableSearchers.entrySet();
-
- monitor.setWorkRemaining(set.size()+iset.size());
-
- for(Map.Entry<String, IndexedRelationsSearcher> entry : set) {
-
- IndexedRelationsSearcher persistent = entry.getValue();
- IndexedRelationsMemorySearcher searcher = persistent.cache;
-
- if(persistent.isIndexAvailable()) {
- List<Object[]> os = searcher.allDocs(monitor, session);
- persistent.applyChanges(monitor, session, searcher.r, os);
- }
-
- monitor.worked(1);
- entry.getValue().changeState(monitor, session, State.READY);
-
- }
-
- for(Map.Entry<String, IndexedRelationsSearcherBase> entry : iset) {
-
- entry.getValue().changeState(monitor, session, State.READY);
- monitor.worked(1);
-
- }
-
+ Collection<IndexedRelationsSearcher> searcherEntries = searchers.values();
+ Collection<IndexedRelationsSearcherBase> immutableSearcherEntries = immutableSearchers.values();
+ int count = searcherEntries.size() + immutableSearcherEntries.size();
+ Semaphore sema = new Semaphore(0);
+ Stream.concat(searcherEntries.stream(), immutableSearcherEntries.stream()).parallel().forEach(base -> {
+ try {
+ if (base.isIndexAvailable()) {
+ if (base instanceof IndexedRelationsSearcher) {
+ IndexedRelationsMemorySearcher searcher = ((IndexedRelationsSearcher) base).cache;
+ try {
+ List<Object[]> os = searcher.allDocs(monitor, session);
+ ((IndexedRelationsSearcher) base).applyChanges(monitor, session, searcher.r, os);
+ } catch (Exception e) {
+ LOGGER.error("Could not flush", e);
+ }
+ }
+ }
+ monitor.worked(1);
+ base.changeState(monitor, session, State.READY);
+ } finally {
+ sema.release();
+ }
+ });
+ sema.acquire(count);
+ long totalTime = System.currentTimeMillis() - startTime;
+ LOGGER.info("index flush " + totalTime);
}
}