]> gerrit.simantics Code Review - simantics/platform.git/blobdiff - bundles/org.simantics.acorn/src/org/simantics/acorn/MainProgram.java
Sharing org.simantics.acorn for everyone to use
[simantics/platform.git] / bundles / org.simantics.acorn / src / org / simantics / acorn / MainProgram.java
diff --git a/bundles/org.simantics.acorn/src/org/simantics/acorn/MainProgram.java b/bundles/org.simantics.acorn/src/org/simantics/acorn/MainProgram.java
new file mode 100644 (file)
index 0000000..f39a498
--- /dev/null
@@ -0,0 +1,342 @@
+package org.simantics.acorn;
+
+import java.io.Closeable;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+
+import org.simantics.acorn.lru.ClusterStreamChunk;
+import org.simantics.acorn.lru.ClusterUpdateOperation;
+import org.simantics.db.service.ClusterUID;
+import org.simantics.utils.logging.TimeLogger;
+
+public class MainProgram implements Runnable, Closeable {
+
+       private static final int CLUSTER_THREADS = 4;
+       private static final int CHUNK_CACHE_SIZE = 100;
+
+       private final GraphClientImpl2 client;
+       private final ClusterManager clusters;
+       private final ExecutorService[] clusterUpdateThreads;
+    private final List<ClusterUpdateOperation>[] updateSchedules;
+       
+       private int residentOperationBytes = 0;
+       private long currentChangeSetId = -1;
+       private int nextChunkId = 0;
+       private boolean alive = true;
+       private Semaphore deathBarrier = new Semaphore(0);
+
+       final Semaphore mutex = new Semaphore(1);
+       final LinkedList<ClusterStreamChunk> operations = new LinkedList<>();
+
+       static class ClusterThreadFactory implements ThreadFactory {
+
+               final String name;
+               final boolean daemon;
+
+               public ClusterThreadFactory(String name, boolean daemon) {
+                       this.name = name;
+                       this.daemon = daemon;
+               }
+
+               @Override
+               public Thread newThread(Runnable r) {
+                       Thread thread = new Thread(r, name);
+                       thread.setDaemon(daemon);
+                       return thread;
+               }
+       }
+
+       public MainProgram(GraphClientImpl2 client, ClusterManager clusters) {
+
+               this.client = client;
+               this.clusters = clusters;
+               this.clusterUpdateThreads = new ExecutorService[CLUSTER_THREADS];
+               this.updateSchedules = new ArrayList[CLUSTER_THREADS];
+               for(int i=0;i<clusterUpdateThreads.length;i++) {
+                       clusterUpdateThreads[i] = Executors.newSingleThreadExecutor(new ClusterThreadFactory("Cluster Updater " + (i+1), false));
+                       updateSchedules[i] = new ArrayList<ClusterUpdateOperation>();
+               }
+       }
+
+       public void startTransaction(long id) {
+               currentChangeSetId = id;
+               nextChunkId = 0;
+       }
+
+       private static Comparator<ClusterUID> clusterComparator = new Comparator<ClusterUID>() {
+
+               @Override
+               public int compare(ClusterUID o1, ClusterUID o2) {
+                       return Long.compare(o1.second, o2.second);
+               }
+       };
+
+       @Override
+       public void run() {
+               try {
+
+                       mutex.acquire();
+                       main:
+                       while(alive) {
+
+                               TreeMap<ClusterUID, List<ClusterUpdateOperation>> updates = new TreeMap<ClusterUID, List<ClusterUpdateOperation>>(clusterComparator);
+
+                               synchronized(MainProgram.this) {
+
+                                       while(!operations.isEmpty() && updates.size() < 100) {
+
+                                               ClusterStreamChunk chunk = operations.pollFirst();
+
+                                               for(int i=chunk.nextToProcess;i<chunk.operations.size();i++) {
+                                                       ClusterUpdateOperation o = chunk.operations.get(i);
+                                                       ClusterUID uid = o.uid;
+                                                       List<ClusterUpdateOperation> ops = updates.get(uid);
+                                                       if(ops == null) {
+                                                               ops = new ArrayList<ClusterUpdateOperation>();
+                                                               updates.put(uid, ops);
+                                                       }
+                                                       ops.add(o);
+                                               }
+
+                                               chunk.nextToProcess = chunk.operations.size();
+
+                                               if(!chunk.isCommitted()) {
+                                                       assert(operations.isEmpty());
+                                                       operations.add(chunk);
+                                                       break;
+                                               }
+
+                                       }
+
+                                       if(updates.isEmpty()) {
+                                               try {
+                                                       long start = System.nanoTime();
+                                                       mutex.release();
+                                                       MainProgram.this.wait(5000);
+                                                       mutex.acquire();
+                                                       if (!alive)
+                                                               break main;
+                                                       long duration = System.nanoTime()-start;
+                                                       if(duration > 4000000000L) {
+
+                                                               // Was this a time-out or a new stream request?
+                                                               if(operations.isEmpty()) {
+
+                                                                       /*
+                                                                        * We are idling here.
+                                                                        * Flush all caches gradually
+                                                                        */
+
+                                                                       // Write pending cs to disk
+                                                                       boolean written = clusters.csLRU.swapForced();
+                                                                       while(written) {
+                                                                               if(!updates.isEmpty()) break;
+                                                                               written = clusters.csLRU.swapForced();
+                                                                       }
+                                                                       // Write pending chunks to disk
+                                                                       written = clusters.streamLRU.swapForced();
+                                                                       while(written) {
+                                                                               if(!updates.isEmpty()) break;
+                                                                               written = clusters.streamLRU.swapForced();
+                                                                       }
+                                                                       // Write pending files to disk
+                                                                       written = clusters.fileLRU.swapForced();
+                                                                       while(written) {
+                                                                               if(!updates.isEmpty()) break;
+                                                                               written = clusters.fileLRU.swapForced();
+                                                                       }
+                                                                       // Write pending clusters to disk
+                                                                       written = clusters.clusterLRU.swapForced();
+                                                                       while(written) {
+                                                                               if(!updates.isEmpty()) break;
+                                                                               written = clusters.clusterLRU.swapForced();
+                                                                       }
+
+                                                                       client.tryMakeSnapshot();
+                                                               }
+                                                       }
+                                               } catch (InterruptedException e) {
+                                                       e.printStackTrace();
+                                               }
+
+                                       }
+
+                               }
+
+//                             long sss = System.nanoTime();
+
+                               for(int i=0;i<CLUSTER_THREADS;i++)
+                                       updateSchedules[i].clear();
+
+                               final Semaphore s = new Semaphore(0);
+
+                               for(Map.Entry<ClusterUID, List<ClusterUpdateOperation>> entry : updates.entrySet()) {
+                                       ClusterUID key = entry.getKey();
+                                       int hash = key.hashCode() & (clusterUpdateThreads.length-1);
+                                       updateSchedules[hash].addAll(entry.getValue());
+                               }
+
+                               //                              final AtomicLong elapsed = new AtomicLong(0);
+                               int acquireAmount = 0;
+                               for(int i=0;i<CLUSTER_THREADS;i++) {
+                                       final List<ClusterUpdateOperation> ops = updateSchedules[i];
+                                       if (!ops.isEmpty()) {
+                                               acquireAmount++;
+                                               clusterUpdateThreads[i].execute(() -> {
+
+                                                       //long st = System.nanoTime();
+                                                       for(ClusterUpdateOperation op : ops) {
+                                                               op.run();
+                                                       }
+                                                       s.release();
+                                                       //                                                      long duration = System.nanoTime()-st;
+                                                       //                                                      elapsed.addAndGet(duration);
+                                                       //                                                      double dur = 1e-9*duration;
+                                                       //                                                      if(dur > 0.05)
+                                                       //                                                              System.err.println("duration=" + dur + "s. " + ops.size());
+                                               });
+                                       }
+                               }
+
+                               s.acquire(acquireAmount);
+
+                               /*
+                                * Here we are actively processing updates from client.
+                                * Maintain necessary caching here.
+                                */
+
+                               clusters.streamLRU.acquireMutex();
+                               try {
+                                       swapChunks();
+                               } catch (Throwable t) {
+                                       throw new IllegalStateException(t);
+                               } finally {
+                                       clusters.streamLRU.releaseMutex();
+                               }
+                               clusters.csLRU.acquireMutex();
+                               try {
+                                       swapCS();
+                               } catch (Throwable t) {
+                                       throw new IllegalStateException(t);
+                               } finally {
+                                       clusters.csLRU.releaseMutex();
+                               }
+
+                               TimeLogger.log("Performed updates");
+
+                       }
+
+               } catch (Throwable t) {
+                       t.printStackTrace();
+               } finally {
+                       deathBarrier.release();
+               }
+
+       }
+
+       /*
+        * Mutex for streamLRU is assumed here
+        * 
+        */
+       private void swapChunks() {
+
+               // Cache chunks during update operations
+               boolean written = clusters.streamLRU.swap(Integer.MAX_VALUE, CHUNK_CACHE_SIZE);
+               while(written) {
+                       written = clusters.streamLRU.swap(Integer.MAX_VALUE, CHUNK_CACHE_SIZE);
+               }
+
+       }
+
+       private void swapCS() {
+
+               // Cache chunks during update operations
+               boolean written = clusters.csLRU.swap(Integer.MAX_VALUE, CHUNK_CACHE_SIZE);
+               while(written) {
+                       written = clusters.csLRU.swap(Integer.MAX_VALUE, CHUNK_CACHE_SIZE);
+               }
+
+       }
+
+       public synchronized void committed() {
+
+               ClusterStreamChunk last = operations.isEmpty() ? null : operations.getLast();
+        if (!alive) {
+            System.err.println("Trying to commit operation after MainProgram is closed! Operation is " + last);
+//          return;
+        }
+               if(last != null) last.commit();
+
+       }
+
+       public synchronized void schedule(ClusterUpdateOperation operation) {
+           if (!alive) {
+               System.err.println("Trying to schedule operation after MainProgram is closed! Operation is " + operation);
+//             return;
+           }
+               clusters.streamLRU.acquireMutex();
+
+               try {
+
+                       ClusterStreamChunk last = operations.isEmpty() ? null : operations.getLast();
+                       if(last == null || last.isCommitted()) {
+                               String id = "" + currentChangeSetId + "-" + nextChunkId++;
+                               last = new ClusterStreamChunk(clusters, clusters.streamLRU, id);
+                               operations.add(last);
+                       }
+
+                       String chunkId = last.getKey();
+                       int chunkOffset = last.operations.size();
+                       operation.scheduled(chunkId + "." + chunkOffset);
+
+                       last.addOperation(operation);
+
+                       swapChunks();
+
+                       notifyAll();
+
+               } catch (Throwable t) {
+                       throw new IllegalStateException(t);
+               } finally {
+
+                       clusters.streamLRU.releaseMutex();
+
+               }
+
+       }
+
+    @Override
+    public void close() {
+        alive = false;
+        synchronized (this) {
+            notifyAll();
+        }
+        try {
+            deathBarrier.acquire();
+        } catch (InterruptedException e) {
+        }
+
+        for (ExecutorService executor : clusterUpdateThreads)
+            executor.shutdown();
+
+        for (int i = 0; i < clusterUpdateThreads.length; i++) {
+            try {
+                ExecutorService executor  = clusterUpdateThreads[i];
+                executor.awaitTermination(500, TimeUnit.MILLISECONDS);
+                clusterUpdateThreads[i] = null;
+            } catch (InterruptedException e) {
+                e.printStackTrace();
+            }
+        }
+    }
+
+}