X-Git-Url: https://gerrit.simantics.org/r/gitweb?p=simantics%2Fplatform.git;a=blobdiff_plain;f=bundles%2Forg.simantics.db.indexing%2Fsrc%2Forg%2Fsimantics%2Fdb%2Findexing%2FMemoryIndexing.java;h=cee5eb26bfb616799c522baca31cf186b2a46a17;hp=75c6b79bb51c6a1dc4da3b9c9e1064f3fbb93b25;hb=3d96e3fa94898d90d6892c081ebed7cb9e773a4a;hpb=9c1b0bdc646d2e91526e49fd9233c2eb0c243019 diff --git a/bundles/org.simantics.db.indexing/src/org/simantics/db/indexing/MemoryIndexing.java b/bundles/org.simantics.db.indexing/src/org/simantics/db/indexing/MemoryIndexing.java index 75c6b79bb..cee5eb26b 100644 --- a/bundles/org.simantics.db.indexing/src/org/simantics/db/indexing/MemoryIndexing.java +++ b/bundles/org.simantics.db.indexing/src/org/simantics/db/indexing/MemoryIndexing.java @@ -2,10 +2,12 @@ package org.simantics.db.indexing; import java.io.IOException; import java.nio.file.Path; -import java.util.HashMap; +import java.util.Collection; import java.util.List; import java.util.Map; -import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Semaphore; +import java.util.stream.Stream; import org.apache.lucene.analysis.Analyzer; import org.apache.lucene.index.IndexWriter; @@ -32,118 +34,110 @@ public class MemoryIndexing { private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(MemoryIndexing.class); - final private Session session; - - final Map>>> persistentCache = new HashMap>>>(); - final Map>> persistentCacheResources = new HashMap>>(); - - final private Map directories = new HashMap(); - - final private Map immutableSearchers = new HashMap(); - final private Map searchers = new HashMap(); + final private Session session; + + final Map>>> persistentCache = new ConcurrentHashMap<>(); + final Map>> persistentCacheResources = new ConcurrentHashMap<>(); + + final private ConcurrentHashMap directories = new ConcurrentHashMap<>(); + + final private ConcurrentHashMap immutableSearchers = new ConcurrentHashMap<>(); + final private ConcurrentHashMap searchers = new ConcurrentHashMap<>(); public MemoryIndexing(Session session) { this.session = session; } - + protected Path getIndexDirectory(Resource relation, Resource input) { return DatabaseIndexing.getIndexLocation(session, relation, input); } - + public IndexedRelationsSearcher get(RequestProcessor processor, Resource relation, Resource input) { Path location = getIndexDirectory(relation, input); - try { - String key = location.toAbsolutePath().toString(); - IndexedRelationsSearcher searcher = searchers.get(key); - if (searcher == null) { + String key = location.toAbsolutePath().toString(); + return searchers.computeIfAbsent(key, t -> { + try { GenericRelation r = processor.sync(new Adapt(relation, GenericRelation.class)); - searcher = new IndexedRelationsSearcher(processor, relation, input, r); - searchers.put(key, searcher); + return new IndexedRelationsSearcher(processor, relation, input, r); + } catch (Exception e) { + LOGGER.error("Could not get searcher for relation {} and input {} in location {}", relation, input, location, e); + return null; } - return searcher; - } catch (Exception e) { - LOGGER.error("Could not get searcher for relation {} and input {} in location {}", relation, input, location, e); - return null; - } + }); } public IndexedRelationsSearcherBase getImmutable(RequestProcessor processor, Resource relation, Resource input) { Path location = getIndexDirectory(relation, input); - try { - String key = location.toAbsolutePath().toString(); - IndexedRelationsSearcherBase searcher = immutableSearchers.get(key); - if (searcher == null) { - searcher = new ImmutableIndexedRelationsSearcher(processor, relation, input); - immutableSearchers.put(key, searcher); + String key = location.toAbsolutePath().toString(); + return immutableSearchers.computeIfAbsent(key, t -> { + try { + return new ImmutableIndexedRelationsSearcher(processor, relation, input); + } catch (Exception e) { + LOGGER.error("Could not get searcher base for relation {} and input {} in location {}", relation, input, location, e); + return null; } - return searcher; - } catch (Exception e) { - LOGGER.error("Could not get searcher base for relation {} and input {} in location {}", relation, input, location, e); - return null; - } + }); } public static MemoryIndexing getInstance(Session session) { - MemoryIndexing ret = session.peekService(MemoryIndexing.class); + MemoryIndexing ret = session.peekService(MemoryIndexing.class); if(ret == null) { - ret = new MemoryIndexing(session); + ret = new MemoryIndexing(session); session.registerService(MemoryIndexing.class, ret); } return ret; } - public synchronized Directory getDirectory(String path, Analyzer analyzer) throws IOException { - RAMDirectory directory = directories.get(path); - if (directory == null) { - synchronized (directories) { - directory = directories.get(path); - if (directory == null) { - directory = new RAMDirectory(); + public Directory getDirectory(String path, Analyzer analyzer) throws IOException { + try { + return directories.computeIfAbsent(path, t -> { + try { + RAMDirectory directory = new RAMDirectory(); IndexWriterConfig config = new IndexWriterConfig(Version.LUCENE_4_9, analyzer); new IndexWriter(directory, config.setOpenMode(OpenMode.CREATE)).close(); - directories.put(path, directory); + return directory; + } catch (IOException e) { + throw new RuntimeException(e); } - } + }); + } catch (RuntimeException e) { + throw (IOException) e.getCause(); } - return directory; - } - + public void remove(String path) { directories.remove(path); } - + public void flush(IProgressMonitor progress) throws Exception { - + long startTime = System.currentTimeMillis(); SubMonitor monitor = SubMonitor.convert(progress); - - Set> set = searchers.entrySet(); - Set> iset = immutableSearchers.entrySet(); - - monitor.setWorkRemaining(set.size()+iset.size()); - - for(Map.Entry entry : set) { - - IndexedRelationsSearcher persistent = entry.getValue(); - IndexedRelationsMemorySearcher searcher = persistent.cache; - - if(persistent.isIndexAvailable()) { - List os = searcher.allDocs(monitor, session); - persistent.applyChanges(monitor, session, searcher.r, os); - } - - monitor.worked(1); - entry.getValue().changeState(monitor, session, State.READY); - - } - - for(Map.Entry entry : iset) { - - entry.getValue().changeState(monitor, session, State.READY); - monitor.worked(1); - - } - + Collection searcherEntries = searchers.values(); + Collection immutableSearcherEntries = immutableSearchers.values(); + int count = searcherEntries.size() + immutableSearcherEntries.size(); + Semaphore sema = new Semaphore(0); + Stream.concat(searcherEntries.stream(), immutableSearcherEntries.stream()).parallel().forEach(base -> { + try { + if (base.isIndexAvailable()) { + if (base instanceof IndexedRelationsSearcher) { + IndexedRelationsMemorySearcher searcher = ((IndexedRelationsSearcher) base).cache; + try { + List os = searcher.allDocs(monitor, session); + ((IndexedRelationsSearcher) base).applyChanges(monitor, session, searcher.r, os); + } catch (Exception e) { + LOGGER.error("Could not flush", e); + } + } + } + monitor.worked(1); + base.changeState(monitor, session, State.READY); + } finally { + sema.release(); + } + }); + sema.acquire(count); + long totalTime = System.currentTimeMillis() - startTime; + LOGGER.info("index flush " + totalTime); } }