}
}
+ @SuppressWarnings("unchecked")
MainProgram(GraphClientImpl2 client, ClusterManager clusters) {
this.client = client;
this.updateSchedules = new ArrayList[CLUSTER_THREADS];
for(int i=0;i<clusterUpdateThreads.length;i++) {
clusterUpdateThreads[i] = Executors.newSingleThreadExecutor(new ClusterThreadFactory("Cluster Updater " + (i+1), false));
- updateSchedules[i] = new ArrayList<ClusterUpdateOperation>();
+ updateSchedules[i] = new ArrayList<>();
}
}
try {
+ TreeMap<ClusterUID, List<ClusterUpdateOperation>> updates = new TreeMap<>(clusterComparator);
+
main:
while(alive) {
- TreeMap<ClusterUID, List<ClusterUpdateOperation>> updates = new TreeMap<ClusterUID, List<ClusterUpdateOperation>>(clusterComparator);
-
+ if (!updates.isEmpty())
+ updates.clear();
operationQueue.pumpUpdates(updates);
if(updates.isEmpty()) {
try {
swapChunks();
} catch (AcornAccessVerificationException | IllegalAcornStateException e) {
- e.printStackTrace();
+ LOGGER.error("cluster chunk swapping failed", e);
} finally {
clusters.streamLRU.releaseMutex();
}
}
} catch (Throwable t) {
- t.printStackTrace();
+ LOGGER.error("FATAL: MainProgram died unexpectedly", t);
} finally {
deathBarrier.release();
}
for(int i=0;i<CLUSTER_THREADS;i++)
updateSchedules[i].clear();
+ if (updates.isEmpty())
+ return;
+
final Semaphore s = new Semaphore(0);
for(Map.Entry<ClusterUID, List<ClusterUpdateOperation>> entry : updates.entrySet()) {
executor.awaitTermination(500, TimeUnit.MILLISECONDS);
clusterUpdateThreads[i] = null;
} catch (InterruptedException e) {
- e.printStackTrace();
+ LOGGER.error("clusterUpdateThread[{}] termination interrupted", i, e);
}
}
}