import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;
-import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.lucene.document.Document;
import org.simantics.db.common.request.SafeName;
import org.simantics.db.common.utils.NameUtils;
import org.simantics.db.exception.DatabaseException;
+import org.simantics.db.indexing.exception.IndexCorruptedException;
+import org.simantics.db.indexing.exception.IndexingException;
import org.simantics.db.indexing.internal.IndexingJob;
import org.simantics.db.layer0.adapter.GenericRelation;
+import org.simantics.db.layer0.genericrelation.IndexException;
import org.simantics.db.request.Read;
import org.simantics.db.service.CollectionSupport;
import org.simantics.db.service.SerialisationSupport;
import org.simantics.utils.FileUtils;
import org.simantics.utils.datastructures.Pair;
+import org.simantics.utils.threads.ThreadUtils;
import org.slf4j.Logger;
import gnu.trove.map.hash.THashMap;
this.state = State.READY;
}
+ /**
+  * Returns the current lifecycle state of this index searcher.
+  *
+  * @return the current {@link State}
+  */
+ public State state() {
+ return state;
+ }
+
+ /** @return true if the searcher is currently in the given state */
protected boolean checkState(State state) {
return this.state == state;
}
- protected void assertState(State state) throws AssertionError {
-
- if(this.state != state) throw new AssertionError("Illegal state, expected " + state.name() + " but was in " + this.state.name());
-
+ /**
+  * Asserts that the searcher is in the expected state.
+  *
+  * @param state the expected state
+  * @throws IndexException if the current state differs from the expected one
+  */
+ protected void assertState(State state) throws IndexException {
+ State s = this.state;
+ if (s != state)
+ throw new IndexException("Illegal index searcher state, expected " + state.name() + " but state was " + s.name());
}
public void changeState(IProgressMonitor monitor, Session session, State state) {
STRING_TYPE.freeze();
}
- protected static Field makeField(String fieldName, String fieldClass) throws DatabaseException {
+ /**
+  * Creates an empty Lucene field for the given schema field type.
+  *
+  * @param fieldName name of the field in the index
+  * @param fieldClass one of "Long", "String" or "Text"
+  * @return a new stored field with a default (empty/zero) value
+  * @throws IndexingException for any other field class
+  */
+ protected static Field makeField(String fieldName, String fieldClass) throws IndexingException {
switch (fieldClass) {
case "Long": return new LongField(fieldName, 0L, Field.Store.YES);
case "String": return new Field (fieldName, "", STRING_TYPE);
case "Text": return new TextField(fieldName, "", Field.Store.YES);
default:
- throw new DatabaseException("Can only index Long, String and Text fields, encountered field type " + fieldClass);
+ throw new IndexingException("Can only index Long, String and Text fields, encountered field type " + fieldClass);
}
}
removedTerm = new Term(key, (String) keyValue);
} else {
// FIXME: should throw an exception for illegal input data but this would leave the index in an incoherent state
+ getLogger().error("Attempting to remove document from index of {} with key {} and unrecognized key value type {} : {}", input, key, keyValue, keyValue != null ? keyValue.getClass() : "null");
continue;
}
boolean done = false;
if(requireChangeInfoOnReplace()) {
TopDocs exist = searcher.search(new TermQuery(removedTerm), null, 2);
- if(exist.scoreDocs.length == 1 && requireChangeInfoOnReplace()) {
+ if(exist.scoreDocs.length == 1) {
Document doc = reader.document(exist.scoreDocs[0].doc);
if(!areSame(doc, document)) {
writer.deleteDocuments(removedTerm);
*/
final IndexSchema schema;
- Resource input;
+ final Resource input;
Path indexPath;
this.schema = IndexSchema.readFromRelation(session, relation);
}
+ /** @return the relation resource this index is based on */
+ public Resource getRelation() {
+ return relation;
+ }
+
+ /** @return the input resource this index is scoped to */
+ public Resource getInput() {
+ return input;
+ }
+
+ /** Opens an {@code FSDirectory} over this index's filesystem path. */
Directory getDirectory(Session session) throws IOException {
return FSDirectory.open(indexPath.toFile());
}
}
}
- private static String getPattern(GenericRelation relation, int boundCount) {
+ /**
+  * Builds a GenericRelation selection pattern consisting of
+  * {@code boundCount} bound ("b") positions followed by one free ("f")
+  * position.
+  */
+ public static String getPattern(GenericRelation relation, int boundCount) {
String result = "";
for (int i = 0; i < boundCount; i++)
result += "b";
result += "f";
return result;
}
-
- private static final int INDEXING_THREAD_COUNT = 2;
-
- private static final ExecutorService executor = Executors.newFixedThreadPool(INDEXING_THREAD_COUNT, new ThreadFactory() {
- @Override
- public Thread newThread(Runnable r) {
- Thread t = new Thread(r, "Lucene Index Creator");
- if (!t.isDaemon())
- t.setDaemon(true);
- if (t.getPriority() != Thread.NORM_PRIORITY)
- t.setPriority(Thread.NORM_PRIORITY);
- return t;
- }
- });
void initializeIndex(IProgressMonitor monitor, ReadGraph graph, Object[] bound, boolean overwrite)
throws IOException, DatabaseException
"Reindexing " + NameUtils.getSafeLabel(graph, input),
mon -> {
try {
- initializeIndexImpl(mon, graph, bound, overwrite);
+ GenericRelation r = graph.adapt(relation, GenericRelation.class);
+ if (r == null)
+ throw new IndexingException("Given resource " + relation + " could not be adapted to GenericRelation.");
+
+ GenericRelation selection = r.select(getPattern(r, bound.length), bound);
+
+ List<Object[]> results = selection.realize(graph);
+ initializeIndexImpl(new CompletableFuture<>(), mon, r, results, bound, overwrite);
} catch (IOException e) {
getLogger().error("Index is in problematic state! {}", this, e);
- throw new DatabaseException(e);
+ throw new IndexingException(e);
}
});
}
- void initializeIndexImpl(IProgressMonitor monitor, ReadGraph graph, final Object[] bound, boolean overwrite) throws IOException,
- DatabaseException {
-
- final SubMonitor mon = SubMonitor.convert(monitor, 100);
-
- if (IndexPolicy.TRACE_INDEX_INIT)
- System.out.println(getDescriptor() + "Initializing index at " + indexPath + " (overwrite = " + overwrite + ")");
- mon.beginTask("Initializing Index", 100);
-
- if (overwrite) {
- if (Files.exists(indexPath)) {
- mon.subTask("Erasing previous index");
- if (getLogger().isDebugEnabled())
- getLogger().debug("Erasing previous index {}", indexPath.toAbsolutePath());
- FileUtils.delete(indexPath);
- }
- }
-
- final AtomicReference<FSDirectory> directory = new AtomicReference<FSDirectory>();
- final AtomicReference<IndexWriter> writer = new AtomicReference<IndexWriter>();
+ private static final int INDEXING_THREAD_COUNT = 2; // this is quite good parallelism level for lucene
+ void initializeIndexImpl(CompletableFuture<?> result, IProgressMonitor monitor, GenericRelation r, List<Object[]> results, final Object[] bound, boolean overwrite) throws IOException {
try {
- mon.subTask("Start index write");
- createDirectory(indexPath);
-
- directory.set(FSDirectory.open(indexPath.toFile()));
- IndexWriterConfig conf = new IndexWriterConfig(Version.LUCENE_4_9, Queries.getAnalyzer()).setOpenMode(OpenMode.CREATE);
- writer.set(new IndexWriter(directory.get(), conf));
-
- mon.worked(5);
-
- final GenericRelation r = graph.adapt(relation, GenericRelation.class);
- if (r == null)
- throw new DatabaseException("Given resource " + graph.syncRequest(new SafeName(relation))
- + "could not be adapted to GenericRelation.");
-
- long realizeStart = 0;
- if (IndexPolicy.PERF_INDEX_INIT)
- realizeStart = System.nanoTime();
-
- mon.subTask("Calculating indexed content");
- GenericRelation selection = r.select(getPattern(r, bound.length), bound);
- mon.worked(5);
- List<Object[]> results = selection.realize(graph);
- mon.worked(40);
-
- if (IndexPolicy.PERF_INDEX_INIT)
- System.out.println(getDescriptor() + "Realized index with " + results.size() + " entries at " + indexPath + " in " + (1e-9 * (System.nanoTime()-realizeStart)) + " seconds.");
+ final SubMonitor mon = SubMonitor.convert(monitor, 100);
+
if (IndexPolicy.TRACE_INDEX_INIT)
- System.out.println(getDescriptor() + "Indexed relation " + r + " produced " + results.size() + " results");
-
- long start = IndexPolicy.PERF_INDEX_INIT ? System.nanoTime() : 0;
-
- mon.subTask("Indexing content");
- final Semaphore s = new Semaphore(0);
- mon.setWorkRemaining(results.size());
-
- for (int i = 0; i < INDEXING_THREAD_COUNT; i++) {
- final int startIndex = i;
- executor.submit(() -> {
- try {
- Document document = new Document();
- Field[] fs = makeFieldsForRelation(r, bound.length, document);
-
- for (int index = startIndex; index < results.size(); index += INDEXING_THREAD_COUNT) {
- if (setFields(fs, results.get(index)) == null)
- continue;
- try {
- writer.get().addDocument(document);
- } catch (CorruptIndexException e) {
- getLogger().error("Index is corrupted! {}", this, e);
- throw new IllegalStateException(e);
- } catch (IOException e) {
- getLogger().error("Index is in problematic state! {}", this, e);
- throw new IllegalStateException(e);
- } finally {
- synchronized (mon) {
- mon.worked(1);
+ System.out.println(getDescriptor() + "Initializing index at " + indexPath + " (overwrite = " + overwrite + ")");
+ mon.beginTask("Initializing Index", 100);
+
+ if (overwrite) {
+ if (Files.exists(indexPath)) {
+ mon.subTask("Erasing previous index");
+ if (getLogger().isDebugEnabled())
+ getLogger().debug("Erasing previous index {}", indexPath.toAbsolutePath());
+ FileUtils.emptyDirectory(indexPath);
+ }
+ }
+
+ final AtomicReference<FSDirectory> directory = new AtomicReference<FSDirectory>();
+ final AtomicReference<IndexWriter> writer = new AtomicReference<IndexWriter>();
+
+ try {
+ mon.subTask("Start index write");
+ Files.createDirectories(indexPath);
+
+ directory.set(FSDirectory.open(indexPath.toFile()));
+ IndexWriterConfig conf = new IndexWriterConfig(Version.LUCENE_4_9, Queries.getAnalyzer()).setOpenMode(OpenMode.CREATE);
+ writer.set(new IndexWriter(directory.get(), conf));
+
+ mon.worked(5);
+
+ long realizeStart = 0;
+ if (IndexPolicy.PERF_INDEX_INIT)
+ realizeStart = System.nanoTime();
+
+ mon.subTask("Calculating indexed content");
+ mon.worked(5);
+
+ mon.worked(40);
+
+ if (IndexPolicy.PERF_INDEX_INIT)
+ System.out.println(getDescriptor() + "Realized index with " + results.size() + " entries at " + indexPath + " in " + (1e-9 * (System.nanoTime()-realizeStart)) + " seconds.");
+
+ if (IndexPolicy.TRACE_INDEX_INIT)
+ System.out.println(getDescriptor() + "Indexed relation " + r + " produced " + results.size() + " results");
+
+ long start = IndexPolicy.PERF_INDEX_INIT ? System.nanoTime() : 0;
+
+ mon.subTask("Indexing content");
+ final Semaphore s = new Semaphore(0);
+ mon.setWorkRemaining(results.size());
+ for (int i = 0; i < INDEXING_THREAD_COUNT; i++) {
+ final int startIndex = i;
+ ThreadUtils.getBlockingWorkExecutor().submit(() -> {
+ try {
+ Document document = new Document();
+ Field[] fs = makeFieldsForRelation(r, bound.length, document);
+
+ for (int index = startIndex; index < results.size(); index += INDEXING_THREAD_COUNT) {
+ if (setFields(fs, results.get(index)) == null)
+ continue;
+ try {
+ writer.get().addDocument(document);
+ } catch (CorruptIndexException e) {
+ getLogger().error("Index is corrupted! {}", this, e);
+ throw new IllegalStateException(e);
+ } catch (IOException e) {
+ getLogger().error("Index is in problematic state! {}", this, e);
+ throw new IllegalStateException(e);
+ } finally {
+ synchronized (mon) {
+ mon.worked(1);
+ }
}
}
+ } catch (DatabaseException e) {
+ throw new IllegalStateException(e);
+ } finally {
+ s.release();
}
- } catch (DatabaseException e) {
- throw new IllegalStateException(e);
- } finally {
- s.release();
- }
- });
- }
-
- try {
- s.acquire(INDEXING_THREAD_COUNT);
- } catch (InterruptedException e) {
- getLogger().error("Could not initialize index {}", this, e);
- }
-
- // http://www.gossamer-threads.com/lists/lucene/java-dev/47895
- // and http://lucene.apache.org/java/docs/index.html#27+November+2011+-+Lucene+Core+3.5.0
- // advise against calling optimize at all. So let's not do it anymore.
- //writer.get().optimize();
- //writer.get().commit();
-
- mon.subTask("Flushing");
-
- if (IndexPolicy.PERF_INDEX_INIT)
- System.out.println(getDescriptor() + "Wrote index at " + indexPath + " in " + (1e-9 * (System.nanoTime()-start)) + " seconds.");
-
- } catch (DatabaseException e) {
- getLogger().error("Could not initialize index due to db {}", this, e);
- } finally {
- try {
- closeWriter(writer.getAndSet(null));
+ });
+ }
+
+ try {
+ s.acquire(INDEXING_THREAD_COUNT);
+ } catch (InterruptedException e) {
+ getLogger().error("Could not initialize index {}", this, e);
+ }
+
+ // http://www.gossamer-threads.com/lists/lucene/java-dev/47895
+ // and http://lucene.apache.org/java/docs/index.html#27+November+2011+-+Lucene+Core+3.5.0
+ // advise against calling optimize at all. So let's not do it anymore.
+ //writer.get().optimize();
+ //writer.get().commit();
+
+ mon.subTask("Flushing");
+
+ if (IndexPolicy.PERF_INDEX_INIT)
+ System.out.println(getDescriptor() + "Wrote index at " + indexPath + " in " + (1e-9 * (System.nanoTime()-start)) + " seconds.");
+
+ result.complete(null);
+ // } catch (DatabaseException e) {
+ // getLogger().error("Could not initialize index due to db {}", this, e);
} finally {
- FileUtils.uncheckedClose(directory.getAndSet(null));
+ try {
+ closeWriter(writer.getAndSet(null));
+ } finally {
+ FileUtils.uncheckedClose(directory.getAndSet(null));
+ }
}
+ } catch (Throwable t) {
+ getLogger().error("Could not initialize index", t);
+ result.completeExceptionally(t);
}
}
- public List<Object[]> debugDocs(IProgressMonitor monitor) throws ParseException, IOException, DatabaseException {
+ public List<Object[]> debugDocs(IProgressMonitor monitor) throws ParseException, IOException, IndexingException {
Query query = new MatchAllDocsQuery();
} catch (CorruptIndexException e) {
getLogger().error("Index is corrupted! {}", this, e);
- throw new DatabaseException(e);
+ throw new IndexCorruptedException("Index is corrupted! " + this, e);
} catch (IOException e) {
getLogger().error("Index is in problematic state! {}", this, e);
- throw new DatabaseException(e);
+ throw new IndexingException(e);
}
}
List<Map<String, Object>> doSearch(IProgressMonitor monitor, RequestProcessor processor, String search, int maxResultCount) throws ParseException, IOException,
- DatabaseException {
+ IndexingException {
// An empty search string will crash QueryParser
// Just return no results for empty queries.
return Collections.emptyList();
}
- return processor.syncRequest(new Read<List<Map<String, Object>>>() {
-
- @Override
- public List<Map<String, Object>> perform(ReadGraph graph) throws DatabaseException {
-
- GenericRelation r = graph.adapt(relation, GenericRelation.class);
- if (r == null)
- throw new DatabaseException("Given resource " + graph.syncRequest(new SafeName(relation))
- + "could not be adapted to GenericRelation.");
+ try {
+ return processor.syncRequest(new Read<List<Map<String, Object>>>() {
- SerialisationSupport support = graph.getService(SerialisationSupport.class);
+ @Override
+ public List<Map<String, Object>> perform(ReadGraph graph) throws DatabaseException {
- List<Map<String, Object>> result = new ArrayList<Map<String, Object>>(docs.scoreDocs.length);
-
- final DocumentStoredFieldVisitor visitor = new DocumentStoredFieldVisitor();
-
- for (ScoreDoc scoreDoc : docs.scoreDocs) {
+ GenericRelation r = graph.adapt(relation, GenericRelation.class);
+ if (r == null)
+ throw new IndexingException("Given resource " + graph.syncRequest(new SafeName(relation))
+ + " could not be adapted to GenericRelation.");
- try {
+ SerialisationSupport support = graph.getService(SerialisationSupport.class);
- reader.document(scoreDoc.doc, visitor);
-
- Document doc = visitor.getDocument();
-
- List<IndexableField> fs = doc.getFields();
- Map<String, Object> entry = new THashMap<String, Object>(fs.size());
- for (IndexableField f : fs) {
- IndexSchema.Type type = schema.typeMap.get(f.name());
- if (type == IndexSchema.Type.LONG) {
- entry.put(f.name(), support.getResource(f.numericValue().longValue()));
- } else {
- entry.put(f.name(), f.stringValue());
+ List<Map<String, Object>> result = new ArrayList<Map<String, Object>>(docs.scoreDocs.length);
+
+ final DocumentStoredFieldVisitor visitor = new DocumentStoredFieldVisitor();
+
+ for (ScoreDoc scoreDoc : docs.scoreDocs) {
+
+ try {
+
+ reader.document(scoreDoc.doc, visitor);
+
+ Document doc = visitor.getDocument();
+
+ List<IndexableField> fs = doc.getFields();
+ Map<String, Object> entry = new THashMap<String, Object>(fs.size());
+ for (IndexableField f : fs) {
+ IndexSchema.Type type = schema.typeMap.get(f.name());
+ if (type == IndexSchema.Type.LONG) {
+ entry.put(f.name(), support.getResource(f.numericValue().longValue()));
+ } else {
+ entry.put(f.name(), f.stringValue());
+ }
}
+
+ result.add(entry);
+
+ } catch (CorruptIndexException e) {
+ getLogger().error("Index is corrupted! {}", this, e);
+ throw new IndexCorruptedException("Index is corrupted! " + " " + this + " " + scoreDoc, e);
+ } catch (IOException e) {
+ getLogger().error("Index is in problematic state! {}", this, e);
+ throw new IndexingException(e);
}
-
- result.add(entry);
-
- } catch (CorruptIndexException e) {
- getLogger().error("Index is corrupted! {}", this, e);
- throw new DatabaseException(e);
- } catch (IOException e) {
- getLogger().error("Index is in problematic state! {}", this, e);
- throw new DatabaseException(e);
}
+ return result;
}
- return result;
+ });
+ } catch (DatabaseException e) {
+ if (e instanceof IndexingException) {
+ throw (IndexingException) e;
+ } else {
+ throw new IndexingException(e);
}
- });
+ }
}
static class ResourceVisitor extends StoredFieldVisitor {
}
List<Resource> doSearchResources(IProgressMonitor monitor, RequestProcessor processor, String search, int maxResultCount) throws ParseException, IOException,
- DatabaseException {
+ IndexingException {
// An empty search string will crash QueryParser
// Just return no results for empty queries.
return Collections.emptyList();
}
- return processor.syncRequest(new Read<List<Resource>>() {
+ try {
+ return processor.syncRequest(new Read<List<Resource>>() {
- @Override
- public List<Resource> perform(ReadGraph graph) throws DatabaseException {
+ @Override
+ public List<Resource> perform(ReadGraph graph) throws DatabaseException {
- CollectionSupport cs = graph.getService(CollectionSupport.class);
- SerialisationSupport support = graph.getService(SerialisationSupport.class);
-
- List<Resource> result = cs.createList();
-
- ResourceVisitor visitor = new ResourceVisitor();
-
- for (ScoreDoc scoreDoc : docs.scoreDocs) {
- try {
- reader.document(scoreDoc.doc, visitor);
- result.add(support.getResource(visitor.id));
- } catch (CorruptIndexException e) {
- getLogger().error("Index is corrupted! {}", this, e);
- throw new DatabaseException(e);
- } catch (IOException e) {
- getLogger().error("Index is in problematic state! {}", this, e);
- throw new DatabaseException(e);
+ CollectionSupport cs = graph.getService(CollectionSupport.class);
+ SerialisationSupport support = graph.getService(SerialisationSupport.class);
+
+ List<Resource> result = cs.createList();
+
+ ResourceVisitor visitor = new ResourceVisitor();
+
+ for (ScoreDoc scoreDoc : docs.scoreDocs) {
+ try {
+ reader.document(scoreDoc.doc, visitor);
+ result.add(support.getResource(visitor.id));
+ } catch (CorruptIndexException e) {
+ getLogger().error("Index is corrupted! {}", this, e);
+ throw new IndexCorruptedException("Index is corrupted! " + " " + this + " " + scoreDoc, e);
+ } catch (IOException e) {
+ getLogger().error("Index is in problematic state! {}", this, e);
+ throw new IndexingException(e);
+ }
}
+ return result;
}
- return result;
+ });
+ } catch (DatabaseException e) {
+ if (e instanceof IndexingException) {
+ throw (IndexingException) e;
+ } else {
+ throw new IndexingException(e);
}
- });
+ }
}
List<Object> doList(IProgressMonitor monitor, RequestProcessor processor) throws ParseException, IOException,
- DatabaseException {
+ IndexingException {
assertAccessOpen(false);
} catch (CorruptIndexException e) {
getLogger().error("Index is corrupted! {}", this, e);
- throw new DatabaseException(e);
+ throw new IndexCorruptedException("Index is corrupted! " + " " + this + " " + scoreDoc, e);
} catch (IOException e) {
getLogger().error("Index is in problematic state! {}", this, e);
- throw new DatabaseException(e);
+ throw new IndexingException(e);
}
}
return path;
}
- private static void createDirectory(Path path) throws IOException {
- if (Files.exists(path) && !Files.isDirectory(path))
- throw new IOException("Could not create index directory " + path + ", a file by that name already exists");
- Files.createDirectories(path);
- }
-
+ /** @return the filesystem path of this Lucene index */
Path getIndexPath() {
return indexPath;
}
+ /** @return true if the index directory exists on disk */
boolean isIndexAvailable() {
- return (Files.exists(indexPath) && Files.isDirectory(indexPath));
+ return Files.isDirectory(indexPath);
}
-
+
abstract Throwable bestEffortClear(IProgressMonitor monitor, Session session);
/*