1 package org.simantics.acorn.lru;
3 import java.io.IOException;
4 import java.nio.file.Path;
5 import java.util.ArrayList;
6 import java.util.Collection;
7 import java.util.HashMap;
9 import java.util.TreeMap;
10 import java.util.concurrent.ScheduledThreadPoolExecutor;
11 import java.util.concurrent.Semaphore;
12 import java.util.concurrent.ThreadFactory;
13 import java.util.concurrent.TimeUnit;
15 import org.simantics.acorn.ClusterManager;
16 import org.simantics.acorn.GraphClientImpl2;
17 import org.simantics.acorn.exception.AcornAccessVerificationException;
18 import org.simantics.acorn.exception.IllegalAcornStateException;
19 import org.slf4j.Logger;
20 import org.slf4j.LoggerFactory;
23 * The order rule of synchronization for LRU and LRUObject is:
24 * - Always lock LRUObject first!
28 public class LRU<MapKey,MapValue extends LRUObject<MapKey, MapValue>> {
// Class-wide SLF4J logger.
30 private static final Logger LOGGER = LoggerFactory.getLogger(LRU.class);
// Switch consulted by the `if(VERIFY) verifyAccess()` call sites in this class.
32 public static boolean VERIFY = true;
// Default lifetime passed to swap(): 5 seconds in nanoseconds (it is compared against System.nanoTime() differences).
34 final private long swapTime = 5L*1000000000L;
// Default target size passed to swap().
35 final private int swapSize = 200;
// Key -> value lookup table.
37 final private HashMap<MapKey, MapValue> map = new HashMap<MapKey, MapValue>();
// Access time -> key; the lowest key is the first swap candidate.
38 final private TreeMap<Long, MapKey> priorityQueue = new TreeMap<Long, MapKey>();
// Single-permit semaphore used as the LRU mutex; verifyAccess() expects zero available permits.
40 final private Semaphore mutex = new Semaphore(1);
// Name used in diagnostics and in writer thread names.
41 final private String identifier;
// Directory against which persist(Object) resolves each object's file name.
43 private Path writeDir;
// Thread stored by acquireMutex(); only reported in diagnostic messages.
45 private Thread mutexOwner;
// Scheduled writes keyed by getKey().toString(); the visible accesses synchronize on this map.
47 public Map<String, WriteRunnable> pending = new HashMap<String, WriteRunnable>();
// Owning cluster manager; WriteRunnable reports write failures to it via notSafeToMakeSnapshot().
49 protected final ClusterManager manager;
/**
 * Stores the collaborators of this LRU.
 *
 * @param manager cluster manager kept in {@link #manager}
 * @param identifier name used in diagnostics and writer thread names
 * @param writeDir initial directory used when resolving file names in persist(Object)
 */
51 public LRU(ClusterManager manager, String identifier, Path writeDir) {
52 this.manager = manager;
53 this.identifier = identifier;
54 this.writeDir = writeDir;
/**
 * Acquires the LRU mutex, retrying in 3-second attempts until it succeeds.
 *
 * @throws IllegalAcornStateException wrapping an InterruptedException raised while waiting
 */
62 public void acquireMutex() throws IllegalAcornStateException {
// Each attempt waits at most 3 seconds; a failed attempt prints the recorded owner and tries again.
64 while(!mutex.tryAcquire(3, TimeUnit.SECONDS)) {
65 System.err.println("Mutex is taking a long time to acquire - owner is " + mutexOwner);
// Remember the acquiring thread so later diagnostics can name it.
68 mutexOwner = Thread.currentThread();
69 } catch (InterruptedException e) {
// NOTE(review): the interrupt flag is not restored before wrapping - confirm that callers do not rely on it.
70 throw new IllegalAcornStateException(e);
// NOTE(review): presumably the counterpart of acquireMutex() that returns the permit - confirm against the body.
74 public void releaseMutex() {
/**
 * Waits up to 60 seconds for the writer executor to terminate.
 * Prints a debug line first when GraphClientImpl2.DEBUG is set.
 */
79 public void shutdown() {
80 if (GraphClientImpl2.DEBUG)
81 System.err.println("Shutting down LRU writers " + writers);
// Bounded wait: gives the writer pool at most 60 seconds to finish.
84 writers.awaitTermination(60, TimeUnit.SECONDS);
// NOTE(review): confirm how an interruption of the wait is meant to be handled here.
85 } catch (InterruptedException e) {
/**
 * Replaces {@code writers} with a new ScheduledThreadPoolExecutor of core pool size 2.
 * Prints a debug line afterwards when GraphClientImpl2.DEBUG is set.
 */
91 public void resume() {
92 writers = new ScheduledThreadPoolExecutor(2, new ThreadFactory() {
// Worker threads are named "<identifier> File Writer" so they can be told apart per LRU.
95 public Thread newThread(Runnable r) {
96 return new Thread(r, identifier + " File Writer");
99 if (GraphClientImpl2.DEBUG)
100 System.err.println("Resuming LRU writers " + writers);
104 * This method violates the synchronization order rule between LRU and MapValue.
105 * External synchronization is used to ensure correct operation
107 public void persist(ArrayList<String> state) throws IllegalAcornStateException {
// For every value: ask it to persist itself, wait for its pending write, then append its state key to `state`.
// IOException and any other Throwable are wrapped in IllegalAcornStateException (see the catch clauses below).
111 for (MapValue value : values()) {
112 value.acquireMutex();
113 // for debugging purposes
114 boolean persisted = false;
116 // Persist the value if needed
117 persisted = value.persist();
119 // WriteRunnable may want to
// The value mutex is released before waiting and re-acquired afterwards.
120 value.releaseMutex();
122 // Wait pending if value was actually persisted
// hasMutex=false: the value mutex was released just above.
123 waitPending(value, false);
125 value.acquireMutex();
// Record the value's state key for the caller.
128 state.add(value.getStateKey());
130 value.releaseMutex();
133 } catch (IllegalAcornStateException e) {
135 } catch (IOException e) {
136 throw new IllegalAcornStateException("Unable to waitPending for " + this.identifier, e);
137 } catch (Throwable t) {
138 throw new IllegalAcornStateException("Fatal error occured for " + this.identifier, t);
// NOTE(review): presumably a lookup for callers that do not already hold the LRU mutex - confirm against the body.
144 public MapValue getWithoutMutex(MapKey key) throws AcornAccessVerificationException, IllegalAcornStateException {
/**
 * Removes the entry for {@code id} from the lookup map.
 * Only {@code map} is modified by the visible statement.
 *
 * @param id key to remove
 * @return the value previously mapped to {@code id}, or null if there was none
 */
156 public MapValue purge(MapKey id) {
157 return map.remove(id);
/**
 * Looks up a value by key.
 *
 * @param key key to look up
 * @throws AcornAccessVerificationException if VERIFY is set and verifyAccess() rejects the call
 */
160 public MapValue get(MapKey key) throws AcornAccessVerificationException {
// The caller is expected to hold the LRU mutex; this is checked when VERIFY is on.
162 if(VERIFY) verifyAccess();
/**
 * Registers {@code info} in the lookup map under its own key.
 *
 * @param info value to register
 * @throws AcornAccessVerificationException if VERIFY is set and verifyAccess() rejects the call
 */
167 public void map(MapValue info) throws AcornAccessVerificationException {
169 if(VERIFY) verifyAccess();
// Only the lookup map is updated here; compare insert(), which also updates priorityQueue.
171 map.put(info.getKey(), info);
/**
 * Gives access to the managed values.
 *
 * @throws AcornAccessVerificationException if VERIFY is set and verifyAccess() rejects the call
 */
174 public Collection<MapValue> values() throws AcornAccessVerificationException {
// The caller is expected to hold the LRU mutex; this is checked when VERIFY is on.
176 if(VERIFY) verifyAccess();
/**
 * Runs one swap with lifetime 0, target size 0 and no excluded key.
 *
 * @return the result of swap(0, 0, null)
 */
181 public boolean swapForced() throws IllegalAcornStateException, AcornAccessVerificationException {
// Zero lifetime and zero target size make any non-empty queue pass the test in getSwapCandidate().
186 return swap(0, 0, null);
/**
 * Runs one swap with the given limits and no excluded key.
 *
 * @param lifeTime age threshold in System.nanoTime() units
 * @param targetSize queue size above which entries become swap candidates
 * @return the result of swap(lifeTime, targetSize, null)
 * @throws AcornAccessVerificationException if VERIFY is set and verifyAccess() rejects the call
 */
193 public boolean swap(long lifeTime, int targetSize) throws AcornAccessVerificationException, IllegalAcornStateException {
195 if(VERIFY) verifyAccess();
197 return swap(lifeTime, targetSize, null);
201 * This is called under global lock
// NOTE(review): presumably replaces writeDir with dir - confirm against the body.
203 public void setWriteDir(Path dir) {
/**
 * Registers {@code info} in the lookup map and queues its key under {@code accessTime}.
 *
 * @param info value to register
 * @param accessTime priority-queue key for this value
 * @throws AcornAccessVerificationException if VERIFY is set and verifyAccess() rejects the call
 */
213 void insert(MapValue info, long accessTime) throws AcornAccessVerificationException {
215 if(VERIFY) verifyAccess();
217 map.put(info.getKey(), info);
// TreeMap.put replaces any key already stored under the same accessTime.
218 priorityQueue.put(accessTime, info.getKey());
222 * We have access to ClusterLRU - try to refresh value if available
224 boolean tryRefresh(MapValue info) throws AcornAccessVerificationException, IllegalAcornStateException {
// Re-registers info in map and priorityQueue; works only if the value's mutex can be taken without blocking.
226 if(VERIFY) verifyAccess();
// Non-blocking attempt on the value's own mutex.
228 if(!info.tryAcquireMutex())
// Drop the queue entry stored under the value's current last-access time.
232 priorityQueue.remove(info.getLastAccessTime());
234 map.put(info.getKey(), info);
// Queue the key again under whatever getLastAccessTime() reports now.
// NOTE(review): presumably the access time is updated between the remove and this put - confirm.
235 priorityQueue.put(info.getLastAccessTime(), info.getKey());
243 * We have access to MapValue and no access to clusterLRU
245 void refresh(MapValue info, boolean needMutex) throws AcornAccessVerificationException, IllegalAcornStateException {
// Re-registers info in map and priorityQueue.
// needMutex=false means the caller claims to hold the LRU mutex already, which is verified here.
248 if(!needMutex) verifyAccess();
// Drop the queue entry stored under the value's current last-access time.
257 priorityQueue.remove(info.getLastAccessTime());
259 map.put(info.getKey(), info);
// Queue the key again under whatever getLastAccessTime() reports now.
// NOTE(review): presumably the access time is updated between the remove and this put - confirm.
260 priorityQueue.put(info.getLastAccessTime(), info.getKey());
262 } catch (AcornAccessVerificationException e) {
// Any other failure is wrapped in IllegalAcornStateException.
264 } catch (Throwable t) {
265 throw new IllegalAcornStateException(t);
273 * Private implementation
/**
 * Reports the number of entries in the priority queue.
 * This can differ from the lookup map, which map() and purge() modify on their own.
 *
 * @return priorityQueue.size()
 * @throws AcornAccessVerificationException if VERIFY is set and verifyAccess() rejects the call
 */
276 int size() throws AcornAccessVerificationException {
277 if(VERIFY) verifyAccess();
278 return priorityQueue.size();
/**
 * Runs one swap with the default limits (swapTime, swapSize).
 *
 * @param excluded key passed through to swap(long, int, MapKey) as the excluded key
 * @return the result of swap(swapTime, swapSize, excluded)
 * @throws AcornAccessVerificationException if VERIFY is set and verifyAccess() rejects the call
 */
281 boolean swap(MapKey excluded) throws AcornAccessVerificationException, IllegalAcornStateException {
282 if(VERIFY) verifyAccess();
283 return swap(swapTime, swapSize, excluded);
/**
 * Picks one value via getValueToSwap() and persists it if it is still persistable.
 *
 * @param lifeTime age threshold in System.nanoTime() units
 * @param targetSize queue size above which entries become swap candidates
 * @param excluded key handed to getValueToSwap() as the one to leave alone
 * @throws IllegalAcornStateException wrapping any Throwable caught around the persist step
 */
286 boolean swap(long lifeTime, int targetSize, MapKey excluded) throws AcornAccessVerificationException, IllegalAcornStateException {
288 if(VERIFY) verifyAccess();
290 MapValue valueToSwap = getValueToSwap(lifeTime, targetSize, excluded);
291 if(valueToSwap != null) {
// Non-blocking attempt on the value's own mutex.
293 if(valueToSwap.tryAcquireMutex()) {
// Checked again here, under the value's mutex, immediately before persisting.
295 if(valueToSwap.canBePersisted()) {
296 valueToSwap.persist();
299 } catch (Throwable t) {
300 throw new IllegalAcornStateException(t);
302 valueToSwap.releaseMutex();
/**
 * Takes swap candidates off the priority queue, making at most 10 attempts.
 *
 * @return null as soon as getSwapCandidate() reports no candidate (0)
 */
309 private MapValue getValueToSwap1(long lifeTime, int targetSize, MapKey excluded) throws AcornAccessVerificationException, IllegalAcornStateException {
311 if(VERIFY) verifyAccess();
// Bounded retry: at most 10 candidates are examined per call.
313 for(int i=0;i<10;i++) {
315 long candidate = getSwapCandidate(lifeTime, targetSize);
// 0 is the "no candidate" value returned by getSwapCandidate().
316 if(candidate == 0) return null;
// The candidate leaves the queue here.
318 MapKey key = priorityQueue.remove(candidate);
// For the excluded key, tryRefresh() is asked to re-register the value.
319 if(key.equals(excluded)) {
320 tryRefresh(map.get(key));
/**
 * Looks for a value to swap out, making at most 10 attempts through getValueToSwap1().
 *
 * @return null as soon as getValueToSwap1() yields no candidate
 */
330 private MapValue getValueToSwap(long lifeTime, int targetSize, MapKey excluded) throws AcornAccessVerificationException, IllegalAcornStateException {
332 if(VERIFY) verifyAccess();
// Bounded retry: at most 10 candidates are examined per call.
334 for(int i=0;i<10;i++) {
336 // Lock LRU and get a candidate
337 MapValue value = getValueToSwap1(lifeTime, targetSize, excluded);
338 if(value == null) return null;
// Non-blocking attempt on the candidate's own mutex.
340 if(value.tryAcquireMutex()) {
343 // This may lock the object
344 if(value.canBePersisted())
346 // Insert back the value
// needMutex=false: refresh() only verifies that the LRU mutex is already held.
347 refresh(value, false);
349 value.releaseMutex();
/**
 * Inspects the oldest queue entry and decides whether it qualifies for swapping.
 *
 * @param lifeTime age threshold in System.nanoTime() units
 * @param targetSize queue size above which the oldest entry qualifies regardless of age
 * @return 0 when the queue is empty
 */
356 private long getSwapCandidate(long lifeTime, int targetSize) throws AcornAccessVerificationException {
358 if(VERIFY) verifyAccess();
// 0 is the "no candidate" value; getValueToSwap1() tests for it.
360 if(priorityQueue.isEmpty()) return 0;
362 long currentTime = System.nanoTime();
// The smallest key is the oldest access time.
363 Long lowest = priorityQueue.firstKey();
// The entry qualifies if it is older than lifeTime or the queue exceeds targetSize.
365 if(currentTime - lowest > lifeTime || priorityQueue.size() > targetSize) {
374 * Tries to persist this object. Can fail if the object cannot be persisted at this time.
377 boolean persist(Object object_) throws AcornAccessVerificationException {
// A dirty object gets a write scheduled on the writer pool; a clean resident object is only marked non-resident.
// NOTE(review): the unchecked cast assumes callers always pass a MapValue.
379 MapValue object = (MapValue)object_;
// This checks the object's own access rule, not the LRU mutex.
381 if(VERIFY) object.verifyAccess();
383 if(object.isDirty()) {
384 // It is possible that this just became unpersistable. Fail here in this case.
385 if(!object.canBePersisted()) {
389 assert(object.isResident());
// The target file is the object's file name resolved against the current write directory.
391 Path f = writeDir.resolve(object.getFileName());
393 WriteRunnable runnable = new WriteRunnable(f, object);
// Register the write under the key's string form; at most one pending write per key is expected.
395 synchronized(pending) {
396 WriteRunnable existing = pending.put(object.getKey().toString(), runnable);
397 assert(existing == null);
// Hand the write to the writer pool.
400 writers.execute(runnable);
// The flags are cleared as soon as the write is scheduled; runWithMutex() asserts both.
402 object.setResident(false);
403 object.setDirty(false);
// Clean but resident: nothing to write, just mark it non-resident.
407 } else if(object.isResident()) {
410 object.setResident(false);
/**
 * Makes sure the object is resident, reading it back from its file when it is not.
 *
 * @param object_ the value to make resident; it is cast to MapValue
 * @param keepResident passed to setForceResident() on the object
 * @throws IllegalAcornStateException wrapping an IOException raised on the load path
 */
416 int makeResident(Object object_, boolean keepResident) throws AcornAccessVerificationException, IllegalAcornStateException {
// NOTE(review): the unchecked cast assumes callers always pass a MapValue.
418 MapValue object = (MapValue)object_;
// This checks the object's own access rule, not the LRU mutex.
420 if(VERIFY) object.verifyAccess();
423 object.setForceResident(keepResident);
// Already resident: only re-register it. needMutex=true, so refresh() skips its held-mutex check.
425 if(object.isResident()) {
426 refresh(object, true);
// Not resident: first deal with any write still pending for this object.
430 waitPending(object, true);
// Load the stored bytes and rebuild the object from them.
432 byte[] data = object.readFile();
434 object.fromFile(data);
435 object.setResident(true);
// needMutex=false: refresh() verifies that the LRU mutex is held at this point.
439 refresh(object, false);
// Swap with the default limits, excluding the object that was just loaded.
440 swap(swapTime, swapSize, object.getKey());
445 } catch (IOException e) {
446 throw new IllegalAcornStateException("Unable to makeResident " + identifier, e);
// Static counters shared by all LRU instances. NOTE(review): no visible code updates them - confirm they are still used.
450 static int readCounter = 0;
451 static int writeCounter = 0;
// Executor that runs WriteRunnable tasks; assigned in resume() and awaited in shutdown().
453 ScheduledThreadPoolExecutor writers;
/**
 * Deals with a write that is still pending for {@code value}, if there is one.
 *
 * @param value value whose pending write is looked up by getKey().toString()
 * @param hasMutex passed on to WriteRunnable.runReally()
 * @throws IllegalAcornStateException wrapping an InterruptedException raised while waiting
 */
455 void waitPending(MapValue value, boolean hasMutex) throws IOException, AcornAccessVerificationException, IllegalAcornStateException {
457 WriteRunnable runnable = null;
458 boolean inProgress = false;
// The lookup and the committed check happen under the pending lock, then the runnable's own lock.
459 synchronized(pending) {
460 runnable = pending.get(value.getKey().toString());
461 if(runnable != null) {
462 synchronized(runnable) {
463 if(runnable.committed) {
464 // just being written - just need to wait
// Mark the write as committed.
// NOTE(review): presumably this is the branch where the caller takes the write over - confirm.
467 runnable.committed = true;
473 if(runnable != null) {
475 // System.err.println("reader waits for WriteRunnable to finish");
// This flag is read by runReally() when it decides whether it may proceed without the value mutex.
478 runnable.borrowMutex = true;
// Blocks until runWithMutex() releases the semaphore.
480 runnable.s.acquire();
481 } catch (InterruptedException e) {
// NOTE(review): the interrupt flag is not restored before wrapping - confirm that callers do not rely on it.
482 throw new IllegalAcornStateException(e);
485 // System.err.println("reader took WriteRunnable");
// Perform the write on the calling thread.
486 runnable.runReally(hasMutex);
/**
 * A scheduled write of one MapValue.
 * Instances are created in persist(Object), registered in {@code pending} and handed to the writer executor.
 * waitPending() coordinates with them through {@code committed}, {@code borrowMutex} and {@code s}.
 */
491 public class WriteRunnable implements Runnable {
// The value this write belongs to.
494 private MapValue impl;
// Set by waitPending() under synchronized(runnable).
495 private boolean committed = false;
// Set by waitPending() before it waits on `s`; read by runReally().
496 private boolean borrowMutex = false;
// Starts with no permits; runWithMutex() releases Integer.MAX_VALUE permits so waiters can proceed.
497 private Semaphore s = new Semaphore(0);
499 WriteRunnable(Path bytes, MapValue impl) {
// Failures are reported to the manager as IllegalAcornStateException (wrapped if needed) and logged.
518 } catch (Throwable t) {
519 if (t instanceof IllegalAcornStateException) {
520 manager.notSafeToMakeSnapshot((IllegalAcornStateException)t);
522 manager.notSafeToMakeSnapshot(new IllegalAcornStateException(t));
525 LOGGER.error("Exception happened in WriteRunnable.run", t);
529 public void runWithMutex() throws IOException, IllegalAcornStateException, AcornAccessVerificationException {
532 // These have been set in method persist
533 assert (!impl.isResident());
534 assert (!impl.isDirty());
// Unregister this write, then release the semaphore that waitPending() blocks on.
538 synchronized (pending) {
539 pending.remove(impl.getKey().toString());
540 s.release(Integer.MAX_VALUE);
546 // Fix WriteRunnable.runReally() to use LRU.MapValue mutex instead of
548 public void runReally(boolean hasMutex) throws IOException, IllegalAcornStateException, AcornAccessVerificationException {
// Non-blocking attempt on the value's own mutex.
556 boolean gotMutex = impl.tryAcquireMutex();
558 boolean done = false;
// The guarded section runs if the mutex was obtained or a waiter set borrowMutex.
561 if (gotMutex || borrowMutex) {
// Otherwise the attempt is logged and the mutex is tried again.
565 LOGGER.warn("Retry mutex acquire");
566 gotMutex = impl.tryAcquireMutex();
// NOTE(review): presumably returns the current write directory - confirm against the body.
579 public Path getDirectory() {
584 * Protected implementation
/**
 * Checks that the LRU mutex is currently taken, i.e. that the semaphore has no available permits.
 * This does not check which thread holds it; mutexOwner appears only in the error message.
 *
 * @throws AcornAccessVerificationException if the mutex has any available permit
 */
588 protected void verifyAccess() throws AcornAccessVerificationException {
589 if (mutex.availablePermits() != 0)
// availablePermits() is read again for the message, so the reported count may differ from the one tested above.
590 throw new AcornAccessVerificationException("identifier=" + identifier + " mutex has " + mutex.availablePermits() + " available permits, should be 0! Current mutexOwner is " + mutexOwner);
594 * Private implementation