+ }
+
+ @FunctionalInterface
+ static interface MainProgramRunnable {
+ void run() throws Exception;
+ default void error(Exception e) {
+ LOGGER.error("An error occured", e);
+ }
+ default void success() {}
+ }
+
+ private void runUpdates(TreeMap<ClusterUID, List<ClusterUpdateOperation>> updates) throws InterruptedException {
+
+ for(int i=0;i<CLUSTER_THREADS;i++)
+ updateSchedules[i].clear();
+
+ final Semaphore s = new Semaphore(0);
+
+ for(Map.Entry<ClusterUID, List<ClusterUpdateOperation>> entry : updates.entrySet()) {
+ ClusterUID key = entry.getKey();
+ int hash = key.hashCode() & (clusterUpdateThreads.length-1);
+ updateSchedules[hash].addAll(entry.getValue());
+ }
+
+ // final AtomicLong elapsed = new AtomicLong(0);
+ int acquireAmount = 0;
+ for(int i=0;i<CLUSTER_THREADS;i++) {
+ final List<ClusterUpdateOperation> ops = updateSchedules[i];
+ if (!ops.isEmpty()) {
+ acquireAmount++;
+ clusterUpdateThreads[i].submit(new Callable<Object>() {
+
+ @Override
+ public Object call() throws Exception {
+ //long st = System.nanoTime();
+ try {
+ for(ClusterUpdateOperation op : ops) {
+ op.run();
+ }
+ } finally {
+ s.release();
+ }
+ return null;
+
+// long duration = System.nanoTime()-st;
+// elapsed.addAndGet(duration);
+// double dur = 1e-9*duration;
+// if(dur > 0.05)
+// System.err.println("duration=" + dur + "s. " + ops.size());
+ }
+ });
+ }
+ }
+
+ s.acquire(acquireAmount);