1 package org.simantics.acorn.lru;
3 import java.io.IOException;
4 import java.nio.file.Path;
5 import java.util.ArrayList;
6 import java.util.Collection;
7 import java.util.HashMap;
9 import java.util.TreeMap;
10 import java.util.concurrent.ScheduledThreadPoolExecutor;
11 import java.util.concurrent.Semaphore;
12 import java.util.concurrent.ThreadFactory;
13 import java.util.concurrent.TimeUnit;
15 import org.simantics.acorn.GraphClientImpl2;
16 import org.simantics.db.common.utils.Logger;
19 * The order rule of synchronization for LRU and LRUObject is:
20 * ยง Always lock LRUObject first!
24 public class LRU<MapKey,MapValue extends LRUObject<MapKey, MapValue>> {
26 public static boolean VERIFY = true;
28 final private long swapTime = 5L*1000000000L;
29 final private int swapSize = 200;
31 final private HashMap<MapKey, MapValue> map = new HashMap<MapKey, MapValue>();
32 final private TreeMap<Long, MapKey> priorityQueue = new TreeMap<Long, MapKey>();
34 final private Semaphore mutex = new Semaphore(1);
35 final private String identifier;
37 private Path writeDir;
39 private Thread mutexOwner;
41 public Map<String, WriteRunnable> pending = new HashMap<String, WriteRunnable>();
43 public LRU(String identifier, Path writeDir) {
44 this.identifier = identifier;
45 this.writeDir = writeDir;
53 public void acquireMutex() {
57 while(!mutex.tryAcquire(3, TimeUnit.SECONDS)) {
58 System.err.println("Mutex is taking a long time to acquire - owner is " + mutexOwner);
62 mutexOwner = Thread.currentThread();
64 } catch (InterruptedException e) {
65 throw new IllegalStateException(e);
69 public void releaseMutex() {
74 public void shutdown() {
75 if (GraphClientImpl2.DEBUG)
76 System.err.println("Shutting down LRU writers " + writers);
79 writers.awaitTermination(60, TimeUnit.SECONDS);
80 } catch (InterruptedException e) {
86 public void resume() {
87 writers = new ScheduledThreadPoolExecutor(2, new ThreadFactory() {
90 public Thread newThread(Runnable r) {
91 return new Thread(r, identifier + " File Writer");
95 if (GraphClientImpl2.DEBUG)
96 System.err.println("Resuming LRU writers " + writers);
100 * This method violates the synchronization order rule between LRU and MapVAlue
101 * External synchronization is used to ensure correct operation
103 public void persist(ArrayList<String> state) {
109 for (MapValue value : values()) {
110 value.acquireMutex();
111 // for debugging purposes
112 boolean persisted = false;
114 // Persist the value if needed
115 persisted = value.persist();
117 // WriteRunnable may want to
118 value.releaseMutex();
120 // Wait pending if value was actually persisted
121 waitPending(value, false);
123 value.acquireMutex();
126 state.add(value.getStateKey());
127 } catch (Throwable t) {
128 throw new IllegalStateException(t);
130 value.releaseMutex();
134 } catch (Throwable t) {
135 throw new IllegalStateException(t);
142 public MapValue getWithoutMutex(MapKey key) {
147 } catch (Throwable t) {
148 throw new IllegalStateException(t);
155 public MapValue get(MapKey key) {
157 if(VERIFY) verifyAccess();
163 public void map(MapValue info) {
165 if(VERIFY) verifyAccess();
167 map.put(info.getKey(), info);
171 public Collection<MapValue> values() {
173 if(VERIFY) verifyAccess();
179 public boolean swapForced() {
184 return swap(0, 0, null);
185 } catch (Throwable t) {
186 throw new IllegalStateException(t);
193 public boolean swap(long lifeTime, int targetSize) {
195 if(VERIFY) verifyAccess();
197 return swap(lifeTime, targetSize, null);
202 * This is called under global lock
204 public void setWriteDir(Path dir) {
215 void insert(MapValue info, long accessTime) {
217 if(VERIFY) verifyAccess();
219 map.put(info.getKey(), info);
220 priorityQueue.put(accessTime, info.getKey());
225 * We have access to ClusterLRU - try to refresh value if available
227 boolean tryRefresh(MapValue info) {
229 if(VERIFY) verifyAccess();
231 if(!info.tryAcquireMutex())
236 priorityQueue.remove(info.getLastAccessTime());
238 map.put(info.getKey(), info);
239 priorityQueue.put(info.getLastAccessTime(), info.getKey());
243 } catch (Throwable t) {
244 throw new IllegalStateException(t);
254 * We have access to MapValue and no access to clusterLRU
256 void refresh(MapValue info, boolean needMutex) {
259 if(!needMutex) verifyAccess();
268 priorityQueue.remove(info.getLastAccessTime());
270 map.put(info.getKey(), info);
271 priorityQueue.put(info.getLastAccessTime(), info.getKey());
273 } catch (Throwable t) {
274 throw new IllegalStateException(t);
285 * Private implementation
290 if(VERIFY) verifyAccess();
292 return priorityQueue.size();
296 boolean swap(MapKey excluded) {
298 if(VERIFY) verifyAccess();
300 return swap(swapTime, swapSize, excluded);
304 boolean swap(long lifeTime, int targetSize, MapKey excluded) {
306 if(VERIFY) verifyAccess();
308 MapValue valueToSwap = getValueToSwap(lifeTime, targetSize, excluded);
309 if(valueToSwap != null) {
311 if(valueToSwap.tryAcquireMutex()) {
315 if(valueToSwap.canBePersisted()) {
316 valueToSwap.persist();
320 } catch (Throwable t) {
321 throw new IllegalStateException(t);
323 valueToSwap.releaseMutex();
334 private MapValue getValueToSwap1(long lifeTime, int targetSize, MapKey excluded) {
336 if(VERIFY) verifyAccess();
338 for(int i=0;i<10;i++) {
340 long candidate = getSwapCandidate(lifeTime, targetSize);
341 if(candidate == 0) return null;
343 MapKey key = priorityQueue.remove(candidate);
344 if(key.equals(excluded)) {
345 tryRefresh(map.get(key));
358 private MapValue getValueToSwap(long lifeTime, int targetSize, MapKey excluded) {
360 if(VERIFY) verifyAccess();
362 for(int i=0;i<10;i++) {
364 // Lock LRU and get a candidate
365 MapValue value = getValueToSwap1(lifeTime, targetSize, excluded);
366 if(value == null) return null;
368 if(value.tryAcquireMutex()) {
372 // This may lock the object
373 if(value.canBePersisted()) return value;
374 // Insert back the value
375 refresh(value, false);
377 } catch (Throwable t) {
378 throw new IllegalStateException(t);
381 value.releaseMutex();
393 private long getSwapCandidate(long lifeTime, int targetSize) {
395 if(VERIFY) verifyAccess();
397 if(priorityQueue.isEmpty()) return 0;
399 long currentTime = System.nanoTime();
400 Long lowest = priorityQueue.firstKey();
402 if(currentTime - lowest > lifeTime || priorityQueue.size() > targetSize) {
411 * Tries to persist this object. Can fail if the object cannot be persisted at this time.
414 boolean persist(Object object_) {
416 MapValue object = (MapValue)object_;
418 if(VERIFY) object.verifyAccess();
420 if(object.isDirty()) {
422 // It is possible that this just became unpersistable. Fail here in this case.
423 if(!object.canBePersisted()) {
427 assert(object.isResident());
429 Path f = writeDir.resolve(object.getFileName());
431 WriteRunnable runnable = new WriteRunnable(f, object);
433 synchronized(pending) {
434 WriteRunnable existing = pending.put(object.getKey().toString(), runnable);
435 assert(existing == null);
438 writers.execute(runnable);
440 object.setResident(false);
441 object.setDirty(false);
445 } else if(object.isResident()) {
448 object.setResident(false);
457 int makeResident(Object object_, boolean keepResident) {
459 MapValue object = (MapValue)object_;
461 if(VERIFY) object.verifyAccess();
465 object.setForceResident(keepResident);
467 if(object.isResident()) {
468 refresh(object, true);
472 waitPending(object, true);
474 byte[] data = object.readFile();
476 object.fromFile(data);
477 object.setResident(true);
481 refresh(object, false);
482 swap(swapTime, swapSize, object.getKey());
483 } catch (Throwable t) {
484 throw new IllegalStateException(t);
491 } catch (IOException e) {
501 static int readCounter = 0;
502 static int writeCounter = 0;
504 ScheduledThreadPoolExecutor writers;
506 void waitPending(MapValue value, boolean hasMutex) {
508 WriteRunnable r = null;
509 boolean inProgress = false;
510 synchronized(pending) {
511 r = pending.get(value.getKey().toString());
515 // just being written - just need to wait
526 // System.err.println("reader waits for WriteRunnable to finish");
529 } catch (InterruptedException e) {
533 // System.err.println("reader took WriteRunnable");
535 r.runReally(hasMutex);
536 } catch (Throwable e) {
538 Logger.defaultLogError(e);
545 public class WriteRunnable implements Runnable {
549 boolean committed = false;
550 private Semaphore s = new Semaphore(0);
552 WriteRunnable(Path bytes, MapValue impl) {
563 if(committed) return;
570 } catch (Throwable e) {
572 Logger.defaultLogError(e);
577 public void runReally(boolean hasMutex) throws IOException {
584 // These have been set in method persist
585 assert(!impl.isResident());
586 assert(!impl.isDirty());
590 synchronized(pending) {
591 pending.remove(impl.getKey().toString());
592 s.release(Integer.MAX_VALUE);
603 public Path getDirectory() {
608 * Protected implementation
612 protected void verifyAccess() {
613 // assert (mutex.availablePermits() == 0);
614 if (mutex.availablePermits() != 0)
615 throw new IllegalStateException("identifier=" + identifier + " mutex has " + mutex.availablePermits() + " available permits, should be 0! Current mutexOwner is " + mutexOwner);
619 * Private implementation