X-Git-Url: https://gerrit.simantics.org/r/gitweb?p=simantics%2Fplatform.git;a=blobdiff_plain;f=bundles%2Forg.simantics.acorn%2Fsrc%2Forg%2Fsimantics%2Facorn%2Flru%2FLRU.java;h=323d66d3df12ea34d2d429293801eeacfc95a23b;hp=508127db9f6f538718d13e20cc3d98de71b1662c;hb=d9a9d77c7024260e3e3b3a8558ddc84b2a8b99b3;hpb=0144ba232323a0e4f7ec8fe0681a150faafd7caf diff --git a/bundles/org.simantics.acorn/src/org/simantics/acorn/lru/LRU.java b/bundles/org.simantics.acorn/src/org/simantics/acorn/lru/LRU.java index 508127db9..323d66d3d 100644 --- a/bundles/org.simantics.acorn/src/org/simantics/acorn/lru/LRU.java +++ b/bundles/org.simantics.acorn/src/org/simantics/acorn/lru/LRU.java @@ -12,7 +12,10 @@ import java.util.concurrent.Semaphore; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; +import org.simantics.acorn.ClusterManager; import org.simantics.acorn.GraphClientImpl2; +import org.simantics.acorn.exception.AcornAccessVerificationException; +import org.simantics.acorn.exception.IllegalAcornStateException; import org.simantics.db.common.utils.Logger; /* @@ -39,8 +42,11 @@ public class LRU> { private Thread mutexOwner; public Map pending = new HashMap(); + + protected final ClusterManager manager; - public LRU(String identifier, Path writeDir) { + public LRU(ClusterManager manager, String identifier, Path writeDir) { + this.manager = manager; this.identifier = identifier; this.writeDir = writeDir; resume(); @@ -50,19 +56,15 @@ public class LRU> { * Public interface */ - public void acquireMutex() { - + public void acquireMutex() throws IllegalAcornStateException { try { - while(!mutex.tryAcquire(3, TimeUnit.SECONDS)) { System.err.println("Mutex is taking a long time to acquire - owner is " + mutexOwner); } - if(VERIFY) mutexOwner = Thread.currentThread(); - } catch (InterruptedException e) { - throw new IllegalStateException(e); + throw new IllegalAcornStateException(e); } } @@ -90,7 +92,6 @@ public class LRU> { public Thread newThread(Runnable r) { return new Thread(r, identifier + " File Writer"); } - }); if (GraphClientImpl2.DEBUG) System.err.println("Resuming LRU writers " + writers); @@ -100,12 +101,10 @@ public class LRU> { * This method violates the synchronization order rule between LRU and MapVAlue * External synchronization is used to ensure correct operation */ - public void persist(ArrayList state) { + public void persist(ArrayList state) throws IllegalAcornStateException { acquireMutex(); - try { - for (MapValue value : values()) { value.acquireMutex(); // for debugging purposes @@ -124,78 +123,69 @@ public class LRU> { try { // Record the value state.add(value.getStateKey()); - } catch (Throwable t) { - throw new IllegalStateException(t); } finally { value.releaseMutex(); } } - + } catch (IllegalAcornStateException e) { + throw e; + } catch (IOException e) { + throw new IllegalAcornStateException("Unable to waitPending for " + this.identifier, e); } catch (Throwable t) { - throw new IllegalStateException(t); + throw new IllegalAcornStateException("Fatal error occured for " + this.identifier, t); } finally { releaseMutex(); } - } - public MapValue getWithoutMutex(MapKey key) { + public MapValue getWithoutMutex(MapKey key) throws AcornAccessVerificationException, IllegalAcornStateException { acquireMutex(); try { return get(key); - } catch (Throwable t) { - throw new IllegalStateException(t); } finally { releaseMutex(); } - } - public MapValue get(MapKey key) { + public MapValue get(MapKey key) throws AcornAccessVerificationException { if(VERIFY) verifyAccess(); return map.get(key); - } - public void map(MapValue info) { + public void map(MapValue info) throws AcornAccessVerificationException { if(VERIFY) verifyAccess(); map.put(info.getKey(), info); - } - public Collection values() { + public Collection values() throws AcornAccessVerificationException { if(VERIFY) verifyAccess(); return map.values(); - } - public boolean swapForced() { + public boolean swapForced() throws IllegalAcornStateException, AcornAccessVerificationException { acquireMutex(); try { return swap(0, 0, null); - } catch (Throwable t) { - throw new IllegalStateException(t); } finally { releaseMutex(); } } - public boolean swap(long lifeTime, int targetSize) { + public boolean swap(long lifeTime, int targetSize) throws AcornAccessVerificationException, IllegalAcornStateException { if(VERIFY) verifyAccess(); return swap(lifeTime, targetSize, null); - } /* @@ -204,7 +194,6 @@ public class LRU> { public void setWriteDir(Path dir) { this.writeDir = dir; - } @@ -212,19 +201,18 @@ public class LRU> { * Package access */ - void insert(MapValue info, long accessTime) { + void insert(MapValue info, long accessTime) throws AcornAccessVerificationException { if(VERIFY) verifyAccess(); map.put(info.getKey(), info); priorityQueue.put(accessTime, info.getKey()); - } /* * We have access to ClusterLRU - try to refresh value if available */ - boolean tryRefresh(MapValue info) { + boolean tryRefresh(MapValue info) throws AcornAccessVerificationException, IllegalAcornStateException { if(VERIFY) verifyAccess(); @@ -232,28 +220,20 @@ public class LRU> { return false; try { - priorityQueue.remove(info.getLastAccessTime()); info.accessed(); map.put(info.getKey(), info); priorityQueue.put(info.getLastAccessTime(), info.getKey()); - return true; - - } catch (Throwable t) { - throw new IllegalStateException(t); } finally { - info.releaseMutex(); - } - } /* * We have access to MapValue and no access to clusterLRU */ - void refresh(MapValue info, boolean needMutex) { + void refresh(MapValue info, boolean needMutex) throws AcornAccessVerificationException, IllegalAcornStateException { if(VERIFY) { if(!needMutex) verifyAccess(); @@ -270,38 +250,31 @@ public class LRU> { map.put(info.getKey(), info); priorityQueue.put(info.getLastAccessTime(), info.getKey()); + } catch (AcornAccessVerificationException e) { + throw e; } catch (Throwable t) { - throw new IllegalStateException(t); + throw new IllegalAcornStateException(t); } finally { - if(needMutex) releaseMutex(); - } - } /* * Private implementation */ - public int size() { - + int size() throws AcornAccessVerificationException { if(VERIFY) verifyAccess(); - return priorityQueue.size(); - } - boolean swap(MapKey excluded) { - + boolean swap(MapKey excluded) throws AcornAccessVerificationException, IllegalAcornStateException { if(VERIFY) verifyAccess(); - return swap(swapTime, swapSize, excluded); - } - boolean swap(long lifeTime, int targetSize, MapKey excluded) { + boolean swap(long lifeTime, int targetSize, MapKey excluded) throws AcornAccessVerificationException, IllegalAcornStateException { if(VERIFY) verifyAccess(); @@ -309,29 +282,22 @@ public class LRU> { if(valueToSwap != null) { if(valueToSwap.tryAcquireMutex()) { - try { - if(valueToSwap.canBePersisted()) { valueToSwap.persist(); return true; } - } catch (Throwable t) { - throw new IllegalStateException(t); + throw new IllegalAcornStateException(t); } finally { valueToSwap.releaseMutex(); } } - } - return false; - } - - private MapValue getValueToSwap1(long lifeTime, int targetSize, MapKey excluded) { + private MapValue getValueToSwap1(long lifeTime, int targetSize, MapKey excluded) throws AcornAccessVerificationException, IllegalAcornStateException { if(VERIFY) verifyAccess(); @@ -347,15 +313,12 @@ public class LRU> { } return map.get(key); - } - return null; - } - private MapValue getValueToSwap(long lifeTime, int targetSize, MapKey excluded) { + private MapValue getValueToSwap(long lifeTime, int targetSize, MapKey excluded) throws AcornAccessVerificationException, IllegalAcornStateException { if(VERIFY) verifyAccess(); @@ -368,29 +331,20 @@ public class LRU> { if(value.tryAcquireMutex()) { try { - // This may lock the object - if(value.canBePersisted()) return value; + if(value.canBePersisted()) + return value; // Insert back the value refresh(value, false); - - } catch (Throwable t) { - throw new IllegalStateException(t); } finally { - value.releaseMutex(); - } - } - } - return null; - } - private long getSwapCandidate(long lifeTime, int targetSize) { + private long getSwapCandidate(long lifeTime, int targetSize) throws AcornAccessVerificationException { if(VERIFY) verifyAccess(); @@ -411,14 +365,13 @@ public class LRU> { * Tries to persist this object. Can fail if the object cannot be persisted at this time. * */ - boolean persist(Object object_) { + boolean persist(Object object_) throws AcornAccessVerificationException { MapValue object = (MapValue)object_; if(VERIFY) object.verifyAccess(); if(object.isDirty()) { - // It is possible that this just became unpersistable. Fail here in this case. if(!object.canBePersisted()) { return false; @@ -447,21 +400,17 @@ public class LRU> { object.release(); object.setResident(false); return false; - } - return false; - } - int makeResident(Object object_, boolean keepResident) { + int makeResident(Object object_, boolean keepResident) throws AcornAccessVerificationException, IllegalAcornStateException { MapValue object = (MapValue)object_; if(VERIFY) object.verifyAccess(); try { - object.setForceResident(keepResident); if(object.isResident()) { @@ -480,22 +429,13 @@ public class LRU> { try { refresh(object, false); swap(swapTime, swapSize, object.getKey()); - } catch (Throwable t) { - throw new IllegalStateException(t); } finally { releaseMutex(); } - return data.length; - } catch (IOException e) { - - e.printStackTrace(); - + throw new IllegalAcornStateException("Unable to makeResident " + identifier, e); } - - return 0; - } static int readCounter = 0; @@ -503,50 +443,48 @@ public class LRU> { ScheduledThreadPoolExecutor writers; - void waitPending(MapValue value, boolean hasMutex) { + void waitPending(MapValue value, boolean hasMutex) throws IOException, AcornAccessVerificationException, IllegalAcornStateException { - WriteRunnable r = null; + WriteRunnable runnable = null; boolean inProgress = false; synchronized(pending) { - r = pending.get(value.getKey().toString()); - if(r != null) { - synchronized(r) { - if(r.committed) { + runnable = pending.get(value.getKey().toString()); + if(runnable != null) { + synchronized(runnable) { + if(runnable.committed) { // just being written - just need to wait inProgress = true; } else { - r.committed = true; + runnable.committed = true; // we do the writing } } } } - if(r != null) { + if(runnable != null) { if(inProgress) { // System.err.println("reader waits for WriteRunnable to finish"); try { - r.s.acquire(); + if(hasMutex) { + runnable.borrowMutex = true; + } + runnable.s.acquire(); } catch (InterruptedException e) { - e.printStackTrace(); + throw new IllegalAcornStateException(e); } } else { // System.err.println("reader took WriteRunnable"); - try { - r.runReally(hasMutex); - } catch (Throwable e) { - e.printStackTrace(); - Logger.defaultLogError(e); - } + runnable.runReally(hasMutex); } } - } public class WriteRunnable implements Runnable { - Path bytes; - MapValue impl; - boolean committed = false; + private Path bytes; + private MapValue impl; + private boolean committed = false; + private boolean borrowMutex = false; private Semaphore s = new Semaphore(0); WriteRunnable(Path bytes, MapValue impl) { @@ -556,48 +494,77 @@ public class LRU> { @Override public void run() { - synchronized(impl) { - - synchronized(this) { - - if(committed) return; - - committed = true; - - } - try { - runReally(false); - } catch (Throwable e) { - e.printStackTrace(); - Logger.defaultLogError(e); - } - } + try { + synchronized(impl) { + + synchronized(this) { + + if(committed) + return; + + committed = true; + } + runReally(false); + } + } catch (Throwable t) { + if (t instanceof IllegalAcornStateException) { + manager.notSafeToMakeSnapshot((IllegalAcornStateException)t); + } else { + manager.notSafeToMakeSnapshot(new IllegalAcornStateException(t)); + } + t.printStackTrace(); + Logger.defaultLogError(t); + } } - public void runReally(boolean hasMutex) throws IOException { - - if(!hasMutex) - impl.acquireMutex(); - - try { - - // These have been set in method persist - assert(!impl.isResident()); - assert(!impl.isDirty()); - - impl.toFile(bytes); - - synchronized(pending) { - pending.remove(impl.getKey().toString()); - s.release(Integer.MAX_VALUE); - } - } finally { - if(!hasMutex) - impl.releaseMutex(); - } - - } - + public void runWithMutex() throws IOException, IllegalAcornStateException, AcornAccessVerificationException { + + try { + // These have been set in method persist + assert (!impl.isResident()); + assert (!impl.isDirty()); + + impl.toFile(bytes); + } finally { + synchronized (pending) { + pending.remove(impl.getKey().toString()); + s.release(Integer.MAX_VALUE); + } + } + + } + + // Fix WriteRunnable.runReally() to use LRU.MapValue mutex instead of + // borrowMutex + public void runReally(boolean hasMutex) throws IOException, IllegalAcornStateException, AcornAccessVerificationException { + + if (hasMutex) { + + runWithMutex(); + + } else { + + boolean gotMutex = impl.tryAcquireMutex(); + + boolean done = false; + while (!done) { + + if (gotMutex || borrowMutex) { + runWithMutex(); + done = true; + } else { + System.err.println("Retry mutex acquire"); + gotMutex = impl.tryAcquireMutex(); + } + + } + + if (gotMutex) + impl.releaseMutex(); + + } + + } } public Path getDirectory() { @@ -609,10 +576,9 @@ public class LRU> { * */ - protected void verifyAccess() { -// assert (mutex.availablePermits() == 0); + protected void verifyAccess() throws AcornAccessVerificationException { if (mutex.availablePermits() != 0) - throw new IllegalStateException("identifier=" + identifier + " mutex has " + mutex.availablePermits() + " available permits, should be 0! Current mutexOwner is " + mutexOwner); + throw new AcornAccessVerificationException("identifier=" + identifier + " mutex has " + mutex.availablePermits() + " available permits, should be 0! Current mutexOwner is " + mutexOwner); } /*