import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
+import org.simantics.acorn.ClusterManager;
import org.simantics.acorn.GraphClientImpl2;
+import org.simantics.acorn.exception.AcornAccessVerificationException;
+import org.simantics.acorn.exception.IllegalAcornStateException;
import org.simantics.db.common.utils.Logger;
/*
private Thread mutexOwner;
public Map<String, WriteRunnable> pending = new HashMap<String, WriteRunnable>();
+
+ protected final ClusterManager manager;
- public LRU(String identifier, Path writeDir) {
+ public LRU(ClusterManager manager, String identifier, Path writeDir) {
+ this.manager = manager;
this.identifier = identifier;
this.writeDir = writeDir;
resume();
* Public interface
*/
- public void acquireMutex() {
-
+ public void acquireMutex() throws IllegalAcornStateException {
try {
-
while(!mutex.tryAcquire(3, TimeUnit.SECONDS)) {
System.err.println("Mutex is taking a long time to acquire - owner is " + mutexOwner);
}
-
if(VERIFY)
mutexOwner = Thread.currentThread();
-
} catch (InterruptedException e) {
- throw new IllegalStateException(e);
+ throw new IllegalAcornStateException(e);
}
}
public Thread newThread(Runnable r) {
return new Thread(r, identifier + " File Writer");
}
-
});
if (GraphClientImpl2.DEBUG)
System.err.println("Resuming LRU writers " + writers);
* This method violates the synchronization order rule between LRU and MapVAlue
* External synchronization is used to ensure correct operation
*/
- public void persist(ArrayList<String> state) {
+ public void persist(ArrayList<String> state) throws IllegalAcornStateException {
acquireMutex();
-
try {
-
for (MapValue value : values()) {
value.acquireMutex();
// for debugging purposes
try {
// Record the value
state.add(value.getStateKey());
- } catch (Throwable t) {
- throw new IllegalStateException(t);
} finally {
value.releaseMutex();
}
}
-
+ } catch (IllegalAcornStateException e) {
+ throw e;
+ } catch (IOException e) {
+ throw new IllegalAcornStateException("Unable to waitPending for " + this.identifier, e);
} catch (Throwable t) {
- throw new IllegalStateException(t);
+ throw new IllegalAcornStateException("Fatal error occured for " + this.identifier, t);
} finally {
releaseMutex();
}
-
}
- public MapValue getWithoutMutex(MapKey key) {
+ public MapValue getWithoutMutex(MapKey key) throws AcornAccessVerificationException, IllegalAcornStateException {
acquireMutex();
try {
return get(key);
- } catch (Throwable t) {
- throw new IllegalStateException(t);
} finally {
releaseMutex();
}
-
}
- public MapValue get(MapKey key) {
+ public MapValue get(MapKey key) throws AcornAccessVerificationException {
if(VERIFY) verifyAccess();
return map.get(key);
-
}
- public void map(MapValue info) {
+ public void map(MapValue info) throws AcornAccessVerificationException {
if(VERIFY) verifyAccess();
map.put(info.getKey(), info);
-
}
- public Collection<MapValue> values() {
+ public Collection<MapValue> values() throws AcornAccessVerificationException {
if(VERIFY) verifyAccess();
return map.values();
-
}
- public boolean swapForced() {
+ public boolean swapForced() throws IllegalAcornStateException, AcornAccessVerificationException {
acquireMutex();
try {
return swap(0, 0, null);
- } catch (Throwable t) {
- throw new IllegalStateException(t);
} finally {
releaseMutex();
}
}
- public boolean swap(long lifeTime, int targetSize) {
+ public boolean swap(long lifeTime, int targetSize) throws AcornAccessVerificationException, IllegalAcornStateException {
if(VERIFY) verifyAccess();
return swap(lifeTime, targetSize, null);
-
}
/*
public void setWriteDir(Path dir) {
this.writeDir = dir;
-
}
* Package access
*/
- void insert(MapValue info, long accessTime) {
+ void insert(MapValue info, long accessTime) throws AcornAccessVerificationException {
if(VERIFY) verifyAccess();
map.put(info.getKey(), info);
priorityQueue.put(accessTime, info.getKey());
-
}
/*
* We have access to ClusterLRU - try to refresh value if available
*/
- boolean tryRefresh(MapValue info) {
+ boolean tryRefresh(MapValue info) throws AcornAccessVerificationException, IllegalAcornStateException {
if(VERIFY) verifyAccess();
return false;
try {
-
priorityQueue.remove(info.getLastAccessTime());
info.accessed();
map.put(info.getKey(), info);
priorityQueue.put(info.getLastAccessTime(), info.getKey());
-
return true;
-
- } catch (Throwable t) {
- throw new IllegalStateException(t);
} finally {
-
info.releaseMutex();
-
}
-
}
/*
* We have access to MapValue and no access to clusterLRU
*/
- void refresh(MapValue info, boolean needMutex) {
+ void refresh(MapValue info, boolean needMutex) throws AcornAccessVerificationException, IllegalAcornStateException {
if(VERIFY) {
if(!needMutex) verifyAccess();
map.put(info.getKey(), info);
priorityQueue.put(info.getLastAccessTime(), info.getKey());
+ } catch (AcornAccessVerificationException e) {
+ throw e;
} catch (Throwable t) {
- throw new IllegalStateException(t);
+ throw new IllegalAcornStateException(t);
} finally {
-
if(needMutex)
releaseMutex();
-
}
-
}
/*
* Private implementation
*/
- public int size() {
-
+ int size() throws AcornAccessVerificationException {
if(VERIFY) verifyAccess();
-
return priorityQueue.size();
-
}
- boolean swap(MapKey excluded) {
-
+ boolean swap(MapKey excluded) throws AcornAccessVerificationException, IllegalAcornStateException {
if(VERIFY) verifyAccess();
-
return swap(swapTime, swapSize, excluded);
-
}
- boolean swap(long lifeTime, int targetSize, MapKey excluded) {
+ boolean swap(long lifeTime, int targetSize, MapKey excluded) throws AcornAccessVerificationException, IllegalAcornStateException {
if(VERIFY) verifyAccess();
if(valueToSwap != null) {
if(valueToSwap.tryAcquireMutex()) {
-
try {
-
if(valueToSwap.canBePersisted()) {
valueToSwap.persist();
return true;
}
-
} catch (Throwable t) {
- throw new IllegalStateException(t);
+ throw new IllegalAcornStateException(t);
} finally {
valueToSwap.releaseMutex();
}
}
-
}
-
return false;
-
}
-
- private MapValue getValueToSwap1(long lifeTime, int targetSize, MapKey excluded) {
+ private MapValue getValueToSwap1(long lifeTime, int targetSize, MapKey excluded) throws AcornAccessVerificationException, IllegalAcornStateException {
if(VERIFY) verifyAccess();
}
return map.get(key);
-
}
-
return null;
-
}
- private MapValue getValueToSwap(long lifeTime, int targetSize, MapKey excluded) {
+ private MapValue getValueToSwap(long lifeTime, int targetSize, MapKey excluded) throws AcornAccessVerificationException, IllegalAcornStateException {
if(VERIFY) verifyAccess();
if(value.tryAcquireMutex()) {
try {
-
// This may lock the object
- if(value.canBePersisted()) return value;
+ if(value.canBePersisted())
+ return value;
// Insert back the value
refresh(value, false);
-
- } catch (Throwable t) {
- throw new IllegalStateException(t);
} finally {
-
value.releaseMutex();
-
}
-
}
-
}
-
return null;
-
}
- private long getSwapCandidate(long lifeTime, int targetSize) {
+ private long getSwapCandidate(long lifeTime, int targetSize) throws AcornAccessVerificationException {
if(VERIFY) verifyAccess();
* Tries to persist this object. Can fail if the object cannot be persisted at this time.
*
*/
- boolean persist(Object object_) {
+ boolean persist(Object object_) throws AcornAccessVerificationException {
MapValue object = (MapValue)object_;
if(VERIFY) object.verifyAccess();
if(object.isDirty()) {
-
// It is possible that this just became unpersistable. Fail here in this case.
if(!object.canBePersisted()) {
return false;
object.release();
object.setResident(false);
return false;
-
}
-
return false;
-
}
- int makeResident(Object object_, boolean keepResident) {
+ int makeResident(Object object_, boolean keepResident) throws AcornAccessVerificationException, IllegalAcornStateException {
MapValue object = (MapValue)object_;
if(VERIFY) object.verifyAccess();
try {
-
object.setForceResident(keepResident);
if(object.isResident()) {
try {
refresh(object, false);
swap(swapTime, swapSize, object.getKey());
- } catch (Throwable t) {
- throw new IllegalStateException(t);
} finally {
releaseMutex();
}
-
return data.length;
-
} catch (IOException e) {
-
- e.printStackTrace();
-
+ throw new IllegalAcornStateException("Unable to makeResident " + identifier, e);
}
-
- return 0;
-
}
static int readCounter = 0;
ScheduledThreadPoolExecutor writers;
- void waitPending(MapValue value, boolean hasMutex) {
+ void waitPending(MapValue value, boolean hasMutex) throws IOException, AcornAccessVerificationException, IllegalAcornStateException {
- WriteRunnable r = null;
+ WriteRunnable runnable = null;
boolean inProgress = false;
synchronized(pending) {
- r = pending.get(value.getKey().toString());
- if(r != null) {
- synchronized(r) {
- if(r.committed) {
+ runnable = pending.get(value.getKey().toString());
+ if(runnable != null) {
+ synchronized(runnable) {
+ if(runnable.committed) {
// just being written - just need to wait
inProgress = true;
} else {
- r.committed = true;
+ runnable.committed = true;
// we do the writing
}
}
}
}
- if(r != null) {
+ if(runnable != null) {
if(inProgress) {
// System.err.println("reader waits for WriteRunnable to finish");
try {
- r.s.acquire();
+ if(hasMutex) {
+ runnable.borrowMutex = true;
+ }
+ runnable.s.acquire();
} catch (InterruptedException e) {
- e.printStackTrace();
+ throw new IllegalAcornStateException(e);
}
} else {
// System.err.println("reader took WriteRunnable");
- try {
- r.runReally(hasMutex);
- } catch (Throwable e) {
- e.printStackTrace();
- Logger.defaultLogError(e);
- }
+ runnable.runReally(hasMutex);
}
}
-
}
public class WriteRunnable implements Runnable {
- Path bytes;
- MapValue impl;
- boolean committed = false;
+ private Path bytes;
+ private MapValue impl;
+ private boolean committed = false;
+ private boolean borrowMutex = false;
private Semaphore s = new Semaphore(0);
WriteRunnable(Path bytes, MapValue impl) {
@Override
public void run() {
- synchronized(impl) {
-
- synchronized(this) {
-
- if(committed) return;
-
- committed = true;
-
- }
- try {
- runReally(false);
- } catch (Throwable e) {
- e.printStackTrace();
- Logger.defaultLogError(e);
- }
- }
+ try {
+ synchronized(impl) {
+
+ synchronized(this) {
+
+ if(committed)
+ return;
+
+ committed = true;
+ }
+ runReally(false);
+ }
+ } catch (Throwable t) {
+ if (t instanceof IllegalAcornStateException) {
+ manager.notSafeToMakeSnapshot((IllegalAcornStateException)t);
+ } else {
+ manager.notSafeToMakeSnapshot(new IllegalAcornStateException(t));
+ }
+ t.printStackTrace();
+ Logger.defaultLogError(t);
+ }
}
- public void runReally(boolean hasMutex) throws IOException {
-
- if(!hasMutex)
- impl.acquireMutex();
-
- try {
-
- // These have been set in method persist
- assert(!impl.isResident());
- assert(!impl.isDirty());
-
- impl.toFile(bytes);
-
- synchronized(pending) {
- pending.remove(impl.getKey().toString());
- s.release(Integer.MAX_VALUE);
- }
- } finally {
- if(!hasMutex)
- impl.releaseMutex();
- }
-
- }
-
+ public void runWithMutex() throws IOException, IllegalAcornStateException, AcornAccessVerificationException {
+
+ try {
+ // These have been set in method persist
+ assert (!impl.isResident());
+ assert (!impl.isDirty());
+
+ impl.toFile(bytes);
+ } finally {
+ synchronized (pending) {
+ pending.remove(impl.getKey().toString());
+ s.release(Integer.MAX_VALUE);
+ }
+ }
+
+ }
+
+ // Fix WriteRunnable.runReally() to use LRU.MapValue mutex instead of
+ // borrowMutex
+ public void runReally(boolean hasMutex) throws IOException, IllegalAcornStateException, AcornAccessVerificationException {
+
+ if (hasMutex) {
+
+ runWithMutex();
+
+ } else {
+
+ boolean gotMutex = impl.tryAcquireMutex();
+
+ boolean done = false;
+ while (!done) {
+
+ if (gotMutex || borrowMutex) {
+ runWithMutex();
+ done = true;
+ } else {
+ System.err.println("Retry mutex acquire");
+ gotMutex = impl.tryAcquireMutex();
+ }
+
+ }
+
+ if (gotMutex)
+ impl.releaseMutex();
+
+ }
+
+ }
}
public Path getDirectory() {
*
*/
- protected void verifyAccess() {
-// assert (mutex.availablePermits() == 0);
+ protected void verifyAccess() throws AcornAccessVerificationException {
if (mutex.availablePermits() != 0)
- throw new IllegalStateException("identifier=" + identifier + " mutex has " + mutex.availablePermits() + " available permits, should be 0! Current mutexOwner is " + mutexOwner);
+ throw new AcornAccessVerificationException("identifier=" + identifier + " mutex has " + mutex.availablePermits() + " available permits, should be 0! Current mutexOwner is " + mutexOwner);
}
/*