]> gerrit.simantics Code Review - simantics/platform.git/blob - bundles/org.simantics.acorn/src/org/simantics/acorn/lru/LRU.java
Merge commit 'b3da313'
[simantics/platform.git] / bundles / org.simantics.acorn / src / org / simantics / acorn / lru / LRU.java
1 package org.simantics.acorn.lru;
2
3 import java.io.IOException;
4 import java.nio.file.Path;
5 import java.util.ArrayList;
6 import java.util.Collection;
7 import java.util.HashMap;
8 import java.util.Map;
9 import java.util.TreeMap;
10 import java.util.concurrent.ScheduledThreadPoolExecutor;
11 import java.util.concurrent.Semaphore;
12 import java.util.concurrent.ThreadFactory;
13 import java.util.concurrent.TimeUnit;
14
15 import org.simantics.acorn.ClusterManager;
16 import org.simantics.acorn.GraphClientImpl2;
17 import org.simantics.acorn.exception.AcornAccessVerificationException;
18 import org.simantics.acorn.exception.IllegalAcornStateException;
19 import org.simantics.db.common.utils.Logger;
20
21 /*
22  * The order rule of synchronization for LRU and LRUObject is:
23  *   § Always lock LRUObject first!
24  * 
25  */
26
27 public class LRU<MapKey,MapValue extends LRUObject<MapKey, MapValue>> {
28         
29         public static boolean VERIFY = true;
30
31         final private long swapTime = 5L*1000000000L;
32         final private int swapSize = 200;
33
34         final private HashMap<MapKey, MapValue> map = new HashMap<MapKey, MapValue>();
35         final private TreeMap<Long, MapKey> priorityQueue = new TreeMap<Long, MapKey>();
36         
37         final private Semaphore mutex = new Semaphore(1);
38         final private String identifier;
39         
40         private Path writeDir;
41         
42         private Thread mutexOwner;
43         
44         public Map<String, WriteRunnable> pending = new HashMap<String, WriteRunnable>();
45
46     protected final ClusterManager manager;
47         
48         public LRU(ClusterManager manager, String identifier, Path writeDir) {
49             this.manager = manager;
50                 this.identifier = identifier;
51                 this.writeDir = writeDir;
52                 resume();
53         }
54         
55         /*
56          * Public interface
57          */
58         
59         public void acquireMutex() throws IllegalAcornStateException {
60                 try {
61                         while(!mutex.tryAcquire(3, TimeUnit.SECONDS)) {
62                                 System.err.println("Mutex is taking a long time to acquire - owner is " + mutexOwner);
63                         }
64                         if(VERIFY)
65                                 mutexOwner = Thread.currentThread();
66                 } catch (InterruptedException e) {
67                         throw new IllegalAcornStateException(e);
68                 }
69         }
70         
71         public void releaseMutex() {
72                 mutex.release();
73                 mutexOwner = null;
74         }
75
76         public void shutdown() {
77             if (GraphClientImpl2.DEBUG)
78                 System.err.println("Shutting down LRU writers " + writers);
79                 writers.shutdown();
80                 try {
81                         writers.awaitTermination(60, TimeUnit.SECONDS);
82                 } catch (InterruptedException e) {
83                         e.printStackTrace();
84                 }
85
86         }
87         
88         public void resume() {
89                 writers = new ScheduledThreadPoolExecutor(2, new ThreadFactory() {
90                         
91                         @Override
92                         public Thread newThread(Runnable r) {
93                                 return new Thread(r, identifier + " File Writer");
94                         }
95                 });
96                 if (GraphClientImpl2.DEBUG)
97                     System.err.println("Resuming LRU writers " + writers);
98         }
99
100         /*
101          * This method violates the synchronization order rule between LRU and MapValue
102          * External synchronization is used to ensure correct operation
103          */
104         public void persist(ArrayList<String> state) throws IllegalAcornStateException {
105         
106                 acquireMutex();
107                 try {
108                         for (MapValue value : values()) {
109                                 value.acquireMutex();
110                                 // for debugging purposes
111                                 boolean persisted = false;
112                                 try {
113                                         // Persist the value if needed
114                                         persisted = value.persist();
115                                 } finally {
116                                         // WriteRunnable may want to 
117                                         value.releaseMutex();
118                                 }
119                                 // Wait pending if value was actually persisted 
120                                 waitPending(value, false);
121                                 // Take lock again
122                                 value.acquireMutex();
123                                 try {
124                                         // Record the value
125                                         state.add(value.getStateKey());
126                                 } finally {
127                                         value.releaseMutex();
128                                 }
129                         }
130                 } catch (IllegalAcornStateException e) {
131                     throw e;
132                 } catch (IOException e) {
133                     throw new IllegalAcornStateException("Unable to waitPending for " + this.identifier, e);
134                 } catch (Throwable t) {
135                     throw new IllegalAcornStateException("Fatal error occured for " + this.identifier, t);
136                 } finally {
137                         releaseMutex();
138                 }
139         }
140
141         public MapValue getWithoutMutex(MapKey key) throws AcornAccessVerificationException, IllegalAcornStateException {
142                 
143                 acquireMutex();
144                 try {
145                         return get(key);
146                 } finally {
147                         releaseMutex();
148                 }
149         }
150
151         public MapValue get(MapKey key) throws AcornAccessVerificationException {
152                 
153                 if(VERIFY) verifyAccess();
154                 
155                 return map.get(key);
156         }
157
158         public void map(MapValue info) throws AcornAccessVerificationException {
159                 
160                 if(VERIFY) verifyAccess();
161                 
162                 map.put(info.getKey(), info);
163         }
164
165         public Collection<MapValue> values() throws AcornAccessVerificationException {
166                 
167                 if(VERIFY) verifyAccess();
168                 
169                 return map.values();
170         }
171
172         public boolean swapForced() throws IllegalAcornStateException, AcornAccessVerificationException {
173                 
174                 acquireMutex();
175                 
176                 try {
177                         return swap(0, 0, null);
178                 } finally {
179                         releaseMutex();
180                 }
181                 
182         }
183
184         public boolean swap(long lifeTime, int targetSize) throws AcornAccessVerificationException, IllegalAcornStateException {
185                 
186                 if(VERIFY) verifyAccess();
187
188                 return swap(lifeTime, targetSize, null);
189         }
190
191         /*
192          * This is called under global lock
193          */
194         public void setWriteDir(Path dir) {
195                 
196                 this.writeDir = dir;
197         }
198
199
200         /*
201          * Package access
202          */
203
204         void insert(MapValue info, long accessTime) throws AcornAccessVerificationException {
205                 
206                 if(VERIFY) verifyAccess();
207
208                 map.put(info.getKey(), info);
209                 priorityQueue.put(accessTime, info.getKey());
210         }
211
212         /*
213          * We have access to ClusterLRU - try to refresh value if available
214          */
215         boolean tryRefresh(MapValue info) throws AcornAccessVerificationException, IllegalAcornStateException {
216
217                 if(VERIFY) verifyAccess();
218                 
219                 if(!info.tryAcquireMutex())
220                         return false;
221
222                 try {
223                         priorityQueue.remove(info.getLastAccessTime());
224                         info.accessed();
225                         map.put(info.getKey(), info);
226                         priorityQueue.put(info.getLastAccessTime(), info.getKey());
227                         return true;
228                 } finally {
229                         info.releaseMutex();
230                 }
231         }
232
233         /*
234          * We have access to MapValue and no access to clusterLRU
235          */
236         void refresh(MapValue info, boolean needMutex) throws AcornAccessVerificationException, IllegalAcornStateException {
237                 
238                 if(VERIFY) {
239                         if(!needMutex) verifyAccess();
240                         info.verifyAccess();
241                 }
242                 
243                 if(needMutex)
244                         acquireMutex();
245
246                 try {
247
248                         priorityQueue.remove(info.getLastAccessTime());
249                         info.accessed();
250                         map.put(info.getKey(), info);
251                         priorityQueue.put(info.getLastAccessTime(), info.getKey());
252
253                 } catch (AcornAccessVerificationException e) {
254                     throw e;
255                 } catch (Throwable t) {
256                         throw new IllegalAcornStateException(t);
257                 } finally {
258                         if(needMutex)
259                                 releaseMutex();
260                 }
261         }
262
263         /*
264          * Private implementation
265          */
266
267         int size() throws AcornAccessVerificationException {
268                 if(VERIFY) verifyAccess();
269                 return priorityQueue.size();
270         }
271
272         boolean swap(MapKey excluded) throws AcornAccessVerificationException, IllegalAcornStateException {
273                 if(VERIFY) verifyAccess();
274                 return swap(swapTime, swapSize, excluded);
275         }
276
277         boolean swap(long lifeTime, int targetSize, MapKey excluded) throws AcornAccessVerificationException, IllegalAcornStateException {
278
279                 if(VERIFY) verifyAccess();
280
281                 MapValue valueToSwap = getValueToSwap(lifeTime, targetSize, excluded);
282                 if(valueToSwap != null) {
283                         
284                         if(valueToSwap.tryAcquireMutex()) {
285                                 try {
286                                         if(valueToSwap.canBePersisted()) {
287                                                 valueToSwap.persist();
288                                                 return true;
289                                         }
290                                 } catch (Throwable t) {
291                                         throw new IllegalAcornStateException(t);
292                                 } finally {
293                                         valueToSwap.releaseMutex();
294                                 }
295                         }
296                 }
297                 return false;
298         }
299
300         private MapValue getValueToSwap1(long lifeTime, int targetSize, MapKey excluded) throws AcornAccessVerificationException, IllegalAcornStateException {
301
302                 if(VERIFY) verifyAccess();
303
304                 for(int i=0;i<10;i++) {
305
306                         long candidate = getSwapCandidate(lifeTime, targetSize);
307                         if(candidate == 0) return null;
308                         
309                         MapKey key = priorityQueue.remove(candidate);
310                         if(key.equals(excluded)) {
311                                 tryRefresh(map.get(key));
312                                 continue;
313                         }
314                         
315                         return map.get(key);
316                 }
317                 return null;
318         }
319         
320         
321         private MapValue getValueToSwap(long lifeTime, int targetSize, MapKey excluded) throws AcornAccessVerificationException, IllegalAcornStateException {
322
323                 if(VERIFY) verifyAccess();
324
325                 for(int i=0;i<10;i++) {
326                         
327                         // Lock LRU and get a candidate
328                         MapValue value = getValueToSwap1(lifeTime, targetSize, excluded);
329                         if(value == null) return null;
330                         
331                         if(value.tryAcquireMutex()) {
332
333                                 try {
334                                         // This may lock the object
335                                         if(value.canBePersisted())
336                                             return value;
337                                         // Insert back the value
338                                         refresh(value, false);
339                                 } finally {
340                                         value.releaseMutex();
341                                 }
342                         }
343                 }
344                 return null;
345         }
346         
347         private long getSwapCandidate(long lifeTime, int targetSize) throws AcornAccessVerificationException {
348                 
349                 if(VERIFY) verifyAccess();
350
351                 if(priorityQueue.isEmpty()) return 0;
352
353                 long currentTime = System.nanoTime();
354                 Long lowest = priorityQueue.firstKey();
355
356                 if(currentTime - lowest > lifeTime || priorityQueue.size() > targetSize) {
357                         return lowest;
358                 }
359
360                 return 0;
361
362         }
363         
364         /*
365          * Tries to persist this object. Can fail if the object cannot be persisted at this time.
366          * 
367          */
368         boolean persist(Object object_) throws AcornAccessVerificationException {
369                 
370                 MapValue object = (MapValue)object_;
371                 
372                 if(VERIFY) object.verifyAccess();
373                 
374                 if(object.isDirty()) {
375                         // It is possible that this just became unpersistable. Fail here in this case.
376                         if(!object.canBePersisted()) {
377                                 return false;
378                         }
379
380                         assert(object.isResident());
381
382                         Path f = writeDir.resolve(object.getFileName());
383
384                         WriteRunnable runnable = new WriteRunnable(f, object);
385                         
386                         synchronized(pending) {
387                                 WriteRunnable existing = pending.put(object.getKey().toString(), runnable);
388                                 assert(existing == null);
389                         }
390
391                         writers.execute(runnable);
392
393                         object.setResident(false);
394                         object.setDirty(false);
395
396                         return true;
397
398                 } else if(object.isResident()) {
399
400                         object.release();
401                         object.setResident(false);
402                         return false;
403                 }
404                 return false;
405         }
406         
407         int makeResident(Object object_, boolean keepResident) throws AcornAccessVerificationException, IllegalAcornStateException {
408
409                 MapValue object = (MapValue)object_;
410
411                 if(VERIFY) object.verifyAccess();
412
413                 try {
414                         object.setForceResident(keepResident);
415
416                         if(object.isResident()) {
417                                 refresh(object, true);
418                                 return 0;
419                         }
420
421                         waitPending(object, true);
422
423                         byte[] data = object.readFile();
424
425                         object.fromFile(data);
426                         object.setResident(true);
427
428                         acquireMutex();
429                         try {
430                                 refresh(object, false);
431                                 swap(swapTime, swapSize, object.getKey());
432                         } finally {
433                                 releaseMutex();
434                         }
435                         return data.length;
436                 } catch (IOException e) {
437                         throw new IllegalAcornStateException("Unable to makeResident " + identifier, e);
438                 }
439         }
440
441         static int readCounter = 0;
442         static int writeCounter = 0;
443         
444         ScheduledThreadPoolExecutor writers;
445         
446         void waitPending(MapValue value, boolean hasMutex) throws IOException, AcornAccessVerificationException, IllegalAcornStateException {
447                 
448                 WriteRunnable runnable = null;
449                 boolean inProgress = false;
450                 synchronized(pending) {
451                         runnable = pending.get(value.getKey().toString());
452                         if(runnable != null) {
453                                 synchronized(runnable) {
454                                         if(runnable.committed) {
455                                                 // just being written - just need to wait
456                                                 inProgress = true;
457                                         } else {
458                                                 runnable.committed = true;
459                                                 // we do the writing
460                                         }
461                                 }
462                         }
463                 }
464                 if(runnable != null) {
465                         if(inProgress) {
466 //                              System.err.println("reader waits for WriteRunnable to finish");
467                                 try {
468                                     if(hasMutex) {
469                                         runnable.borrowMutex = true;
470                                     }
471                                         runnable.s.acquire();
472                                 } catch (InterruptedException e) {
473                                         throw new IllegalAcornStateException(e);
474                                 }
475                         } else {
476 //                              System.err.println("reader took WriteRunnable");
477                 runnable.runReally(hasMutex);
478                         }
479                 }
480         }
481         
482         public class WriteRunnable implements Runnable {
483
484                 private Path bytes;
485                 private MapValue impl;
486                 private boolean committed = false;
487                 private boolean borrowMutex = false;
488                 private Semaphore s = new Semaphore(0);
489                 
490                 WriteRunnable(Path bytes, MapValue impl) {
491                         this.bytes = bytes;
492                         this.impl = impl;
493                 }
494                 
495                 @Override
496                 public void run() {
497                     try {
498                         synchronized(impl) {
499     
500                                 synchronized(this) {
501                                 
502                                         if(committed)
503                                             return;
504                                         
505                                         committed = true;
506                                 }
507                     runReally(false);
508                         }
509                     } catch (Throwable t) {
510                         if (t instanceof IllegalAcornStateException) {
511                             manager.notSafeToMakeSnapshot((IllegalAcornStateException)t);
512                         } else {
513                             manager.notSafeToMakeSnapshot(new IllegalAcornStateException(t));
514                         }
515                         t.printStackTrace();
516                         Logger.defaultLogError(t);
517                     }
518                 }
519
520         public void runWithMutex() throws IOException, IllegalAcornStateException, AcornAccessVerificationException {
521
522             try {
523                 // These have been set in method persist
524                 assert (!impl.isResident());
525                 assert (!impl.isDirty());
526
527                 impl.toFile(bytes);
528             } finally {
529                 synchronized (pending) {
530                     pending.remove(impl.getKey().toString());
531                     s.release(Integer.MAX_VALUE);
532                 }
533             }
534
535         }
536
537         // Fix WriteRunnable.runReally() to use LRU.MapValue mutex instead of
538         // borrowMutex
539         public void runReally(boolean hasMutex) throws IOException, IllegalAcornStateException, AcornAccessVerificationException {
540
541             if (hasMutex) {
542
543                 runWithMutex();
544
545             } else {
546
547                 boolean gotMutex = impl.tryAcquireMutex();
548
549                 boolean done = false;
550                 while (!done) {
551
552                     if (gotMutex || borrowMutex) {
553                         runWithMutex();
554                         done = true;
555                     } else {
556                         System.err.println("Retry mutex acquire");
557                         gotMutex = impl.tryAcquireMutex();
558                     }
559
560                 }
561
562                 if (gotMutex)
563                     impl.releaseMutex();
564
565             }
566
567         }
568         }
569         
570         public Path getDirectory() {
571                 return writeDir;
572         }
573         
574         /*
575          * Protected implementation 
576          * 
577          */
578         
579         protected void verifyAccess() throws AcornAccessVerificationException {
580                 if (mutex.availablePermits() != 0)
581                     throw new AcornAccessVerificationException("identifier=" + identifier + " mutex has " + mutex.availablePermits() + " available permits, should be 0! Current mutexOwner is " + mutexOwner);
582         }
583         
584         /*
585          * Private implementation 
586          * 
587          */
588         
589         
590 }