1 package org.simantics.acorn.lru;
3 import java.io.IOException;
4 import java.nio.file.Path;
5 import java.util.ArrayList;
6 import java.util.Collection;
7 import java.util.HashMap;
9 import java.util.TreeMap;
10 import java.util.concurrent.ScheduledThreadPoolExecutor;
11 import java.util.concurrent.Semaphore;
12 import java.util.concurrent.ThreadFactory;
13 import java.util.concurrent.TimeUnit;
15 import org.simantics.acorn.ClusterManager;
16 import org.simantics.acorn.GraphClientImpl2;
17 import org.simantics.acorn.exception.AcornAccessVerificationException;
18 import org.simantics.acorn.exception.IllegalAcornStateException;
19 import org.simantics.db.common.utils.Logger;
22 * The order rule of synchronization for LRU and LRUObject is:
23 * ยง Always lock LRUObject first!
27 public class LRU<MapKey,MapValue extends LRUObject<MapKey, MapValue>> {
29 public static boolean VERIFY = true;
31 final private long swapTime = 5L*1000000000L;
32 final private int swapSize = 200;
34 final private HashMap<MapKey, MapValue> map = new HashMap<MapKey, MapValue>();
35 final private TreeMap<Long, MapKey> priorityQueue = new TreeMap<Long, MapKey>();
37 final private Semaphore mutex = new Semaphore(1);
38 final private String identifier;
40 private Path writeDir;
42 private Thread mutexOwner;
44 public Map<String, WriteRunnable> pending = new HashMap<String, WriteRunnable>();
46 protected final ClusterManager manager;
48 public LRU(ClusterManager manager, String identifier, Path writeDir) {
49 this.manager = manager;
50 this.identifier = identifier;
51 this.writeDir = writeDir;
59 public void acquireMutex() throws IllegalAcornStateException {
61 while(!mutex.tryAcquire(3, TimeUnit.SECONDS)) {
62 System.err.println("Mutex is taking a long time to acquire - owner is " + mutexOwner);
65 mutexOwner = Thread.currentThread();
66 } catch (InterruptedException e) {
67 throw new IllegalAcornStateException(e);
71 public void releaseMutex() {
76 public void shutdown() {
77 if (GraphClientImpl2.DEBUG)
78 System.err.println("Shutting down LRU writers " + writers);
81 writers.awaitTermination(60, TimeUnit.SECONDS);
82 } catch (InterruptedException e) {
88 public void resume() {
89 writers = new ScheduledThreadPoolExecutor(2, new ThreadFactory() {
92 public Thread newThread(Runnable r) {
93 return new Thread(r, identifier + " File Writer");
96 if (GraphClientImpl2.DEBUG)
97 System.err.println("Resuming LRU writers " + writers);
101 * This method violates the synchronization order rule between LRU and MapVAlue
102 * External synchronization is used to ensure correct operation
104 public void persist(ArrayList<String> state) throws IllegalAcornStateException {
108 for (MapValue value : values()) {
109 value.acquireMutex();
110 // for debugging purposes
111 boolean persisted = false;
113 // Persist the value if needed
114 persisted = value.persist();
116 // WriteRunnable may want to
117 value.releaseMutex();
119 // Wait pending if value was actually persisted
120 waitPending(value, false);
122 value.acquireMutex();
125 state.add(value.getStateKey());
127 value.releaseMutex();
130 } catch (IllegalAcornStateException e) {
132 } catch (IOException e) {
133 throw new IllegalAcornStateException("Unable to waitPending for " + this.identifier, e);
134 } catch (Throwable t) {
135 throw new IllegalAcornStateException("Fatal error occured for " + this.identifier, t);
141 public MapValue getWithoutMutex(MapKey key) throws AcornAccessVerificationException, IllegalAcornStateException {
153 public MapValue purge(MapKey id) {
154 return map.remove(id);
157 public MapValue get(MapKey key) throws AcornAccessVerificationException {
159 if(VERIFY) verifyAccess();
164 public void map(MapValue info) throws AcornAccessVerificationException {
166 if(VERIFY) verifyAccess();
168 map.put(info.getKey(), info);
171 public Collection<MapValue> values() throws AcornAccessVerificationException {
173 if(VERIFY) verifyAccess();
178 public boolean swapForced() throws IllegalAcornStateException, AcornAccessVerificationException {
183 return swap(0, 0, null);
190 public boolean swap(long lifeTime, int targetSize) throws AcornAccessVerificationException, IllegalAcornStateException {
192 if(VERIFY) verifyAccess();
194 return swap(lifeTime, targetSize, null);
198 * This is called under global lock
200 public void setWriteDir(Path dir) {
210 void insert(MapValue info, long accessTime) throws AcornAccessVerificationException {
212 if(VERIFY) verifyAccess();
214 map.put(info.getKey(), info);
215 priorityQueue.put(accessTime, info.getKey());
219 * We have access to ClusterLRU - try to refresh value if available
221 boolean tryRefresh(MapValue info) throws AcornAccessVerificationException, IllegalAcornStateException {
223 if(VERIFY) verifyAccess();
225 if(!info.tryAcquireMutex())
229 priorityQueue.remove(info.getLastAccessTime());
231 map.put(info.getKey(), info);
232 priorityQueue.put(info.getLastAccessTime(), info.getKey());
240 * We have access to MapValue and no access to clusterLRU
242 void refresh(MapValue info, boolean needMutex) throws AcornAccessVerificationException, IllegalAcornStateException {
245 if(!needMutex) verifyAccess();
254 priorityQueue.remove(info.getLastAccessTime());
256 map.put(info.getKey(), info);
257 priorityQueue.put(info.getLastAccessTime(), info.getKey());
259 } catch (AcornAccessVerificationException e) {
261 } catch (Throwable t) {
262 throw new IllegalAcornStateException(t);
270 * Private implementation
273 int size() throws AcornAccessVerificationException {
274 if(VERIFY) verifyAccess();
275 return priorityQueue.size();
278 boolean swap(MapKey excluded) throws AcornAccessVerificationException, IllegalAcornStateException {
279 if(VERIFY) verifyAccess();
280 return swap(swapTime, swapSize, excluded);
283 boolean swap(long lifeTime, int targetSize, MapKey excluded) throws AcornAccessVerificationException, IllegalAcornStateException {
285 if(VERIFY) verifyAccess();
287 MapValue valueToSwap = getValueToSwap(lifeTime, targetSize, excluded);
288 if(valueToSwap != null) {
290 if(valueToSwap.tryAcquireMutex()) {
292 if(valueToSwap.canBePersisted()) {
293 valueToSwap.persist();
296 } catch (Throwable t) {
297 throw new IllegalAcornStateException(t);
299 valueToSwap.releaseMutex();
306 private MapValue getValueToSwap1(long lifeTime, int targetSize, MapKey excluded) throws AcornAccessVerificationException, IllegalAcornStateException {
308 if(VERIFY) verifyAccess();
310 for(int i=0;i<10;i++) {
312 long candidate = getSwapCandidate(lifeTime, targetSize);
313 if(candidate == 0) return null;
315 MapKey key = priorityQueue.remove(candidate);
316 if(key.equals(excluded)) {
317 tryRefresh(map.get(key));
327 private MapValue getValueToSwap(long lifeTime, int targetSize, MapKey excluded) throws AcornAccessVerificationException, IllegalAcornStateException {
329 if(VERIFY) verifyAccess();
331 for(int i=0;i<10;i++) {
333 // Lock LRU and get a candidate
334 MapValue value = getValueToSwap1(lifeTime, targetSize, excluded);
335 if(value == null) return null;
337 if(value.tryAcquireMutex()) {
340 // This may lock the object
341 if(value.canBePersisted())
343 // Insert back the value
344 refresh(value, false);
346 value.releaseMutex();
353 private long getSwapCandidate(long lifeTime, int targetSize) throws AcornAccessVerificationException {
355 if(VERIFY) verifyAccess();
357 if(priorityQueue.isEmpty()) return 0;
359 long currentTime = System.nanoTime();
360 Long lowest = priorityQueue.firstKey();
362 if(currentTime - lowest > lifeTime || priorityQueue.size() > targetSize) {
371 * Tries to persist this object. Can fail if the object cannot be persisted at this time.
374 boolean persist(Object object_) throws AcornAccessVerificationException {
376 MapValue object = (MapValue)object_;
378 if(VERIFY) object.verifyAccess();
380 if(object.isDirty()) {
381 // It is possible that this just became unpersistable. Fail here in this case.
382 if(!object.canBePersisted()) {
386 assert(object.isResident());
388 Path f = writeDir.resolve(object.getFileName());
390 WriteRunnable runnable = new WriteRunnable(f, object);
392 synchronized(pending) {
393 WriteRunnable existing = pending.put(object.getKey().toString(), runnable);
394 assert(existing == null);
397 writers.execute(runnable);
399 object.setResident(false);
400 object.setDirty(false);
404 } else if(object.isResident()) {
407 object.setResident(false);
413 int makeResident(Object object_, boolean keepResident) throws AcornAccessVerificationException, IllegalAcornStateException {
415 MapValue object = (MapValue)object_;
417 if(VERIFY) object.verifyAccess();
420 object.setForceResident(keepResident);
422 if(object.isResident()) {
423 refresh(object, true);
427 waitPending(object, true);
429 byte[] data = object.readFile();
431 object.fromFile(data);
432 object.setResident(true);
436 refresh(object, false);
437 swap(swapTime, swapSize, object.getKey());
442 } catch (IOException e) {
443 throw new IllegalAcornStateException("Unable to makeResident " + identifier, e);
447 static int readCounter = 0;
448 static int writeCounter = 0;
450 ScheduledThreadPoolExecutor writers;
452 void waitPending(MapValue value, boolean hasMutex) throws IOException, AcornAccessVerificationException, IllegalAcornStateException {
454 WriteRunnable runnable = null;
455 boolean inProgress = false;
456 synchronized(pending) {
457 runnable = pending.get(value.getKey().toString());
458 if(runnable != null) {
459 synchronized(runnable) {
460 if(runnable.committed) {
461 // just being written - just need to wait
464 runnable.committed = true;
470 if(runnable != null) {
472 // System.err.println("reader waits for WriteRunnable to finish");
475 runnable.borrowMutex = true;
477 runnable.s.acquire();
478 } catch (InterruptedException e) {
479 throw new IllegalAcornStateException(e);
482 // System.err.println("reader took WriteRunnable");
483 runnable.runReally(hasMutex);
488 public class WriteRunnable implements Runnable {
491 private MapValue impl;
492 private boolean committed = false;
493 private boolean borrowMutex = false;
494 private Semaphore s = new Semaphore(0);
496 WriteRunnable(Path bytes, MapValue impl) {
515 } catch (Throwable t) {
516 if (t instanceof IllegalAcornStateException) {
517 manager.notSafeToMakeSnapshot((IllegalAcornStateException)t);
519 manager.notSafeToMakeSnapshot(new IllegalAcornStateException(t));
522 Logger.defaultLogError(t);
526 public void runWithMutex() throws IOException, IllegalAcornStateException, AcornAccessVerificationException {
529 // These have been set in method persist
530 assert (!impl.isResident());
531 assert (!impl.isDirty());
535 synchronized (pending) {
536 pending.remove(impl.getKey().toString());
537 s.release(Integer.MAX_VALUE);
543 // Fix WriteRunnable.runReally() to use LRU.MapValue mutex instead of
545 public void runReally(boolean hasMutex) throws IOException, IllegalAcornStateException, AcornAccessVerificationException {
553 boolean gotMutex = impl.tryAcquireMutex();
555 boolean done = false;
558 if (gotMutex || borrowMutex) {
562 System.err.println("Retry mutex acquire");
563 gotMutex = impl.tryAcquireMutex();
576 public Path getDirectory() {
581 * Protected implementation
585 protected void verifyAccess() throws AcornAccessVerificationException {
586 if (mutex.availablePermits() != 0)
587 throw new AcornAccessVerificationException("identifier=" + identifier + " mutex has " + mutex.availablePermits() + " available permits, should be 0! Current mutexOwner is " + mutexOwner);
591 * Private implementation