X-Git-Url: https://gerrit.simantics.org/r/gitweb?p=simantics%2Fplatform.git;a=blobdiff_plain;f=bundles%2Forg.simantics.acorn%2Fsrc%2Forg%2Fsimantics%2Facorn%2FMainProgram.java;h=78ff9e8995c4162fc347edc9b51bd6f7c69ec4ff;hp=f39a4987d2ecc27e77843d256e2106d7f2d3e2a3;hb=d9a9d77c7024260e3e3b3a8558ddc84b2a8b99b3;hpb=0144ba232323a0e4f7ec8fe0681a150faafd7caf diff --git a/bundles/org.simantics.acorn/src/org/simantics/acorn/MainProgram.java b/bundles/org.simantics.acorn/src/org/simantics/acorn/MainProgram.java index f39a4987d..78ff9e899 100644 --- a/bundles/org.simantics.acorn/src/org/simantics/acorn/MainProgram.java +++ b/bundles/org.simantics.acorn/src/org/simantics/acorn/MainProgram.java @@ -7,12 +7,15 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.TreeMap; +import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Semaphore; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; +import org.simantics.acorn.exception.AcornAccessVerificationException; +import org.simantics.acorn.exception.IllegalAcornStateException; import org.simantics.acorn.lru.ClusterStreamChunk; import org.simantics.acorn.lru.ClusterUpdateOperation; import org.simantics.db.service.ClusterUID; @@ -167,9 +170,7 @@ public class MainProgram implements Runnable, Closeable { } catch (InterruptedException e) { e.printStackTrace(); } - } - } // long sss = System.nanoTime(); @@ -191,19 +192,27 @@ public class MainProgram implements Runnable, Closeable { final List ops = updateSchedules[i]; if (!ops.isEmpty()) { acquireAmount++; - clusterUpdateThreads[i].execute(() -> { - - //long st = System.nanoTime(); - for(ClusterUpdateOperation op : ops) { - op.run(); - } - s.release(); - // long duration = System.nanoTime()-st; - // elapsed.addAndGet(duration); - // double dur = 1e-9*duration; - // if(dur > 0.05) - // System.err.println("duration=" + dur + "s. " + ops.size()); - }); + clusterUpdateThreads[i].submit(new Callable() { + + @Override + public Object call() throws Exception { + //long st = System.nanoTime(); + try { + for(ClusterUpdateOperation op : ops) { + op.run(); + } + } finally { + s.release(); + } + return null; + + // long duration = System.nanoTime()-st; + // elapsed.addAndGet(duration); + // double dur = 1e-9*duration; + // if(dur > 0.05) + // System.err.println("duration=" + dur + "s. " + ops.size()); + } + }); } } @@ -217,8 +226,8 @@ public class MainProgram implements Runnable, Closeable { clusters.streamLRU.acquireMutex(); try { swapChunks(); - } catch (Throwable t) { - throw new IllegalStateException(t); + } catch (AcornAccessVerificationException | IllegalAcornStateException e) { + e.printStackTrace(); } finally { clusters.streamLRU.releaseMutex(); } @@ -226,7 +235,7 @@ public class MainProgram implements Runnable, Closeable { try { swapCS(); } catch (Throwable t) { - throw new IllegalStateException(t); + throw new IllegalAcornStateException(t); } finally { clusters.csLRU.releaseMutex(); } @@ -240,31 +249,28 @@ public class MainProgram implements Runnable, Closeable { } finally { deathBarrier.release(); } - } /* * Mutex for streamLRU is assumed here * */ - private void swapChunks() { + private void swapChunks() throws AcornAccessVerificationException, IllegalAcornStateException { // Cache chunks during update operations boolean written = clusters.streamLRU.swap(Integer.MAX_VALUE, CHUNK_CACHE_SIZE); while(written) { written = clusters.streamLRU.swap(Integer.MAX_VALUE, CHUNK_CACHE_SIZE); } - } - private void swapCS() { + private void swapCS() throws AcornAccessVerificationException, IllegalAcornStateException { // Cache chunks during update operations boolean written = clusters.csLRU.swap(Integer.MAX_VALUE, CHUNK_CACHE_SIZE); while(written) { written = clusters.csLRU.swap(Integer.MAX_VALUE, CHUNK_CACHE_SIZE); } - } public synchronized void committed() { @@ -278,7 +284,7 @@ public class MainProgram implements Runnable, Closeable { } - public synchronized void schedule(ClusterUpdateOperation operation) { + public synchronized void schedule(ClusterUpdateOperation operation) throws IllegalAcornStateException { if (!alive) { System.err.println("Trying to schedule operation after MainProgram is closed! Operation is " + operation); // return; @@ -303,15 +309,13 @@ public class MainProgram implements Runnable, Closeable { swapChunks(); notifyAll(); - + } catch (IllegalAcornStateException e) { + throw e; } catch (Throwable t) { - throw new IllegalStateException(t); + throw new IllegalAcornStateException(t); } finally { - clusters.streamLRU.releaseMutex(); - } - } @Override