1 /*******************************************************************************
2 * Copyright (c) 2007, 2010 Association for Decentralized Information Management
4 * All rights reserved. This program and the accompanying materials
5 * are made available under the terms of the Eclipse Public License v1.0
6 * which accompanies this distribution, and is available at
7 * http://www.eclipse.org/legal/epl-v10.html
10 * VTT Technical Research Centre of Finland - initial API and implementation
11 *******************************************************************************/
12 package org.simantics.db.indexing;
14 import java.io.IOException;
15 import java.nio.file.Path;
16 import java.util.ArrayList;
17 import java.util.Collection;
18 import java.util.Collections;
19 import java.util.List;
22 import java.util.WeakHashMap;
23 import java.util.concurrent.CompletableFuture;
24 import java.util.concurrent.ExecutionException;
25 import java.util.concurrent.ForkJoinPool;
26 import java.util.concurrent.Semaphore;
27 import java.util.concurrent.locks.Lock;
28 import java.util.concurrent.locks.ReentrantReadWriteLock;
30 import org.apache.lucene.queryparser.classic.ParseException;
31 import org.eclipse.core.runtime.IProgressMonitor;
32 import org.eclipse.core.runtime.SubMonitor;
33 import org.simantics.db.ReadGraph;
34 import org.simantics.db.RequestProcessor;
35 import org.simantics.db.Resource;
36 import org.simantics.db.Session;
37 import org.simantics.db.common.request.ReadRequest;
38 import org.simantics.db.common.request.UniqueRead;
39 import org.simantics.db.common.utils.NameUtils;
40 import org.simantics.db.exception.DatabaseException;
41 import org.simantics.db.exception.InvalidResourceReferenceException;
42 import org.simantics.db.indexing.IndexedRelationsSearcherBase.State;
43 import org.simantics.db.indexing.exception.IndexingException;
44 import org.simantics.db.layer0.adapter.GenericRelation;
45 import org.simantics.db.layer0.genericrelation.IndexException;
46 import org.simantics.db.layer0.genericrelation.IndexedRelations;
47 import org.simantics.db.layer0.util.Layer0Utils;
48 import org.simantics.db.service.QueryControl;
49 import org.simantics.db.service.SerialisationSupport;
50 import org.simantics.operation.Layer0X;
51 import org.simantics.utils.datastructures.Pair;
52 import org.slf4j.Logger;
53 import org.slf4j.LoggerFactory;
56 * @author Tuukka Lehtonen
57 * @author Antti Villberg
59 public class IndexedRelationsImpl implements IndexedRelations {
// Logger shared by all instances of this class.
61 private static final Logger LOGGER = LoggerFactory.getLogger(IndexedRelationsImpl.class);
// Read/write locks per index, looked up by lock identifier. Every access visible in this file
// happens inside synchronized (indexLocks).
// NOTE(review): RWLock receives the identifier in its constructor and declares a final 'id' field, so the
// value presumably references its own WeakHashMap key - confirm whether entries can ever be evicted.
63 Map<Object, RWLock> indexLocks = new WeakHashMap<Object, RWLock>();
/**
 * Pairs a lock identifier with the {@link Lock} that belongs to it, so the holder can release it later.
 * NOTE(review): this listing omits lines of the class (constructor body, the statement after the trace
 * print, closing braces); the comments below describe only what is visible.
 */
65 static class LockHandle {
// Identifier of the index; printed by the trace output in unlock().
66 public final Object id;
// The lock this handle stands for.
67 public final Lock lock;
// Takes the identifier and the lock; the assignments themselves are not part of this listing.
69 public LockHandle(Object id, Lock lock) {
// Prints a trace line to stdout when IndexPolicy.TRACE_INDEX_LOCKING is set.
// NOTE(review): presumably followed by the actual release of 'lock' - that statement is not visible here.
74 public void unlock() {
75 if (IndexPolicy.TRACE_INDEX_LOCKING)
76 System.out.println("Unlocking index " + id);
// NOTE(review): the header of the enclosing class (RWLock, judging by the constructor) is missing from this listing.
// Identifier of the index guarded by this lock; used in trace output and passed on to LockHandle.
82 public final Object id;
// The underlying read/write lock.
83 public final ReentrantReadWriteLock lock;
// Creates the lock for the given identifier.
85 public RWLock(Object id) {
// The 'true' argument requests the fair ordering policy of ReentrantReadWriteLock.
87 this.lock = new ReentrantReadWriteLock(true);
/**
 * Acquires this lock on behalf of the given processor and wraps it in a {@link LockHandle}.
 *
 * @param processor the requester; a ReadGraph is handled in a separate branch
 * @param write true to use the write lock, false to use the read lock
 * @return a new LockHandle carrying this lock's id and the selected Lock
 */
90 LockHandle lock(RequestProcessor processor, boolean write) {
91 Lock l = write ? lock.writeLock() : lock.readLock();
// In the ReadGraph branch the QueryControl service is fetched and resume(graph) is called.
// NOTE(review): the statements surrounding this call (loop condition, wait, the non-ReadGraph branch)
// are missing from this listing - presumably a tryLock/retry loop; confirm against the full file.
92 if(processor instanceof ReadGraph) {
93 ReadGraph graph = (ReadGraph)processor;
95 QueryControl qc = processor.getService(QueryControl.class);
96 boolean executed = qc.resume(graph);
// NOTE(review): the handler body is not visible here - confirm the interrupt is not silently dropped.
100 } catch (InterruptedException e) {
// Optional trace output once the lock is held.
107 if (IndexPolicy.TRACE_INDEX_LOCKING)
108 System.out.println("Locked index " + id);
109 return new LockHandle(id, l);
/**
 * Non-blocking counterpart of lock(): selects the write or read lock and calls Lock.tryLock() once.
 *
 * @param processor not used by the statements visible in this listing
 * @param write true to use the write lock, false to use the read lock
 * @return a new LockHandle when the lock was obtained
 * NOTE(review): the failure path is missing from this listing; a caller in waitLoaded tests the result
 * against null, so presumably null is returned - confirm.
 */
112 LockHandle tryLock(RequestProcessor processor, boolean write) {
113 Lock l = write ? lock.writeLock() : lock.readLock();
114 if(l.tryLock()) return new LockHandle(id, l);
/**
 * Blocking acquisition of the index lock registered under {@code lockIdentifier}.
 * The RWLock is looked up in indexLocks while holding the map's monitor; if none exists, a new one is
 * created and stored. The result of RWLock.lock(processor, write) is returned.
 *
 * @param processor passed through to RWLock.lock
 * @param lockIdentifier key into indexLocks; callers in this file pass Pair.make(relation, input)
 * @param write true for the write lock, false for the read lock
 * @return the handle produced by RWLock.lock
 */
120 private LockHandle lock(RequestProcessor processor, Object lockIdentifier, boolean write) {
121 RWLock rwlock = null;
122 synchronized (indexLocks) {
123 rwlock = indexLocks.get(lockIdentifier);
// First request for this identifier: create and register its lock.
124 if (rwlock == null) {
125 rwlock = new RWLock(lockIdentifier);
126 indexLocks.put(lockIdentifier, rwlock);
129 return rwlock.lock(processor, write);
/**
 * Non-blocking acquisition of the index lock registered under {@code lockIdentifier}.
 * Uses the same get-or-create lookup in indexLocks as lock(...), then returns the result of
 * RWLock.tryLock(processor, write).
 *
 * @param processor passed through to RWLock.tryLock
 * @param lockIdentifier key into indexLocks
 * @param write true for the write lock, false for the read lock
 * @return the handle produced by RWLock.tryLock; the caller in waitLoaded treats null as "not acquired"
 */
132 private LockHandle tryLock(RequestProcessor processor, Object lockIdentifier, boolean write) {
133 RWLock rwlock = null;
134 synchronized (indexLocks) {
135 rwlock = indexLocks.get(lockIdentifier);
// First request for this identifier: create and register its lock.
136 if (rwlock == null) {
137 rwlock = new RWLock(lockIdentifier);
138 indexLocks.put(lockIdentifier, rwlock);
141 return rwlock.tryLock(processor, write);
/**
 * Resolves the searcher for the given relation and input through a synchronous UniqueRead.
 *
 * @param processor used to run the request and to obtain the session for MemoryIndexing
 * @param relation the indexed relation
 * @param input the index input; its immutability decides which MemoryIndexing accessor is used
 * @return the searcher obtained from MemoryIndexing
 * @throws IllegalStateException wrapping any DatabaseException raised by the request
 */
144 private static IndexedRelationsSearcherBase makeSearcher(final RequestProcessor processor, final Resource relation, final Resource input) {
146 return processor.syncRequest(new UniqueRead<IndexedRelationsSearcherBase>() {
149 public IndexedRelationsSearcherBase perform(ReadGraph graph) throws DatabaseException {
// Immutable inputs are served by getImmutable(...), everything else by get(...).
150 if(graph.isImmutable(input)) {
151 return MemoryIndexing.getInstance(processor.getSession()).getImmutable(processor, relation, input);
153 return MemoryIndexing.getInstance(processor.getSession()).get(processor, relation, input);
// The checked DatabaseException is converted to an unchecked exception with the original as cause.
158 } catch (DatabaseException e) {
159 throw new IllegalStateException(e);
/**
 * Makes sure the index behind {@code searcher} is loaded before the caller continues.
 * Called with a lock already held and returns the lock held on exit. On one path the lock is obtained
 * again through lock(processor, lockId, true), so the returned handle may differ from the one passed in.
 * NOTE(review): many lines of this method are missing from this listing (loop headers, try blocks,
 * unlock calls, closing braces); the comments describe only the visible statements.
 *
 * @param progress progress monitor; child monitors are split off for the individual steps
 * @param searcher the searcher whose index must become accessible
 * @param processor a ReadGraph initializes the index directly; otherwise a UniqueRead is run through processor.sync
 * @param lock the lock currently held by the caller
 * @param lockId identifier used with tryLock/lock to obtain the index lock
 * @param input resource whose random access id is passed as the bound value to initializeIndex
 * @return the lock held when the index is accessible
 * @throws IndexException if the searcher is in PROBLEM state, initialization fails, or loading does not succeed
 */
163 private LockHandle waitLoaded(SubMonitor progress, final IndexedRelationsSearcherBase searcher, RequestProcessor processor, LockHandle lock, final Object lockId, final Resource input) throws IndexException {
165 // Initial state: we are locked, no news about the index
166 // Final state: we are locked and the index has been loaded, the current lock is returned
168 // First just check if the index is loaded
169 if (searcher.isIndexAvailable()) {
170 // We have an index - try to start access
171 searcher.startAccess(progress.newChild(50), processor.getSession(), false);
172 // At this point we have three options:
174 if(searcher.hasAccess(false)) return lock;
175 // 2. something is wrong and the index cannot be cleaned
176 if(searcher.checkState(State.PROBLEM)) throw new IndexException("Searcher is in problematic state", searcher.getException());
177 // 3. something was wrong, but the index has been successfully cleaned
// From here on the searcher is expected to be in state NONE, i.e. the index has to be (re)built.
180 searcher.assertState(State.NONE);
182 // We loop until the index is loaded
185 // With ReadGraph we can proceed to initialize
186 if(processor instanceof ReadGraph) {
188 // (re)create the index.
190 SerialisationSupport ss = processor.getService(SerialisationSupport.class);
191 searcher.initializeIndex(progress.newChild(40), (ReadGraph)processor, new Object[] { ss.getRandomAccessId(input) }, true);
193 searcher.startAccess(progress.newChild(10), processor.getSession(), false);
194 if(searcher.hasAccess(false)) return lock;
// Failures are recorded on the searcher before being reported to the caller.
195 } catch (IOException e) {
196 searcher.setProblem(e);
197 throw new IndexException(e);
198 } catch (DatabaseException e) {
199 searcher.setProblem(e);
200 throw new IndexException(e);
204 // With session we schedule the job
210 // final Semaphore s = new Semaphore(0);
214 boolean success = false;
// Bounded retry; the declaration of 'tries' is not part of this listing.
216 while(!success && (++tries)<10) {
220 success = processor.sync(new UniqueRead<Boolean>() {
223 public Boolean perform(ReadGraph graph) throws DatabaseException {
// Only proceed if the write lock can be taken without blocking; otherwise report failure for this attempt.
226 LockHandle lock = tryLock(graph, lockId, true);
227 if(lock == null) return false;
231 boolean loaded = false;
232 if (searcher.isIndexAvailable()) {
233 searcher.startAccess(null, graph.getSession(), false);
234 // At this point we have three options:
236 if(searcher.hasAccess(false))
238 // 2. something is wrong and the index cannot be cleaned
239 if(searcher.checkState(State.PROBLEM))
240 throw new IndexingException("Searcher is in problematic state", searcher.getException());
241 // 3. something was wrong, but the index has been successfully cleaned
246 if(!searcher.checkState(State.NONE))
247 throw new IndexingException("Illegal searcher state " + searcher.state());
250 SerialisationSupport ss = graph.getService(SerialisationSupport.class);
251 searcher.initializeIndex(null, graph, new Object[] { ss.getRandomAccessId(input) }, true);
253 } catch (IOException e) {
254 searcher.setProblem(e);
255 throw new IndexingException(e);
273 } catch (DatabaseException e) {
274 throw new IndexException(e);
280 throw new IndexException("Did not manage to load index.");
// Obtain the write lock (blocking) and replace the local handle with it.
285 lock= lock(processor, lockId, true);
287 if (searcher.isIndexAvailable()) {
288 searcher.startAccess(progress.newChild(50), processor.getSession(), false);
289 if(searcher.hasAccess(false)) return lock;
290 throw new IndexException("Illegal searcher state " + searcher.state());
/**
 * Searches the index of (relation, input) and returns the matching resources.
 * Visible steps: argument validation, searcher lookup, read lock on Pair.make(relation, input),
 * waitLoaded(...), then searcher.doSearchResources(...).
 *
 * @param monitor progress monitor, converted to a SubMonitor with 100 ticks
 * @param search query string handed to the searcher
 * @param processor must not be null
 * @param relation must not be null
 * @param input index input resource
 * @param maxResultCount passed through to doSearchResources
 * @return the search result; an empty list when the search raises a ParseException
 * @throws IllegalArgumentException for the null arguments rejected below
 * NOTE(review): IndexException is thrown although the signature has no throws clause, so it is presumably
 * an unchecked exception - confirm.
 * NOTE(review): the release of the lock is not visible in this listing - confirm it happens on every path.
 */
301 public List<Resource> queryResources(IProgressMonitor monitor, String search, RequestProcessor processor,
302 Resource relation, final Resource input, int maxResultCount) {
303 if (processor == null)
304 throw new IllegalArgumentException("null processor");
305 if (relation == null)
306 throw new IllegalArgumentException("null relation");
308 throw new IllegalArgumentException("null input");
310 throw new IllegalArgumentException("null search criterion");
312 SubMonitor progress = SubMonitor.convert(monitor, 100);
314 // Look for existing index.
315 // Indexes always exist in secondary storage, i.e. disk.
316 // Indexes can be cached in memory when necessary performance-wise.
318 final IndexedRelationsSearcherBase searcher = makeSearcher(processor, relation, input);
320 final Object lockId = Pair.make(relation, input);
// Start with the read lock (write == false); waitLoaded may hand back a different handle.
322 LockHandle lock = lock(processor, lockId, false);
324 // Ensure that index is loaded & ready
325 lock = waitLoaded(progress, searcher, processor, lock, lockId, input);
329 return searcher.doSearchResources(progress.newChild(50), processor, search, maxResultCount);
330 } catch (ParseException e) {
331 // FIXME: should throw an exception, not just ignore.
333 return Collections.emptyList();
334 //throw new IndexException(e);
335 } catch (IOException e) {
336 throw new IndexException(e);
337 } catch (DatabaseException e) {
338 throw new IndexException(e);
/**
 * Searches the index of (relation, input) and returns the matches as a list of maps from String to Object.
 * Visible steps: argument validation, searcher lookup, read lock on Pair.make(relation, input),
 * waitLoaded(...), then searcher.doSearch(...).
 *
 * @param monitor progress monitor, converted to a SubMonitor with 100 ticks
 * @param search query string handed to the searcher
 * @param processor must not be null
 * @param relation must not be null
 * @param input index input resource
 * @param maxResultCount passed through to doSearch
 * @return the search result; an empty list when the search raises a ParseException
 * @throws IllegalArgumentException for the null arguments rejected below
 * NOTE(review): IndexException is thrown although the signature has no throws clause, so it is presumably
 * an unchecked exception - confirm.
 * NOTE(review): the release of the lock is not visible in this listing - confirm it happens on every path.
 */
345 public List<Map<String, Object>> query(IProgressMonitor monitor, String search, RequestProcessor processor,
346 Resource relation, final Resource input, int maxResultCount) {
347 if (processor == null)
348 throw new IllegalArgumentException("null processor");
349 if (relation == null)
350 throw new IllegalArgumentException("null relation");
352 throw new IllegalArgumentException("null input");
354 throw new IllegalArgumentException("null search criterion");
356 SubMonitor progress = SubMonitor.convert(monitor, 100);
358 // Look for existing index.
359 // Indexes always exist in secondary storage, i.e. disk.
360 // Indexes can be cached in memory when necessary performance-wise.
362 final IndexedRelationsSearcherBase searcher = makeSearcher(processor, relation, input);
364 final Object lockId = Pair.make(relation, input);
// Start with the read lock (write == false); waitLoaded may hand back a different handle.
366 LockHandle lock = lock(processor, lockId, false);
368 // Ensure that index is loaded & ready
369 lock = waitLoaded(progress, searcher, processor, lock, lockId, input);
373 return searcher.doSearch(progress.newChild(50), processor, search, maxResultCount);
374 } catch (ParseException e) {
375 // FIXME: should throw an exception, not just ignore.
377 return Collections.emptyList();
378 //throw new IndexException(e);
379 } catch (IOException e) {
380 throw new IndexException(e);
381 } catch (DatabaseException e) {
382 throw new IndexException(e);
/**
 * Adds the given documents to the index of (relationResource, input) while holding its write lock.
 *
 * @param monitor progress monitor, converted to a SubMonitor with 100 ticks
 * @param processor supplies the session and is used for locking and searcher lookup
 * @param relation must not be null; passed to searcher.insertIndex
 * @param relationResource resource of the relation; part of the lock identifier
 * @param input index input resource; part of the lock identifier
 * @param documents must not be null; an empty collection is special-cased (the statement that follows
 *        the isEmpty() check is missing from this listing, presumably an early return)
 * @throws IndexException wrapping InvalidResourceReferenceException, IOException or DatabaseException
 * @throws IllegalArgumentException for the null arguments rejected below
 */
389 public void insert(IProgressMonitor monitor, RequestProcessor processor, GenericRelation relation,
390 Resource relationResource, Resource input, Collection<Object[]> documents) throws IndexException {
392 // System.out.println("Inserting to index: " + input + " " + documents);
394 if (relation == null)
395 throw new IllegalArgumentException("null relation");
397 throw new IllegalArgumentException("null input");
398 if (documents == null)
399 throw new IllegalArgumentException("null documents");
401 if (documents.isEmpty())
404 final SubMonitor progress = SubMonitor.convert(monitor, 100);
406 final IndexedRelationsSearcherBase searcher = makeSearcher(processor, relationResource, input);
// Modifications take the write lock (write == true).
408 LockHandle handle = lock(processor, Pair.make(relationResource, input), true);
// The index path is flagged as changed before write access is attempted.
412 DatabaseIndexing.markIndexChanged(processor.getSession(), searcher.getIndexPath());
413 if(!searcher.startAccess(null, processor.getSession(), true)) {
414 // Could not write index for some reason. Ignore and let the next index query reinitialize the index.
418 searcher.insertIndex(progress.newChild(40), relation, 1, documents);
420 } catch (InvalidResourceReferenceException e) {
421 throw new IndexException(e);
422 } catch (IOException e) {
423 throw new IndexException(e);
424 } catch (DatabaseException e) {
425 throw new IndexException(e);
/**
 * Removes entries identified by key/keyValues from the index of (relationResource, input) while holding
 * its write lock.
 *
 * @param monitor progress monitor, converted to a SubMonitor with 100 ticks
 * @param processor supplies the session; also passed to searcher.removeIndex
 * @param relation must not be null
 * @param relationResource resource of the relation; part of the lock identifier
 * @param input index input resource; part of the lock identifier
 * @param key passed to searcher.removeIndex together with keyValues
 * @param keyValues values of {@code key} selecting what to remove
 * @throws IndexException wrapping DatabaseException or IOException
 * @throws IllegalArgumentException for the null arguments rejected below
 */
432 public void remove(IProgressMonitor monitor, RequestProcessor processor, GenericRelation relation,
433 Resource relationResource, Resource input, String key, Collection<Object> keyValues) throws IndexException {
435 if (relation == null)
436 throw new IllegalArgumentException("null relation");
438 throw new IllegalArgumentException("null input");
440 throw new IllegalArgumentException("null key");
442 SubMonitor progress = SubMonitor.convert(monitor, 100);
444 IndexedRelationsSearcherBase searcher = makeSearcher(processor, relationResource, input);
// Modifications take the write lock (write == true).
446 LockHandle handle = lock(processor, Pair.make(relationResource, input), true);
// The index path is flagged as changed before write access is attempted.
449 DatabaseIndexing.markIndexChanged(processor.getSession(), searcher.getIndexPath());
450 if(!searcher.startAccess(null, processor.getSession(), true)) {
451 // Could not write index for some reason. Ignore and let the next index query reinitialize the index.
455 searcher.removeIndex(progress.newChild(40), relation, processor, key, keyValues);
457 } catch (DatabaseException e) {
458 throw new IndexException(e);
459 } catch (IOException e) {
460 throw new IndexException(e);
/**
 * Clears the index of (relationResource, input) on a best-effort basis while holding its write lock.
 *
 * @param monitor passed to searcher.bestEffortClear
 * @param processor supplies the session and is used for locking and searcher lookup
 * @param relation must not be null
 * @param relationResource resource of the relation; part of the lock identifier
 * @param input index input resource; part of the lock identifier
 * @throws IllegalArgumentException for the null arguments rejected below
 */
467 public void removeAll(IProgressMonitor monitor, RequestProcessor processor, GenericRelation relation,
468 Resource relationResource, Resource input) {
470 if (relation == null)
471 throw new IllegalArgumentException("null relation");
473 throw new IllegalArgumentException("null input");
475 IndexedRelationsSearcherBase searcher = makeSearcher(processor, relationResource, input);
477 LockHandle handle = lock(processor, Pair.make(relationResource, input), true);
// bestEffortClear reports failure by returning a Throwable: a non-null result is stored as the
// searcher's problem, a null result switches the searcher to its "none" state.
481 Throwable t = searcher.bestEffortClear(monitor, processor.getSession());
482 if(t != null) searcher.setProblem(t);
483 else searcher.setNone();
/**
 * Replaces index entries selected by key/keyValues with the given documents while holding the write
 * lock of (relationResource, input).
 *
 * @param monitor progress monitor, converted to a SubMonitor with 100 ticks
 * @param processor supplies the session and is used for locking and searcher lookup
 * @param relation must not be null; passed to searcher.replaceIndex
 * @param relationResource resource of the relation; part of the lock identifier
 * @param input index input resource; part of the lock identifier
 * @param key passed to searcher.replaceIndex together with keyValues
 * @param keyValues values of {@code key} selecting what to replace
 * @param documents replacement documents
 * @return presumably the accumulated didChange flag (the return statement is missing from this listing)
 * @throws IndexException wrapping any failure raised while replacing
 * @throws IllegalArgumentException for the null arguments rejected below
 */
492 public boolean replace(IProgressMonitor monitor, RequestProcessor processor, GenericRelation relation,
493 Resource relationResource, Resource input, String key, Collection<Object> keyValues, Collection<Object[]> documents) throws IndexException {
495 if (relation == null)
496 throw new IllegalArgumentException("null relation");
498 throw new IllegalArgumentException("null input");
500 throw new IllegalArgumentException("null key");
502 SubMonitor progress = SubMonitor.convert(monitor, 100);
504 IndexedRelationsSearcherBase searcher = makeSearcher(processor, relationResource, input);
// Modifications take the write lock (write == true).
506 LockHandle handle = lock(processor, Pair.make(relationResource, input), true);
508 boolean didChange = false;
// The index path is flagged as changed before write access is attempted.
512 DatabaseIndexing.markIndexChanged(processor.getSession(), searcher.getIndexPath());
513 if(!searcher.startAccess(null, processor.getSession(), true)) {
514 // Could not write index for some reason. Ignore and let the next index query reinitialize the index.
517 didChange |= searcher.replaceIndex(progress.newChild(40), key, keyValues, relation, 1, documents);
519 } catch (InvalidResourceReferenceException e) {
520 throw new IndexException(e);
521 } catch (IOException e) {
522 throw new IndexException(e);
523 } catch (DatabaseException e) {
524 throw new IndexException(e);
// NOTE(review): catching Throwable also wraps Errors and makes the narrower catch clauses above
// redundant - confirm this breadth is intended.
525 } catch (Throwable t) {
526 throw new IndexException(t);
/**
 * Closes the searcher of (relationResource, input) and deletes its index from the index location,
 * while holding the write lock.
 *
 * @param monitor passed to searcher.changeState
 * @param processor supplies the session and is used for locking and searcher lookup
 * @param relationResource resource of the relation; part of the lock identifier
 * @param input index input resource; part of the lock identifier
 * @throws IndexException if the searcher does not reach State.NONE or deleting the index fails with an IOException
 */
536 public void reset(IProgressMonitor monitor, RequestProcessor processor, Resource relationResource, Resource input) throws IndexException {
538 IndexedRelationsSearcherBase searcher = makeSearcher(processor, relationResource, input);
540 LockHandle handle = lock(processor, Pair.make(relationResource, input), true);
541 Path path = DatabaseIndexing.getIndexLocation(processor.getSession(), relationResource, input);
// The searcher must be in State.NONE before the files are deleted.
543 searcher.changeState(monitor, processor.getSession(), State.NONE);
544 if (!searcher.checkState(State.NONE))
545 throw new IndexException("Could not close index for input " + input + " before removing it");
547 DatabaseIndexing.deleteIndex(path);
// The failure is logged with the absolute path and then reported to the caller.
548 } catch (IOException e) {
549 LOGGER.error("Could not delete {}", path.toAbsolutePath(), e);
550 throw new IndexException(e);
/**
 * Rebuilds all indexes by running the ReadGraph-based fullRebuild inside a synchronous read request.
 *
 * @param monitor progress monitor handed on to the ReadGraph-based overload
 * @param processor used to run the read request
 * @throws IndexException wrapping any DatabaseException raised by the request
 */
558 public void fullRebuild(IProgressMonitor monitor, RequestProcessor processor) throws IndexException {
560 processor.syncRequest(new ReadRequest() {
562 public void run(ReadGraph graph) throws DatabaseException {
564 fullRebuild(monitor, graph);
// An IOException from the rebuild is rethrown as IndexingException from inside the request.
565 } catch (IOException e) {
566 throw new IndexingException(e);
570 } catch (DatabaseException e) {
571 throw new IndexException(e);
/**
 * Rebuilds the DependenciesRelation index of every index root returned by Layer0Utils.listIndexRoots.
 * For each root the relation is realized in the calling thread; index initialization is submitted to
 * ForkJoinPool.commonPool(), and the method then iterates over the collected futures.
 * NOTE(review): several lines are missing from this listing (the null check before the adapt failure,
 * the statement that fills 'waitFor', the wait on each future, closing braces).
 *
 * @param monitor progress monitor; converted to a SubMonitor with two ticks per index root
 * @param graph read transaction used for all database access
 * @throws DatabaseException from graph access, or IndexingException when the relation cannot be adapted
 * @throws IOException propagated from the futures (see the note at the cast below)
 */
575 private void fullRebuild(IProgressMonitor monitor, ReadGraph graph) throws DatabaseException, IOException {
576 long startTime = System.currentTimeMillis();
577 Resource relation = Layer0X.getInstance(graph).DependenciesRelation;
578 SerialisationSupport ss = graph.getService(SerialisationSupport.class);
579 Set<Resource> indexRoots = Layer0Utils.listIndexRoots(graph);
580 List<CompletableFuture<?>> waitFor = new ArrayList<>(indexRoots.size());
581 SubMonitor mon = SubMonitor.convert(monitor, indexRoots.size()*2);
583 for (Resource indexRoot : indexRoots) {
// NOTE(review): this calls the raw 'monitor' rather than 'mon'; SubMonitor.convert accepts a null
// monitor but this call would not - confirm callers never pass null.
584 monitor.subTask(NameUtils.getSafeName(graph, indexRoot));
586 IndexedRelationsSearcherBase searcher = makeSearcher(graph, relation, indexRoot);
588 GenericRelation r = graph.adapt(relation, GenericRelation.class);
// NOTE(review): the message lacks a space before "could".
590 throw new IndexingException("Given resource " + relation + "could not be adapted to GenericRelation.");
// The index root's random access id is the single bound value of the selection.
592 Object[] bound = new Object[] { ss.getRandomAccessId(indexRoot) };
593 GenericRelation selection = r.select(IndexedRelationsSearcherBase.getPattern(r, bound.length), bound);
595 long relStart = System.currentTimeMillis();
596 List<Object[]> results = selection.realize(graph);
597 if (LOGGER.isDebugEnabled()) {
598 long relEnd = System.currentTimeMillis() - relStart;
599 LOGGER.debug(indexRoot + " realized " + relEnd);
// The future is handed to initializeIndexImpl and completed exceptionally if that call throws an IOException.
603 CompletableFuture<?> result = new CompletableFuture<>();
605 ForkJoinPool.commonPool().submit(() -> {
606 long startTime1 = System.currentTimeMillis();
608 searcher.initializeIndexImpl(result, mon.newChild(1, SubMonitor.SUPPRESS_ALL_LABELS), r, results, bound, true);
610 } catch (IOException e) {
611 result.completeExceptionally(e);
612 LOGGER.error("Could not initialize index", e);
614 if (LOGGER.isDebugEnabled())
615 LOGGER.debug(indexRoot + " initialized " + (System.currentTimeMillis() - startTime1));
619 for (CompletableFuture<?> fut : waitFor) {
// NOTE(review): an InterruptedException normally has no cause, so 'throw (IOException) e.getCause()'
// would throw a NullPointerException; a cause that is not an IOException would raise a
// ClassCastException. Confirm and consider handling both cases explicitly.
622 } catch (InterruptedException | ExecutionException e) {
623 throw (IOException) e.getCause();
626 if (LOGGER.isInfoEnabled()) {
627 long endTime = System.currentTimeMillis() - startTime;
628 LOGGER.info("All indexes rebuilt in {}", endTime);
/**
 * Flushes in-memory index changes of all searchers known to MemoryIndexing for the given session.
 * Each searcher is processed in a parallel stream under the write lock of (relation, input).
 * NOTE(review): lines are missing from this listing (try/finally structure, the use of 'sema' and
 * 'count', closing braces); the comments describe only the visible statements.
 *
 * @param progress progress monitor, converted to a SubMonitor
 * @param session session whose searchers are flushed; also used as the lock's processor
 * @throws IndexException declared by the signature; no throw is visible in this listing
 */
633 public void flush(IProgressMonitor progress, Session session) throws IndexException {
634 long startTime = System.currentTimeMillis();
635 SubMonitor monitor = SubMonitor.convert(progress);
636 MemoryIndexing mem = MemoryIndexing.getInstance(session);
639 List<IndexedRelationsSearcherBase> searchers = mem.getAllSearchers();
640 int count = searchers.size();
// NOTE(review): presumably released per searcher and acquired 'count' times afterwards - the
// statements using it are not visible here.
641 Semaphore sema = new Semaphore(0);
642 searchers.stream().parallel().forEach(s -> {
643 LockHandle handle = lock(session, Pair.make(s.getRelation(), s.getInput()), true);
// Only IndexedRelationsSearcher instances with an available index have cached documents applied.
645 if (s.isIndexAvailable() && s instanceof IndexedRelationsSearcher) {
646 IndexedRelationsSearcher searcher = (IndexedRelationsSearcher) s;
648 List<Object[]> os = searcher.cache.allDocs(monitor, session);
649 searcher.applyChanges(monitor, session, searcher.cache.r, os);
// A failure for one searcher is logged and does not propagate out of the lambda.
650 } catch (Exception e) {
651 LOGGER.error("Could not flush in-memory changes to on-disk index", e);
655 s.changeState(monitor, session, State.READY);
662 long totalTime = System.currentTimeMillis() - startTime;
663 LOGGER.info("Database index cache flush done in {} ms", totalTime);
// NOTE(review): only logging is visible here - confirm whether the thread's interrupt status is restored.
664 } catch (InterruptedException e) {
665 LOGGER.error("Index searcher flush interrupted", e);