]> gerrit.simantics Code Review - simantics/platform.git/blob - bundles/org.simantics.db.indexing/src/org/simantics/db/indexing/IndexedRelationsSearcherBase.java
Worked around Windows FS problems in IndexedRelationsSearcherBase
[simantics/platform.git] / bundles / org.simantics.db.indexing / src / org / simantics / db / indexing / IndexedRelationsSearcherBase.java
1 /*******************************************************************************
2  * Copyright (c) 2007, 2015 Association for Decentralized Information Management
3  * in Industry THTH ry.
4  * All rights reserved. This program and the accompanying materials
5  * are made available under the terms of the Eclipse Public License v1.0
6  * which accompanies this distribution, and is available at
7  * http://www.eclipse.org/legal/epl-v10.html
8  *
9  * Contributors:
10  *     VTT Technical Research Centre of Finland - initial API and implementation
11  *     Semantum Oy - Fix for simantics issue #6053
12  *******************************************************************************/
13 package org.simantics.db.indexing;
14
15 import java.io.IOException;
16 import java.nio.file.Files;
17 import java.nio.file.Path;
18 import java.util.ArrayList;
19 import java.util.Collection;
20 import java.util.Collections;
21 import java.util.Iterator;
22 import java.util.List;
23 import java.util.Map;
24 import java.util.concurrent.CompletableFuture;
25 import java.util.concurrent.Semaphore;
26 import java.util.concurrent.atomic.AtomicReference;
27
28 import org.apache.lucene.document.Document;
29 import org.apache.lucene.document.DocumentStoredFieldVisitor;
30 import org.apache.lucene.document.Field;
31 import org.apache.lucene.document.FieldType;
32 import org.apache.lucene.document.LongField;
33 import org.apache.lucene.document.TextField;
34 import org.apache.lucene.index.CorruptIndexException;
35 import org.apache.lucene.index.DirectoryReader;
36 import org.apache.lucene.index.FieldInfo;
37 import org.apache.lucene.index.IndexNotFoundException;
38 import org.apache.lucene.index.IndexReader;
39 import org.apache.lucene.index.IndexWriter;
40 import org.apache.lucene.index.IndexWriterConfig;
41 import org.apache.lucene.index.IndexWriterConfig.OpenMode;
42 import org.apache.lucene.index.IndexableField;
43 import org.apache.lucene.index.StoredFieldVisitor;
44 import org.apache.lucene.index.Term;
45 import org.apache.lucene.queryparser.classic.ParseException;
46 import org.apache.lucene.search.IndexSearcher;
47 import org.apache.lucene.search.MatchAllDocsQuery;
48 import org.apache.lucene.search.Query;
49 import org.apache.lucene.search.ScoreDoc;
50 import org.apache.lucene.search.TermQuery;
51 import org.apache.lucene.search.TopDocs;
52 import org.apache.lucene.store.Directory;
53 import org.apache.lucene.store.FSDirectory;
54 import org.apache.lucene.util.Version;
55 import org.eclipse.core.runtime.IProgressMonitor;
56 import org.eclipse.core.runtime.SubMonitor;
57 import org.simantics.databoard.util.ObjectUtils;
58 import org.simantics.db.ReadGraph;
59 import org.simantics.db.RequestProcessor;
60 import org.simantics.db.Resource;
61 import org.simantics.db.Session;
62 import org.simantics.db.common.request.SafeName;
63 import org.simantics.db.common.utils.NameUtils;
64 import org.simantics.db.exception.DatabaseException;
65 import org.simantics.db.indexing.internal.IndexingJob;
66 import org.simantics.db.layer0.adapter.GenericRelation;
67 import org.simantics.db.request.Read;
68 import org.simantics.db.service.CollectionSupport;
69 import org.simantics.db.service.SerialisationSupport;
70 import org.simantics.utils.FileUtils;
71 import org.simantics.utils.datastructures.Pair;
72 import org.simantics.utils.threads.ThreadUtils;
73 import org.slf4j.Logger;
74
75 import gnu.trove.map.hash.THashMap;
76
77 /**
78  * @author Tuukka Lehtonen
79  * @author Antti Villberg
80  */
81 abstract public class IndexedRelationsSearcherBase {
82
83     protected enum State {
84         // No index is available
85         NONE, 
86         // An index is available, but there is a problem with it
87         PROBLEM, 
88         // An index is available but no reader or writer is ready
89         READY,
90         // A reader is ready
91         READ, 
92         // A writer (and a reader) is ready
93         WRITE
94     }
95     
96     private State state = State.READY;
97     private Throwable exception;
98     
99     public Throwable getException() {
100         return exception;
101     }
102
103     public void setProblem(Throwable t) {
104         if (t != null)
105             getLogger().error("Setting problem for {} and previous state {}", this, this.state, t);
106         this.state = State.PROBLEM;
107         this.exception = t;
108     }
109     
110     public void setNone() {
111         this.state = State.NONE;
112     }
113     
114     public void setReady() {
115         this.state = State.READY;
116     }
117     
118     protected boolean checkState(State state) {
119         return this.state == state;
120     }
121     
122     protected void assertState(State state) throws AssertionError {
123
124         if(this.state != state) throw new AssertionError("Illegal state, expected " + state.name() + " but was in " + this.state.name());
125         
126     }
127     
128     public void changeState(IProgressMonitor monitor, Session session, State state) {
129         changeState(monitor, session, state, 0);
130     }
131
132     protected void changeState(IProgressMonitor monitor, Session session, State state, int depth) {
133
134         if (this.state == state) {
135             if (getLogger().isDebugEnabled())
136                 getLogger().debug("Trying to change state {} to the same as previous state {} in depth {} with {}", state, this.state, depth, this);
137             return;
138         }
139
140         if (IndexPolicy.TRACE_INDEX_MANAGEMENT)
141                 System.err.println("Index state " + this.state.name() + " => " + state.name() + " " + this);
142
143         // Check transitions
144         
145         // Try to exit problem state
146         if (State.PROBLEM == this.state && depth > 0) {
147             getLogger().info("Try to exit problem state for {} and state {}", this, state);
148                 Throwable t = bestEffortClear(monitor, session);
149                 if(t != null) {
150                     getLogger().error("Best effort clear has failed for state {} and this {}", state, this, t);
151                                 exception = t;
152                                 return;
153                 }
154                 // Managed to get into initial state
155                 this.state = State.NONE;
156                 getLogger().info("Managed to get into initial state {}", this.state);
157                 return;
158         }
159
160         // Cannot move into read from no index
161         if (State.NONE ==  this.state && State.READ == state) {
162             if (getLogger().isDebugEnabled())
163                 getLogger().debug("Cannot move into read from no index in {} with state {}", this, state);
164             return;
165         }
166         // Cannot move into write from no index
167         if (State.NONE ==  this.state && State.WRITE == state) {
168             if (getLogger().isDebugEnabled())
169                 getLogger().debug("Cannot move into write from no index in {} with state {}", this, state);
170             return;
171         }
172         
173                 boolean success = false;
174
175         try {
176
177                 if (searcher != null) {
178                         searcher = null;
179                 }
180                 if (reader != null) {
181                         reader.close();
182                         reader = null;
183                 }
184                         closeWriter(writer);
185                         directory = null;
186                         
187                         success = true;
188
189                 // Enter new state
190                 if (State.READ == state || State.WRITE == state) {
191                         
192                         success = false;
193                         
194                         boolean forWriting = State.WRITE == state;
195
196                         if (directory != null)
197                                 throw new IllegalStateException(getDescriptor() + "Index already loaded");
198
199                         SubMonitor mon = SubMonitor.convert(monitor, 100);
200
201                         mon.beginTask("Loading index", 100);
202
203                         if (IndexPolicy.TRACE_INDEX_LOAD)
204                     System.out.println(getDescriptor() + "Loading Lucene index from " + indexPath + " for " + (forWriting ? "writing" : "reading"));
205
206                 long start = System.nanoTime();
207
208                 directory = getDirectory(session);
209
210                 if (forWriting) {
211                     // Never overwrite an index that is about to be loaded.
212                     // TODO: could use OpenMode.CREATE_OR_APPEND but must test first
213                     IndexWriterConfig config = new IndexWriterConfig(Version.LUCENE_4_9, Queries.getAnalyzer()).setOpenMode(OpenMode.APPEND);
214                     try {
215                         // FIXME: platform #4676
216                         writer = new IndexWriter(directory, config);
217                     } catch (IndexNotFoundException e) {
218                         // There was no pre-existing index on disk. Create it now.
219                         writer = new IndexWriter(directory, config.setOpenMode(OpenMode.CREATE));
220                         writer.commit();
221                     }
222                     reader = DirectoryReader.open(directory);
223                     searcher = new IndexSearcher(reader);
224                 } else {
225                     reader = DirectoryReader.open(directory);
226                     searcher = new IndexSearcher(reader);
227                 }
228
229                 long end = System.nanoTime();
230
231                 mon.worked(100);
232
233                 if (IndexPolicy.PERF_INDEX_LOAD) {
234                     double time = (end - start) * 1e-6;
235                     System.out.println(getDescriptor() + "Loaded Lucene index from " + indexPath + " for " + (forWriting ? "writing" : "reading") + " in " + time + " ms");
236                 }
237
238                 success = true;
239                 
240                 }
241                 
242         } catch (Throwable t) {
243                 setProblem(t);
244         } finally {
245
246                 if(!success) {
247                         this.state = State.PROBLEM;
248                         changeState(monitor, session, State.NONE, depth+1);
249                         return;
250                 }
251
252         }
253
254         this.state = state;
255         
256     }
257
258     public static final FieldType STRING_TYPE = new FieldType();
259
260     static {
261       STRING_TYPE.setIndexed(true);
262       STRING_TYPE.setStored(true);
263       STRING_TYPE.setTokenized(true);
264       STRING_TYPE.freeze();
265     }
266
267     protected static Field makeField(String fieldName, String fieldClass) throws DatabaseException {
268         switch (fieldClass) {
269         case "Long":   return new LongField(fieldName, 0L, Field.Store.YES);
270         case "String": return new Field    (fieldName, "", STRING_TYPE);
271         case "Text":   return new TextField(fieldName, "", Field.Store.YES);
272         default:
273             throw new DatabaseException("Can only index Long, String and Text fields, encountered field type " + fieldClass);
274         }
275     }
276
277     protected static Field[] makeFieldsForRelation(GenericRelation r, int boundLength, Document document) throws DatabaseException {
278         Pair<String, String>[] fields = r.getFields();
279         Field[] fs = new Field[Math.max(0, fields.length - boundLength)];
280         for (int i = boundLength; i < fields.length; i++) {
281             Field f = makeField(fields[i].first, fields[i].second);
282             fs[i - boundLength] = f;
283             if (document != null)
284                 document.add(f);
285         }
286         return fs;
287     }
288
289     void insertIndex(IProgressMonitor monitor, GenericRelation r, int boundLength, Collection<Object[]> documentsData)
290     throws CorruptIndexException, IOException, DatabaseException {
291         assertAccessOpen(true);
292
293         if (IndexPolicy.TRACE_INDEX_UPDATE)
294             System.out.println(getDescriptor() + "Inserting " + documentsData.size() + " documents into index at " + indexPath);
295
296         long start = 0, end = 0;
297         if (IndexPolicy.PERF_INDEX_UPDATE)
298             start = System.nanoTime();
299
300         try {
301             Document document = new Document();
302             Field[] fs = makeFieldsForRelation(r, boundLength, document);
303
304             for (Object[] documentData : documentsData) {
305                 if (setFields(fs, documentData) == null)
306                     continue;
307
308                 if (IndexPolicy.TRACE_INDEX_UPDATE)
309                     System.out.println(getDescriptor() + "Inserting document " + document);
310
311                 writer.addDocument(document);
312             }
313
314             if (IndexPolicy.PERF_INDEX_UPDATE) {
315                 end = System.nanoTime();
316                 double ms = (end - start) * 1e-6;
317                 System.out.println(getDescriptor() + "Inserted " + documentsData.size() + " documents into index at " + indexPath + " in " + ms + " ms");
318             }
319
320         } finally {
321         }
322     }
323
324     void removeIndex(IProgressMonitor monitor, GenericRelation r, RequestProcessor processor, String key, Collection<Object> keyValues) throws DatabaseException, CorruptIndexException, IOException {
325         assertAccessOpen(true);
326
327         if (IndexPolicy.TRACE_INDEX_UPDATE)
328             System.out.println(getDescriptor() + "Removing " + keyValues.size() + " documents from index at " + indexPath);
329
330         long start = 0, end = 0;
331         if (IndexPolicy.PERF_INDEX_UPDATE)
332             start = System.nanoTime();
333
334         try {
335             for (Object keyValue : keyValues) {
336                 Term removedTerm = null;
337                 if (keyValue instanceof Long) {
338                     removedTerm = IndexUtils.longTerm(key, (Long) keyValue);
339                 } else if (keyValue instanceof String) {
340                     removedTerm = new Term(key, (String) keyValue);
341                 } else {
342                     // FIXME: should throw an exception for illegal input data but this would leave the index in an incoherent state
343                     continue;
344                 }
345
346                 if (IndexPolicy.TRACE_INDEX_UPDATE)
347                     System.out.println(getDescriptor() + "Removing document with key " + removedTerm);
348                 writer.deleteDocuments(removedTerm);
349             }
350
351             if (IndexPolicy.PERF_INDEX_UPDATE) {
352                 end = System.nanoTime();
353                 double ms = (end - start) * 1e-6;
354                 System.out.println(getDescriptor() + "Removed " + keyValues.size() + " documents from index at " + indexPath + " in " + ms + " ms");
355             }
356
357         } finally {
358         }
359     }
360
361     void removeIndex(IProgressMonitor monitor) throws DatabaseException, CorruptIndexException, IOException {
362         assertAccessOpen(true);
363
364         long start = 0, end = 0;
365         if (IndexPolicy.PERF_INDEX_UPDATE)
366             start = System.nanoTime();
367
368         try {
369
370             writer.deleteAll();
371
372             if (IndexPolicy.PERF_INDEX_UPDATE) {
373                 end = System.nanoTime();
374                 double ms = (end - start) * 1e-6;
375                 System.out.println(getDescriptor() + "Removed all documents from index at " + indexPath + " in " + ms + " ms");
376             }
377
378         } finally {
379         }
380     }
381     
382     boolean replaceIndex(IProgressMonitor monitor, String key, Collection<Object> keyValues, GenericRelation r, int boundLength, Collection<Object[]> documentsData) throws CorruptIndexException, IOException, DatabaseException {
383
384         boolean didReplace = false;
385         
386         assertAccessOpen(true);
387         if (keyValues.size() != documentsData.size())
388             throw new IllegalArgumentException("keyValues size does not match documents data size, " + keyValues.size() + " <> " + documentsData.size());
389
390         if (IndexPolicy.TRACE_INDEX_UPDATE)
391             System.out.println(getDescriptor() + "Replacing " + keyValues.size() + " documents from index at " + indexPath);
392
393         long start = 0, end = 0;
394         if (IndexPolicy.PERF_INDEX_UPDATE)
395             start = System.nanoTime();
396
397         try {
398             Iterator<Object> keyIt = keyValues.iterator();
399             Iterator<Object[]> documentDataIt = documentsData.iterator();
400
401             Document document = new Document();
402             Field[] fs = makeFieldsForRelation(r, boundLength, document);
403
404             nextDocument:
405                 while (keyIt.hasNext()) {
406                     Object keyValue = keyIt.next();
407                     Object[] documentData = documentDataIt.next();
408
409                     Term removedTerm = null;
410                     if (keyValue instanceof Long) {
411                         removedTerm = IndexUtils.longTerm(key, (Long) keyValue);
412                     } else if (keyValue instanceof String) {
413                         removedTerm = new Term(key, (String) keyValue);
414                     } else {
415                         // FIXME: should throw an exception for illegal input data but this would leave the index in an incoherent state
416                         System.err.println("[" + getClass().getSimpleName() + "] Unrecognized document key to remove '" + keyValue + "', only " + String.class + " and " + Resource.class + " are supported.");
417                         continue nextDocument;
418                     }
419
420                     if (setFields(fs, documentData) == null)
421                         continue nextDocument;
422
423                     if (IndexPolicy.TRACE_INDEX_UPDATE)
424                         System.out.println(getDescriptor() + "Replacing document with key " + removedTerm + " with " + document);
425
426                     boolean done = false;
427                     if(requireChangeInfoOnReplace()) {
428                             TopDocs exist = searcher.search(new TermQuery(removedTerm), null, 2);
429                             if(exist.scoreDocs.length == 1 && requireChangeInfoOnReplace()) {
430                                 Document doc = reader.document(exist.scoreDocs[0].doc);
431                                 if(!areSame(doc, document)) {
432                                     writer.deleteDocuments(removedTerm);
433                                     writer.addDocument(document);
434                                     didReplace |= true;
435                                     if (IndexPolicy.TRACE_INDEX_UPDATE)
436                                         System.out.println("-replaced single existing");
437                                 } else {
438                                     if (IndexPolicy.TRACE_INDEX_UPDATE)
439                                         System.out.println("-was actually same than single existing");
440                                 }
441                                 done = true;
442                             } 
443                     }
444                     if(!done) {
445                         writer.deleteDocuments(removedTerm);
446                         writer.addDocument(document);
447                         didReplace |= true;
448                         if (IndexPolicy.TRACE_INDEX_UPDATE)
449                                 System.out.println("-had many or none - removed all existing");
450                     }
451                     
452                 }
453
454             if (IndexPolicy.PERF_INDEX_UPDATE) {
455                 end = System.nanoTime();
456                 double ms = (end - start) * 1e-6;
457                 System.out.println(getDescriptor() + "Replaced " + keyValues.size() + " documents from index at " + indexPath + " in " + ms + " ms");
458             }
459
460         } finally {
461         }
462         
463         return didReplace;
464         
465     }
466     
467     protected boolean requireChangeInfoOnReplace() {
468         return true;
469     }
470     
471     private boolean areSame(Document d1, Document d2) {
472         List<IndexableField> fs1 = d1.getFields();
473         List<IndexableField> fs2 = d2.getFields();
474         if(fs1.size() != fs2.size()) return false;
475         for(int i=0;i<fs1.size();i++) {
476                 IndexableField f1 = fs1.get(i);
477                 IndexableField f2 = fs2.get(i);
478                 String s1 = f1.stringValue();
479                 String s2 = f2.stringValue();
480             if (IndexPolicy.TRACE_INDEX_UPDATE)
481                 System.err.println("areSame " + s1 + " vs. " + s2 );
482                 if(!ObjectUtils.objectEquals(s1,s2)) return false;
483         }
484         return true;
485     }
486
487     final RequestProcessor session;
488
489     final Resource         relation;
490
491     /**
492      * The schema of the index, i.e. the fields that will be indexed per
493      * document for the specified relation. Since the relation stays the same
494      * throughout the lifetime of this class, the index schema is also assumed
495      * to the same. This means that {@link GenericRelation#getFields()} is
496      * assumed to stay the same.
497      */
498     final IndexSchema      schema;
499
500     Resource         input;
501
502     Path             indexPath;
503
504     Directory        directory;
505
506     IndexReader      reader;
507
508     IndexWriter      writer;
509
510     IndexSearcher    searcher;
511
512     IndexedRelationsSearcherBase(RequestProcessor session, Resource relation, Resource input) {
513         this.session = session;
514         this.relation = relation;
515         this.input = input;
516         this.indexPath = getIndexDirectory(session.getSession(), relation, input);
517         if(isIndexAvailable()) {
518                 state = State.READY;
519         } else {
520                 state = State.NONE;
521         }
522         this.schema = IndexSchema.readFromRelation(session, relation);
523     }
524
525     Directory getDirectory(Session session) throws IOException {
526         return FSDirectory.open(indexPath.toFile());
527     }
528
529     abstract String getDescriptor();
530     
531     /**
532      * Ensures that searcher is in read or write state.
533      * 
534      * @param forWriting <code>true</code> to open index for writing,
535      *        <code>false</code> for reading
536      * @return true is required state was reached       
537      *        
538      */
539     boolean startAccess(IProgressMonitor monitor, Session session, boolean forWriting) {
540         if(forWriting) {
541                 changeState(monitor, session, State.WRITE);
542                 return checkState(State.WRITE);
543         } else {
544                 changeState(monitor, session, State.READ);
545                 return checkState(State.READ);
546         }
547     }
548
549     boolean hasAccess(boolean forWriting) {
550         
551         if (forWriting)
552                 return checkState(State.WRITE); 
553         else
554                 return checkState(State.WRITE) || checkState(State.READ);
555         
556     }
557     
558     void assertAccessOpen(boolean forWriting) {
559         if (forWriting)
560                 if(!checkState(State.WRITE)) 
561                 throw new IllegalStateException("index not opened for writing (directory=" + directory + ", reader=" + reader + ")");
562         else
563                 if(!(checkState(State.WRITE) || checkState(State.READ))) 
564                 throw new IllegalStateException("index not opened for reading (directory=" + directory + ", writer=" + writer + ")");
565     }
566     
567     void closeWriter(IndexWriter writer) throws CorruptIndexException, IOException {
568         if (writer == null)
569             return;
570
571         try {
572             // May throw OOME, see IndexWriter javadoc for the correct actions.
573             writer.close(false);
574         } catch (OutOfMemoryError e) {
575             writer.close();
576             throw e;
577         }
578     }
579
580     public static String getPattern(GenericRelation relation, int boundCount) {
581         String result = "";
582         for (int i = 0; i < boundCount; i++)
583             result += "b";
584         for (int i = 0; i < relation.getFields().length - boundCount; i++)
585             result += "f";
586         return result;
587     }
588
589     void initializeIndex(IProgressMonitor monitor, ReadGraph graph, Object[] bound, boolean overwrite)
590             throws IOException, DatabaseException
591     {
592         IndexingJob.jobifyIfPossible(
593                 monitor,
594                 "Reindexing " + NameUtils.getSafeLabel(graph, input),
595                 mon -> {
596                     try {
597                         GenericRelation r = graph.adapt(relation, GenericRelation.class);
598                         if (r == null)
599                             throw new DatabaseException("Given resource " + relation + "could not be adapted to GenericRelation.");
600
601                         GenericRelation selection = r.select(getPattern(r, bound.length), bound);
602
603                         List<Object[]> results = selection.realize(graph);
604                         initializeIndexImpl(new CompletableFuture<>(), mon, r, results, bound, overwrite);
605                     } catch (IOException e) {
606                         getLogger().error("Index is in problematic state! {}", this, e);
607                         throw new DatabaseException(e);
608                     }
609                 });
610     }
611
612     private static final int INDEXING_THREAD_COUNT = 2; // this is quite good parallelism level for lucene
613
614     void initializeIndexImpl(CompletableFuture<?> result, IProgressMonitor monitor, GenericRelation r, List<Object[]> results, final Object[] bound, boolean overwrite) throws IOException {
615         try {
616             final SubMonitor mon = SubMonitor.convert(monitor, 100);
617     
618             if (IndexPolicy.TRACE_INDEX_INIT)
619                 System.out.println(getDescriptor() + "Initializing index at " + indexPath + " (overwrite = " + overwrite + ")");
620             mon.beginTask("Initializing Index", 100);
621     
622             if (overwrite) {
623                 if (Files.exists(indexPath)) {
624                     mon.subTask("Erasing previous index");
625                     if (getLogger().isDebugEnabled())
626                         getLogger().debug("Erasing previous index {}", indexPath.toAbsolutePath());
627                     FileUtils.emptyDirectory(indexPath);
628                 }
629             }
630     
631             final AtomicReference<FSDirectory> directory = new AtomicReference<FSDirectory>();
632             final AtomicReference<IndexWriter> writer = new AtomicReference<IndexWriter>();
633     
634             try {
635                 mon.subTask("Start index write");
636                 Files.createDirectories(indexPath);
637     
638                 directory.set(FSDirectory.open(indexPath.toFile()));
639                 IndexWriterConfig conf = new IndexWriterConfig(Version.LUCENE_4_9, Queries.getAnalyzer()).setOpenMode(OpenMode.CREATE);
640                 writer.set(new IndexWriter(directory.get(), conf));
641     
642                 mon.worked(5);
643     
644                 long realizeStart = 0;
645                 if (IndexPolicy.PERF_INDEX_INIT)
646                     realizeStart = System.nanoTime();
647     
648                 mon.subTask("Calculating indexed content");
649                 mon.worked(5);
650                 
651                 mon.worked(40);
652     
653                 if (IndexPolicy.PERF_INDEX_INIT)
654                     System.out.println(getDescriptor() + "Realized index with " + results.size() + " entries at " + indexPath + " in " + (1e-9 * (System.nanoTime()-realizeStart)) + " seconds.");
655     
656                 if (IndexPolicy.TRACE_INDEX_INIT)
657                     System.out.println(getDescriptor() + "Indexed relation " + r + " produced " + results.size() + " results");
658                 
659                 long start = IndexPolicy.PERF_INDEX_INIT ? System.nanoTime() : 0;
660     
661                 mon.subTask("Indexing content");
662                 final Semaphore s = new Semaphore(0);
663                 mon.setWorkRemaining(results.size());
664                 for (int i = 0; i < INDEXING_THREAD_COUNT; i++) {
665                     final int startIndex = i;
666                     ThreadUtils.getBlockingWorkExecutor().submit(() -> {
667                         try {
668                             Document document = new Document();
669                             Field[] fs = makeFieldsForRelation(r, bound.length, document);
670     
671                             for (int index = startIndex; index < results.size(); index += INDEXING_THREAD_COUNT) {
672                                 if (setFields(fs, results.get(index)) == null)
673                                     continue;
674                                 try {
675                                     writer.get().addDocument(document);
676                                 } catch (CorruptIndexException e) {
677                                     getLogger().error("Index is corrupted! {}", this, e);
678                                     throw new IllegalStateException(e);
679                                 } catch (IOException e) {
680                                     getLogger().error("Index is in problematic state! {}", this, e);
681                                     throw new IllegalStateException(e);
682                                 } finally {
683                                     synchronized (mon) {
684                                         mon.worked(1);
685                                     }
686                                 }
687                             }
688                         } catch (DatabaseException e) {
689                             throw new IllegalStateException(e);
690                         } finally {
691                             s.release();
692                         }
693                     });
694                 }
695     
696                 try {
697                     s.acquire(INDEXING_THREAD_COUNT);
698                 } catch (InterruptedException e) {
699                     getLogger().error("Could not initialize index {}", this, e);
700                 }
701     
702                 // http://www.gossamer-threads.com/lists/lucene/java-dev/47895
703                 // and http://lucene.apache.org/java/docs/index.html#27+November+2011+-+Lucene+Core+3.5.0
704                 // advise against calling optimize at all. So let's not do it anymore.
705                 //writer.get().optimize();
706                 //writer.get().commit();
707     
708                 mon.subTask("Flushing");
709     
710                 if (IndexPolicy.PERF_INDEX_INIT)
711                     System.out.println(getDescriptor() + "Wrote index at " + indexPath + " in " + (1e-9 * (System.nanoTime()-start)) + " seconds.");
712                 
713                 result.complete(null);
714     //        } catch (DatabaseException e) {
715     //            getLogger().error("Could not initialize index due to db {}", this, e);
716             } finally {
717                 try {
718                     closeWriter(writer.getAndSet(null));
719                 } finally {
720                     FileUtils.uncheckedClose(directory.getAndSet(null));
721                 }
722             }
723         } catch (Throwable t) {
724             getLogger().error("Could not initialize index", t);
725             result.completeExceptionally(t);
726         }
727     }
728
729     
730     public List<Object[]> debugDocs(IProgressMonitor monitor) throws ParseException, IOException, DatabaseException {
731     
732             Query query = new MatchAllDocsQuery(); 
733         
734             TopDocs td = searcher.search(query, Integer.MAX_VALUE);
735     
736             ScoreDoc[ ] scoreDocs = td.scoreDocs; 
737             List<Object[]> result = new ArrayList<Object[]>(scoreDocs.length);
738         
739             for(ScoreDoc scoreDoc:scoreDocs) {
740         
741                 try {
742         
743                     Document doc = reader.document(scoreDoc.doc);
744                     List<IndexableField> fs = doc.getFields();
745                     Object[] o = new Object[fs.size()];
746                     int index = 0; 
747                     for (IndexableField f : fs) {
748                     o[index++] = f.stringValue();
749                     }
750                     result.add(o);
751         
752             } catch (CorruptIndexException e) {
753                 getLogger().error("Index is corrupted! {}", this, e);
754                 throw new DatabaseException(e);
755             } catch (IOException e) {
756                 getLogger().error("Index is in problematic state! {}", this, e);
757                 throw new DatabaseException(e);
758             }
759
760             }
761             
762             return result;
763             
764     }
765
766     
767     List<Map<String, Object>> doSearch(IProgressMonitor monitor, RequestProcessor processor, String search, int maxResultCount) throws ParseException, IOException,
768     DatabaseException {
769
770         // An empty search string will crash QueryParser
771         // Just return no results for empty queries.
772         //System.out.println("search: '" + search + "'");
773         if (search.isEmpty())
774             return Collections.emptyList();
775
776         assertAccessOpen(false);
777
778         Query query = Queries.parse(search, schema);
779
780         long start = System.nanoTime();
781
782         maxResultCount = Math.min(maxResultCount, searcher.getIndexReader().numDocs());
783         if (maxResultCount == 0)
784             return Collections.emptyList();
785         
786         final TopDocs docs = searcher.search(query, null, maxResultCount);
787         
788 //        for(Object[] o : debugDocs(monitor)) {
789 //            System.err.println("-" + Arrays.toString(o));
790 //        }
791         
792         if (IndexPolicy.PERF_INDEX_QUERY) {
793             long end = System.nanoTime();
794             System.out.println(getDescriptor() + "search(" + search + ", " + maxResultCount + ") into index at " + indexPath + " took " + (1e-9 * (end-start)) + " seconds.");
795         }
796
797         if (docs.totalHits == 0) {
798             return Collections.emptyList();
799         }
800
801         return processor.syncRequest(new Read<List<Map<String, Object>>>() {
802
803             @Override
804             public List<Map<String, Object>> perform(ReadGraph graph) throws DatabaseException {
805
806                 GenericRelation r = graph.adapt(relation, GenericRelation.class);
807                 if (r == null)
808                     throw new DatabaseException("Given resource " + graph.syncRequest(new SafeName(relation))
809                             + "could not be adapted to GenericRelation.");
810
811                 SerialisationSupport support = graph.getService(SerialisationSupport.class);
812
813                 List<Map<String, Object>> result = new ArrayList<Map<String, Object>>(docs.scoreDocs.length);
814                 
815                 final DocumentStoredFieldVisitor visitor = new DocumentStoredFieldVisitor();
816                 
817                 for (ScoreDoc scoreDoc : docs.scoreDocs) {
818
819                     try {
820
821                         reader.document(scoreDoc.doc, visitor);
822                         
823                         Document doc = visitor.getDocument();
824
825                         List<IndexableField> fs = doc.getFields();
826                         Map<String, Object> entry = new THashMap<String, Object>(fs.size());
827                         for (IndexableField f : fs) {
828                             IndexSchema.Type type = schema.typeMap.get(f.name());
829                             if (type == IndexSchema.Type.LONG) {
830                                 entry.put(f.name(), support.getResource(f.numericValue().longValue()));
831                             } else {
832                                 entry.put(f.name(), f.stringValue());
833                             }
834                         }
835                         
836                         result.add(entry);
837
838                     } catch (CorruptIndexException e) {
839                         getLogger().error("Index is corrupted! {}", this, e);
840                         throw new DatabaseException(e);
841                     } catch (IOException e) {
842                         getLogger().error("Index is in problematic state! {}", this, e);
843                         throw new DatabaseException(e);
844                     }
845                 }
846                 return result;
847             }
848         });
849     }
850
851     static class ResourceVisitor extends StoredFieldVisitor {
852         
853         public long id;
854
855                 @Override
856                 public Status needsField(FieldInfo fieldInfo) throws IOException {
857                         if("Resource".equals(fieldInfo.name)) return Status.YES;
858                         return Status.NO;
859                 }
860                 
861                 @Override
862                 public void longField(FieldInfo fieldInfo, long value) throws IOException {
863                         id = value;
864                 }
865         
866     };
867     
868     static class DumpVisitor extends StoredFieldVisitor {
869
870         public List<Object> values;
871         
872         DumpVisitor(List<Object> values) {
873                 this.values = values;
874         }
875
876                 @Override
877                 public Status needsField(FieldInfo fieldInfo) throws IOException {
878                         return Status.YES;
879                 }
880                 
881                 @Override
882                 public void longField(FieldInfo fieldInfo, long value) throws IOException {
883                         values.add(value);
884                 }
885                 
886                 @Override
887                 public void stringField(FieldInfo fieldInfo, String value) throws IOException {
888                         values.add(value);
889                 }
890
891     }
892
893     List<Resource> doSearchResources(IProgressMonitor monitor, RequestProcessor processor, String search, int maxResultCount) throws ParseException, IOException,
894     DatabaseException {
895
896         // An empty search string will crash QueryParser
897         // Just return no results for empty queries.
898         //System.out.println("search: '" + search + "'");
899         if (search.isEmpty())
900             return Collections.emptyList();
901
902         assertAccessOpen(false);
903
904         Query query = Queries.parse(search, schema);
905
906         long start = System.nanoTime();
907
908         maxResultCount = Math.min(maxResultCount, searcher.getIndexReader().numDocs());
909         if (maxResultCount == 0)
910             return Collections.emptyList();
911         
912         final TopDocs docs = searcher.search(query, null, maxResultCount);
913         
914 //        for(Object[] o : debugDocs(monitor)) {
915 //            System.err.println("-" + Arrays.toString(o));
916 //        }
917         
918         if (IndexPolicy.PERF_INDEX_QUERY) {
919             long end = System.nanoTime();
920             System.out.println(getDescriptor() + "search(" + search + ", " + maxResultCount + ") into index at " + indexPath + " took " + (1e-9 * (end-start)) + " seconds.");
921         }
922
923         if (docs.totalHits == 0) {
924             return Collections.emptyList();
925         }
926         
927         return processor.syncRequest(new Read<List<Resource>>() {
928
929             @Override
930             public List<Resource> perform(ReadGraph graph) throws DatabaseException {
931
932                 CollectionSupport cs = graph.getService(CollectionSupport.class);
933                 SerialisationSupport support = graph.getService(SerialisationSupport.class);
934                 
935                 List<Resource> result = cs.createList();
936                 
937                 ResourceVisitor visitor = new ResourceVisitor();
938                 
939                 for (ScoreDoc scoreDoc : docs.scoreDocs) {
940                     try {
941                         reader.document(scoreDoc.doc, visitor);
942                         result.add(support.getResource(visitor.id));
943                     } catch (CorruptIndexException e) {
944                         getLogger().error("Index is corrupted! {}", this, e);
945                         throw new DatabaseException(e);
946                     } catch (IOException e) {
947                         getLogger().error("Index is in problematic state! {}", this, e);
948                         throw new DatabaseException(e);
949                     }
950                 }
951                 return result;
952             }
953         });
954     }
955
956     List<Object> doList(IProgressMonitor monitor, RequestProcessor processor) throws ParseException, IOException,
957     DatabaseException {
958
959         assertAccessOpen(false);
960
961         Query query = new MatchAllDocsQuery(); 
962
963         final TopDocs docs = searcher.search(query, Integer.MAX_VALUE);
964         
965         ArrayList<Object> result = new ArrayList<Object>();
966         
967         DumpVisitor visitor = new DumpVisitor(result);
968                 
969         for (ScoreDoc scoreDoc : docs.scoreDocs) {
970
971                 try {
972
973                         reader.document(scoreDoc.doc, visitor);
974
975                 } catch (CorruptIndexException e) {
976                     getLogger().error("Index is corrupted! {}", this, e);
977                         throw new DatabaseException(e);
978                 } catch (IOException e) {
979                     getLogger().error("Index is in problematic state! {}", this, e);
980                         throw new DatabaseException(e);
981                 }
982
983         }
984
985         return result;
986
987     }
988     
989     protected static Path getIndexDirectory(Session session, Resource relation, Resource input) {
990         Path path = DatabaseIndexing.getIndexLocation(session, relation, input);
991 //        System.out.println("getIndexDirectory = " + path);
992         return path;
993     }
994
995     Path getIndexPath() {
996         return indexPath;
997     }
998
999     boolean isIndexAvailable() {
1000         return Files.isDirectory(indexPath);
1001     }
1002
    /**
     * Clears this index as far as possible; the exact strategy is left to subclasses.
     * <p>
     * NOTE(review): by analogy with {@code clearDirectory}, the returned Throwable
     * is presumably the failure encountered, or {@code null} on success. Confirm
     * against the implementations.
     */
    abstract Throwable bestEffortClear(IProgressMonitor monitor, Session session);
1004
1005     /*
1006      * Start from scratch. Clear all caches and rebuild the index. 
1007      */
1008     Throwable clearDirectory(IProgressMonitor monitor, Session session) {
1009         
1010                 Path file = getIndexPath();
1011
1012         try {
1013                         FileUtils.delete(file);
1014         } catch (Throwable t) {
1015                 getLogger().error("Could not delete directory {}", file.toAbsolutePath(), t);
1016                 return t;
1017         }
1018         if (Files.exists(file))
1019             return new IllegalStateException("Failed to delete directory " + file.toAbsolutePath());
1020         return null;
1021     }
1022
1023     private Field[] setFields(Field[] fs, Object[] result) {
1024         for (int i = 0; i < result.length; i++) {
1025             Object value = result[i];
1026             if (value instanceof String) {
1027                 if (IndexPolicy.DEBUG_INDEX_INIT)
1028                     System.out.println(getDescriptor() + "index " + fs[i].name() + " = " + value + " : String");
1029                 fs[i].setStringValue((String) value);
1030             } else if (value instanceof Long) {
1031                 if (IndexPolicy.DEBUG_INDEX_INIT)
1032                     System.out.println(getDescriptor() + "index " + fs[i].name() + " = " + value + " : Long");
1033                 fs[i].setLongValue((Long) value);
1034             } else {
1035                 getLogger().error("Can only index Long and String fields, encountered " + value);
1036                 return null;
1037             }
1038         }
1039         return fs;
1040     }
1041
    /**
     * Returns the logger this searcher reports errors to (used with
     * {@code {}} placeholders throughout this class).
     */
    protected abstract Logger getLogger();
1043     
1044     @Override
1045     public String toString() {
1046         return getClass().getSimpleName() + " [" + String.valueOf(schema) + ", " + String.valueOf(relation) + ", " + String.valueOf(input) + ", " + String.valueOf(indexPath) + ", " + String.valueOf(directory) + ", " + String.valueOf(state) + "]";
1047     }
1048 }