1 package org.simantics.acorn.lru;
3 import java.io.IOException;
4 import java.nio.file.Path;
5 import java.util.ArrayList;
6 import java.util.Collection;
7 import java.util.HashMap;
9 import java.util.TreeMap;
10 import java.util.concurrent.ScheduledThreadPoolExecutor;
11 import java.util.concurrent.Semaphore;
12 import java.util.concurrent.ThreadFactory;
13 import java.util.concurrent.TimeUnit;
15 import org.simantics.acorn.ClusterManager;
16 import org.simantics.acorn.GraphClientImpl2;
17 import org.simantics.acorn.exception.AcornAccessVerificationException;
18 import org.simantics.acorn.exception.IllegalAcornStateException;
19 import org.simantics.db.common.utils.Logger;
22 * The order rule of synchronization for LRU and LRUObject is:
23 * ยง Always lock LRUObject first!
27 public class LRU<MapKey,MapValue extends LRUObject<MapKey, MapValue>> {
29 public static boolean VERIFY = true;
31 final private long swapTime = 5L*1000000000L;
32 final private int swapSize = 200;
34 final private HashMap<MapKey, MapValue> map = new HashMap<MapKey, MapValue>();
35 final private TreeMap<Long, MapKey> priorityQueue = new TreeMap<Long, MapKey>();
37 final private Semaphore mutex = new Semaphore(1);
38 final private String identifier;
40 private Path writeDir;
42 private Thread mutexOwner;
44 public Map<String, WriteRunnable> pending = new HashMap<String, WriteRunnable>();
46 protected final ClusterManager manager;
48 public LRU(ClusterManager manager, String identifier, Path writeDir) {
49 this.manager = manager;
50 this.identifier = identifier;
51 this.writeDir = writeDir;
59 public void acquireMutex() throws IllegalAcornStateException {
61 while(!mutex.tryAcquire(3, TimeUnit.SECONDS)) {
62 System.err.println("Mutex is taking a long time to acquire - owner is " + mutexOwner);
65 mutexOwner = Thread.currentThread();
66 } catch (InterruptedException e) {
67 throw new IllegalAcornStateException(e);
71 public void releaseMutex() {
76 public void shutdown() {
77 if (GraphClientImpl2.DEBUG)
78 System.err.println("Shutting down LRU writers " + writers);
81 writers.awaitTermination(60, TimeUnit.SECONDS);
82 } catch (InterruptedException e) {
88 public void resume() {
89 writers = new ScheduledThreadPoolExecutor(2, new ThreadFactory() {
92 public Thread newThread(Runnable r) {
93 return new Thread(r, identifier + " File Writer");
96 if (GraphClientImpl2.DEBUG)
97 System.err.println("Resuming LRU writers " + writers);
101 * This method violates the synchronization order rule between LRU and MapVAlue
102 * External synchronization is used to ensure correct operation
104 public void persist(ArrayList<String> state) throws IllegalAcornStateException {
108 for (MapValue value : values()) {
109 value.acquireMutex();
110 // for debugging purposes
111 boolean persisted = false;
113 // Persist the value if needed
114 persisted = value.persist();
116 // WriteRunnable may want to
117 value.releaseMutex();
119 // Wait pending if value was actually persisted
120 waitPending(value, false);
122 value.acquireMutex();
125 state.add(value.getStateKey());
127 value.releaseMutex();
130 } catch (IllegalAcornStateException e) {
132 } catch (IOException e) {
133 throw new IllegalAcornStateException("Unable to waitPending for " + this.identifier, e);
134 } catch (Throwable t) {
135 throw new IllegalAcornStateException("Fatal error occured for " + this.identifier, t);
141 public MapValue getWithoutMutex(MapKey key) throws AcornAccessVerificationException, IllegalAcornStateException {
151 public MapValue get(MapKey key) throws AcornAccessVerificationException {
153 if(VERIFY) verifyAccess();
158 public void map(MapValue info) throws AcornAccessVerificationException {
160 if(VERIFY) verifyAccess();
162 map.put(info.getKey(), info);
165 public Collection<MapValue> values() throws AcornAccessVerificationException {
167 if(VERIFY) verifyAccess();
172 public boolean swapForced() throws IllegalAcornStateException, AcornAccessVerificationException {
177 return swap(0, 0, null);
184 public boolean swap(long lifeTime, int targetSize) throws AcornAccessVerificationException, IllegalAcornStateException {
186 if(VERIFY) verifyAccess();
188 return swap(lifeTime, targetSize, null);
192 * This is called under global lock
194 public void setWriteDir(Path dir) {
204 void insert(MapValue info, long accessTime) throws AcornAccessVerificationException {
206 if(VERIFY) verifyAccess();
208 map.put(info.getKey(), info);
209 priorityQueue.put(accessTime, info.getKey());
213 * We have access to ClusterLRU - try to refresh value if available
215 boolean tryRefresh(MapValue info) throws AcornAccessVerificationException, IllegalAcornStateException {
217 if(VERIFY) verifyAccess();
219 if(!info.tryAcquireMutex())
223 priorityQueue.remove(info.getLastAccessTime());
225 map.put(info.getKey(), info);
226 priorityQueue.put(info.getLastAccessTime(), info.getKey());
234 * We have access to MapValue and no access to clusterLRU
236 void refresh(MapValue info, boolean needMutex) throws AcornAccessVerificationException, IllegalAcornStateException {
239 if(!needMutex) verifyAccess();
248 priorityQueue.remove(info.getLastAccessTime());
250 map.put(info.getKey(), info);
251 priorityQueue.put(info.getLastAccessTime(), info.getKey());
253 } catch (AcornAccessVerificationException e) {
255 } catch (Throwable t) {
256 throw new IllegalAcornStateException(t);
264 * Private implementation
267 int size() throws AcornAccessVerificationException {
268 if(VERIFY) verifyAccess();
269 return priorityQueue.size();
272 boolean swap(MapKey excluded) throws AcornAccessVerificationException, IllegalAcornStateException {
273 if(VERIFY) verifyAccess();
274 return swap(swapTime, swapSize, excluded);
277 boolean swap(long lifeTime, int targetSize, MapKey excluded) throws AcornAccessVerificationException, IllegalAcornStateException {
279 if(VERIFY) verifyAccess();
281 MapValue valueToSwap = getValueToSwap(lifeTime, targetSize, excluded);
282 if(valueToSwap != null) {
284 if(valueToSwap.tryAcquireMutex()) {
286 if(valueToSwap.canBePersisted()) {
287 valueToSwap.persist();
290 } catch (Throwable t) {
291 throw new IllegalAcornStateException(t);
293 valueToSwap.releaseMutex();
300 private MapValue getValueToSwap1(long lifeTime, int targetSize, MapKey excluded) throws AcornAccessVerificationException, IllegalAcornStateException {
302 if(VERIFY) verifyAccess();
304 for(int i=0;i<10;i++) {
306 long candidate = getSwapCandidate(lifeTime, targetSize);
307 if(candidate == 0) return null;
309 MapKey key = priorityQueue.remove(candidate);
310 if(key.equals(excluded)) {
311 tryRefresh(map.get(key));
321 private MapValue getValueToSwap(long lifeTime, int targetSize, MapKey excluded) throws AcornAccessVerificationException, IllegalAcornStateException {
323 if(VERIFY) verifyAccess();
325 for(int i=0;i<10;i++) {
327 // Lock LRU and get a candidate
328 MapValue value = getValueToSwap1(lifeTime, targetSize, excluded);
329 if(value == null) return null;
331 if(value.tryAcquireMutex()) {
334 // This may lock the object
335 if(value.canBePersisted())
337 // Insert back the value
338 refresh(value, false);
340 value.releaseMutex();
347 private long getSwapCandidate(long lifeTime, int targetSize) throws AcornAccessVerificationException {
349 if(VERIFY) verifyAccess();
351 if(priorityQueue.isEmpty()) return 0;
353 long currentTime = System.nanoTime();
354 Long lowest = priorityQueue.firstKey();
356 if(currentTime - lowest > lifeTime || priorityQueue.size() > targetSize) {
365 * Tries to persist this object. Can fail if the object cannot be persisted at this time.
368 boolean persist(Object object_) throws AcornAccessVerificationException {
370 MapValue object = (MapValue)object_;
372 if(VERIFY) object.verifyAccess();
374 if(object.isDirty()) {
375 // It is possible that this just became unpersistable. Fail here in this case.
376 if(!object.canBePersisted()) {
380 assert(object.isResident());
382 Path f = writeDir.resolve(object.getFileName());
384 WriteRunnable runnable = new WriteRunnable(f, object);
386 synchronized(pending) {
387 WriteRunnable existing = pending.put(object.getKey().toString(), runnable);
388 assert(existing == null);
391 writers.execute(runnable);
393 object.setResident(false);
394 object.setDirty(false);
398 } else if(object.isResident()) {
401 object.setResident(false);
407 int makeResident(Object object_, boolean keepResident) throws AcornAccessVerificationException, IllegalAcornStateException {
409 MapValue object = (MapValue)object_;
411 if(VERIFY) object.verifyAccess();
414 object.setForceResident(keepResident);
416 if(object.isResident()) {
417 refresh(object, true);
421 waitPending(object, true);
423 byte[] data = object.readFile();
425 object.fromFile(data);
426 object.setResident(true);
430 refresh(object, false);
431 swap(swapTime, swapSize, object.getKey());
436 } catch (IOException e) {
437 throw new IllegalAcornStateException("Unable to makeResident " + identifier, e);
441 static int readCounter = 0;
442 static int writeCounter = 0;
444 ScheduledThreadPoolExecutor writers;
446 void waitPending(MapValue value, boolean hasMutex) throws IOException, AcornAccessVerificationException, IllegalAcornStateException {
448 WriteRunnable runnable = null;
449 boolean inProgress = false;
450 synchronized(pending) {
451 runnable = pending.get(value.getKey().toString());
452 if(runnable != null) {
453 synchronized(runnable) {
454 if(runnable.committed) {
455 // just being written - just need to wait
458 runnable.committed = true;
464 if(runnable != null) {
466 // System.err.println("reader waits for WriteRunnable to finish");
469 runnable.borrowMutex = true;
471 runnable.s.acquire();
472 } catch (InterruptedException e) {
473 throw new IllegalAcornStateException(e);
476 // System.err.println("reader took WriteRunnable");
477 runnable.runReally(hasMutex);
482 public class WriteRunnable implements Runnable {
485 private MapValue impl;
486 private boolean committed = false;
487 private boolean borrowMutex = false;
488 private Semaphore s = new Semaphore(0);
490 WriteRunnable(Path bytes, MapValue impl) {
509 } catch (Throwable t) {
510 if (t instanceof IllegalAcornStateException) {
511 manager.notSafeToMakeSnapshot((IllegalAcornStateException)t);
513 manager.notSafeToMakeSnapshot(new IllegalAcornStateException(t));
516 Logger.defaultLogError(t);
520 public void runWithMutex() throws IOException, IllegalAcornStateException, AcornAccessVerificationException {
523 // These have been set in method persist
524 assert (!impl.isResident());
525 assert (!impl.isDirty());
529 synchronized (pending) {
530 pending.remove(impl.getKey().toString());
531 s.release(Integer.MAX_VALUE);
537 // Fix WriteRunnable.runReally() to use LRU.MapValue mutex instead of
539 public void runReally(boolean hasMutex) throws IOException, IllegalAcornStateException, AcornAccessVerificationException {
547 boolean gotMutex = impl.tryAcquireMutex();
549 boolean done = false;
552 if (gotMutex || borrowMutex) {
556 System.err.println("Retry mutex acquire");
557 gotMutex = impl.tryAcquireMutex();
570 public Path getDirectory() {
575 * Protected implementation
579 protected void verifyAccess() throws AcornAccessVerificationException {
580 if (mutex.availablePermits() != 0)
581 throw new AcornAccessVerificationException("identifier=" + identifier + " mutex has " + mutex.availablePermits() + " available permits, should be 0! Current mutexOwner is " + mutexOwner);
585 * Private implementation