+ @Override
+ public void fullRebuild(IProgressMonitor monitor, RequestProcessor processor) throws IndexException {
+ try {
+ processor.syncRequest(new ReadRequest() {
+ @Override
+ public void run(ReadGraph graph) throws DatabaseException {
+ try {
+ fullRebuild(monitor, graph);
+ } catch (IOException e) {
+ throw new IndexingException(e);
+ }
+ }
+ });
+ } catch (DatabaseException e) {
+ throw new IndexException(e);
+ }
+ }
+
+ private void fullRebuild(IProgressMonitor monitor, ReadGraph graph) throws DatabaseException, IOException {
+ long startTime = System.currentTimeMillis();
+ Resource relation = Layer0X.getInstance(graph).DependenciesRelation;
+ SerialisationSupport ss = graph.getService(SerialisationSupport.class);
+ Set<Resource> indexRoots = Layer0Utils.listIndexRoots(graph);
+ List<CompletableFuture<?>> waitFor = new ArrayList<>(indexRoots.size());
+ SubMonitor mon = SubMonitor.convert(monitor, indexRoots.size()*2);
+
+ for (Resource indexRoot : indexRoots) {
+ monitor.subTask(NameUtils.getSafeName(graph, indexRoot));
+
+ IndexedRelationsSearcherBase searcher = makeSearcher(graph, relation, indexRoot);
+
+ GenericRelation r = graph.adapt(relation, GenericRelation.class);
+ if (r == null)
+ throw new IndexingException("Given resource " + relation + "could not be adapted to GenericRelation.");
+
+ Object[] bound = new Object[] { ss.getRandomAccessId(indexRoot) };
+ GenericRelation selection = r.select(IndexedRelationsSearcherBase.getPattern(r, bound.length), bound);
+
+ long relStart = System.currentTimeMillis();
+ List<Object[]> results = selection.realize(graph);
+ if (LOGGER.isDebugEnabled()) {
+ long relEnd = System.currentTimeMillis() - relStart;
+ LOGGER.debug(indexRoot + " realized " + relEnd);
+ }
+ mon.worked(1);
+
+ CompletableFuture<?> result = new CompletableFuture<>();
+ waitFor.add(result);
+ ForkJoinPool.commonPool().submit(() -> {
+ long startTime1 = System.currentTimeMillis();
+ try {
+ searcher.initializeIndexImpl(result, mon.newChild(1, SubMonitor.SUPPRESS_ALL_LABELS), r, results, bound, true);
+ searcher.setReady();
+ } catch (IOException e) {
+ result.completeExceptionally(e);
+ LOGGER.error("Could not initialize index", e);
+ } finally {
+ if (LOGGER.isDebugEnabled())
+ LOGGER.debug(indexRoot + " initialized " + (System.currentTimeMillis() - startTime1));
+ }
+ });
+ }
+ for (CompletableFuture<?> fut : waitFor) {
+ try {
+ fut.get();
+ } catch (InterruptedException | ExecutionException e) {
+ throw (IOException) e.getCause();
+ }
+ }
+ if (LOGGER.isInfoEnabled()) {
+ long endTime = System.currentTimeMillis() - startTime;
+ LOGGER.info("All indexes rebuilt in {}", endTime);
+ }
+ }
+
+ @Override
+ public void flush(IProgressMonitor progress, Session session) throws IndexException {
+ long startTime = System.currentTimeMillis();
+ SubMonitor monitor = SubMonitor.convert(progress);
+ MemoryIndexing mem = MemoryIndexing.getInstance(session);
+
+ try {
+ List<IndexedRelationsSearcherBase> searchers = mem.getAllSearchers();
+ int count = searchers.size();
+ Semaphore sema = new Semaphore(0);
+ searchers.stream().parallel().forEach(s -> {
+ LockHandle handle = lock(session, Pair.make(s.getRelation(), s.getInput()), true);
+ try {
+ if (s.isIndexAvailable() && s instanceof IndexedRelationsSearcher) {
+ IndexedRelationsSearcher searcher = (IndexedRelationsSearcher) s;
+ try {
+ List<Object[]> os = searcher.cache.allDocs(monitor, session);
+ searcher.applyChanges(monitor, session, searcher.cache.r, os);
+ } catch (Exception e) {
+ LOGGER.error("Could not flush in-memory changes to on-disk index", e);
+ }
+ }
+ monitor.worked(1);
+ s.changeState(monitor, session, State.READY);
+ } finally {
+ handle.unlock();
+ sema.release();
+ }
+ });
+ sema.acquire(count);
+ long totalTime = System.currentTimeMillis() - startTime;
+ LOGGER.info("Database index cache flush done in {} ms", totalTime);
+ } catch (InterruptedException e) {
+ LOGGER.error("Index searcher flush interrupted", e);
+ }
+ }
+