- if(!chunk.isCommitted()) {
- assert(operations.isEmpty());
- operations.add(chunk);
- break;
- }
-
- }
-
- if(updates.isEmpty()) {
- try {
- long start = System.nanoTime();
- mutex.release();
- MainProgram.this.wait(5000); // wake up when new operations are scheduled or the last operation is committed
- mutex.acquire();
- if (!alive)
- break main;
- long duration = System.nanoTime()-start;
- if(duration > 4000000000L) {
-
- // Was this a time-out or a new stream request?
- if(operations.isEmpty()) {
-
- /*
- * We are idling here.
- * Flush all caches gradually
- */
-
- // Write pending cs to disk
- boolean written = clusters.csLRU.swapForced();
- while(written) {
- if(!updates.isEmpty()) break;
- written = clusters.csLRU.swapForced();
- }
- // Write pending chunks to disk
- written = clusters.streamLRU.swapForced();
- while(written) {
- if(!updates.isEmpty()) break;
- written = clusters.streamLRU.swapForced();
- }
- // Write pending files to disk
- written = clusters.fileLRU.swapForced();
- while(written) {
- if(!updates.isEmpty()) break;
- written = clusters.fileLRU.swapForced();
- }
- // Write pending clusters to disk
- written = clusters.clusterLRU.swapForced();
- while(written) {
- if(!updates.isEmpty()) break;
- written = clusters.clusterLRU.swapForced();
- }
-
- client.tryMakeSnapshot();
- }
- }
- } catch (InterruptedException e) {
- e.printStackTrace();
- }