X-Git-Url: https://gerrit.simantics.org/r/gitweb?p=simantics%2Fplatform.git;a=blobdiff_plain;f=bundles%2Forg.simantics.acorn%2Fsrc%2Forg%2Fsimantics%2Facorn%2Flru%2FLRU.java;h=48b7d4b10a9b34c80e9a68c7b536ef3840e48867;hp=508127db9f6f538718d13e20cc3d98de71b1662c;hb=d0a7cc0ee9ab6d9080daf105b04583e523a62e87;hpb=358e725094fd65a92800cdea4d77d9874be022d8 diff --git a/bundles/org.simantics.acorn/src/org/simantics/acorn/lru/LRU.java b/bundles/org.simantics.acorn/src/org/simantics/acorn/lru/LRU.java index 508127db9..48b7d4b10 100644 --- a/bundles/org.simantics.acorn/src/org/simantics/acorn/lru/LRU.java +++ b/bundles/org.simantics.acorn/src/org/simantics/acorn/lru/LRU.java @@ -12,17 +12,23 @@ import java.util.concurrent.Semaphore; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; +import org.simantics.acorn.ClusterManager; import org.simantics.acorn.GraphClientImpl2; -import org.simantics.db.common.utils.Logger; +import org.simantics.acorn.exception.AcornAccessVerificationException; +import org.simantics.acorn.exception.IllegalAcornStateException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /* * The order rule of synchronization for LRU and LRUObject is: - * § Always lock LRUObject first! + * � Always lock LRUObject first! * */ public class LRU> { - + + private static final Logger LOGGER = LoggerFactory.getLogger(LRU.class); + public static boolean VERIFY = true; final private long swapTime = 5L*1000000000L; @@ -39,8 +45,11 @@ public class LRU> { private Thread mutexOwner; public Map pending = new HashMap(); + + protected final ClusterManager manager; - public LRU(String identifier, Path writeDir) { + public LRU(ClusterManager manager, String identifier, Path writeDir) { + this.manager = manager; this.identifier = identifier; this.writeDir = writeDir; resume(); @@ -50,19 +59,15 @@ public class LRU> { * Public interface */ - public void acquireMutex() { - + public void acquireMutex() throws IllegalAcornStateException { try { - while(!mutex.tryAcquire(3, TimeUnit.SECONDS)) { System.err.println("Mutex is taking a long time to acquire - owner is " + mutexOwner); } - if(VERIFY) mutexOwner = Thread.currentThread(); - } catch (InterruptedException e) { - throw new IllegalStateException(e); + throw new IllegalAcornStateException(e); } } @@ -90,7 +95,6 @@ public class LRU> { public Thread newThread(Runnable r) { return new Thread(r, identifier + " File Writer"); } - }); if (GraphClientImpl2.DEBUG) System.err.println("Resuming LRU writers " + writers); @@ -100,12 +104,10 @@ public class LRU> { * This method violates the synchronization order rule between LRU and MapVAlue * External synchronization is used to ensure correct operation */ - public void persist(ArrayList state) { + public void persist(ArrayList state) throws IllegalAcornStateException { acquireMutex(); - try { - for (MapValue value : values()) { value.acquireMutex(); // for debugging purposes @@ -124,78 +126,75 @@ public class LRU> { try { // Record the value state.add(value.getStateKey()); - } catch (Throwable t) { - throw new IllegalStateException(t); } finally { value.releaseMutex(); } } - + } catch (IllegalAcornStateException e) { + throw e; + } catch (IOException e) { + throw new IllegalAcornStateException("Unable to waitPending for " + this.identifier, e); } catch (Throwable t) { - throw new IllegalStateException(t); + throw new IllegalAcornStateException("Fatal error occured for " + this.identifier, t); } finally { releaseMutex(); } - } - public MapValue getWithoutMutex(MapKey key) { + public MapValue getWithoutMutex(MapKey key) throws AcornAccessVerificationException, IllegalAcornStateException { acquireMutex(); try { return get(key); - } catch (Throwable t) { - throw new IllegalStateException(t); } finally { releaseMutex(); } - } - public MapValue get(MapKey key) { + + + public MapValue purge(MapKey id) { + return map.remove(id); + } + + public MapValue get(MapKey key) throws AcornAccessVerificationException { if(VERIFY) verifyAccess(); return map.get(key); - } - public void map(MapValue info) { + public void map(MapValue info) throws AcornAccessVerificationException { if(VERIFY) verifyAccess(); map.put(info.getKey(), info); - } - public Collection values() { + public Collection values() throws AcornAccessVerificationException { if(VERIFY) verifyAccess(); return map.values(); - } - public boolean swapForced() { + public boolean swapForced() throws IllegalAcornStateException, AcornAccessVerificationException { acquireMutex(); try { return swap(0, 0, null); - } catch (Throwable t) { - throw new IllegalStateException(t); } finally { releaseMutex(); } } - public boolean swap(long lifeTime, int targetSize) { + public boolean swap(long lifeTime, int targetSize) throws AcornAccessVerificationException, IllegalAcornStateException { if(VERIFY) verifyAccess(); return swap(lifeTime, targetSize, null); - } /* @@ -204,7 +203,6 @@ public class LRU> { public void setWriteDir(Path dir) { this.writeDir = dir; - } @@ -212,19 +210,18 @@ public class LRU> { * Package access */ - void insert(MapValue info, long accessTime) { + void insert(MapValue info, long accessTime) throws AcornAccessVerificationException { if(VERIFY) verifyAccess(); map.put(info.getKey(), info); priorityQueue.put(accessTime, info.getKey()); - } /* * We have access to ClusterLRU - try to refresh value if available */ - boolean tryRefresh(MapValue info) { + boolean tryRefresh(MapValue info) throws AcornAccessVerificationException, IllegalAcornStateException { if(VERIFY) verifyAccess(); @@ -232,28 +229,20 @@ public class LRU> { return false; try { - priorityQueue.remove(info.getLastAccessTime()); info.accessed(); map.put(info.getKey(), info); priorityQueue.put(info.getLastAccessTime(), info.getKey()); - return true; - - } catch (Throwable t) { - throw new IllegalStateException(t); } finally { - info.releaseMutex(); - } - } /* * We have access to MapValue and no access to clusterLRU */ - void refresh(MapValue info, boolean needMutex) { + void refresh(MapValue info, boolean needMutex) throws AcornAccessVerificationException, IllegalAcornStateException { if(VERIFY) { if(!needMutex) verifyAccess(); @@ -270,38 +259,31 @@ public class LRU> { map.put(info.getKey(), info); priorityQueue.put(info.getLastAccessTime(), info.getKey()); + } catch (AcornAccessVerificationException e) { + throw e; } catch (Throwable t) { - throw new IllegalStateException(t); + throw new IllegalAcornStateException(t); } finally { - if(needMutex) releaseMutex(); - } - } /* * Private implementation */ - public int size() { - + int size() throws AcornAccessVerificationException { if(VERIFY) verifyAccess(); - return priorityQueue.size(); - } - boolean swap(MapKey excluded) { - + boolean swap(MapKey excluded) throws AcornAccessVerificationException, IllegalAcornStateException { if(VERIFY) verifyAccess(); - return swap(swapTime, swapSize, excluded); - } - boolean swap(long lifeTime, int targetSize, MapKey excluded) { + boolean swap(long lifeTime, int targetSize, MapKey excluded) throws AcornAccessVerificationException, IllegalAcornStateException { if(VERIFY) verifyAccess(); @@ -309,29 +291,22 @@ public class LRU> { if(valueToSwap != null) { if(valueToSwap.tryAcquireMutex()) { - try { - if(valueToSwap.canBePersisted()) { valueToSwap.persist(); return true; } - } catch (Throwable t) { - throw new IllegalStateException(t); + throw new IllegalAcornStateException(t); } finally { valueToSwap.releaseMutex(); } } - } - return false; - } - - private MapValue getValueToSwap1(long lifeTime, int targetSize, MapKey excluded) { + private MapValue getValueToSwap1(long lifeTime, int targetSize, MapKey excluded) throws AcornAccessVerificationException, IllegalAcornStateException { if(VERIFY) verifyAccess(); @@ -347,15 +322,12 @@ public class LRU> { } return map.get(key); - } - return null; - } - private MapValue getValueToSwap(long lifeTime, int targetSize, MapKey excluded) { + private MapValue getValueToSwap(long lifeTime, int targetSize, MapKey excluded) throws AcornAccessVerificationException, IllegalAcornStateException { if(VERIFY) verifyAccess(); @@ -368,29 +340,20 @@ public class LRU> { if(value.tryAcquireMutex()) { try { - // This may lock the object - if(value.canBePersisted()) return value; + if(value.canBePersisted()) + return value; // Insert back the value refresh(value, false); - - } catch (Throwable t) { - throw new IllegalStateException(t); } finally { - value.releaseMutex(); - } - } - } - return null; - } - private long getSwapCandidate(long lifeTime, int targetSize) { + private long getSwapCandidate(long lifeTime, int targetSize) throws AcornAccessVerificationException { if(VERIFY) verifyAccess(); @@ -411,14 +374,13 @@ public class LRU> { * Tries to persist this object. Can fail if the object cannot be persisted at this time. * */ - boolean persist(Object object_) { + boolean persist(Object object_) throws AcornAccessVerificationException { MapValue object = (MapValue)object_; if(VERIFY) object.verifyAccess(); if(object.isDirty()) { - // It is possible that this just became unpersistable. Fail here in this case. if(!object.canBePersisted()) { return false; @@ -447,21 +409,17 @@ public class LRU> { object.release(); object.setResident(false); return false; - } - return false; - } - int makeResident(Object object_, boolean keepResident) { + int makeResident(Object object_, boolean keepResident) throws AcornAccessVerificationException, IllegalAcornStateException { MapValue object = (MapValue)object_; if(VERIFY) object.verifyAccess(); try { - object.setForceResident(keepResident); if(object.isResident()) { @@ -480,22 +438,13 @@ public class LRU> { try { refresh(object, false); swap(swapTime, swapSize, object.getKey()); - } catch (Throwable t) { - throw new IllegalStateException(t); } finally { releaseMutex(); } - return data.length; - } catch (IOException e) { - - e.printStackTrace(); - + throw new IllegalAcornStateException("Unable to makeResident " + identifier, e); } - - return 0; - } static int readCounter = 0; @@ -503,50 +452,48 @@ public class LRU> { ScheduledThreadPoolExecutor writers; - void waitPending(MapValue value, boolean hasMutex) { + void waitPending(MapValue value, boolean hasMutex) throws IOException, AcornAccessVerificationException, IllegalAcornStateException { - WriteRunnable r = null; + WriteRunnable runnable = null; boolean inProgress = false; synchronized(pending) { - r = pending.get(value.getKey().toString()); - if(r != null) { - synchronized(r) { - if(r.committed) { + runnable = pending.get(value.getKey().toString()); + if(runnable != null) { + synchronized(runnable) { + if(runnable.committed) { // just being written - just need to wait inProgress = true; } else { - r.committed = true; + runnable.committed = true; // we do the writing } } } } - if(r != null) { + if(runnable != null) { if(inProgress) { // System.err.println("reader waits for WriteRunnable to finish"); try { - r.s.acquire(); + if(hasMutex) { + runnable.borrowMutex = true; + } + runnable.s.acquire(); } catch (InterruptedException e) { - e.printStackTrace(); + throw new IllegalAcornStateException(e); } } else { // System.err.println("reader took WriteRunnable"); - try { - r.runReally(hasMutex); - } catch (Throwable e) { - e.printStackTrace(); - Logger.defaultLogError(e); - } + runnable.runReally(hasMutex); } } - } public class WriteRunnable implements Runnable { - Path bytes; - MapValue impl; - boolean committed = false; + private Path bytes; + private MapValue impl; + private boolean committed = false; + private boolean borrowMutex = false; private Semaphore s = new Semaphore(0); WriteRunnable(Path bytes, MapValue impl) { @@ -556,48 +503,91 @@ public class LRU> { @Override public void run() { - synchronized(impl) { - - synchronized(this) { - - if(committed) return; - - committed = true; - - } - try { - runReally(false); - } catch (Throwable e) { - e.printStackTrace(); - Logger.defaultLogError(e); - } - } + try { + synchronized(impl) { + + synchronized(this) { + + if(committed) + return; + + committed = true; + } + runReally(false); + } + } catch (Throwable t) { + if (t instanceof IllegalAcornStateException) { + manager.notSafeToMakeSnapshot((IllegalAcornStateException)t); + } else { + manager.notSafeToMakeSnapshot(new IllegalAcornStateException(t)); + } + t.printStackTrace(); + LOGGER.error("Exception happened in WriteRunnable.run", t); + } } - public void runReally(boolean hasMutex) throws IOException { - - if(!hasMutex) - impl.acquireMutex(); - - try { - - // These have been set in method persist - assert(!impl.isResident()); - assert(!impl.isDirty()); - - impl.toFile(bytes); - - synchronized(pending) { - pending.remove(impl.getKey().toString()); - s.release(Integer.MAX_VALUE); - } - } finally { - if(!hasMutex) - impl.releaseMutex(); - } - - } - + public void runWithMutex() throws IOException, IllegalAcornStateException, AcornAccessVerificationException { + + try { + // These have been set in method persist + assert (!impl.isResident()); + assert (!impl.isDirty()); + + impl.toFile(bytes); + } finally { + synchronized (pending) { + pending.remove(impl.getKey().toString()); + s.release(Integer.MAX_VALUE); + } + } + + } + + // Fix WriteRunnable.runReally() to use LRU.MapValue mutex instead of + // borrowMutex + public void runReally(boolean hasMutex) throws IOException, IllegalAcornStateException, AcornAccessVerificationException { + + if (hasMutex) { + + runWithMutex(); + + } else { + + boolean gotMutex = impl.tryAcquireMutex(); + + boolean done = false; + int count = 0; + long startTime = 0; + while (!done) { + + if (gotMutex || borrowMutex) { + runWithMutex(); + done = true; + } else { + if (count % 10 == 0) { + // Taking too long, sleep for a while. + LOGGER.warn("Retry mutex acquire"); + try { + Thread.sleep(10); + } catch (InterruptedException e) { + } + } + gotMutex = impl.tryAcquireMutex(); + long currentTime = System.currentTimeMillis(); + if ((currentTime - startTime) > 10) { + startTime = currentTime; + count++; + } + } + + } + + if (gotMutex) + impl.releaseMutex(); + + } + + } } public Path getDirectory() { @@ -609,10 +599,9 @@ public class LRU> { * */ - protected void verifyAccess() { -// assert (mutex.availablePermits() == 0); + protected void verifyAccess() throws AcornAccessVerificationException { if (mutex.availablePermits() != 0) - throw new IllegalStateException("identifier=" + identifier + " mutex has " + mutex.availablePermits() + " available permits, should be 0! Current mutexOwner is " + mutexOwner); + throw new AcornAccessVerificationException("identifier=" + identifier + " mutex has " + mutex.availablePermits() + " available permits, should be 0! Current mutexOwner is " + mutexOwner); } /*