import org.simantics.acorn.lru.ClusterUpdateOperation;
import org.simantics.db.service.ClusterUID;
import org.simantics.utils.logging.TimeLogger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class MainProgram implements Runnable, Closeable {
+ private static final Logger LOGGER = LoggerFactory.getLogger(MainProgram.class);
+
private static final int CLUSTER_THREADS = 4;
private static final int CHUNK_CACHE_SIZE = 100;
try {
long start = System.nanoTime();
mutex.release();
- MainProgram.this.wait(5000);
+ MainProgram.this.wait(5000); // wake up when new operations are scheduled or the last operation is committed
mutex.acquire();
if (!alive)
break main;
deathBarrier.release();
}
}
+
+ static interface MainProgramRunnable {
+
+ public void run() throws Exception;
+ public void done();
+
+ }
+ public Exception runIdle(MainProgramRunnable runnable) {
+ try {
+ long startTime = System.currentTimeMillis();
+ while (true) {
+ boolean hasMutex = false;
+ try {
+ synchronized (MainProgram.this) {
+ if (hasMutex = mutex.tryAcquire()) {
+ if (operations.isEmpty()) {
+ runnable.run();
+ return null;
+ }
+ }
+ }
+ long endTime = System.currentTimeMillis();
+ if ((endTime - startTime) > 100) {
+ startTime = endTime;
+ LOGGER.info("MainProgram.runIdle() retry mutex acquire!");
+ }
+ } catch (Exception e) {
+ return e;
+ } finally {
+ if (hasMutex)
+ mutex.release();
+ }
+ }
+ } finally {
+ runnable.done();
+ }
+ }
+
/*
* Mutex for streamLRU is assumed here
*
private void swapChunks() throws AcornAccessVerificationException, IllegalAcornStateException {
// Cache chunks during update operations
- boolean written = clusters.streamLRU.swap(Integer.MAX_VALUE, CHUNK_CACHE_SIZE);
- while(written) {
- written = clusters.streamLRU.swap(Integer.MAX_VALUE, CHUNK_CACHE_SIZE);
- }
+ while(clusters.streamLRU.swap(Long.MAX_VALUE, CHUNK_CACHE_SIZE));
}
private void swapCS() throws AcornAccessVerificationException, IllegalAcornStateException {
// Cache chunks during update operations
- boolean written = clusters.csLRU.swap(Integer.MAX_VALUE, CHUNK_CACHE_SIZE);
- while(written) {
- written = clusters.csLRU.swap(Integer.MAX_VALUE, CHUNK_CACHE_SIZE);
- }
+ while(clusters.csLRU.swap(Long.MAX_VALUE, CHUNK_CACHE_SIZE));
}
public synchronized void committed() {
ClusterStreamChunk last = operations.isEmpty() ? null : operations.getLast();
if (!alive) {
- System.err.println("Trying to commit operation after MainProgram is closed! Operation is " + last);
+ LOGGER.error("Trying to commit operation after MainProgram is closed! Operation is " + last);
// return;
}
- if(last != null) last.commit();
+ if(last != null) {
+ last.commit();
+ notifyAll();
+ }
}
public synchronized void schedule(ClusterUpdateOperation operation) throws IllegalAcornStateException {
if (!alive) {
- System.err.println("Trying to schedule operation after MainProgram is closed! Operation is " + operation);
+ LOGGER.error("Trying to schedule operation after MainProgram is closed! Operation is " + operation);
// return;
}
clusters.streamLRU.acquireMutex();