X-Git-Url: https://gerrit.simantics.org/r/gitweb?p=simantics%2Fplatform.git;a=blobdiff_plain;f=bundles%2Forg.simantics.acorn%2Fsrc%2Forg%2Fsimantics%2Facorn%2FMainProgram.java;h=1e4b5cbac508623fff9fd9d5017043818ce1ee42;hp=f39a4987d2ecc27e77843d256e2106d7f2d3e2a3;hb=0df63f38249cbbc584d3a130773680c309fb1f71;hpb=749a1ea2c0a66ae41aad617d1fd81b61cbdb7685 diff --git a/bundles/org.simantics.acorn/src/org/simantics/acorn/MainProgram.java b/bundles/org.simantics.acorn/src/org/simantics/acorn/MainProgram.java index f39a4987d..1e4b5cbac 100644 --- a/bundles/org.simantics.acorn/src/org/simantics/acorn/MainProgram.java +++ b/bundles/org.simantics.acorn/src/org/simantics/acorn/MainProgram.java @@ -1,41 +1,47 @@ package org.simantics.acorn; import java.io.Closeable; +import java.io.IOException; import java.util.ArrayList; import java.util.Comparator; -import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.TreeMap; +import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Semaphore; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; +import org.simantics.acorn.exception.AcornAccessVerificationException; +import org.simantics.acorn.exception.IllegalAcornStateException; import org.simantics.acorn.lru.ClusterStreamChunk; import org.simantics.acorn.lru.ClusterUpdateOperation; import org.simantics.db.service.ClusterUID; import org.simantics.utils.logging.TimeLogger; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class MainProgram implements Runnable, Closeable { + private static final Logger LOGGER = LoggerFactory.getLogger(MainProgram.class); + private static final int CLUSTER_THREADS = 4; private static final int CHUNK_CACHE_SIZE = 100; private final GraphClientImpl2 client; - private final ClusterManager clusters; private final ExecutorService[] clusterUpdateThreads; private final List[] updateSchedules; - - private int residentOperationBytes = 0; - private long currentChangeSetId = -1; - private int nextChunkId = 0; + + private Thread mainProgramThread; + private boolean alive = true; private Semaphore deathBarrier = new Semaphore(0); - final Semaphore mutex = new Semaphore(1); - final LinkedList operations = new LinkedList<>(); + final ClusterManager clusters; + + private final OperationQueue operationQueue = new OperationQueue(this); static class ClusterThreadFactory implements ThreadFactory { @@ -55,7 +61,8 @@ public class MainProgram implements Runnable, Closeable { } } - public MainProgram(GraphClientImpl2 client, ClusterManager clusters) { + @SuppressWarnings("unchecked") + MainProgram(GraphClientImpl2 client, ClusterManager clusters) { this.client = client; this.clusters = clusters; @@ -67,9 +74,8 @@ public class MainProgram implements Runnable, Closeable { } } - public void startTransaction(long id) { - currentChangeSetId = id; - nextChunkId = 0; + void startTransaction(long id) { + operationQueue.startTransaction(id); } private static Comparator clusterComparator = new Comparator() { @@ -82,132 +88,35 @@ public class MainProgram implements Runnable, Closeable { @Override public void run() { + + mainProgramThread = Thread.currentThread(); + try { - mutex.acquire(); main: while(alive) { TreeMap> updates = new TreeMap>(clusterComparator); - synchronized(MainProgram.this) { + operationQueue.pumpUpdates(updates); - while(!operations.isEmpty() && updates.size() < 100) { + if(updates.isEmpty()) { - ClusterStreamChunk chunk = operations.pollFirst(); - - for(int i=chunk.nextToProcess;i ops = updates.get(uid); - if(ops == null) { - ops = new ArrayList(); - updates.put(uid, ops); - } - ops.add(o); - } - - chunk.nextToProcess = chunk.operations.size(); - - if(!chunk.isCommitted()) { - assert(operations.isEmpty()); - operations.add(chunk); - break; - } - - } + long duration = operationQueue.waitFor(); - if(updates.isEmpty()) { - try { - long start = System.nanoTime(); - mutex.release(); - MainProgram.this.wait(5000); - mutex.acquire(); - if (!alive) - break main; - long duration = System.nanoTime()-start; - if(duration > 4000000000L) { - - // Was this a time-out or a new stream request? - if(operations.isEmpty()) { - - /* - * We are idling here. - * Flush all caches gradually - */ - - // Write pending cs to disk - boolean written = clusters.csLRU.swapForced(); - while(written) { - if(!updates.isEmpty()) break; - written = clusters.csLRU.swapForced(); - } - // Write pending chunks to disk - written = clusters.streamLRU.swapForced(); - while(written) { - if(!updates.isEmpty()) break; - written = clusters.streamLRU.swapForced(); - } - // Write pending files to disk - written = clusters.fileLRU.swapForced(); - while(written) { - if(!updates.isEmpty()) break; - written = clusters.fileLRU.swapForced(); - } - // Write pending clusters to disk - written = clusters.clusterLRU.swapForced(); - while(written) { - if(!updates.isEmpty()) break; - written = clusters.clusterLRU.swapForced(); - } - - client.tryMakeSnapshot(); - } - } - } catch (InterruptedException e) { - e.printStackTrace(); - } + if (!alive) + break main; + if(duration > 4000000000L) { + checkIdle(); } - + } // long sss = System.nanoTime(); - for(int i=0;i> entry : updates.entrySet()) { - ClusterUID key = entry.getKey(); - int hash = key.hashCode() & (clusterUpdateThreads.length-1); - updateSchedules[hash].addAll(entry.getValue()); - } - - // final AtomicLong elapsed = new AtomicLong(0); - int acquireAmount = 0; - for(int i=0;i ops = updateSchedules[i]; - if (!ops.isEmpty()) { - acquireAmount++; - clusterUpdateThreads[i].execute(() -> { - - //long st = System.nanoTime(); - for(ClusterUpdateOperation op : ops) { - op.run(); - } - s.release(); - // long duration = System.nanoTime()-st; - // elapsed.addAndGet(duration); - // double dur = 1e-9*duration; - // if(dur > 0.05) - // System.err.println("duration=" + dur + "s. " + ops.size()); - }); - } - } - - s.acquire(acquireAmount); + runUpdates(updates); + runTasksIfEmpty(); /* * Here we are actively processing updates from client. @@ -217,8 +126,8 @@ public class MainProgram implements Runnable, Closeable { clusters.streamLRU.acquireMutex(); try { swapChunks(); - } catch (Throwable t) { - throw new IllegalStateException(t); + } catch (AcornAccessVerificationException | IllegalAcornStateException e) { + LOGGER.error("cluster chunk swapping failed", e); } finally { clusters.streamLRU.releaseMutex(); } @@ -226,7 +135,7 @@ public class MainProgram implements Runnable, Closeable { try { swapCS(); } catch (Throwable t) { - throw new IllegalStateException(t); + throw new IllegalAcornStateException(t); } finally { clusters.csLRU.releaseMutex(); } @@ -236,90 +145,202 @@ public class MainProgram implements Runnable, Closeable { } } catch (Throwable t) { - t.printStackTrace(); + LOGGER.error("FATAL: MainProgram died unexpectedly", t); } finally { deathBarrier.release(); } + } + + @FunctionalInterface + static interface MainProgramRunnable { + void run() throws Exception; + default void error(Exception e) { + LOGGER.error("An error occured", e); + } + default void success() {} + } + + private void runUpdates(TreeMap> updates) throws InterruptedException { + + for(int i=0;i> entry : updates.entrySet()) { + ClusterUID key = entry.getKey(); + int hash = key.hashCode() & (clusterUpdateThreads.length-1); + updateSchedules[hash].addAll(entry.getValue()); + } + + // final AtomicLong elapsed = new AtomicLong(0); + int acquireAmount = 0; + for(int i=0;i ops = updateSchedules[i]; + if (!ops.isEmpty()) { + acquireAmount++; + clusterUpdateThreads[i].submit(new Callable() { + + @Override + public Object call() throws Exception { + //long st = System.nanoTime(); + try { + for(ClusterUpdateOperation op : ops) { + op.run(); + } + } finally { + s.release(); + } + return null; + +// long duration = System.nanoTime()-st; +// elapsed.addAndGet(duration); +// double dur = 1e-9*duration; +// if(dur > 0.05) +// System.err.println("duration=" + dur + "s. " + ops.size()); + } + }); + } + } + + s.acquire(acquireAmount); } /* - * Mutex for streamLRU is assumed here - * + * This shall be run when no updates are currently available. */ - private void swapChunks() { + private void runTasksIfEmpty() { + if(operationQueue.isEmpty()) { + List todo = new ArrayList<>(); + operationQueue.pumpTasks(todo); + for(MainProgramRunnable runnable : todo) { + try { + runnable.run(); + runnable.success(); + } catch (Exception e) { + runnable.error(e); + } + } + } + } + + /* + * This gets called when an idle period has been detected + */ + private void checkIdle() throws IOException, IllegalAcornStateException, AcornAccessVerificationException { + + // Was this a time-out or a new stream request? + if(operationQueue.isEmpty()) { + + /* + * We are idling here. + * Flush all caches gradually + */ + + // Write pending cs to disk + boolean written = clusters.csLRU.swapForced(); + while(written) { + if(!operationQueue.isEmpty()) break; + written = clusters.csLRU.swapForced(); + } + // Write pending chunks to disk + written = clusters.streamLRU.swapForced(); + while(written) { + if(!operationQueue.isEmpty()) break; + written = clusters.streamLRU.swapForced(); + } + // Write pending files to disk + written = clusters.fileLRU.swapForced(); + while(written) { + if(!operationQueue.isEmpty()) break; + written = clusters.fileLRU.swapForced(); + } + // Write pending clusters to disk + written = clusters.clusterLRU.swapForced(); + while(written) { + if(!operationQueue.isEmpty()) break; + written = clusters.clusterLRU.swapForced(); + } + + client.tryMakeSnapshot(); - // Cache chunks during update operations - boolean written = clusters.streamLRU.swap(Integer.MAX_VALUE, CHUNK_CACHE_SIZE); - while(written) { - written = clusters.streamLRU.swap(Integer.MAX_VALUE, CHUNK_CACHE_SIZE); } } - private void swapCS() { + + /* + * This schedules tasks to be run in MainProgram thread + * Called from other threads than MainProgram thread + * + */ + void runIdle(MainProgramRunnable task) { + operationQueue.scheduleTask(task); + } + + /* + * Mutex for streamLRU is assumed here + * + */ + private void swapChunks() throws AcornAccessVerificationException, IllegalAcornStateException { // Cache chunks during update operations - boolean written = clusters.csLRU.swap(Integer.MAX_VALUE, CHUNK_CACHE_SIZE); - while(written) { - written = clusters.csLRU.swap(Integer.MAX_VALUE, CHUNK_CACHE_SIZE); - } + while(clusters.streamLRU.swap(Long.MAX_VALUE, CHUNK_CACHE_SIZE)); + } + private void swapCS() throws AcornAccessVerificationException, IllegalAcornStateException { + + // Cache chunks during update operations + while(clusters.csLRU.swap(Long.MAX_VALUE, CHUNK_CACHE_SIZE)); } - public synchronized void committed() { + /* + * Called by DB client write threads + */ + void committed() { - ClusterStreamChunk last = operations.isEmpty() ? null : operations.getLast(); + ClusterStreamChunk last = operationQueue.commitLast(); if (!alive) { - System.err.println("Trying to commit operation after MainProgram is closed! Operation is " + last); -// return; + LOGGER.error("Trying to commit operation after MainProgram is closed! Operation is " + last); } - if(last != null) last.commit(); } - public synchronized void schedule(ClusterUpdateOperation operation) { - if (!alive) { - System.err.println("Trying to schedule operation after MainProgram is closed! Operation is " + operation); -// return; + /* + * Called by DB client write threads + */ + void schedule(ClusterUpdateOperation operation) throws IllegalAcornStateException { + + if (!alive) { + LOGGER.error("Trying to schedule operation after MainProgram is closed! Operation is " + operation); } + clusters.streamLRU.acquireMutex(); try { - ClusterStreamChunk last = operations.isEmpty() ? null : operations.getLast(); - if(last == null || last.isCommitted()) { - String id = "" + currentChangeSetId + "-" + nextChunkId++; - last = new ClusterStreamChunk(clusters, clusters.streamLRU, id); - operations.add(last); - } - - String chunkId = last.getKey(); - int chunkOffset = last.operations.size(); - operation.scheduled(chunkId + "." + chunkOffset); - - last.addOperation(operation); - + operationQueue.scheduleUpdate(operation); swapChunks(); - notifyAll(); - + } catch (IllegalAcornStateException e) { + throw e; } catch (Throwable t) { - throw new IllegalStateException(t); + throw new IllegalAcornStateException(t); } finally { - clusters.streamLRU.releaseMutex(); - } } @Override public void close() { + alive = false; - synchronized (this) { - notifyAll(); - } + + // This will wake up the sleeping beauty + operationQueue.scheduleTask(() -> {}); + try { deathBarrier.acquire(); } catch (InterruptedException e) { @@ -334,9 +355,17 @@ public class MainProgram implements Runnable, Closeable { executor.awaitTermination(500, TimeUnit.MILLISECONDS); clusterUpdateThreads[i] = null; } catch (InterruptedException e) { - e.printStackTrace(); + LOGGER.error("clusterUpdateThread[{}] termination interrupted", i, e); } } } + void assertMainProgramThread() { + assert(Thread.currentThread().equals(mainProgramThread)); + } + + void assertNoMainProgramThread() { + assert(!Thread.currentThread().equals(mainProgramThread)); + } + }