]> gerrit.simantics Code Review - simantics/platform.git/blobdiff - bundles/org.simantics.acorn/src/org/simantics/acorn/MainProgram.java
Uncommon Acorn deadlock fix
[simantics/platform.git] / bundles / org.simantics.acorn / src / org / simantics / acorn / MainProgram.java
index f39a4987d2ecc27e77843d256e2106d7f2d3e2a3..c69c7bea6fe7957bf3c408256a6d9f5cbfed16e4 100644 (file)
@@ -7,19 +7,26 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 
+import org.simantics.acorn.exception.AcornAccessVerificationException;
+import org.simantics.acorn.exception.IllegalAcornStateException;
 import org.simantics.acorn.lru.ClusterStreamChunk;
 import org.simantics.acorn.lru.ClusterUpdateOperation;
 import org.simantics.db.service.ClusterUID;
 import org.simantics.utils.logging.TimeLogger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class MainProgram implements Runnable, Closeable {
 
+       private static final Logger LOGGER = LoggerFactory.getLogger(MainProgram.class);
+       
        private static final int CLUSTER_THREADS = 4;
        private static final int CHUNK_CACHE_SIZE = 100;
 
@@ -167,9 +174,7 @@ public class MainProgram implements Runnable, Closeable {
                                                } catch (InterruptedException e) {
                                                        e.printStackTrace();
                                                }
-
                                        }
-
                                }
 
 //                             long sss = System.nanoTime();
@@ -191,19 +196,27 @@ public class MainProgram implements Runnable, Closeable {
                                        final List<ClusterUpdateOperation> ops = updateSchedules[i];
                                        if (!ops.isEmpty()) {
                                                acquireAmount++;
-                                               clusterUpdateThreads[i].execute(() -> {
-
-                                                       //long st = System.nanoTime();
-                                                       for(ClusterUpdateOperation op : ops) {
-                                                               op.run();
-                                                       }
-                                                       s.release();
-                                                       //                                                      long duration = System.nanoTime()-st;
-                                                       //                                                      elapsed.addAndGet(duration);
-                                                       //                                                      double dur = 1e-9*duration;
-                                                       //                                                      if(dur > 0.05)
-                                                       //                                                              System.err.println("duration=" + dur + "s. " + ops.size());
-                                               });
+                                               clusterUpdateThreads[i].submit(new Callable<Object>() {
+
+                            @Override
+                            public Object call() throws Exception {
+                                //long st = System.nanoTime();
+                                try {
+                                    for(ClusterUpdateOperation op : ops) {
+                                        op.run();
+                                    }
+                                } finally {
+                                    s.release();
+                                }
+                                return null;
+                                
+    //                          long duration = System.nanoTime()-st;
+    //                          elapsed.addAndGet(duration);
+    //                          double dur = 1e-9*duration;
+    //                          if(dur > 0.05)
+    //                              System.err.println("duration=" + dur + "s. " + ops.size());
+                            }
+                        });
                                        }
                                }
 
@@ -217,8 +230,8 @@ public class MainProgram implements Runnable, Closeable {
                                clusters.streamLRU.acquireMutex();
                                try {
                                        swapChunks();
-                               } catch (Throwable t) {
-                                       throw new IllegalStateException(t);
+                               } catch (AcornAccessVerificationException | IllegalAcornStateException e) {
+                                   e.printStackTrace();
                                } finally {
                                        clusters.streamLRU.releaseMutex();
                                }
@@ -226,7 +239,7 @@ public class MainProgram implements Runnable, Closeable {
                                try {
                                        swapCS();
                                } catch (Throwable t) {
-                                       throw new IllegalStateException(t);
+                                       throw new IllegalAcornStateException(t);
                                } finally {
                                        clusters.csLRU.releaseMutex();
                                }
@@ -240,47 +253,82 @@ public class MainProgram implements Runnable, Closeable {
                } finally {
                        deathBarrier.release();
                }
-
+       }
+       
+       static interface MainProgramRunnable {
+               
+               public void run() throws Exception;
+               public void done();
+               
        }
 
+       public Exception runIdle(MainProgramRunnable runnable) {
+               try {
+                       long startTime = System.currentTimeMillis();
+                       while (true) {
+                               boolean hasMutex = false;
+                               try {
+                                       synchronized (MainProgram.this) {
+                                               if (hasMutex = mutex.tryAcquire()) {
+                                                       if (operations.isEmpty()) {
+                                                               runnable.run();
+                                                               return null;
+                                                       }
+                                               }
+                                       }
+                                       long endTime = System.currentTimeMillis(); 
+                                       if ((endTime - startTime) > 100) {
+                                               startTime = endTime; 
+                                               LOGGER.info("MainProgram.runIdle() retry mutex acquire!");
+                                       }
+                               } catch (Exception e) {
+                                       return e;
+                               } finally {
+                                       if (hasMutex)
+                                               mutex.release();
+                               }
+                       }
+               } finally {
+                       runnable.done();
+               }
+       }
+       
        /*
         * Mutex for streamLRU is assumed here
         * 
         */
-       private void swapChunks() {
+       private void swapChunks() throws AcornAccessVerificationException, IllegalAcornStateException {
 
                // Cache chunks during update operations
                boolean written = clusters.streamLRU.swap(Integer.MAX_VALUE, CHUNK_CACHE_SIZE);
                while(written) {
                        written = clusters.streamLRU.swap(Integer.MAX_VALUE, CHUNK_CACHE_SIZE);
                }
-
        }
 
-       private void swapCS() {
+       private void swapCS() throws AcornAccessVerificationException, IllegalAcornStateException {
 
                // Cache chunks during update operations
                boolean written = clusters.csLRU.swap(Integer.MAX_VALUE, CHUNK_CACHE_SIZE);
                while(written) {
                        written = clusters.csLRU.swap(Integer.MAX_VALUE, CHUNK_CACHE_SIZE);
                }
-
        }
 
        public synchronized void committed() {
 
                ClusterStreamChunk last = operations.isEmpty() ? null : operations.getLast();
         if (!alive) {
-            System.err.println("Trying to commit operation after MainProgram is closed! Operation is " + last);
+            LOGGER.error("Trying to commit operation after MainProgram is closed! Operation is " + last);
 //          return;
         }
                if(last != null) last.commit();
 
        }
 
-       public synchronized void schedule(ClusterUpdateOperation operation) {
+       public synchronized void schedule(ClusterUpdateOperation operation) throws IllegalAcornStateException {
            if (!alive) {
-               System.err.println("Trying to schedule operation after MainProgram is closed! Operation is " + operation);
+               LOGGER.error("Trying to schedule operation after MainProgram is closed! Operation is " + operation);
 //             return;
            }
                clusters.streamLRU.acquireMutex();
@@ -303,15 +351,13 @@ public class MainProgram implements Runnable, Closeable {
                        swapChunks();
 
                        notifyAll();
-
+               } catch (IllegalAcornStateException e) {
+                   throw e;
                } catch (Throwable t) {
-                       throw new IllegalStateException(t);
+                       throw new IllegalAcornStateException(t);
                } finally {
-
                        clusters.streamLRU.releaseMutex();
-
                }
-
        }
 
     @Override