]> gerrit.simantics Code Review - simantics/platform.git/blobdiff - bundles/org.simantics.acorn/src/org/simantics/acorn/lru/LRU.java
Sharing org.simantics.acorn for everyone to use
[simantics/platform.git] / bundles / org.simantics.acorn / src / org / simantics / acorn / lru / LRU.java
diff --git a/bundles/org.simantics.acorn/src/org/simantics/acorn/lru/LRU.java b/bundles/org.simantics.acorn/src/org/simantics/acorn/lru/LRU.java
new file mode 100644 (file)
index 0000000..508127d
--- /dev/null
@@ -0,0 +1,624 @@
+package org.simantics.acorn.lru;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+
+import org.simantics.acorn.GraphClientImpl2;
+import org.simantics.db.common.utils.Logger;
+
+/*
+ * The order rule of synchronization for LRU and LRUObject is:
+ *   ยง Always lock LRUObject first!
+ * 
+ */
+
+public class LRU<MapKey,MapValue extends LRUObject<MapKey, MapValue>> {
+       
+       public static boolean VERIFY = true;
+
+       final private long swapTime = 5L*1000000000L;
+       final private int swapSize = 200;
+
+       final private HashMap<MapKey, MapValue> map = new HashMap<MapKey, MapValue>();
+       final private TreeMap<Long, MapKey> priorityQueue = new TreeMap<Long, MapKey>();
+       
+       final private Semaphore mutex = new Semaphore(1);
+       final private String identifier;
+       
+       private Path writeDir;
+       
+       private Thread mutexOwner;
+       
+       public Map<String, WriteRunnable> pending = new HashMap<String, WriteRunnable>();
+       
+       public LRU(String identifier, Path writeDir) {
+               this.identifier = identifier;
+               this.writeDir = writeDir;
+               resume();
+       }
+       
+       /*
+        * Public interface
+        */
+       
+       public void acquireMutex() {
+               
+               try {
+                       
+                       while(!mutex.tryAcquire(3, TimeUnit.SECONDS)) {
+                               System.err.println("Mutex is taking a long time to acquire - owner is " + mutexOwner);
+                       }
+                               
+                       if(VERIFY)
+                               mutexOwner = Thread.currentThread();
+                               
+               } catch (InterruptedException e) {
+                       throw new IllegalStateException(e);
+               }
+       }
+       
+       public void releaseMutex() {
+               mutex.release();
+               mutexOwner = null;
+       }
+
+       public void shutdown() {
+           if (GraphClientImpl2.DEBUG)
+               System.err.println("Shutting down LRU writers " + writers);
+               writers.shutdown();
+               try {
+                       writers.awaitTermination(60, TimeUnit.SECONDS);
+               } catch (InterruptedException e) {
+                       e.printStackTrace();
+               }
+
+       }
+       
+       public void resume() {
+               writers = new ScheduledThreadPoolExecutor(2, new ThreadFactory() {
+                       
+                       @Override
+                       public Thread newThread(Runnable r) {
+                               return new Thread(r, identifier + " File Writer");
+                       }
+                       
+               });
+               if (GraphClientImpl2.DEBUG)
+                   System.err.println("Resuming LRU writers " + writers);
+       }
+
+       /*
+        * This method violates the synchronization order rule between LRU and MapVAlue
+        * External synchronization is used to ensure correct operation
+        */
+       public void persist(ArrayList<String> state) {
+       
+               acquireMutex();
+               
+               try {
+               
+                       for (MapValue value : values()) {
+                               value.acquireMutex();
+                               // for debugging purposes
+                               boolean persisted = false;
+                               try {
+                                       // Persist the value if needed
+                                       persisted = value.persist();
+                               } finally {
+                                       // WriteRunnable may want to 
+                                       value.releaseMutex();
+                               }
+                               // Wait pending if value was actually persisted 
+                               waitPending(value, false);
+                               // Take lock again
+                               value.acquireMutex();
+                               try {
+                                       // Record the value
+                                       state.add(value.getStateKey());
+                               } catch (Throwable t) {
+                                       throw new IllegalStateException(t);
+                               } finally {
+                                       value.releaseMutex();
+                               }
+                       }
+                       
+               } catch (Throwable t) {
+                       throw new IllegalStateException(t);
+               } finally {
+                       releaseMutex();
+               }
+
+       }
+
+       public MapValue getWithoutMutex(MapKey key) {
+               
+               acquireMutex();
+               try {
+                       return get(key);
+               } catch (Throwable t) {
+                       throw new IllegalStateException(t);
+               } finally {
+                       releaseMutex();
+               }
+               
+       }
+
+       public MapValue get(MapKey key) {
+               
+               if(VERIFY) verifyAccess();
+               
+               return map.get(key);
+               
+       }
+
+       public void map(MapValue info) {
+               
+               if(VERIFY) verifyAccess();
+               
+               map.put(info.getKey(), info);
+               
+       }
+
+       public Collection<MapValue> values() {
+               
+               if(VERIFY) verifyAccess();
+               
+               return map.values();
+               
+       }
+
+       public boolean swapForced() {
+               
+               acquireMutex();
+               
+               try {
+                       return swap(0, 0, null);
+               } catch (Throwable t) {
+                       throw new IllegalStateException(t);
+               } finally {
+                       releaseMutex();
+               }
+               
+       }
+
+       public boolean swap(long lifeTime, int targetSize) {
+               
+               if(VERIFY) verifyAccess();
+
+               return swap(lifeTime, targetSize, null);
+               
+       }
+
+       /*
+        * This is called under global lock
+        */
+       public void setWriteDir(Path dir) {
+               
+               this.writeDir = dir;
+               
+       }
+
+
+       /*
+        * Package access
+        */
+
+       void insert(MapValue info, long accessTime) {
+               
+               if(VERIFY) verifyAccess();
+
+               map.put(info.getKey(), info);
+               priorityQueue.put(accessTime, info.getKey());
+               
+       }
+
+       /*
+        * We have access to ClusterLRU - try to refresh value if available
+        */
+       boolean tryRefresh(MapValue info) {
+
+               if(VERIFY) verifyAccess();
+               
+               if(!info.tryAcquireMutex())
+                       return false;
+
+               try {
+
+                       priorityQueue.remove(info.getLastAccessTime());
+                       info.accessed();
+                       map.put(info.getKey(), info);
+                       priorityQueue.put(info.getLastAccessTime(), info.getKey());
+                       
+                       return true;
+
+               } catch (Throwable t) {
+                       throw new IllegalStateException(t);
+               } finally {
+
+                       info.releaseMutex();
+                       
+               }
+               
+       }
+
+       /*
+        * We have access to MapValue and no access to clusterLRU
+        */
+       void refresh(MapValue info, boolean needMutex) {
+               
+               if(VERIFY) {
+                       if(!needMutex) verifyAccess();
+                       info.verifyAccess();
+               }
+               
+               if(needMutex)
+                       acquireMutex();
+
+               try {
+
+                       priorityQueue.remove(info.getLastAccessTime());
+                       info.accessed();
+                       map.put(info.getKey(), info);
+                       priorityQueue.put(info.getLastAccessTime(), info.getKey());
+
+               } catch (Throwable t) {
+                       throw new IllegalStateException(t);
+               } finally {
+
+                       if(needMutex)
+                               releaseMutex();
+                       
+               }
+               
+       }
+
+       /*
+        * Private implementation
+        */
+
+       public int size() {
+               
+               if(VERIFY) verifyAccess();
+
+               return priorityQueue.size();
+               
+       }
+
+       boolean swap(MapKey excluded) {
+               
+               if(VERIFY) verifyAccess();
+
+               return swap(swapTime, swapSize, excluded);
+               
+       }
+
+       boolean swap(long lifeTime, int targetSize, MapKey excluded) {
+
+               if(VERIFY) verifyAccess();
+
+               MapValue valueToSwap = getValueToSwap(lifeTime, targetSize, excluded);
+               if(valueToSwap != null) {
+                       
+                       if(valueToSwap.tryAcquireMutex()) {
+                               
+                               try {
+                                       
+                                       if(valueToSwap.canBePersisted()) {
+                                               valueToSwap.persist();
+                                               return true;
+                                       }
+                                       
+                               } catch (Throwable t) {
+                                       throw new IllegalStateException(t);
+                               } finally {
+                                       valueToSwap.releaseMutex();
+                               }
+                       }
+                       
+               }
+               
+               return false;
+
+       }
+
+       
+       private MapValue getValueToSwap1(long lifeTime, int targetSize, MapKey excluded) {
+
+               if(VERIFY) verifyAccess();
+
+               for(int i=0;i<10;i++) {
+
+                       long candidate = getSwapCandidate(lifeTime, targetSize);
+                       if(candidate == 0) return null;
+                       
+                       MapKey key = priorityQueue.remove(candidate);
+                       if(key.equals(excluded)) {
+                               tryRefresh(map.get(key));
+                               continue;
+                       }
+                       
+                       return map.get(key);
+                       
+               }
+               
+               return null;
+
+       }
+       
+       
+       private MapValue getValueToSwap(long lifeTime, int targetSize, MapKey excluded) {
+
+               if(VERIFY) verifyAccess();
+
+               for(int i=0;i<10;i++) {
+                       
+                       // Lock LRU and get a candidate
+                       MapValue value = getValueToSwap1(lifeTime, targetSize, excluded);
+                       if(value == null) return null;
+                       
+                       if(value.tryAcquireMutex()) {
+
+                               try {
+                                       
+                                       // This may lock the object
+                                       if(value.canBePersisted()) return value;
+                                       // Insert back the value
+                                       refresh(value, false);
+       
+                               } catch (Throwable t) {
+                                       throw new IllegalStateException(t);
+                               } finally {
+                                       
+                                       value.releaseMutex();
+                                       
+                               }
+                               
+                       }
+                       
+               }
+               
+               return null;
+               
+       }
+       
+       private long getSwapCandidate(long lifeTime, int targetSize) {
+               
+               if(VERIFY) verifyAccess();
+
+               if(priorityQueue.isEmpty()) return 0;
+
+               long currentTime = System.nanoTime();
+               Long lowest = priorityQueue.firstKey();
+
+               if(currentTime - lowest > lifeTime || priorityQueue.size() > targetSize) {
+                       return lowest;
+               }
+
+               return 0;
+
+       }
+       
+       /*
+        * Tries to persist this object. Can fail if the object cannot be persisted at this time.
+        * 
+        */
+       boolean persist(Object object_) {
+               
+               MapValue object = (MapValue)object_;
+               
+               if(VERIFY) object.verifyAccess();
+               
+               if(object.isDirty()) {
+
+                       // It is possible that this just became unpersistable. Fail here in this case.
+                       if(!object.canBePersisted()) {
+                               return false;
+                       }
+
+                       assert(object.isResident());
+
+                       Path f = writeDir.resolve(object.getFileName());
+
+                       WriteRunnable runnable = new WriteRunnable(f, object);
+                       
+                       synchronized(pending) {
+                               WriteRunnable existing = pending.put(object.getKey().toString(), runnable);
+                               assert(existing == null);
+                       }
+
+                       writers.execute(runnable);
+
+                       object.setResident(false);
+                       object.setDirty(false);
+
+                       return true;
+
+               } else if(object.isResident()) {
+
+                       object.release();
+                       object.setResident(false);
+                       return false;
+
+               }
+
+               return false;
+               
+       }
+       
+       int makeResident(Object object_, boolean keepResident) {
+
+               MapValue object = (MapValue)object_;
+
+               if(VERIFY) object.verifyAccess();
+
+               try {
+
+                       object.setForceResident(keepResident);
+
+                       if(object.isResident()) {
+                               refresh(object, true);
+                               return 0;
+                       }
+
+                       waitPending(object, true);
+
+                       byte[] data = object.readFile();
+
+                       object.fromFile(data);
+                       object.setResident(true);
+
+                       acquireMutex();
+                       try {
+                               refresh(object, false);
+                               swap(swapTime, swapSize, object.getKey());
+                       } catch (Throwable t) {
+                               throw new IllegalStateException(t);
+                       } finally {
+                               releaseMutex();
+                       }
+
+                       return data.length;
+
+               } catch (IOException e) {
+                       
+                       e.printStackTrace();
+                       
+               }
+               
+               return 0;
+               
+       }
+
+       static int readCounter = 0;
+       static int writeCounter = 0;
+       
+       ScheduledThreadPoolExecutor writers;
+       
+       void waitPending(MapValue value, boolean hasMutex) {
+               
+               WriteRunnable r = null;
+               boolean inProgress = false;
+               synchronized(pending) {
+                       r = pending.get(value.getKey().toString());
+                       if(r != null) {
+                               synchronized(r) {
+                                       if(r.committed) {
+                                               // just being written - just need to wait
+                                               inProgress = true;
+                                       } else {
+                                               r.committed = true;
+                                               // we do the writing
+                                       }
+                               }
+                       }
+               }
+               if(r != null) {
+                       if(inProgress) {
+//                             System.err.println("reader waits for WriteRunnable to finish");
+                               try {
+                                       r.s.acquire();
+                               } catch (InterruptedException e) {
+                                       e.printStackTrace();
+                               }
+                       } else {
+//                             System.err.println("reader took WriteRunnable");
+                               try {
+                    r.runReally(hasMutex);
+                } catch (Throwable e) {
+                    e.printStackTrace();
+                    Logger.defaultLogError(e);
+                }
+                       }
+               }
+               
+       }
+       
+       public class WriteRunnable implements Runnable {
+
+               Path bytes;
+               MapValue impl;
+               boolean committed = false;
+               private Semaphore s = new Semaphore(0);
+               
+               WriteRunnable(Path bytes, MapValue impl) {
+                       this.bytes = bytes;
+                       this.impl = impl;
+               }
+               
+               @Override
+               public void run() {
+                       synchronized(impl) {
+
+                               synchronized(this) {
+                               
+                                       if(committed) return;
+                                       
+                                       committed = true;
+                               
+                               }
+                   try {
+                       runReally(false);
+                   } catch (Throwable e) {
+                       e.printStackTrace();
+                       Logger.defaultLogError(e);
+                   }
+                       }
+               }
+
+               public void runReally(boolean hasMutex) throws IOException {
+               
+                       if(!hasMutex)
+                               impl.acquireMutex();
+                       
+                       try {
+                               
+                               // These have been set in method persist
+                               assert(!impl.isResident());
+                               assert(!impl.isDirty());
+                               
+                               impl.toFile(bytes);
+
+                               synchronized(pending) {
+                                       pending.remove(impl.getKey().toString());
+                                       s.release(Integer.MAX_VALUE);
+                               }
+                       } finally {
+                               if(!hasMutex)
+                                       impl.releaseMutex();
+                       }
+                       
+               }
+       
+       }
+       
+       public Path getDirectory() {
+               return writeDir;
+       }
+       
+       /*
+        * Protected implementation 
+        * 
+        */
+       
+       protected void verifyAccess() {
+//         assert (mutex.availablePermits() == 0);
+               if (mutex.availablePermits() != 0)
+                   throw new IllegalStateException("identifier=" + identifier + " mutex has " + mutex.availablePermits() + " available permits, should be 0! Current mutexOwner is " + mutexOwner);
+       }
+       
+       /*
+        * Private implementation 
+        * 
+        */
+       
+       
+}