]> gerrit.simantics Code Review - simantics/platform.git/blobdiff - bundles/org.simantics.acorn/src/org/simantics/acorn/MainProgram.java
MainProgram polls nanoTime too often
[simantics/platform.git] / bundles / org.simantics.acorn / src / org / simantics / acorn / MainProgram.java
index 78ff9e8995c4162fc347edc9b51bd6f7c69ec4ff..1e4b5cbac508623fff9fd9d5017043818ce1ee42 100644 (file)
@@ -1,9 +1,9 @@
 package org.simantics.acorn;
 
 import java.io.Closeable;
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Comparator;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
@@ -20,25 +20,28 @@ import org.simantics.acorn.lru.ClusterStreamChunk;
 import org.simantics.acorn.lru.ClusterUpdateOperation;
 import org.simantics.db.service.ClusterUID;
 import org.simantics.utils.logging.TimeLogger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class MainProgram implements Runnable, Closeable {
 
+       private static final Logger LOGGER = LoggerFactory.getLogger(MainProgram.class);
+
        private static final int CLUSTER_THREADS = 4;
        private static final int CHUNK_CACHE_SIZE = 100;
 
        private final GraphClientImpl2 client;
-       private final ClusterManager clusters;
        private final ExecutorService[] clusterUpdateThreads;
     private final List<ClusterUpdateOperation>[] updateSchedules;
-       
-       private int residentOperationBytes = 0;
-       private long currentChangeSetId = -1;
-       private int nextChunkId = 0;
+
+       private Thread mainProgramThread;
+
        private boolean alive = true;
        private Semaphore deathBarrier = new Semaphore(0);
 
-       final Semaphore mutex = new Semaphore(1);
-       final LinkedList<ClusterStreamChunk> operations = new LinkedList<>();
+       final ClusterManager clusters;
+
+       private final OperationQueue operationQueue = new OperationQueue(this);
 
        static class ClusterThreadFactory implements ThreadFactory {
 
@@ -58,7 +61,8 @@ public class MainProgram implements Runnable, Closeable {
                }
        }
 
-       public MainProgram(GraphClientImpl2 client, ClusterManager clusters) {
+       @SuppressWarnings("unchecked")
+       MainProgram(GraphClientImpl2 client, ClusterManager clusters) {
 
                this.client = client;
                this.clusters = clusters;
@@ -70,9 +74,8 @@ public class MainProgram implements Runnable, Closeable {
                }
        }
 
-       public void startTransaction(long id) {
-               currentChangeSetId = id;
-               nextChunkId = 0;
+       void startTransaction(long id) {
+               operationQueue.startTransaction(id);
        }
 
        private static Comparator<ClusterUID> clusterComparator = new Comparator<ClusterUID>() {
@@ -85,138 +88,35 @@ public class MainProgram implements Runnable, Closeable {
 
        @Override
        public void run() {
+
+               mainProgramThread = Thread.currentThread();
+
                try {
 
-                       mutex.acquire();
                        main:
                        while(alive) {
 
                                TreeMap<ClusterUID, List<ClusterUpdateOperation>> updates = new TreeMap<ClusterUID, List<ClusterUpdateOperation>>(clusterComparator);
 
-                               synchronized(MainProgram.this) {
+                               operationQueue.pumpUpdates(updates);
 
-                                       while(!operations.isEmpty() && updates.size() < 100) {
+                               if(updates.isEmpty()) {
 
-                                               ClusterStreamChunk chunk = operations.pollFirst();
+                                       long duration = operationQueue.waitFor();
 
-                                               for(int i=chunk.nextToProcess;i<chunk.operations.size();i++) {
-                                                       ClusterUpdateOperation o = chunk.operations.get(i);
-                                                       ClusterUID uid = o.uid;
-                                                       List<ClusterUpdateOperation> ops = updates.get(uid);
-                                                       if(ops == null) {
-                                                               ops = new ArrayList<ClusterUpdateOperation>();
-                                                               updates.put(uid, ops);
-                                                       }
-                                                       ops.add(o);
-                                               }
-
-                                               chunk.nextToProcess = chunk.operations.size();
-
-                                               if(!chunk.isCommitted()) {
-                                                       assert(operations.isEmpty());
-                                                       operations.add(chunk);
-                                                       break;
-                                               }
+                                       if (!alive)
+                                               break main;
 
+                                       if(duration > 4000000000L) {
+                                               checkIdle();
                                        }
-
-                                       if(updates.isEmpty()) {
-                                               try {
-                                                       long start = System.nanoTime();
-                                                       mutex.release();
-                                                       MainProgram.this.wait(5000);
-                                                       mutex.acquire();
-                                                       if (!alive)
-                                                               break main;
-                                                       long duration = System.nanoTime()-start;
-                                                       if(duration > 4000000000L) {
-
-                                                               // Was this a time-out or a new stream request?
-                                                               if(operations.isEmpty()) {
-
-                                                                       /*
-                                                                        * We are idling here.
-                                                                        * Flush all caches gradually
-                                                                        */
-
-                                                                       // Write pending cs to disk
-                                                                       boolean written = clusters.csLRU.swapForced();
-                                                                       while(written) {
-                                                                               if(!updates.isEmpty()) break;
-                                                                               written = clusters.csLRU.swapForced();
-                                                                       }
-                                                                       // Write pending chunks to disk
-                                                                       written = clusters.streamLRU.swapForced();
-                                                                       while(written) {
-                                                                               if(!updates.isEmpty()) break;
-                                                                               written = clusters.streamLRU.swapForced();
-                                                                       }
-                                                                       // Write pending files to disk
-                                                                       written = clusters.fileLRU.swapForced();
-                                                                       while(written) {
-                                                                               if(!updates.isEmpty()) break;
-                                                                               written = clusters.fileLRU.swapForced();
-                                                                       }
-                                                                       // Write pending clusters to disk
-                                                                       written = clusters.clusterLRU.swapForced();
-                                                                       while(written) {
-                                                                               if(!updates.isEmpty()) break;
-                                                                               written = clusters.clusterLRU.swapForced();
-                                                                       }
-
-                                                                       client.tryMakeSnapshot();
-                                                               }
-                                                       }
-                                               } catch (InterruptedException e) {
-                                                       e.printStackTrace();
-                                               }
-                                       }
+                                       
                                }
 
 //                             long sss = System.nanoTime();
 
-                               for(int i=0;i<CLUSTER_THREADS;i++)
-                                       updateSchedules[i].clear();
-
-                               final Semaphore s = new Semaphore(0);
-
-                               for(Map.Entry<ClusterUID, List<ClusterUpdateOperation>> entry : updates.entrySet()) {
-                                       ClusterUID key = entry.getKey();
-                                       int hash = key.hashCode() & (clusterUpdateThreads.length-1);
-                                       updateSchedules[hash].addAll(entry.getValue());
-                               }
-
-                               //                              final AtomicLong elapsed = new AtomicLong(0);
-                               int acquireAmount = 0;
-                               for(int i=0;i<CLUSTER_THREADS;i++) {
-                                       final List<ClusterUpdateOperation> ops = updateSchedules[i];
-                                       if (!ops.isEmpty()) {
-                                               acquireAmount++;
-                                               clusterUpdateThreads[i].submit(new Callable<Object>() {
-
-                            @Override
-                            public Object call() throws Exception {
-                                //long st = System.nanoTime();
-                                try {
-                                    for(ClusterUpdateOperation op : ops) {
-                                        op.run();
-                                    }
-                                } finally {
-                                    s.release();
-                                }
-                                return null;
-                                
-    //                          long duration = System.nanoTime()-st;
-    //                          elapsed.addAndGet(duration);
-    //                          double dur = 1e-9*duration;
-    //                          if(dur > 0.05)
-    //                              System.err.println("duration=" + dur + "s. " + ops.size());
-                            }
-                        });
-                                       }
-                               }
-
-                               s.acquire(acquireAmount);
+                               runUpdates(updates);
+                               runTasksIfEmpty();
 
                                /*
                                 * Here we are actively processing updates from client.
@@ -227,7 +127,7 @@ public class MainProgram implements Runnable, Closeable {
                                try {
                                        swapChunks();
                                } catch (AcornAccessVerificationException | IllegalAcornStateException e) {
-                                   e.printStackTrace();
+                                       LOGGER.error("cluster chunk swapping failed", e);
                                } finally {
                                        clusters.streamLRU.releaseMutex();
                                }
@@ -245,70 +145,184 @@ public class MainProgram implements Runnable, Closeable {
                        }
 
                } catch (Throwable t) {
-                       t.printStackTrace();
+                       LOGGER.error("FATAL: MainProgram died unexpectedly", t);
                } finally {
                        deathBarrier.release();
                }
        }
 
+       @FunctionalInterface
+       static interface MainProgramRunnable {
+               void run() throws Exception;
+               default void error(Exception e) {
+                       LOGGER.error("An error occured", e);
+               }
+               default void success() {}
+       }
+
+       private void runUpdates(TreeMap<ClusterUID, List<ClusterUpdateOperation>> updates) throws InterruptedException {
+
+               for(int i=0;i<CLUSTER_THREADS;i++)
+                       updateSchedules[i].clear();
+
+               final Semaphore s = new Semaphore(0);
+
+               for(Map.Entry<ClusterUID, List<ClusterUpdateOperation>> entry : updates.entrySet()) {
+                       ClusterUID key = entry.getKey();
+                       int hash = key.hashCode() & (clusterUpdateThreads.length-1);
+                       updateSchedules[hash].addAll(entry.getValue());
+               }
+
+               //                              final AtomicLong elapsed = new AtomicLong(0);
+               int acquireAmount = 0;
+               for(int i=0;i<CLUSTER_THREADS;i++) {
+                       final List<ClusterUpdateOperation> ops = updateSchedules[i];
+                       if (!ops.isEmpty()) {
+                               acquireAmount++;
+                               clusterUpdateThreads[i].submit(new Callable<Object>() {
+
+                    @Override
+                    public Object call() throws Exception {
+                        //long st = System.nanoTime();
+                        try {
+                            for(ClusterUpdateOperation op : ops) {
+                                op.run();
+                            }
+                        } finally {
+                            s.release();
+                        }
+                        return null;
+
+//                          long duration = System.nanoTime()-st;
+//                          elapsed.addAndGet(duration);
+//                          double dur = 1e-9*duration;
+//                          if(dur > 0.05)
+//                              System.err.println("duration=" + dur + "s. " + ops.size());
+                    }
+                });
+                       }
+               }
+
+               s.acquire(acquireAmount);
+
+       }
+
+       /*
+        * This shall be run when no updates are currently available.
+        */
+       private void runTasksIfEmpty() {
+               if(operationQueue.isEmpty()) {
+                       List<MainProgramRunnable> todo = new ArrayList<>();
+                       operationQueue.pumpTasks(todo);
+                       for(MainProgramRunnable runnable : todo) {
+                               try {
+                                       runnable.run();
+                                       runnable.success();
+                               } catch (Exception e) {
+                                       runnable.error(e);
+                               }
+                       }
+               }
+       }
+
+       /*
+        * This gets called when an idle period has been detected
+        */
+       private void checkIdle() throws IOException, IllegalAcornStateException, AcornAccessVerificationException {
+
+               // Was this a time-out or a new stream request?
+               if(operationQueue.isEmpty()) {
+
+                       /*
+                        * We are idling here.
+                        * Flush all caches gradually
+                        */
+
+                       // Write pending cs to disk
+                       boolean written = clusters.csLRU.swapForced();
+                       while(written) {
+                               if(!operationQueue.isEmpty()) break;
+                               written = clusters.csLRU.swapForced();
+                       }
+                       // Write pending chunks to disk
+                       written = clusters.streamLRU.swapForced();
+                       while(written) {
+                               if(!operationQueue.isEmpty()) break;
+                               written = clusters.streamLRU.swapForced();
+                       }
+                       // Write pending files to disk
+                       written = clusters.fileLRU.swapForced();
+                       while(written) {
+                               if(!operationQueue.isEmpty()) break;
+                               written = clusters.fileLRU.swapForced();
+                       }
+                       // Write pending clusters to disk
+                       written = clusters.clusterLRU.swapForced();
+                       while(written) {
+                               if(!operationQueue.isEmpty()) break;
+                               written = clusters.clusterLRU.swapForced();
+                       }
+
+                       client.tryMakeSnapshot();
+
+               }
+
+       }
+
+
+       /*
+        * This schedules tasks to be run in MainProgram thread
+        * Called from other threads than MainProgram thread
+        *
+        */
+       void runIdle(MainProgramRunnable task) {
+               operationQueue.scheduleTask(task);
+       }
+
        /*
         * Mutex for streamLRU is assumed here
-        * 
+        *
         */
        private void swapChunks() throws AcornAccessVerificationException, IllegalAcornStateException {
 
                // Cache chunks during update operations
-               boolean written = clusters.streamLRU.swap(Integer.MAX_VALUE, CHUNK_CACHE_SIZE);
-               while(written) {
-                       written = clusters.streamLRU.swap(Integer.MAX_VALUE, CHUNK_CACHE_SIZE);
-               }
+               while(clusters.streamLRU.swap(Long.MAX_VALUE, CHUNK_CACHE_SIZE));
        }
 
        private void swapCS() throws AcornAccessVerificationException, IllegalAcornStateException {
 
                // Cache chunks during update operations
-               boolean written = clusters.csLRU.swap(Integer.MAX_VALUE, CHUNK_CACHE_SIZE);
-               while(written) {
-                       written = clusters.csLRU.swap(Integer.MAX_VALUE, CHUNK_CACHE_SIZE);
-               }
+               while(clusters.csLRU.swap(Long.MAX_VALUE, CHUNK_CACHE_SIZE));
        }
 
-       public synchronized void committed() {
+       /*
+        * Called by DB client write threads
+        */
+       void committed() {
 
-               ClusterStreamChunk last = operations.isEmpty() ? null : operations.getLast();
+               ClusterStreamChunk last = operationQueue.commitLast();
         if (!alive) {
-            System.err.println("Trying to commit operation after MainProgram is closed! Operation is " + last);
-//          return;
+            LOGGER.error("Trying to commit operation after MainProgram is closed! Operation is " + last);
         }
-               if(last != null) last.commit();
 
        }
 
-       public synchronized void schedule(ClusterUpdateOperation operation) throws IllegalAcornStateException {
-           if (!alive) {
-               System.err.println("Trying to schedule operation after MainProgram is closed! Operation is " + operation);
-//             return;
+       /*
+        * Called by DB client write threads
+        */
+       void schedule(ClusterUpdateOperation operation) throws IllegalAcornStateException {
+
+               if (!alive) {
+               LOGGER.error("Trying to schedule operation after MainProgram is closed! Operation is " + operation);
            }
+
                clusters.streamLRU.acquireMutex();
 
                try {
 
-                       ClusterStreamChunk last = operations.isEmpty() ? null : operations.getLast();
-                       if(last == null || last.isCommitted()) {
-                               String id = "" + currentChangeSetId + "-" + nextChunkId++;
-                               last = new ClusterStreamChunk(clusters, clusters.streamLRU, id);
-                               operations.add(last);
-                       }
-
-                       String chunkId = last.getKey();
-                       int chunkOffset = last.operations.size();
-                       operation.scheduled(chunkId + "." + chunkOffset);
-
-                       last.addOperation(operation);
-
+                       operationQueue.scheduleUpdate(operation);
                        swapChunks();
 
-                       notifyAll();
                } catch (IllegalAcornStateException e) {
                    throw e;
                } catch (Throwable t) {
@@ -316,14 +330,17 @@ public class MainProgram implements Runnable, Closeable {
                } finally {
                        clusters.streamLRU.releaseMutex();
                }
+
        }
 
     @Override
     public void close() {
+
         alive = false;
-        synchronized (this) {
-            notifyAll();
-        }
+
+        // This will wake up the sleeping beauty
+        operationQueue.scheduleTask(() -> {});
+
         try {
             deathBarrier.acquire();
         } catch (InterruptedException e) {
@@ -338,9 +355,17 @@ public class MainProgram implements Runnable, Closeable {
                 executor.awaitTermination(500, TimeUnit.MILLISECONDS);
                 clusterUpdateThreads[i] = null;
             } catch (InterruptedException e) {
-                e.printStackTrace();
+                LOGGER.error("clusterUpdateThread[{}] termination interrupted", i, e);
             }
         }
     }
 
+    void assertMainProgramThread() {
+        assert(Thread.currentThread().equals(mainProgramThread));
+    }
+
+    void assertNoMainProgramThread() {
+        assert(!Thread.currentThread().equals(mainProgramThread));
+    }
+
 }