X-Git-Url: https://gerrit.simantics.org/r/gitweb?p=simantics%2Fplatform.git;a=blobdiff_plain;f=bundles%2Forg.simantics.db.indexing%2Fsrc%2Forg%2Fsimantics%2Fdb%2Findexing%2FIndexedRelationsSearcherBase.java;h=82f4abdd905d570600a9a10871e4d7f7e43cb414;hp=71db7f6168ac525d69cf1356329f1af081068792;hb=4aba159170fc72d39c2f930ea224aa71f4cdc2e7;hpb=3826e289058a51d09310b7ba1251e959dc0ed3d0 diff --git a/bundles/org.simantics.db.indexing/src/org/simantics/db/indexing/IndexedRelationsSearcherBase.java b/bundles/org.simantics.db.indexing/src/org/simantics/db/indexing/IndexedRelationsSearcherBase.java index 71db7f616..82f4abdd9 100644 --- a/bundles/org.simantics.db.indexing/src/org/simantics/db/indexing/IndexedRelationsSearcherBase.java +++ b/bundles/org.simantics.db.indexing/src/org/simantics/db/indexing/IndexedRelationsSearcherBase.java @@ -21,10 +21,8 @@ import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.Semaphore; -import java.util.concurrent.ThreadFactory; import java.util.concurrent.atomic.AtomicReference; import org.apache.lucene.document.Document; @@ -66,11 +64,13 @@ import org.simantics.db.common.utils.NameUtils; import org.simantics.db.exception.DatabaseException; import org.simantics.db.indexing.internal.IndexingJob; import org.simantics.db.layer0.adapter.GenericRelation; +import org.simantics.db.layer0.genericrelation.IndexException; import org.simantics.db.request.Read; import org.simantics.db.service.CollectionSupport; import org.simantics.db.service.SerialisationSupport; import org.simantics.utils.FileUtils; import org.simantics.utils.datastructures.Pair; +import org.simantics.utils.threads.ThreadUtils; import org.slf4j.Logger; import gnu.trove.map.hash.THashMap; @@ -116,14 +116,18 @@ abstract public class IndexedRelationsSearcherBase { this.state = State.READY; } + public State state() { + return state; + } + protected boolean checkState(State state) { return this.state == state; } - protected void assertState(State state) throws AssertionError { - - if(this.state != state) throw new AssertionError("Illegal state, expected " + state.name() + " but was in " + this.state.name()); - + protected void assertState(State state) throws IndexException { + State s = this.state; + if (s != state) + throw new IndexException("Illegal index searcher state, expected " + state.name() + " but state was " + s.name()); } public void changeState(IProgressMonitor monitor, Session session, State state) { @@ -578,7 +582,7 @@ abstract public class IndexedRelationsSearcherBase { } } - private static String getPattern(GenericRelation relation, int boundCount) { + public static String getPattern(GenericRelation relation, int boundCount) { String result = ""; for (int i = 0; i < boundCount; i++) result += "b"; @@ -586,20 +590,6 @@ abstract public class IndexedRelationsSearcherBase { result += "f"; return result; } - - private static final int INDEXING_THREAD_COUNT = 2; - - private static final ExecutorService executor = Executors.newFixedThreadPool(INDEXING_THREAD_COUNT, new ThreadFactory() { - @Override - public Thread newThread(Runnable r) { - Thread t = new Thread(r, "Lucene Index Creator"); - if (!t.isDaemon()) - t.setDaemon(true); - if (t.getPriority() != Thread.NORM_PRIORITY) - t.setPriority(Thread.NORM_PRIORITY); - return t; - } - }); void initializeIndex(IProgressMonitor monitor, ReadGraph graph, Object[] bound, boolean overwrite) throws IOException, DatabaseException @@ -609,7 +599,14 @@ abstract public class IndexedRelationsSearcherBase { "Reindexing " + NameUtils.getSafeLabel(graph, input), mon -> { try { - initializeIndexImpl(mon, graph, bound, overwrite); + GenericRelation r = graph.adapt(relation, GenericRelation.class); + if (r == null) + throw new DatabaseException("Given resource " + relation + "could not be adapted to GenericRelation."); + + GenericRelation selection = r.select(getPattern(r, bound.length), bound); + + List results = selection.realize(graph); + initializeIndexImpl(new CompletableFuture<>(), mon, r, results, bound, overwrite); } catch (IOException e) { getLogger().error("Index is in problematic state! {}", this, e); throw new DatabaseException(e); @@ -617,120 +614,120 @@ abstract public class IndexedRelationsSearcherBase { }); } - void initializeIndexImpl(IProgressMonitor monitor, ReadGraph graph, final Object[] bound, boolean overwrite) throws IOException, - DatabaseException { - - final SubMonitor mon = SubMonitor.convert(monitor, 100); - - if (IndexPolicy.TRACE_INDEX_INIT) - System.out.println(getDescriptor() + "Initializing index at " + indexPath + " (overwrite = " + overwrite + ")"); - mon.beginTask("Initializing Index", 100); - - if (overwrite) { - if (Files.exists(indexPath)) { - mon.subTask("Erasing previous index"); - if (getLogger().isDebugEnabled()) - getLogger().debug("Erasing previous index {}", indexPath.toAbsolutePath()); - FileUtils.delete(indexPath); - } - } - - final AtomicReference directory = new AtomicReference(); - final AtomicReference writer = new AtomicReference(); + private static final int INDEXING_THREAD_COUNT = 2; // this is quite good parallelism level for lucene + void initializeIndexImpl(CompletableFuture result, IProgressMonitor monitor, GenericRelation r, List results, final Object[] bound, boolean overwrite) throws IOException { try { - mon.subTask("Start index write"); - createDirectory(indexPath); - - directory.set(FSDirectory.open(indexPath.toFile())); - IndexWriterConfig conf = new IndexWriterConfig(Version.LUCENE_4_9, Queries.getAnalyzer()).setOpenMode(OpenMode.CREATE); - writer.set(new IndexWriter(directory.get(), conf)); - - mon.worked(5); - - final GenericRelation r = graph.adapt(relation, GenericRelation.class); - if (r == null) - throw new DatabaseException("Given resource " + graph.syncRequest(new SafeName(relation)) - + "could not be adapted to GenericRelation."); - - long realizeStart = 0; - if (IndexPolicy.PERF_INDEX_INIT) - realizeStart = System.nanoTime(); - - mon.subTask("Calculating indexed content"); - GenericRelation selection = r.select(getPattern(r, bound.length), bound); - mon.worked(5); - List results = selection.realize(graph); - mon.worked(40); - - if (IndexPolicy.PERF_INDEX_INIT) - System.out.println(getDescriptor() + "Realized index with " + results.size() + " entries at " + indexPath + " in " + (1e-9 * (System.nanoTime()-realizeStart)) + " seconds."); + final SubMonitor mon = SubMonitor.convert(monitor, 100); + if (IndexPolicy.TRACE_INDEX_INIT) - System.out.println(getDescriptor() + "Indexed relation " + r + " produced " + results.size() + " results"); - - long start = IndexPolicy.PERF_INDEX_INIT ? System.nanoTime() : 0; - - mon.subTask("Indexing content"); - final Semaphore s = new Semaphore(0); - mon.setWorkRemaining(results.size()); - - for (int i = 0; i < INDEXING_THREAD_COUNT; i++) { - final int startIndex = i; - executor.submit(() -> { - try { - Document document = new Document(); - Field[] fs = makeFieldsForRelation(r, bound.length, document); - - for (int index = startIndex; index < results.size(); index += INDEXING_THREAD_COUNT) { - if (setFields(fs, results.get(index)) == null) - continue; - try { - writer.get().addDocument(document); - } catch (CorruptIndexException e) { - getLogger().error("Index is corrupted! {}", this, e); - throw new IllegalStateException(e); - } catch (IOException e) { - getLogger().error("Index is in problematic state! {}", this, e); - throw new IllegalStateException(e); - } finally { - synchronized (mon) { - mon.worked(1); + System.out.println(getDescriptor() + "Initializing index at " + indexPath + " (overwrite = " + overwrite + ")"); + mon.beginTask("Initializing Index", 100); + + if (overwrite) { + if (Files.exists(indexPath)) { + mon.subTask("Erasing previous index"); + if (getLogger().isDebugEnabled()) + getLogger().debug("Erasing previous index {}", indexPath.toAbsolutePath()); + FileUtils.emptyDirectory(indexPath); + } + } + + final AtomicReference directory = new AtomicReference(); + final AtomicReference writer = new AtomicReference(); + + try { + mon.subTask("Start index write"); + Files.createDirectories(indexPath); + + directory.set(FSDirectory.open(indexPath.toFile())); + IndexWriterConfig conf = new IndexWriterConfig(Version.LUCENE_4_9, Queries.getAnalyzer()).setOpenMode(OpenMode.CREATE); + writer.set(new IndexWriter(directory.get(), conf)); + + mon.worked(5); + + long realizeStart = 0; + if (IndexPolicy.PERF_INDEX_INIT) + realizeStart = System.nanoTime(); + + mon.subTask("Calculating indexed content"); + mon.worked(5); + + mon.worked(40); + + if (IndexPolicy.PERF_INDEX_INIT) + System.out.println(getDescriptor() + "Realized index with " + results.size() + " entries at " + indexPath + " in " + (1e-9 * (System.nanoTime()-realizeStart)) + " seconds."); + + if (IndexPolicy.TRACE_INDEX_INIT) + System.out.println(getDescriptor() + "Indexed relation " + r + " produced " + results.size() + " results"); + + long start = IndexPolicy.PERF_INDEX_INIT ? System.nanoTime() : 0; + + mon.subTask("Indexing content"); + final Semaphore s = new Semaphore(0); + mon.setWorkRemaining(results.size()); + for (int i = 0; i < INDEXING_THREAD_COUNT; i++) { + final int startIndex = i; + ThreadUtils.getBlockingWorkExecutor().submit(() -> { + try { + Document document = new Document(); + Field[] fs = makeFieldsForRelation(r, bound.length, document); + + for (int index = startIndex; index < results.size(); index += INDEXING_THREAD_COUNT) { + if (setFields(fs, results.get(index)) == null) + continue; + try { + writer.get().addDocument(document); + } catch (CorruptIndexException e) { + getLogger().error("Index is corrupted! {}", this, e); + throw new IllegalStateException(e); + } catch (IOException e) { + getLogger().error("Index is in problematic state! {}", this, e); + throw new IllegalStateException(e); + } finally { + synchronized (mon) { + mon.worked(1); + } } } + } catch (DatabaseException e) { + throw new IllegalStateException(e); + } finally { + s.release(); } - } catch (DatabaseException e) { - throw new IllegalStateException(e); - } finally { - s.release(); - } - }); - } - - try { - s.acquire(INDEXING_THREAD_COUNT); - } catch (InterruptedException e) { - getLogger().error("Could not initialize index {}", this, e); - } - - // http://www.gossamer-threads.com/lists/lucene/java-dev/47895 - // and http://lucene.apache.org/java/docs/index.html#27+November+2011+-+Lucene+Core+3.5.0 - // advise against calling optimize at all. So let's not do it anymore. - //writer.get().optimize(); - //writer.get().commit(); - - mon.subTask("Flushing"); - - if (IndexPolicy.PERF_INDEX_INIT) - System.out.println(getDescriptor() + "Wrote index at " + indexPath + " in " + (1e-9 * (System.nanoTime()-start)) + " seconds."); - - } catch (DatabaseException e) { - getLogger().error("Could not initialize index due to db {}", this, e); - } finally { - try { - closeWriter(writer.getAndSet(null)); + }); + } + + try { + s.acquire(INDEXING_THREAD_COUNT); + } catch (InterruptedException e) { + getLogger().error("Could not initialize index {}", this, e); + } + + // http://www.gossamer-threads.com/lists/lucene/java-dev/47895 + // and http://lucene.apache.org/java/docs/index.html#27+November+2011+-+Lucene+Core+3.5.0 + // advise against calling optimize at all. So let's not do it anymore. + //writer.get().optimize(); + //writer.get().commit(); + + mon.subTask("Flushing"); + + if (IndexPolicy.PERF_INDEX_INIT) + System.out.println(getDescriptor() + "Wrote index at " + indexPath + " in " + (1e-9 * (System.nanoTime()-start)) + " seconds."); + + result.complete(null); + // } catch (DatabaseException e) { + // getLogger().error("Could not initialize index due to db {}", this, e); } finally { - FileUtils.uncheckedClose(directory.getAndSet(null)); + try { + closeWriter(writer.getAndSet(null)); + } finally { + FileUtils.uncheckedClose(directory.getAndSet(null)); + } } + } catch (Throwable t) { + getLogger().error("Could not initialize index", t); + result.completeExceptionally(t); } } @@ -1000,20 +997,14 @@ abstract public class IndexedRelationsSearcherBase { return path; } - private static void createDirectory(Path path) throws IOException { - if (Files.exists(path) && !Files.isDirectory(path)) - throw new IOException("Could not create index directory " + path + ", a file by that name already exists"); - Files.createDirectories(path); - } - Path getIndexPath() { return indexPath; } boolean isIndexAvailable() { - return (Files.exists(indexPath) && Files.isDirectory(indexPath)); + return Files.isDirectory(indexPath); } - + abstract Throwable bestEffortClear(IProgressMonitor monitor, Session session); /*