Reduce the amount of annoying Retry mutex acquire print
[simantics/platform.git] / bundles / org.simantics.acorn / src / org / simantics / acorn / lru / LRU.java
1 package org.simantics.acorn.lru;
2
3 import java.io.IOException;
4 import java.nio.file.Path;
5 import java.util.ArrayList;
6 import java.util.Collection;
7 import java.util.HashMap;
8 import java.util.Map;
9 import java.util.TreeMap;
10 import java.util.concurrent.ScheduledThreadPoolExecutor;
11 import java.util.concurrent.Semaphore;
12 import java.util.concurrent.ThreadFactory;
13 import java.util.concurrent.TimeUnit;
14
15 import org.simantics.acorn.ClusterManager;
16 import org.simantics.acorn.GraphClientImpl2;
17 import org.simantics.acorn.exception.AcornAccessVerificationException;
18 import org.simantics.acorn.exception.IllegalAcornStateException;
19 import org.slf4j.Logger;
20 import org.slf4j.LoggerFactory;
21
22 /*
23  * The order rule of synchronization for LRU and LRUObject is:
24  *   � Always lock LRUObject first!
25  * 
26  */
27
28 public class LRU<MapKey,MapValue extends LRUObject<MapKey, MapValue>> {
29
30     private static final Logger LOGGER = LoggerFactory.getLogger(LRU.class); 
31     
32         public static boolean VERIFY = true;
33
34         final private long swapTime = 5L*1000000000L;
35         final private int swapSize = 200;
36
37         final private HashMap<MapKey, MapValue> map = new HashMap<MapKey, MapValue>();
38         final private TreeMap<Long, MapKey> priorityQueue = new TreeMap<Long, MapKey>();
39         
40         final private Semaphore mutex = new Semaphore(1);
41         final private String identifier;
42         
43         private Path writeDir;
44         
45         private Thread mutexOwner;
46         
47         public Map<String, WriteRunnable> pending = new HashMap<String, WriteRunnable>();
48
49     protected final ClusterManager manager;
50         
51         public LRU(ClusterManager manager, String identifier, Path writeDir) {
52             this.manager = manager;
53                 this.identifier = identifier;
54                 this.writeDir = writeDir;
55                 resume();
56         }
57         
58         /*
59          * Public interface
60          */
61         
62         public void acquireMutex() throws IllegalAcornStateException {
63                 try {
64                         while(!mutex.tryAcquire(3, TimeUnit.SECONDS)) {
65                                 System.err.println("Mutex is taking a long time to acquire - owner is " + mutexOwner);
66                         }
67                         if(VERIFY)
68                                 mutexOwner = Thread.currentThread();
69                 } catch (InterruptedException e) {
70                         throw new IllegalAcornStateException(e);
71                 }
72         }
73         
74         public void releaseMutex() {
75                 mutex.release();
76                 mutexOwner = null;
77         }
78
79         public void shutdown() {
80             if (GraphClientImpl2.DEBUG)
81                 System.err.println("Shutting down LRU writers " + writers);
82                 writers.shutdown();
83                 try {
84                         writers.awaitTermination(60, TimeUnit.SECONDS);
85                 } catch (InterruptedException e) {
86                         e.printStackTrace();
87                 }
88
89         }
90         
91         public void resume() {
92                 writers = new ScheduledThreadPoolExecutor(2, new ThreadFactory() {
93                         
94                         @Override
95                         public Thread newThread(Runnable r) {
96                                 return new Thread(r, identifier + " File Writer");
97                         }
98                 });
99                 if (GraphClientImpl2.DEBUG)
100                     System.err.println("Resuming LRU writers " + writers);
101         }
102
103         /*
104          * This method violates the synchronization order rule between LRU and MapVAlue
105          * External synchronization is used to ensure correct operation
106          */
107         public void persist(ArrayList<String> state) throws IllegalAcornStateException {
108         
109                 acquireMutex();
110                 try {
111                         for (MapValue value : values()) {
112                                 value.acquireMutex();
113                                 // for debugging purposes
114                                 boolean persisted = false;
115                                 try {
116                                         // Persist the value if needed
117                                         persisted = value.persist();
118                                 } finally {
119                                         // WriteRunnable may want to 
120                                         value.releaseMutex();
121                                 }
122                                 // Wait pending if value was actually persisted 
123                                 waitPending(value, false);
124                                 // Take lock again
125                                 value.acquireMutex();
126                                 try {
127                                         // Record the value
128                                         state.add(value.getStateKey());
129                                 } finally {
130                                         value.releaseMutex();
131                                 }
132                         }
133                 } catch (IllegalAcornStateException e) {
134                     throw e;
135                 } catch (IOException e) {
136                     throw new IllegalAcornStateException("Unable to waitPending for " + this.identifier, e);
137                 } catch (Throwable t) {
138                     throw new IllegalAcornStateException("Fatal error occured for " + this.identifier, t);
139                 } finally {
140                         releaseMutex();
141                 }
142         }
143
144         public MapValue getWithoutMutex(MapKey key) throws AcornAccessVerificationException, IllegalAcornStateException {
145                 
146                 acquireMutex();
147                 try {
148                         return get(key);
149                 } finally {
150                         releaseMutex();
151                 }
152         }
153
154         
155         
156         public MapValue purge(MapKey id) {
157                 return map.remove(id);
158         }
159
160         public MapValue get(MapKey key) throws AcornAccessVerificationException {
161                 
162                 if(VERIFY) verifyAccess();
163                 
164                 return map.get(key);
165         }
166
167         public void map(MapValue info) throws AcornAccessVerificationException {
168                 
169                 if(VERIFY) verifyAccess();
170                 
171                 map.put(info.getKey(), info);
172         }
173
174         public Collection<MapValue> values() throws AcornAccessVerificationException {
175                 
176                 if(VERIFY) verifyAccess();
177                 
178                 return map.values();
179         }
180
181         public boolean swapForced() throws IllegalAcornStateException, AcornAccessVerificationException {
182                 
183                 acquireMutex();
184                 
185                 try {
186                         return swap(0, 0, null);
187                 } finally {
188                         releaseMutex();
189                 }
190                 
191         }
192
193         public boolean swap(long lifeTime, int targetSize) throws AcornAccessVerificationException, IllegalAcornStateException {
194                 
195                 if(VERIFY) verifyAccess();
196
197                 return swap(lifeTime, targetSize, null);
198         }
199
200         /*
201          * This is called under global lock
202          */
203         public void setWriteDir(Path dir) {
204                 
205                 this.writeDir = dir;
206         }
207
208
209         /*
210          * Package access
211          */
212
213         void insert(MapValue info, long accessTime) throws AcornAccessVerificationException {
214                 
215                 if(VERIFY) verifyAccess();
216
217                 map.put(info.getKey(), info);
218                 priorityQueue.put(accessTime, info.getKey());
219         }
220
221         /*
222          * We have access to ClusterLRU - try to refresh value if available
223          */
224         boolean tryRefresh(MapValue info) throws AcornAccessVerificationException, IllegalAcornStateException {
225
226                 if(VERIFY) verifyAccess();
227                 
228                 if(!info.tryAcquireMutex())
229                         return false;
230
231                 try {
232                         priorityQueue.remove(info.getLastAccessTime());
233                         info.accessed();
234                         map.put(info.getKey(), info);
235                         priorityQueue.put(info.getLastAccessTime(), info.getKey());
236                         return true;
237                 } finally {
238                         info.releaseMutex();
239                 }
240         }
241
242         /*
243          * We have access to MapValue and no access to clusterLRU
244          */
245         void refresh(MapValue info, boolean needMutex) throws AcornAccessVerificationException, IllegalAcornStateException {
246                 
247                 if(VERIFY) {
248                         if(!needMutex) verifyAccess();
249                         info.verifyAccess();
250                 }
251                 
252                 if(needMutex)
253                         acquireMutex();
254
255                 try {
256
257                         priorityQueue.remove(info.getLastAccessTime());
258                         info.accessed();
259                         map.put(info.getKey(), info);
260                         priorityQueue.put(info.getLastAccessTime(), info.getKey());
261
262                 } catch (AcornAccessVerificationException e) {
263                     throw e;
264                 } catch (Throwable t) {
265                         throw new IllegalAcornStateException(t);
266                 } finally {
267                         if(needMutex)
268                                 releaseMutex();
269                 }
270         }
271
272         /*
273          * Private implementation
274          */
275
276         int size() throws AcornAccessVerificationException {
277                 if(VERIFY) verifyAccess();
278                 return priorityQueue.size();
279         }
280
281         boolean swap(MapKey excluded) throws AcornAccessVerificationException, IllegalAcornStateException {
282                 if(VERIFY) verifyAccess();
283                 return swap(swapTime, swapSize, excluded);
284         }
285
286         boolean swap(long lifeTime, int targetSize, MapKey excluded) throws AcornAccessVerificationException, IllegalAcornStateException {
287
288                 if(VERIFY) verifyAccess();
289
290                 MapValue valueToSwap = getValueToSwap(lifeTime, targetSize, excluded);
291                 if(valueToSwap != null) {
292                         
293                         if(valueToSwap.tryAcquireMutex()) {
294                                 try {
295                                         if(valueToSwap.canBePersisted()) {
296                                                 valueToSwap.persist();
297                                                 return true;
298                                         }
299                                 } catch (Throwable t) {
300                                         throw new IllegalAcornStateException(t);
301                                 } finally {
302                                         valueToSwap.releaseMutex();
303                                 }
304                         }
305                 }
306                 return false;
307         }
308
309         private MapValue getValueToSwap1(long lifeTime, int targetSize, MapKey excluded) throws AcornAccessVerificationException, IllegalAcornStateException {
310
311                 if(VERIFY) verifyAccess();
312
313                 for(int i=0;i<10;i++) {
314
315                         long candidate = getSwapCandidate(lifeTime, targetSize);
316                         if(candidate == 0) return null;
317                         
318                         MapKey key = priorityQueue.remove(candidate);
319                         if(key.equals(excluded)) {
320                                 tryRefresh(map.get(key));
321                                 continue;
322                         }
323                         
324                         return map.get(key);
325                 }
326                 return null;
327         }
328         
329         
330         private MapValue getValueToSwap(long lifeTime, int targetSize, MapKey excluded) throws AcornAccessVerificationException, IllegalAcornStateException {
331
332                 if(VERIFY) verifyAccess();
333
334                 for(int i=0;i<10;i++) {
335                         
336                         // Lock LRU and get a candidate
337                         MapValue value = getValueToSwap1(lifeTime, targetSize, excluded);
338                         if(value == null) return null;
339                         
340                         if(value.tryAcquireMutex()) {
341
342                                 try {
343                                         // This may lock the object
344                                         if(value.canBePersisted())
345                                             return value;
346                                         // Insert back the value
347                                         refresh(value, false);
348                                 } finally {
349                                         value.releaseMutex();
350                                 }
351                         }
352                 }
353                 return null;
354         }
355         
356         private long getSwapCandidate(long lifeTime, int targetSize) throws AcornAccessVerificationException {
357                 
358                 if(VERIFY) verifyAccess();
359
360                 if(priorityQueue.isEmpty()) return 0;
361
362                 long currentTime = System.nanoTime();
363                 Long lowest = priorityQueue.firstKey();
364
365                 if(currentTime - lowest > lifeTime || priorityQueue.size() > targetSize) {
366                         return lowest;
367                 }
368
369                 return 0;
370
371         }
372         
373         /*
374          * Tries to persist this object. Can fail if the object cannot be persisted at this time.
375          * 
376          */
377         boolean persist(Object object_) throws AcornAccessVerificationException {
378                 
379                 MapValue object = (MapValue)object_;
380                 
381                 if(VERIFY) object.verifyAccess();
382                 
383                 if(object.isDirty()) {
384                         // It is possible that this just became unpersistable. Fail here in this case.
385                         if(!object.canBePersisted()) {
386                                 return false;
387                         }
388
389                         assert(object.isResident());
390
391                         Path f = writeDir.resolve(object.getFileName());
392
393                         WriteRunnable runnable = new WriteRunnable(f, object);
394                         
395                         synchronized(pending) {
396                                 WriteRunnable existing = pending.put(object.getKey().toString(), runnable);
397                                 assert(existing == null);
398                         }
399
400                         writers.execute(runnable);
401
402                         object.setResident(false);
403                         object.setDirty(false);
404
405                         return true;
406
407                 } else if(object.isResident()) {
408
409                         object.release();
410                         object.setResident(false);
411                         return false;
412                 }
413                 return false;
414         }
415         
416         int makeResident(Object object_, boolean keepResident) throws AcornAccessVerificationException, IllegalAcornStateException {
417
418                 MapValue object = (MapValue)object_;
419
420                 if(VERIFY) object.verifyAccess();
421
422                 try {
423                         object.setForceResident(keepResident);
424
425                         if(object.isResident()) {
426                                 refresh(object, true);
427                                 return 0;
428                         }
429
430                         waitPending(object, true);
431
432                         byte[] data = object.readFile();
433
434                         object.fromFile(data);
435                         object.setResident(true);
436
437                         acquireMutex();
438                         try {
439                                 refresh(object, false);
440                                 swap(swapTime, swapSize, object.getKey());
441                         } finally {
442                                 releaseMutex();
443                         }
444                         return data.length;
445                 } catch (IOException e) {
446                         throw new IllegalAcornStateException("Unable to makeResident " + identifier, e);
447                 }
448         }
449
450         static int readCounter = 0;
451         static int writeCounter = 0;
452         
453         ScheduledThreadPoolExecutor writers;
454         
455         void waitPending(MapValue value, boolean hasMutex) throws IOException, AcornAccessVerificationException, IllegalAcornStateException {
456                 
457                 WriteRunnable runnable = null;
458                 boolean inProgress = false;
459                 synchronized(pending) {
460                         runnable = pending.get(value.getKey().toString());
461                         if(runnable != null) {
462                                 synchronized(runnable) {
463                                         if(runnable.committed) {
464                                                 // just being written - just need to wait
465                                                 inProgress = true;
466                                         } else {
467                                                 runnable.committed = true;
468                                                 // we do the writing
469                                         }
470                                 }
471                         }
472                 }
473                 if(runnable != null) {
474                         if(inProgress) {
475 //                              System.err.println("reader waits for WriteRunnable to finish");
476                                 try {
477                                     if(hasMutex) {
478                                         runnable.borrowMutex = true;
479                                     }
480                                         runnable.s.acquire();
481                                 } catch (InterruptedException e) {
482                                         throw new IllegalAcornStateException(e);
483                                 }
484                         } else {
485 //                              System.err.println("reader took WriteRunnable");
486                 runnable.runReally(hasMutex);
487                         }
488                 }
489         }
490         
491         public class WriteRunnable implements Runnable {
492
493                 private Path bytes;
494                 private MapValue impl;
495                 private boolean committed = false;
496                 private boolean borrowMutex = false;
497                 private Semaphore s = new Semaphore(0);
498                 
499                 WriteRunnable(Path bytes, MapValue impl) {
500                         this.bytes = bytes;
501                         this.impl = impl;
502                 }
503                 
504                 @Override
505                 public void run() {
506                     try {
507                         synchronized(impl) {
508     
509                                 synchronized(this) {
510                                 
511                                         if(committed)
512                                             return;
513                                         
514                                         committed = true;
515                                 }
516                     runReally(false);
517                         }
518                     } catch (Throwable t) {
519                         if (t instanceof IllegalAcornStateException) {
520                             manager.notSafeToMakeSnapshot((IllegalAcornStateException)t);
521                         } else {
522                             manager.notSafeToMakeSnapshot(new IllegalAcornStateException(t));
523                         }
524                         t.printStackTrace();
525                         LOGGER.error("Exception happened in WriteRunnable.run", t);
526                     }
527                 }
528
529         public void runWithMutex() throws IOException, IllegalAcornStateException, AcornAccessVerificationException {
530
531             try {
532                 // These have been set in method persist
533                 assert (!impl.isResident());
534                 assert (!impl.isDirty());
535
536                 impl.toFile(bytes);
537             } finally {
538                 synchronized (pending) {
539                     pending.remove(impl.getKey().toString());
540                     s.release(Integer.MAX_VALUE);
541                 }
542             }
543
544         }
545
546         // Fix WriteRunnable.runReally() to use LRU.MapValue mutex instead of
547         // borrowMutex
548         public void runReally(boolean hasMutex) throws IOException, IllegalAcornStateException, AcornAccessVerificationException {
549
550             if (hasMutex) {
551
552                 runWithMutex();
553
554             } else {
555
556                 boolean gotMutex = impl.tryAcquireMutex();
557
558                 boolean done = false;
559                 int count = 0;
560                 long startTime = 0;
561                 while (!done) {
562
563                     if (gotMutex || borrowMutex) {
564                         runWithMutex();
565                         done = true;
566                     } else {
567                         if (count % 10 == 0) {
568                             // Taking too long, sleep for a while.
569                             LOGGER.warn("Retry mutex acquire");
570                             try {
571                                 Thread.sleep(10);
572                             } catch (InterruptedException e) {
573                             }
574                         }
575                         gotMutex = impl.tryAcquireMutex();
576                         long currentTime = System.currentTimeMillis();
577                         if ((currentTime - startTime) > 10) {
578                             startTime = currentTime;
579                             count++;
580                         }
581                     }
582
583                 }
584
585                 if (gotMutex)
586                     impl.releaseMutex();
587
588             }
589
590         }
591         }
592         
593         public Path getDirectory() {
594                 return writeDir;
595         }
596         
597         /*
598          * Protected implementation 
599          * 
600          */
601         
602         protected void verifyAccess() throws AcornAccessVerificationException {
603                 if (mutex.availablePermits() != 0)
604                     throw new AcornAccessVerificationException("identifier=" + identifier + " mutex has " + mutex.availablePermits() + " available permits, should be 0! Current mutexOwner is " + mutexOwner);
605         }
606         
607         /*
608          * Private implementation 
609          * 
610          */
611         
612         
613 }