]> gerrit.simantics Code Review - simantics/platform.git/blob - bundles/org.simantics.acorn/src/org/simantics/acorn/lru/LRU.java
Initial version of purge
[simantics/platform.git] / bundles / org.simantics.acorn / src / org / simantics / acorn / lru / LRU.java
1 package org.simantics.acorn.lru;
2
3 import java.io.IOException;
4 import java.nio.file.Path;
5 import java.util.ArrayList;
6 import java.util.Collection;
7 import java.util.HashMap;
8 import java.util.Map;
9 import java.util.TreeMap;
10 import java.util.concurrent.ScheduledThreadPoolExecutor;
11 import java.util.concurrent.Semaphore;
12 import java.util.concurrent.ThreadFactory;
13 import java.util.concurrent.TimeUnit;
14
15 import org.simantics.acorn.ClusterManager;
16 import org.simantics.acorn.GraphClientImpl2;
17 import org.simantics.acorn.exception.AcornAccessVerificationException;
18 import org.simantics.acorn.exception.IllegalAcornStateException;
19 import org.simantics.db.common.utils.Logger;
20
21 /*
22  * The order rule of synchronization for LRU and LRUObject is:
23  *   § Always lock LRUObject first!
24  * 
25  */
26
27 public class LRU<MapKey,MapValue extends LRUObject<MapKey, MapValue>> {
28         
29         public static boolean VERIFY = true;
30
31         final private long swapTime = 5L*1000000000L;
32         final private int swapSize = 200;
33
34         final private HashMap<MapKey, MapValue> map = new HashMap<MapKey, MapValue>();
35         final private TreeMap<Long, MapKey> priorityQueue = new TreeMap<Long, MapKey>();
36         
37         final private Semaphore mutex = new Semaphore(1);
38         final private String identifier;
39         
40         private Path writeDir;
41         
42         private Thread mutexOwner;
43         
44         public Map<String, WriteRunnable> pending = new HashMap<String, WriteRunnable>();
45
46     protected final ClusterManager manager;
47         
48         public LRU(ClusterManager manager, String identifier, Path writeDir) {
49             this.manager = manager;
50                 this.identifier = identifier;
51                 this.writeDir = writeDir;
52                 resume();
53         }
54         
55         /*
56          * Public interface
57          */
58         
59         public void acquireMutex() throws IllegalAcornStateException {
60                 try {
61                         while(!mutex.tryAcquire(3, TimeUnit.SECONDS)) {
62                                 System.err.println("Mutex is taking a long time to acquire - owner is " + mutexOwner);
63                         }
64                         if(VERIFY)
65                                 mutexOwner = Thread.currentThread();
66                 } catch (InterruptedException e) {
67                         throw new IllegalAcornStateException(e);
68                 }
69         }
70         
71         public void releaseMutex() {
72                 mutex.release();
73                 mutexOwner = null;
74         }
75
76         public void shutdown() {
77             if (GraphClientImpl2.DEBUG)
78                 System.err.println("Shutting down LRU writers " + writers);
79                 writers.shutdown();
80                 try {
81                         writers.awaitTermination(60, TimeUnit.SECONDS);
82                 } catch (InterruptedException e) {
83                         e.printStackTrace();
84                 }
85
86         }
87         
88         public void resume() {
89                 writers = new ScheduledThreadPoolExecutor(2, new ThreadFactory() {
90                         
91                         @Override
92                         public Thread newThread(Runnable r) {
93                                 return new Thread(r, identifier + " File Writer");
94                         }
95                 });
96                 if (GraphClientImpl2.DEBUG)
97                     System.err.println("Resuming LRU writers " + writers);
98         }
99
100         /*
101          * This method violates the synchronization order rule between LRU and MapVAlue
102          * External synchronization is used to ensure correct operation
103          */
104         public void persist(ArrayList<String> state) throws IllegalAcornStateException {
105         
106                 acquireMutex();
107                 try {
108                         for (MapValue value : values()) {
109                                 value.acquireMutex();
110                                 // for debugging purposes
111                                 boolean persisted = false;
112                                 try {
113                                         // Persist the value if needed
114                                         persisted = value.persist();
115                                 } finally {
116                                         // WriteRunnable may want to 
117                                         value.releaseMutex();
118                                 }
119                                 // Wait pending if value was actually persisted 
120                                 waitPending(value, false);
121                                 // Take lock again
122                                 value.acquireMutex();
123                                 try {
124                                         // Record the value
125                                         state.add(value.getStateKey());
126                                 } finally {
127                                         value.releaseMutex();
128                                 }
129                         }
130                 } catch (IllegalAcornStateException e) {
131                     throw e;
132                 } catch (IOException e) {
133                     throw new IllegalAcornStateException("Unable to waitPending for " + this.identifier, e);
134                 } catch (Throwable t) {
135                     throw new IllegalAcornStateException("Fatal error occured for " + this.identifier, t);
136                 } finally {
137                         releaseMutex();
138                 }
139         }
140
141         public MapValue getWithoutMutex(MapKey key) throws AcornAccessVerificationException, IllegalAcornStateException {
142                 
143                 acquireMutex();
144                 try {
145                         return get(key);
146                 } finally {
147                         releaseMutex();
148                 }
149         }
150
151         
152         
153         public MapValue purge(MapKey id) {
154                 return map.remove(id);
155         }
156
157         public MapValue get(MapKey key) throws AcornAccessVerificationException {
158                 
159                 if(VERIFY) verifyAccess();
160                 
161                 return map.get(key);
162         }
163
164         public void map(MapValue info) throws AcornAccessVerificationException {
165                 
166                 if(VERIFY) verifyAccess();
167                 
168                 map.put(info.getKey(), info);
169         }
170
171         public Collection<MapValue> values() throws AcornAccessVerificationException {
172                 
173                 if(VERIFY) verifyAccess();
174                 
175                 return map.values();
176         }
177
178         public boolean swapForced() throws IllegalAcornStateException, AcornAccessVerificationException {
179                 
180                 acquireMutex();
181                 
182                 try {
183                         return swap(0, 0, null);
184                 } finally {
185                         releaseMutex();
186                 }
187                 
188         }
189
190         public boolean swap(long lifeTime, int targetSize) throws AcornAccessVerificationException, IllegalAcornStateException {
191                 
192                 if(VERIFY) verifyAccess();
193
194                 return swap(lifeTime, targetSize, null);
195         }
196
197         /*
198          * This is called under global lock
199          */
200         public void setWriteDir(Path dir) {
201                 
202                 this.writeDir = dir;
203         }
204
205
206         /*
207          * Package access
208          */
209
210         void insert(MapValue info, long accessTime) throws AcornAccessVerificationException {
211                 
212                 if(VERIFY) verifyAccess();
213
214                 map.put(info.getKey(), info);
215                 priorityQueue.put(accessTime, info.getKey());
216         }
217
218         /*
219          * We have access to ClusterLRU - try to refresh value if available
220          */
221         boolean tryRefresh(MapValue info) throws AcornAccessVerificationException, IllegalAcornStateException {
222
223                 if(VERIFY) verifyAccess();
224                 
225                 if(!info.tryAcquireMutex())
226                         return false;
227
228                 try {
229                         priorityQueue.remove(info.getLastAccessTime());
230                         info.accessed();
231                         map.put(info.getKey(), info);
232                         priorityQueue.put(info.getLastAccessTime(), info.getKey());
233                         return true;
234                 } finally {
235                         info.releaseMutex();
236                 }
237         }
238
239         /*
240          * We have access to MapValue and no access to clusterLRU
241          */
242         void refresh(MapValue info, boolean needMutex) throws AcornAccessVerificationException, IllegalAcornStateException {
243                 
244                 if(VERIFY) {
245                         if(!needMutex) verifyAccess();
246                         info.verifyAccess();
247                 }
248                 
249                 if(needMutex)
250                         acquireMutex();
251
252                 try {
253
254                         priorityQueue.remove(info.getLastAccessTime());
255                         info.accessed();
256                         map.put(info.getKey(), info);
257                         priorityQueue.put(info.getLastAccessTime(), info.getKey());
258
259                 } catch (AcornAccessVerificationException e) {
260                     throw e;
261                 } catch (Throwable t) {
262                         throw new IllegalAcornStateException(t);
263                 } finally {
264                         if(needMutex)
265                                 releaseMutex();
266                 }
267         }
268
269         /*
270          * Private implementation
271          */
272
273         int size() throws AcornAccessVerificationException {
274                 if(VERIFY) verifyAccess();
275                 return priorityQueue.size();
276         }
277
278         boolean swap(MapKey excluded) throws AcornAccessVerificationException, IllegalAcornStateException {
279                 if(VERIFY) verifyAccess();
280                 return swap(swapTime, swapSize, excluded);
281         }
282
283         boolean swap(long lifeTime, int targetSize, MapKey excluded) throws AcornAccessVerificationException, IllegalAcornStateException {
284
285                 if(VERIFY) verifyAccess();
286
287                 MapValue valueToSwap = getValueToSwap(lifeTime, targetSize, excluded);
288                 if(valueToSwap != null) {
289                         
290                         if(valueToSwap.tryAcquireMutex()) {
291                                 try {
292                                         if(valueToSwap.canBePersisted()) {
293                                                 valueToSwap.persist();
294                                                 return true;
295                                         }
296                                 } catch (Throwable t) {
297                                         throw new IllegalAcornStateException(t);
298                                 } finally {
299                                         valueToSwap.releaseMutex();
300                                 }
301                         }
302                 }
303                 return false;
304         }
305
306         private MapValue getValueToSwap1(long lifeTime, int targetSize, MapKey excluded) throws AcornAccessVerificationException, IllegalAcornStateException {
307
308                 if(VERIFY) verifyAccess();
309
310                 for(int i=0;i<10;i++) {
311
312                         long candidate = getSwapCandidate(lifeTime, targetSize);
313                         if(candidate == 0) return null;
314                         
315                         MapKey key = priorityQueue.remove(candidate);
316                         if(key.equals(excluded)) {
317                                 tryRefresh(map.get(key));
318                                 continue;
319                         }
320                         
321                         return map.get(key);
322                 }
323                 return null;
324         }
325         
326         
327         private MapValue getValueToSwap(long lifeTime, int targetSize, MapKey excluded) throws AcornAccessVerificationException, IllegalAcornStateException {
328
329                 if(VERIFY) verifyAccess();
330
331                 for(int i=0;i<10;i++) {
332                         
333                         // Lock LRU and get a candidate
334                         MapValue value = getValueToSwap1(lifeTime, targetSize, excluded);
335                         if(value == null) return null;
336                         
337                         if(value.tryAcquireMutex()) {
338
339                                 try {
340                                         // This may lock the object
341                                         if(value.canBePersisted())
342                                             return value;
343                                         // Insert back the value
344                                         refresh(value, false);
345                                 } finally {
346                                         value.releaseMutex();
347                                 }
348                         }
349                 }
350                 return null;
351         }
352         
353         private long getSwapCandidate(long lifeTime, int targetSize) throws AcornAccessVerificationException {
354                 
355                 if(VERIFY) verifyAccess();
356
357                 if(priorityQueue.isEmpty()) return 0;
358
359                 long currentTime = System.nanoTime();
360                 Long lowest = priorityQueue.firstKey();
361
362                 if(currentTime - lowest > lifeTime || priorityQueue.size() > targetSize) {
363                         return lowest;
364                 }
365
366                 return 0;
367
368         }
369         
370         /*
371          * Tries to persist this object. Can fail if the object cannot be persisted at this time.
372          * 
373          */
374         boolean persist(Object object_) throws AcornAccessVerificationException {
375                 
376                 MapValue object = (MapValue)object_;
377                 
378                 if(VERIFY) object.verifyAccess();
379                 
380                 if(object.isDirty()) {
381                         // It is possible that this just became unpersistable. Fail here in this case.
382                         if(!object.canBePersisted()) {
383                                 return false;
384                         }
385
386                         assert(object.isResident());
387
388                         Path f = writeDir.resolve(object.getFileName());
389
390                         WriteRunnable runnable = new WriteRunnable(f, object);
391                         
392                         synchronized(pending) {
393                                 WriteRunnable existing = pending.put(object.getKey().toString(), runnable);
394                                 assert(existing == null);
395                         }
396
397                         writers.execute(runnable);
398
399                         object.setResident(false);
400                         object.setDirty(false);
401
402                         return true;
403
404                 } else if(object.isResident()) {
405
406                         object.release();
407                         object.setResident(false);
408                         return false;
409                 }
410                 return false;
411         }
412         
413         int makeResident(Object object_, boolean keepResident) throws AcornAccessVerificationException, IllegalAcornStateException {
414
415                 MapValue object = (MapValue)object_;
416
417                 if(VERIFY) object.verifyAccess();
418
419                 try {
420                         object.setForceResident(keepResident);
421
422                         if(object.isResident()) {
423                                 refresh(object, true);
424                                 return 0;
425                         }
426
427                         waitPending(object, true);
428
429                         byte[] data = object.readFile();
430
431                         object.fromFile(data);
432                         object.setResident(true);
433
434                         acquireMutex();
435                         try {
436                                 refresh(object, false);
437                                 swap(swapTime, swapSize, object.getKey());
438                         } finally {
439                                 releaseMutex();
440                         }
441                         return data.length;
442                 } catch (IOException e) {
443                         throw new IllegalAcornStateException("Unable to makeResident " + identifier, e);
444                 }
445         }
446
447         static int readCounter = 0;
448         static int writeCounter = 0;
449         
450         ScheduledThreadPoolExecutor writers;
451         
452         void waitPending(MapValue value, boolean hasMutex) throws IOException, AcornAccessVerificationException, IllegalAcornStateException {
453                 
454                 WriteRunnable runnable = null;
455                 boolean inProgress = false;
456                 synchronized(pending) {
457                         runnable = pending.get(value.getKey().toString());
458                         if(runnable != null) {
459                                 synchronized(runnable) {
460                                         if(runnable.committed) {
461                                                 // just being written - just need to wait
462                                                 inProgress = true;
463                                         } else {
464                                                 runnable.committed = true;
465                                                 // we do the writing
466                                         }
467                                 }
468                         }
469                 }
470                 if(runnable != null) {
471                         if(inProgress) {
472 //                              System.err.println("reader waits for WriteRunnable to finish");
473                                 try {
474                                     if(hasMutex) {
475                                         runnable.borrowMutex = true;
476                                     }
477                                         runnable.s.acquire();
478                                 } catch (InterruptedException e) {
479                                         throw new IllegalAcornStateException(e);
480                                 }
481                         } else {
482 //                              System.err.println("reader took WriteRunnable");
483                 runnable.runReally(hasMutex);
484                         }
485                 }
486         }
487         
488         public class WriteRunnable implements Runnable {
489
490                 private Path bytes;
491                 private MapValue impl;
492                 private boolean committed = false;
493                 private boolean borrowMutex = false;
494                 private Semaphore s = new Semaphore(0);
495                 
496                 WriteRunnable(Path bytes, MapValue impl) {
497                         this.bytes = bytes;
498                         this.impl = impl;
499                 }
500                 
501                 @Override
502                 public void run() {
503                     try {
504                         synchronized(impl) {
505     
506                                 synchronized(this) {
507                                 
508                                         if(committed)
509                                             return;
510                                         
511                                         committed = true;
512                                 }
513                     runReally(false);
514                         }
515                     } catch (Throwable t) {
516                         if (t instanceof IllegalAcornStateException) {
517                             manager.notSafeToMakeSnapshot((IllegalAcornStateException)t);
518                         } else {
519                             manager.notSafeToMakeSnapshot(new IllegalAcornStateException(t));
520                         }
521                         t.printStackTrace();
522                         Logger.defaultLogError(t);
523                     }
524                 }
525
526         public void runWithMutex() throws IOException, IllegalAcornStateException, AcornAccessVerificationException {
527
528             try {
529                 // These have been set in method persist
530                 assert (!impl.isResident());
531                 assert (!impl.isDirty());
532
533                 impl.toFile(bytes);
534             } finally {
535                 synchronized (pending) {
536                     pending.remove(impl.getKey().toString());
537                     s.release(Integer.MAX_VALUE);
538                 }
539             }
540
541         }
542
543         // Fix WriteRunnable.runReally() to use LRU.MapValue mutex instead of
544         // borrowMutex
545         public void runReally(boolean hasMutex) throws IOException, IllegalAcornStateException, AcornAccessVerificationException {
546
547             if (hasMutex) {
548
549                 runWithMutex();
550
551             } else {
552
553                 boolean gotMutex = impl.tryAcquireMutex();
554
555                 boolean done = false;
556                 while (!done) {
557
558                     if (gotMutex || borrowMutex) {
559                         runWithMutex();
560                         done = true;
561                     } else {
562                         System.err.println("Retry mutex acquire");
563                         gotMutex = impl.tryAcquireMutex();
564                     }
565
566                 }
567
568                 if (gotMutex)
569                     impl.releaseMutex();
570
571             }
572
573         }
574         }
575         
576         public Path getDirectory() {
577                 return writeDir;
578         }
579         
580         /*
581          * Protected implementation 
582          * 
583          */
584         
585         protected void verifyAccess() throws AcornAccessVerificationException {
586                 if (mutex.availablePermits() != 0)
587                     throw new AcornAccessVerificationException("identifier=" + identifier + " mutex has " + mutex.availablePermits() + " available permits, should be 0! Current mutexOwner is " + mutexOwner);
588         }
589         
590         /*
591          * Private implementation 
592          * 
593          */
594         
595         
596 }