package org.simantics.acorn;
import java.io.Closeable;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.Comparator;
-import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
+import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
+import org.simantics.acorn.exception.AcornAccessVerificationException;
+import org.simantics.acorn.exception.IllegalAcornStateException;
import org.simantics.acorn.lru.ClusterStreamChunk;
import org.simantics.acorn.lru.ClusterUpdateOperation;
import org.simantics.db.service.ClusterUID;
import org.simantics.utils.logging.TimeLogger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class MainProgram implements Runnable, Closeable {
+ private static final Logger LOGGER = LoggerFactory.getLogger(MainProgram.class);
+
private static final int CLUSTER_THREADS = 4;
private static final int CHUNK_CACHE_SIZE = 100;
private final GraphClientImpl2 client;
- private final ClusterManager clusters;
private final ExecutorService[] clusterUpdateThreads;
private final List<ClusterUpdateOperation>[] updateSchedules;
-
- private int residentOperationBytes = 0;
- private long currentChangeSetId = -1;
- private int nextChunkId = 0;
+
+ private Thread mainProgramThread;
+
private boolean alive = true;
private Semaphore deathBarrier = new Semaphore(0);
- final Semaphore mutex = new Semaphore(1);
- final LinkedList<ClusterStreamChunk> operations = new LinkedList<>();
+ final ClusterManager clusters;
+
+ private final OperationQueue operationQueue = new OperationQueue(this);
static class ClusterThreadFactory implements ThreadFactory {
}
}
- public MainProgram(GraphClientImpl2 client, ClusterManager clusters) {
+ @SuppressWarnings("unchecked")
+ MainProgram(GraphClientImpl2 client, ClusterManager clusters) {
this.client = client;
this.clusters = clusters;
}
}
	/**
	 * Marks the start of a new write transaction for the given change set.
	 * Transaction bookkeeping (change set id, chunk numbering) is owned by the
	 * operation queue — presumably it resets per-transaction counters; verify
	 * against OperationQueue.startTransaction.
	 *
	 * @param id change set id of the transaction being started
	 */
	void startTransaction(long id) {
		operationQueue.startTransaction(id);
	}
