package org.simantics.acorn.lru; import java.io.IOException; import java.nio.file.Path; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; import java.util.Map; import java.util.TreeMap; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.Semaphore; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import org.simantics.acorn.ClusterManager; import org.simantics.acorn.GraphClientImpl2; import org.simantics.acorn.exception.AcornAccessVerificationException; import org.simantics.acorn.exception.IllegalAcornStateException; import org.simantics.db.common.utils.Logger; /* * The order rule of synchronization for LRU and LRUObject is: * § Always lock LRUObject first! * */ public class LRU> { public static boolean VERIFY = true; final private long swapTime = 5L*1000000000L; final private int swapSize = 200; final private HashMap map = new HashMap(); final private TreeMap priorityQueue = new TreeMap(); final private Semaphore mutex = new Semaphore(1); final private String identifier; private Path writeDir; private Thread mutexOwner; public Map pending = new HashMap(); protected final ClusterManager manager; public LRU(ClusterManager manager, String identifier, Path writeDir) { this.manager = manager; this.identifier = identifier; this.writeDir = writeDir; resume(); } /* * Public interface */ public void acquireMutex() throws IllegalAcornStateException { try { while(!mutex.tryAcquire(3, TimeUnit.SECONDS)) { System.err.println("Mutex is taking a long time to acquire - owner is " + mutexOwner); } if(VERIFY) mutexOwner = Thread.currentThread(); } catch (InterruptedException e) { throw new IllegalAcornStateException(e); } } public void releaseMutex() { mutex.release(); mutexOwner = null; } public void shutdown() { if (GraphClientImpl2.DEBUG) System.err.println("Shutting down LRU writers " + writers); writers.shutdown(); try { writers.awaitTermination(60, TimeUnit.SECONDS); } catch (InterruptedException e) { e.printStackTrace(); } } public void resume() { writers = new ScheduledThreadPoolExecutor(2, new ThreadFactory() { @Override public Thread newThread(Runnable r) { return new Thread(r, identifier + " File Writer"); } }); if (GraphClientImpl2.DEBUG) System.err.println("Resuming LRU writers " + writers); } /* * This method violates the synchronization order rule between LRU and MapVAlue * External synchronization is used to ensure correct operation */ public void persist(ArrayList state) throws IllegalAcornStateException { acquireMutex(); try { for (MapValue value : values()) { value.acquireMutex(); // for debugging purposes boolean persisted = false; try { // Persist the value if needed persisted = value.persist(); } finally { // WriteRunnable may want to value.releaseMutex(); } // Wait pending if value was actually persisted waitPending(value, false); // Take lock again value.acquireMutex(); try { // Record the value state.add(value.getStateKey()); } finally { value.releaseMutex(); } } } catch (IllegalAcornStateException e) { throw e; } catch (IOException e) { throw new IllegalAcornStateException("Unable to waitPending for " + this.identifier, e); } catch (Throwable t) { throw new IllegalAcornStateException("Fatal error occured for " + this.identifier, t); } finally { releaseMutex(); } } public MapValue getWithoutMutex(MapKey key) throws AcornAccessVerificationException, IllegalAcornStateException { acquireMutex(); try { return get(key); } finally { releaseMutex(); } } public MapValue get(MapKey key) throws AcornAccessVerificationException { if(VERIFY) verifyAccess(); return map.get(key); } public void map(MapValue info) throws AcornAccessVerificationException { if(VERIFY) verifyAccess(); map.put(info.getKey(), info); } public Collection values() throws AcornAccessVerificationException { if(VERIFY) verifyAccess(); return map.values(); } public boolean swapForced() throws IllegalAcornStateException, AcornAccessVerificationException { acquireMutex(); try { return swap(0, 0, null); } finally { releaseMutex(); } } public boolean swap(long lifeTime, int targetSize) throws AcornAccessVerificationException, IllegalAcornStateException { if(VERIFY) verifyAccess(); return swap(lifeTime, targetSize, null); } /* * This is called under global lock */ public void setWriteDir(Path dir) { this.writeDir = dir; } /* * Package access */ void insert(MapValue info, long accessTime) throws AcornAccessVerificationException { if(VERIFY) verifyAccess(); map.put(info.getKey(), info); priorityQueue.put(accessTime, info.getKey()); } /* * We have access to ClusterLRU - try to refresh value if available */ boolean tryRefresh(MapValue info) throws AcornAccessVerificationException, IllegalAcornStateException { if(VERIFY) verifyAccess(); if(!info.tryAcquireMutex()) return false; try { priorityQueue.remove(info.getLastAccessTime()); info.accessed(); map.put(info.getKey(), info); priorityQueue.put(info.getLastAccessTime(), info.getKey()); return true; } finally { info.releaseMutex(); } } /* * We have access to MapValue and no access to clusterLRU */ void refresh(MapValue info, boolean needMutex) throws AcornAccessVerificationException, IllegalAcornStateException { if(VERIFY) { if(!needMutex) verifyAccess(); info.verifyAccess(); } if(needMutex) acquireMutex(); try { priorityQueue.remove(info.getLastAccessTime()); info.accessed(); map.put(info.getKey(), info); priorityQueue.put(info.getLastAccessTime(), info.getKey()); } catch (AcornAccessVerificationException e) { throw e; } catch (Throwable t) { throw new IllegalAcornStateException(t); } finally { if(needMutex) releaseMutex(); } } /* * Private implementation */ int size() throws AcornAccessVerificationException { if(VERIFY) verifyAccess(); return priorityQueue.size(); } boolean swap(MapKey excluded) throws AcornAccessVerificationException, IllegalAcornStateException { if(VERIFY) verifyAccess(); return swap(swapTime, swapSize, excluded); } boolean swap(long lifeTime, int targetSize, MapKey excluded) throws AcornAccessVerificationException, IllegalAcornStateException { if(VERIFY) verifyAccess(); MapValue valueToSwap = getValueToSwap(lifeTime, targetSize, excluded); if(valueToSwap != null) { if(valueToSwap.tryAcquireMutex()) { try { if(valueToSwap.canBePersisted()) { valueToSwap.persist(); return true; } } catch (Throwable t) { throw new IllegalAcornStateException(t); } finally { valueToSwap.releaseMutex(); } } } return false; } private MapValue getValueToSwap1(long lifeTime, int targetSize, MapKey excluded) throws AcornAccessVerificationException, IllegalAcornStateException { if(VERIFY) verifyAccess(); for(int i=0;i<10;i++) { long candidate = getSwapCandidate(lifeTime, targetSize); if(candidate == 0) return null; MapKey key = priorityQueue.remove(candidate); if(key.equals(excluded)) { tryRefresh(map.get(key)); continue; } return map.get(key); } return null; } private MapValue getValueToSwap(long lifeTime, int targetSize, MapKey excluded) throws AcornAccessVerificationException, IllegalAcornStateException { if(VERIFY) verifyAccess(); for(int i=0;i<10;i++) { // Lock LRU and get a candidate MapValue value = getValueToSwap1(lifeTime, targetSize, excluded); if(value == null) return null; if(value.tryAcquireMutex()) { try { // This may lock the object if(value.canBePersisted()) return value; // Insert back the value refresh(value, false); } finally { value.releaseMutex(); } } } return null; } private long getSwapCandidate(long lifeTime, int targetSize) throws AcornAccessVerificationException { if(VERIFY) verifyAccess(); if(priorityQueue.isEmpty()) return 0; long currentTime = System.nanoTime(); Long lowest = priorityQueue.firstKey(); if(currentTime - lowest > lifeTime || priorityQueue.size() > targetSize) { return lowest; } return 0; } /* * Tries to persist this object. Can fail if the object cannot be persisted at this time. * */ boolean persist(Object object_) throws AcornAccessVerificationException { MapValue object = (MapValue)object_; if(VERIFY) object.verifyAccess(); if(object.isDirty()) { // It is possible that this just became unpersistable. Fail here in this case. if(!object.canBePersisted()) { return false; } assert(object.isResident()); Path f = writeDir.resolve(object.getFileName()); WriteRunnable runnable = new WriteRunnable(f, object); synchronized(pending) { WriteRunnable existing = pending.put(object.getKey().toString(), runnable); assert(existing == null); } writers.execute(runnable); object.setResident(false); object.setDirty(false); return true; } else if(object.isResident()) { object.release(); object.setResident(false); return false; } return false; } int makeResident(Object object_, boolean keepResident) throws AcornAccessVerificationException, IllegalAcornStateException { MapValue object = (MapValue)object_; if(VERIFY) object.verifyAccess(); try { object.setForceResident(keepResident); if(object.isResident()) { refresh(object, true); return 0; } waitPending(object, true); byte[] data = object.readFile(); object.fromFile(data); object.setResident(true); acquireMutex(); try { refresh(object, false); swap(swapTime, swapSize, object.getKey()); } finally { releaseMutex(); } return data.length; } catch (IOException e) { throw new IllegalAcornStateException("Unable to makeResident " + identifier, e); } } static int readCounter = 0; static int writeCounter = 0; ScheduledThreadPoolExecutor writers; void waitPending(MapValue value, boolean hasMutex) throws IOException, AcornAccessVerificationException, IllegalAcornStateException { WriteRunnable runnable = null; boolean inProgress = false; synchronized(pending) { runnable = pending.get(value.getKey().toString()); if(runnable != null) { synchronized(runnable) { if(runnable.committed) { // just being written - just need to wait inProgress = true; } else { runnable.committed = true; // we do the writing } } } } if(runnable != null) { if(inProgress) { // System.err.println("reader waits for WriteRunnable to finish"); try { if(hasMutex) { runnable.borrowMutex = true; } runnable.s.acquire(); } catch (InterruptedException e) { throw new IllegalAcornStateException(e); } } else { // System.err.println("reader took WriteRunnable"); runnable.runReally(hasMutex); } } } public class WriteRunnable implements Runnable { private Path bytes; private MapValue impl; private boolean committed = false; private boolean borrowMutex = false; private Semaphore s = new Semaphore(0); WriteRunnable(Path bytes, MapValue impl) { this.bytes = bytes; this.impl = impl; } @Override public void run() { try { synchronized(impl) { synchronized(this) { if(committed) return; committed = true; } runReally(false); } } catch (Throwable t) { if (t instanceof IllegalAcornStateException) { manager.notSafeToMakeSnapshot((IllegalAcornStateException)t); } else { manager.notSafeToMakeSnapshot(new IllegalAcornStateException(t)); } t.printStackTrace(); Logger.defaultLogError(t); } } public void runWithMutex() throws IOException, IllegalAcornStateException, AcornAccessVerificationException { try { // These have been set in method persist assert (!impl.isResident()); assert (!impl.isDirty()); impl.toFile(bytes); } finally { synchronized (pending) { pending.remove(impl.getKey().toString()); s.release(Integer.MAX_VALUE); } } } // Fix WriteRunnable.runReally() to use LRU.MapValue mutex instead of // borrowMutex public void runReally(boolean hasMutex) throws IOException, IllegalAcornStateException, AcornAccessVerificationException { if (hasMutex) { runWithMutex(); } else { boolean gotMutex = impl.tryAcquireMutex(); boolean done = false; while (!done) { if (gotMutex || borrowMutex) { runWithMutex(); done = true; } else { System.err.println("Retry mutex acquire"); gotMutex = impl.tryAcquireMutex(); } } if (gotMutex) impl.releaseMutex(); } } } public Path getDirectory() { return writeDir; } /* * Protected implementation * */ protected void verifyAccess() throws AcornAccessVerificationException { if (mutex.availablePermits() != 0) throw new AcornAccessVerificationException("identifier=" + identifier + " mutex has " + mutex.availablePermits() + " available permits, should be 0! Current mutexOwner is " + mutexOwner); } /* * Private implementation * */ }