gerrit.simantics Code Review - simantics/platform.git/blob - bundles/org.simantics.db.indexing/src/org/simantics/db/indexing/IndexedRelationsImpl.java
Removed contact application support prints
[simantics/platform.git] / bundles / org.simantics.db.indexing / src / org / simantics / db / indexing / IndexedRelationsImpl.java
1 /*******************************************************************************
2  * Copyright (c) 2007, 2010 Association for Decentralized Information Management
3  * in Industry THTH ry.
4  * All rights reserved. This program and the accompanying materials
5  * are made available under the terms of the Eclipse Public License v1.0
6  * which accompanies this distribution, and is available at
7  * http://www.eclipse.org/legal/epl-v10.html
8  *
9  * Contributors:
10  *     VTT Technical Research Centre of Finland - initial API and implementation
11  *******************************************************************************/
12 package org.simantics.db.indexing;
13
14 import java.io.IOException;
15 import java.nio.file.Path;
16 import java.util.ArrayList;
17 import java.util.Collection;
18 import java.util.Collections;
19 import java.util.List;
20 import java.util.Map;
21 import java.util.Set;
22 import java.util.WeakHashMap;
23 import java.util.concurrent.CompletableFuture;
24 import java.util.concurrent.ExecutionException;
25 import java.util.concurrent.ForkJoinPool;
26 import java.util.concurrent.locks.Lock;
27 import java.util.concurrent.locks.ReentrantReadWriteLock;
28
29 import org.apache.lucene.queryparser.classic.ParseException;
30 import org.eclipse.core.runtime.IProgressMonitor;
31 import org.eclipse.core.runtime.SubMonitor;
32 import org.simantics.db.ReadGraph;
33 import org.simantics.db.RequestProcessor;
34 import org.simantics.db.Resource;
35 import org.simantics.db.common.request.ReadRequest;
36 import org.simantics.db.common.request.UniqueRead;
37 import org.simantics.db.common.utils.NameUtils;
38 import org.simantics.db.exception.DatabaseException;
39 import org.simantics.db.exception.InvalidResourceReferenceException;
40 import org.simantics.db.indexing.IndexedRelationsSearcherBase.State;
41 import org.simantics.db.layer0.adapter.GenericRelation;
42 import org.simantics.db.layer0.genericrelation.IndexException;
43 import org.simantics.db.layer0.genericrelation.IndexedRelations;
44 import org.simantics.db.layer0.util.Layer0Utils;
45 import org.simantics.db.service.QueryControl;
46 import org.simantics.db.service.SerialisationSupport;
47 import org.simantics.operation.Layer0X;
48 import org.simantics.utils.datastructures.Pair;
49 import org.slf4j.Logger;
50 import org.slf4j.LoggerFactory;
51
52 /**
53  * @author Tuukka Lehtonen
54  * @author Antti Villberg
55  */
56 public class IndexedRelationsImpl implements IndexedRelations {
57
58     private static final Logger LOGGER = LoggerFactory.getLogger(IndexedRelationsImpl.class);
59     
60     Map<Object, RWLock> indexLocks = new WeakHashMap<Object, RWLock>();
61
62     static class LockHandle {
63         public final Object id;
64         public final Lock   lock;
65
66         public LockHandle(Object id, Lock lock) {
67             this.id = id;
68             this.lock = lock;
69         }
70
71         public void unlock() {
72             if (IndexPolicy.TRACE_INDEX_LOCKING)
73                 System.out.println("Unlocking index " + id);
74             lock.unlock();
75         }
76     }
77
78     static class RWLock {
79         public final Object                 id;
80         public final ReentrantReadWriteLock lock;
81
82         public RWLock(Object id) {
83             this.id = id;
84             this.lock = new ReentrantReadWriteLock(true);
85         }
86
87         LockHandle lock(RequestProcessor processor, boolean write) {
88             Lock l = write ? lock.writeLock() : lock.readLock();
89             if(processor instanceof ReadGraph) {
90                 ReadGraph graph = (ReadGraph)processor;
91                 while(!l.tryLock()) {
92                         QueryControl qc = processor.getService(QueryControl.class);
93                         boolean executed = qc.resume(graph);
94                         if(!executed) {
95                                                 try {
96                                                         Thread.sleep(1);
97                                                 } catch (InterruptedException e) {
98                                                 }
99                         }
100                 }
101             } else {
102                 l.lock();
103             }
104             if (IndexPolicy.TRACE_INDEX_LOCKING)
105                 System.out.println("Locked index " + id);
106             return new LockHandle(id, l);
107         }
108         
109         LockHandle tryLock(RequestProcessor processor, boolean write) {
110             Lock l = write ? lock.writeLock() : lock.readLock();
111             if(l.tryLock()) return new LockHandle(id, l);
112             else return null;
113         }
114         
115     }
116
117     private LockHandle lock(RequestProcessor processor, Object lockIdentifier, boolean write) {
118         RWLock rwlock = null;
119         synchronized (indexLocks) {
120             rwlock = indexLocks.get(lockIdentifier);
121             if (rwlock == null) {
122                 rwlock = new RWLock(lockIdentifier);
123                 indexLocks.put(lockIdentifier, rwlock);
124             }
125         }
126         return rwlock.lock(processor, write);
127     }
128
129     private LockHandle tryLock(RequestProcessor processor, Object lockIdentifier, boolean write) {
130         RWLock rwlock = null;
131         synchronized (indexLocks) {
132             rwlock = indexLocks.get(lockIdentifier);
133             if (rwlock == null) {
134                 rwlock = new RWLock(lockIdentifier);
135                 indexLocks.put(lockIdentifier, rwlock);
136             }
137         }
138         return rwlock.tryLock(processor, write);
139     }
140
141     private static IndexedRelationsSearcherBase makeSearcher(final RequestProcessor processor, final Resource relation, final Resource input) {
142         try {
143                         return processor.syncRequest(new UniqueRead<IndexedRelationsSearcherBase>() {
144
145                                 @Override
146                                 public IndexedRelationsSearcherBase perform(ReadGraph graph) throws DatabaseException {
147                                         if(graph.isImmutable(input)) {
148                                                 return MemoryIndexing.getInstance(processor.getSession()).getImmutable(processor, relation, input);
149                                         } else {
150                                                 return MemoryIndexing.getInstance(processor.getSession()).get(processor, relation, input);
151                                         }
152                                 }
153                                 
154                         });
155                 } catch (DatabaseException e) {
156                         throw new IllegalStateException(e);
157                 }
158     }
159     
160     private LockHandle waitLoaded(SubMonitor progress, final IndexedRelationsSearcherBase searcher, RequestProcessor processor, LockHandle lock, final Object lockId, final Resource input) throws IndexException {
161         
162         // Initial state: we are locked, no news about the index
163         // Final state: we are locked and the index has been loaded, the current lock is returned
164
165         // First just check if the index is loaded
166         if (searcher.isIndexAvailable()) {
167                 // We have an index - try to start access
168                 searcher.startAccess(progress.newChild(50), processor.getSession(), false);
169                 // At this point we have three options:
170                 // 1. we have access
171                 if(searcher.hasAccess(false)) return lock;
172                 // 2. something is wrong and the index cannot be cleaned
173                 if(searcher.checkState(State.PROBLEM)) throw new IndexException("Searcher is in problematic state", searcher.getException());
174                 // 3. something was wrong, but the index has been successfully cleaned
175         }
176
177         searcher.assertState(State.NONE);
178
179         // We loop until the index is loaded
180         while(true) {
181                 
182                 // With ReadGraph we can proceed to initialize
183                 if(processor instanceof ReadGraph) {
184
185                 // (re)create the index.
186                 try {
187                     SerialisationSupport ss = processor.getService(SerialisationSupport.class);
188                     searcher.initializeIndex(progress.newChild(40), (ReadGraph)processor, new Object[] { ss.getRandomAccessId(input) }, true);
189                                         searcher.setReady();
190                     searcher.startAccess(progress.newChild(10), processor.getSession(), false);
191                         if(searcher.hasAccess(false)) return lock;
192                 } catch (IOException e) {
193                                         searcher.setProblem(e);
194                     throw new IndexException(e);
195                 } catch (DatabaseException e) {
196                                         searcher.setProblem(e);
197                     throw new IndexException(e);
198                 }
199                         
200                 }
201                 // With session we schedule the job
202                 else {
203
204                         // Release lock
205                         lock.unlock();
206                         
207 //                      final Semaphore s = new Semaphore(0);
208                         
209                         // Schedule job
210                         
211                         boolean success = false;
212                         int tries = 0;
213                         while(!success && (++tries)<10) {
214
215                                 try {
216
217                                         success = processor.sync(new UniqueRead<Boolean>() {
218
219                                                 @Override
220                                                 public Boolean perform(ReadGraph graph) throws DatabaseException {
221
222                                                         // Obtain lock
223                                                         LockHandle lock = tryLock(graph, lockId, true);
224                                                         if(lock == null) return false;
225
226                                                         try {
227
228                                                                 boolean loaded = false;
229                                                                 if (searcher.isIndexAvailable()) {
230                                                                         searcher.startAccess(null, graph.getSession(), false);
231                                                                         // At this point we have three options:
232                                                                         // 1. we have access
233                                                                         if(searcher.hasAccess(false)) loaded = true;
234                                                                         // 2. something is wrong and the index cannot be cleaned
235                                                                         if(searcher.checkState(State.PROBLEM)) throw new DatabaseException("Searcher is in problematic state", searcher.getException());
236                                                                         // 3. something was wrong, but the index has been successfully cleaned
237                                                                 }
238
239                                                                 if(!loaded) {
240                                                                         
241                                                                 if(!searcher.checkState(State.NONE)) 
242                                                                         throw new DatabaseException("Illegal searcher state " + searcher.state());
243
244                                                                         try {
245                                                                             SerialisationSupport ss = graph.getService(SerialisationSupport.class);
246                                                                                 searcher.initializeIndex(null, graph, new Object[] { ss.getRandomAccessId(input) }, true);
247                                                                                 searcher.setReady();
248                                                                         } catch (IOException e) {
249                                                                                 searcher.setProblem(e);
250                                                                                 throw new DatabaseException(e);
251                                                                         }    
252
253                                                                 }
254
255                                                         } finally {
256                                                                 
257                                                                 lock.unlock();
258 //                                                              s.release();
259                                                                 
260                                                         }
261
262                                                         return true;
263
264                                                 }
265
266                                         });
267
268                                 } catch (DatabaseException e) {
269                                         throw new IndexException(e);
270                                 }
271
272                         }
273                         
274                         if(!success)
275                                 throw new IndexException("Did not manage to load index.");
276                 
277                 // Try again
278
279                 // Obtain lock
280                                 lock= lock(processor, lockId, true);                    
281                         
282                         if (searcher.isIndexAvailable()) {
283                                 searcher.startAccess(progress.newChild(50), processor.getSession(), false);
284                                 if(searcher.hasAccess(false)) return lock;
285                                 throw new IndexException("Illegal searcher state " + searcher.state());
286                         }
287                         
288                 }
289                 
290         }
291
292         
293     }
294     
295     @Override
296     public List<Resource> queryResources(IProgressMonitor monitor, String search, RequestProcessor processor,
297             Resource relation, final Resource input, int maxResultCount) {
298         if (processor == null)
299             throw new IllegalArgumentException("null processor");
300         if (relation == null)
301             throw new IllegalArgumentException("null relation");
302         if (input == null)
303             throw new IllegalArgumentException("null input");
304         if (search == null)
305             throw new IllegalArgumentException("null search criterion");
306
307         SubMonitor progress = SubMonitor.convert(monitor, 100);
308
309         // Look for existing index.
310         // Indexes always exist in secondary storage, i.e. disk.
311         // Indexes can be cached in memory when necessary performance-wise.
312
313         final IndexedRelationsSearcherBase searcher = makeSearcher(processor, relation, input);
314
315         final Object lockId = Pair.make(relation, input);
316
317         LockHandle lock = lock(processor, lockId, false);
318
319         // Ensure that index is loaded & ready
320         lock = waitLoaded(progress, searcher, processor, lock, lockId, input);
321
322         // Perform query
323         try {
324             return searcher.doSearchResources(progress.newChild(50), processor, search, maxResultCount);
325         } catch (ParseException e) {
326             // FIXME: should throw an exception, not just ignore.
327             e.printStackTrace();
328             return Collections.emptyList();
329             //throw new IndexException(e);
330         } catch (IOException e) {
331             throw new IndexException(e);
332         } catch (DatabaseException e) {
333             throw new IndexException(e);
334         } finally {
335                 lock.unlock();
336         }
337     }
338
339     @Override
340     public List<Map<String, Object>> query(IProgressMonitor monitor, String search, RequestProcessor processor,
341             Resource relation, final Resource input, int maxResultCount) {
342         if (processor == null)
343             throw new IllegalArgumentException("null processor");
344         if (relation == null)
345             throw new IllegalArgumentException("null relation");
346         if (input == null)
347             throw new IllegalArgumentException("null input");
348         if (search == null)
349             throw new IllegalArgumentException("null search criterion");
350
351         SubMonitor progress = SubMonitor.convert(monitor, 100);
352
353         // Look for existing index.
354         // Indexes always exist in secondary storage, i.e. disk.
355         // Indexes can be cached in memory when necessary performance-wise.
356
357         final IndexedRelationsSearcherBase searcher = makeSearcher(processor, relation, input);
358
359         final Object lockId = Pair.make(relation, input);
360
361         LockHandle lock = lock(processor, lockId, false);
362
363         // Ensure that index is loaded & ready
364         lock = waitLoaded(progress, searcher, processor, lock, lockId, input);
365
366         // Perform query
367         try {
368             return searcher.doSearch(progress.newChild(50), processor, search, maxResultCount);
369         } catch (ParseException e) {
370             // FIXME: should throw an exception, not just ignore.
371             e.printStackTrace();
372             return Collections.emptyList();
373             //throw new IndexException(e);
374         } catch (IOException e) {
375             throw new IndexException(e);
376         } catch (DatabaseException e) {
377             throw new IndexException(e);
378         } finally {
379                 lock.unlock();
380         }
381     }
382
383     @Override
384     public void insert(IProgressMonitor monitor, RequestProcessor processor, GenericRelation relation,  
385             Resource relationResource, Resource input, Collection<Object[]> documents) throws IndexException {
386
387 //        System.out.println("Inserting to index: " + input + " " + documents);
388
389         if (relation == null)
390             throw new IllegalArgumentException("null relation");
391         if (input == null)
392             throw new IllegalArgumentException("null input");
393         if (documents == null)
394             throw new IllegalArgumentException("null documents");
395
396         if (documents.isEmpty())
397             return;
398
399         final SubMonitor progress = SubMonitor.convert(monitor, 100);
400
401         final IndexedRelationsSearcherBase searcher = makeSearcher(processor, relationResource, input);
402
403         LockHandle handle = lock(processor, Pair.make(relationResource, input), true);
404         
405         try {
406                 
407                 DatabaseIndexing.markIndexChanged(processor.getSession(), searcher.getIndexPath());
408                 if(!searcher.startAccess(null, processor.getSession(), true)) {
409                 // Could not write index for some reason. Ignore and let the next index query reinitialize the index.
410                         return;
411                 }
412                 
413             searcher.insertIndex(progress.newChild(40), relation, 1, documents);
414             
415         } catch (InvalidResourceReferenceException e) {
416             throw new IndexException(e);
417         } catch (IOException e) {
418             throw new IndexException(e);
419         } catch (DatabaseException e) {
420             throw new IndexException(e);
421         } finally {
422                 handle.unlock();
423         }
424     }
425
426     @Override
427     public void remove(IProgressMonitor monitor, RequestProcessor processor, GenericRelation relation,
428             Resource relationResource, Resource input, String key, Collection<Object> keyValues) throws IndexException {
429
430         if (relation == null)
431             throw new IllegalArgumentException("null relation");
432         if (input == null)
433             throw new IllegalArgumentException("null input");
434         if (key == null)
435             throw new IllegalArgumentException("null key");
436
437         SubMonitor progress = SubMonitor.convert(monitor, 100);
438
439         IndexedRelationsSearcherBase searcher = makeSearcher(processor, relationResource, input);
440
441         LockHandle handle = lock(processor, Pair.make(relationResource, input), true);
442         try {
443                 
444             DatabaseIndexing.markIndexChanged(processor.getSession(), searcher.getIndexPath());
445                 if(!searcher.startAccess(null, processor.getSession(), true)) {
446                 // Could not write index for some reason. Ignore and let the next index query reinitialize the index.
447                         return;
448                 }
449                 
450             searcher.removeIndex(progress.newChild(40), relation, processor, key, keyValues);
451             
452         } catch (DatabaseException e) {
453             throw new IndexException(e);
454         } catch (IOException e) {
455             throw new IndexException(e);
456         } finally {
457                 handle.unlock();
458         }
459     }
460
461     @Override
462     public void removeAll(IProgressMonitor monitor, RequestProcessor processor, GenericRelation relation,
463             Resource relationResource, Resource input) {
464
465         if (relation == null)
466             throw new IllegalArgumentException("null relation");
467         if (input == null)
468             throw new IllegalArgumentException("null input");
469
470         IndexedRelationsSearcherBase searcher = makeSearcher(processor, relationResource, input);
471         
472         LockHandle handle = lock(processor, Pair.make(relationResource, input), true);
473
474         try {
475                 
476                 Throwable t = searcher.bestEffortClear(monitor, processor.getSession());
477                 if(t != null) searcher.setProblem(t);
478                 else searcher.setNone();
479                 
480                 } finally {
481                         handle.unlock();
482                 }
483         
484     }
485     
486     @Override
487     public boolean replace(IProgressMonitor monitor, RequestProcessor processor, GenericRelation relation,
488             Resource relationResource, Resource input, String key, Collection<Object> keyValues, Collection<Object[]> documents) throws IndexException {
489
490         if (relation == null)
491             throw new IllegalArgumentException("null relation");
492         if (input == null)
493             throw new IllegalArgumentException("null input");
494         if (key == null)
495             throw new IllegalArgumentException("null key");
496
497         SubMonitor progress = SubMonitor.convert(monitor, 100);
498
499         IndexedRelationsSearcherBase searcher = makeSearcher(processor, relationResource, input);
500
501         LockHandle handle = lock(processor, Pair.make(relationResource, input), true);
502
503         boolean didChange = false;
504
505         try {
506                 
507                 DatabaseIndexing.markIndexChanged(processor.getSession(), searcher.getIndexPath());
508                 if(!searcher.startAccess(null, processor.getSession(), true)) {
509                 // Could not write index for some reason. Ignore and let the next index query reinitialize the index.
510                         return true;
511                 }
512             didChange |= searcher.replaceIndex(progress.newChild(40), key, keyValues, relation, 1, documents);
513             
514         } catch (InvalidResourceReferenceException e) {
515             throw new IndexException(e);
516         } catch (IOException e) {
517             throw new IndexException(e);
518         } catch (DatabaseException e) {
519             throw new IndexException(e);
520         } catch (Throwable t) {
521             throw new IndexException(t);
522         } finally {
523                 handle.unlock();
524         }
525         
526         return didChange;
527
528     }
529     
530     @Override
531     public void reset(IProgressMonitor monitor, RequestProcessor processor, Resource relationResource, Resource input) throws IndexException {
532
533         IndexedRelationsSearcherBase searcher = makeSearcher(processor, relationResource, input);
534
535         LockHandle handle = lock(processor, Pair.make(relationResource, input), true);
536         Path path = DatabaseIndexing.getIndexLocation(processor.getSession(), relationResource, input);
537         try {
538             searcher.changeState(monitor, processor.getSession(), State.NONE);
539             if (!searcher.checkState(State.NONE))
540                 throw new IndexException("Could not close index for input " + input + " before removing it");
541
542             DatabaseIndexing.deleteIndex(path);
543         } catch (IOException e) {
544             LOGGER.error("Could not delete {}", path.toAbsolutePath(), e);
545             throw new IndexException(e);
546         } finally {
547             handle.unlock();
548         }
549
550     }
551
552     @Override
553     public void fullRebuild(IProgressMonitor monitor, RequestProcessor processor) throws IndexException {
554         try {
555             processor.syncRequest(new ReadRequest() {
556                 @Override
557                 public void run(ReadGraph graph) throws DatabaseException {
558                     try {
559                         fullRebuild(monitor, graph);
560                     } catch (IOException e) {
561                         throw new DatabaseException(e);
562                     }
563                 }
564             });
565         } catch (DatabaseException e) {
566             throw new IndexException(e);
567         }
568     }
569
570     private void fullRebuild(IProgressMonitor monitor, ReadGraph graph) throws DatabaseException, IOException {
571         long startTime = System.currentTimeMillis();
572         Resource relation = Layer0X.getInstance(graph).DependenciesRelation;
573         SerialisationSupport ss = graph.getService(SerialisationSupport.class);
574         Set<Resource> indexRoots = Layer0Utils.listIndexRoots(graph);
575         List<CompletableFuture<?>> waitFor = new ArrayList<>(indexRoots.size());
576         SubMonitor mon = SubMonitor.convert(monitor, indexRoots.size()*2);
577
578         for (Resource indexRoot : indexRoots) {
579             monitor.subTask(NameUtils.getSafeName(graph, indexRoot));
580
581             IndexedRelationsSearcherBase searcher = makeSearcher(graph, relation, indexRoot);
582
583             GenericRelation r = graph.adapt(relation, GenericRelation.class);
584             if (r == null)
585                 throw new DatabaseException("Given resource " + relation + "could not be adapted to GenericRelation.");
586
587             Object[] bound = new Object[] { ss.getRandomAccessId(indexRoot) };
588             GenericRelation selection = r.select(IndexedRelationsSearcherBase.getPattern(r, bound.length), bound);
589
590             long relStart = System.currentTimeMillis();
591             List<Object[]> results = selection.realize(graph);
592             if (LOGGER.isDebugEnabled()) {
593                 long relEnd = System.currentTimeMillis() - relStart;
594                 LOGGER.debug(indexRoot + " realized " + relEnd);
595             }
596             mon.worked(1);
597
598             CompletableFuture<?> result = new CompletableFuture<>();
599             waitFor.add(result);
600             ForkJoinPool.commonPool().submit(() -> {
601                 long startTime1 = System.currentTimeMillis();
602                 try {
603                     searcher.initializeIndexImpl(result, mon.newChild(1, SubMonitor.SUPPRESS_ALL_LABELS), r, results, bound, true);
604                     searcher.setReady();
605                 } catch (IOException e) {
606                     result.completeExceptionally(e);
607                     LOGGER.error("Could not initialize index", e);
608                 } finally {
609                     if (LOGGER.isDebugEnabled())
610                         LOGGER.debug(indexRoot + " initialized " + (System.currentTimeMillis() - startTime1));
611                 }
612             });
613         }
614         for (CompletableFuture<?> fut : waitFor) {
615             try {
616                 fut.get();
617             } catch (InterruptedException | ExecutionException e) {
618                 throw (IOException) e.getCause();
619             }
620         }
621         if (LOGGER.isInfoEnabled()) {
622             long endTime = System.currentTimeMillis() - startTime;
623             LOGGER.info("All indexes rebuilt in {}", endTime);
624         }
625     }
626
627 }