]> gerrit.simantics Code Review - simantics/platform.git/blob - bundles/org.simantics.db.indexing/src/org/simantics/db/indexing/IndexedRelationsSearcherBase.java
Merge commit '0915910'
[simantics/platform.git] / bundles / org.simantics.db.indexing / src / org / simantics / db / indexing / IndexedRelationsSearcherBase.java
1 /*******************************************************************************\r
2  * Copyright (c) 2007, 2015 Association for Decentralized Information Management\r
3  * in Industry THTH ry.\r
4  * All rights reserved. This program and the accompanying materials\r
5  * are made available under the terms of the Eclipse Public License v1.0\r
6  * which accompanies this distribution, and is available at\r
7  * http://www.eclipse.org/legal/epl-v10.html\r
8  *\r
9  * Contributors:\r
10  *     VTT Technical Research Centre of Finland - initial API and implementation\r
11  *     Semantum Oy - Fix for simantics issue #6053\r
12  *******************************************************************************/\r
13 package org.simantics.db.indexing;\r
14 \r
15 import java.io.File;\r
16 import java.io.IOException;\r
17 import java.util.ArrayList;\r
18 import java.util.Collection;\r
19 import java.util.Collections;\r
20 import java.util.Iterator;\r
21 import java.util.List;\r
22 import java.util.Map;\r
23 import java.util.concurrent.ExecutorService;\r
24 import java.util.concurrent.Executors;\r
25 import java.util.concurrent.Semaphore;\r
26 import java.util.concurrent.ThreadFactory;\r
27 import java.util.concurrent.atomic.AtomicReference;\r
28 \r
29 import org.apache.lucene.document.Document;\r
30 import org.apache.lucene.document.DocumentStoredFieldVisitor;\r
31 import org.apache.lucene.document.Field;\r
32 import org.apache.lucene.document.FieldType;\r
33 import org.apache.lucene.document.LongField;\r
34 import org.apache.lucene.document.TextField;\r
35 import org.apache.lucene.index.CorruptIndexException;\r
36 import org.apache.lucene.index.DirectoryReader;\r
37 import org.apache.lucene.index.FieldInfo;\r
38 import org.apache.lucene.index.IndexNotFoundException;\r
39 import org.apache.lucene.index.IndexReader;\r
40 import org.apache.lucene.index.IndexWriter;\r
41 import org.apache.lucene.index.IndexWriterConfig;\r
42 import org.apache.lucene.index.IndexWriterConfig.OpenMode;\r
43 import org.apache.lucene.index.IndexableField;\r
44 import org.apache.lucene.index.StoredFieldVisitor;\r
45 import org.apache.lucene.index.Term;\r
46 import org.apache.lucene.queryparser.classic.ParseException;\r
47 import org.apache.lucene.search.IndexSearcher;\r
48 import org.apache.lucene.search.MatchAllDocsQuery;\r
49 import org.apache.lucene.search.Query;\r
50 import org.apache.lucene.search.ScoreDoc;\r
51 import org.apache.lucene.search.TermQuery;\r
52 import org.apache.lucene.search.TopDocs;\r
53 import org.apache.lucene.store.Directory;\r
54 import org.apache.lucene.store.FSDirectory;\r
55 import org.apache.lucene.util.Version;\r
56 import org.eclipse.core.runtime.IProgressMonitor;\r
57 import org.eclipse.core.runtime.SubMonitor;\r
58 import org.simantics.databoard.util.ObjectUtils;\r
59 import org.simantics.db.ReadGraph;\r
60 import org.simantics.db.RequestProcessor;\r
61 import org.simantics.db.Resource;\r
62 import org.simantics.db.Session;\r
63 import org.simantics.db.common.request.SafeName;\r
64 import org.simantics.db.common.utils.Logger;\r
65 import org.simantics.db.common.utils.NameUtils;\r
66 import org.simantics.db.exception.DatabaseException;\r
67 import org.simantics.db.indexing.internal.IndexingJob;\r
68 import org.simantics.db.layer0.adapter.GenericRelation;\r
69 import org.simantics.db.request.Read;\r
70 import org.simantics.db.service.CollectionSupport;\r
71 import org.simantics.db.service.SerialisationSupport;\r
72 import org.simantics.utils.FileUtils;\r
73 import org.simantics.utils.datastructures.Pair;\r
74 \r
75 import gnu.trove.map.hash.THashMap;\r
76 \r
77 /**\r
78  * @author Tuukka Lehtonen\r
79  * @author Antti Villberg\r
80  */\r
81 abstract public class IndexedRelationsSearcherBase {\r
82 \r
    /**
     * Life cycle states of the index managed by this searcher. Transitions
     * between the states are performed by <code>changeState</code>.
     */
    protected enum State {
        /** No index is available. */
        NONE, 
        /** An index is available, but there is a problem with it. */
        PROBLEM, 
        /** An index is available but no reader or writer is ready. */
        READY,
        /** A reader is ready. */
        READ, 
        /** A writer (and a reader) is ready. */
        WRITE
    }
95     \r
    // Current life cycle state. The initializer value is replaced in the
    // constructor, which selects READY or NONE based on isIndexAvailable().
    private State state = State.READY;
    // The failure recorded by setProblem or by a failed recovery attempt in changeState.
    private Throwable exception;
98     \r
99     public Throwable getException() {\r
100         return exception;\r
101     }\r
102 \r
103     public void setProblem(Throwable t) {\r
104         this.state = State.PROBLEM;\r
105         this.exception = t;\r
106     }\r
107     \r
108     public void setNone() {\r
109         this.state = State.NONE;\r
110     }\r
111     \r
112     public void setReady() {\r
113         this.state = State.READY;\r
114     }\r
115     \r
116     protected boolean checkState(State state) {\r
117         return this.state == state;\r
118     }\r
119     \r
120     protected void assertState(State state) throws AssertionError {\r
121 \r
122         if(this.state != state) throw new AssertionError("Illegal state, expected " + state.name() + " but was in " + this.state.name());\r
123         \r
124     }\r
125     \r
126     public void changeState(IProgressMonitor monitor, Session session, State state) {\r
127         changeState(monitor, session, state, 0);\r
128     }\r
129 \r
    /**
     * Moves this searcher towards the requested state, closing the currently
     * open Lucene resources and, for {@link State#READ} and
     * {@link State#WRITE}, opening new ones.
     * <p>
     * This method does not throw on failure: any {@link Throwable} raised
     * while closing or opening resources is recorded with
     * <code>setProblem</code>, after which the method re-enters itself with
     * <code>depth + 1</code> to try to recover into {@link State#NONE}.
     * Callers therefore have to inspect the resulting state (e.g. with
     * <code>checkState</code>) to find out whether the requested state was
     * actually reached.
     *
     * @param monitor progress monitor, passed on to the recovery step and used
     *        for reporting index loading
     * @param session session passed to <code>getDirectory</code> and to the
     *        recovery step
     * @param state the requested target state
     * @param depth recursion depth; 0 for external requests, greater than 0
     *        when re-entered to recover from {@link State#PROBLEM}
     */
    protected void changeState(IProgressMonitor monitor, Session session, State state, int depth) {

        // Already in the requested state, nothing to do.
        if(this.state == state) return;

        if (IndexPolicy.TRACE_INDEX_MANAGEMENT)
                System.err.println("Index state " + this.state.name() + " => " + state.name() + " " + this);

        // Check transitions

        // Try to exit problem state
        // Only done on re-entry (depth > 0), i.e. right after a failed transition below.
        if (State.PROBLEM == this.state && depth > 0) {
                Throwable t = bestEffortClear(monitor, session);
                if(t != null) {
                                // Clearing failed as well: remember why and stay in PROBLEM.
                                exception = t;
                                return;
                }
                // Managed to get into initial state
                this.state = State.NONE;
                return;
        }

        // Cannot move into read from no index
        if (State.NONE ==  this.state && State.READ == state) return;
        // Cannot move into write from no index
        if (State.NONE ==  this.state && State.WRITE == state) return;

                boolean success = false;

        try {

                // Leave the current state: release searcher, reader and writer.
                if (searcher != null) {
                        searcher = null;
                }
                if (reader != null) {
                        reader.close();
                        reader = null;
                }
                        // NOTE(review): the writer field keeps referencing the closed writer
                        // here; it is only replaced when a new writer is opened below.
                        closeWriter(writer);
                        directory = null;

                        // Closing succeeded; this is the final result for targets other than READ/WRITE.
                        success = true;

                // Enter new state
                if (State.READ == state || State.WRITE == state) {

                        // Opening may still fail, so the transition is not successful yet.
                        success = false;

                        boolean forWriting = State.WRITE == state;

                        // NOTE(review): directory was set to null a few lines above, so this
                        // check cannot trigger as the code stands.
                        if (directory != null)
                                throw new IllegalStateException(getDescriptor() + "Index already loaded");

                        SubMonitor mon = SubMonitor.convert(monitor, 100);

                        mon.beginTask("Loading index", 100);

                        if (IndexPolicy.TRACE_INDEX_LOAD)
                    System.out.println(getDescriptor() + "Loading Lucene index from " + indexPath + " for " + (forWriting ? "writing" : "reading"));

                long start = System.nanoTime();

                directory = getDirectory(session);

                if (forWriting) {
                    // Never overwrite an index that is about to be loaded.
                    // TODO: could use OpenMode.CREATE_OR_APPEND but must test first
                    IndexWriterConfig config = new IndexWriterConfig(Version.LUCENE_4_9, Queries.getAnalyzer()).setOpenMode(OpenMode.APPEND);
                    try {
                        // FIXME: platform #4676
                        writer = new IndexWriter(directory, config);
                    } catch (IndexNotFoundException e) {
                        // There was no pre-existing index on disk. Create it now.
                        writer = new IndexWriter(directory, config.setOpenMode(OpenMode.CREATE));
                        // Commit the newly created index before the reader is opened below.
                        writer.commit();
                    }
                    reader = DirectoryReader.open(directory);
                    searcher = new IndexSearcher(reader);
                } else {
                    reader = DirectoryReader.open(directory);
                    searcher = new IndexSearcher(reader);
                }

                long end = System.nanoTime();

                mon.worked(100);

                if (IndexPolicy.PERF_INDEX_LOAD) {
                    double time = (end - start) * 1e-6;
                    System.out.println(getDescriptor() + "Loaded Lucene index from " + indexPath + " for " + (forWriting ? "writing" : "reading") + " in " + time + " ms");
                }

                success = true;

                }

        } catch (Throwable t) {

                // Record the failure; the finally block below starts the recovery.
                setProblem(t);

        } finally {

                if(!success) {
                        // Re-enter with depth + 1 so that the PROBLEM branch at the top
                        // attempts a best-effort clear into NONE.
                        this.state = State.PROBLEM;
                        changeState(monitor, session, State.NONE, depth+1);
                        return;
                }

        }

        // Everything went fine: the requested state has been reached.
        this.state = state;

    }
242 \r
    /**
     * Field type used by <code>makeField</code> for fields of class
     * "String": indexed, stored and tokenized. The type is frozen in the
     * static initializer below.
     */
    public static final FieldType STRING_TYPE = new FieldType();

    static {
      STRING_TYPE.setIndexed(true);
      STRING_TYPE.setStored(true);
      STRING_TYPE.setTokenized(true);
      // Freeze the shared instance once it has been configured.
      STRING_TYPE.freeze();
    }
251 \r
252     protected static Field makeField(String fieldName, String fieldClass) throws DatabaseException {\r
253         switch (fieldClass) {\r
254         case "Long":   return new LongField(fieldName, 0L, Field.Store.YES);\r
255         case "String": return new Field    (fieldName, "", STRING_TYPE);\r
256         case "Text":   return new TextField(fieldName, "", Field.Store.YES);\r
257         default:\r
258             throw new DatabaseException("Can only index Long, String and Text fields, encountered field type " + fieldClass);\r
259         }\r
260     }\r
261 \r
262     protected static Field[] makeFieldsForRelation(GenericRelation r, int boundLength, Document document) throws DatabaseException {\r
263         Pair<String, String>[] fields = r.getFields();\r
264         Field[] fs = new Field[Math.max(0, fields.length - boundLength)];\r
265         for (int i = boundLength; i < fields.length; i++) {\r
266             Field f = makeField(fields[i].first, fields[i].second);\r
267             fs[i - boundLength] = f;\r
268             if (document != null)\r
269                 document.add(f);\r
270         }\r
271         return fs;\r
272     }\r
273 \r
274     void insertIndex(IProgressMonitor monitor, GenericRelation r, int boundLength, Collection<Object[]> documentsData)\r
275     throws CorruptIndexException, IOException, DatabaseException {\r
276         assertAccessOpen(true);\r
277 \r
278         if (IndexPolicy.TRACE_INDEX_UPDATE)\r
279             System.out.println(getDescriptor() + "Inserting " + documentsData.size() + " documents into index at " + indexPath);\r
280 \r
281         long start = 0, end = 0;\r
282         if (IndexPolicy.PERF_INDEX_UPDATE)\r
283             start = System.nanoTime();\r
284 \r
285         try {\r
286             Document document = new Document();\r
287             Field[] fs = makeFieldsForRelation(r, boundLength, document);\r
288 \r
289             for (Object[] documentData : documentsData) {\r
290                 if (setFields(fs, documentData) == null)\r
291                     continue;\r
292 \r
293                 if (IndexPolicy.TRACE_INDEX_UPDATE)\r
294                     System.out.println(getDescriptor() + "Inserting document " + document);\r
295 \r
296                 writer.addDocument(document);\r
297             }\r
298 \r
299             if (IndexPolicy.PERF_INDEX_UPDATE) {\r
300                 end = System.nanoTime();\r
301                 double ms = (end - start) * 1e-6;\r
302                 System.out.println(getDescriptor() + "Inserted " + documentsData.size() + " documents into index at " + indexPath + " in " + ms + " ms");\r
303             }\r
304 \r
305         } finally {\r
306         }\r
307     }\r
308 \r
309     void removeIndex(IProgressMonitor monitor, GenericRelation r, RequestProcessor processor, String key, Collection<Object> keyValues) throws DatabaseException, CorruptIndexException, IOException {\r
310         assertAccessOpen(true);\r
311 \r
312         if (IndexPolicy.TRACE_INDEX_UPDATE)\r
313             System.out.println(getDescriptor() + "Removing " + keyValues.size() + " documents from index at " + indexPath);\r
314 \r
315         long start = 0, end = 0;\r
316         if (IndexPolicy.PERF_INDEX_UPDATE)\r
317             start = System.nanoTime();\r
318 \r
319         try {\r
320             for (Object keyValue : keyValues) {\r
321                 Term removedTerm = null;\r
322                 if (keyValue instanceof Long) {\r
323                     removedTerm = IndexUtils.longTerm(key, (Long) keyValue);\r
324                 } else if (keyValue instanceof String) {\r
325                     removedTerm = new Term(key, (String) keyValue);\r
326                 } else {\r
327                     // FIXME: should throw an exception for illegal input data but this would leave the index in an incoherent state\r
328                     continue;\r
329                 }\r
330 \r
331                 if (IndexPolicy.TRACE_INDEX_UPDATE)\r
332                     System.out.println(getDescriptor() + "Removing document with key " + removedTerm);\r
333                 writer.deleteDocuments(removedTerm);\r
334             }\r
335 \r
336             if (IndexPolicy.PERF_INDEX_UPDATE) {\r
337                 end = System.nanoTime();\r
338                 double ms = (end - start) * 1e-6;\r
339                 System.out.println(getDescriptor() + "Removed " + keyValues.size() + " documents from index at " + indexPath + " in " + ms + " ms");\r
340             }\r
341 \r
342         } finally {\r
343         }\r
344     }\r
345 \r
346     void removeIndex(IProgressMonitor monitor) throws DatabaseException, CorruptIndexException, IOException {\r
347         assertAccessOpen(true);\r
348 \r
349         long start = 0, end = 0;\r
350         if (IndexPolicy.PERF_INDEX_UPDATE)\r
351             start = System.nanoTime();\r
352 \r
353         try {\r
354 \r
355             writer.deleteAll();\r
356 \r
357             if (IndexPolicy.PERF_INDEX_UPDATE) {\r
358                 end = System.nanoTime();\r
359                 double ms = (end - start) * 1e-6;\r
360                 System.out.println(getDescriptor() + "Removed all documents from index at " + indexPath + " in " + ms + " ms");\r
361             }\r
362 \r
363         } finally {\r
364         }\r
365     }\r
366     \r
367     boolean replaceIndex(IProgressMonitor monitor, String key, Collection<Object> keyValues, GenericRelation r, int boundLength, Collection<Object[]> documentsData) throws CorruptIndexException, IOException, DatabaseException {\r
368 \r
369         boolean didReplace = false;\r
370         \r
371         assertAccessOpen(true);\r
372         if (keyValues.size() != documentsData.size())\r
373             throw new IllegalArgumentException("keyValues size does not match documents data size, " + keyValues.size() + " <> " + documentsData.size());\r
374 \r
375         if (IndexPolicy.TRACE_INDEX_UPDATE)\r
376             System.out.println(getDescriptor() + "Replacing " + keyValues.size() + " documents from index at " + indexPath);\r
377 \r
378         long start = 0, end = 0;\r
379         if (IndexPolicy.PERF_INDEX_UPDATE)\r
380             start = System.nanoTime();\r
381 \r
382         try {\r
383             Iterator<Object> keyIt = keyValues.iterator();\r
384             Iterator<Object[]> documentDataIt = documentsData.iterator();\r
385 \r
386             Document document = new Document();\r
387             Field[] fs = makeFieldsForRelation(r, boundLength, document);\r
388 \r
389             nextDocument:\r
390                 while (keyIt.hasNext()) {\r
391                     Object keyValue = keyIt.next();\r
392                     Object[] documentData = documentDataIt.next();\r
393 \r
394                     Term removedTerm = null;\r
395                     if (keyValue instanceof Long) {\r
396                         removedTerm = IndexUtils.longTerm(key, (Long) keyValue);\r
397                     } else if (keyValue instanceof String) {\r
398                         removedTerm = new Term(key, (String) keyValue);\r
399                     } else {\r
400                         // FIXME: should throw an exception for illegal input data but this would leave the index in an incoherent state\r
401                         System.err.println("[" + getClass().getSimpleName() + "] Unrecognized document key to remove '" + keyValue + "', only " + String.class + " and " + Resource.class + " are supported.");\r
402                         continue nextDocument;\r
403                     }\r
404 \r
405                     if (setFields(fs, documentData) == null)\r
406                         continue nextDocument;\r
407 \r
408                     if (IndexPolicy.TRACE_INDEX_UPDATE)\r
409                         System.out.println(getDescriptor() + "Replacing document with key " + removedTerm + " with " + document);\r
410 \r
411                     boolean done = false;\r
412                     if(requireChangeInfoOnReplace()) {\r
413                             TopDocs exist = searcher.search(new TermQuery(removedTerm), null, 2);\r
414                             if(exist.scoreDocs.length == 1 && requireChangeInfoOnReplace()) {\r
415                                 Document doc = reader.document(exist.scoreDocs[0].doc);\r
416                                 if(!areSame(doc, document)) {\r
417                                     writer.deleteDocuments(removedTerm);\r
418                                     writer.addDocument(document);\r
419                                     didReplace |= true;\r
420                                     if (IndexPolicy.TRACE_INDEX_UPDATE)\r
421                                         System.out.println("-replaced single existing");\r
422                                 } else {\r
423                                     if (IndexPolicy.TRACE_INDEX_UPDATE)\r
424                                         System.out.println("-was actually same than single existing");\r
425                                 }\r
426                                 done = true;\r
427                             } \r
428                     }\r
429                     if(!done) {\r
430                         writer.deleteDocuments(removedTerm);\r
431                         writer.addDocument(document);\r
432                         didReplace |= true;\r
433                         if (IndexPolicy.TRACE_INDEX_UPDATE)\r
434                                 System.out.println("-had many or none - removed all existing");\r
435                     }\r
436                     \r
437                 }\r
438 \r
439             if (IndexPolicy.PERF_INDEX_UPDATE) {\r
440                 end = System.nanoTime();\r
441                 double ms = (end - start) * 1e-6;\r
442                 System.out.println(getDescriptor() + "Replaced " + keyValues.size() + " documents from index at " + indexPath + " in " + ms + " ms");\r
443             }\r
444 \r
445         } finally {\r
446         }\r
447         \r
448         return didReplace;\r
449         \r
450     }\r
451     \r
    /**
     * Tells <code>replaceIndex</code> whether it should look up the existing
     * document for a key and compare it with the new one before rewriting it.
     * This implementation always answers <code>true</code>; the method is
     * protected and non-final, so subclasses can answer differently.
     *
     * @return <code>true</code> to compare against the existing document first
     */
    protected boolean requireChangeInfoOnReplace() {
        return true;
    }
455     \r
456     private boolean areSame(Document d1, Document d2) {\r
457         List<IndexableField> fs1 = d1.getFields();\r
458         List<IndexableField> fs2 = d2.getFields();\r
459         if(fs1.size() != fs2.size()) return false;\r
460         for(int i=0;i<fs1.size();i++) {\r
461                 IndexableField f1 = fs1.get(i);\r
462                 IndexableField f2 = fs2.get(i);\r
463                 String s1 = f1.stringValue();\r
464                 String s2 = f2.stringValue();\r
465             if (IndexPolicy.TRACE_INDEX_UPDATE)\r
466                 System.err.println("areSame " + s1 + " vs. " + s2 );\r
467                 if(!ObjectUtils.objectEquals(s1,s2)) return false;\r
468         }\r
469         return true;\r
470     }\r
471 \r
    // Request processor given to the constructor; also used there to resolve the index directory.
    final RequestProcessor session;

    // The indexed relation resource given to the constructor.
    final Resource         relation;

    /**
     * The schema of the index, i.e. the fields that will be indexed per
     * document for the specified relation. Since the relation stays the same
     * throughout the lifetime of this class, the index schema is also assumed
     * to be the same. This means that {@link GenericRelation#getFields()} is
     * assumed to stay the same.
     */
    final IndexSchema      schema;

    // The input resource given to the constructor.
    Resource         input;

    // Location of the index, resolved in the constructor with getIndexDirectory.
    File             indexPath;

    // Lucene directory of the currently loaded index; assigned and cleared by changeState.
    Directory        directory;

    // Reader over the loaded index; opened and closed by changeState.
    IndexReader      reader;

    // Writer for the loaded index; opened by changeState when entering the WRITE state.
    IndexWriter      writer;

    // Searcher on top of the reader; created and dropped together with it in changeState.
    IndexSearcher    searcher;
496 \r
497     IndexedRelationsSearcherBase(RequestProcessor session, Resource relation, Resource input) {\r
498         this.session = session;\r
499         this.relation = relation;\r
500         this.input = input;\r
501         this.indexPath = getIndexDirectory(session.getSession(), relation, input);\r
502         if(isIndexAvailable()) {\r
503                 state = State.READY;\r
504         } else {\r
505                 state = State.NONE;\r
506         }\r
507         this.schema = IndexSchema.readFromRelation(session, relation);\r
508     }\r
509 \r
510     Directory getDirectory(Session session) throws IOException {\r
511         return FSDirectory.open(indexPath);\r
512     }\r
513 \r
    /**
     * @return a textual descriptor of this searcher; it is used as the prefix
     *         of the trace, performance and error messages emitted by this class
     */
    abstract String getDescriptor();
515     \r
516     /**\r
517      * Ensures that searcher is in read or write state.\r
518      * \r
519      * @param forWriting <code>true</code> to open index for writing,\r
520      *        <code>false</code> for reading\r
521      * @return true is required state was reached       \r
522      *        \r
523      */\r
524     boolean startAccess(IProgressMonitor monitor, Session session, boolean forWriting) {\r
525         if(forWriting) {\r
526                 changeState(monitor, session, State.WRITE);\r
527                 return checkState(State.WRITE);\r
528         } else {\r
529                 changeState(monitor, session, State.READ);\r
530                 return checkState(State.READ);\r
531         }\r
532     }\r
533 \r
534     boolean hasAccess(boolean forWriting) {\r
535         \r
536         if (forWriting)\r
537                 return checkState(State.WRITE); \r
538         else\r
539                 return checkState(State.WRITE) || checkState(State.READ);\r
540         \r
541     }\r
542     \r
543     void assertAccessOpen(boolean forWriting) {\r
544         if (forWriting)\r
545                 if(!checkState(State.WRITE)) \r
546                 throw new IllegalStateException("index not opened for writing (directory=" + directory + ", reader=" + reader + ")");\r
547         else\r
548                 if(!(checkState(State.WRITE) || checkState(State.READ))) \r
549                 throw new IllegalStateException("index not opened for reading (directory=" + directory + ", writer=" + writer + ")");\r
550     }\r
551     \r
552     void closeWriter(IndexWriter writer) throws CorruptIndexException, IOException {\r
553         if (writer == null)\r
554             return;\r
555 \r
556         try {\r
557             // May throw OOME, see IndexWriter javadoc for the correct actions.\r
558             writer.close(false);\r
559         } catch (OutOfMemoryError e) {\r
560             writer.close();\r
561             throw e;\r
562         }\r
563     }\r
564 \r
565     private static String getPattern(GenericRelation relation, int boundCount) {\r
566         String result = "";\r
567         for (int i = 0; i < boundCount; i++)\r
568             result += "b";\r
569         for (int i = 0; i < relation.getFields().length - boundCount; i++)\r
570             result += "f";\r
571         return result;\r
572     }\r
573     \r
574     private static final int INDEXING_THREAD_COUNT = 2;\r
575     \r
576     private static final ExecutorService executor = Executors.newFixedThreadPool(INDEXING_THREAD_COUNT, new ThreadFactory() {\r
577         @Override\r
578         public Thread newThread(Runnable r) {\r
579             Thread t = new Thread(r, "Lucene Index Creator");\r
580             if (t.isDaemon())\r
581                 t.setDaemon(true);\r
582             if (t.getPriority() != Thread.NORM_PRIORITY)\r
583                 t.setPriority(Thread.NORM_PRIORITY);\r
584             return t;\r
585         }\r
586     });\r
587 \r
588     void initializeIndex(IProgressMonitor monitor, ReadGraph graph, Object[] bound, boolean overwrite)\r
589             throws IOException, DatabaseException\r
590     {\r
591         IndexingJob.jobifyIfPossible(\r
592                 monitor,\r
593                 "Reindexing " + NameUtils.getSafeLabel(graph, input),\r
594                 mon -> {\r
595                     try {\r
596                         initializeIndexImpl(mon, graph, bound, overwrite);\r
597                     } catch (IOException e) {\r
598                         throw new DatabaseException(e);\r
599                     }\r
600                 });\r
601     }\r
602 \r
603     void initializeIndexImpl(IProgressMonitor monitor, ReadGraph graph, final Object[] bound, boolean overwrite) throws IOException,\r
604     DatabaseException {\r
605 \r
606         final SubMonitor mon = SubMonitor.convert(monitor, 100);\r
607 \r
608         if (IndexPolicy.TRACE_INDEX_INIT)\r
609             System.out.println(getDescriptor() + "Initializing index at " + indexPath + " (overwrite = " + overwrite + ")");\r
610         mon.beginTask("Initializing Index", 100);\r
611 \r
612         if (overwrite) {\r
613             mon.subTask("Erasing previous index");\r
614             FileUtils.deleteAll(indexPath);\r
615         }\r
616 \r
617         final AtomicReference<FSDirectory> directory = new AtomicReference<FSDirectory>();\r
618         final AtomicReference<IndexWriter> writer = new AtomicReference<IndexWriter>();\r
619 \r
620         try {\r
621             mon.subTask("Start index write");\r
622             createDirectory(indexPath);\r
623 \r
624             directory.set(FSDirectory.open(indexPath));\r
625             IndexWriterConfig conf = new IndexWriterConfig(Version.LUCENE_4_9, Queries.getAnalyzer()).setOpenMode(OpenMode.CREATE);\r
626             writer.set(new IndexWriter(directory.get(), conf));\r
627 \r
628             mon.worked(5);\r
629 \r
630             final GenericRelation r = graph.adapt(relation, GenericRelation.class);\r
631             if (r == null)\r
632                 throw new DatabaseException("Given resource " + graph.syncRequest(new SafeName(relation))\r
633                 + "could not be adapted to GenericRelation.");\r
634 \r
635             long realizeStart = 0;\r
636             if (IndexPolicy.PERF_INDEX_INIT)\r
637                 realizeStart = System.nanoTime();\r
638 \r
639             mon.subTask("Calculating indexed content");\r
640             GenericRelation selection = r.select(getPattern(r, bound.length), bound);\r
641             mon.worked(5);\r
642             List<Object[]> results = selection.realize(graph);\r
643             mon.worked(40);\r
644 \r
645             if (IndexPolicy.PERF_INDEX_INIT)\r
646                 System.out.println(getDescriptor() + "Realized index with " + results.size() + " entries at " + indexPath + " in " + (1e-9 * (System.nanoTime()-realizeStart)) + " seconds.");\r
647             if (IndexPolicy.TRACE_INDEX_INIT)\r
648                 System.out.println(getDescriptor() + "Indexed relation " + r + " produced " + results.size() + " results");\r
649 \r
650             long start = IndexPolicy.PERF_INDEX_INIT ? System.nanoTime() : 0;\r
651 \r
652             mon.subTask("Indexing content");\r
653             final Semaphore s = new Semaphore(0);\r
654             mon.setWorkRemaining(results.size());\r
655 \r
656             for (int i = 0; i < INDEXING_THREAD_COUNT; i++) {\r
657                 final int startIndex = i;\r
658                 executor.submit(() -> {\r
659                     try {\r
660                         Document document = new Document();\r
661                         Field[] fs = makeFieldsForRelation(r, bound.length, document);\r
662 \r
663                         for (int index = startIndex; index < results.size(); index += INDEXING_THREAD_COUNT) {\r
664                             if (setFields(fs, results.get(index)) == null)\r
665                                 continue;\r
666                             try {\r
667                                 writer.get().addDocument(document);\r
668                             } catch (CorruptIndexException e) {\r
669                                 throw new IllegalStateException(e);\r
670                             } catch (IOException e) {\r
671                                 throw new IllegalStateException(e);\r
672                             } finally {\r
673                                 synchronized (mon) {\r
674                                     mon.worked(1);\r
675                                 }\r
676                             }\r
677                         }\r
678 \r
679                         s.release();\r
680                     } catch (DatabaseException e) {\r
681                         throw new IllegalStateException(e);\r
682                     }\r
683                 });\r
684             }\r
685 \r
686             try {\r
687                 s.acquire(INDEXING_THREAD_COUNT);\r
688             } catch (InterruptedException e) {\r
689                 Logger.defaultLogError(e);\r
690             }\r
691 \r
692             // http://www.gossamer-threads.com/lists/lucene/java-dev/47895\r
693             // and http://lucene.apache.org/java/docs/index.html#27+November+2011+-+Lucene+Core+3.5.0\r
694             // advise against calling optimize at all. So let's not do it anymore.\r
695             //writer.get().optimize();\r
696             //writer.get().commit();\r
697 \r
698             mon.subTask("Flushing");\r
699 \r
700             if (IndexPolicy.PERF_INDEX_INIT)\r
701                 System.out.println(getDescriptor() + "Wrote index at " + indexPath + " in " + (1e-9 * (System.nanoTime()-start)) + " seconds.");\r
702 \r
703         } catch (DatabaseException e) {\r
704             \r
705             Logger.defaultLogError(e);\r
706             \r
707         } finally {\r
708             try {\r
709                 closeWriter(writer.getAndSet(null));\r
710             } finally {\r
711                 directory.getAndSet(null).close();\r
712             }\r
713         }\r
714     }\r
715 \r
716     \r
717     public List<Object[]> debugDocs(IProgressMonitor monitor) throws ParseException, IOException, DatabaseException {\r
718     \r
719             Query query = new MatchAllDocsQuery(); \r
720         \r
721             TopDocs td = searcher.search(query, Integer.MAX_VALUE);\r
722     \r
723             ScoreDoc[ ] scoreDocs = td.scoreDocs; \r
724             List<Object[]> result = new ArrayList<Object[]>(scoreDocs.length);\r
725         \r
726             for(ScoreDoc scoreDoc:scoreDocs) {\r
727         \r
728                 try {\r
729         \r
730                     Document doc = reader.document(scoreDoc.doc);\r
731                     List<IndexableField> fs = doc.getFields();\r
732                     Object[] o = new Object[fs.size()];\r
733                     int index = 0; \r
734                     for (IndexableField f : fs) {\r
735                     o[index++] = f.stringValue();\r
736                     }\r
737                     result.add(o);\r
738         \r
739                 } catch (CorruptIndexException e) {\r
740                     throw new DatabaseException(e);\r
741                 } catch (IOException e) {\r
742                     throw new DatabaseException(e);\r
743                 }\r
744 \r
745             }\r
746             \r
747             return result;\r
748             \r
749     }\r
750 \r
751     \r
752     List<Map<String, Object>> doSearch(IProgressMonitor monitor, RequestProcessor processor, String search, int maxResultCount) throws ParseException, IOException,\r
753     DatabaseException {\r
754 \r
755         // An empty search string will crash QueryParser\r
756         // Just return no results for empty queries.\r
757         //System.out.println("search: '" + search + "'");\r
758         if (search.isEmpty())\r
759             return Collections.emptyList();\r
760 \r
761         assertAccessOpen(false);\r
762 \r
763         Query query = Queries.parse(search, schema);\r
764 \r
765         long start = System.nanoTime();\r
766 \r
767         maxResultCount = Math.min(maxResultCount, searcher.getIndexReader().numDocs());\r
768         if (maxResultCount == 0)\r
769             return Collections.emptyList();\r
770         \r
771         final TopDocs docs = searcher.search(query, null, maxResultCount);\r
772         \r
773 //        for(Object[] o : debugDocs(monitor)) {\r
774 //            System.err.println("-" + Arrays.toString(o));\r
775 //        }\r
776         \r
777         if (IndexPolicy.PERF_INDEX_QUERY) {\r
778             long end = System.nanoTime();\r
779             System.out.println(getDescriptor() + "search(" + search + ", " + maxResultCount + ") into index at " + indexPath + " took " + (1e-9 * (end-start)) + " seconds.");\r
780         }\r
781 \r
782         if (docs.totalHits == 0) {\r
783             return Collections.emptyList();\r
784         }\r
785 \r
786         return processor.syncRequest(new Read<List<Map<String, Object>>>() {\r
787 \r
788             @Override\r
789             public List<Map<String, Object>> perform(ReadGraph graph) throws DatabaseException {\r
790 \r
791                 GenericRelation r = graph.adapt(relation, GenericRelation.class);\r
792                 if (r == null)\r
793                     throw new DatabaseException("Given resource " + graph.syncRequest(new SafeName(relation))\r
794                             + "could not be adapted to GenericRelation.");\r
795 \r
796                 SerialisationSupport support = graph.getService(SerialisationSupport.class);\r
797 \r
798                 List<Map<String, Object>> result = new ArrayList<Map<String, Object>>(docs.scoreDocs.length);\r
799                 \r
800                 final DocumentStoredFieldVisitor visitor = new DocumentStoredFieldVisitor();\r
801                 \r
802                 for (ScoreDoc scoreDoc : docs.scoreDocs) {\r
803 \r
804                     try {\r
805 \r
806                         reader.document(scoreDoc.doc, visitor);\r
807                         \r
808                         Document doc = visitor.getDocument();\r
809 \r
810                         List<IndexableField> fs = doc.getFields();\r
811                         Map<String, Object> entry = new THashMap<String, Object>(fs.size());\r
812                         for (IndexableField f : fs) {\r
813                             IndexSchema.Type type = schema.typeMap.get(f.name());\r
814                             if (type == IndexSchema.Type.LONG) {\r
815                                 entry.put(f.name(), support.getResource(f.numericValue().longValue()));\r
816                             } else {\r
817                                 entry.put(f.name(), f.stringValue());\r
818                             }\r
819                         }\r
820                         \r
821                         result.add(entry);\r
822 \r
823                     } catch (CorruptIndexException e) {\r
824                         throw new DatabaseException(e);\r
825                     } catch (IOException e) {\r
826                         throw new DatabaseException(e);\r
827                     }\r
828 \r
829                 }\r
830 \r
831                 return result;\r
832 \r
833             }\r
834         });\r
835     }\r
836 \r
837     static class ResourceVisitor extends StoredFieldVisitor {\r
838         \r
839         public long id;\r
840 \r
841                 @Override\r
842                 public Status needsField(FieldInfo fieldInfo) throws IOException {\r
843                         if("Resource".equals(fieldInfo.name)) return Status.YES;\r
844                         return Status.NO;\r
845                 }\r
846                 \r
847                 @Override\r
848                 public void longField(FieldInfo fieldInfo, long value) throws IOException {\r
849                         id = value;\r
850                 }\r
851         \r
852     };\r
853     \r
854     static class DumpVisitor extends StoredFieldVisitor {\r
855 \r
856         public List<Object> values;\r
857         \r
858         DumpVisitor(List<Object> values) {\r
859                 this.values = values;\r
860         }\r
861 \r
862                 @Override\r
863                 public Status needsField(FieldInfo fieldInfo) throws IOException {\r
864                         return Status.YES;\r
865                 }\r
866                 \r
867                 @Override\r
868                 public void longField(FieldInfo fieldInfo, long value) throws IOException {\r
869                         values.add(value);\r
870                 }\r
871                 \r
872                 @Override\r
873                 public void stringField(FieldInfo fieldInfo, String value) throws IOException {\r
874                         values.add(value);\r
875                 }\r
876 \r
877     }\r
878 \r
879     List<Resource> doSearchResources(IProgressMonitor monitor, RequestProcessor processor, String search, int maxResultCount) throws ParseException, IOException,\r
880     DatabaseException {\r
881 \r
882         // An empty search string will crash QueryParser\r
883         // Just return no results for empty queries.\r
884         //System.out.println("search: '" + search + "'");\r
885         if (search.isEmpty())\r
886             return Collections.emptyList();\r
887 \r
888         assertAccessOpen(false);\r
889 \r
890         Query query = Queries.parse(search, schema);\r
891 \r
892         long start = System.nanoTime();\r
893 \r
894         maxResultCount = Math.min(maxResultCount, searcher.getIndexReader().numDocs());\r
895         if (maxResultCount == 0)\r
896             return Collections.emptyList();\r
897         \r
898         final TopDocs docs = searcher.search(query, null, maxResultCount);\r
899         \r
900 //        for(Object[] o : debugDocs(monitor)) {\r
901 //            System.err.println("-" + Arrays.toString(o));\r
902 //        }\r
903         \r
904         if (IndexPolicy.PERF_INDEX_QUERY) {\r
905             long end = System.nanoTime();\r
906             System.out.println(getDescriptor() + "search(" + search + ", " + maxResultCount + ") into index at " + indexPath + " took " + (1e-9 * (end-start)) + " seconds.");\r
907         }\r
908 \r
909         if (docs.totalHits == 0) {\r
910             return Collections.emptyList();\r
911         }\r
912         \r
913         return processor.syncRequest(new Read<List<Resource>>() {\r
914 \r
915             @Override\r
916             public List<Resource> perform(ReadGraph graph) throws DatabaseException {\r
917 \r
918                 CollectionSupport cs = graph.getService(CollectionSupport.class);\r
919                 SerialisationSupport support = graph.getService(SerialisationSupport.class);\r
920                 \r
921                 List<Resource> result = cs.createList();\r
922                 \r
923                 ResourceVisitor visitor = new ResourceVisitor();\r
924                 \r
925                 for (ScoreDoc scoreDoc : docs.scoreDocs) {\r
926 \r
927                     try {\r
928                         \r
929                         reader.document(scoreDoc.doc, visitor);\r
930                         result.add(support.getResource(visitor.id));\r
931 \r
932                     } catch (CorruptIndexException e) {\r
933                         throw new DatabaseException(e);\r
934                     } catch (IOException e) {\r
935                         throw new DatabaseException(e);\r
936                     }\r
937 \r
938                 }\r
939 \r
940                 return result;\r
941 \r
942             }\r
943         });\r
944     }\r
945 \r
946     List<Object> doList(IProgressMonitor monitor, RequestProcessor processor) throws ParseException, IOException,\r
947     DatabaseException {\r
948 \r
949         assertAccessOpen(false);\r
950 \r
951         Query query = new MatchAllDocsQuery(); \r
952 \r
953         final TopDocs docs = searcher.search(query, Integer.MAX_VALUE);\r
954         \r
955         ArrayList<Object> result = new ArrayList<Object>();\r
956         \r
957         DumpVisitor visitor = new DumpVisitor(result);\r
958                 \r
959         for (ScoreDoc scoreDoc : docs.scoreDocs) {\r
960 \r
961                 try {\r
962 \r
963                         reader.document(scoreDoc.doc, visitor);\r
964 \r
965                 } catch (CorruptIndexException e) {\r
966                         throw new DatabaseException(e);\r
967                 } catch (IOException e) {\r
968                         throw new DatabaseException(e);\r
969                 }\r
970 \r
971         }\r
972 \r
973         return result;\r
974 \r
975     }\r
976     \r
977     protected static File getIndexDirectory(Session session, Resource relation, Resource input) {\r
978         File path = DatabaseIndexing.getIndexLocation(session, relation, input);\r
979 //        System.out.println("getIndexDirectory = " + path);\r
980         return path;\r
981     }\r
982 \r
983     private static void createDirectory(File path) throws IOException {\r
984         if (path.exists() && !path.isDirectory())\r
985             throw new IOException("Could not create index directory " + path + ", a file by that name already exists");\r
986         path.mkdirs();\r
987         if (!path.exists())\r
988             throw new IOException("Could not create index directory " + path + " for an unknown reason");\r
989         if (!path.isDirectory())\r
990             throw new IOException("Could not create index directory " + path + ", a file by that name already exists");\r
991     }\r
992 \r
993     File getIndexPath() {\r
994         return indexPath;\r
995     }\r
996 \r
997     boolean isIndexAvailable() {\r
998         return (indexPath.exists() && indexPath.isDirectory());\r
999     }\r
1000     \r
1001     Throwable bestEffortClear(IProgressMonitor monitor, Session session) {\r
1002         return null;\r
1003     }\r
1004 \r
1005     /*\r
1006      * Start from scratch. Clear all caches and rebuild the index. \r
1007      */\r
1008     Throwable clearDirectory(IProgressMonitor monitor, Session session) {\r
1009         \r
1010                 File file = getIndexPath();\r
1011 \r
1012         try {\r
1013 \r
1014                 for(int i=0;i<15;i++) {\r
1015                         FileUtils.deleteDir(file);\r
1016                         if(!file.exists()) {\r
1017                                 return null;\r
1018                         }\r
1019                         try {\r
1020                                 Thread.sleep(i*100);\r
1021                         } catch (InterruptedException e) {\r
1022                         }\r
1023                 }\r
1024 \r
1025         } catch (Throwable t) {\r
1026 \r
1027                 return t;\r
1028                 \r
1029         }\r
1030 \r
1031         return new IllegalStateException("Failed to delete directory " + file.getAbsolutePath());\r
1032 \r
1033     }\r
1034 \r
1035     private Field[] setFields(Field[] fs, Object[] result) {\r
1036         for (int i = 0; i < result.length; i++) {\r
1037             Object value = result[i];\r
1038             if (value instanceof String) {\r
1039                 if (IndexPolicy.DEBUG_INDEX_INIT)\r
1040                     System.out.println(getDescriptor() + "index " + fs[i].name() + " = " + value + " : String");\r
1041                 fs[i].setStringValue((String) value);\r
1042             } else if (value instanceof Long) {\r
1043                 if (IndexPolicy.DEBUG_INDEX_INIT)\r
1044                     System.out.println(getDescriptor() + "index " + fs[i].name() + " = " + value + " : Long");\r
1045                 fs[i].setLongValue((Long) value);\r
1046             } else {\r
1047                 Logger.defaultLogError("Can only index Long and String fields, encountered " + value);\r
1048                 return null;\r
1049             }\r
1050         }\r
1051         return fs;\r
1052     }\r
1053 \r
1054 }\r