]> gerrit.simantics Code Review - simantics/platform.git/blob - bundles/org.simantics.db.indexing/src/org/simantics/db/indexing/IndexedRelationsSearcherBase.java
4afbb174e0eb565958f51a94c0fdfc836d2219bb
[simantics/platform.git] / bundles / org.simantics.db.indexing / src / org / simantics / db / indexing / IndexedRelationsSearcherBase.java
1 /*******************************************************************************\r
2  * Copyright (c) 2007, 2015 Association for Decentralized Information Management\r
3  * in Industry THTH ry.\r
4  * All rights reserved. This program and the accompanying materials\r
5  * are made available under the terms of the Eclipse Public License v1.0\r
6  * which accompanies this distribution, and is available at\r
7  * http://www.eclipse.org/legal/epl-v10.html\r
8  *\r
9  * Contributors:\r
10  *     VTT Technical Research Centre of Finland - initial API and implementation\r
11  *     Semantum Oy - Fix for simantics issue #6053\r
12  *******************************************************************************/\r
13 package org.simantics.db.indexing;\r
14 \r
15 import java.io.File;\r
16 import java.io.IOException;\r
17 import java.util.ArrayList;\r
18 import java.util.Collection;\r
19 import java.util.Collections;\r
20 import java.util.Iterator;\r
21 import java.util.List;\r
22 import java.util.Map;\r
23 import java.util.concurrent.ExecutorService;\r
24 import java.util.concurrent.Executors;\r
25 import java.util.concurrent.Semaphore;\r
26 import java.util.concurrent.ThreadFactory;\r
27 import java.util.concurrent.atomic.AtomicReference;\r
28 \r
29 import org.apache.lucene.document.Document;\r
30 import org.apache.lucene.document.DocumentStoredFieldVisitor;\r
31 import org.apache.lucene.document.Field;\r
32 import org.apache.lucene.document.FieldType;\r
33 import org.apache.lucene.document.LongField;\r
34 import org.apache.lucene.document.TextField;\r
35 import org.apache.lucene.index.CorruptIndexException;\r
36 import org.apache.lucene.index.DirectoryReader;\r
37 import org.apache.lucene.index.FieldInfo;\r
38 import org.apache.lucene.index.IndexNotFoundException;\r
39 import org.apache.lucene.index.IndexReader;\r
40 import org.apache.lucene.index.IndexWriter;\r
41 import org.apache.lucene.index.IndexWriterConfig;\r
42 import org.apache.lucene.index.IndexWriterConfig.OpenMode;\r
43 import org.apache.lucene.index.IndexableField;\r
44 import org.apache.lucene.index.StoredFieldVisitor;\r
45 import org.apache.lucene.index.Term;\r
46 import org.apache.lucene.queryparser.classic.ParseException;\r
47 import org.apache.lucene.search.IndexSearcher;\r
48 import org.apache.lucene.search.MatchAllDocsQuery;\r
49 import org.apache.lucene.search.Query;\r
50 import org.apache.lucene.search.ScoreDoc;\r
51 import org.apache.lucene.search.TermQuery;\r
52 import org.apache.lucene.search.TopDocs;\r
53 import org.apache.lucene.store.Directory;\r
54 import org.apache.lucene.store.FSDirectory;\r
55 import org.apache.lucene.util.Version;\r
56 import org.eclipse.core.runtime.IProgressMonitor;\r
57 import org.eclipse.core.runtime.SubMonitor;\r
58 import org.simantics.databoard.util.ObjectUtils;\r
59 import org.simantics.db.ReadGraph;\r
60 import org.simantics.db.RequestProcessor;\r
61 import org.simantics.db.Resource;\r
62 import org.simantics.db.Session;\r
63 import org.simantics.db.common.request.ReadRequest;\r
64 import org.simantics.db.common.request.SafeName;\r
65 import org.simantics.db.common.utils.Logger;\r
66 import org.simantics.db.exception.DatabaseException;\r
67 import org.simantics.db.layer0.adapter.GenericRelation;\r
68 import org.simantics.db.request.Read;\r
69 import org.simantics.db.service.CollectionSupport;\r
70 import org.simantics.db.service.SerialisationSupport;\r
71 import org.simantics.utils.DataContainer;\r
72 import org.simantics.utils.FileUtils;\r
73 import org.simantics.utils.datastructures.Pair;\r
74 \r
75 import gnu.trove.map.hash.THashMap;\r
76 \r
77 /**\r
78  * @author Tuukka Lehtonen\r
79  * @author Antti Villberg\r
80  */\r
81 abstract public class IndexedRelationsSearcherBase {\r
82 \r
83     protected enum State {\r
84         // No index is available\r
85         NONE, \r
86         // An index is available, but there is a problem with it\r
87         PROBLEM, \r
88         // An index is available but no reader or writer is ready\r
89         READY,\r
90         // A reader is ready\r
91         READ, \r
92         // A writer (and a reader) is ready\r
93         WRITE\r
94     }\r
95     \r
96     private State state = State.READY;\r
97     private Throwable exception;\r
98     \r
99     public Throwable getException() {\r
100         return exception;\r
101     }\r
102 \r
103     public void setProblem(Throwable t) {\r
104         this.state = State.PROBLEM;\r
105         this.exception = t;\r
106     }\r
107     \r
108     public void setNone() {\r
109         this.state = State.NONE;\r
110     }\r
111     \r
112     public void setReady() {\r
113         this.state = State.READY;\r
114     }\r
115     \r
116     protected boolean checkState(State state) {\r
117         return this.state == state;\r
118     }\r
119     \r
120     protected void assertState(State state) throws AssertionError {\r
121 \r
122         if(this.state != state) throw new AssertionError("Illegal state, expected " + state.name() + " but was in " + this.state.name());\r
123         \r
124     }\r
125     \r
126     public void changeState(IProgressMonitor monitor, Session session, State state) {\r
127         changeState(monitor, session, state, 0);\r
128     }\r
129 \r
130     protected void changeState(IProgressMonitor monitor, Session session, State state, int depth) {\r
131 \r
132         if(this.state == state) return;\r
133 \r
134         if (IndexPolicy.TRACE_INDEX_MANAGEMENT)\r
135                 System.err.println("Index state " + this.state.name() + " => " + state.name() + " " + this);\r
136 \r
137         // Check transitions\r
138         \r
139         // Try to exit problem state\r
140         if (State.PROBLEM == this.state && depth > 0) {\r
141                 Throwable t = bestEffortClear(monitor, session);\r
142                 if(t != null) {\r
143                                 exception = t;\r
144                                 return;\r
145                 }\r
146                 // Managed to get into initial state\r
147                 this.state = State.NONE;\r
148                 return;\r
149         }\r
150 \r
151         // Cannot move into read from no index\r
152         if (State.NONE ==  this.state && State.READ == state) return;\r
153         // Cannot move into write from no index\r
154         if (State.NONE ==  this.state && State.WRITE == state) return;\r
155         \r
156                 boolean success = false;\r
157 \r
158         try {\r
159 \r
160                 if (searcher != null) {\r
161                         searcher = null;\r
162                 }\r
163                 if (reader != null) {\r
164                         reader.close();\r
165                         reader = null;\r
166                 }\r
167                         closeWriter(writer);\r
168                         directory = null;\r
169                         \r
170                         success = true;\r
171 \r
172                 // Enter new state\r
173                 if (State.READ == state || State.WRITE == state) {\r
174                         \r
175                         success = false;\r
176                         \r
177                         boolean forWriting = State.WRITE == state;\r
178 \r
179                         if (directory != null)\r
180                                 throw new IllegalStateException(getDescriptor() + "Index already loaded");\r
181 \r
182                         SubMonitor mon = SubMonitor.convert(monitor, 100);\r
183 \r
184                         mon.beginTask("Loading index", 100);\r
185 \r
186                         if (IndexPolicy.TRACE_INDEX_LOAD)\r
187                     System.out.println(getDescriptor() + "Loading Lucene index from " + indexPath + " for " + (forWriting ? "writing" : "reading"));\r
188 \r
189                 long start = System.nanoTime();\r
190 \r
191                 directory = getDirectory(session);\r
192 \r
193                 if (forWriting) {\r
194                     // Never overwrite an index that is about to be loaded.\r
195                     // TODO: could use OpenMode.CREATE_OR_APPEND but must test first\r
196                     IndexWriterConfig config = new IndexWriterConfig(Version.LUCENE_4_9, Queries.getAnalyzer()).setOpenMode(OpenMode.APPEND);\r
197                     try {\r
198                         // FIXME: platform #4676\r
199                         writer = new IndexWriter(directory, config);\r
200                     } catch (IndexNotFoundException e) {\r
201                         // There was no pre-existing index on disk. Create it now.\r
202                         writer = new IndexWriter(directory, config.setOpenMode(OpenMode.CREATE));\r
203                         writer.commit();\r
204                     }\r
205                     reader = DirectoryReader.open(directory);\r
206                     searcher = new IndexSearcher(reader);\r
207                 } else {\r
208                     reader = DirectoryReader.open(directory);\r
209                     searcher = new IndexSearcher(reader);\r
210                 }\r
211 \r
212                 long end = System.nanoTime();\r
213 \r
214                 mon.worked(100);\r
215 \r
216                 if (IndexPolicy.PERF_INDEX_LOAD) {\r
217                     double time = (end - start) * 1e-6;\r
218                     System.out.println(getDescriptor() + "Loaded Lucene index from " + indexPath + " for " + (forWriting ? "writing" : "reading") + " in " + time + " ms");\r
219                 }\r
220 \r
221                 success = true;\r
222                 \r
223                 }\r
224                 \r
225         } catch (Throwable t) {\r
226 \r
227                 setProblem(t);\r
228 \r
229         } finally {\r
230 \r
231                 if(!success) {\r
232                         this.state = State.PROBLEM;\r
233                         changeState(monitor, session, State.NONE, depth+1);\r
234                         return;\r
235                 }\r
236 \r
237         }\r
238 \r
239         this.state = state;\r
240         \r
241     }\r
242 \r
243     public static final FieldType STRING_TYPE = new FieldType();\r
244 \r
245     static {\r
246       STRING_TYPE.setIndexed(true);\r
247       STRING_TYPE.setStored(true);\r
248       STRING_TYPE.setTokenized(true);\r
249       STRING_TYPE.freeze();\r
250     }\r
251 \r
252     protected static Field makeField(String fieldName, String fieldClass) throws DatabaseException {\r
253         switch (fieldClass) {\r
254         case "Long":   return new LongField(fieldName, 0L, Field.Store.YES);\r
255         case "String": return new Field    (fieldName, "", STRING_TYPE);\r
256         case "Text":   return new TextField(fieldName, "", Field.Store.YES);\r
257         default:\r
258             throw new DatabaseException("Can only index Long, String and Text fields, encountered field type " + fieldClass);\r
259         }\r
260     }\r
261 \r
262     protected static Field[] makeFieldsForRelation(GenericRelation r, int boundLength, Document document) throws DatabaseException {\r
263         Pair<String, String>[] fields = r.getFields();\r
264         Field[] fs = new Field[Math.max(0, fields.length - boundLength)];\r
265         for (int i = boundLength; i < fields.length; i++) {\r
266             Field f = makeField(fields[i].first, fields[i].second);\r
267             fs[i - boundLength] = f;\r
268             if (document != null)\r
269                 document.add(f);\r
270         }\r
271         return fs;\r
272     }\r
273 \r
274     void insertIndex(IProgressMonitor monitor, GenericRelation r, int boundLength, Collection<Object[]> documentsData)\r
275     throws CorruptIndexException, IOException, DatabaseException {\r
276         assertAccessOpen(true);\r
277 \r
278         if (IndexPolicy.TRACE_INDEX_UPDATE)\r
279             System.out.println(getDescriptor() + "Inserting " + documentsData.size() + " documents into index at " + indexPath);\r
280 \r
281         long start = 0, end = 0;\r
282         if (IndexPolicy.PERF_INDEX_UPDATE)\r
283             start = System.nanoTime();\r
284 \r
285         try {\r
286             Document document = new Document();\r
287             Field[] fs = makeFieldsForRelation(r, boundLength, document);\r
288 \r
289             for (Object[] documentData : documentsData) {\r
290                 for (int i = 0; i < documentData.length; i++) {\r
291                     Object value = documentData[i];\r
292                     if (value instanceof String) {\r
293                         fs[i].setStringValue((String) value);\r
294                     } else if (value instanceof Long) {\r
295                         fs[i].setLongValue((Long) value);\r
296                     } else {\r
297                         System.err.println("Can only index Long and String fields, encountered " + value);\r
298                         // FIXME: should throw an exception for illegal input data but this would leave the index in an incoherent state\r
299                         continue;\r
300                     }\r
301 //                    System.out.println("index " + fs[i].name() + " = " + result[i]);\r
302                 }\r
303 \r
304                 if (IndexPolicy.TRACE_INDEX_UPDATE)\r
305                     System.out.println(getDescriptor() + "Inserting document " + document);\r
306 \r
307                 writer.addDocument(document);\r
308             }\r
309 \r
310             if (IndexPolicy.PERF_INDEX_UPDATE) {\r
311                 end = System.nanoTime();\r
312                 double ms = (end - start) * 1e-6;\r
313                 System.out.println(getDescriptor() + "Inserted " + documentsData.size() + " documents into index at " + indexPath + " in " + ms + " ms");\r
314             }\r
315 \r
316         } finally {\r
317         }\r
318     }\r
319 \r
320     void removeIndex(IProgressMonitor monitor, GenericRelation r, RequestProcessor processor, String key, Collection<Object> keyValues) throws DatabaseException, CorruptIndexException, IOException {\r
321         assertAccessOpen(true);\r
322 \r
323         if (IndexPolicy.TRACE_INDEX_UPDATE)\r
324             System.out.println(getDescriptor() + "Removing " + keyValues.size() + " documents from index at " + indexPath);\r
325 \r
326         long start = 0, end = 0;\r
327         if (IndexPolicy.PERF_INDEX_UPDATE)\r
328             start = System.nanoTime();\r
329 \r
330         try {\r
331             for (Object keyValue : keyValues) {\r
332                 Term removedTerm = null;\r
333                 if (keyValue instanceof Long) {\r
334                     removedTerm = IndexUtils.longTerm(key, (Long) keyValue);\r
335                 } else if (keyValue instanceof String) {\r
336                     removedTerm = new Term(key, (String) keyValue);\r
337                 } else {\r
338                     // FIXME: should throw an exception for illegal input data but this would leave the index in an incoherent state\r
339                     continue;\r
340                 }\r
341 \r
342                 if (IndexPolicy.TRACE_INDEX_UPDATE)\r
343                     System.out.println(getDescriptor() + "Removing document with key " + removedTerm);\r
344                 writer.deleteDocuments(removedTerm);\r
345             }\r
346 \r
347             if (IndexPolicy.PERF_INDEX_UPDATE) {\r
348                 end = System.nanoTime();\r
349                 double ms = (end - start) * 1e-6;\r
350                 System.out.println(getDescriptor() + "Removed " + keyValues.size() + " documents from index at " + indexPath + " in " + ms + " ms");\r
351             }\r
352 \r
353         } finally {\r
354         }\r
355     }\r
356 \r
357     void removeIndex(IProgressMonitor monitor) throws DatabaseException, CorruptIndexException, IOException {\r
358         assertAccessOpen(true);\r
359 \r
360         long start = 0, end = 0;\r
361         if (IndexPolicy.PERF_INDEX_UPDATE)\r
362             start = System.nanoTime();\r
363 \r
364         try {\r
365 \r
366             writer.deleteAll();\r
367 \r
368             if (IndexPolicy.PERF_INDEX_UPDATE) {\r
369                 end = System.nanoTime();\r
370                 double ms = (end - start) * 1e-6;\r
371                 System.out.println(getDescriptor() + "Removed all documents from index at " + indexPath + " in " + ms + " ms");\r
372             }\r
373 \r
374         } finally {\r
375         }\r
376     }\r
377     \r
378     boolean replaceIndex(IProgressMonitor monitor, String key, Collection<Object> keyValues, GenericRelation r, int boundLength, Collection<Object[]> documentsData) throws CorruptIndexException, IOException, DatabaseException {\r
379 \r
380         boolean didReplace = false;\r
381         \r
382         assertAccessOpen(true);\r
383         if (keyValues.size() != documentsData.size())\r
384             throw new IllegalArgumentException("keyValues size does not match documents data size, " + keyValues.size() + " <> " + documentsData.size());\r
385 \r
386         if (IndexPolicy.TRACE_INDEX_UPDATE)\r
387             System.out.println(getDescriptor() + "Replacing " + keyValues.size() + " documents from index at " + indexPath);\r
388 \r
389         long start = 0, end = 0;\r
390         if (IndexPolicy.PERF_INDEX_UPDATE)\r
391             start = System.nanoTime();\r
392 \r
393         try {\r
394             Iterator<Object> keyIt = keyValues.iterator();\r
395             Iterator<Object[]> documentDataIt = documentsData.iterator();\r
396 \r
397             Document document = new Document();\r
398             Field[] fs = makeFieldsForRelation(r, boundLength, document);\r
399 \r
400             nextDocument:\r
401                 while (keyIt.hasNext()) {\r
402                     Object keyValue = keyIt.next();\r
403                     Object[] documentData = documentDataIt.next();\r
404 \r
405                     Term removedTerm = null;\r
406                     if (keyValue instanceof Long) {\r
407                         removedTerm = IndexUtils.longTerm(key, (Long) keyValue);\r
408                     } else if (keyValue instanceof String) {\r
409                         removedTerm = new Term(key, (String) keyValue);\r
410                     } else {\r
411                         // FIXME: should throw an exception for illegal input data but this would leave the index in an incoherent state\r
412                         System.err.println("[" + getClass().getSimpleName() + "] Unrecognized document key to remove '" + keyValue + "', only " + String.class + " and " + Resource.class + " are supported.");\r
413                         continue nextDocument;\r
414                     }\r
415 \r
416                     for (int i = 0; i < documentData.length; i++) {\r
417                         Object value = documentData[i];\r
418                         if (value instanceof String) {\r
419                             fs[i].setStringValue((String) value);\r
420                         } else if (keyValue instanceof Long) {\r
421                             fs[i].setLongValue((Long)value);\r
422                         } else {\r
423                             // FIXME: should throw an exception for illegal input data but this would leave the index in an incoherent state\r
424                             System.err.println("[" + getClass().getSimpleName() + "] Unrecognized document value '" + value + "' for field '" + fs[i].toString() + "', only " + String.class + " and " + Resource.class + " are supported.");\r
425                             continue nextDocument;\r
426                         }\r
427                     }\r
428 \r
429                     if (IndexPolicy.TRACE_INDEX_UPDATE)\r
430                         System.out.println(getDescriptor() + "Replacing document with key " + removedTerm + " with " + document);\r
431 \r
432                     boolean done = false;\r
433                     if(requireChangeInfoOnReplace()) {\r
434                             TopDocs exist = searcher.search(new TermQuery(removedTerm), null, 2);\r
435                             if(exist.scoreDocs.length == 1 && requireChangeInfoOnReplace()) {\r
436                                 Document doc = reader.document(exist.scoreDocs[0].doc);\r
437                                 if(!areSame(doc, document)) {\r
438                                     writer.deleteDocuments(removedTerm);\r
439                                     writer.addDocument(document);\r
440                                     didReplace |= true;\r
441                                     if (IndexPolicy.TRACE_INDEX_UPDATE)\r
442                                         System.out.println("-replaced single existing");\r
443                                 } else {\r
444                                     if (IndexPolicy.TRACE_INDEX_UPDATE)\r
445                                         System.out.println("-was actually same than single existing");\r
446                                 }\r
447                                 done = true;\r
448                             } \r
449                     }\r
450                     if(!done) {\r
451                         writer.deleteDocuments(removedTerm);\r
452                         writer.addDocument(document);\r
453                         didReplace |= true;\r
454                         if (IndexPolicy.TRACE_INDEX_UPDATE)\r
455                                 System.out.println("-had many or none - removed all existing");\r
456                     }\r
457                     \r
458                 }\r
459 \r
460             if (IndexPolicy.PERF_INDEX_UPDATE) {\r
461                 end = System.nanoTime();\r
462                 double ms = (end - start) * 1e-6;\r
463                 System.out.println(getDescriptor() + "Replaced " + keyValues.size() + " documents from index at " + indexPath + " in " + ms + " ms");\r
464             }\r
465 \r
466         } finally {\r
467         }\r
468         \r
469         return didReplace;\r
470         \r
471     }\r
472     \r
473     protected boolean requireChangeInfoOnReplace() {\r
474         return true;\r
475     }\r
476     \r
477     private boolean areSame(Document d1, Document d2) {\r
478         List<IndexableField> fs1 = d1.getFields();\r
479         List<IndexableField> fs2 = d2.getFields();\r
480         if(fs1.size() != fs2.size()) return false;\r
481         for(int i=0;i<fs1.size();i++) {\r
482                 IndexableField f1 = fs1.get(i);\r
483                 IndexableField f2 = fs2.get(i);\r
484                 String s1 = f1.stringValue();\r
485                 String s2 = f2.stringValue();\r
486             if (IndexPolicy.TRACE_INDEX_UPDATE)\r
487                 System.err.println("areSame " + s1 + " vs. " + s2 );\r
488                 if(!ObjectUtils.objectEquals(s1,s2)) return false;\r
489         }\r
490         return true;\r
491     }\r
492 \r
    /** Request processor given to the constructor; used there to locate the index and read its schema. */
    final RequestProcessor session;

    /** The relation resource; adapted to GenericRelation when the index is initialized. */
    final Resource         relation;

    /**
     * The schema of the index, i.e. the fields that will be indexed per
     * document for the specified relation. Since the relation stays the same
     * throughout the lifetime of this class, the index schema is also assumed
     * to be the same. This means that {@link GenericRelation#getFields()} is
     * assumed to stay the same.
     */
    final IndexSchema      schema;

    /** Input resource; passed to getIndexDirectory in the constructor to locate the index. */
    Resource         input;

    /** File system location of the index, computed in the constructor. */
    File             indexPath;

    /** Lucene directory opened by changeState when entering READ or WRITE. */
    Directory        directory;

    /** Reader opened by changeState when entering READ or WRITE. */
    IndexReader      reader;

    /** Writer opened by changeState when entering WRITE. */
    IndexWriter      writer;

    /** Searcher over {@link #reader}, opened by changeState when entering READ or WRITE. */
    IndexSearcher    searcher;
517 \r
518     IndexedRelationsSearcherBase(RequestProcessor session, Resource relation, Resource input) {\r
519         this.session = session;\r
520         this.relation = relation;\r
521         this.input = input;\r
522         this.indexPath = getIndexDirectory(session.getSession(), relation, input);\r
523         if(isIndexAvailable()) {\r
524                 state = State.READY;\r
525         } else {\r
526                 state = State.NONE;\r
527         }\r
528         this.schema = IndexSchema.readFromRelation(session, relation);\r
529     }\r
530 \r
531     Directory getDirectory(Session session) throws IOException {\r
532         return FSDirectory.open(indexPath);\r
533     }\r
534 \r
    /**
     * Returns a short description of this index. It is used as a prefix in
     * trace output and diagnostic messages.
     */
    abstract String getDescriptor();
536     \r
537     /**\r
538      * Ensures that searcher is in read or write state.\r
539      * \r
540      * @param forWriting <code>true</code> to open index for writing,\r
541      *        <code>false</code> for reading\r
542      * @return true is required state was reached       \r
543      *        \r
544      */\r
545     boolean startAccess(IProgressMonitor monitor, Session session, boolean forWriting) {\r
546         if(forWriting) {\r
547                 changeState(monitor, session, State.WRITE);\r
548                 return checkState(State.WRITE);\r
549         } else {\r
550                 changeState(monitor, session, State.READ);\r
551                 return checkState(State.READ);\r
552         }\r
553     }\r
554 \r
555     boolean hasAccess(boolean forWriting) {\r
556         \r
557         if (forWriting)\r
558                 return checkState(State.WRITE); \r
559         else\r
560                 return checkState(State.WRITE) || checkState(State.READ);\r
561         \r
562     }\r
563     \r
564     void assertAccessOpen(boolean forWriting) {\r
565         if (forWriting)\r
566                 if(!checkState(State.WRITE)) \r
567                 throw new IllegalStateException("index not opened for writing (directory=" + directory + ", reader=" + reader + ")");\r
568         else\r
569                 if(!(checkState(State.WRITE) || checkState(State.READ))) \r
570                 throw new IllegalStateException("index not opened for reading (directory=" + directory + ", writer=" + writer + ")");\r
571     }\r
572     \r
573     void closeWriter(IndexWriter writer) throws CorruptIndexException, IOException {\r
574         if (writer == null)\r
575             return;\r
576 \r
577         try {\r
578             // May throw OOME, see IndexWriter javadoc for the correct actions.\r
579             writer.close(false);\r
580         } catch (OutOfMemoryError e) {\r
581             writer.close();\r
582             throw e;\r
583         }\r
584     }\r
585 \r
586     private static String getPattern(GenericRelation relation, int boundCount) {\r
587         String result = "";\r
588         for (int i = 0; i < boundCount; i++)\r
589             result += "b";\r
590         for (int i = 0; i < relation.getFields().length - boundCount; i++)\r
591             result += "f";\r
592         return result;\r
593     }\r
594     \r
595     private static final int INDEXING_THREAD_COUNT = 2;\r
596     \r
597     private static final ExecutorService executor = Executors.newFixedThreadPool(INDEXING_THREAD_COUNT, new ThreadFactory() {\r
598         @Override\r
599         public Thread newThread(Runnable r) {\r
600             Thread t = new Thread(r, "Lucene Index Creator");\r
601             if (t.isDaemon())\r
602                 t.setDaemon(true);\r
603             if (t.getPriority() != Thread.NORM_PRIORITY)\r
604                 t.setPriority(Thread.NORM_PRIORITY);\r
605             return t;\r
606         }\r
607     });\r
608 \r
609     void initializeIndex(IProgressMonitor monitor, ReadGraph graph, final Object[] bound, boolean overwrite) throws IOException,\r
610     DatabaseException {\r
611 \r
612         final SubMonitor mon = SubMonitor.convert(monitor, 100);\r
613 \r
614         if (IndexPolicy.TRACE_INDEX_INIT)\r
615             System.out.println(getDescriptor() + "Initializing index at " + indexPath + " (overwrite = " + overwrite + ")");\r
616         mon.beginTask("Initializing Index", 100);\r
617 \r
618         if (overwrite) {\r
619             mon.subTask("Erasing previous index");\r
620             FileUtils.deleteAll(indexPath);\r
621         }\r
622 \r
623         final AtomicReference<FSDirectory> directory = new AtomicReference<FSDirectory>();\r
624         final AtomicReference<IndexWriter> writer = new AtomicReference<IndexWriter>();\r
625 \r
626         try {\r
627             mon.subTask("Create index at " + indexPath.toString());\r
628             createDirectory(indexPath);\r
629 \r
630             directory.set(FSDirectory.open(indexPath));\r
631             IndexWriterConfig conf = new IndexWriterConfig(Version.LUCENE_4_9, Queries.getAnalyzer()).setOpenMode(OpenMode.CREATE);\r
632             writer.set(new IndexWriter(directory.get(), conf));\r
633 \r
634             mon.worked(5);\r
635 \r
636             final DataContainer<Long> start = new DataContainer<Long>();\r
637 \r
638             graph.syncRequest(new ReadRequest() {\r
639 \r
640                 @Override\r
641                 public void run(ReadGraph graph) throws DatabaseException {\r
642 \r
643                     final GenericRelation r = graph.adapt(relation, GenericRelation.class);\r
644                     if (r == null)\r
645                         throw new DatabaseException("Given resource " + graph.syncRequest(new SafeName(relation))\r
646                                 + "could not be adapted to GenericRelation.");\r
647 \r
648                     mon.worked(45);\r
649 \r
650                     GenericRelation selection = r.select(getPattern(r, bound.length), bound);\r
651                     \r
652                     long perfStart = 0;\r
653                     if (IndexPolicy.PERF_INDEX_INIT)\r
654                         perfStart = System.nanoTime();\r
655 \r
656                     final List<Object[]> results = selection.realize(graph);\r
657 \r
658                     if (IndexPolicy.PERF_INDEX_INIT)\r
659                         System.out.println(getDescriptor() + "Realized index at " + indexPath + " in " + (1e-9 * (System.nanoTime()-perfStart)) + " seconds.");\r
660                     \r
661                     if (IndexPolicy.TRACE_INDEX_INIT)\r
662                         System.out.println(getDescriptor() + "Indexed relation " + r + " produced " + results.size() + " results");\r
663 \r
664                     if (IndexPolicy.PERF_INDEX_INIT)\r
665                         start.set(System.nanoTime());\r
666 \r
667                     final Semaphore s = new Semaphore(0);\r
668 \r
669                     for(int i=0;i<INDEXING_THREAD_COUNT;i++) {\r
670                         \r
671                         final int startIndex = i;\r
672                         \r
673                         executor.submit(new Runnable() {\r
674 \r
675                                                         @Override\r
676                                                         public void run() {\r
677                                                                 \r
678                                                                 try {\r
679                                                                         \r
680                                                     final Document document = new Document();\r
681 \r
682                                                                         Field[] fs = makeFieldsForRelation(r, bound.length, document);\r
683 \r
684                                                                         for (int index = startIndex; index < results.size(); index+=INDEXING_THREAD_COUNT) {\r
685                                                                                 Object[] result = results.get(index);\r
686                                                                                 for (int i = 0; i < result.length; i++) {\r
687                                                                                         Object value = result[i];\r
688                                                                                         if (value instanceof String) {\r
689                                                                                                 if (IndexPolicy.DEBUG_INDEX_INIT)\r
690                                                                                                         System.out.println(getDescriptor() + "index " + fs[i].name() + " = " + value + " : String");\r
691                                                                                                 fs[i].setStringValue((String) value);\r
692                                                                                         } else if (value instanceof Long) {\r
693                                                                                                 if (IndexPolicy.DEBUG_INDEX_INIT)\r
694                                                                                                         System.out.println(getDescriptor() + "index " + fs[i].name() + " = " + value + " : Long");\r
695                                                                                                 fs[i].setLongValue((Long) value);\r
696                                                                                         }\r
697                                                                                 }\r
698                                                                                 try {\r
699                                                                                         writer.get().addDocument(document);\r
700                                                                                 } catch (CorruptIndexException e) {\r
701                                                                                         throw new IllegalStateException(e);\r
702                                                                                 } catch (IOException e) {\r
703                                                                                         throw new IllegalStateException(e);\r
704                                                                                 } finally {\r
705                                                                                         mon.worked(1);\r
706                                                                                 }\r
707                                                                         }\r
708                                                                 } catch (DatabaseException e) {\r
709                                                                     Logger.defaultLogError("DatabaseException occured during initializing index", e);\r
710                                                                 } catch (Throwable t) {\r
711                                                                     Logger.defaultLogError("Fatal error occured during initializing index", t);\r
712                                                                 } finally {\r
713                                                                     s.release();\r
714                                                                 }\r
715                                                         }\r
716                         });\r
717                     }\r
718                     try {\r
719                                                 s.acquire(INDEXING_THREAD_COUNT);\r
720                                         } catch (InterruptedException e) {\r
721                                                 e.printStackTrace();\r
722                                         }\r
723                 }\r
724             });\r
725 \r
726             // http://www.gossamer-threads.com/lists/lucene/java-dev/47895\r
727             // and http://lucene.apache.org/java/docs/index.html#27+November+2011+-+Lucene+Core+3.5.0\r
728             // advise against calling optimize at all. So let's not do it anymore.\r
729             //writer.get().optimize();\r
730             //writer.get().commit();\r
731 \r
732             if (IndexPolicy.PERF_INDEX_INIT) {\r
733                 long end = System.nanoTime();\r
734                 System.out.println(getDescriptor() + "Wrote index at " + indexPath + " in " + (1e-9 * (end-start.get())) + " seconds.");\r
735             }\r
736 \r
737         } catch (DatabaseException e) {\r
738             \r
739             Logger.defaultLogError(e);\r
740             \r
741         } finally {\r
742             try {\r
743                 closeWriter(writer.getAndSet(null));\r
744             } finally {\r
745                 directory.getAndSet(null).close();\r
746             }\r
747         }\r
748     }\r
749 \r
750     \r
751     public List<Object[]> debugDocs(IProgressMonitor monitor) throws ParseException, IOException, DatabaseException {\r
752     \r
753             Query query = new MatchAllDocsQuery(); \r
754         \r
755             TopDocs td = searcher.search(query, Integer.MAX_VALUE);\r
756     \r
757             ScoreDoc[ ] scoreDocs = td.scoreDocs; \r
758             List<Object[]> result = new ArrayList<Object[]>(scoreDocs.length);\r
759         \r
760             for(ScoreDoc scoreDoc:scoreDocs) {\r
761         \r
762                 try {\r
763         \r
764                     Document doc = reader.document(scoreDoc.doc);\r
765                     List<IndexableField> fs = doc.getFields();\r
766                     Object[] o = new Object[fs.size()];\r
767                     int index = 0; \r
768                     for (IndexableField f : fs) {\r
769                     o[index++] = f.stringValue();\r
770                     }\r
771                     result.add(o);\r
772         \r
773                 } catch (CorruptIndexException e) {\r
774                     throw new DatabaseException(e);\r
775                 } catch (IOException e) {\r
776                     throw new DatabaseException(e);\r
777                 }\r
778 \r
779             }\r
780             \r
781             return result;\r
782             \r
783     }\r
784 \r
785     \r
786     List<Map<String, Object>> doSearch(IProgressMonitor monitor, RequestProcessor processor, String search, int maxResultCount) throws ParseException, IOException,\r
787     DatabaseException {\r
788 \r
789         // An empty search string will crash QueryParser\r
790         // Just return no results for empty queries.\r
791         //System.out.println("search: '" + search + "'");\r
792         if (search.isEmpty())\r
793             return Collections.emptyList();\r
794 \r
795         assertAccessOpen(false);\r
796 \r
797         Query query = Queries.parse(search, schema);\r
798 \r
799         long start = System.nanoTime();\r
800 \r
801         maxResultCount = Math.min(maxResultCount, searcher.getIndexReader().numDocs());\r
802         if (maxResultCount == 0)\r
803             return Collections.emptyList();\r
804         \r
805         final TopDocs docs = searcher.search(query, null, maxResultCount);\r
806         \r
807 //        for(Object[] o : debugDocs(monitor)) {\r
808 //            System.err.println("-" + Arrays.toString(o));\r
809 //        }\r
810         \r
811         if (IndexPolicy.PERF_INDEX_QUERY) {\r
812             long end = System.nanoTime();\r
813             System.out.println(getDescriptor() + "search(" + search + ", " + maxResultCount + ") into index at " + indexPath + " took " + (1e-9 * (end-start)) + " seconds.");\r
814         }\r
815 \r
816         if (docs.totalHits == 0) {\r
817             return Collections.emptyList();\r
818         }\r
819 \r
820         return processor.syncRequest(new Read<List<Map<String, Object>>>() {\r
821 \r
822             @Override\r
823             public List<Map<String, Object>> perform(ReadGraph graph) throws DatabaseException {\r
824 \r
825                 GenericRelation r = graph.adapt(relation, GenericRelation.class);\r
826                 if (r == null)\r
827                     throw new DatabaseException("Given resource " + graph.syncRequest(new SafeName(relation))\r
828                             + "could not be adapted to GenericRelation.");\r
829 \r
830                 SerialisationSupport support = graph.getService(SerialisationSupport.class);\r
831 \r
832                 List<Map<String, Object>> result = new ArrayList<Map<String, Object>>(docs.scoreDocs.length);\r
833                 \r
834                 final DocumentStoredFieldVisitor visitor = new DocumentStoredFieldVisitor();\r
835                 \r
836                 for (ScoreDoc scoreDoc : docs.scoreDocs) {\r
837 \r
838                     try {\r
839 \r
840                         reader.document(scoreDoc.doc, visitor);\r
841                         \r
842                         Document doc = visitor.getDocument();\r
843 \r
844                         List<IndexableField> fs = doc.getFields();\r
845                         Map<String, Object> entry = new THashMap<String, Object>(fs.size());\r
846                         for (IndexableField f : fs) {\r
847                             IndexSchema.Type type = schema.typeMap.get(f.name());\r
848                             if (type == IndexSchema.Type.LONG) {\r
849                                 entry.put(f.name(), support.getResource(f.numericValue().longValue()));\r
850                             } else {\r
851                                 entry.put(f.name(), f.stringValue());\r
852                             }\r
853                         }\r
854                         \r
855                         result.add(entry);\r
856 \r
857                     } catch (CorruptIndexException e) {\r
858                         throw new DatabaseException(e);\r
859                     } catch (IOException e) {\r
860                         throw new DatabaseException(e);\r
861                     }\r
862 \r
863                 }\r
864 \r
865                 return result;\r
866 \r
867             }\r
868         });\r
869     }\r
870 \r
871     static class ResourceVisitor extends StoredFieldVisitor {\r
872         \r
873         public long id;\r
874 \r
875                 @Override\r
876                 public Status needsField(FieldInfo fieldInfo) throws IOException {\r
877                         if("Resource".equals(fieldInfo.name)) return Status.YES;\r
878                         return Status.NO;\r
879                 }\r
880                 \r
881                 @Override\r
882                 public void longField(FieldInfo fieldInfo, long value) throws IOException {\r
883                         id = value;\r
884                 }\r
885         \r
886     };\r
887     \r
888     static class DumpVisitor extends StoredFieldVisitor {\r
889 \r
890         public List<Object> values;\r
891         \r
892         DumpVisitor(List<Object> values) {\r
893                 this.values = values;\r
894         }\r
895 \r
896                 @Override\r
897                 public Status needsField(FieldInfo fieldInfo) throws IOException {\r
898                         return Status.YES;\r
899                 }\r
900                 \r
901                 @Override\r
902                 public void longField(FieldInfo fieldInfo, long value) throws IOException {\r
903                         values.add(value);\r
904                 }\r
905                 \r
906                 @Override\r
907                 public void stringField(FieldInfo fieldInfo, String value) throws IOException {\r
908                         values.add(value);\r
909                 }\r
910 \r
911     }\r
912 \r
913     List<Resource> doSearchResources(IProgressMonitor monitor, RequestProcessor processor, String search, int maxResultCount) throws ParseException, IOException,\r
914     DatabaseException {\r
915 \r
916         // An empty search string will crash QueryParser\r
917         // Just return no results for empty queries.\r
918         //System.out.println("search: '" + search + "'");\r
919         if (search.isEmpty())\r
920             return Collections.emptyList();\r
921 \r
922         assertAccessOpen(false);\r
923 \r
924         Query query = Queries.parse(search, schema);\r
925 \r
926         long start = System.nanoTime();\r
927 \r
928         maxResultCount = Math.min(maxResultCount, searcher.getIndexReader().numDocs());\r
929         if (maxResultCount == 0)\r
930             return Collections.emptyList();\r
931         \r
932         final TopDocs docs = searcher.search(query, null, maxResultCount);\r
933         \r
934 //        for(Object[] o : debugDocs(monitor)) {\r
935 //            System.err.println("-" + Arrays.toString(o));\r
936 //        }\r
937         \r
938         if (IndexPolicy.PERF_INDEX_QUERY) {\r
939             long end = System.nanoTime();\r
940             System.out.println(getDescriptor() + "search(" + search + ", " + maxResultCount + ") into index at " + indexPath + " took " + (1e-9 * (end-start)) + " seconds.");\r
941         }\r
942 \r
943         if (docs.totalHits == 0) {\r
944             return Collections.emptyList();\r
945         }\r
946         \r
947         return processor.syncRequest(new Read<List<Resource>>() {\r
948 \r
949             @Override\r
950             public List<Resource> perform(ReadGraph graph) throws DatabaseException {\r
951 \r
952                 CollectionSupport cs = graph.getService(CollectionSupport.class);\r
953                 SerialisationSupport support = graph.getService(SerialisationSupport.class);\r
954                 \r
955                 List<Resource> result = cs.createList();\r
956                 \r
957                 ResourceVisitor visitor = new ResourceVisitor();\r
958                 \r
959                 for (ScoreDoc scoreDoc : docs.scoreDocs) {\r
960 \r
961                     try {\r
962                         \r
963                         reader.document(scoreDoc.doc, visitor);\r
964                         result.add(support.getResource(visitor.id));\r
965 \r
966                     } catch (CorruptIndexException e) {\r
967                         throw new DatabaseException(e);\r
968                     } catch (IOException e) {\r
969                         throw new DatabaseException(e);\r
970                     }\r
971 \r
972                 }\r
973 \r
974                 return result;\r
975 \r
976             }\r
977         });\r
978     }\r
979 \r
980     List<Object> doList(IProgressMonitor monitor, RequestProcessor processor) throws ParseException, IOException,\r
981     DatabaseException {\r
982 \r
983         assertAccessOpen(false);\r
984 \r
985         Query query = new MatchAllDocsQuery(); \r
986 \r
987         final TopDocs docs = searcher.search(query, Integer.MAX_VALUE);\r
988         \r
989         ArrayList<Object> result = new ArrayList<Object>();\r
990         \r
991         DumpVisitor visitor = new DumpVisitor(result);\r
992                 \r
993         for (ScoreDoc scoreDoc : docs.scoreDocs) {\r
994 \r
995                 try {\r
996 \r
997                         reader.document(scoreDoc.doc, visitor);\r
998 \r
999                 } catch (CorruptIndexException e) {\r
1000                         throw new DatabaseException(e);\r
1001                 } catch (IOException e) {\r
1002                         throw new DatabaseException(e);\r
1003                 }\r
1004 \r
1005         }\r
1006 \r
1007         return result;\r
1008 \r
1009     }\r
1010     \r
1011     protected static File getIndexDirectory(Session session, Resource relation, Resource input) {\r
1012         File path = DatabaseIndexing.getIndexLocation(session, relation, input);\r
1013 //        System.out.println("getIndexDirectory = " + path);\r
1014         return path;\r
1015     }\r
1016 \r
1017     private static void createDirectory(File path) throws IOException {\r
1018         if (path.exists() && !path.isDirectory())\r
1019             throw new IOException("Could not create index directory " + path + ", a file by that name already exists");\r
1020         path.mkdirs();\r
1021         if (!path.exists())\r
1022             throw new IOException("Could not create index directory " + path + " for an unknown reason");\r
1023         if (!path.isDirectory())\r
1024             throw new IOException("Could not create index directory " + path + ", a file by that name already exists");\r
1025     }\r
1026 \r
1027     File getIndexPath() {\r
1028         return indexPath;\r
1029     }\r
1030 \r
1031     boolean isIndexAvailable() {\r
1032         return (indexPath.exists() && indexPath.isDirectory());\r
1033     }\r
1034     \r
1035     Throwable bestEffortClear(IProgressMonitor monitor, Session session) {\r
1036         return null;\r
1037     }\r
1038 \r
1039     /*\r
1040      * Start from scratch. Clear all caches and rebuild the index. \r
1041      */\r
1042     Throwable clearDirectory(IProgressMonitor monitor, Session session) {\r
1043         \r
1044                 File file = getIndexPath();\r
1045 \r
1046         try {\r
1047 \r
1048                 for(int i=0;i<15;i++) {\r
1049                         FileUtils.deleteDir(file);\r
1050                         if(!file.exists()) {\r
1051                                 return null;\r
1052                         }\r
1053                         try {\r
1054                                 Thread.sleep(i*100);\r
1055                         } catch (InterruptedException e) {\r
1056                         }\r
1057                 }\r
1058 \r
1059         } catch (Throwable t) {\r
1060 \r
1061                 return t;\r
1062                 \r
1063         }\r
1064 \r
1065         return new IllegalStateException("Failed to delete directory " + file.getAbsolutePath());\r
1066 \r
1067     }\r
1068 \r
1069 }\r