package org.simantics.acorn.lru; import java.io.IOException; import java.nio.file.Path; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; import java.util.Map; import java.util.TreeMap; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.Semaphore; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import org.simantics.acorn.GraphClientImpl2; import org.simantics.db.common.utils.Logger; /* * The order rule of synchronization for LRU and LRUObject is: * § Always lock LRUObject first! * */ public class LRU> { public static boolean VERIFY = true; final private long swapTime = 5L*1000000000L; final private int swapSize = 200; final private HashMap map = new HashMap(); final private TreeMap priorityQueue = new TreeMap(); final private Semaphore mutex = new Semaphore(1); final private String identifier; private Path writeDir; private Thread mutexOwner; public Map pending = new HashMap(); public LRU(String identifier, Path writeDir) { this.identifier = identifier; this.writeDir = writeDir; resume(); } /* * Public interface */ public void acquireMutex() { try { while(!mutex.tryAcquire(3, TimeUnit.SECONDS)) { System.err.println("Mutex is taking a long time to acquire - owner is " + mutexOwner); } if(VERIFY) mutexOwner = Thread.currentThread(); } catch (InterruptedException e) { throw new IllegalStateException(e); } } public void releaseMutex() { mutex.release(); mutexOwner = null; } public void shutdown() { if (GraphClientImpl2.DEBUG) System.err.println("Shutting down LRU writers " + writers); writers.shutdown(); try { writers.awaitTermination(60, TimeUnit.SECONDS); } catch (InterruptedException e) { e.printStackTrace(); } } public void resume() { writers = new ScheduledThreadPoolExecutor(2, new ThreadFactory() { @Override public Thread newThread(Runnable r) { return new Thread(r, identifier + " File Writer"); } }); if (GraphClientImpl2.DEBUG) System.err.println("Resuming LRU writers " + writers); } /* * This method violates the synchronization order rule between LRU and MapVAlue * External synchronization is used to ensure correct operation */ public void persist(ArrayList state) { acquireMutex(); try { for (MapValue value : values()) { value.acquireMutex(); // for debugging purposes boolean persisted = false; try { // Persist the value if needed persisted = value.persist(); } finally { // WriteRunnable may want to value.releaseMutex(); } // Wait pending if value was actually persisted waitPending(value, false); // Take lock again value.acquireMutex(); try { // Record the value state.add(value.getStateKey()); } catch (Throwable t) { throw new IllegalStateException(t); } finally { value.releaseMutex(); } } } catch (Throwable t) { throw new IllegalStateException(t); } finally { releaseMutex(); } } public MapValue getWithoutMutex(MapKey key) { acquireMutex(); try { return get(key); } catch (Throwable t) { throw new IllegalStateException(t); } finally { releaseMutex(); } } public MapValue get(MapKey key) { if(VERIFY) verifyAccess(); return map.get(key); } public void map(MapValue info) { if(VERIFY) verifyAccess(); map.put(info.getKey(), info); } public Collection values() { if(VERIFY) verifyAccess(); return map.values(); } public boolean swapForced() { acquireMutex(); try { return swap(0, 0, null); } catch (Throwable t) { throw new IllegalStateException(t); } finally { releaseMutex(); } } public boolean swap(long lifeTime, int targetSize) { if(VERIFY) verifyAccess(); return swap(lifeTime, targetSize, null); } /* * This is called under global lock */ public void setWriteDir(Path dir) { this.writeDir = dir; } /* * Package access */ void insert(MapValue info, long accessTime) { if(VERIFY) verifyAccess(); map.put(info.getKey(), info); priorityQueue.put(accessTime, info.getKey()); } /* * We have access to ClusterLRU - try to refresh value if available */ boolean tryRefresh(MapValue info) { if(VERIFY) verifyAccess(); if(!info.tryAcquireMutex()) return false; try { priorityQueue.remove(info.getLastAccessTime()); info.accessed(); map.put(info.getKey(), info); priorityQueue.put(info.getLastAccessTime(), info.getKey()); return true; } catch (Throwable t) { throw new IllegalStateException(t); } finally { info.releaseMutex(); } } /* * We have access to MapValue and no access to clusterLRU */ void refresh(MapValue info, boolean needMutex) { if(VERIFY) { if(!needMutex) verifyAccess(); info.verifyAccess(); } if(needMutex) acquireMutex(); try { priorityQueue.remove(info.getLastAccessTime()); info.accessed(); map.put(info.getKey(), info); priorityQueue.put(info.getLastAccessTime(), info.getKey()); } catch (Throwable t) { throw new IllegalStateException(t); } finally { if(needMutex) releaseMutex(); } } /* * Private implementation */ public int size() { if(VERIFY) verifyAccess(); return priorityQueue.size(); } boolean swap(MapKey excluded) { if(VERIFY) verifyAccess(); return swap(swapTime, swapSize, excluded); } boolean swap(long lifeTime, int targetSize, MapKey excluded) { if(VERIFY) verifyAccess(); MapValue valueToSwap = getValueToSwap(lifeTime, targetSize, excluded); if(valueToSwap != null) { if(valueToSwap.tryAcquireMutex()) { try { if(valueToSwap.canBePersisted()) { valueToSwap.persist(); return true; } } catch (Throwable t) { throw new IllegalStateException(t); } finally { valueToSwap.releaseMutex(); } } } return false; } private MapValue getValueToSwap1(long lifeTime, int targetSize, MapKey excluded) { if(VERIFY) verifyAccess(); for(int i=0;i<10;i++) { long candidate = getSwapCandidate(lifeTime, targetSize); if(candidate == 0) return null; MapKey key = priorityQueue.remove(candidate); if(key.equals(excluded)) { tryRefresh(map.get(key)); continue; } return map.get(key); } return null; } private MapValue getValueToSwap(long lifeTime, int targetSize, MapKey excluded) { if(VERIFY) verifyAccess(); for(int i=0;i<10;i++) { // Lock LRU and get a candidate MapValue value = getValueToSwap1(lifeTime, targetSize, excluded); if(value == null) return null; if(value.tryAcquireMutex()) { try { // This may lock the object if(value.canBePersisted()) return value; // Insert back the value refresh(value, false); } catch (Throwable t) { throw new IllegalStateException(t); } finally { value.releaseMutex(); } } } return null; } private long getSwapCandidate(long lifeTime, int targetSize) { if(VERIFY) verifyAccess(); if(priorityQueue.isEmpty()) return 0; long currentTime = System.nanoTime(); Long lowest = priorityQueue.firstKey(); if(currentTime - lowest > lifeTime || priorityQueue.size() > targetSize) { return lowest; } return 0; } /* * Tries to persist this object. Can fail if the object cannot be persisted at this time. * */ boolean persist(Object object_) { MapValue object = (MapValue)object_; if(VERIFY) object.verifyAccess(); if(object.isDirty()) { // It is possible that this just became unpersistable. Fail here in this case. if(!object.canBePersisted()) { return false; } assert(object.isResident()); Path f = writeDir.resolve(object.getFileName()); WriteRunnable runnable = new WriteRunnable(f, object); synchronized(pending) { WriteRunnable existing = pending.put(object.getKey().toString(), runnable); assert(existing == null); } writers.execute(runnable); object.setResident(false); object.setDirty(false); return true; } else if(object.isResident()) { object.release(); object.setResident(false); return false; } return false; } int makeResident(Object object_, boolean keepResident) { MapValue object = (MapValue)object_; if(VERIFY) object.verifyAccess(); try { object.setForceResident(keepResident); if(object.isResident()) { refresh(object, true); return 0; } waitPending(object, true); byte[] data = object.readFile(); object.fromFile(data); object.setResident(true); acquireMutex(); try { refresh(object, false); swap(swapTime, swapSize, object.getKey()); } catch (Throwable t) { throw new IllegalStateException(t); } finally { releaseMutex(); } return data.length; } catch (IOException e) { e.printStackTrace(); } return 0; } static int readCounter = 0; static int writeCounter = 0; ScheduledThreadPoolExecutor writers; void waitPending(MapValue value, boolean hasMutex) { WriteRunnable r = null; boolean inProgress = false; synchronized(pending) { r = pending.get(value.getKey().toString()); if(r != null) { synchronized(r) { if(r.committed) { // just being written - just need to wait inProgress = true; } else { r.committed = true; // we do the writing } } } } if(r != null) { if(inProgress) { // System.err.println("reader waits for WriteRunnable to finish"); try { r.s.acquire(); } catch (InterruptedException e) { e.printStackTrace(); } } else { // System.err.println("reader took WriteRunnable"); try { r.runReally(hasMutex); } catch (Throwable e) { e.printStackTrace(); Logger.defaultLogError(e); } } } } public class WriteRunnable implements Runnable { Path bytes; MapValue impl; boolean committed = false; private Semaphore s = new Semaphore(0); WriteRunnable(Path bytes, MapValue impl) { this.bytes = bytes; this.impl = impl; } @Override public void run() { synchronized(impl) { synchronized(this) { if(committed) return; committed = true; } try { runReally(false); } catch (Throwable e) { e.printStackTrace(); Logger.defaultLogError(e); } } } public void runReally(boolean hasMutex) throws IOException { if(!hasMutex) impl.acquireMutex(); try { // These have been set in method persist assert(!impl.isResident()); assert(!impl.isDirty()); impl.toFile(bytes); synchronized(pending) { pending.remove(impl.getKey().toString()); s.release(Integer.MAX_VALUE); } } finally { if(!hasMutex) impl.releaseMutex(); } } } public Path getDirectory() { return writeDir; } /* * Protected implementation * */ protected void verifyAccess() { // assert (mutex.availablePermits() == 0); if (mutex.availablePermits() != 0) throw new IllegalStateException("identifier=" + identifier + " mutex has " + mutex.availablePermits() + " available permits, should be 0! Current mutexOwner is " + mutexOwner); } /* * Private implementation * */ }