X-Git-Url: https://gerrit.simantics.org/r/gitweb?p=simantics%2Fplatform.git;a=blobdiff_plain;f=bundles%2Forg.simantics.acorn%2Fsrc%2Forg%2Fsimantics%2Facorn%2FMainProgram.java;h=1e4b5cbac508623fff9fd9d5017043818ce1ee42;hp=c69c7bea6fe7957bf3c408256a6d9f5cbfed16e4;hb=0df63f38249cbbc584d3a130773680c309fb1f71;hpb=fe1a2f532761669e67da4db4ae15096ced8a04db diff --git a/bundles/org.simantics.acorn/src/org/simantics/acorn/MainProgram.java b/bundles/org.simantics.acorn/src/org/simantics/acorn/MainProgram.java index c69c7bea6..1e4b5cbac 100644 --- a/bundles/org.simantics.acorn/src/org/simantics/acorn/MainProgram.java +++ b/bundles/org.simantics.acorn/src/org/simantics/acorn/MainProgram.java @@ -1,9 +1,9 @@ package org.simantics.acorn; import java.io.Closeable; +import java.io.IOException; import java.util.ArrayList; import java.util.Comparator; -import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.TreeMap; @@ -26,23 +26,22 @@ import org.slf4j.LoggerFactory; public class MainProgram implements Runnable, Closeable { private static final Logger LOGGER = LoggerFactory.getLogger(MainProgram.class); - + private static final int CLUSTER_THREADS = 4; private static final int CHUNK_CACHE_SIZE = 100; private final GraphClientImpl2 client; - private final ClusterManager clusters; private final ExecutorService[] clusterUpdateThreads; private final List[] updateSchedules; - - private int residentOperationBytes = 0; - private long currentChangeSetId = -1; - private int nextChunkId = 0; + + private Thread mainProgramThread; + private boolean alive = true; private Semaphore deathBarrier = new Semaphore(0); - final Semaphore mutex = new Semaphore(1); - final LinkedList operations = new LinkedList<>(); + final ClusterManager clusters; + + private final OperationQueue operationQueue = new OperationQueue(this); static class ClusterThreadFactory implements ThreadFactory { @@ -62,7 +61,8 @@ public class MainProgram implements Runnable, Closeable { } } - public MainProgram(GraphClientImpl2 client, ClusterManager clusters) { + @SuppressWarnings("unchecked") + MainProgram(GraphClientImpl2 client, ClusterManager clusters) { this.client = client; this.clusters = clusters; @@ -74,9 +74,8 @@ public class MainProgram implements Runnable, Closeable { } } - public void startTransaction(long id) { - currentChangeSetId = id; - nextChunkId = 0; + void startTransaction(long id) { + operationQueue.startTransaction(id); } private static Comparator clusterComparator = new Comparator() { @@ -89,138 +88,35 @@ public class MainProgram implements Runnable, Closeable { @Override public void run() { + + mainProgramThread = Thread.currentThread(); + try { - mutex.acquire(); main: while(alive) { TreeMap> updates = new TreeMap>(clusterComparator); - synchronized(MainProgram.this) { + operationQueue.pumpUpdates(updates); - while(!operations.isEmpty() && updates.size() < 100) { + if(updates.isEmpty()) { - ClusterStreamChunk chunk = operations.pollFirst(); + long duration = operationQueue.waitFor(); - for(int i=chunk.nextToProcess;i ops = updates.get(uid); - if(ops == null) { - ops = new ArrayList(); - updates.put(uid, ops); - } - ops.add(o); - } - - chunk.nextToProcess = chunk.operations.size(); - - if(!chunk.isCommitted()) { - assert(operations.isEmpty()); - operations.add(chunk); - break; - } + if (!alive) + break main; + if(duration > 4000000000L) { + checkIdle(); } - - if(updates.isEmpty()) { - try { - long start = System.nanoTime(); - mutex.release(); - MainProgram.this.wait(5000); - mutex.acquire(); - if (!alive) - break main; - long duration = System.nanoTime()-start; - if(duration > 4000000000L) { - - // Was this a time-out or a new stream request? - if(operations.isEmpty()) { - - /* - * We are idling here. - * Flush all caches gradually - */ - - // Write pending cs to disk - boolean written = clusters.csLRU.swapForced(); - while(written) { - if(!updates.isEmpty()) break; - written = clusters.csLRU.swapForced(); - } - // Write pending chunks to disk - written = clusters.streamLRU.swapForced(); - while(written) { - if(!updates.isEmpty()) break; - written = clusters.streamLRU.swapForced(); - } - // Write pending files to disk - written = clusters.fileLRU.swapForced(); - while(written) { - if(!updates.isEmpty()) break; - written = clusters.fileLRU.swapForced(); - } - // Write pending clusters to disk - written = clusters.clusterLRU.swapForced(); - while(written) { - if(!updates.isEmpty()) break; - written = clusters.clusterLRU.swapForced(); - } - - client.tryMakeSnapshot(); - } - } - } catch (InterruptedException e) { - e.printStackTrace(); - } - } + } // long sss = System.nanoTime(); - for(int i=0;i> entry : updates.entrySet()) { - ClusterUID key = entry.getKey(); - int hash = key.hashCode() & (clusterUpdateThreads.length-1); - updateSchedules[hash].addAll(entry.getValue()); - } - - // final AtomicLong elapsed = new AtomicLong(0); - int acquireAmount = 0; - for(int i=0;i ops = updateSchedules[i]; - if (!ops.isEmpty()) { - acquireAmount++; - clusterUpdateThreads[i].submit(new Callable() { - - @Override - public Object call() throws Exception { - //long st = System.nanoTime(); - try { - for(ClusterUpdateOperation op : ops) { - op.run(); - } - } finally { - s.release(); - } - return null; - - // long duration = System.nanoTime()-st; - // elapsed.addAndGet(duration); - // double dur = 1e-9*duration; - // if(dur > 0.05) - // System.err.println("duration=" + dur + "s. " + ops.size()); - } - }); - } - } - - s.acquire(acquireAmount); + runUpdates(updates); + runTasksIfEmpty(); /* * Here we are actively processing updates from client. @@ -231,7 +127,7 @@ public class MainProgram implements Runnable, Closeable { try { swapChunks(); } catch (AcornAccessVerificationException | IllegalAcornStateException e) { - e.printStackTrace(); + LOGGER.error("cluster chunk swapping failed", e); } finally { clusters.streamLRU.releaseMutex(); } @@ -249,108 +145,184 @@ public class MainProgram implements Runnable, Closeable { } } catch (Throwable t) { - t.printStackTrace(); + LOGGER.error("FATAL: MainProgram died unexpectedly", t); } finally { deathBarrier.release(); } } - + + @FunctionalInterface static interface MainProgramRunnable { - - public void run() throws Exception; - public void done(); - + void run() throws Exception; + default void error(Exception e) { + LOGGER.error("An error occured", e); + } + default void success() {} } - public Exception runIdle(MainProgramRunnable runnable) { - try { - long startTime = System.currentTimeMillis(); - while (true) { - boolean hasMutex = false; + private void runUpdates(TreeMap> updates) throws InterruptedException { + + for(int i=0;i> entry : updates.entrySet()) { + ClusterUID key = entry.getKey(); + int hash = key.hashCode() & (clusterUpdateThreads.length-1); + updateSchedules[hash].addAll(entry.getValue()); + } + + // final AtomicLong elapsed = new AtomicLong(0); + int acquireAmount = 0; + for(int i=0;i ops = updateSchedules[i]; + if (!ops.isEmpty()) { + acquireAmount++; + clusterUpdateThreads[i].submit(new Callable() { + + @Override + public Object call() throws Exception { + //long st = System.nanoTime(); + try { + for(ClusterUpdateOperation op : ops) { + op.run(); + } + } finally { + s.release(); + } + return null; + +// long duration = System.nanoTime()-st; +// elapsed.addAndGet(duration); +// double dur = 1e-9*duration; +// if(dur > 0.05) +// System.err.println("duration=" + dur + "s. " + ops.size()); + } + }); + } + } + + s.acquire(acquireAmount); + + } + + /* + * This shall be run when no updates are currently available. + */ + private void runTasksIfEmpty() { + if(operationQueue.isEmpty()) { + List todo = new ArrayList<>(); + operationQueue.pumpTasks(todo); + for(MainProgramRunnable runnable : todo) { try { - synchronized (MainProgram.this) { - if (hasMutex = mutex.tryAcquire()) { - if (operations.isEmpty()) { - runnable.run(); - return null; - } - } - } - long endTime = System.currentTimeMillis(); - if ((endTime - startTime) > 100) { - startTime = endTime; - LOGGER.info("MainProgram.runIdle() retry mutex acquire!"); - } + runnable.run(); + runnable.success(); } catch (Exception e) { - return e; - } finally { - if (hasMutex) - mutex.release(); + runnable.error(e); } } - } finally { - runnable.done(); } } - + + /* + * This gets called when an idle period has been detected + */ + private void checkIdle() throws IOException, IllegalAcornStateException, AcornAccessVerificationException { + + // Was this a time-out or a new stream request? + if(operationQueue.isEmpty()) { + + /* + * We are idling here. + * Flush all caches gradually + */ + + // Write pending cs to disk + boolean written = clusters.csLRU.swapForced(); + while(written) { + if(!operationQueue.isEmpty()) break; + written = clusters.csLRU.swapForced(); + } + // Write pending chunks to disk + written = clusters.streamLRU.swapForced(); + while(written) { + if(!operationQueue.isEmpty()) break; + written = clusters.streamLRU.swapForced(); + } + // Write pending files to disk + written = clusters.fileLRU.swapForced(); + while(written) { + if(!operationQueue.isEmpty()) break; + written = clusters.fileLRU.swapForced(); + } + // Write pending clusters to disk + written = clusters.clusterLRU.swapForced(); + while(written) { + if(!operationQueue.isEmpty()) break; + written = clusters.clusterLRU.swapForced(); + } + + client.tryMakeSnapshot(); + + } + + } + + + /* + * This schedules tasks to be run in MainProgram thread + * Called from other threads than MainProgram thread + * + */ + void runIdle(MainProgramRunnable task) { + operationQueue.scheduleTask(task); + } + /* * Mutex for streamLRU is assumed here - * + * */ private void swapChunks() throws AcornAccessVerificationException, IllegalAcornStateException { // Cache chunks during update operations - boolean written = clusters.streamLRU.swap(Integer.MAX_VALUE, CHUNK_CACHE_SIZE); - while(written) { - written = clusters.streamLRU.swap(Integer.MAX_VALUE, CHUNK_CACHE_SIZE); - } + while(clusters.streamLRU.swap(Long.MAX_VALUE, CHUNK_CACHE_SIZE)); } private void swapCS() throws AcornAccessVerificationException, IllegalAcornStateException { // Cache chunks during update operations - boolean written = clusters.csLRU.swap(Integer.MAX_VALUE, CHUNK_CACHE_SIZE); - while(written) { - written = clusters.csLRU.swap(Integer.MAX_VALUE, CHUNK_CACHE_SIZE); - } + while(clusters.csLRU.swap(Long.MAX_VALUE, CHUNK_CACHE_SIZE)); } - public synchronized void committed() { + /* + * Called by DB client write threads + */ + void committed() { - ClusterStreamChunk last = operations.isEmpty() ? null : operations.getLast(); + ClusterStreamChunk last = operationQueue.commitLast(); if (!alive) { LOGGER.error("Trying to commit operation after MainProgram is closed! Operation is " + last); -// return; } - if(last != null) last.commit(); } - public synchronized void schedule(ClusterUpdateOperation operation) throws IllegalAcornStateException { - if (!alive) { + /* + * Called by DB client write threads + */ + void schedule(ClusterUpdateOperation operation) throws IllegalAcornStateException { + + if (!alive) { LOGGER.error("Trying to schedule operation after MainProgram is closed! Operation is " + operation); -// return; } + clusters.streamLRU.acquireMutex(); try { - ClusterStreamChunk last = operations.isEmpty() ? null : operations.getLast(); - if(last == null || last.isCommitted()) { - String id = "" + currentChangeSetId + "-" + nextChunkId++; - last = new ClusterStreamChunk(clusters, clusters.streamLRU, id); - operations.add(last); - } - - String chunkId = last.getKey(); - int chunkOffset = last.operations.size(); - operation.scheduled(chunkId + "." + chunkOffset); - - last.addOperation(operation); - + operationQueue.scheduleUpdate(operation); swapChunks(); - notifyAll(); } catch (IllegalAcornStateException e) { throw e; } catch (Throwable t) { @@ -358,14 +330,17 @@ public class MainProgram implements Runnable, Closeable { } finally { clusters.streamLRU.releaseMutex(); } + } @Override public void close() { + alive = false; - synchronized (this) { - notifyAll(); - } + + // This will wake up the sleeping beauty + operationQueue.scheduleTask(() -> {}); + try { deathBarrier.acquire(); } catch (InterruptedException e) { @@ -380,9 +355,17 @@ public class MainProgram implements Runnable, Closeable { executor.awaitTermination(500, TimeUnit.MILLISECONDS); clusterUpdateThreads[i] = null; } catch (InterruptedException e) { - e.printStackTrace(); + LOGGER.error("clusterUpdateThread[{}] termination interrupted", i, e); } } } + void assertMainProgramThread() { + assert(Thread.currentThread().equals(mainProgramThread)); + } + + void assertNoMainProgramThread() { + assert(!Thread.currentThread().equals(mainProgramThread)); + } + }