import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;
-import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.lucene.document.Document;
import org.simantics.db.service.SerialisationSupport;
import org.simantics.utils.FileUtils;
import org.simantics.utils.datastructures.Pair;
+import org.simantics.utils.threads.ThreadUtils;
import org.slf4j.Logger;
import gnu.trove.map.hash.THashMap;
// Cannot move into read from no index
if (State.NONE == this.state && State.READ == state) {
- getLogger().info("Cannot move into read from no index in {} with state {}", this, state);
+ if (getLogger().isDebugEnabled())
+ getLogger().debug("Cannot move into read from no index in {} with state {}", this, state);
return;
}
// Cannot move into write from no index
if (State.NONE == this.state && State.WRITE == state) {
- getLogger().info("Cannot move into write from no index in {} with state {}", this, state);
+ if (getLogger().isDebugEnabled())
+ getLogger().debug("Cannot move into write from no index in {} with state {}", this, state);
return;
}
}
}
- private static String getPattern(GenericRelation relation, int boundCount) {
+ public static String getPattern(GenericRelation relation, int boundCount) {
String result = "";
for (int i = 0; i < boundCount; i++)
result += "b";
result += "f";
return result;
}
-
- private static final int INDEXING_THREAD_COUNT = 2;
-
- private static final ExecutorService executor = Executors.newFixedThreadPool(INDEXING_THREAD_COUNT, new ThreadFactory() {
- @Override
- public Thread newThread(Runnable r) {
- Thread t = new Thread(r, "Lucene Index Creator");
- if (!t.isDaemon())
- t.setDaemon(true);
- if (t.getPriority() != Thread.NORM_PRIORITY)
- t.setPriority(Thread.NORM_PRIORITY);
- return t;
- }
- });
void initializeIndex(IProgressMonitor monitor, ReadGraph graph, Object[] bound, boolean overwrite)
throws IOException, DatabaseException
"Reindexing " + NameUtils.getSafeLabel(graph, input),
mon -> {
try {
- initializeIndexImpl(mon, graph, bound, overwrite);
+ GenericRelation r = graph.adapt(relation, GenericRelation.class);
+ if (r == null)
+ throw new DatabaseException("Given resource " + relation + "could not be adapted to GenericRelation.");
+
+ GenericRelation selection = r.select(getPattern(r, bound.length), bound);
+
+ List<Object[]> results = selection.realize(graph);
+ initializeIndexImpl(new CompletableFuture<>(), mon, r, results, bound, overwrite);
} catch (IOException e) {
getLogger().error("Index is in problematic state! {}", this, e);
throw new DatabaseException(e);
});
}
- void initializeIndexImpl(IProgressMonitor monitor, ReadGraph graph, final Object[] bound, boolean overwrite) throws IOException,
- DatabaseException {
-
- final SubMonitor mon = SubMonitor.convert(monitor, 100);
-
- if (IndexPolicy.TRACE_INDEX_INIT)
- System.out.println(getDescriptor() + "Initializing index at " + indexPath + " (overwrite = " + overwrite + ")");
- mon.beginTask("Initializing Index", 100);
-
- if (overwrite) {
- if (Files.exists(indexPath)) {
- mon.subTask("Erasing previous index");
- getLogger().info("Erasing previous index {}", indexPath.toAbsolutePath());
- FileUtils.delete(indexPath);
- }
- }
-
- final AtomicReference<FSDirectory> directory = new AtomicReference<FSDirectory>();
- final AtomicReference<IndexWriter> writer = new AtomicReference<IndexWriter>();
+ private static final int INDEXING_THREAD_COUNT = 2; // this is quite good parallelism level for lucene
+ void initializeIndexImpl(CompletableFuture<?> result, IProgressMonitor monitor, GenericRelation r, List<Object[]> results, final Object[] bound, boolean overwrite) throws IOException {
try {
- mon.subTask("Start index write");
- createDirectory(indexPath);
-
- directory.set(FSDirectory.open(indexPath.toFile()));
- IndexWriterConfig conf = new IndexWriterConfig(Version.LUCENE_4_9, Queries.getAnalyzer()).setOpenMode(OpenMode.CREATE);
- writer.set(new IndexWriter(directory.get(), conf));
-
- mon.worked(5);
-
- final GenericRelation r = graph.adapt(relation, GenericRelation.class);
- if (r == null)
- throw new DatabaseException("Given resource " + graph.syncRequest(new SafeName(relation))
- + "could not be adapted to GenericRelation.");
-
- long realizeStart = 0;
- if (IndexPolicy.PERF_INDEX_INIT)
- realizeStart = System.nanoTime();
-
- mon.subTask("Calculating indexed content");
- GenericRelation selection = r.select(getPattern(r, bound.length), bound);
- mon.worked(5);
- List<Object[]> results = selection.realize(graph);
- mon.worked(40);
-
- if (IndexPolicy.PERF_INDEX_INIT)
- System.out.println(getDescriptor() + "Realized index with " + results.size() + " entries at " + indexPath + " in " + (1e-9 * (System.nanoTime()-realizeStart)) + " seconds.");
+ final SubMonitor mon = SubMonitor.convert(monitor, 100);
+
if (IndexPolicy.TRACE_INDEX_INIT)
- System.out.println(getDescriptor() + "Indexed relation " + r + " produced " + results.size() + " results");
-
- long start = IndexPolicy.PERF_INDEX_INIT ? System.nanoTime() : 0;
-
- mon.subTask("Indexing content");
- final Semaphore s = new Semaphore(0);
- mon.setWorkRemaining(results.size());
-
- for (int i = 0; i < INDEXING_THREAD_COUNT; i++) {
- final int startIndex = i;
- executor.submit(() -> {
- try {
- Document document = new Document();
- Field[] fs = makeFieldsForRelation(r, bound.length, document);
-
- for (int index = startIndex; index < results.size(); index += INDEXING_THREAD_COUNT) {
- if (setFields(fs, results.get(index)) == null)
- continue;
- try {
- writer.get().addDocument(document);
- } catch (CorruptIndexException e) {
- getLogger().error("Index is corrupted! {}", this, e);
- throw new IllegalStateException(e);
- } catch (IOException e) {
- getLogger().error("Index is in problematic state! {}", this, e);
- throw new IllegalStateException(e);
- } finally {
- synchronized (mon) {
- mon.worked(1);
+ System.out.println(getDescriptor() + "Initializing index at " + indexPath + " (overwrite = " + overwrite + ")");
+ mon.beginTask("Initializing Index", 100);
+
+ if (overwrite) {
+ if (Files.exists(indexPath)) {
+ mon.subTask("Erasing previous index");
+ if (getLogger().isDebugEnabled())
+ getLogger().debug("Erasing previous index {}", indexPath.toAbsolutePath());
+ FileUtils.delete(indexPath);
+ }
+ }
+
+ final AtomicReference<FSDirectory> directory = new AtomicReference<FSDirectory>();
+ final AtomicReference<IndexWriter> writer = new AtomicReference<IndexWriter>();
+
+ try {
+ mon.subTask("Start index write");
+ Files.createDirectories(indexPath);
+
+ directory.set(FSDirectory.open(indexPath.toFile()));
+ IndexWriterConfig conf = new IndexWriterConfig(Version.LUCENE_4_9, Queries.getAnalyzer()).setOpenMode(OpenMode.CREATE);
+ writer.set(new IndexWriter(directory.get(), conf));
+
+ mon.worked(5);
+
+ long realizeStart = 0;
+ if (IndexPolicy.PERF_INDEX_INIT)
+ realizeStart = System.nanoTime();
+
+ mon.subTask("Calculating indexed content");
+ mon.worked(5);
+
+ mon.worked(40);
+
+ if (IndexPolicy.PERF_INDEX_INIT)
+ System.out.println(getDescriptor() + "Realized index with " + results.size() + " entries at " + indexPath + " in " + (1e-9 * (System.nanoTime()-realizeStart)) + " seconds.");
+
+ if (IndexPolicy.TRACE_INDEX_INIT)
+ System.out.println(getDescriptor() + "Indexed relation " + r + " produced " + results.size() + " results");
+
+ long start = IndexPolicy.PERF_INDEX_INIT ? System.nanoTime() : 0;
+
+ mon.subTask("Indexing content");
+ final Semaphore s = new Semaphore(0);
+ mon.setWorkRemaining(results.size());
+ for (int i = 0; i < INDEXING_THREAD_COUNT; i++) {
+ final int startIndex = i;
+ ThreadUtils.getBlockingWorkExecutor().submit(() -> {
+ try {
+ Document document = new Document();
+ Field[] fs = makeFieldsForRelation(r, bound.length, document);
+
+ for (int index = startIndex; index < results.size(); index += INDEXING_THREAD_COUNT) {
+ if (setFields(fs, results.get(index)) == null)
+ continue;
+ try {
+ writer.get().addDocument(document);
+ } catch (CorruptIndexException e) {
+ getLogger().error("Index is corrupted! {}", this, e);
+ throw new IllegalStateException(e);
+ } catch (IOException e) {
+ getLogger().error("Index is in problematic state! {}", this, e);
+ throw new IllegalStateException(e);
+ } finally {
+ synchronized (mon) {
+ mon.worked(1);
+ }
}
}
+ } catch (DatabaseException e) {
+ throw new IllegalStateException(e);
+ } finally {
+ s.release();
}
-
- s.release();
- } catch (DatabaseException e) {
- throw new IllegalStateException(e);
- }
- });
- }
-
- try {
- s.acquire(INDEXING_THREAD_COUNT);
- } catch (InterruptedException e) {
- getLogger().error("Could not initialize index {}", this, e);
- }
-
- // http://www.gossamer-threads.com/lists/lucene/java-dev/47895
- // and http://lucene.apache.org/java/docs/index.html#27+November+2011+-+Lucene+Core+3.5.0
- // advise against calling optimize at all. So let's not do it anymore.
- //writer.get().optimize();
- //writer.get().commit();
-
- mon.subTask("Flushing");
-
- if (IndexPolicy.PERF_INDEX_INIT)
- System.out.println(getDescriptor() + "Wrote index at " + indexPath + " in " + (1e-9 * (System.nanoTime()-start)) + " seconds.");
-
- } catch (DatabaseException e) {
- getLogger().error("Could not initialize index due to db {}", this, e);
- } finally {
- try {
- closeWriter(writer.getAndSet(null));
+ });
+ }
+
+ try {
+ s.acquire(INDEXING_THREAD_COUNT);
+ } catch (InterruptedException e) {
+ getLogger().error("Could not initialize index {}", this, e);
+ }
+
+ // http://www.gossamer-threads.com/lists/lucene/java-dev/47895
+ // and http://lucene.apache.org/java/docs/index.html#27+November+2011+-+Lucene+Core+3.5.0
+ // advise against calling optimize at all. So let's not do it anymore.
+ //writer.get().optimize();
+ //writer.get().commit();
+
+ mon.subTask("Flushing");
+
+ if (IndexPolicy.PERF_INDEX_INIT)
+ System.out.println(getDescriptor() + "Wrote index at " + indexPath + " in " + (1e-9 * (System.nanoTime()-start)) + " seconds.");
+
+ result.complete(null);
+ // } catch (DatabaseException e) {
+ // getLogger().error("Could not initialize index due to db {}", this, e);
} finally {
- FileUtils.uncheckedClose(directory.getAndSet(null));
+ try {
+ closeWriter(writer.getAndSet(null));
+ } finally {
+ FileUtils.uncheckedClose(directory.getAndSet(null));
+ }
}
+ } catch (Throwable t) {
+ getLogger().error("Could not initialize index", t);
+ result.completeExceptionally(t);
}
}
return path;
}
- private static void createDirectory(Path path) throws IOException {
- if (Files.exists(path) && !Files.isDirectory(path))
- throw new IOException("Could not create index directory " + path + ", a file by that name already exists");
- Files.createDirectories(path);
- }
-
Path getIndexPath() {
return indexPath;
}