import java.util.List;
import java.util.Map;
import java.util.TreeMap;
+import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
+import org.simantics.acorn.exception.AcornAccessVerificationException;
+import org.simantics.acorn.exception.IllegalAcornStateException;
import org.simantics.acorn.lru.ClusterStreamChunk;
import org.simantics.acorn.lru.ClusterUpdateOperation;
import org.simantics.db.service.ClusterUID;
} catch (InterruptedException e) {
e.printStackTrace();
}
-
}
-
}
// long sss = System.nanoTime();
final List<ClusterUpdateOperation> ops = updateSchedules[i];
if (!ops.isEmpty()) {
acquireAmount++;
- clusterUpdateThreads[i].execute(() -> {
-
- //long st = System.nanoTime();
- for(ClusterUpdateOperation op : ops) {
- op.run();
- }
- s.release();
- // long duration = System.nanoTime()-st;
- // elapsed.addAndGet(duration);
- // double dur = 1e-9*duration;
- // if(dur > 0.05)
- // System.err.println("duration=" + dur + "s. " + ops.size());
- });
+ clusterUpdateThreads[i].submit(new Callable<Object>() {
+
+ @Override
+ public Object call() throws Exception {
+ //long st = System.nanoTime();
+ try {
+ for(ClusterUpdateOperation op : ops) {
+ op.run();
+ }
+ } finally {
+ s.release();
+ }
+ return null;
+
+ // long duration = System.nanoTime()-st;
+ // elapsed.addAndGet(duration);
+ // double dur = 1e-9*duration;
+ // if(dur > 0.05)
+ // System.err.println("duration=" + dur + "s. " + ops.size());
+ }
+ });
}
}
clusters.streamLRU.acquireMutex();
try {
swapChunks();
- } catch (Throwable t) {
- throw new IllegalStateException(t);
+ } catch (AcornAccessVerificationException | IllegalAcornStateException e) {
+ e.printStackTrace();
} finally {
clusters.streamLRU.releaseMutex();
}
try {
swapCS();
} catch (Throwable t) {
- throw new IllegalStateException(t);
+ throw new IllegalAcornStateException(t);
} finally {
clusters.csLRU.releaseMutex();
}
} finally {
deathBarrier.release();
}
-
+ }
+
+ static interface MainProgramRunnable {
+
+ public void run() throws Exception;
+ public void done();
+
}
+ public Exception runIdle(MainProgramRunnable runnable) {
+ try {
+ mutex.acquire();
+ runnable.run();
+ return null;
+ } catch (Exception e) {
+ return e;
+ } finally {
+ runnable.done();
+ mutex.release();
+ }
+ }
+
/*
* Mutex for streamLRU is assumed here
*
*/
- private void swapChunks() {
+ private void swapChunks() throws AcornAccessVerificationException, IllegalAcornStateException {
// Cache chunks during update operations
boolean written = clusters.streamLRU.swap(Integer.MAX_VALUE, CHUNK_CACHE_SIZE);
while(written) {
written = clusters.streamLRU.swap(Integer.MAX_VALUE, CHUNK_CACHE_SIZE);
}
-
}
- private void swapCS() {
+ private void swapCS() throws AcornAccessVerificationException, IllegalAcornStateException {
// Cache chunks during update operations
boolean written = clusters.csLRU.swap(Integer.MAX_VALUE, CHUNK_CACHE_SIZE);
while(written) {
written = clusters.csLRU.swap(Integer.MAX_VALUE, CHUNK_CACHE_SIZE);
}
-
}
public synchronized void committed() {
}
- public synchronized void schedule(ClusterUpdateOperation operation) {
+ public synchronized void schedule(ClusterUpdateOperation operation) throws IllegalAcornStateException {
if (!alive) {
System.err.println("Trying to schedule operation after MainProgram is closed! Operation is " + operation);
// return;
swapChunks();
notifyAll();
-
+ } catch (IllegalAcornStateException e) {
+ throw e;
} catch (Throwable t) {
- throw new IllegalStateException(t);
+ throw new IllegalAcornStateException(t);
} finally {
-
clusters.streamLRU.releaseMutex();
-
}
-
}
@Override