private static Comparator<ClusterUID> clusterComparator = new Comparator<ClusterUID>() {
@Override
public void run() {
+
+ mainProgramThread = Thread.currentThread();
+
try {
- mutex.acquire();
main:
while(alive) {
TreeMap<ClusterUID, List<ClusterUpdateOperation>> updates = new TreeMap<ClusterUID, List<ClusterUpdateOperation>>(clusterComparator);
- synchronized(MainProgram.this) {
+ operationQueue.pumpUpdates(updates);
- while(!operations.isEmpty() && updates.size() < 100) {
+ if(updates.isEmpty()) {
- ClusterStreamChunk chunk = operations.pollFirst();
+ long start = System.nanoTime();
- for(int i=chunk.nextToProcess;i<chunk.operations.size();i++) {
- ClusterUpdateOperation o = chunk.operations.get(i);
- ClusterUID uid = o.uid;
- List<ClusterUpdateOperation> ops = updates.get(uid);
- if(ops == null) {
- ops = new ArrayList<ClusterUpdateOperation>();
- updates.put(uid, ops);
- }
- ops.add(o);
- }
+ operationQueue.waitFor();
- chunk.nextToProcess = chunk.operations.size();
-
- if(!chunk.isCommitted()) {
- assert(operations.isEmpty());
- operations.add(chunk);
- break;
- }
+ if (!alive)
+ break main;
+ long duration = System.nanoTime()-start;
+ if(duration > 4000000000L) {
+ checkIdle();
}
-
- if(updates.isEmpty()) {
- try {
- long start = System.nanoTime();
- mutex.release();
- MainProgram.this.wait(5000);
- mutex.acquire();
- if (!alive)
- break main;
- long duration = System.nanoTime()-start;
- if(duration > 4000000000L) {
-
- // Was this a time-out or a new stream request?
- if(operations.isEmpty()) {
-
- /*
- * We are idling here.
- * Flush all caches gradually
- */
-
- // Write pending cs to disk
- boolean written = clusters.csLRU.swapForced();
- while(written) {
- if(!updates.isEmpty()) break;
- written = clusters.csLRU.swapForced();
- }
- // Write pending chunks to disk
- written = clusters.streamLRU.swapForced();
- while(written) {
- if(!updates.isEmpty()) break;
- written = clusters.streamLRU.swapForced();
- }
- // Write pending files to disk
- written = clusters.fileLRU.swapForced();
- while(written) {
- if(!updates.isEmpty()) break;
- written = clusters.fileLRU.swapForced();
- }
- // Write pending clusters to disk
- written = clusters.clusterLRU.swapForced();
- while(written) {
- if(!updates.isEmpty()) break;
- written = clusters.clusterLRU.swapForced();
- }
-
- client.tryMakeSnapshot();
- }
- }
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
-
- }
-
}
// long sss = System.nanoTime();
- for(int i=0;i<CLUSTER_THREADS;i++)
- updateSchedules[i].clear();
-
- final Semaphore s = new Semaphore(0);
-
- for(Map.Entry<ClusterUID, List<ClusterUpdateOperation>> entry : updates.entrySet()) {
- ClusterUID key = entry.getKey();
- int hash = key.hashCode() & (clusterUpdateThreads.length-1);
- updateSchedules[hash].addAll(entry.getValue());
- }
-
- // final AtomicLong elapsed = new AtomicLong(0);
- int acquireAmount = 0;
- for(int i=0;i<CLUSTER_THREADS;i++) {
- final List<ClusterUpdateOperation> ops = updateSchedules[i];
- if (!ops.isEmpty()) {
- acquireAmount++;
- clusterUpdateThreads[i].execute(() -> {
-
- //long st = System.nanoTime();
- for(ClusterUpdateOperation op : ops) {
- op.run();
- }
- s.release();
- // long duration = System.nanoTime()-st;
- // elapsed.addAndGet(duration);
- // double dur = 1e-9*duration;
- // if(dur > 0.05)
- // System.err.println("duration=" + dur + "s. " + ops.size());
- });
- }
- }
-
- s.acquire(acquireAmount);
+ runUpdates(updates);
+ runTasksIfEmpty();
/*
* Here we are actively processing updates from client.
clusters.streamLRU.acquireMutex();
try {
swapChunks();
- } catch (Throwable t) {
- throw new IllegalStateException(t);
+ } catch (AcornAccessVerificationException | IllegalAcornStateException e) {
+ LOGGER.error("cluster chunk swapping failed", e);
} finally {
clusters.streamLRU.releaseMutex();
}
try {
swapCS();
} catch (Throwable t) {
- throw new IllegalStateException(t);
+ throw new IllegalAcornStateException(t);
} finally {
clusters.csLRU.releaseMutex();
}
}
} catch (Throwable t) {
- t.printStackTrace();
+ LOGGER.error("FATAL: MainProgram died unexpectedly", t);
} finally {
deathBarrier.release();
}
+ }
+
+ @FunctionalInterface
+ static interface MainProgramRunnable {
+ void run() throws Exception;
+ default void error(Exception e) {
+ LOGGER.error("An error occured", e);
+ }
+ default void success() {}
+ }
+
+ private void runUpdates(TreeMap<ClusterUID, List<ClusterUpdateOperation>> updates) throws InterruptedException {
+
+ for(int i=0;i<CLUSTER_THREADS;i++)
+ updateSchedules[i].clear();
+
+ final Semaphore s = new Semaphore(0);
+
+ for(Map.Entry<ClusterUID, List<ClusterUpdateOperation>> entry : updates.entrySet()) {
+ ClusterUID key = entry.getKey();
+ int hash = key.hashCode() & (clusterUpdateThreads.length-1);
+ updateSchedules[hash].addAll(entry.getValue());
+ }
+
+ // final AtomicLong elapsed = new AtomicLong(0);
+ int acquireAmount = 0;
+ for(int i=0;i<CLUSTER_THREADS;i++) {
+ final List<ClusterUpdateOperation> ops = updateSchedules[i];
+ if (!ops.isEmpty()) {
+ acquireAmount++;
+ clusterUpdateThreads[i].submit(new Callable<Object>() {
+
+ @Override
+ public Object call() throws Exception {
+ //long st = System.nanoTime();
+ try {
+ for(ClusterUpdateOperation op : ops) {
+ op.run();
+ }
+ } finally {
+ s.release();
+ }
+ return null;
+
+// long duration = System.nanoTime()-st;
+// elapsed.addAndGet(duration);
+// double dur = 1e-9*duration;
+// if(dur > 0.05)
+// System.err.println("duration=" + dur + "s. " + ops.size());
+ }
+ });
+ }
+ }
+
+ s.acquire(acquireAmount);
}
/*
- * Mutex for streamLRU is assumed here
- *
+ * This shall be run when no updates are currently available.
+ */
+ private void runTasksIfEmpty() {
+ if(operationQueue.isEmpty()) {
+ List<MainProgramRunnable> todo = new ArrayList<>();
+ operationQueue.pumpTasks(todo);
+ for(MainProgramRunnable runnable : todo) {
+ try {
+ runnable.run();
+ runnable.success();
+ } catch (Exception e) {
+ runnable.error(e);
+ }
+ }
+ }
+ }
+
+ /*
+ * This gets called when an idle period has been detected
*/
- private void swapChunks() {
+ private void checkIdle() throws IOException, IllegalAcornStateException, AcornAccessVerificationException {
+
+ // Was this a time-out or a new stream request?
+ if(operationQueue.isEmpty()) {
+
+ /*
+ * We are idling here.
+ * Flush all caches gradually
+ */
+
+ // Write pending cs to disk
+ boolean written = clusters.csLRU.swapForced();
+ while(written) {
+ if(!operationQueue.isEmpty()) break;
+ written = clusters.csLRU.swapForced();
+ }
+ // Write pending chunks to disk
+ written = clusters.streamLRU.swapForced();
+ while(written) {
+ if(!operationQueue.isEmpty()) break;
+ written = clusters.streamLRU.swapForced();
+ }
+ // Write pending files to disk
+ written = clusters.fileLRU.swapForced();
+ while(written) {
+ if(!operationQueue.isEmpty()) break;
+ written = clusters.fileLRU.swapForced();
+ }
+ // Write pending clusters to disk
+ written = clusters.clusterLRU.swapForced();
+ while(written) {
+ if(!operationQueue.isEmpty()) break;
+ written = clusters.clusterLRU.swapForced();
+ }
+
+ client.tryMakeSnapshot();
- // Cache chunks during update operations
- boolean written = clusters.streamLRU.swap(Integer.MAX_VALUE, CHUNK_CACHE_SIZE);
- while(written) {
- written = clusters.streamLRU.swap(Integer.MAX_VALUE, CHUNK_CACHE_SIZE);
}
}
- private void swapCS() {
+
	/*
	 * Schedules a task to be executed on the MainProgram thread.
	 * Called from threads other than the MainProgram thread.
	 */
	void runIdle(MainProgramRunnable task) {
		operationQueue.scheduleTask(task);
	}
+
+ /*
+ * Mutex for streamLRU is assumed here
+ *
+ */
+ private void swapChunks() throws AcornAccessVerificationException, IllegalAcornStateException {
// Cache chunks during update operations
- boolean written = clusters.csLRU.swap(Integer.MAX_VALUE, CHUNK_CACHE_SIZE);
- while(written) {
- written = clusters.csLRU.swap(Integer.MAX_VALUE, CHUNK_CACHE_SIZE);
- }
+ while(clusters.streamLRU.swap(Long.MAX_VALUE, CHUNK_CACHE_SIZE));
+ }
+
+ private void swapCS() throws AcornAccessVerificationException, IllegalAcornStateException {
+ // Cache chunks during update operations
+ while(clusters.csLRU.swap(Long.MAX_VALUE, CHUNK_CACHE_SIZE));
}
- public synchronized void committed() {
+ /*
+ * Called by DB client write threads
+ */
+ void committed() {
- ClusterStreamChunk last = operations.isEmpty() ? null : operations.getLast();
+ ClusterStreamChunk last = operationQueue.commitLast();
if (!alive) {
- System.err.println("Trying to commit operation after MainProgram is closed! Operation is " + last);
-// return;
+ LOGGER.error("Trying to commit operation after MainProgram is closed! Operation is " + last);
}
- if(last != null) last.commit();
}
- public synchronized void schedule(ClusterUpdateOperation operation) {
- if (!alive) {
- System.err.println("Trying to schedule operation after MainProgram is closed! Operation is " + operation);
-// return;
+ /*
+ * Called by DB client write threads
+ */
+ void schedule(ClusterUpdateOperation operation) throws IllegalAcornStateException {
+
+ if (!alive) {
+ LOGGER.error("Trying to schedule operation after MainProgram is closed! Operation is " + operation);
}
+
clusters.streamLRU.acquireMutex();
try {
- ClusterStreamChunk last = operations.isEmpty() ? null : operations.getLast();
- if(last == null || last.isCommitted()) {
- String id = "" + currentChangeSetId + "-" + nextChunkId++;
- last = new ClusterStreamChunk(clusters, clusters.streamLRU, id);
- operations.add(last);
- }
-
- String chunkId = last.getKey();
- int chunkOffset = last.operations.size();
- operation.scheduled(chunkId + "." + chunkOffset);
-
- last.addOperation(operation);
-
+ operationQueue.scheduleUpdate(operation);
swapChunks();
- notifyAll();
-
+ } catch (IllegalAcornStateException e) {
+ throw e;
} catch (Throwable t) {
- throw new IllegalStateException(t);
+ throw new IllegalAcornStateException(t);
} finally {
-
clusters.streamLRU.releaseMutex();
-
}
}
@Override
public void close() {
+
alive = false;
- synchronized (this) {
- notifyAll();
- }
+
+ // This will wake up the sleeping beauty
+ operationQueue.scheduleTask(() -> {});
+
try {
deathBarrier.acquire();
} catch (InterruptedException e) {
executor.awaitTermination(500, TimeUnit.MILLISECONDS);
clusterUpdateThreads[i] = null;
} catch (InterruptedException e) {
- e.printStackTrace();
+ LOGGER.error("clusterUpdateThread[{}] termination interrupted", i, e);
}
}
}
	// Guard: caller must be running on the MainProgram thread.
	// No-op unless assertions (-ea) are enabled.
	void assertMainProgramThread() {
		assert(Thread.currentThread().equals(mainProgramThread));
	}
+
	// Guard: caller must NOT be running on the MainProgram thread.
	// No-op unless assertions (-ea) are enabled.
	void assertNoMainProgramThread() {
		assert(!Thread.currentThread().equals(mainProgramThread));
	}
+
}