-
- Set<Map.Entry<String, IndexedRelationsSearcher>> set = searchers.entrySet();
- Set<Map.Entry<String, IndexedRelationsSearcherBase>> iset = immutableSearchers.entrySet();
-
- monitor.setWorkRemaining(set.size()+iset.size());
-
- for(Map.Entry<String, IndexedRelationsSearcher> entry : set) {
-
- IndexedRelationsSearcher persistent = entry.getValue();
- IndexedRelationsMemorySearcher searcher = persistent.cache;
-
- if(persistent.isIndexAvailable()) {
- List<Object[]> os = searcher.allDocs(monitor, session);
- persistent.applyChanges(monitor, session, searcher.r, os);
- }
-
- monitor.worked(1);
- entry.getValue().changeState(monitor, session, State.READY);
-
- }
-
- for(Map.Entry<String, IndexedRelationsSearcherBase> entry : iset) {
-
- entry.getValue().changeState(monitor, session, State.READY);
- monitor.worked(1);
-
- }
-
+ Collection<IndexedRelationsSearcher> searcherEntries = searchers.values();
+ Collection<IndexedRelationsSearcherBase> immutableSearcherEntries = immutableSearchers.values();
+ int count = searcherEntries.size() + immutableSearcherEntries.size();
+ Semaphore sema = new Semaphore(0);
+ Stream.concat(searcherEntries.stream(), immutableSearcherEntries.stream()).parallel().forEach(base -> {
+ try {
+ if (base.isIndexAvailable()) {
+ if (base instanceof IndexedRelationsSearcher) {
+ IndexedRelationsMemorySearcher searcher = ((IndexedRelationsSearcher) base).cache;
+ try {
+ List<Object[]> os = searcher.allDocs(monitor, session);
+ ((IndexedRelationsSearcher) base).applyChanges(monitor, session, searcher.r, os);
+ } catch (Exception e) {
+ LOGGER.error("Could not flush", e);
+ }
+ }
+ }
+ monitor.worked(1);
+ base.changeState(monitor, session, State.READY);
+ } finally {
+ sema.release();
+ }
+ });
+ sema.acquire(count);
+ long totalTime = System.currentTimeMillis() - startTime;
+ LOGGER.info("index flush " + totalTime);