X-Git-Url: https://gerrit.simantics.org/r/gitweb?p=simantics%2Fplatform.git;a=blobdiff_plain;f=bundles%2Forg.simantics.acorn%2Fsrc%2Forg%2Fsimantics%2Facorn%2FMainProgram.java;h=c69c7bea6fe7957bf3c408256a6d9f5cbfed16e4;hp=f39a4987d2ecc27e77843d256e2106d7f2d3e2a3;hb=23fa194c0cf85db6c435f342600d30ddc0f8c620;hpb=23ea9f9235f428865dc4cd3bff6d3dd9e3174653 diff --git a/bundles/org.simantics.acorn/src/org/simantics/acorn/MainProgram.java b/bundles/org.simantics.acorn/src/org/simantics/acorn/MainProgram.java index f39a4987d..c69c7bea6 100644 --- a/bundles/org.simantics.acorn/src/org/simantics/acorn/MainProgram.java +++ b/bundles/org.simantics.acorn/src/org/simantics/acorn/MainProgram.java @@ -7,19 +7,26 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.TreeMap; +import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Semaphore; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; +import org.simantics.acorn.exception.AcornAccessVerificationException; +import org.simantics.acorn.exception.IllegalAcornStateException; import org.simantics.acorn.lru.ClusterStreamChunk; import org.simantics.acorn.lru.ClusterUpdateOperation; import org.simantics.db.service.ClusterUID; import org.simantics.utils.logging.TimeLogger; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class MainProgram implements Runnable, Closeable { + private static final Logger LOGGER = LoggerFactory.getLogger(MainProgram.class); + private static final int CLUSTER_THREADS = 4; private static final int CHUNK_CACHE_SIZE = 100; @@ -167,9 +174,7 @@ public class MainProgram implements Runnable, Closeable { } catch (InterruptedException e) { e.printStackTrace(); } - } - } // long sss = System.nanoTime(); @@ -191,19 +196,27 @@ public class MainProgram implements Runnable, Closeable { final List ops = updateSchedules[i]; if (!ops.isEmpty()) { acquireAmount++; - clusterUpdateThreads[i].execute(() -> { - - //long st = System.nanoTime(); - for(ClusterUpdateOperation op : ops) { - op.run(); - } - s.release(); - // long duration = System.nanoTime()-st; - // elapsed.addAndGet(duration); - // double dur = 1e-9*duration; - // if(dur > 0.05) - // System.err.println("duration=" + dur + "s. " + ops.size()); - }); + clusterUpdateThreads[i].submit(new Callable() { + + @Override + public Object call() throws Exception { + //long st = System.nanoTime(); + try { + for(ClusterUpdateOperation op : ops) { + op.run(); + } + } finally { + s.release(); + } + return null; + + // long duration = System.nanoTime()-st; + // elapsed.addAndGet(duration); + // double dur = 1e-9*duration; + // if(dur > 0.05) + // System.err.println("duration=" + dur + "s. " + ops.size()); + } + }); } } @@ -217,8 +230,8 @@ public class MainProgram implements Runnable, Closeable { clusters.streamLRU.acquireMutex(); try { swapChunks(); - } catch (Throwable t) { - throw new IllegalStateException(t); + } catch (AcornAccessVerificationException | IllegalAcornStateException e) { + e.printStackTrace(); } finally { clusters.streamLRU.releaseMutex(); } @@ -226,7 +239,7 @@ public class MainProgram implements Runnable, Closeable { try { swapCS(); } catch (Throwable t) { - throw new IllegalStateException(t); + throw new IllegalAcornStateException(t); } finally { clusters.csLRU.releaseMutex(); } @@ -240,47 +253,82 @@ public class MainProgram implements Runnable, Closeable { } finally { deathBarrier.release(); } - + } + + static interface MainProgramRunnable { + + public void run() throws Exception; + public void done(); + } + public Exception runIdle(MainProgramRunnable runnable) { + try { + long startTime = System.currentTimeMillis(); + while (true) { + boolean hasMutex = false; + try { + synchronized (MainProgram.this) { + if (hasMutex = mutex.tryAcquire()) { + if (operations.isEmpty()) { + runnable.run(); + return null; + } + } + } + long endTime = System.currentTimeMillis(); + if ((endTime - startTime) > 100) { + startTime = endTime; + LOGGER.info("MainProgram.runIdle() retry mutex acquire!"); + } + } catch (Exception e) { + return e; + } finally { + if (hasMutex) + mutex.release(); + } + } + } finally { + runnable.done(); + } + } + /* * Mutex for streamLRU is assumed here * */ - private void swapChunks() { + private void swapChunks() throws AcornAccessVerificationException, IllegalAcornStateException { // Cache chunks during update operations boolean written = clusters.streamLRU.swap(Integer.MAX_VALUE, CHUNK_CACHE_SIZE); while(written) { written = clusters.streamLRU.swap(Integer.MAX_VALUE, CHUNK_CACHE_SIZE); } - } - private void swapCS() { + private void swapCS() throws AcornAccessVerificationException, IllegalAcornStateException { // Cache chunks during update operations boolean written = clusters.csLRU.swap(Integer.MAX_VALUE, CHUNK_CACHE_SIZE); while(written) { written = clusters.csLRU.swap(Integer.MAX_VALUE, CHUNK_CACHE_SIZE); } - } public synchronized void committed() { ClusterStreamChunk last = operations.isEmpty() ? null : operations.getLast(); if (!alive) { - System.err.println("Trying to commit operation after MainProgram is closed! Operation is " + last); + LOGGER.error("Trying to commit operation after MainProgram is closed! Operation is " + last); // return; } if(last != null) last.commit(); } - public synchronized void schedule(ClusterUpdateOperation operation) { + public synchronized void schedule(ClusterUpdateOperation operation) throws IllegalAcornStateException { if (!alive) { - System.err.println("Trying to schedule operation after MainProgram is closed! Operation is " + operation); + LOGGER.error("Trying to schedule operation after MainProgram is closed! Operation is " + operation); // return; } clusters.streamLRU.acquireMutex(); @@ -303,15 +351,13 @@ public class MainProgram implements Runnable, Closeable { swapChunks(); notifyAll(); - + } catch (IllegalAcornStateException e) { + throw e; } catch (Throwable t) { - throw new IllegalStateException(t); + throw new IllegalAcornStateException(t); } finally { - clusters.streamLRU.releaseMutex(); - } - } @Override