508127db9f6f538718d13e20cc3d98de71b1662c
[simantics/platform.git] / bundles / org.simantics.acorn / src / org / simantics / acorn / lru / LRU.java
1 package org.simantics.acorn.lru;
2
3 import java.io.IOException;
4 import java.nio.file.Path;
5 import java.util.ArrayList;
6 import java.util.Collection;
7 import java.util.HashMap;
8 import java.util.Map;
9 import java.util.TreeMap;
10 import java.util.concurrent.ScheduledThreadPoolExecutor;
11 import java.util.concurrent.Semaphore;
12 import java.util.concurrent.ThreadFactory;
13 import java.util.concurrent.TimeUnit;
14
15 import org.simantics.acorn.GraphClientImpl2;
16 import org.simantics.db.common.utils.Logger;
17
18 /*
19  * The order rule of synchronization for LRU and LRUObject is:
20  *   § Always lock LRUObject first!
21  * 
22  */
23
24 public class LRU<MapKey,MapValue extends LRUObject<MapKey, MapValue>> {
25         
26         public static boolean VERIFY = true;
27
28         final private long swapTime = 5L*1000000000L;
29         final private int swapSize = 200;
30
31         final private HashMap<MapKey, MapValue> map = new HashMap<MapKey, MapValue>();
32         final private TreeMap<Long, MapKey> priorityQueue = new TreeMap<Long, MapKey>();
33         
34         final private Semaphore mutex = new Semaphore(1);
35         final private String identifier;
36         
37         private Path writeDir;
38         
39         private Thread mutexOwner;
40         
41         public Map<String, WriteRunnable> pending = new HashMap<String, WriteRunnable>();
42         
43         public LRU(String identifier, Path writeDir) {
44                 this.identifier = identifier;
45                 this.writeDir = writeDir;
46                 resume();
47         }
48         
49         /*
50          * Public interface
51          */
52         
53         public void acquireMutex() {
54                 
55                 try {
56                         
57                         while(!mutex.tryAcquire(3, TimeUnit.SECONDS)) {
58                                 System.err.println("Mutex is taking a long time to acquire - owner is " + mutexOwner);
59                         }
60                                 
61                         if(VERIFY)
62                                 mutexOwner = Thread.currentThread();
63                                 
64                 } catch (InterruptedException e) {
65                         throw new IllegalStateException(e);
66                 }
67         }
68         
69         public void releaseMutex() {
70                 mutex.release();
71                 mutexOwner = null;
72         }
73
74         public void shutdown() {
75             if (GraphClientImpl2.DEBUG)
76                 System.err.println("Shutting down LRU writers " + writers);
77                 writers.shutdown();
78                 try {
79                         writers.awaitTermination(60, TimeUnit.SECONDS);
80                 } catch (InterruptedException e) {
81                         e.printStackTrace();
82                 }
83
84         }
85         
86         public void resume() {
87                 writers = new ScheduledThreadPoolExecutor(2, new ThreadFactory() {
88                         
89                         @Override
90                         public Thread newThread(Runnable r) {
91                                 return new Thread(r, identifier + " File Writer");
92                         }
93                         
94                 });
95                 if (GraphClientImpl2.DEBUG)
96                     System.err.println("Resuming LRU writers " + writers);
97         }
98
99         /*
100          * This method violates the synchronization order rule between LRU and MapVAlue
101          * External synchronization is used to ensure correct operation
102          */
103         public void persist(ArrayList<String> state) {
104         
105                 acquireMutex();
106                 
107                 try {
108                 
109                         for (MapValue value : values()) {
110                                 value.acquireMutex();
111                                 // for debugging purposes
112                                 boolean persisted = false;
113                                 try {
114                                         // Persist the value if needed
115                                         persisted = value.persist();
116                                 } finally {
117                                         // WriteRunnable may want to 
118                                         value.releaseMutex();
119                                 }
120                                 // Wait pending if value was actually persisted 
121                                 waitPending(value, false);
122                                 // Take lock again
123                                 value.acquireMutex();
124                                 try {
125                                         // Record the value
126                                         state.add(value.getStateKey());
127                                 } catch (Throwable t) {
128                                         throw new IllegalStateException(t);
129                                 } finally {
130                                         value.releaseMutex();
131                                 }
132                         }
133                         
134                 } catch (Throwable t) {
135                         throw new IllegalStateException(t);
136                 } finally {
137                         releaseMutex();
138                 }
139
140         }
141
142         public MapValue getWithoutMutex(MapKey key) {
143                 
144                 acquireMutex();
145                 try {
146                         return get(key);
147                 } catch (Throwable t) {
148                         throw new IllegalStateException(t);
149                 } finally {
150                         releaseMutex();
151                 }
152                 
153         }
154
155         public MapValue get(MapKey key) {
156                 
157                 if(VERIFY) verifyAccess();
158                 
159                 return map.get(key);
160                 
161         }
162
163         public void map(MapValue info) {
164                 
165                 if(VERIFY) verifyAccess();
166                 
167                 map.put(info.getKey(), info);
168                 
169         }
170
171         public Collection<MapValue> values() {
172                 
173                 if(VERIFY) verifyAccess();
174                 
175                 return map.values();
176                 
177         }
178
179         public boolean swapForced() {
180                 
181                 acquireMutex();
182                 
183                 try {
184                         return swap(0, 0, null);
185                 } catch (Throwable t) {
186                         throw new IllegalStateException(t);
187                 } finally {
188                         releaseMutex();
189                 }
190                 
191         }
192
193         public boolean swap(long lifeTime, int targetSize) {
194                 
195                 if(VERIFY) verifyAccess();
196
197                 return swap(lifeTime, targetSize, null);
198                 
199         }
200
201         /*
202          * This is called under global lock
203          */
204         public void setWriteDir(Path dir) {
205                 
206                 this.writeDir = dir;
207                 
208         }
209
210
211         /*
212          * Package access
213          */
214
215         void insert(MapValue info, long accessTime) {
216                 
217                 if(VERIFY) verifyAccess();
218
219                 map.put(info.getKey(), info);
220                 priorityQueue.put(accessTime, info.getKey());
221                 
222         }
223
224         /*
225          * We have access to ClusterLRU - try to refresh value if available
226          */
227         boolean tryRefresh(MapValue info) {
228
229                 if(VERIFY) verifyAccess();
230                 
231                 if(!info.tryAcquireMutex())
232                         return false;
233
234                 try {
235
236                         priorityQueue.remove(info.getLastAccessTime());
237                         info.accessed();
238                         map.put(info.getKey(), info);
239                         priorityQueue.put(info.getLastAccessTime(), info.getKey());
240                         
241                         return true;
242
243                 } catch (Throwable t) {
244                         throw new IllegalStateException(t);
245                 } finally {
246
247                         info.releaseMutex();
248                         
249                 }
250                 
251         }
252
253         /*
254          * We have access to MapValue and no access to clusterLRU
255          */
256         void refresh(MapValue info, boolean needMutex) {
257                 
258                 if(VERIFY) {
259                         if(!needMutex) verifyAccess();
260                         info.verifyAccess();
261                 }
262                 
263                 if(needMutex)
264                         acquireMutex();
265
266                 try {
267
268                         priorityQueue.remove(info.getLastAccessTime());
269                         info.accessed();
270                         map.put(info.getKey(), info);
271                         priorityQueue.put(info.getLastAccessTime(), info.getKey());
272
273                 } catch (Throwable t) {
274                         throw new IllegalStateException(t);
275                 } finally {
276
277                         if(needMutex)
278                                 releaseMutex();
279                         
280                 }
281                 
282         }
283
284         /*
285          * Private implementation
286          */
287
288         public int size() {
289                 
290                 if(VERIFY) verifyAccess();
291
292                 return priorityQueue.size();
293                 
294         }
295
296         boolean swap(MapKey excluded) {
297                 
298                 if(VERIFY) verifyAccess();
299
300                 return swap(swapTime, swapSize, excluded);
301                 
302         }
303
304         boolean swap(long lifeTime, int targetSize, MapKey excluded) {
305
306                 if(VERIFY) verifyAccess();
307
308                 MapValue valueToSwap = getValueToSwap(lifeTime, targetSize, excluded);
309                 if(valueToSwap != null) {
310                         
311                         if(valueToSwap.tryAcquireMutex()) {
312                                 
313                                 try {
314                                         
315                                         if(valueToSwap.canBePersisted()) {
316                                                 valueToSwap.persist();
317                                                 return true;
318                                         }
319                                         
320                                 } catch (Throwable t) {
321                                         throw new IllegalStateException(t);
322                                 } finally {
323                                         valueToSwap.releaseMutex();
324                                 }
325                         }
326                         
327                 }
328                 
329                 return false;
330
331         }
332
333         
334         private MapValue getValueToSwap1(long lifeTime, int targetSize, MapKey excluded) {
335
336                 if(VERIFY) verifyAccess();
337
338                 for(int i=0;i<10;i++) {
339
340                         long candidate = getSwapCandidate(lifeTime, targetSize);
341                         if(candidate == 0) return null;
342                         
343                         MapKey key = priorityQueue.remove(candidate);
344                         if(key.equals(excluded)) {
345                                 tryRefresh(map.get(key));
346                                 continue;
347                         }
348                         
349                         return map.get(key);
350                         
351                 }
352                 
353                 return null;
354
355         }
356         
357         
358         private MapValue getValueToSwap(long lifeTime, int targetSize, MapKey excluded) {
359
360                 if(VERIFY) verifyAccess();
361
362                 for(int i=0;i<10;i++) {
363                         
364                         // Lock LRU and get a candidate
365                         MapValue value = getValueToSwap1(lifeTime, targetSize, excluded);
366                         if(value == null) return null;
367                         
368                         if(value.tryAcquireMutex()) {
369
370                                 try {
371                                         
372                                         // This may lock the object
373                                         if(value.canBePersisted()) return value;
374                                         // Insert back the value
375                                         refresh(value, false);
376         
377                                 } catch (Throwable t) {
378                                         throw new IllegalStateException(t);
379                                 } finally {
380                                         
381                                         value.releaseMutex();
382                                         
383                                 }
384                                 
385                         }
386                         
387                 }
388                 
389                 return null;
390                 
391         }
392         
393         private long getSwapCandidate(long lifeTime, int targetSize) {
394                 
395                 if(VERIFY) verifyAccess();
396
397                 if(priorityQueue.isEmpty()) return 0;
398
399                 long currentTime = System.nanoTime();
400                 Long lowest = priorityQueue.firstKey();
401
402                 if(currentTime - lowest > lifeTime || priorityQueue.size() > targetSize) {
403                         return lowest;
404                 }
405
406                 return 0;
407
408         }
409         
410         /*
411          * Tries to persist this object. Can fail if the object cannot be persisted at this time.
412          * 
413          */
414         boolean persist(Object object_) {
415                 
416                 MapValue object = (MapValue)object_;
417                 
418                 if(VERIFY) object.verifyAccess();
419                 
420                 if(object.isDirty()) {
421
422                         // It is possible that this just became unpersistable. Fail here in this case.
423                         if(!object.canBePersisted()) {
424                                 return false;
425                         }
426
427                         assert(object.isResident());
428
429                         Path f = writeDir.resolve(object.getFileName());
430
431                         WriteRunnable runnable = new WriteRunnable(f, object);
432                         
433                         synchronized(pending) {
434                                 WriteRunnable existing = pending.put(object.getKey().toString(), runnable);
435                                 assert(existing == null);
436                         }
437
438                         writers.execute(runnable);
439
440                         object.setResident(false);
441                         object.setDirty(false);
442
443                         return true;
444
445                 } else if(object.isResident()) {
446
447                         object.release();
448                         object.setResident(false);
449                         return false;
450
451                 }
452
453                 return false;
454                 
455         }
456         
457         int makeResident(Object object_, boolean keepResident) {
458
459                 MapValue object = (MapValue)object_;
460
461                 if(VERIFY) object.verifyAccess();
462
463                 try {
464
465                         object.setForceResident(keepResident);
466
467                         if(object.isResident()) {
468                                 refresh(object, true);
469                                 return 0;
470                         }
471
472                         waitPending(object, true);
473
474                         byte[] data = object.readFile();
475
476                         object.fromFile(data);
477                         object.setResident(true);
478
479                         acquireMutex();
480                         try {
481                                 refresh(object, false);
482                                 swap(swapTime, swapSize, object.getKey());
483                         } catch (Throwable t) {
484                                 throw new IllegalStateException(t);
485                         } finally {
486                                 releaseMutex();
487                         }
488
489                         return data.length;
490
491                 } catch (IOException e) {
492                         
493                         e.printStackTrace();
494                         
495                 }
496                 
497                 return 0;
498                 
499         }
500
501         static int readCounter = 0;
502         static int writeCounter = 0;
503         
504         ScheduledThreadPoolExecutor writers;
505         
506         void waitPending(MapValue value, boolean hasMutex) {
507                 
508                 WriteRunnable r = null;
509                 boolean inProgress = false;
510                 synchronized(pending) {
511                         r = pending.get(value.getKey().toString());
512                         if(r != null) {
513                                 synchronized(r) {
514                                         if(r.committed) {
515                                                 // just being written - just need to wait
516                                                 inProgress = true;
517                                         } else {
518                                                 r.committed = true;
519                                                 // we do the writing
520                                         }
521                                 }
522                         }
523                 }
524                 if(r != null) {
525                         if(inProgress) {
526 //                              System.err.println("reader waits for WriteRunnable to finish");
527                                 try {
528                                         r.s.acquire();
529                                 } catch (InterruptedException e) {
530                                         e.printStackTrace();
531                                 }
532                         } else {
533 //                              System.err.println("reader took WriteRunnable");
534                                 try {
535                     r.runReally(hasMutex);
536                 } catch (Throwable e) {
537                     e.printStackTrace();
538                     Logger.defaultLogError(e);
539                 }
540                         }
541                 }
542                 
543         }
544         
545         public class WriteRunnable implements Runnable {
546
547                 Path bytes;
548                 MapValue impl;
549                 boolean committed = false;
550                 private Semaphore s = new Semaphore(0);
551                 
552                 WriteRunnable(Path bytes, MapValue impl) {
553                         this.bytes = bytes;
554                         this.impl = impl;
555                 }
556                 
557                 @Override
558                 public void run() {
559                         synchronized(impl) {
560
561                                 synchronized(this) {
562                                 
563                                         if(committed) return;
564                                         
565                                         committed = true;
566                                 
567                                 }
568                     try {
569                         runReally(false);
570                     } catch (Throwable e) {
571                         e.printStackTrace();
572                         Logger.defaultLogError(e);
573                     }
574                         }
575                 }
576
577                 public void runReally(boolean hasMutex) throws IOException {
578                 
579                         if(!hasMutex)
580                                 impl.acquireMutex();
581                         
582                         try {
583                                 
584                                 // These have been set in method persist
585                                 assert(!impl.isResident());
586                                 assert(!impl.isDirty());
587                                 
588                                 impl.toFile(bytes);
589
590                                 synchronized(pending) {
591                                         pending.remove(impl.getKey().toString());
592                                         s.release(Integer.MAX_VALUE);
593                                 }
594                         } finally {
595                                 if(!hasMutex)
596                                         impl.releaseMutex();
597                         }
598                         
599                 }
600         
601         }
602         
603         public Path getDirectory() {
604                 return writeDir;
605         }
606         
607         /*
608          * Protected implementation 
609          * 
610          */
611         
612         protected void verifyAccess() {
613 //          assert (mutex.availablePermits() == 0);
614                 if (mutex.availablePermits() != 0)
615                     throw new IllegalStateException("identifier=" + identifier + " mutex has " + mutex.availablePermits() + " available permits, should be 0! Current mutexOwner is " + mutexOwner);
616         }
617         
618         /*
619          * Private implementation 
620          * 
621          */
622         
623         
624 }