X-Git-Url: https://gerrit.simantics.org/r/gitweb?p=simantics%2Fplatform.git;a=blobdiff_plain;f=bundles%2Forg.simantics.db.indexing%2Fsrc%2Forg%2Fsimantics%2Fdb%2Findexing%2FIndexedRelationsSearcherBase.java;h=1d5001c64ec15f728cfee967f19d75dcc2123dbb;hp=c52588af0b37b35f4dfe9e92bf2c0b9bedec5bc9;hb=ad8fc537d4cde0d8891cf1cd39862055ca7f03cb;hpb=6d390db460fe34d6b4f4ca29737f4e997c2d2d64 diff --git a/bundles/org.simantics.db.indexing/src/org/simantics/db/indexing/IndexedRelationsSearcherBase.java b/bundles/org.simantics.db.indexing/src/org/simantics/db/indexing/IndexedRelationsSearcherBase.java index c52588af0..1d5001c64 100644 --- a/bundles/org.simantics.db.indexing/src/org/simantics/db/indexing/IndexedRelationsSearcherBase.java +++ b/bundles/org.simantics.db.indexing/src/org/simantics/db/indexing/IndexedRelationsSearcherBase.java @@ -1,1054 +1,1048 @@ -/******************************************************************************* - * Copyright (c) 2007, 2015 Association for Decentralized Information Management - * in Industry THTH ry. - * All rights reserved. This program and the accompanying materials - * are made available under the terms of the Eclipse Public License v1.0 - * which accompanies this distribution, and is available at - * http://www.eclipse.org/legal/epl-v10.html - * - * Contributors: - * VTT Technical Research Centre of Finland - initial API and implementation - * Semantum Oy - Fix for simantics issue #6053 - *******************************************************************************/ -package org.simantics.db.indexing; - -import java.io.File; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Semaphore; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.atomic.AtomicReference; - -import org.apache.lucene.document.Document; -import org.apache.lucene.document.DocumentStoredFieldVisitor; -import org.apache.lucene.document.Field; -import org.apache.lucene.document.FieldType; -import org.apache.lucene.document.LongField; -import org.apache.lucene.document.TextField; -import org.apache.lucene.index.CorruptIndexException; -import org.apache.lucene.index.DirectoryReader; -import org.apache.lucene.index.FieldInfo; -import org.apache.lucene.index.IndexNotFoundException; -import org.apache.lucene.index.IndexReader; -import org.apache.lucene.index.IndexWriter; -import org.apache.lucene.index.IndexWriterConfig; -import org.apache.lucene.index.IndexWriterConfig.OpenMode; -import org.apache.lucene.index.IndexableField; -import org.apache.lucene.index.StoredFieldVisitor; -import org.apache.lucene.index.Term; -import org.apache.lucene.queryparser.classic.ParseException; -import org.apache.lucene.search.IndexSearcher; -import org.apache.lucene.search.MatchAllDocsQuery; -import org.apache.lucene.search.Query; -import org.apache.lucene.search.ScoreDoc; -import org.apache.lucene.search.TermQuery; -import org.apache.lucene.search.TopDocs; -import org.apache.lucene.store.Directory; -import org.apache.lucene.store.FSDirectory; -import org.apache.lucene.util.Version; -import org.eclipse.core.runtime.IProgressMonitor; -import org.eclipse.core.runtime.SubMonitor; -import org.simantics.databoard.util.ObjectUtils; -import org.simantics.db.ReadGraph; -import org.simantics.db.RequestProcessor; -import org.simantics.db.Resource; -import org.simantics.db.Session; -import org.simantics.db.common.request.SafeName; -import org.simantics.db.common.utils.Logger; -import org.simantics.db.common.utils.NameUtils; -import org.simantics.db.exception.DatabaseException; -import org.simantics.db.indexing.internal.IndexingJob; -import org.simantics.db.layer0.adapter.GenericRelation; -import org.simantics.db.request.Read; -import org.simantics.db.service.CollectionSupport; -import org.simantics.db.service.SerialisationSupport; -import org.simantics.utils.FileUtils; -import org.simantics.utils.datastructures.Pair; - -import gnu.trove.map.hash.THashMap; - -/** - * @author Tuukka Lehtonen - * @author Antti Villberg - */ -abstract public class IndexedRelationsSearcherBase { - - protected enum State { - // No index is available - NONE, - // An index is available, but there is a problem with it - PROBLEM, - // An index is available but no reader or writer is ready - READY, - // A reader is ready - READ, - // A writer (and a reader) is ready - WRITE - } - - private State state = State.READY; - private Throwable exception; - - public Throwable getException() { - return exception; - } - - public void setProblem(Throwable t) { - this.state = State.PROBLEM; - this.exception = t; - } - - public void setNone() { - this.state = State.NONE; - } - - public void setReady() { - this.state = State.READY; - } - - protected boolean checkState(State state) { - return this.state == state; - } - - protected void assertState(State state) throws AssertionError { - - if(this.state != state) throw new AssertionError("Illegal state, expected " + state.name() + " but was in " + this.state.name()); - - } - - public void changeState(IProgressMonitor monitor, Session session, State state) { - changeState(monitor, session, state, 0); - } - - protected void changeState(IProgressMonitor monitor, Session session, State state, int depth) { - - if(this.state == state) return; - - if (IndexPolicy.TRACE_INDEX_MANAGEMENT) - System.err.println("Index state " + this.state.name() + " => " + state.name() + " " + this); - - // Check transitions - - // Try to exit problem state - if (State.PROBLEM == this.state && depth > 0) { - Throwable t = bestEffortClear(monitor, session); - if(t != null) { - exception = t; - return; - } - // Managed to get into initial state - this.state = State.NONE; - return; - } - - // Cannot move into read from no index - if (State.NONE == this.state && State.READ == state) return; - // Cannot move into write from no index - if (State.NONE == this.state && State.WRITE == state) return; - - boolean success = false; - - try { - - if (searcher != null) { - searcher = null; - } - if (reader != null) { - reader.close(); - reader = null; - } - closeWriter(writer); - directory = null; - - success = true; - - // Enter new state - if (State.READ == state || State.WRITE == state) { - - success = false; - - boolean forWriting = State.WRITE == state; - - if (directory != null) - throw new IllegalStateException(getDescriptor() + "Index already loaded"); - - SubMonitor mon = SubMonitor.convert(monitor, 100); - - mon.beginTask("Loading index", 100); - - if (IndexPolicy.TRACE_INDEX_LOAD) - System.out.println(getDescriptor() + "Loading Lucene index from " + indexPath + " for " + (forWriting ? "writing" : "reading")); - - long start = System.nanoTime(); - - directory = getDirectory(session); - - if (forWriting) { - // Never overwrite an index that is about to be loaded. - // TODO: could use OpenMode.CREATE_OR_APPEND but must test first - IndexWriterConfig config = new IndexWriterConfig(Version.LUCENE_4_9, Queries.getAnalyzer()).setOpenMode(OpenMode.APPEND); - try { - // FIXME: platform #4676 - writer = new IndexWriter(directory, config); - } catch (IndexNotFoundException e) { - // There was no pre-existing index on disk. Create it now. - writer = new IndexWriter(directory, config.setOpenMode(OpenMode.CREATE)); - writer.commit(); - } - reader = DirectoryReader.open(directory); - searcher = new IndexSearcher(reader); - } else { - reader = DirectoryReader.open(directory); - searcher = new IndexSearcher(reader); - } - - long end = System.nanoTime(); - - mon.worked(100); - - if (IndexPolicy.PERF_INDEX_LOAD) { - double time = (end - start) * 1e-6; - System.out.println(getDescriptor() + "Loaded Lucene index from " + indexPath + " for " + (forWriting ? "writing" : "reading") + " in " + time + " ms"); - } - - success = true; - - } - - } catch (Throwable t) { - - setProblem(t); - - } finally { - - if(!success) { - this.state = State.PROBLEM; - changeState(monitor, session, State.NONE, depth+1); - return; - } - - } - - this.state = state; - - } - - public static final FieldType STRING_TYPE = new FieldType(); - - static { - STRING_TYPE.setIndexed(true); - STRING_TYPE.setStored(true); - STRING_TYPE.setTokenized(true); - STRING_TYPE.freeze(); - } - - protected static Field makeField(String fieldName, String fieldClass) throws DatabaseException { - switch (fieldClass) { - case "Long": return new LongField(fieldName, 0L, Field.Store.YES); - case "String": return new Field (fieldName, "", STRING_TYPE); - case "Text": return new TextField(fieldName, "", Field.Store.YES); - default: - throw new DatabaseException("Can only index Long, String and Text fields, encountered field type " + fieldClass); - } - } - - protected static Field[] makeFieldsForRelation(GenericRelation r, int boundLength, Document document) throws DatabaseException { - Pair[] fields = r.getFields(); - Field[] fs = new Field[Math.max(0, fields.length - boundLength)]; - for (int i = boundLength; i < fields.length; i++) { - Field f = makeField(fields[i].first, fields[i].second); - fs[i - boundLength] = f; - if (document != null) - document.add(f); - } - return fs; - } - - void insertIndex(IProgressMonitor monitor, GenericRelation r, int boundLength, Collection documentsData) - throws CorruptIndexException, IOException, DatabaseException { - assertAccessOpen(true); - - if (IndexPolicy.TRACE_INDEX_UPDATE) - System.out.println(getDescriptor() + "Inserting " + documentsData.size() + " documents into index at " + indexPath); - - long start = 0, end = 0; - if (IndexPolicy.PERF_INDEX_UPDATE) - start = System.nanoTime(); - - try { - Document document = new Document(); - Field[] fs = makeFieldsForRelation(r, boundLength, document); - - for (Object[] documentData : documentsData) { - if (setFields(fs, documentData) == null) - continue; - - if (IndexPolicy.TRACE_INDEX_UPDATE) - System.out.println(getDescriptor() + "Inserting document " + document); - - writer.addDocument(document); - } - - if (IndexPolicy.PERF_INDEX_UPDATE) { - end = System.nanoTime(); - double ms = (end - start) * 1e-6; - System.out.println(getDescriptor() + "Inserted " + documentsData.size() + " documents into index at " + indexPath + " in " + ms + " ms"); - } - - } finally { - } - } - - void removeIndex(IProgressMonitor monitor, GenericRelation r, RequestProcessor processor, String key, Collection keyValues) throws DatabaseException, CorruptIndexException, IOException { - assertAccessOpen(true); - - if (IndexPolicy.TRACE_INDEX_UPDATE) - System.out.println(getDescriptor() + "Removing " + keyValues.size() + " documents from index at " + indexPath); - - long start = 0, end = 0; - if (IndexPolicy.PERF_INDEX_UPDATE) - start = System.nanoTime(); - - try { - for (Object keyValue : keyValues) { - Term removedTerm = null; - if (keyValue instanceof Long) { - removedTerm = IndexUtils.longTerm(key, (Long) keyValue); - } else if (keyValue instanceof String) { - removedTerm = new Term(key, (String) keyValue); - } else { - // FIXME: should throw an exception for illegal input data but this would leave the index in an incoherent state - continue; - } - - if (IndexPolicy.TRACE_INDEX_UPDATE) - System.out.println(getDescriptor() + "Removing document with key " + removedTerm); - writer.deleteDocuments(removedTerm); - } - - if (IndexPolicy.PERF_INDEX_UPDATE) { - end = System.nanoTime(); - double ms = (end - start) * 1e-6; - System.out.println(getDescriptor() + "Removed " + keyValues.size() + " documents from index at " + indexPath + " in " + ms + " ms"); - } - - } finally { - } - } - - void removeIndex(IProgressMonitor monitor) throws DatabaseException, CorruptIndexException, IOException { - assertAccessOpen(true); - - long start = 0, end = 0; - if (IndexPolicy.PERF_INDEX_UPDATE) - start = System.nanoTime(); - - try { - - writer.deleteAll(); - - if (IndexPolicy.PERF_INDEX_UPDATE) { - end = System.nanoTime(); - double ms = (end - start) * 1e-6; - System.out.println(getDescriptor() + "Removed all documents from index at " + indexPath + " in " + ms + " ms"); - } - - } finally { - } - } - - boolean replaceIndex(IProgressMonitor monitor, String key, Collection keyValues, GenericRelation r, int boundLength, Collection documentsData) throws CorruptIndexException, IOException, DatabaseException { - - boolean didReplace = false; - - assertAccessOpen(true); - if (keyValues.size() != documentsData.size()) - throw new IllegalArgumentException("keyValues size does not match documents data size, " + keyValues.size() + " <> " + documentsData.size()); - - if (IndexPolicy.TRACE_INDEX_UPDATE) - System.out.println(getDescriptor() + "Replacing " + keyValues.size() + " documents from index at " + indexPath); - - long start = 0, end = 0; - if (IndexPolicy.PERF_INDEX_UPDATE) - start = System.nanoTime(); - - try { - Iterator keyIt = keyValues.iterator(); - Iterator documentDataIt = documentsData.iterator(); - - Document document = new Document(); - Field[] fs = makeFieldsForRelation(r, boundLength, document); - - nextDocument: - while (keyIt.hasNext()) { - Object keyValue = keyIt.next(); - Object[] documentData = documentDataIt.next(); - - Term removedTerm = null; - if (keyValue instanceof Long) { - removedTerm = IndexUtils.longTerm(key, (Long) keyValue); - } else if (keyValue instanceof String) { - removedTerm = new Term(key, (String) keyValue); - } else { - // FIXME: should throw an exception for illegal input data but this would leave the index in an incoherent state - System.err.println("[" + getClass().getSimpleName() + "] Unrecognized document key to remove '" + keyValue + "', only " + String.class + " and " + Resource.class + " are supported."); - continue nextDocument; - } - - if (setFields(fs, documentData) == null) - continue nextDocument; - - if (IndexPolicy.TRACE_INDEX_UPDATE) - System.out.println(getDescriptor() + "Replacing document with key " + removedTerm + " with " + document); - - boolean done = false; - if(requireChangeInfoOnReplace()) { - TopDocs exist = searcher.search(new TermQuery(removedTerm), null, 2); - if(exist.scoreDocs.length == 1 && requireChangeInfoOnReplace()) { - Document doc = reader.document(exist.scoreDocs[0].doc); - if(!areSame(doc, document)) { - writer.deleteDocuments(removedTerm); - writer.addDocument(document); - didReplace |= true; - if (IndexPolicy.TRACE_INDEX_UPDATE) - System.out.println("-replaced single existing"); - } else { - if (IndexPolicy.TRACE_INDEX_UPDATE) - System.out.println("-was actually same than single existing"); - } - done = true; - } - } - if(!done) { - writer.deleteDocuments(removedTerm); - writer.addDocument(document); - didReplace |= true; - if (IndexPolicy.TRACE_INDEX_UPDATE) - System.out.println("-had many or none - removed all existing"); - } - - } - - if (IndexPolicy.PERF_INDEX_UPDATE) { - end = System.nanoTime(); - double ms = (end - start) * 1e-6; - System.out.println(getDescriptor() + "Replaced " + keyValues.size() + " documents from index at " + indexPath + " in " + ms + " ms"); - } - - } finally { - } - - return didReplace; - - } - - protected boolean requireChangeInfoOnReplace() { - return true; - } - - private boolean areSame(Document d1, Document d2) { - List fs1 = d1.getFields(); - List fs2 = d2.getFields(); - if(fs1.size() != fs2.size()) return false; - for(int i=0;itrue to open index for writing, - * false for reading - * @return true is required state was reached - * - */ - boolean startAccess(IProgressMonitor monitor, Session session, boolean forWriting) { - if(forWriting) { - changeState(monitor, session, State.WRITE); - return checkState(State.WRITE); - } else { - changeState(monitor, session, State.READ); - return checkState(State.READ); - } - } - - boolean hasAccess(boolean forWriting) { - - if (forWriting) - return checkState(State.WRITE); - else - return checkState(State.WRITE) || checkState(State.READ); - - } - - void assertAccessOpen(boolean forWriting) { - if (forWriting) - if(!checkState(State.WRITE)) - throw new IllegalStateException("index not opened for writing (directory=" + directory + ", reader=" + reader + ")"); - else - if(!(checkState(State.WRITE) || checkState(State.READ))) - throw new IllegalStateException("index not opened for reading (directory=" + directory + ", writer=" + writer + ")"); - } - - void closeWriter(IndexWriter writer) throws CorruptIndexException, IOException { - if (writer == null) - return; - - try { - // May throw OOME, see IndexWriter javadoc for the correct actions. - writer.close(false); - } catch (OutOfMemoryError e) { - writer.close(); - throw e; - } - } - - private static String getPattern(GenericRelation relation, int boundCount) { - String result = ""; - for (int i = 0; i < boundCount; i++) - result += "b"; - for (int i = 0; i < relation.getFields().length - boundCount; i++) - result += "f"; - return result; - } - - private static final int INDEXING_THREAD_COUNT = 2; - - private static final ExecutorService executor = Executors.newFixedThreadPool(INDEXING_THREAD_COUNT, new ThreadFactory() { - @Override - public Thread newThread(Runnable r) { - Thread t = new Thread(r, "Lucene Index Creator"); - if (t.isDaemon()) - t.setDaemon(true); - if (t.getPriority() != Thread.NORM_PRIORITY) - t.setPriority(Thread.NORM_PRIORITY); - return t; - } - }); - - void initializeIndex(IProgressMonitor monitor, ReadGraph graph, Object[] bound, boolean overwrite) - throws IOException, DatabaseException - { - IndexingJob.jobifyIfPossible( - monitor, - "Reindexing " + NameUtils.getSafeLabel(graph, input), - mon -> { - try { - initializeIndexImpl(mon, graph, bound, overwrite); - } catch (IOException e) { - throw new DatabaseException(e); - } - }); - } - - void initializeIndexImpl(IProgressMonitor monitor, ReadGraph graph, final Object[] bound, boolean overwrite) throws IOException, - DatabaseException { - - final SubMonitor mon = SubMonitor.convert(monitor, 100); - - if (IndexPolicy.TRACE_INDEX_INIT) - System.out.println(getDescriptor() + "Initializing index at " + indexPath + " (overwrite = " + overwrite + ")"); - mon.beginTask("Initializing Index", 100); - - if (overwrite) { - mon.subTask("Erasing previous index"); - FileUtils.deleteAll(indexPath); - } - - final AtomicReference directory = new AtomicReference(); - final AtomicReference writer = new AtomicReference(); - - try { - mon.subTask("Start index write"); - createDirectory(indexPath); - - directory.set(FSDirectory.open(indexPath)); - IndexWriterConfig conf = new IndexWriterConfig(Version.LUCENE_4_9, Queries.getAnalyzer()).setOpenMode(OpenMode.CREATE); - writer.set(new IndexWriter(directory.get(), conf)); - - mon.worked(5); - - final GenericRelation r = graph.adapt(relation, GenericRelation.class); - if (r == null) - throw new DatabaseException("Given resource " + graph.syncRequest(new SafeName(relation)) - + "could not be adapted to GenericRelation."); - - long realizeStart = 0; - if (IndexPolicy.PERF_INDEX_INIT) - realizeStart = System.nanoTime(); - - mon.subTask("Calculating indexed content"); - GenericRelation selection = r.select(getPattern(r, bound.length), bound); - mon.worked(5); - List results = selection.realize(graph); - mon.worked(40); - - if (IndexPolicy.PERF_INDEX_INIT) - System.out.println(getDescriptor() + "Realized index with " + results.size() + " entries at " + indexPath + " in " + (1e-9 * (System.nanoTime()-realizeStart)) + " seconds."); - if (IndexPolicy.TRACE_INDEX_INIT) - System.out.println(getDescriptor() + "Indexed relation " + r + " produced " + results.size() + " results"); - - long start = IndexPolicy.PERF_INDEX_INIT ? System.nanoTime() : 0; - - mon.subTask("Indexing content"); - final Semaphore s = new Semaphore(0); - mon.setWorkRemaining(results.size()); - - for (int i = 0; i < INDEXING_THREAD_COUNT; i++) { - final int startIndex = i; - executor.submit(() -> { - try { - Document document = new Document(); - Field[] fs = makeFieldsForRelation(r, bound.length, document); - - for (int index = startIndex; index < results.size(); index += INDEXING_THREAD_COUNT) { - if (setFields(fs, results.get(index)) == null) - continue; - try { - writer.get().addDocument(document); - } catch (CorruptIndexException e) { - throw new IllegalStateException(e); - } catch (IOException e) { - throw new IllegalStateException(e); - } finally { - synchronized (mon) { - mon.worked(1); - } - } - } - - s.release(); - } catch (DatabaseException e) { - throw new IllegalStateException(e); - } - }); - } - - try { - s.acquire(INDEXING_THREAD_COUNT); - } catch (InterruptedException e) { - Logger.defaultLogError(e); - } - - // http://www.gossamer-threads.com/lists/lucene/java-dev/47895 - // and http://lucene.apache.org/java/docs/index.html#27+November+2011+-+Lucene+Core+3.5.0 - // advise against calling optimize at all. So let's not do it anymore. - //writer.get().optimize(); - //writer.get().commit(); - - mon.subTask("Flushing"); - - if (IndexPolicy.PERF_INDEX_INIT) - System.out.println(getDescriptor() + "Wrote index at " + indexPath + " in " + (1e-9 * (System.nanoTime()-start)) + " seconds."); - - } catch (DatabaseException e) { - - Logger.defaultLogError(e); - - } finally { - try { - closeWriter(writer.getAndSet(null)); - } finally { - directory.getAndSet(null).close(); - } - } - } - - - public List debugDocs(IProgressMonitor monitor) throws ParseException, IOException, DatabaseException { - - Query query = new MatchAllDocsQuery(); - - TopDocs td = searcher.search(query, Integer.MAX_VALUE); - - ScoreDoc[ ] scoreDocs = td.scoreDocs; - List result = new ArrayList(scoreDocs.length); - - for(ScoreDoc scoreDoc:scoreDocs) { - - try { - - Document doc = reader.document(scoreDoc.doc); - List fs = doc.getFields(); - Object[] o = new Object[fs.size()]; - int index = 0; - for (IndexableField f : fs) { - o[index++] = f.stringValue(); - } - result.add(o); - - } catch (CorruptIndexException e) { - throw new DatabaseException(e); - } catch (IOException e) { - throw new DatabaseException(e); - } - - } - - return result; - - } - - - List> doSearch(IProgressMonitor monitor, RequestProcessor processor, String search, int maxResultCount) throws ParseException, IOException, - DatabaseException { - - // An empty search string will crash QueryParser - // Just return no results for empty queries. - //System.out.println("search: '" + search + "'"); - if (search.isEmpty()) - return Collections.emptyList(); - - assertAccessOpen(false); - - Query query = Queries.parse(search, schema); - - long start = System.nanoTime(); - - maxResultCount = Math.min(maxResultCount, searcher.getIndexReader().numDocs()); - if (maxResultCount == 0) - return Collections.emptyList(); - - final TopDocs docs = searcher.search(query, null, maxResultCount); - -// for(Object[] o : debugDocs(monitor)) { -// System.err.println("-" + Arrays.toString(o)); -// } - - if (IndexPolicy.PERF_INDEX_QUERY) { - long end = System.nanoTime(); - System.out.println(getDescriptor() + "search(" + search + ", " + maxResultCount + ") into index at " + indexPath + " took " + (1e-9 * (end-start)) + " seconds."); - } - - if (docs.totalHits == 0) { - return Collections.emptyList(); - } - - return processor.syncRequest(new Read>>() { - - @Override - public List> perform(ReadGraph graph) throws DatabaseException { - - GenericRelation r = graph.adapt(relation, GenericRelation.class); - if (r == null) - throw new DatabaseException("Given resource " + graph.syncRequest(new SafeName(relation)) - + "could not be adapted to GenericRelation."); - - SerialisationSupport support = graph.getService(SerialisationSupport.class); - - List> result = new ArrayList>(docs.scoreDocs.length); - - final DocumentStoredFieldVisitor visitor = new DocumentStoredFieldVisitor(); - - for (ScoreDoc scoreDoc : docs.scoreDocs) { - - try { - - reader.document(scoreDoc.doc, visitor); - - Document doc = visitor.getDocument(); - - List fs = doc.getFields(); - Map entry = new THashMap(fs.size()); - for (IndexableField f : fs) { - IndexSchema.Type type = schema.typeMap.get(f.name()); - if (type == IndexSchema.Type.LONG) { - entry.put(f.name(), support.getResource(f.numericValue().longValue())); - } else { - entry.put(f.name(), f.stringValue()); - } - } - - result.add(entry); - - } catch (CorruptIndexException e) { - throw new DatabaseException(e); - } catch (IOException e) { - throw new DatabaseException(e); - } - - } - - return result; - - } - }); - } - - static class ResourceVisitor extends StoredFieldVisitor { - - public long id; - - @Override - public Status needsField(FieldInfo fieldInfo) throws IOException { - if("Resource".equals(fieldInfo.name)) return Status.YES; - return Status.NO; - } - - @Override - public void longField(FieldInfo fieldInfo, long value) throws IOException { - id = value; - } - - }; - - static class DumpVisitor extends StoredFieldVisitor { - - public List values; - - DumpVisitor(List values) { - this.values = values; - } - - @Override - public Status needsField(FieldInfo fieldInfo) throws IOException { - return Status.YES; - } - - @Override - public void longField(FieldInfo fieldInfo, long value) throws IOException { - values.add(value); - } - - @Override - public void stringField(FieldInfo fieldInfo, String value) throws IOException { - values.add(value); - } - - } - - List doSearchResources(IProgressMonitor monitor, RequestProcessor processor, String search, int maxResultCount) throws ParseException, IOException, - DatabaseException { - - // An empty search string will crash QueryParser - // Just return no results for empty queries. - //System.out.println("search: '" + search + "'"); - if (search.isEmpty()) - return Collections.emptyList(); - - assertAccessOpen(false); - - Query query = Queries.parse(search, schema); - - long start = System.nanoTime(); - - maxResultCount = Math.min(maxResultCount, searcher.getIndexReader().numDocs()); - if (maxResultCount == 0) - return Collections.emptyList(); - - final TopDocs docs = searcher.search(query, null, maxResultCount); - -// for(Object[] o : debugDocs(monitor)) { -// System.err.println("-" + Arrays.toString(o)); -// } - - if (IndexPolicy.PERF_INDEX_QUERY) { - long end = System.nanoTime(); - System.out.println(getDescriptor() + "search(" + search + ", " + maxResultCount + ") into index at " + indexPath + " took " + (1e-9 * (end-start)) + " seconds."); - } - - if (docs.totalHits == 0) { - return Collections.emptyList(); - } - - return processor.syncRequest(new Read>() { - - @Override - public List perform(ReadGraph graph) throws DatabaseException { - - CollectionSupport cs = graph.getService(CollectionSupport.class); - SerialisationSupport support = graph.getService(SerialisationSupport.class); - - List result = cs.createList(); - - ResourceVisitor visitor = new ResourceVisitor(); - - for (ScoreDoc scoreDoc : docs.scoreDocs) { - - try { - - reader.document(scoreDoc.doc, visitor); - result.add(support.getResource(visitor.id)); - - } catch (CorruptIndexException e) { - throw new DatabaseException(e); - } catch (IOException e) { - throw new DatabaseException(e); - } - - } - - return result; - - } - }); - } - - List doList(IProgressMonitor monitor, RequestProcessor processor) throws ParseException, IOException, - DatabaseException { - - assertAccessOpen(false); - - Query query = new MatchAllDocsQuery(); - - final TopDocs docs = searcher.search(query, Integer.MAX_VALUE); - - ArrayList result = new ArrayList(); - - DumpVisitor visitor = new DumpVisitor(result); - - for (ScoreDoc scoreDoc : docs.scoreDocs) { - - try { - - reader.document(scoreDoc.doc, visitor); - - } catch (CorruptIndexException e) { - throw new DatabaseException(e); - } catch (IOException e) { - throw new DatabaseException(e); - } - - } - - return result; - - } - - protected static File getIndexDirectory(Session session, Resource relation, Resource input) { - File path = DatabaseIndexing.getIndexLocation(session, relation, input); -// System.out.println("getIndexDirectory = " + path); - return path; - } - - private static void createDirectory(File path) throws IOException { - if (path.exists() && !path.isDirectory()) - throw new IOException("Could not create index directory " + path + ", a file by that name already exists"); - path.mkdirs(); - if (!path.exists()) - throw new IOException("Could not create index directory " + path + " for an unknown reason"); - if (!path.isDirectory()) - throw new IOException("Could not create index directory " + path + ", a file by that name already exists"); - } - - File getIndexPath() { - return indexPath; - } - - boolean isIndexAvailable() { - return (indexPath.exists() && indexPath.isDirectory()); - } - - Throwable bestEffortClear(IProgressMonitor monitor, Session session) { - return null; - } - - /* - * Start from scratch. Clear all caches and rebuild the index. - */ - Throwable clearDirectory(IProgressMonitor monitor, Session session) { - - File file = getIndexPath(); - - try { - - for(int i=0;i<15;i++) { - FileUtils.deleteDir(file); - if(!file.exists()) { - return null; - } - try { - Thread.sleep(i*100); - } catch (InterruptedException e) { - } - } - - } catch (Throwable t) { - - return t; - - } - - return new IllegalStateException("Failed to delete directory " + file.getAbsolutePath()); - - } - - private Field[] setFields(Field[] fs, Object[] result) { - for (int i = 0; i < result.length; i++) { - Object value = result[i]; - if (value instanceof String) { - if (IndexPolicy.DEBUG_INDEX_INIT) - System.out.println(getDescriptor() + "index " + fs[i].name() + " = " + value + " : String"); - fs[i].setStringValue((String) value); - } else if (value instanceof Long) { - if (IndexPolicy.DEBUG_INDEX_INIT) - System.out.println(getDescriptor() + "index " + fs[i].name() + " = " + value + " : Long"); - fs[i].setLongValue((Long) value); - } else { - Logger.defaultLogError("Can only index Long and String fields, encountered " + value); - return null; - } - } - return fs; - } - -} +/******************************************************************************* + * Copyright (c) 2007, 2015 Association for Decentralized Information Management + * in Industry THTH ry. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * VTT Technical Research Centre of Finland - initial API and implementation + * Semantum Oy - Fix for simantics issue #6053 + *******************************************************************************/ +package org.simantics.db.indexing; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Semaphore; +import java.util.concurrent.atomic.AtomicReference; + +import org.apache.lucene.document.Document; +import org.apache.lucene.document.DocumentStoredFieldVisitor; +import org.apache.lucene.document.Field; +import org.apache.lucene.document.FieldType; +import org.apache.lucene.document.LongField; +import org.apache.lucene.document.TextField; +import org.apache.lucene.index.CorruptIndexException; +import org.apache.lucene.index.DirectoryReader; +import org.apache.lucene.index.FieldInfo; +import org.apache.lucene.index.IndexNotFoundException; +import org.apache.lucene.index.IndexReader; +import org.apache.lucene.index.IndexWriter; +import org.apache.lucene.index.IndexWriterConfig; +import org.apache.lucene.index.IndexWriterConfig.OpenMode; +import org.apache.lucene.index.IndexableField; +import org.apache.lucene.index.StoredFieldVisitor; +import org.apache.lucene.index.Term; +import org.apache.lucene.queryparser.classic.ParseException; +import org.apache.lucene.search.IndexSearcher; +import org.apache.lucene.search.MatchAllDocsQuery; +import org.apache.lucene.search.Query; +import org.apache.lucene.search.ScoreDoc; +import org.apache.lucene.search.TermQuery; +import org.apache.lucene.search.TopDocs; +import org.apache.lucene.store.Directory; +import org.apache.lucene.store.FSDirectory; +import org.apache.lucene.util.Version; +import org.eclipse.core.runtime.IProgressMonitor; +import org.eclipse.core.runtime.SubMonitor; +import org.simantics.databoard.util.ObjectUtils; +import org.simantics.db.ReadGraph; +import org.simantics.db.RequestProcessor; +import org.simantics.db.Resource; +import org.simantics.db.Session; +import org.simantics.db.common.request.SafeName; +import org.simantics.db.common.utils.NameUtils; +import org.simantics.db.exception.DatabaseException; +import org.simantics.db.indexing.internal.IndexingJob; +import org.simantics.db.layer0.adapter.GenericRelation; +import org.simantics.db.request.Read; +import org.simantics.db.service.CollectionSupport; +import org.simantics.db.service.SerialisationSupport; +import org.simantics.utils.FileUtils; +import org.simantics.utils.datastructures.Pair; +import org.simantics.utils.threads.ThreadUtils; +import org.slf4j.Logger; + +import gnu.trove.map.hash.THashMap; + +/** + * @author Tuukka Lehtonen + * @author Antti Villberg + */ +abstract public class IndexedRelationsSearcherBase { + + protected enum State { + // No index is available + NONE, + // An index is available, but there is a problem with it + PROBLEM, + // An index is available but no reader or writer is ready + READY, + // A reader is ready + READ, + // A writer (and a reader) is ready + WRITE + } + + private State state = State.READY; + private Throwable exception; + + public Throwable getException() { + return exception; + } + + public void setProblem(Throwable t) { + if (t != null) + getLogger().error("Setting problem for {} and previous state {}", this, this.state, t); + this.state = State.PROBLEM; + this.exception = t; + } + + public void setNone() { + this.state = State.NONE; + } + + public void setReady() { + this.state = State.READY; + } + + protected boolean checkState(State state) { + return this.state == state; + } + + protected void assertState(State state) throws AssertionError { + + if(this.state != state) throw new AssertionError("Illegal state, expected " + state.name() + " but was in " + this.state.name()); + + } + + public void changeState(IProgressMonitor monitor, Session session, State state) { + changeState(monitor, session, state, 0); + } + + protected void changeState(IProgressMonitor monitor, Session session, State state, int depth) { + + if (this.state == state) { + if (getLogger().isDebugEnabled()) + getLogger().debug("Trying to change state {} to the same as previous state {} in depth {} with {}", state, this.state, depth, this); + return; + } + + if (IndexPolicy.TRACE_INDEX_MANAGEMENT) + System.err.println("Index state " + this.state.name() + " => " + state.name() + " " + this); + + // Check transitions + + // Try to exit problem state + if (State.PROBLEM == this.state && depth > 0) { + getLogger().info("Try to exit problem state for {} and state {}", this, state); + Throwable t = bestEffortClear(monitor, session); + if(t != null) { + getLogger().error("Best effort clear has failed for state {} and this {}", state, this, t); + exception = t; + return; + } + // Managed to get into initial state + this.state = State.NONE; + getLogger().info("Managed to get into initial state {}", this.state); + return; + } + + // Cannot move into read from no index + if (State.NONE == this.state && State.READ == state) { + if (getLogger().isDebugEnabled()) + getLogger().debug("Cannot move into read from no index in {} with state {}", this, state); + return; + } + // Cannot move into write from no index + if (State.NONE == this.state && State.WRITE == state) { + if (getLogger().isDebugEnabled()) + getLogger().debug("Cannot move into write from no index in {} with state {}", this, state); + return; + } + + boolean success = false; + + try { + + if (searcher != null) { + searcher = null; + } + if (reader != null) { + reader.close(); + reader = null; + } + closeWriter(writer); + directory = null; + + success = true; + + // Enter new state + if (State.READ == state || State.WRITE == state) { + + success = false; + + boolean forWriting = State.WRITE == state; + + if (directory != null) + throw new IllegalStateException(getDescriptor() + "Index already loaded"); + + SubMonitor mon = SubMonitor.convert(monitor, 100); + + mon.beginTask("Loading index", 100); + + if (IndexPolicy.TRACE_INDEX_LOAD) + System.out.println(getDescriptor() + "Loading Lucene index from " + indexPath + " for " + (forWriting ? "writing" : "reading")); + + long start = System.nanoTime(); + + directory = getDirectory(session); + + if (forWriting) { + // Never overwrite an index that is about to be loaded. + // TODO: could use OpenMode.CREATE_OR_APPEND but must test first + IndexWriterConfig config = new IndexWriterConfig(Version.LUCENE_4_9, Queries.getAnalyzer()).setOpenMode(OpenMode.APPEND); + try { + // FIXME: platform #4676 + writer = new IndexWriter(directory, config); + } catch (IndexNotFoundException e) { + // There was no pre-existing index on disk. Create it now. + writer = new IndexWriter(directory, config.setOpenMode(OpenMode.CREATE)); + writer.commit(); + } + reader = DirectoryReader.open(directory); + searcher = new IndexSearcher(reader); + } else { + reader = DirectoryReader.open(directory); + searcher = new IndexSearcher(reader); + } + + long end = System.nanoTime(); + + mon.worked(100); + + if (IndexPolicy.PERF_INDEX_LOAD) { + double time = (end - start) * 1e-6; + System.out.println(getDescriptor() + "Loaded Lucene index from " + indexPath + " for " + (forWriting ? "writing" : "reading") + " in " + time + " ms"); + } + + success = true; + + } + + } catch (Throwable t) { + setProblem(t); + } finally { + + if(!success) { + this.state = State.PROBLEM; + changeState(monitor, session, State.NONE, depth+1); + return; + } + + } + + this.state = state; + + } + + public static final FieldType STRING_TYPE = new FieldType(); + + static { + STRING_TYPE.setIndexed(true); + STRING_TYPE.setStored(true); + STRING_TYPE.setTokenized(true); + STRING_TYPE.freeze(); + } + + protected static Field makeField(String fieldName, String fieldClass) throws DatabaseException { + switch (fieldClass) { + case "Long": return new LongField(fieldName, 0L, Field.Store.YES); + case "String": return new Field (fieldName, "", STRING_TYPE); + case "Text": return new TextField(fieldName, "", Field.Store.YES); + default: + throw new DatabaseException("Can only index Long, String and Text fields, encountered field type " + fieldClass); + } + } + + protected static Field[] makeFieldsForRelation(GenericRelation r, int boundLength, Document document) throws DatabaseException { + Pair[] fields = r.getFields(); + Field[] fs = new Field[Math.max(0, fields.length - boundLength)]; + for (int i = boundLength; i < fields.length; i++) { + Field f = makeField(fields[i].first, fields[i].second); + fs[i - boundLength] = f; + if (document != null) + document.add(f); + } + return fs; + } + + void insertIndex(IProgressMonitor monitor, GenericRelation r, int boundLength, Collection documentsData) + throws CorruptIndexException, IOException, DatabaseException { + assertAccessOpen(true); + + if (IndexPolicy.TRACE_INDEX_UPDATE) + System.out.println(getDescriptor() + "Inserting " + documentsData.size() + " documents into index at " + indexPath); + + long start = 0, end = 0; + if (IndexPolicy.PERF_INDEX_UPDATE) + start = System.nanoTime(); + + try { + Document document = new Document(); + Field[] fs = makeFieldsForRelation(r, boundLength, document); + + for (Object[] documentData : documentsData) { + if (setFields(fs, documentData) == null) + continue; + + if (IndexPolicy.TRACE_INDEX_UPDATE) + System.out.println(getDescriptor() + "Inserting document " + document); + + writer.addDocument(document); + } + + if (IndexPolicy.PERF_INDEX_UPDATE) { + end = System.nanoTime(); + double ms = (end - start) * 1e-6; + System.out.println(getDescriptor() + "Inserted " + documentsData.size() + " documents into index at " + indexPath + " in " + ms + " ms"); + } + + } finally { + } + } + + void removeIndex(IProgressMonitor monitor, GenericRelation r, RequestProcessor processor, String key, Collection keyValues) throws DatabaseException, CorruptIndexException, IOException { + assertAccessOpen(true); + + if (IndexPolicy.TRACE_INDEX_UPDATE) + System.out.println(getDescriptor() + "Removing " + keyValues.size() + " documents from index at " + indexPath); + + long start = 0, end = 0; + if (IndexPolicy.PERF_INDEX_UPDATE) + start = System.nanoTime(); + + try { + for (Object keyValue : keyValues) { + Term removedTerm = null; + if (keyValue instanceof Long) { + removedTerm = IndexUtils.longTerm(key, (Long) keyValue); + } else if (keyValue instanceof String) { + removedTerm = new Term(key, (String) keyValue); + } else { + // FIXME: should throw an exception for illegal input data but this would leave the index in an incoherent state + continue; + } + + if (IndexPolicy.TRACE_INDEX_UPDATE) + System.out.println(getDescriptor() + "Removing document with key " + removedTerm); + writer.deleteDocuments(removedTerm); + } + + if (IndexPolicy.PERF_INDEX_UPDATE) { + end = System.nanoTime(); + double ms = (end - start) * 1e-6; + System.out.println(getDescriptor() + "Removed " + keyValues.size() + " documents from index at " + indexPath + " in " + ms + " ms"); + } + + } finally { + } + } + + void removeIndex(IProgressMonitor monitor) throws DatabaseException, CorruptIndexException, IOException { + assertAccessOpen(true); + + long start = 0, end = 0; + if (IndexPolicy.PERF_INDEX_UPDATE) + start = System.nanoTime(); + + try { + + writer.deleteAll(); + + if (IndexPolicy.PERF_INDEX_UPDATE) { + end = System.nanoTime(); + double ms = (end - start) * 1e-6; + System.out.println(getDescriptor() + "Removed all documents from index at " + indexPath + " in " + ms + " ms"); + } + + } finally { + } + } + + boolean replaceIndex(IProgressMonitor monitor, String key, Collection keyValues, GenericRelation r, int boundLength, Collection documentsData) throws CorruptIndexException, IOException, DatabaseException { + + boolean didReplace = false; + + assertAccessOpen(true); + if (keyValues.size() != documentsData.size()) + throw new IllegalArgumentException("keyValues size does not match documents data size, " + keyValues.size() + " <> " + documentsData.size()); + + if (IndexPolicy.TRACE_INDEX_UPDATE) + System.out.println(getDescriptor() + "Replacing " + keyValues.size() + " documents from index at " + indexPath); + + long start = 0, end = 0; + if (IndexPolicy.PERF_INDEX_UPDATE) + start = System.nanoTime(); + + try { + Iterator keyIt = keyValues.iterator(); + Iterator documentDataIt = documentsData.iterator(); + + Document document = new Document(); + Field[] fs = makeFieldsForRelation(r, boundLength, document); + + nextDocument: + while (keyIt.hasNext()) { + Object keyValue = keyIt.next(); + Object[] documentData = documentDataIt.next(); + + Term removedTerm = null; + if (keyValue instanceof Long) { + removedTerm = IndexUtils.longTerm(key, (Long) keyValue); + } else if (keyValue instanceof String) { + removedTerm = new Term(key, (String) keyValue); + } else { + // FIXME: should throw an exception for illegal input data but this would leave the index in an incoherent state + System.err.println("[" + getClass().getSimpleName() + "] Unrecognized document key to remove '" + keyValue + "', only " + String.class + " and " + Resource.class + " are supported."); + continue nextDocument; + } + + if (setFields(fs, documentData) == null) + continue nextDocument; + + if (IndexPolicy.TRACE_INDEX_UPDATE) + System.out.println(getDescriptor() + "Replacing document with key " + removedTerm + " with " + document); + + boolean done = false; + if(requireChangeInfoOnReplace()) { + TopDocs exist = searcher.search(new TermQuery(removedTerm), null, 2); + if(exist.scoreDocs.length == 1 && requireChangeInfoOnReplace()) { + Document doc = reader.document(exist.scoreDocs[0].doc); + if(!areSame(doc, document)) { + writer.deleteDocuments(removedTerm); + writer.addDocument(document); + didReplace |= true; + if (IndexPolicy.TRACE_INDEX_UPDATE) + System.out.println("-replaced single existing"); + } else { + if (IndexPolicy.TRACE_INDEX_UPDATE) + System.out.println("-was actually same than single existing"); + } + done = true; + } + } + if(!done) { + writer.deleteDocuments(removedTerm); + writer.addDocument(document); + didReplace |= true; + if (IndexPolicy.TRACE_INDEX_UPDATE) + System.out.println("-had many or none - removed all existing"); + } + + } + + if (IndexPolicy.PERF_INDEX_UPDATE) { + end = System.nanoTime(); + double ms = (end - start) * 1e-6; + System.out.println(getDescriptor() + "Replaced " + keyValues.size() + " documents from index at " + indexPath + " in " + ms + " ms"); + } + + } finally { + } + + return didReplace; + + } + + protected boolean requireChangeInfoOnReplace() { + return true; + } + + private boolean areSame(Document d1, Document d2) { + List fs1 = d1.getFields(); + List fs2 = d2.getFields(); + if(fs1.size() != fs2.size()) return false; + for(int i=0;itrue to open index for writing, + * false for reading + * @return true is required state was reached + * + */ + boolean startAccess(IProgressMonitor monitor, Session session, boolean forWriting) { + if(forWriting) { + changeState(monitor, session, State.WRITE); + return checkState(State.WRITE); + } else { + changeState(monitor, session, State.READ); + return checkState(State.READ); + } + } + + boolean hasAccess(boolean forWriting) { + + if (forWriting) + return checkState(State.WRITE); + else + return checkState(State.WRITE) || checkState(State.READ); + + } + + void assertAccessOpen(boolean forWriting) { + if (forWriting) + if(!checkState(State.WRITE)) + throw new IllegalStateException("index not opened for writing (directory=" + directory + ", reader=" + reader + ")"); + else + if(!(checkState(State.WRITE) || checkState(State.READ))) + throw new IllegalStateException("index not opened for reading (directory=" + directory + ", writer=" + writer + ")"); + } + + void closeWriter(IndexWriter writer) throws CorruptIndexException, IOException { + if (writer == null) + return; + + try { + // May throw OOME, see IndexWriter javadoc for the correct actions. + writer.close(false); + } catch (OutOfMemoryError e) { + writer.close(); + throw e; + } + } + + public static String getPattern(GenericRelation relation, int boundCount) { + String result = ""; + for (int i = 0; i < boundCount; i++) + result += "b"; + for (int i = 0; i < relation.getFields().length - boundCount; i++) + result += "f"; + return result; + } + + void initializeIndex(IProgressMonitor monitor, ReadGraph graph, Object[] bound, boolean overwrite) + throws IOException, DatabaseException + { + IndexingJob.jobifyIfPossible( + monitor, + "Reindexing " + NameUtils.getSafeLabel(graph, input), + mon -> { + try { + GenericRelation r = graph.adapt(relation, GenericRelation.class); + if (r == null) + throw new DatabaseException("Given resource " + relation + "could not be adapted to GenericRelation."); + + GenericRelation selection = r.select(getPattern(r, bound.length), bound); + + List results = selection.realize(graph); + initializeIndexImpl(new CompletableFuture<>(), mon, r, results, bound, overwrite); + } catch (IOException e) { + getLogger().error("Index is in problematic state! {}", this, e); + throw new DatabaseException(e); + } + }); + } + + private static final int INDEXING_THREAD_COUNT = 2; // this is quite good parallelism level for lucene + + void initializeIndexImpl(CompletableFuture result, IProgressMonitor monitor, GenericRelation r, List results, final Object[] bound, boolean overwrite) throws IOException { + try { + final SubMonitor mon = SubMonitor.convert(monitor, 100); + + if (IndexPolicy.TRACE_INDEX_INIT) + System.out.println(getDescriptor() + "Initializing index at " + indexPath + " (overwrite = " + overwrite + ")"); + mon.beginTask("Initializing Index", 100); + + if (overwrite) { + if (Files.exists(indexPath)) { + mon.subTask("Erasing previous index"); + if (getLogger().isDebugEnabled()) + getLogger().debug("Erasing previous index {}", indexPath.toAbsolutePath()); + FileUtils.emptyDirectory(indexPath); + } + } + + final AtomicReference directory = new AtomicReference(); + final AtomicReference writer = new AtomicReference(); + + try { + mon.subTask("Start index write"); + Files.createDirectories(indexPath); + + directory.set(FSDirectory.open(indexPath.toFile())); + IndexWriterConfig conf = new IndexWriterConfig(Version.LUCENE_4_9, Queries.getAnalyzer()).setOpenMode(OpenMode.CREATE); + writer.set(new IndexWriter(directory.get(), conf)); + + mon.worked(5); + + long realizeStart = 0; + if (IndexPolicy.PERF_INDEX_INIT) + realizeStart = System.nanoTime(); + + mon.subTask("Calculating indexed content"); + mon.worked(5); + + mon.worked(40); + + if (IndexPolicy.PERF_INDEX_INIT) + System.out.println(getDescriptor() + "Realized index with " + results.size() + " entries at " + indexPath + " in " + (1e-9 * (System.nanoTime()-realizeStart)) + " seconds."); + + if (IndexPolicy.TRACE_INDEX_INIT) + System.out.println(getDescriptor() + "Indexed relation " + r + " produced " + results.size() + " results"); + + long start = IndexPolicy.PERF_INDEX_INIT ? System.nanoTime() : 0; + + mon.subTask("Indexing content"); + final Semaphore s = new Semaphore(0); + mon.setWorkRemaining(results.size()); + for (int i = 0; i < INDEXING_THREAD_COUNT; i++) { + final int startIndex = i; + ThreadUtils.getBlockingWorkExecutor().submit(() -> { + try { + Document document = new Document(); + Field[] fs = makeFieldsForRelation(r, bound.length, document); + + for (int index = startIndex; index < results.size(); index += INDEXING_THREAD_COUNT) { + if (setFields(fs, results.get(index)) == null) + continue; + try { + writer.get().addDocument(document); + } catch (CorruptIndexException e) { + getLogger().error("Index is corrupted! {}", this, e); + throw new IllegalStateException(e); + } catch (IOException e) { + getLogger().error("Index is in problematic state! {}", this, e); + throw new IllegalStateException(e); + } finally { + synchronized (mon) { + mon.worked(1); + } + } + } + } catch (DatabaseException e) { + throw new IllegalStateException(e); + } finally { + s.release(); + } + }); + } + + try { + s.acquire(INDEXING_THREAD_COUNT); + } catch (InterruptedException e) { + getLogger().error("Could not initialize index {}", this, e); + } + + // http://www.gossamer-threads.com/lists/lucene/java-dev/47895 + // and http://lucene.apache.org/java/docs/index.html#27+November+2011+-+Lucene+Core+3.5.0 + // advise against calling optimize at all. So let's not do it anymore. + //writer.get().optimize(); + //writer.get().commit(); + + mon.subTask("Flushing"); + + if (IndexPolicy.PERF_INDEX_INIT) + System.out.println(getDescriptor() + "Wrote index at " + indexPath + " in " + (1e-9 * (System.nanoTime()-start)) + " seconds."); + + result.complete(null); + // } catch (DatabaseException e) { + // getLogger().error("Could not initialize index due to db {}", this, e); + } finally { + try { + closeWriter(writer.getAndSet(null)); + } finally { + FileUtils.uncheckedClose(directory.getAndSet(null)); + } + } + } catch (Throwable t) { + getLogger().error("Could not initialize index", t); + result.completeExceptionally(t); + } + } + + + public List debugDocs(IProgressMonitor monitor) throws ParseException, IOException, DatabaseException { + + Query query = new MatchAllDocsQuery(); + + TopDocs td = searcher.search(query, Integer.MAX_VALUE); + + ScoreDoc[ ] scoreDocs = td.scoreDocs; + List result = new ArrayList(scoreDocs.length); + + for(ScoreDoc scoreDoc:scoreDocs) { + + try { + + Document doc = reader.document(scoreDoc.doc); + List fs = doc.getFields(); + Object[] o = new Object[fs.size()]; + int index = 0; + for (IndexableField f : fs) { + o[index++] = f.stringValue(); + } + result.add(o); + + } catch (CorruptIndexException e) { + getLogger().error("Index is corrupted! {}", this, e); + throw new DatabaseException(e); + } catch (IOException e) { + getLogger().error("Index is in problematic state! {}", this, e); + throw new DatabaseException(e); + } + + } + + return result; + + } + + + List> doSearch(IProgressMonitor monitor, RequestProcessor processor, String search, int maxResultCount) throws ParseException, IOException, + DatabaseException { + + // An empty search string will crash QueryParser + // Just return no results for empty queries. + //System.out.println("search: '" + search + "'"); + if (search.isEmpty()) + return Collections.emptyList(); + + assertAccessOpen(false); + + Query query = Queries.parse(search, schema); + + long start = System.nanoTime(); + + maxResultCount = Math.min(maxResultCount, searcher.getIndexReader().numDocs()); + if (maxResultCount == 0) + return Collections.emptyList(); + + final TopDocs docs = searcher.search(query, null, maxResultCount); + +// for(Object[] o : debugDocs(monitor)) { +// System.err.println("-" + Arrays.toString(o)); +// } + + if (IndexPolicy.PERF_INDEX_QUERY) { + long end = System.nanoTime(); + System.out.println(getDescriptor() + "search(" + search + ", " + maxResultCount + ") into index at " + indexPath + " took " + (1e-9 * (end-start)) + " seconds."); + } + + if (docs.totalHits == 0) { + return Collections.emptyList(); + } + + return processor.syncRequest(new Read>>() { + + @Override + public List> perform(ReadGraph graph) throws DatabaseException { + + GenericRelation r = graph.adapt(relation, GenericRelation.class); + if (r == null) + throw new DatabaseException("Given resource " + graph.syncRequest(new SafeName(relation)) + + "could not be adapted to GenericRelation."); + + SerialisationSupport support = graph.getService(SerialisationSupport.class); + + List> result = new ArrayList>(docs.scoreDocs.length); + + final DocumentStoredFieldVisitor visitor = new DocumentStoredFieldVisitor(); + + for (ScoreDoc scoreDoc : docs.scoreDocs) { + + try { + + reader.document(scoreDoc.doc, visitor); + + Document doc = visitor.getDocument(); + + List fs = doc.getFields(); + Map entry = new THashMap(fs.size()); + for (IndexableField f : fs) { + IndexSchema.Type type = schema.typeMap.get(f.name()); + if (type == IndexSchema.Type.LONG) { + entry.put(f.name(), support.getResource(f.numericValue().longValue())); + } else { + entry.put(f.name(), f.stringValue()); + } + } + + result.add(entry); + + } catch (CorruptIndexException e) { + getLogger().error("Index is corrupted! {}", this, e); + throw new DatabaseException(e); + } catch (IOException e) { + getLogger().error("Index is in problematic state! {}", this, e); + throw new DatabaseException(e); + } + } + return result; + } + }); + } + + static class ResourceVisitor extends StoredFieldVisitor { + + public long id; + + @Override + public Status needsField(FieldInfo fieldInfo) throws IOException { + if("Resource".equals(fieldInfo.name)) return Status.YES; + return Status.NO; + } + + @Override + public void longField(FieldInfo fieldInfo, long value) throws IOException { + id = value; + } + + }; + + static class DumpVisitor extends StoredFieldVisitor { + + public List values; + + DumpVisitor(List values) { + this.values = values; + } + + @Override + public Status needsField(FieldInfo fieldInfo) throws IOException { + return Status.YES; + } + + @Override + public void longField(FieldInfo fieldInfo, long value) throws IOException { + values.add(value); + } + + @Override + public void stringField(FieldInfo fieldInfo, String value) throws IOException { + values.add(value); + } + + } + + List doSearchResources(IProgressMonitor monitor, RequestProcessor processor, String search, int maxResultCount) throws ParseException, IOException, + DatabaseException { + + // An empty search string will crash QueryParser + // Just return no results for empty queries. + //System.out.println("search: '" + search + "'"); + if (search.isEmpty()) + return Collections.emptyList(); + + assertAccessOpen(false); + + Query query = Queries.parse(search, schema); + + long start = System.nanoTime(); + + maxResultCount = Math.min(maxResultCount, searcher.getIndexReader().numDocs()); + if (maxResultCount == 0) + return Collections.emptyList(); + + final TopDocs docs = searcher.search(query, null, maxResultCount); + +// for(Object[] o : debugDocs(monitor)) { +// System.err.println("-" + Arrays.toString(o)); +// } + + if (IndexPolicy.PERF_INDEX_QUERY) { + long end = System.nanoTime(); + System.out.println(getDescriptor() + "search(" + search + ", " + maxResultCount + ") into index at " + indexPath + " took " + (1e-9 * (end-start)) + " seconds."); + } + + if (docs.totalHits == 0) { + return Collections.emptyList(); + } + + return processor.syncRequest(new Read>() { + + @Override + public List perform(ReadGraph graph) throws DatabaseException { + + CollectionSupport cs = graph.getService(CollectionSupport.class); + SerialisationSupport support = graph.getService(SerialisationSupport.class); + + List result = cs.createList(); + + ResourceVisitor visitor = new ResourceVisitor(); + + for (ScoreDoc scoreDoc : docs.scoreDocs) { + try { + reader.document(scoreDoc.doc, visitor); + result.add(support.getResource(visitor.id)); + } catch (CorruptIndexException e) { + getLogger().error("Index is corrupted! {}", this, e); + throw new DatabaseException(e); + } catch (IOException e) { + getLogger().error("Index is in problematic state! {}", this, e); + throw new DatabaseException(e); + } + } + return result; + } + }); + } + + List doList(IProgressMonitor monitor, RequestProcessor processor) throws ParseException, IOException, + DatabaseException { + + assertAccessOpen(false); + + Query query = new MatchAllDocsQuery(); + + final TopDocs docs = searcher.search(query, Integer.MAX_VALUE); + + ArrayList result = new ArrayList(); + + DumpVisitor visitor = new DumpVisitor(result); + + for (ScoreDoc scoreDoc : docs.scoreDocs) { + + try { + + reader.document(scoreDoc.doc, visitor); + + } catch (CorruptIndexException e) { + getLogger().error("Index is corrupted! {}", this, e); + throw new DatabaseException(e); + } catch (IOException e) { + getLogger().error("Index is in problematic state! {}", this, e); + throw new DatabaseException(e); + } + + } + + return result; + + } + + protected static Path getIndexDirectory(Session session, Resource relation, Resource input) { + Path path = DatabaseIndexing.getIndexLocation(session, relation, input); +// System.out.println("getIndexDirectory = " + path); + return path; + } + + Path getIndexPath() { + return indexPath; + } + + boolean isIndexAvailable() { + return Files.isDirectory(indexPath); + } + + abstract Throwable bestEffortClear(IProgressMonitor monitor, Session session); + + /* + * Start from scratch. Clear all caches and rebuild the index. + */ + Throwable clearDirectory(IProgressMonitor monitor, Session session) { + + Path file = getIndexPath(); + + try { + FileUtils.delete(file); + } catch (Throwable t) { + getLogger().error("Could not delete directory {}", file.toAbsolutePath(), t); + return t; + } + if (Files.exists(file)) + return new IllegalStateException("Failed to delete directory " + file.toAbsolutePath()); + return null; + } + + private Field[] setFields(Field[] fs, Object[] result) { + for (int i = 0; i < result.length; i++) { + Object value = result[i]; + if (value instanceof String) { + if (IndexPolicy.DEBUG_INDEX_INIT) + System.out.println(getDescriptor() + "index " + fs[i].name() + " = " + value + " : String"); + fs[i].setStringValue((String) value); + } else if (value instanceof Long) { + if (IndexPolicy.DEBUG_INDEX_INIT) + System.out.println(getDescriptor() + "index " + fs[i].name() + " = " + value + " : Long"); + fs[i].setLongValue((Long) value); + } else { + getLogger().error("Can only index Long and String fields, encountered " + value); + return null; + } + } + return fs; + } + + protected abstract Logger getLogger(); + + @Override + public String toString() { + return getClass().getSimpleName() + " [" + String.valueOf(schema) + ", " + String.valueOf(relation) + ", " + String.valueOf(input) + ", " + String.valueOf(indexPath) + ", " + String.valueOf(directory) + ", " + String.valueOf(state) + "]"; + } +}