X-Git-Url: https://gerrit.simantics.org/r/gitweb?a=blobdiff_plain;f=bundles%2Forg.simantics.acorn%2Fsrc%2Forg%2Fsimantics%2Facorn%2FMainProgram.java;h=b5ba2471b486b10ce91f169c2797cc0673ddc276;hb=d2682c13178d3374794aab7637a4de371e9bef00;hp=78ff9e8995c4162fc347edc9b51bd6f7c69ec4ff;hpb=602614f4502aae85ecf3967abb7152d2d62903e3;p=simantics%2Fplatform.git diff --git a/bundles/org.simantics.acorn/src/org/simantics/acorn/MainProgram.java b/bundles/org.simantics.acorn/src/org/simantics/acorn/MainProgram.java index 78ff9e899..b5ba2471b 100644 --- a/bundles/org.simantics.acorn/src/org/simantics/acorn/MainProgram.java +++ b/bundles/org.simantics.acorn/src/org/simantics/acorn/MainProgram.java @@ -20,9 +20,13 @@ import org.simantics.acorn.lru.ClusterStreamChunk; import org.simantics.acorn.lru.ClusterUpdateOperation; import org.simantics.db.service.ClusterUID; import org.simantics.utils.logging.TimeLogger; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class MainProgram implements Runnable, Closeable { + private static final Logger LOGGER = LoggerFactory.getLogger(MainProgram.class); + private static final int CLUSTER_THREADS = 4; private static final int CHUNK_CACHE_SIZE = 100; @@ -124,7 +128,7 @@ public class MainProgram implements Runnable, Closeable { try { long start = System.nanoTime(); mutex.release(); - MainProgram.this.wait(5000); + MainProgram.this.wait(5000); // wake up when new operations are scheduled or the last operation is committed mutex.acquire(); if (!alive) break main; @@ -250,7 +254,45 @@ public class MainProgram implements Runnable, Closeable { deathBarrier.release(); } } + + static interface MainProgramRunnable { + + public void run() throws Exception; + public void done(); + + } + public Exception runIdle(MainProgramRunnable runnable) { + try { + long startTime = System.currentTimeMillis(); + while (true) { + boolean hasMutex = false; + try { + synchronized (MainProgram.this) { + if (hasMutex = mutex.tryAcquire()) { + if (operations.isEmpty()) { + runnable.run(); + return null; + } + } + } + long endTime = System.currentTimeMillis(); + if ((endTime - startTime) > 100) { + startTime = endTime; + LOGGER.info("MainProgram.runIdle() retry mutex acquire!"); + } + } catch (Exception e) { + return e; + } finally { + if (hasMutex) + mutex.release(); + } + } + } finally { + runnable.done(); + } + } + /* * Mutex for streamLRU is assumed here * @@ -258,35 +300,32 @@ public class MainProgram implements Runnable, Closeable { private void swapChunks() throws AcornAccessVerificationException, IllegalAcornStateException { // Cache chunks during update operations - boolean written = clusters.streamLRU.swap(Integer.MAX_VALUE, CHUNK_CACHE_SIZE); - while(written) { - written = clusters.streamLRU.swap(Integer.MAX_VALUE, CHUNK_CACHE_SIZE); - } + while(clusters.streamLRU.swap(Long.MAX_VALUE, CHUNK_CACHE_SIZE)); } private void swapCS() throws AcornAccessVerificationException, IllegalAcornStateException { // Cache chunks during update operations - boolean written = clusters.csLRU.swap(Integer.MAX_VALUE, CHUNK_CACHE_SIZE); - while(written) { - written = clusters.csLRU.swap(Integer.MAX_VALUE, CHUNK_CACHE_SIZE); - } + while(clusters.csLRU.swap(Long.MAX_VALUE, CHUNK_CACHE_SIZE)); } public synchronized void committed() { ClusterStreamChunk last = operations.isEmpty() ? null : operations.getLast(); if (!alive) { - System.err.println("Trying to commit operation after MainProgram is closed! Operation is " + last); + LOGGER.error("Trying to commit operation after MainProgram is closed! Operation is " + last); // return; } - if(last != null) last.commit(); + if(last != null) { + last.commit(); + notifyAll(); + } } public synchronized void schedule(ClusterUpdateOperation operation) throws IllegalAcornStateException { if (!alive) { - System.err.println("Trying to schedule operation after MainProgram is closed! Operation is " + operation); + LOGGER.error("Trying to schedule operation after MainProgram is closed! Operation is " + operation); // return; } clusters.streamLRU.acquireMutex();