/******************************************************************************* * Copyright (c) 2007, 2010 Association for Decentralized Information Management * in Industry THTH ry. * All rights reserved. This program and the accompanying materials * are made available under the terms of the Eclipse Public License v1.0 * which accompanies this distribution, and is available at * http://www.eclipse.org/legal/epl-v10.html * * Contributors: * VTT Technical Research Centre of Finland - initial API and implementation *******************************************************************************/ package org.simantics.db.indexing; import java.io.File; import java.io.IOException; import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Map; import java.util.WeakHashMap; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.lucene.queryparser.classic.ParseException; import org.eclipse.core.runtime.IProgressMonitor; import org.eclipse.core.runtime.SubMonitor; import org.simantics.db.ReadGraph; import org.simantics.db.RequestProcessor; import org.simantics.db.Resource; import org.simantics.db.common.request.UniqueRead; import org.simantics.db.exception.DatabaseException; import org.simantics.db.exception.InvalidResourceReferenceException; import org.simantics.db.indexing.IndexedRelationsSearcherBase.State; import org.simantics.db.layer0.adapter.GenericRelation; import org.simantics.db.layer0.genericrelation.IndexException; import org.simantics.db.layer0.genericrelation.IndexedRelations; import org.simantics.db.service.QueryControl; import org.simantics.db.service.SerialisationSupport; import org.simantics.utils.datastructures.Pair; /** * @author Tuukka Lehtonen * @author Antti Villberg */ public class IndexedRelationsImpl implements IndexedRelations { Map indexLocks = new WeakHashMap(); static class LockHandle { public final Object id; public final Lock lock; public LockHandle(Object id, Lock lock) { this.id = id; this.lock = lock; } public void unlock() { if (IndexPolicy.TRACE_INDEX_LOCKING) System.out.println("Unlocking index " + id); lock.unlock(); } } static class RWLock { public final Object id; public final ReentrantReadWriteLock lock; public RWLock(Object id) { this.id = id; this.lock = new ReentrantReadWriteLock(true); } LockHandle lock(RequestProcessor processor, boolean write) { Lock l = write ? lock.writeLock() : lock.readLock(); if(processor instanceof ReadGraph) { ReadGraph graph = (ReadGraph)processor; while(!l.tryLock()) { QueryControl qc = processor.getService(QueryControl.class); boolean executed = qc.resume(graph); if(!executed) { try { Thread.sleep(1); } catch (InterruptedException e) { } } } } else { l.lock(); } if (IndexPolicy.TRACE_INDEX_LOCKING) System.out.println("Locked index " + id); return new LockHandle(id, l); } LockHandle tryLock(RequestProcessor processor, boolean write) { Lock l = write ? lock.writeLock() : lock.readLock(); if(l.tryLock()) return new LockHandle(id, l); else return null; } } private LockHandle lock(RequestProcessor processor, Object lockIdentifier, boolean write) { RWLock rwlock = null; synchronized (indexLocks) { rwlock = indexLocks.get(lockIdentifier); if (rwlock == null) { rwlock = new RWLock(lockIdentifier); indexLocks.put(lockIdentifier, rwlock); } } return rwlock.lock(processor, write); } private LockHandle tryLock(RequestProcessor processor, Object lockIdentifier, boolean write) { RWLock rwlock = null; synchronized (indexLocks) { rwlock = indexLocks.get(lockIdentifier); if (rwlock == null) { rwlock = new RWLock(lockIdentifier); indexLocks.put(lockIdentifier, rwlock); } } return rwlock.tryLock(processor, write); } private IndexedRelationsSearcherBase makeSearcher(final RequestProcessor processor, final Resource relation, final Resource input) { try { return processor.syncRequest(new UniqueRead() { @Override public IndexedRelationsSearcherBase perform(ReadGraph graph) throws DatabaseException { if(graph.isImmutable(input)) { return MemoryIndexing.getInstance(processor.getSession()).getImmutable(processor, relation, input); } else { return MemoryIndexing.getInstance(processor.getSession()).get(processor, relation, input); } } }); } catch (DatabaseException e) { throw new IllegalStateException(e); } } private LockHandle waitLoaded(SubMonitor progress, final IndexedRelationsSearcherBase searcher, RequestProcessor processor, LockHandle lock, final Object lockId, final Resource input) throws IndexException { // Initial state: we are locked, no news about the index // Final state: we are locked and the index has been loaded, the current lock is returned // First just check if the index is loaded if (searcher.isIndexAvailable()) { // We have an index - try to start access searcher.startAccess(progress.newChild(50), processor.getSession(), false); // At this point we have three options: // 1. we have access if(searcher.hasAccess(false)) return lock; // 2. something is wrong and the index cannot be cleaned if(searcher.checkState(State.PROBLEM)) throw new IndexException("Searcher is in problematic state", searcher.getException()); // 3. something was wrong, but the index has been successfully cleaned } if(!searcher.checkState(State.NONE)) throw new IndexException("Illegal searcher state, contact application support."); // We loop until the index is loaded while(true) { // With ReadGraph we can proceed to initialize if(processor instanceof ReadGraph) { // (re)create the index. try { SerialisationSupport ss = processor.getService(SerialisationSupport.class); searcher.initializeIndex(progress.newChild(40), (ReadGraph)processor, new Object[] { ss.getRandomAccessId(input) }, true); searcher.setReady(); searcher.startAccess(progress.newChild(10), processor.getSession(), false); if(searcher.hasAccess(false)) return lock; } catch (IOException e) { searcher.setProblem(e); throw new IndexException(e); } catch (DatabaseException e) { searcher.setProblem(e); throw new IndexException(e); } } // With session we schedule the job else { // Release lock lock.unlock(); // final Semaphore s = new Semaphore(0); // Schedule job boolean success = false; int tries = 0; while(!success && (++tries)<10) { try { success = processor.sync(new UniqueRead() { @Override public Boolean perform(ReadGraph graph) throws DatabaseException { // Obtain lock LockHandle lock = tryLock(graph, lockId, true); if(lock == null) return false; try { boolean loaded = false; if (searcher.isIndexAvailable()) { searcher.startAccess(null, graph.getSession(), false); // At this point we have three options: // 1. we have access if(searcher.hasAccess(false)) loaded = true; // 2. something is wrong and the index cannot be cleaned if(searcher.checkState(State.PROBLEM)) throw new DatabaseException("Searcher is in problematic state", searcher.getException()); // 3. something was wrong, but the index has been successfully cleaned } if(!loaded) { if(!searcher.checkState(State.NONE)) throw new DatabaseException("Illegal searcher state, contact application support."); try { SerialisationSupport ss = graph.getService(SerialisationSupport.class); searcher.initializeIndex(null, graph, new Object[] { ss.getRandomAccessId(input) }, true); searcher.setReady(); } catch (IOException e) { searcher.setProblem(e); throw new DatabaseException(e); } } } finally { lock.unlock(); // s.release(); } return true; } }); } catch (DatabaseException e) { throw new IndexException(e); } } if(!success) throw new IndexException("Did not manage to load index. Contact application support."); // Try again // Obtain lock lock= lock(processor, lockId, true); if (searcher.isIndexAvailable()) { searcher.startAccess(progress.newChild(50), processor.getSession(), false); if(searcher.hasAccess(false)) return lock; throw new IndexException("Illegal searcher state, contact application support."); } } } } @Override public List queryResources(IProgressMonitor monitor, String search, RequestProcessor processor, Resource relation, final Resource input, int maxResultCount) { if (processor == null) throw new IllegalArgumentException("null processor"); if (relation == null) throw new IllegalArgumentException("null relation"); if (input == null) throw new IllegalArgumentException("null input"); if (search == null) throw new IllegalArgumentException("null search criterion"); SubMonitor progress = SubMonitor.convert(monitor, 100); // Look for existing index. // Indexes always exist in secondary storage, i.e. disk. // Indexes can be cached in memory when necessary performance-wise. final IndexedRelationsSearcherBase searcher = makeSearcher(processor, relation, input); final Object lockId = Pair.make(relation, input); LockHandle lock = lock(processor, lockId, false); // Ensure that index is loaded & ready lock = waitLoaded(progress, searcher, processor, lock, lockId, input); // Perform query try { return searcher.doSearchResources(progress.newChild(50), processor, search, maxResultCount); } catch (ParseException e) { // FIXME: should throw an exception, not just ignore. e.printStackTrace(); return Collections.emptyList(); //throw new IndexException(e); } catch (IOException e) { throw new IndexException(e); } catch (DatabaseException e) { throw new IndexException(e); } finally { lock.unlock(); } } @Override public List> query(IProgressMonitor monitor, String search, RequestProcessor processor, Resource relation, final Resource input, int maxResultCount) { if (processor == null) throw new IllegalArgumentException("null processor"); if (relation == null) throw new IllegalArgumentException("null relation"); if (input == null) throw new IllegalArgumentException("null input"); if (search == null) throw new IllegalArgumentException("null search criterion"); SubMonitor progress = SubMonitor.convert(monitor, 100); // Look for existing index. // Indexes always exist in secondary storage, i.e. disk. // Indexes can be cached in memory when necessary performance-wise. final IndexedRelationsSearcherBase searcher = makeSearcher(processor, relation, input); final Object lockId = Pair.make(relation, input); LockHandle lock = lock(processor, lockId, false); // Ensure that index is loaded & ready lock = waitLoaded(progress, searcher, processor, lock, lockId, input); // Perform query try { return searcher.doSearch(progress.newChild(50), processor, search, maxResultCount); } catch (ParseException e) { // FIXME: should throw an exception, not just ignore. e.printStackTrace(); return Collections.emptyList(); //throw new IndexException(e); } catch (IOException e) { throw new IndexException(e); } catch (DatabaseException e) { throw new IndexException(e); } finally { lock.unlock(); } } @Override public void insert(IProgressMonitor monitor, RequestProcessor processor, GenericRelation relation, Resource relationResource, Resource input, Collection documents) { // System.out.println("Inserting to index: " + input + " " + documents); if (relation == null) throw new IllegalArgumentException("null relation"); if (input == null) throw new IllegalArgumentException("null input"); if (documents == null) throw new IllegalArgumentException("null documents"); if (documents.isEmpty()) return; final SubMonitor progress = SubMonitor.convert(monitor, 100); final IndexedRelationsSearcherBase searcher = makeSearcher(processor, relationResource, input); LockHandle handle = lock(processor, Pair.make(relationResource, input), true); try { if(!searcher.startAccess(null, processor.getSession(), true)) { // Could not write index for some reason. Ignore and let the next index query reinitialize the index. return; } searcher.insertIndex(progress.newChild(40), relation, 1, documents); DatabaseIndexing.markIndexChanged(searcher.getIndexPath()); } catch (InvalidResourceReferenceException e) { throw new IndexException(e); } catch (IOException e) { throw new IndexException(e); } catch (DatabaseException e) { throw new IndexException(e); } finally { handle.unlock(); } } @Override public void remove(IProgressMonitor monitor, RequestProcessor processor, GenericRelation relation, Resource relationResource, Resource input, String key, Collection keyValues) { if (relation == null) throw new IllegalArgumentException("null relation"); if (input == null) throw new IllegalArgumentException("null input"); if (key == null) throw new IllegalArgumentException("null key"); SubMonitor progress = SubMonitor.convert(monitor, 100); IndexedRelationsSearcherBase searcher = makeSearcher(processor, relationResource, input); LockHandle handle = lock(processor, Pair.make(relationResource, input), true); try { if(!searcher.startAccess(null, processor.getSession(), true)) { // Could not write index for some reason. Ignore and let the next index query reinitialize the index. return; } searcher.removeIndex(progress.newChild(40), relation, processor, key, keyValues); DatabaseIndexing.markIndexChanged(searcher.getIndexPath()); } catch (DatabaseException e) { throw new IndexException(e); } catch (IOException e) { throw new IndexException(e); } finally { handle.unlock(); } } @Override public void removeAll(IProgressMonitor monitor, RequestProcessor processor, GenericRelation relation, Resource relationResource, Resource input) { if (relation == null) throw new IllegalArgumentException("null relation"); if (input == null) throw new IllegalArgumentException("null input"); IndexedRelationsSearcherBase searcher = makeSearcher(processor, relationResource, input); LockHandle handle = lock(processor, Pair.make(relationResource, input), true); try { Throwable t = searcher.bestEffortClear(monitor, processor.getSession()); if(t != null) searcher.setProblem(t); else searcher.setNone(); } finally { handle.unlock(); } } @Override public boolean replace(IProgressMonitor monitor, RequestProcessor processor, GenericRelation relation, Resource relationResource, Resource input, String key, Collection keyValues, Collection documents) { if (relation == null) throw new IllegalArgumentException("null relation"); if (input == null) throw new IllegalArgumentException("null input"); if (key == null) throw new IllegalArgumentException("null key"); SubMonitor progress = SubMonitor.convert(monitor, 100); IndexedRelationsSearcherBase searcher = makeSearcher(processor, relationResource, input); LockHandle handle = lock(processor, Pair.make(relationResource, input), true); boolean didChange = false; try { if(!searcher.startAccess(null, processor.getSession(), true)) { // Could not write index for some reason. Ignore and let the next index query reinitialize the index. return true; } didChange = searcher.replaceIndex(progress.newChild(40), key, keyValues, relation, 1, documents); if(didChange) DatabaseIndexing.markIndexChanged(searcher.getIndexPath()); } catch (InvalidResourceReferenceException e) { throw new IndexException(e); } catch (IOException e) { throw new IndexException(e); } catch (DatabaseException e) { throw new IndexException(e); } catch (Throwable t) { throw new IndexException(t); } finally { handle.unlock(); } return didChange; } @Override public void reset(IProgressMonitor monitor, RequestProcessor processor, Resource relationResource, Resource input) throws IndexException { IndexedRelationsSearcherBase searcher = makeSearcher(processor, relationResource, input); LockHandle handle = lock(processor, Pair.make(relationResource, input), true); try { searcher.changeState(monitor, processor.getSession(), State.NONE); if (!searcher.checkState(State.NONE)) throw new IndexException("Could not close index for input " + input + " before removing it"); File path = DatabaseIndexing.getIndexLocation(processor.getSession(), relationResource, input); DatabaseIndexing.deleteIndex(path); } catch (IOException e) { throw new IndexException(e); } finally { handle.unlock(); } } }