+ @FunctionalInterface
+ static interface MainProgramRunnable {
+ void run() throws Exception;
+ default void error(Exception e) {
+ LOGGER.error("An error occured", e);
+ }
+ default void success() {}
+ }
+
+ private void runUpdates(TreeMap<ClusterUID, List<ClusterUpdateOperation>> updates) throws InterruptedException {
+
+ for(int i=0;i<CLUSTER_THREADS;i++)
+ updateSchedules[i].clear();
+
+ final Semaphore s = new Semaphore(0);
+
+ for(Map.Entry<ClusterUID, List<ClusterUpdateOperation>> entry : updates.entrySet()) {
+ ClusterUID key = entry.getKey();
+ int hash = key.hashCode() & (clusterUpdateThreads.length-1);
+ updateSchedules[hash].addAll(entry.getValue());
+ }
+
+ // final AtomicLong elapsed = new AtomicLong(0);
+ int acquireAmount = 0;
+ for(int i=0;i<CLUSTER_THREADS;i++) {
+ final List<ClusterUpdateOperation> ops = updateSchedules[i];
+ if (!ops.isEmpty()) {
+ acquireAmount++;
+ clusterUpdateThreads[i].submit(new Callable<Object>() {
+
+ @Override
+ public Object call() throws Exception {
+ //long st = System.nanoTime();
+ try {
+ for(ClusterUpdateOperation op : ops) {
+ op.run();
+ }
+ } finally {
+ s.release();
+ }
+ return null;
+
+// long duration = System.nanoTime()-st;
+// elapsed.addAndGet(duration);
+// double dur = 1e-9*duration;
+// if(dur > 0.05)
+// System.err.println("duration=" + dur + "s. " + ops.size());
+ }
+ });
+ }
+ }
+
+ s.acquire(acquireAmount);
+
+ }
+
+ /*
+ * This shall be run when no updates are currently available.
+ */
+ private void runTasksIfEmpty() {
+ if(operationQueue.isEmpty()) {
+ List<MainProgramRunnable> todo = new ArrayList<>();
+ operationQueue.pumpTasks(todo);
+ for(MainProgramRunnable runnable : todo) {
+ try {
+ runnable.run();
+ runnable.success();
+ } catch (Exception e) {
+ runnable.error(e);
+ }
+ }
+ }
+ }
+
+ /*
+ * This gets called when an idle period has been detected
+ */
+ private void checkIdle() throws IOException, IllegalAcornStateException, AcornAccessVerificationException {
+
+ // Was this a time-out or a new stream request?
+ if(operationQueue.isEmpty()) {
+
+ /*
+ * We are idling here.
+ * Flush all caches gradually
+ */
+
+ // Write pending cs to disk
+ boolean written = clusters.csLRU.swapForced();
+ while(written) {
+ if(!operationQueue.isEmpty()) break;
+ written = clusters.csLRU.swapForced();
+ }
+ // Write pending chunks to disk
+ written = clusters.streamLRU.swapForced();
+ while(written) {
+ if(!operationQueue.isEmpty()) break;
+ written = clusters.streamLRU.swapForced();
+ }
+ // Write pending files to disk
+ written = clusters.fileLRU.swapForced();
+ while(written) {
+ if(!operationQueue.isEmpty()) break;
+ written = clusters.fileLRU.swapForced();
+ }
+ // Write pending clusters to disk
+ written = clusters.clusterLRU.swapForced();
+ while(written) {
+ if(!operationQueue.isEmpty()) break;
+ written = clusters.clusterLRU.swapForced();
+ }
+
+ client.tryMakeSnapshot();
+
+ }
+
+ }
+
+
+ /*
+ * This schedules tasks to be run in MainProgram thread
+ * Called from other threads than MainProgram thread
+ *
+ */
+ void runIdle(MainProgramRunnable task) {
+ operationQueue.scheduleTask(task);
+ }
+