*******************************************************************************/
package org.simantics.db.indexing;
-import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;
-import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.lucene.document.Document;
import org.simantics.db.Resource;
import org.simantics.db.Session;
import org.simantics.db.common.request.SafeName;
-import org.simantics.db.common.utils.Logger;
import org.simantics.db.common.utils.NameUtils;
import org.simantics.db.exception.DatabaseException;
import org.simantics.db.indexing.internal.IndexingJob;
import org.simantics.db.service.SerialisationSupport;
import org.simantics.utils.FileUtils;
import org.simantics.utils.datastructures.Pair;
+import org.simantics.utils.threads.ThreadUtils;
+import org.slf4j.Logger;
import gnu.trove.map.hash.THashMap;
}
public void setProblem(Throwable t) {
+ if (t != null)
+ getLogger().error("Setting problem for {} and previous state {}", this, this.state, t);
this.state = State.PROBLEM;
this.exception = t;
}
protected void changeState(IProgressMonitor monitor, Session session, State state, int depth) {
- if(this.state == state) return;
+ if (this.state == state) {
+ if (getLogger().isDebugEnabled())
+ getLogger().debug("Trying to change state {} to the same as previous state {} in depth {} with {}", state, this.state, depth, this);
+ return;
+ }
if (IndexPolicy.TRACE_INDEX_MANAGEMENT)
System.err.println("Index state " + this.state.name() + " => " + state.name() + " " + this);
// Try to exit problem state
if (State.PROBLEM == this.state && depth > 0) {
+ getLogger().info("Try to exit problem state for {} and state {}", this, state);
Throwable t = bestEffortClear(monitor, session);
if(t != null) {
+ getLogger().error("Best effort clear has failed for state {} and this {}", state, this, t);
exception = t;
return;
}
// Managed to get into initial state
this.state = State.NONE;
+ getLogger().info("Managed to get into initial state {}", this.state);
return;
}
// Cannot move into read from no index
- if (State.NONE == this.state && State.READ == state) return;
+ if (State.NONE == this.state && State.READ == state) {
+ if (getLogger().isDebugEnabled())
+ getLogger().debug("Cannot move into read from no index in {} with state {}", this, state);
+ return;
+ }
// Cannot move into write from no index
- if (State.NONE == this.state && State.WRITE == state) return;
+ if (State.NONE == this.state && State.WRITE == state) {
+ if (getLogger().isDebugEnabled())
+ getLogger().debug("Cannot move into write from no index in {} with state {}", this, state);
+ return;
+ }
boolean success = false;
}
} catch (Throwable t) {
-
setProblem(t);
-
} finally {
if(!success) {
Resource input;
- File indexPath;
+ Path indexPath;
Directory directory;
}
Directory getDirectory(Session session) throws IOException {
- return FSDirectory.open(indexPath);
+ return FSDirectory.open(indexPath.toFile());
}
abstract String getDescriptor();
}
}
- private static String getPattern(GenericRelation relation, int boundCount) {
+ public static String getPattern(GenericRelation relation, int boundCount) {
String result = "";
for (int i = 0; i < boundCount; i++)
result += "b";
result += "f";
return result;
}
-
- private static final int INDEXING_THREAD_COUNT = 2;
-
- private static final ExecutorService executor = Executors.newFixedThreadPool(INDEXING_THREAD_COUNT, new ThreadFactory() {
- @Override
- public Thread newThread(Runnable r) {
- Thread t = new Thread(r, "Lucene Index Creator");
- if (!t.isDaemon())
- t.setDaemon(true);
- if (t.getPriority() != Thread.NORM_PRIORITY)
- t.setPriority(Thread.NORM_PRIORITY);
- return t;
- }
- });
void initializeIndex(IProgressMonitor monitor, ReadGraph graph, Object[] bound, boolean overwrite)
throws IOException, DatabaseException
"Reindexing " + NameUtils.getSafeLabel(graph, input),
mon -> {
try {
- initializeIndexImpl(mon, graph, bound, overwrite);
+ GenericRelation r = graph.adapt(relation, GenericRelation.class);
+ if (r == null)
+ throw new DatabaseException("Given resource " + relation + "could not be adapted to GenericRelation.");
+
+ GenericRelation selection = r.select(getPattern(r, bound.length), bound);
+
+ List<Object[]> results = selection.realize(graph);
+ initializeIndexImpl(new CompletableFuture<>(), mon, r, results, bound, overwrite);
} catch (IOException e) {
+ getLogger().error("Index is in problematic state! {}", this, e);
throw new DatabaseException(e);
}
});
}
- void initializeIndexImpl(IProgressMonitor monitor, ReadGraph graph, final Object[] bound, boolean overwrite) throws IOException,
- DatabaseException {
-
- final SubMonitor mon = SubMonitor.convert(monitor, 100);
-
- if (IndexPolicy.TRACE_INDEX_INIT)
- System.out.println(getDescriptor() + "Initializing index at " + indexPath + " (overwrite = " + overwrite + ")");
- mon.beginTask("Initializing Index", 100);
-
- if (overwrite) {
- mon.subTask("Erasing previous index");
- FileUtils.deleteAll(indexPath);
- }
-
- final AtomicReference<FSDirectory> directory = new AtomicReference<FSDirectory>();
- final AtomicReference<IndexWriter> writer = new AtomicReference<IndexWriter>();
+ private static final int INDEXING_THREAD_COUNT = 2; // this is quite good parallelism level for lucene
+ void initializeIndexImpl(CompletableFuture<?> result, IProgressMonitor monitor, GenericRelation r, List<Object[]> results, final Object[] bound, boolean overwrite) throws IOException {
try {
- mon.subTask("Start index write");
- createDirectory(indexPath);
-
- directory.set(FSDirectory.open(indexPath));
- IndexWriterConfig conf = new IndexWriterConfig(Version.LUCENE_4_9, Queries.getAnalyzer()).setOpenMode(OpenMode.CREATE);
- writer.set(new IndexWriter(directory.get(), conf));
-
- mon.worked(5);
-
- final GenericRelation r = graph.adapt(relation, GenericRelation.class);
- if (r == null)
- throw new DatabaseException("Given resource " + graph.syncRequest(new SafeName(relation))
- + "could not be adapted to GenericRelation.");
-
- long realizeStart = 0;
- if (IndexPolicy.PERF_INDEX_INIT)
- realizeStart = System.nanoTime();
-
- mon.subTask("Calculating indexed content");
- GenericRelation selection = r.select(getPattern(r, bound.length), bound);
- mon.worked(5);
- List<Object[]> results = selection.realize(graph);
- mon.worked(40);
-
- if (IndexPolicy.PERF_INDEX_INIT)
- System.out.println(getDescriptor() + "Realized index with " + results.size() + " entries at " + indexPath + " in " + (1e-9 * (System.nanoTime()-realizeStart)) + " seconds.");
+ final SubMonitor mon = SubMonitor.convert(monitor, 100);
+
if (IndexPolicy.TRACE_INDEX_INIT)
- System.out.println(getDescriptor() + "Indexed relation " + r + " produced " + results.size() + " results");
-
- long start = IndexPolicy.PERF_INDEX_INIT ? System.nanoTime() : 0;
-
- mon.subTask("Indexing content");
- final Semaphore s = new Semaphore(0);
- mon.setWorkRemaining(results.size());
-
- for (int i = 0; i < INDEXING_THREAD_COUNT; i++) {
- final int startIndex = i;
- executor.submit(() -> {
- try {
- Document document = new Document();
- Field[] fs = makeFieldsForRelation(r, bound.length, document);
-
- for (int index = startIndex; index < results.size(); index += INDEXING_THREAD_COUNT) {
- if (setFields(fs, results.get(index)) == null)
- continue;
- try {
- writer.get().addDocument(document);
- } catch (CorruptIndexException e) {
- throw new IllegalStateException(e);
- } catch (IOException e) {
- throw new IllegalStateException(e);
- } finally {
- synchronized (mon) {
- mon.worked(1);
+ System.out.println(getDescriptor() + "Initializing index at " + indexPath + " (overwrite = " + overwrite + ")");
+ mon.beginTask("Initializing Index", 100);
+
+ if (overwrite) {
+ if (Files.exists(indexPath)) {
+ mon.subTask("Erasing previous index");
+ if (getLogger().isDebugEnabled())
+ getLogger().debug("Erasing previous index {}", indexPath.toAbsolutePath());
+ FileUtils.emptyDirectory(indexPath);
+ }
+ }
+
+ final AtomicReference<FSDirectory> directory = new AtomicReference<FSDirectory>();
+ final AtomicReference<IndexWriter> writer = new AtomicReference<IndexWriter>();
+
+ try {
+ mon.subTask("Start index write");
+ Files.createDirectories(indexPath);
+
+ directory.set(FSDirectory.open(indexPath.toFile()));
+ IndexWriterConfig conf = new IndexWriterConfig(Version.LUCENE_4_9, Queries.getAnalyzer()).setOpenMode(OpenMode.CREATE);
+ writer.set(new IndexWriter(directory.get(), conf));
+
+ mon.worked(5);
+
+ long realizeStart = 0;
+ if (IndexPolicy.PERF_INDEX_INIT)
+ realizeStart = System.nanoTime();
+
+ mon.subTask("Calculating indexed content");
+ mon.worked(5);
+
+ mon.worked(40);
+
+ if (IndexPolicy.PERF_INDEX_INIT)
+ System.out.println(getDescriptor() + "Realized index with " + results.size() + " entries at " + indexPath + " in " + (1e-9 * (System.nanoTime()-realizeStart)) + " seconds.");
+
+ if (IndexPolicy.TRACE_INDEX_INIT)
+ System.out.println(getDescriptor() + "Indexed relation " + r + " produced " + results.size() + " results");
+
+ long start = IndexPolicy.PERF_INDEX_INIT ? System.nanoTime() : 0;
+
+ mon.subTask("Indexing content");
+ final Semaphore s = new Semaphore(0);
+ mon.setWorkRemaining(results.size());
+ for (int i = 0; i < INDEXING_THREAD_COUNT; i++) {
+ final int startIndex = i;
+ ThreadUtils.getBlockingWorkExecutor().submit(() -> {
+ try {
+ Document document = new Document();
+ Field[] fs = makeFieldsForRelation(r, bound.length, document);
+
+ for (int index = startIndex; index < results.size(); index += INDEXING_THREAD_COUNT) {
+ if (setFields(fs, results.get(index)) == null)
+ continue;
+ try {
+ writer.get().addDocument(document);
+ } catch (CorruptIndexException e) {
+ getLogger().error("Index is corrupted! {}", this, e);
+ throw new IllegalStateException(e);
+ } catch (IOException e) {
+ getLogger().error("Index is in problematic state! {}", this, e);
+ throw new IllegalStateException(e);
+ } finally {
+ synchronized (mon) {
+ mon.worked(1);
+ }
}
}
+ } catch (DatabaseException e) {
+ throw new IllegalStateException(e);
+ } finally {
+ s.release();
}
-
- s.release();
- } catch (DatabaseException e) {
- throw new IllegalStateException(e);
- }
- });
- }
-
- try {
- s.acquire(INDEXING_THREAD_COUNT);
- } catch (InterruptedException e) {
- Logger.defaultLogError(e);
- }
-
- // http://www.gossamer-threads.com/lists/lucene/java-dev/47895
- // and http://lucene.apache.org/java/docs/index.html#27+November+2011+-+Lucene+Core+3.5.0
- // advise against calling optimize at all. So let's not do it anymore.
- //writer.get().optimize();
- //writer.get().commit();
-
- mon.subTask("Flushing");
-
- if (IndexPolicy.PERF_INDEX_INIT)
- System.out.println(getDescriptor() + "Wrote index at " + indexPath + " in " + (1e-9 * (System.nanoTime()-start)) + " seconds.");
-
- } catch (DatabaseException e) {
-
- Logger.defaultLogError(e);
-
- } finally {
- try {
- closeWriter(writer.getAndSet(null));
+ });
+ }
+
+ try {
+ s.acquire(INDEXING_THREAD_COUNT);
+ } catch (InterruptedException e) {
+ getLogger().error("Could not initialize index {}", this, e);
+ }
+
+ // http://www.gossamer-threads.com/lists/lucene/java-dev/47895
+ // and http://lucene.apache.org/java/docs/index.html#27+November+2011+-+Lucene+Core+3.5.0
+ // advise against calling optimize at all. So let's not do it anymore.
+ //writer.get().optimize();
+ //writer.get().commit();
+
+ mon.subTask("Flushing");
+
+ if (IndexPolicy.PERF_INDEX_INIT)
+ System.out.println(getDescriptor() + "Wrote index at " + indexPath + " in " + (1e-9 * (System.nanoTime()-start)) + " seconds.");
+
+ result.complete(null);
+ // } catch (DatabaseException e) {
+ // getLogger().error("Could not initialize index due to db {}", this, e);
} finally {
- FileUtils.uncheckedClose(directory.getAndSet(null));
+ try {
+ closeWriter(writer.getAndSet(null));
+ } finally {
+ FileUtils.uncheckedClose(directory.getAndSet(null));
+ }
}
+ } catch (Throwable t) {
+ getLogger().error("Could not initialize index", t);
+ result.completeExceptionally(t);
}
}
}
result.add(o);
- } catch (CorruptIndexException e) {
- throw new DatabaseException(e);
- } catch (IOException e) {
- throw new DatabaseException(e);
- }
+ } catch (CorruptIndexException e) {
+ getLogger().error("Index is corrupted! {}", this, e);
+ throw new DatabaseException(e);
+ } catch (IOException e) {
+ getLogger().error("Index is in problematic state! {}", this, e);
+ throw new DatabaseException(e);
+ }
}
result.add(entry);
} catch (CorruptIndexException e) {
+ getLogger().error("Index is corrupted! {}", this, e);
throw new DatabaseException(e);
} catch (IOException e) {
+ getLogger().error("Index is in problematic state! {}", this, e);
throw new DatabaseException(e);
}
-
}
-
return result;
-
}
});
}
ResourceVisitor visitor = new ResourceVisitor();
for (ScoreDoc scoreDoc : docs.scoreDocs) {
-
try {
-
reader.document(scoreDoc.doc, visitor);
result.add(support.getResource(visitor.id));
-
} catch (CorruptIndexException e) {
+ getLogger().error("Index is corrupted! {}", this, e);
throw new DatabaseException(e);
} catch (IOException e) {
+ getLogger().error("Index is in problematic state! {}", this, e);
throw new DatabaseException(e);
}
-
}
-
return result;
-
}
});
}
reader.document(scoreDoc.doc, visitor);
} catch (CorruptIndexException e) {
+ getLogger().error("Index is corrupted! {}", this, e);
throw new DatabaseException(e);
} catch (IOException e) {
+ getLogger().error("Index is in problematic state! {}", this, e);
throw new DatabaseException(e);
}
}
- protected static File getIndexDirectory(Session session, Resource relation, Resource input) {
- File path = DatabaseIndexing.getIndexLocation(session, relation, input);
+ protected static Path getIndexDirectory(Session session, Resource relation, Resource input) {
+ Path path = DatabaseIndexing.getIndexLocation(session, relation, input);
// System.out.println("getIndexDirectory = " + path);
return path;
}
- private static void createDirectory(File path) throws IOException {
- Path p = path.toPath();
- if (Files.exists(p) && !Files.isDirectory(p))
- throw new IOException("Could not create index directory " + path + ", a file by that name already exists");
- Files.createDirectories(p);
- }
-
- File getIndexPath() {
+ Path getIndexPath() {
return indexPath;
}
boolean isIndexAvailable() {
- return (indexPath.exists() && indexPath.isDirectory());
- }
-
- Throwable bestEffortClear(IProgressMonitor monitor, Session session) {
- return null;
+ return Files.isDirectory(indexPath);
}
+ abstract Throwable bestEffortClear(IProgressMonitor monitor, Session session);
+
/*
* Start from scratch. Clear all caches and rebuild the index.
*/
Throwable clearDirectory(IProgressMonitor monitor, Session session) {
- File file = getIndexPath();
+ Path file = getIndexPath();
try {
-
- for(int i=0;i<15;i++) {
- FileUtils.deleteDir(file);
- if(!file.exists()) {
- return null;
- }
- try {
- Thread.sleep(i*100);
- } catch (InterruptedException e) {
- }
- }
-
+ FileUtils.delete(file);
} catch (Throwable t) {
-
+ getLogger().error("Could not delete directory {}", file.toAbsolutePath(), t);
return t;
-
}
-
- return new IllegalStateException("Failed to delete directory " + file.getAbsolutePath());
-
+ if (Files.exists(file))
+ return new IllegalStateException("Failed to delete directory " + file.toAbsolutePath());
+ return null;
}
private Field[] setFields(Field[] fs, Object[] result) {
System.out.println(getDescriptor() + "index " + fs[i].name() + " = " + value + " : Long");
fs[i].setLongValue((Long) value);
} else {
- Logger.defaultLogError("Can only index Long and String fields, encountered " + value);
+ getLogger().error("Can only index Long and String fields, encountered " + value);
return null;
}
}
return fs;
}
+ protected abstract Logger getLogger();
+
+ @Override
+ public String toString() {
+ return getClass().getSimpleName() + " [" + String.valueOf(schema) + ", " + String.valueOf(relation) + ", " + String.valueOf(input) + ", " + String.valueOf(indexPath) + ", " + String.valueOf(directory) + ", " + String.valueOf(state) + "]";
+ }
}