package org.simantics.acorn; import java.io.Closeable; import java.io.IOException; import java.util.ArrayList; import java.util.Comparator; import java.util.List; import java.util.Map; import java.util.TreeMap; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Semaphore; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import org.simantics.acorn.exception.AcornAccessVerificationException; import org.simantics.acorn.exception.IllegalAcornStateException; import org.simantics.acorn.lru.ClusterStreamChunk; import org.simantics.acorn.lru.ClusterUpdateOperation; import org.simantics.db.service.ClusterUID; import org.simantics.utils.logging.TimeLogger; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class MainProgram implements Runnable, Closeable { private static final Logger LOGGER = LoggerFactory.getLogger(MainProgram.class); private static final int CLUSTER_THREADS = 4; private static final int CHUNK_CACHE_SIZE = 100; private final GraphClientImpl2 client; private final ExecutorService[] clusterUpdateThreads; private final List[] updateSchedules; private Thread mainProgramThread; private boolean alive = true; private Semaphore deathBarrier = new Semaphore(0); final ClusterManager clusters; private final OperationQueue operationQueue = new OperationQueue(this); static class ClusterThreadFactory implements ThreadFactory { final String name; final boolean daemon; public ClusterThreadFactory(String name, boolean daemon) { this.name = name; this.daemon = daemon; } @Override public Thread newThread(Runnable r) { Thread thread = new Thread(r, name); thread.setDaemon(daemon); return thread; } } @SuppressWarnings("unchecked") MainProgram(GraphClientImpl2 client, ClusterManager clusters) { this.client = client; this.clusters = clusters; this.clusterUpdateThreads = new ExecutorService[CLUSTER_THREADS]; this.updateSchedules = new ArrayList[CLUSTER_THREADS]; for(int i=0;i(); } } void startTransaction(long id) { operationQueue.startTransaction(id); } private static Comparator clusterComparator = new Comparator() { @Override public int compare(ClusterUID o1, ClusterUID o2) { return Long.compare(o1.second, o2.second); } }; @Override public void run() { mainProgramThread = Thread.currentThread(); try { TreeMap> updates = new TreeMap<>(clusterComparator); main: while(alive) { if (!updates.isEmpty()) updates.clear(); operationQueue.pumpUpdates(updates); if(updates.isEmpty()) { long duration = operationQueue.waitFor(); if (!alive) break main; if(duration > 4000000000L) { checkIdle(); } } // long sss = System.nanoTime(); runUpdates(updates); runTasksIfEmpty(); /* * Here we are actively processing updates from client. * Maintain necessary caching here. */ clusters.streamLRU.acquireMutex(); try { swapChunks(); } catch (AcornAccessVerificationException | IllegalAcornStateException e) { LOGGER.error("cluster chunk swapping failed", e); } finally { clusters.streamLRU.releaseMutex(); } clusters.csLRU.acquireMutex(); try { swapCS(); } catch (Throwable t) { throw new IllegalAcornStateException(t); } finally { clusters.csLRU.releaseMutex(); } TimeLogger.log("Performed updates"); } } catch (Throwable t) { LOGGER.error("FATAL: MainProgram died unexpectedly", t); } finally { deathBarrier.release(); } } @FunctionalInterface static interface MainProgramRunnable { void run() throws Exception; default void error(Exception e) { LOGGER.error("An error occured", e); } default void success() {} } private void runUpdates(TreeMap> updates) throws InterruptedException { for(int i=0;i> entry : updates.entrySet()) { ClusterUID key = entry.getKey(); int hash = key.hashCode() & (clusterUpdateThreads.length-1); updateSchedules[hash].addAll(entry.getValue()); } // final AtomicLong elapsed = new AtomicLong(0); int acquireAmount = 0; for(int i=0;i ops = updateSchedules[i]; if (!ops.isEmpty()) { acquireAmount++; clusterUpdateThreads[i].submit(new Callable() { @Override public Object call() throws Exception { //long st = System.nanoTime(); try { for(ClusterUpdateOperation op : ops) { op.run(); } } finally { s.release(); } return null; // long duration = System.nanoTime()-st; // elapsed.addAndGet(duration); // double dur = 1e-9*duration; // if(dur > 0.05) // System.err.println("duration=" + dur + "s. " + ops.size()); } }); } } s.acquire(acquireAmount); } /* * This shall be run when no updates are currently available. */ private void runTasksIfEmpty() { if(operationQueue.isEmpty()) { List todo = new ArrayList<>(); operationQueue.pumpTasks(todo); for(MainProgramRunnable runnable : todo) { try { runnable.run(); runnable.success(); } catch (Exception e) { runnable.error(e); } } } } /* * This gets called when an idle period has been detected */ private void checkIdle() throws IOException, IllegalAcornStateException, AcornAccessVerificationException { // Was this a time-out or a new stream request? if(operationQueue.isEmpty()) { /* * We are idling here. * Flush all caches gradually */ // Write pending cs to disk boolean written = clusters.csLRU.swapForced(); while(written) { if(!operationQueue.isEmpty()) break; written = clusters.csLRU.swapForced(); } // Write pending chunks to disk written = clusters.streamLRU.swapForced(); while(written) { if(!operationQueue.isEmpty()) break; written = clusters.streamLRU.swapForced(); } // Write pending files to disk written = clusters.fileLRU.swapForced(); while(written) { if(!operationQueue.isEmpty()) break; written = clusters.fileLRU.swapForced(); } // Write pending clusters to disk written = clusters.clusterLRU.swapForced(); while(written) { if(!operationQueue.isEmpty()) break; written = clusters.clusterLRU.swapForced(); } client.tryMakeSnapshot(); } } /* * This schedules tasks to be run in MainProgram thread * Called from other threads than MainProgram thread * */ void runIdle(MainProgramRunnable task) { operationQueue.scheduleTask(task); } /* * Mutex for streamLRU is assumed here * */ private void swapChunks() throws AcornAccessVerificationException, IllegalAcornStateException { // Cache chunks during update operations while(clusters.streamLRU.swap(Long.MAX_VALUE, CHUNK_CACHE_SIZE)); } private void swapCS() throws AcornAccessVerificationException, IllegalAcornStateException { // Cache chunks during update operations while(clusters.csLRU.swap(Long.MAX_VALUE, CHUNK_CACHE_SIZE)); } /* * Called by DB client write threads */ void committed() { ClusterStreamChunk last = operationQueue.commitLast(); if (!alive) { LOGGER.error("Trying to commit operation after MainProgram is closed! Operation is " + last); } } /* * Called by DB client write threads */ void schedule(ClusterUpdateOperation operation) throws IllegalAcornStateException { if (!alive) { LOGGER.error("Trying to schedule operation after MainProgram is closed! Operation is " + operation); } clusters.streamLRU.acquireMutex(); try { operationQueue.scheduleUpdate(operation); swapChunks(); } catch (IllegalAcornStateException e) { throw e; } catch (Throwable t) { throw new IllegalAcornStateException(t); } finally { clusters.streamLRU.releaseMutex(); } } @Override public void close() { alive = false; // This will wake up the sleeping beauty operationQueue.scheduleTask(() -> {}); try { deathBarrier.acquire(); } catch (InterruptedException e) { } for (ExecutorService executor : clusterUpdateThreads) executor.shutdown(); for (int i = 0; i < clusterUpdateThreads.length; i++) { try { ExecutorService executor = clusterUpdateThreads[i]; executor.awaitTermination(500, TimeUnit.MILLISECONDS); clusterUpdateThreads[i] = null; } catch (InterruptedException e) { LOGGER.error("clusterUpdateThread[{}] termination interrupted", i, e); } } } void assertMainProgramThread() { assert(Thread.currentThread().equals(mainProgramThread)); } void assertNoMainProgramThread() { assert(!Thread.currentThread().equals(mainProgramThread)); } }