--- /dev/null
+package org.simantics.acorn.lru;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+
+import org.simantics.acorn.GraphClientImpl2;
+import org.simantics.db.common.utils.Logger;
+
+/*
+ * The order rule of synchronization for LRU and LRUObject is:
+ * ยง Always lock LRUObject first!
+ *
+ */
+
+public class LRU<MapKey,MapValue extends LRUObject<MapKey, MapValue>> {
+
+ public static boolean VERIFY = true;
+
+ final private long swapTime = 5L*1000000000L;
+ final private int swapSize = 200;
+
+ final private HashMap<MapKey, MapValue> map = new HashMap<MapKey, MapValue>();
+ final private TreeMap<Long, MapKey> priorityQueue = new TreeMap<Long, MapKey>();
+
+ final private Semaphore mutex = new Semaphore(1);
+ final private String identifier;
+
+ private Path writeDir;
+
+ private Thread mutexOwner;
+
+ public Map<String, WriteRunnable> pending = new HashMap<String, WriteRunnable>();
+
+ public LRU(String identifier, Path writeDir) {
+ this.identifier = identifier;
+ this.writeDir = writeDir;
+ resume();
+ }
+
+ /*
+ * Public interface
+ */
+
+ public void acquireMutex() {
+
+ try {
+
+ while(!mutex.tryAcquire(3, TimeUnit.SECONDS)) {
+ System.err.println("Mutex is taking a long time to acquire - owner is " + mutexOwner);
+ }
+
+ if(VERIFY)
+ mutexOwner = Thread.currentThread();
+
+ } catch (InterruptedException e) {
+ throw new IllegalStateException(e);
+ }
+ }
+
+ public void releaseMutex() {
+ mutex.release();
+ mutexOwner = null;
+ }
+
+ public void shutdown() {
+ if (GraphClientImpl2.DEBUG)
+ System.err.println("Shutting down LRU writers " + writers);
+ writers.shutdown();
+ try {
+ writers.awaitTermination(60, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+
+ }
+
+ public void resume() {
+ writers = new ScheduledThreadPoolExecutor(2, new ThreadFactory() {
+
+ @Override
+ public Thread newThread(Runnable r) {
+ return new Thread(r, identifier + " File Writer");
+ }
+
+ });
+ if (GraphClientImpl2.DEBUG)
+ System.err.println("Resuming LRU writers " + writers);
+ }
+
+ /*
+ * This method violates the synchronization order rule between LRU and MapVAlue
+ * External synchronization is used to ensure correct operation
+ */
+ public void persist(ArrayList<String> state) {
+
+ acquireMutex();
+
+ try {
+
+ for (MapValue value : values()) {
+ value.acquireMutex();
+ // for debugging purposes
+ boolean persisted = false;
+ try {
+ // Persist the value if needed
+ persisted = value.persist();
+ } finally {
+ // WriteRunnable may want to
+ value.releaseMutex();
+ }
+ // Wait pending if value was actually persisted
+ waitPending(value, false);
+ // Take lock again
+ value.acquireMutex();
+ try {
+ // Record the value
+ state.add(value.getStateKey());
+ } catch (Throwable t) {
+ throw new IllegalStateException(t);
+ } finally {
+ value.releaseMutex();
+ }
+ }
+
+ } catch (Throwable t) {
+ throw new IllegalStateException(t);
+ } finally {
+ releaseMutex();
+ }
+
+ }
+
+ public MapValue getWithoutMutex(MapKey key) {
+
+ acquireMutex();
+ try {
+ return get(key);
+ } catch (Throwable t) {
+ throw new IllegalStateException(t);
+ } finally {
+ releaseMutex();
+ }
+
+ }
+
+ public MapValue get(MapKey key) {
+
+ if(VERIFY) verifyAccess();
+
+ return map.get(key);
+
+ }
+
+ public void map(MapValue info) {
+
+ if(VERIFY) verifyAccess();
+
+ map.put(info.getKey(), info);
+
+ }
+
+ public Collection<MapValue> values() {
+
+ if(VERIFY) verifyAccess();
+
+ return map.values();
+
+ }
+
+ public boolean swapForced() {
+
+ acquireMutex();
+
+ try {
+ return swap(0, 0, null);
+ } catch (Throwable t) {
+ throw new IllegalStateException(t);
+ } finally {
+ releaseMutex();
+ }
+
+ }
+
+ public boolean swap(long lifeTime, int targetSize) {
+
+ if(VERIFY) verifyAccess();
+
+ return swap(lifeTime, targetSize, null);
+
+ }
+
+ /*
+ * This is called under global lock
+ */
+ public void setWriteDir(Path dir) {
+
+ this.writeDir = dir;
+
+ }
+
+
+ /*
+ * Package access
+ */
+
+ void insert(MapValue info, long accessTime) {
+
+ if(VERIFY) verifyAccess();
+
+ map.put(info.getKey(), info);
+ priorityQueue.put(accessTime, info.getKey());
+
+ }
+
+ /*
+ * We have access to ClusterLRU - try to refresh value if available
+ */
+ boolean tryRefresh(MapValue info) {
+
+ if(VERIFY) verifyAccess();
+
+ if(!info.tryAcquireMutex())
+ return false;
+
+ try {
+
+ priorityQueue.remove(info.getLastAccessTime());
+ info.accessed();
+ map.put(info.getKey(), info);
+ priorityQueue.put(info.getLastAccessTime(), info.getKey());
+
+ return true;
+
+ } catch (Throwable t) {
+ throw new IllegalStateException(t);
+ } finally {
+
+ info.releaseMutex();
+
+ }
+
+ }
+
+ /*
+ * We have access to MapValue and no access to clusterLRU
+ */
+ void refresh(MapValue info, boolean needMutex) {
+
+ if(VERIFY) {
+ if(!needMutex) verifyAccess();
+ info.verifyAccess();
+ }
+
+ if(needMutex)
+ acquireMutex();
+
+ try {
+
+ priorityQueue.remove(info.getLastAccessTime());
+ info.accessed();
+ map.put(info.getKey(), info);
+ priorityQueue.put(info.getLastAccessTime(), info.getKey());
+
+ } catch (Throwable t) {
+ throw new IllegalStateException(t);
+ } finally {
+
+ if(needMutex)
+ releaseMutex();
+
+ }
+
+ }
+
+ /*
+ * Private implementation
+ */
+
+ public int size() {
+
+ if(VERIFY) verifyAccess();
+
+ return priorityQueue.size();
+
+ }
+
+ boolean swap(MapKey excluded) {
+
+ if(VERIFY) verifyAccess();
+
+ return swap(swapTime, swapSize, excluded);
+
+ }
+
+ boolean swap(long lifeTime, int targetSize, MapKey excluded) {
+
+ if(VERIFY) verifyAccess();
+
+ MapValue valueToSwap = getValueToSwap(lifeTime, targetSize, excluded);
+ if(valueToSwap != null) {
+
+ if(valueToSwap.tryAcquireMutex()) {
+
+ try {
+
+ if(valueToSwap.canBePersisted()) {
+ valueToSwap.persist();
+ return true;
+ }
+
+ } catch (Throwable t) {
+ throw new IllegalStateException(t);
+ } finally {
+ valueToSwap.releaseMutex();
+ }
+ }
+
+ }
+
+ return false;
+
+ }
+
+
+ private MapValue getValueToSwap1(long lifeTime, int targetSize, MapKey excluded) {
+
+ if(VERIFY) verifyAccess();
+
+ for(int i=0;i<10;i++) {
+
+ long candidate = getSwapCandidate(lifeTime, targetSize);
+ if(candidate == 0) return null;
+
+ MapKey key = priorityQueue.remove(candidate);
+ if(key.equals(excluded)) {
+ tryRefresh(map.get(key));
+ continue;
+ }
+
+ return map.get(key);
+
+ }
+
+ return null;
+
+ }
+
+
+ private MapValue getValueToSwap(long lifeTime, int targetSize, MapKey excluded) {
+
+ if(VERIFY) verifyAccess();
+
+ for(int i=0;i<10;i++) {
+
+ // Lock LRU and get a candidate
+ MapValue value = getValueToSwap1(lifeTime, targetSize, excluded);
+ if(value == null) return null;
+
+ if(value.tryAcquireMutex()) {
+
+ try {
+
+ // This may lock the object
+ if(value.canBePersisted()) return value;
+ // Insert back the value
+ refresh(value, false);
+
+ } catch (Throwable t) {
+ throw new IllegalStateException(t);
+ } finally {
+
+ value.releaseMutex();
+
+ }
+
+ }
+
+ }
+
+ return null;
+
+ }
+
+ private long getSwapCandidate(long lifeTime, int targetSize) {
+
+ if(VERIFY) verifyAccess();
+
+ if(priorityQueue.isEmpty()) return 0;
+
+ long currentTime = System.nanoTime();
+ Long lowest = priorityQueue.firstKey();
+
+ if(currentTime - lowest > lifeTime || priorityQueue.size() > targetSize) {
+ return lowest;
+ }
+
+ return 0;
+
+ }
+
+ /*
+ * Tries to persist this object. Can fail if the object cannot be persisted at this time.
+ *
+ */
+ boolean persist(Object object_) {
+
+ MapValue object = (MapValue)object_;
+
+ if(VERIFY) object.verifyAccess();
+
+ if(object.isDirty()) {
+
+ // It is possible that this just became unpersistable. Fail here in this case.
+ if(!object.canBePersisted()) {
+ return false;
+ }
+
+ assert(object.isResident());
+
+ Path f = writeDir.resolve(object.getFileName());
+
+ WriteRunnable runnable = new WriteRunnable(f, object);
+
+ synchronized(pending) {
+ WriteRunnable existing = pending.put(object.getKey().toString(), runnable);
+ assert(existing == null);
+ }
+
+ writers.execute(runnable);
+
+ object.setResident(false);
+ object.setDirty(false);
+
+ return true;
+
+ } else if(object.isResident()) {
+
+ object.release();
+ object.setResident(false);
+ return false;
+
+ }
+
+ return false;
+
+ }
+
+ int makeResident(Object object_, boolean keepResident) {
+
+ MapValue object = (MapValue)object_;
+
+ if(VERIFY) object.verifyAccess();
+
+ try {
+
+ object.setForceResident(keepResident);
+
+ if(object.isResident()) {
+ refresh(object, true);
+ return 0;
+ }
+
+ waitPending(object, true);
+
+ byte[] data = object.readFile();
+
+ object.fromFile(data);
+ object.setResident(true);
+
+ acquireMutex();
+ try {
+ refresh(object, false);
+ swap(swapTime, swapSize, object.getKey());
+ } catch (Throwable t) {
+ throw new IllegalStateException(t);
+ } finally {
+ releaseMutex();
+ }
+
+ return data.length;
+
+ } catch (IOException e) {
+
+ e.printStackTrace();
+
+ }
+
+ return 0;
+
+ }
+
+ static int readCounter = 0;
+ static int writeCounter = 0;
+
+ ScheduledThreadPoolExecutor writers;
+
+ void waitPending(MapValue value, boolean hasMutex) {
+
+ WriteRunnable r = null;
+ boolean inProgress = false;
+ synchronized(pending) {
+ r = pending.get(value.getKey().toString());
+ if(r != null) {
+ synchronized(r) {
+ if(r.committed) {
+ // just being written - just need to wait
+ inProgress = true;
+ } else {
+ r.committed = true;
+ // we do the writing
+ }
+ }
+ }
+ }
+ if(r != null) {
+ if(inProgress) {
+// System.err.println("reader waits for WriteRunnable to finish");
+ try {
+ r.s.acquire();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ } else {
+// System.err.println("reader took WriteRunnable");
+ try {
+ r.runReally(hasMutex);
+ } catch (Throwable e) {
+ e.printStackTrace();
+ Logger.defaultLogError(e);
+ }
+ }
+ }
+
+ }
+
+ public class WriteRunnable implements Runnable {
+
+ Path bytes;
+ MapValue impl;
+ boolean committed = false;
+ private Semaphore s = new Semaphore(0);
+
+ WriteRunnable(Path bytes, MapValue impl) {
+ this.bytes = bytes;
+ this.impl = impl;
+ }
+
+ @Override
+ public void run() {
+ synchronized(impl) {
+
+ synchronized(this) {
+
+ if(committed) return;
+
+ committed = true;
+
+ }
+ try {
+ runReally(false);
+ } catch (Throwable e) {
+ e.printStackTrace();
+ Logger.defaultLogError(e);
+ }
+ }
+ }
+
+ public void runReally(boolean hasMutex) throws IOException {
+
+ if(!hasMutex)
+ impl.acquireMutex();
+
+ try {
+
+ // These have been set in method persist
+ assert(!impl.isResident());
+ assert(!impl.isDirty());
+
+ impl.toFile(bytes);
+
+ synchronized(pending) {
+ pending.remove(impl.getKey().toString());
+ s.release(Integer.MAX_VALUE);
+ }
+ } finally {
+ if(!hasMutex)
+ impl.releaseMutex();
+ }
+
+ }
+
+ }
+
+ public Path getDirectory() {
+ return writeDir;
+ }
+
+ /*
+ * Protected implementation
+ *
+ */
+
+ protected void verifyAccess() {
+// assert (mutex.availablePermits() == 0);
+ if (mutex.availablePermits() != 0)
+ throw new IllegalStateException("identifier=" + identifier + " mutex has " + mutex.availablePermits() + " available permits, should be 0! Current mutexOwner is " + mutexOwner);
+ }
+
+ /*
+ * Private implementation
+ *
+ */
+
+
+}