X-Git-Url: https://gerrit.simantics.org/r/gitweb?a=blobdiff_plain;f=bundles%2Forg.simantics.db.indexing%2Fsrc%2Forg%2Fsimantics%2Fdb%2Findexing%2FIndexedRelationsSearcherBase.java;h=5df5d68c114d37dc98de59bd6a45100e81dfda3d;hb=HEAD;hp=9441bb75b8ee5a4a3c7fd419ac725e70778cd3ca;hpb=8b8630b93f8e4ec88a8a4eb53b55031407deedc0;p=simantics%2Fplatform.git diff --git a/bundles/org.simantics.db.indexing/src/org/simantics/db/indexing/IndexedRelationsSearcherBase.java b/bundles/org.simantics.db.indexing/src/org/simantics/db/indexing/IndexedRelationsSearcherBase.java index 9441bb75b..5df5d68c1 100644 --- a/bundles/org.simantics.db.indexing/src/org/simantics/db/indexing/IndexedRelationsSearcherBase.java +++ b/bundles/org.simantics.db.indexing/src/org/simantics/db/indexing/IndexedRelationsSearcherBase.java @@ -21,10 +21,8 @@ import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.Semaphore; -import java.util.concurrent.ThreadFactory; import java.util.concurrent.atomic.AtomicReference; import org.apache.lucene.document.Document; @@ -64,13 +62,17 @@ import org.simantics.db.Session; import org.simantics.db.common.request.SafeName; import org.simantics.db.common.utils.NameUtils; import org.simantics.db.exception.DatabaseException; +import org.simantics.db.indexing.exception.IndexCorruptedException; +import org.simantics.db.indexing.exception.IndexingException; import org.simantics.db.indexing.internal.IndexingJob; import org.simantics.db.layer0.adapter.GenericRelation; +import org.simantics.db.layer0.genericrelation.IndexException; import org.simantics.db.request.Read; import org.simantics.db.service.CollectionSupport; import org.simantics.db.service.SerialisationSupport; import org.simantics.utils.FileUtils; import org.simantics.utils.datastructures.Pair; +import org.simantics.utils.threads.ThreadUtils; import org.slf4j.Logger; import gnu.trove.map.hash.THashMap; @@ -116,14 +118,18 @@ abstract public class IndexedRelationsSearcherBase { this.state = State.READY; } + public State state() { + return state; + } + protected boolean checkState(State state) { return this.state == state; } - protected void assertState(State state) throws AssertionError { - - if(this.state != state) throw new AssertionError("Illegal state, expected " + state.name() + " but was in " + this.state.name()); - + protected void assertState(State state) throws IndexException { + State s = this.state; + if (s != state) + throw new IndexException("Illegal index searcher state, expected " + state.name() + " but state was " + s.name()); } public void changeState(IProgressMonitor monitor, Session session, State state) { @@ -160,12 +166,14 @@ abstract public class IndexedRelationsSearcherBase { // Cannot move into read from no index if (State.NONE == this.state && State.READ == state) { - getLogger().info("Cannot move into read from no index in {} with state {}", this, state); + if (getLogger().isDebugEnabled()) + getLogger().debug("Cannot move into read from no index in {} with state {}", this, state); return; } // Cannot move into write from no index if (State.NONE == this.state && State.WRITE == state) { - getLogger().info("Cannot move into write from no index in {} with state {}", this, state); + if (getLogger().isDebugEnabled()) + getLogger().debug("Cannot move into write from no index in {} with state {}", this, state); return; } @@ -263,13 +271,13 @@ abstract public class IndexedRelationsSearcherBase { STRING_TYPE.freeze(); } - protected static Field makeField(String fieldName, String fieldClass) throws DatabaseException { + protected static Field makeField(String fieldName, String fieldClass) throws IndexingException { switch (fieldClass) { case "Long": return new LongField(fieldName, 0L, Field.Store.YES); case "String": return new Field (fieldName, "", STRING_TYPE); case "Text": return new TextField(fieldName, "", Field.Store.YES); default: - throw new DatabaseException("Can only index Long, String and Text fields, encountered field type " + fieldClass); + throw new IndexingException("Can only index Long, String and Text fields, encountered field type " + fieldClass); } } @@ -339,6 +347,7 @@ abstract public class IndexedRelationsSearcherBase { removedTerm = new Term(key, (String) keyValue); } else { // FIXME: should throw an exception for illegal input data but this would leave the index in an incoherent state + getLogger().error("Attempting to remove document from index of {} with key {} and unrecognized key value type {} : {}", input, key, keyValue, keyValue != null ? keyValue.getClass() : "null"); continue; } @@ -425,7 +434,7 @@ abstract public class IndexedRelationsSearcherBase { boolean done = false; if(requireChangeInfoOnReplace()) { TopDocs exist = searcher.search(new TermQuery(removedTerm), null, 2); - if(exist.scoreDocs.length == 1 && requireChangeInfoOnReplace()) { + if(exist.scoreDocs.length == 1) { Document doc = reader.document(exist.scoreDocs[0].doc); if(!areSame(doc, document)) { writer.deleteDocuments(removedTerm); @@ -576,7 +585,7 @@ abstract public class IndexedRelationsSearcherBase { } } - private static String getPattern(GenericRelation relation, int boundCount) { + public static String getPattern(GenericRelation relation, int boundCount) { String result = ""; for (int i = 0; i < boundCount; i++) result += "b"; @@ -584,20 +593,6 @@ abstract public class IndexedRelationsSearcherBase { result += "f"; return result; } - - private static final int INDEXING_THREAD_COUNT = 2; - - private static final ExecutorService executor = Executors.newFixedThreadPool(INDEXING_THREAD_COUNT, new ThreadFactory() { - @Override - public Thread newThread(Runnable r) { - Thread t = new Thread(r, "Lucene Index Creator"); - if (!t.isDaemon()) - t.setDaemon(true); - if (t.getPriority() != Thread.NORM_PRIORITY) - t.setPriority(Thread.NORM_PRIORITY); - return t; - } - }); void initializeIndex(IProgressMonitor monitor, ReadGraph graph, Object[] bound, boolean overwrite) throws IOException, DatabaseException @@ -607,132 +602,140 @@ abstract public class IndexedRelationsSearcherBase { "Reindexing " + NameUtils.getSafeLabel(graph, input), mon -> { try { - initializeIndexImpl(mon, graph, bound, overwrite); + GenericRelation r = graph.adapt(relation, GenericRelation.class); + if (r == null) + throw new IndexingException("Given resource " + relation + "could not be adapted to GenericRelation."); + + GenericRelation selection = r.select(getPattern(r, bound.length), bound); + + List results = selection.realize(graph); + initializeIndexImpl(new CompletableFuture<>(), mon, r, results, bound, overwrite); } catch (IOException e) { getLogger().error("Index is in problematic state! {}", this, e); - throw new DatabaseException(e); + throw new IndexingException(e); } }); } - void initializeIndexImpl(IProgressMonitor monitor, ReadGraph graph, final Object[] bound, boolean overwrite) throws IOException, - DatabaseException { - - final SubMonitor mon = SubMonitor.convert(monitor, 100); - - if (IndexPolicy.TRACE_INDEX_INIT) - System.out.println(getDescriptor() + "Initializing index at " + indexPath + " (overwrite = " + overwrite + ")"); - mon.beginTask("Initializing Index", 100); - - if (overwrite) { - if (Files.exists(indexPath)) { - mon.subTask("Erasing previous index"); - getLogger().info("Erasing previous index {}", indexPath.toAbsolutePath()); - FileUtils.delete(indexPath); - } - } - - final AtomicReference directory = new AtomicReference(); - final AtomicReference writer = new AtomicReference(); + private static final int INDEXING_THREAD_COUNT = 2; // this is quite good parallelism level for lucene + void initializeIndexImpl(CompletableFuture result, IProgressMonitor monitor, GenericRelation r, List results, final Object[] bound, boolean overwrite) throws IOException { try { - mon.subTask("Start index write"); - createDirectory(indexPath); - - directory.set(FSDirectory.open(indexPath.toFile())); - IndexWriterConfig conf = new IndexWriterConfig(Version.LUCENE_4_9, Queries.getAnalyzer()).setOpenMode(OpenMode.CREATE); - writer.set(new IndexWriter(directory.get(), conf)); - - mon.worked(5); - - final GenericRelation r = graph.adapt(relation, GenericRelation.class); - if (r == null) - throw new DatabaseException("Given resource " + graph.syncRequest(new SafeName(relation)) - + "could not be adapted to GenericRelation."); - - long realizeStart = 0; - if (IndexPolicy.PERF_INDEX_INIT) - realizeStart = System.nanoTime(); - - mon.subTask("Calculating indexed content"); - GenericRelation selection = r.select(getPattern(r, bound.length), bound); - mon.worked(5); - List results = selection.realize(graph); - mon.worked(40); - - if (IndexPolicy.PERF_INDEX_INIT) - System.out.println(getDescriptor() + "Realized index with " + results.size() + " entries at " + indexPath + " in " + (1e-9 * (System.nanoTime()-realizeStart)) + " seconds."); + final SubMonitor mon = SubMonitor.convert(monitor, 100); + if (IndexPolicy.TRACE_INDEX_INIT) - System.out.println(getDescriptor() + "Indexed relation " + r + " produced " + results.size() + " results"); - - long start = IndexPolicy.PERF_INDEX_INIT ? System.nanoTime() : 0; - - mon.subTask("Indexing content"); - final Semaphore s = new Semaphore(0); - mon.setWorkRemaining(results.size()); - - for (int i = 0; i < INDEXING_THREAD_COUNT; i++) { - final int startIndex = i; - executor.submit(() -> { - try { - Document document = new Document(); - Field[] fs = makeFieldsForRelation(r, bound.length, document); - - for (int index = startIndex; index < results.size(); index += INDEXING_THREAD_COUNT) { - if (setFields(fs, results.get(index)) == null) - continue; - try { - writer.get().addDocument(document); - } catch (CorruptIndexException e) { - getLogger().error("Index is corrupted! {}", this, e); - throw new IllegalStateException(e); - } catch (IOException e) { - getLogger().error("Index is in problematic state! {}", this, e); - throw new IllegalStateException(e); - } finally { - synchronized (mon) { - mon.worked(1); + System.out.println(getDescriptor() + "Initializing index at " + indexPath + " (overwrite = " + overwrite + ")"); + mon.beginTask("Initializing Index", 100); + + if (overwrite) { + if (Files.exists(indexPath)) { + mon.subTask("Erasing previous index"); + if (getLogger().isDebugEnabled()) + getLogger().debug("Erasing previous index {}", indexPath.toAbsolutePath()); + FileUtils.emptyDirectory(indexPath); + } + } + + final AtomicReference directory = new AtomicReference(); + final AtomicReference writer = new AtomicReference(); + + try { + mon.subTask("Start index write"); + Files.createDirectories(indexPath); + + directory.set(FSDirectory.open(indexPath.toFile())); + IndexWriterConfig conf = new IndexWriterConfig(Version.LUCENE_4_9, Queries.getAnalyzer()).setOpenMode(OpenMode.CREATE); + writer.set(new IndexWriter(directory.get(), conf)); + + mon.worked(5); + + long realizeStart = 0; + if (IndexPolicy.PERF_INDEX_INIT) + realizeStart = System.nanoTime(); + + mon.subTask("Calculating indexed content"); + mon.worked(5); + + mon.worked(40); + + if (IndexPolicy.PERF_INDEX_INIT) + System.out.println(getDescriptor() + "Realized index with " + results.size() + " entries at " + indexPath + " in " + (1e-9 * (System.nanoTime()-realizeStart)) + " seconds."); + + if (IndexPolicy.TRACE_INDEX_INIT) + System.out.println(getDescriptor() + "Indexed relation " + r + " produced " + results.size() + " results"); + + long start = IndexPolicy.PERF_INDEX_INIT ? System.nanoTime() : 0; + + mon.subTask("Indexing content"); + final Semaphore s = new Semaphore(0); + mon.setWorkRemaining(results.size()); + for (int i = 0; i < INDEXING_THREAD_COUNT; i++) { + final int startIndex = i; + ThreadUtils.getBlockingWorkExecutor().submit(() -> { + try { + Document document = new Document(); + Field[] fs = makeFieldsForRelation(r, bound.length, document); + + for (int index = startIndex; index < results.size(); index += INDEXING_THREAD_COUNT) { + if (setFields(fs, results.get(index)) == null) + continue; + try { + writer.get().addDocument(document); + } catch (CorruptIndexException e) { + getLogger().error("Index is corrupted! {}", this, e); + throw new IllegalStateException(e); + } catch (IOException e) { + getLogger().error("Index is in problematic state! {}", this, e); + throw new IllegalStateException(e); + } finally { + synchronized (mon) { + mon.worked(1); + } } } + } catch (DatabaseException e) { + throw new IllegalStateException(e); + } finally { + s.release(); } - - s.release(); - } catch (DatabaseException e) { - throw new IllegalStateException(e); - } - }); - } - - try { - s.acquire(INDEXING_THREAD_COUNT); - } catch (InterruptedException e) { - getLogger().error("Could not initialize index {}", this, e); - } - - // http://www.gossamer-threads.com/lists/lucene/java-dev/47895 - // and http://lucene.apache.org/java/docs/index.html#27+November+2011+-+Lucene+Core+3.5.0 - // advise against calling optimize at all. So let's not do it anymore. - //writer.get().optimize(); - //writer.get().commit(); - - mon.subTask("Flushing"); - - if (IndexPolicy.PERF_INDEX_INIT) - System.out.println(getDescriptor() + "Wrote index at " + indexPath + " in " + (1e-9 * (System.nanoTime()-start)) + " seconds."); - - } catch (DatabaseException e) { - getLogger().error("Could not initialize index due to db {}", this, e); - } finally { - try { - closeWriter(writer.getAndSet(null)); + }); + } + + try { + s.acquire(INDEXING_THREAD_COUNT); + } catch (InterruptedException e) { + getLogger().error("Could not initialize index {}", this, e); + } + + // http://www.gossamer-threads.com/lists/lucene/java-dev/47895 + // and http://lucene.apache.org/java/docs/index.html#27+November+2011+-+Lucene+Core+3.5.0 + // advise against calling optimize at all. So let's not do it anymore. + //writer.get().optimize(); + //writer.get().commit(); + + mon.subTask("Flushing"); + + if (IndexPolicy.PERF_INDEX_INIT) + System.out.println(getDescriptor() + "Wrote index at " + indexPath + " in " + (1e-9 * (System.nanoTime()-start)) + " seconds."); + + result.complete(null); + // } catch (DatabaseException e) { + // getLogger().error("Could not initialize index due to db {}", this, e); } finally { - FileUtils.uncheckedClose(directory.getAndSet(null)); + try { + closeWriter(writer.getAndSet(null)); + } finally { + FileUtils.uncheckedClose(directory.getAndSet(null)); + } } + } catch (Throwable t) { + getLogger().error("Could not initialize index", t); + result.completeExceptionally(t); } } - public List debugDocs(IProgressMonitor monitor) throws ParseException, IOException, DatabaseException { + public List debugDocs(IProgressMonitor monitor) throws ParseException, IOException, IndexingException { Query query = new MatchAllDocsQuery(); @@ -756,10 +759,10 @@ abstract public class IndexedRelationsSearcherBase { } catch (CorruptIndexException e) { getLogger().error("Index is corrupted! {}", this, e); - throw new DatabaseException(e); + throw new IndexCorruptedException("Index is corrupted! " + this, e); } catch (IOException e) { getLogger().error("Index is in problematic state! {}", this, e); - throw new DatabaseException(e); + throw new IndexingException(e); } } @@ -770,7 +773,7 @@ abstract public class IndexedRelationsSearcherBase { List> doSearch(IProgressMonitor monitor, RequestProcessor processor, String search, int maxResultCount) throws ParseException, IOException, - DatabaseException { + IndexingException { // An empty search string will crash QueryParser // Just return no results for empty queries. @@ -803,54 +806,62 @@ abstract public class IndexedRelationsSearcherBase { return Collections.emptyList(); } - return processor.syncRequest(new Read>>() { - - @Override - public List> perform(ReadGraph graph) throws DatabaseException { - - GenericRelation r = graph.adapt(relation, GenericRelation.class); - if (r == null) - throw new DatabaseException("Given resource " + graph.syncRequest(new SafeName(relation)) - + "could not be adapted to GenericRelation."); + try { + return processor.syncRequest(new Read>>() { - SerialisationSupport support = graph.getService(SerialisationSupport.class); + @Override + public List> perform(ReadGraph graph) throws DatabaseException { - List> result = new ArrayList>(docs.scoreDocs.length); - - final DocumentStoredFieldVisitor visitor = new DocumentStoredFieldVisitor(); - - for (ScoreDoc scoreDoc : docs.scoreDocs) { + GenericRelation r = graph.adapt(relation, GenericRelation.class); + if (r == null) + throw new IndexingException("Given resource " + graph.syncRequest(new SafeName(relation)) + + "could not be adapted to GenericRelation."); - try { + SerialisationSupport support = graph.getService(SerialisationSupport.class); - reader.document(scoreDoc.doc, visitor); - - Document doc = visitor.getDocument(); - - List fs = doc.getFields(); - Map entry = new THashMap(fs.size()); - for (IndexableField f : fs) { - IndexSchema.Type type = schema.typeMap.get(f.name()); - if (type == IndexSchema.Type.LONG) { - entry.put(f.name(), support.getResource(f.numericValue().longValue())); - } else { - entry.put(f.name(), f.stringValue()); + List> result = new ArrayList>(docs.scoreDocs.length); + + final DocumentStoredFieldVisitor visitor = new DocumentStoredFieldVisitor(); + + for (ScoreDoc scoreDoc : docs.scoreDocs) { + + try { + + reader.document(scoreDoc.doc, visitor); + + Document doc = visitor.getDocument(); + + List fs = doc.getFields(); + Map entry = new THashMap(fs.size()); + for (IndexableField f : fs) { + IndexSchema.Type type = schema.typeMap.get(f.name()); + if (type == IndexSchema.Type.LONG) { + entry.put(f.name(), support.getResource(f.numericValue().longValue())); + } else { + entry.put(f.name(), f.stringValue()); + } } + + result.add(entry); + + } catch (CorruptIndexException e) { + getLogger().error("Index is corrupted! {}", this, e); + throw new IndexCorruptedException("Index is corrupted! " + " " + this + " " + scoreDoc, e); + } catch (IOException e) { + getLogger().error("Index is in problematic state! {}", this, e); + throw new IndexingException(e); } - - result.add(entry); - - } catch (CorruptIndexException e) { - getLogger().error("Index is corrupted! {}", this, e); - throw new DatabaseException(e); - } catch (IOException e) { - getLogger().error("Index is in problematic state! {}", this, e); - throw new DatabaseException(e); } + return result; } - return result; + }); + } catch (DatabaseException e) { + if (e instanceof IndexingException) { + throw (IndexingException) e; + } else { + throw new IndexingException(e); } - }); + } } static class ResourceVisitor extends StoredFieldVisitor { @@ -896,7 +907,7 @@ abstract public class IndexedRelationsSearcherBase { } List doSearchResources(IProgressMonitor monitor, RequestProcessor processor, String search, int maxResultCount) throws ParseException, IOException, - DatabaseException { + IndexingException { // An empty search string will crash QueryParser // Just return no results for empty queries. @@ -929,37 +940,45 @@ abstract public class IndexedRelationsSearcherBase { return Collections.emptyList(); } - return processor.syncRequest(new Read>() { + try { + return processor.syncRequest(new Read>() { - @Override - public List perform(ReadGraph graph) throws DatabaseException { + @Override + public List perform(ReadGraph graph) throws DatabaseException { - CollectionSupport cs = graph.getService(CollectionSupport.class); - SerialisationSupport support = graph.getService(SerialisationSupport.class); - - List result = cs.createList(); - - ResourceVisitor visitor = new ResourceVisitor(); - - for (ScoreDoc scoreDoc : docs.scoreDocs) { - try { - reader.document(scoreDoc.doc, visitor); - result.add(support.getResource(visitor.id)); - } catch (CorruptIndexException e) { - getLogger().error("Index is corrupted! {}", this, e); - throw new DatabaseException(e); - } catch (IOException e) { - getLogger().error("Index is in problematic state! {}", this, e); - throw new DatabaseException(e); + CollectionSupport cs = graph.getService(CollectionSupport.class); + SerialisationSupport support = graph.getService(SerialisationSupport.class); + + List result = cs.createList(); + + ResourceVisitor visitor = new ResourceVisitor(); + + for (ScoreDoc scoreDoc : docs.scoreDocs) { + try { + reader.document(scoreDoc.doc, visitor); + result.add(support.getResource(visitor.id)); + } catch (CorruptIndexException e) { + getLogger().error("Index is corrupted! {}", this, e); + throw new IndexCorruptedException("Index is corrupted! " + " " + this + " " + scoreDoc, e); + } catch (IOException e) { + getLogger().error("Index is in problematic state! {}", this, e); + throw new IndexingException(e); + } } + return result; } - return result; + }); + } catch (DatabaseException e) { + if (e instanceof IndexingException) { + throw (IndexingException) e; + } else { + throw new IndexingException(e); } - }); + } } List doList(IProgressMonitor monitor, RequestProcessor processor) throws ParseException, IOException, - DatabaseException { + IndexingException { assertAccessOpen(false); @@ -979,10 +998,10 @@ abstract public class IndexedRelationsSearcherBase { } catch (CorruptIndexException e) { getLogger().error("Index is corrupted! {}", this, e); - throw new DatabaseException(e); + throw new IndexCorruptedException("Index is corrupted! " + " " + this + " " + scoreDoc, e); } catch (IOException e) { getLogger().error("Index is in problematic state! {}", this, e); - throw new DatabaseException(e); + throw new IndexingException(e); } } @@ -997,20 +1016,14 @@ abstract public class IndexedRelationsSearcherBase { return path; } - private static void createDirectory(Path path) throws IOException { - if (Files.exists(path) && !Files.isDirectory(path)) - throw new IOException("Could not create index directory " + path + ", a file by that name already exists"); - Files.createDirectories(path); - } - Path getIndexPath() { return indexPath; } boolean isIndexAvailable() { - return (Files.exists(indexPath) && Files.isDirectory(indexPath)); + return Files.isDirectory(indexPath); } - + abstract Throwable bestEffortClear(IProgressMonitor monitor, Session session); /*