*******************************************************************************/
package org.simantics.acorn;
+import java.io.BufferedReader;
import java.io.IOException;
+import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import org.simantics.acorn.MainProgram.MainProgramRunnable;
+import org.simantics.acorn.backup.AcornBackupProvider;
+import org.simantics.acorn.backup.AcornBackupProvider.AcornBackupRunnable;
import org.simantics.acorn.exception.AcornAccessVerificationException;
import org.simantics.acorn.exception.IllegalAcornStateException;
import org.simantics.acorn.internal.ClusterChange;
import org.simantics.acorn.lru.ClusterInfo;
import org.simantics.acorn.lru.ClusterStreamChunk;
import org.simantics.acorn.lru.ClusterUpdateOperation;
+import org.simantics.backup.BackupException;
import org.simantics.db.ClusterCreator;
import org.simantics.db.Database;
import org.simantics.db.ServiceLocator;
import org.simantics.db.service.ClusterSetsSupport;
import org.simantics.db.service.ClusterUID;
import org.simantics.db.service.LifecycleSupport;
+import org.simantics.utils.DataContainer;
import org.simantics.utils.datastructures.Pair;
import org.simantics.utils.logging.TimeLogger;
import org.slf4j.Logger;
private static final Logger LOGGER = LoggerFactory.getLogger(GraphClientImpl2.class);
public static final boolean DEBUG = false;
- public final ClusterManager clusters;
-
+ final ClusterManager clusters;
+
private TransactionManager transactionManager = new TransactionManager();
private ExecutorService executor = Executors.newSingleThreadExecutor(new ClientThreadFactory("Core Main Program", false));
private ExecutorService saver = Executors.newSingleThreadExecutor(new ClientThreadFactory("Core Snapshot Saver", true));
private ServiceLocator locator;
private MainProgram mainProgram;
- static class ClientThreadFactory implements ThreadFactory {
-
+ private static class ClientThreadFactory implements ThreadFactory {
+
final String name;
final boolean daemon;
-
+
public ClientThreadFactory(String name, boolean daemon) {
this.name = name;
this.daemon = daemon;
}
-
+
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r, name);
this.locator = locator;
this.clusters = new ClusterManager(dbFolder);
load();
- ClusterSetsSupport cssi = locator.getService(ClusterSetsSupport.class);
+ ClusterSetsSupport cssi = locator.getService(ClusterSetsSupport.class);
cssi.setReadDirectory(clusters.lastSessionDirectory);
cssi.updateWriteDirectory(clusters.workingDirectory);
mainProgram = new MainProgram(this, clusters);
return dbFolder;
}
- public void tryMakeSnapshot() throws IOException {
-
- if (isClosing || unexpectedClose)
- return;
-
+ /*
+ * This method schedules snapshotting.
+ * No lock and thread restrictions.
+ */
+ void tryMakeSnapshot() throws IOException {
+
+ if (isClosing || unexpectedClose)
+ return;
+
saver.execute(new Runnable() {
@Override
// First take a write transaction
tr = askWriteTransaction(-1);
// Then make sure that MainProgram is idling
- mainProgram.mutex.acquire();
- try {
- synchronized(mainProgram) {
- if(mainProgram.operations.isEmpty()) {
- makeSnapshot(false);
- } else {
- // MainProgram is becoming busy again - delay snapshotting
- return;
- }
- }
- } finally {
- mainProgram.mutex.release();
- }
+ synchronizeWithIdleMainProgram(() -> makeSnapshot(false));
} catch (IllegalAcornStateException | ProCoreException e) {
LOGGER.error("Snapshotting failed", e);
unexpectedClose = true;
- } catch (InterruptedException e) {
- LOGGER.error("Snapshotting interrupted", e);
+ } catch (SDBException e) {
+ LOGGER.error("Snapshotting failed", e);
+ unexpectedClose = true;
} finally {
try {
if(tr != null)
endTransaction(tr.getTransactionId());
if (unexpectedClose) {
- LifecycleSupport support = getServiceLocator().getService(LifecycleSupport.class);
- try {
- support.close();
- } catch (DatabaseException e1) {
- LOGGER.error("Failed to close database as a safety measure due to failed snapshotting", e1);
- }
+ LifecycleSupport support = getServiceLocator().getService(LifecycleSupport.class);
+ try {
+ support.close();
+ } catch (DatabaseException e1) {
+ LOGGER.error("Failed to close database as a safety measure due to failed snapshotting", e1);
+ }
}
} catch (ProCoreException e) {
LOGGER.error("Failed to end snapshotting write transaction", e);
}
});
}
-
- public void makeSnapshot(boolean fullSave) throws IllegalAcornStateException {
- clusters.makeSnapshot(locator, fullSave);
- }
-
+
+ private void makeSnapshot(boolean fullSave) throws IllegalAcornStateException {
+ clusters.makeSnapshot(locator, fullSave);
+ }
+
+ @Override
public <T> T clone(ClusterUID uid, ClusterCreator creator) throws DatabaseException {
try {
return clusters.clone(uid, creator);
}
}
-// private void save() throws IOException {
-// clusters.save();
-// }
-
- public void load() throws IOException {
+ private void load() throws IOException {
clusters.load();
}
-
-// public void modiFileEx(ClusterUID uid, int resourceKey, long offset, long size, byte[] bytes, long pos, ClusterSupport support) {
-// clusters.modiFileEx(uid, resourceKey, offset, size, bytes, pos, support);
-// }
@Override
public Database getDatabase() {
private boolean closed = false;
private boolean isClosing = false;
private boolean unexpectedClose = false;
-
+
@Override
public void close() throws ProCoreException {
- LOGGER.info("Closing " + this + " and mainProgram " + mainProgram);
+ LOGGER.info("Closing " + this + " and mainProgram " + mainProgram);
if(!closed && !isClosing) {
- isClosing = true;
+ isClosing = true;
try {
- if (!unexpectedClose)
- makeSnapshot(true);
+
+ if (!unexpectedClose)
+ synchronizeWithIdleMainProgram(() -> makeSnapshot(true));
mainProgram.close();
clusters.shutdown();
executor.shutdown();
saver.shutdown();
+
boolean executorTerminated = executor.awaitTermination(500, TimeUnit.MILLISECONDS);
boolean saverTerminated = saver.awaitTermination(500, TimeUnit.MILLISECONDS);
-
+
LOGGER.info("executorTerminated=" + executorTerminated + ", saverTerminated=" + saverTerminated);
try {
mainProgram = null;
executor = null;
saver = null;
-
+
} catch (IllegalAcornStateException | InterruptedException e) {
throw new ProCoreException(e);
+ } catch (SDBException e1) {
+ throw new ProCoreException(e1);
}
closed = true;
}
public boolean isClosed() throws ProCoreException {
return closed;
}
-
+
@Override
public void acceptCommit(long transactionId, long changeSetId, byte[] metadata) throws ProCoreException {
clusters.state.headChangeSetId++;
long committedChangeSetId = changeSetId + 1;
try {
- clusters.commitChangeSet(committedChangeSetId, metadata);
-
- clusters.state.transactionId = transactionId;
-
- mainProgram.committed();
-
- TimeLogger.log("Accepted commit");
+ clusters.commitChangeSet(committedChangeSetId, metadata);
+ clusters.state.transactionId = transactionId;
+ mainProgram.committed();
+ TimeLogger.log("Accepted commit");
} catch (IllegalAcornStateException e) {
throw new ProCoreException(e);
}
clusters.state.headChangeSetId++;
return clusters.state.headChangeSetId;
} catch (SDBException e) {
- LOGGER.error("Failed to undo cancelled transaction", e);
+ LOGGER.error("Failed to undo cancelled transaction", e);
throw new ProCoreException(e);
}
}
return transactionManager.askReadTransaction();
}
- enum TransactionState {
+ private enum TransactionState {
IDLE,WRITE,READ
}
-
- class TransactionRequest {
+
+ private class TransactionRequest {
public TransactionState state;
public Semaphore semaphore;
public TransactionRequest(TransactionState state, Semaphore semaphore) {
}
}
- class TransactionManager {
+ private class TransactionManager {
private TransactionState currentTransactionState = TransactionState.IDLE;
-
+
private int reads = 0;
-
- LinkedList<TransactionRequest> requests = new LinkedList<TransactionRequest>();
-
- TLongObjectHashMap<TransactionRequest> requestMap = new TLongObjectHashMap<TransactionRequest>();
-
+
+ private LinkedList<TransactionRequest> requests = new LinkedList<>();
+
+ private TLongObjectHashMap<TransactionRequest> requestMap = new TLongObjectHashMap<>();
+
private synchronized Transaction makeTransaction(TransactionRequest req) {
-
+
final int csId = clusters.state.headChangeSetId;
final long trId = clusters.state.transactionId+1;
requestMap.put(trId, req);
return new Transaction() {
-
+
@Override
public long getTransactionId() {
return trId;
}
-
+
@Override
public long getHeadChangeSetId() {
return csId;
}
};
}
-
+
/*
* This method cannot be synchronized since it waits and must support multiple entries
* by query thread(s) and internal transactions such as snapshot saver
*/
- public Transaction askReadTransaction() throws ProCoreException {
-
+ private Transaction askReadTransaction() throws ProCoreException {
+
Semaphore semaphore = new Semaphore(0);
-
+
TransactionRequest req = queue(TransactionState.READ, semaphore);
-
+
try {
semaphore.acquire();
} catch (InterruptedException e) {
throw new ProCoreException(e);
}
-
+
return makeTransaction(req);
}
if(r.state == TransactionState.READ) reads++;
r.semaphore.release();
}
-
+
private synchronized void processRequests() {
-
+
while(true) {
if(requests.isEmpty()) return;
TransactionRequest req = requests.peek();
if(currentTransactionState == TransactionState.IDLE) {
-
+
// Accept anything while IDLE
currentTransactionState = req.state;
dispatch();
-
+
} else if (currentTransactionState == TransactionState.READ) {
-
+
if(req.state == currentTransactionState) {
// Allow other reads
dispatch();
} else {
-
+
// Wait
return;
-
+
}
-
+
} else if (currentTransactionState == TransactionState.WRITE) {
// Wait
return;
-
+
}
-
+
}
-
+
}
-
+
private synchronized TransactionRequest queue(TransactionState state, Semaphore semaphore) {
- TransactionRequest req = new TransactionRequest(state, semaphore);
+ TransactionRequest req = new TransactionRequest(state, semaphore);
requests.addLast(req);
processRequests();
return req;
}
-
+
/*
* This method cannot be synchronized since it waits and must support multiple entries
* by query thread(s) and internal transactions such as snapshot saver
*/
- public Transaction askWriteTransaction() throws IllegalAcornStateException {
-
+ private Transaction askWriteTransaction() throws IllegalAcornStateException {
+
Semaphore semaphore = new Semaphore(0);
TransactionRequest req = queue(TransactionState.WRITE, semaphore);
-
+
try {
semaphore.acquire();
} catch (InterruptedException e) {
mainProgram.startTransaction(clusters.state.headChangeSetId+1);
return makeTransaction(req);
}
-
- public synchronized long endTransaction(long transactionId) throws ProCoreException {
-
+
+ private synchronized long endTransaction(long transactionId) throws ProCoreException {
+
TransactionRequest req = requestMap.remove(transactionId);
if(req.state == TransactionState.WRITE) {
currentTransactionState = TransactionState.IDLE;
}
}
-
+
@Override
public Transaction askWriteTransaction(final long transactionId) throws ProCoreException {
try {
public ChangeSetData getChangeSetData(long minChangeSetId,
long maxChangeSetId, OnChangeSetUpdate onChangeSetupate)
throws ProCoreException {
-
+
new Exception("GetChangeSetDataFunction " + minChangeSetId + " " + maxChangeSetId).printStackTrace();;
return null;
-
+
}
@Override
public long getFirstChangeSetId() {
return 0;
}
-
+
};
}
@Override
public Refresh getRefresh(long changeSetId) throws ProCoreException {
-
+
final ClusterIds ids = getClusterIds();
-
+
return new Refresh() {
@Override
public long[] getSecond() {
return ids.getSecond();
}
-
+
};
-
- }
- public byte[] getResourceFile(final byte[] clusterUID, final int resourceIndex) throws ProCoreException, AcornAccessVerificationException, IllegalAcornStateException {
- return clusters.getResourceFile(clusterUID, resourceIndex);
}
+// public byte[] getResourceFile(final byte[] clusterUID, final int resourceIndex) throws ProCoreException, AcornAccessVerificationException, IllegalAcornStateException {
+// return clusters.getResourceFile(clusterUID, resourceIndex);
+// }
+
@Override
public ResourceSegment getResourceSegment(final byte[] clusterUID, final int resourceIndex, final long segmentOffset, short segmentSize) throws ProCoreException {
try {
chunk.releaseMutex();
}
}
-
+
private void performUndo(String ccsId, ArrayList<Pair<ClusterUID, byte[]>> clusterChanges, UndoClusterSupport support) throws ProCoreException, DatabaseException, IllegalAcornStateException, AcornAccessVerificationException {
UndoClusterUpdateProcessor proc = getUndoCSS(ccsId);
ClusterChange cs = new ClusterChange(clusterChanges, proc.getClusterUID());
for(int i=0;i<proc.entries.size();i++) {
-
+
Entry e = proc.entries.get(proc.entries.size() - 1 - i);
e.process(clusters, cs, clusterKey);
}
}
}
+ private void synchronizeWithIdleMainProgram(MainProgramRunnable runnable) throws SDBException {
+
+ Exception[] exception = { null };
+ Semaphore s = new Semaphore(0);
+
+ mainProgram.runIdle(new MainProgramRunnable() {
+
+ @Override
+ public void success() {
+ try {
+ runnable.success();
+ } finally {
+ s.release();
+ }
+ }
+
+ @Override
+ public void error(Exception e) {
+ exception[0] = e;
+ try {
+ runnable.error(e);
+ } finally {
+ s.release();
+ }
+ }
+
+ @Override
+ public void run() throws Exception {
+ runnable.run();
+ }
+
+ });
+
+ try {
+ s.acquire();
+ } catch (InterruptedException e) {
+ throw new IllegalAcornStateException("Unhandled interruption.", e);
+ }
+
+ Exception e = exception[0];
+ if(e != null) {
+ if(e instanceof SDBException) throw (SDBException)e;
+ else if(e != null) throw new IllegalAcornStateException(e);
+ }
+
+ }
+
@Override
public boolean undo(long[] changeSetIds, OnChangeSetUpdate onChangeSetUpdate) throws SDBException {
- Exception exception = mainProgram.runIdle(new MainProgramRunnable() {
+ synchronizeWithIdleMainProgram(new MainProgramRunnable() {
@Override
public void run() throws Exception {
try {
final ArrayList<Pair<ClusterUID, byte[]>> clusterChanges = new ArrayList<Pair<ClusterUID, byte[]>>();
-
+
UndoClusterSupport support = new UndoClusterSupport(clusters);
-
+
final int changeSetId = clusters.state.headChangeSetId;
-
+
if(ClusterUpdateProcessorBase.DEBUG)
LOGGER.info(" === BEGIN UNDO ===");
-
+
for(int i=0;i<changeSetIds.length;i++) {
final long id = changeSetIds[changeSetIds.length-1-i];
ArrayList<String> ccss = clusters.getChanges(id);
-
+
for(int j=0;j<ccss.size();j++) {
String ccsid = ccss.get(ccss.size()-j-1);
try {
}
}
}
-
+
if(ClusterUpdateProcessorBase.DEBUG)
LOGGER.info(" === END UNDO ===");
-
+
for(int i=0;i<clusterChanges.size();i++) {
-
+
final int changeSetIndex = i;
-
+
final Pair<ClusterUID, byte[]> pair = clusterChanges.get(i);
-
+
final ClusterUID cuid = pair.first;
final byte[] data = pair.second;
-
+
onChangeSetUpdate.onChangeSetUpdate(new ChangeSetUpdate() {
-
+
@Override
public long getChangeSetId() {
return changeSetId;
}
-
+
@Override
public int getChangeSetIndex() {
return 0;
}
-
+
@Override
public int getNumberOfClusterChangeSets() {
return clusterChanges.size();
}
-
+
@Override
public int getIndexOfClusterChangeSet() {
return changeSetIndex;
}
-
+
@Override
public byte[] getClusterId() {
return cuid.asBytes();
}
-
+
@Override
public boolean getNewCluster() {
return false;
}
-
+
@Override
public byte[] getData() {
return data;
}
-
+
});
}
} catch (AcornAccessVerificationException | IllegalAcornStateException e1) {
}
- @Override
- public void done() {
-
- }
-
});
- if(exception instanceof SDBException) throw (SDBException)exception;
- else if(exception != null) throw new IllegalAcornStateException(exception);
-
return false;
-
+
}
-
- public ServiceLocator getServiceLocator() {
+
+ ServiceLocator getServiceLocator() {
return locator;
}
public boolean rolledback() {
return clusters.rolledback();
}
-
- public void purge() throws IllegalAcornStateException {
+
+ private void purge() throws IllegalAcornStateException {
clusters.purge(locator);
}
public void purgeDatabase() {
-
+
if (isClosing || unexpectedClose)
return;
-
+
saver.execute(new Runnable() {
@Override
// First take a write transaction
tr = askWriteTransaction(-1);
// Then make sure that MainProgram is idling
- mainProgram.mutex.acquire();
- try {
- synchronized(mainProgram) {
- if(mainProgram.operations.isEmpty()) {
- purge();
- } else {
- // MainProgram is becoming busy again - delay snapshotting
- return;
- }
- }
- } finally {
- mainProgram.mutex.release();
- }
+ synchronizeWithIdleMainProgram(() -> purge());
} catch (IllegalAcornStateException | ProCoreException e) {
- LOGGER.error("Purge failed", e);
+ LOGGER.error("Purge failed", e);
+ unexpectedClose = true;
+ } catch (SDBException e) {
+ LOGGER.error("Purge failed", e);
unexpectedClose = true;
- } catch (InterruptedException e) {
- LOGGER.error("Purge interrupted", e);
} finally {
try {
if(tr != null)
endTransaction(tr.getTransactionId());
if (unexpectedClose) {
- LifecycleSupport support = getServiceLocator().getService(LifecycleSupport.class);
- try {
- support.close();
- } catch (DatabaseException e1) {
- LOGGER.error("Failed to close database as a safety measure due to failed purge", e1);
- }
+ LifecycleSupport support = getServiceLocator().getService(LifecycleSupport.class);
+ try {
+ support.close();
+ } catch (DatabaseException e1) {
+ LOGGER.error("Failed to close database as a safety measure due to failed purge", e1);
+ }
}
} catch (ProCoreException e) {
LOGGER.error("Failed to end purge write transaction", e);
}
}
});
-
+
}
-
+
public long getTailChangeSetId() {
return clusters.getTailChangeSetId();
}
-
+
+ public Future<BackupException> getBackupRunnable(Semaphore lock, Path targetPath, int revision) throws IllegalAcornStateException, IOException {
+
+ makeSnapshot(true);
+
+ Path dbDir = getDbFolder();
+ int newestFolder = clusters.mainState.headDir - 1;
+ int latestFolder = -2;
+ Path AcornMetadataFile = AcornBackupProvider.getAcornMetadataFile(dbDir);
+ if (Files.exists(AcornMetadataFile)) {
+ try (BufferedReader br = Files.newBufferedReader(AcornMetadataFile)) {
+ latestFolder = Integer.parseInt( br.readLine() );
+ }
+ }
+
+ AcornBackupRunnable r = new AcornBackupRunnable(
+ lock, targetPath, revision, dbDir, latestFolder, newestFolder);
+ new Thread(r, "Acorn backup thread").start();
+ return r;
+
+ }
+
}
package org.simantics.acorn;
import java.io.Closeable;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.Comparator;
-import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
public class MainProgram implements Runnable, Closeable {
private static final Logger LOGGER = LoggerFactory.getLogger(MainProgram.class);
-
+
private static final int CLUSTER_THREADS = 4;
private static final int CHUNK_CACHE_SIZE = 100;
private final GraphClientImpl2 client;
- private final ClusterManager clusters;
private final ExecutorService[] clusterUpdateThreads;
private final List<ClusterUpdateOperation>[] updateSchedules;
-
- private int residentOperationBytes = 0;
- private long currentChangeSetId = -1;
- private int nextChunkId = 0;
+
+ private Thread mainProgramThread;
+
private boolean alive = true;
private Semaphore deathBarrier = new Semaphore(0);
- final Semaphore mutex = new Semaphore(1);
- final LinkedList<ClusterStreamChunk> operations = new LinkedList<>();
+ final ClusterManager clusters;
+
+ private final OperationQueue operationQueue = new OperationQueue(this);
static class ClusterThreadFactory implements ThreadFactory {
}
}
- public MainProgram(GraphClientImpl2 client, ClusterManager clusters) {
+ MainProgram(GraphClientImpl2 client, ClusterManager clusters) {
this.client = client;
this.clusters = clusters;
}
}
- public void startTransaction(long id) {
- currentChangeSetId = id;
- nextChunkId = 0;
+ void startTransaction(long id) {
+ operationQueue.startTransaction(id);
}
private static Comparator<ClusterUID> clusterComparator = new Comparator<ClusterUID>() {
@Override
public void run() {
+
+ mainProgramThread = Thread.currentThread();
+
try {
- mutex.acquire();
main:
while(alive) {
TreeMap<ClusterUID, List<ClusterUpdateOperation>> updates = new TreeMap<ClusterUID, List<ClusterUpdateOperation>>(clusterComparator);
- synchronized(MainProgram.this) {
-
- while(!operations.isEmpty() && updates.size() < 100) {
+ operationQueue.pumpUpdates(updates);
- ClusterStreamChunk chunk = operations.pollFirst();
+ if(updates.isEmpty()) {
- for(int i=chunk.nextToProcess;i<chunk.operations.size();i++) {
- ClusterUpdateOperation o = chunk.operations.get(i);
- ClusterUID uid = o.uid;
- List<ClusterUpdateOperation> ops = updates.get(uid);
- if(ops == null) {
- ops = new ArrayList<ClusterUpdateOperation>();
- updates.put(uid, ops);
- }
- ops.add(o);
- }
+ long start = System.nanoTime();
- chunk.nextToProcess = chunk.operations.size();
+ operationQueue.waitFor();
- if(!chunk.isCommitted()) {
- assert(operations.isEmpty());
- operations.add(chunk);
- break;
- }
+ if (!alive)
+ break main;
- }
-
- if(updates.isEmpty()) {
- try {
- long start = System.nanoTime();
- mutex.release();
- MainProgram.this.wait(5000); // wake up when new operations are scheduled or the last operation is committed
- mutex.acquire();
- if (!alive)
- break main;
- long duration = System.nanoTime()-start;
- if(duration > 4000000000L) {
-
- // Was this a time-out or a new stream request?
- if(operations.isEmpty()) {
-
- /*
- * We are idling here.
- * Flush all caches gradually
- */
-
- // Write pending cs to disk
- boolean written = clusters.csLRU.swapForced();
- while(written) {
- if(!updates.isEmpty()) break;
- written = clusters.csLRU.swapForced();
- }
- // Write pending chunks to disk
- written = clusters.streamLRU.swapForced();
- while(written) {
- if(!updates.isEmpty()) break;
- written = clusters.streamLRU.swapForced();
- }
- // Write pending files to disk
- written = clusters.fileLRU.swapForced();
- while(written) {
- if(!updates.isEmpty()) break;
- written = clusters.fileLRU.swapForced();
- }
- // Write pending clusters to disk
- written = clusters.clusterLRU.swapForced();
- while(written) {
- if(!updates.isEmpty()) break;
- written = clusters.clusterLRU.swapForced();
- }
-
- client.tryMakeSnapshot();
- }
- }
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
+ long duration = System.nanoTime()-start;
+ if(duration > 4000000000L) {
+ checkIdle();
}
}
// long sss = System.nanoTime();
- for(int i=0;i<CLUSTER_THREADS;i++)
- updateSchedules[i].clear();
-
- final Semaphore s = new Semaphore(0);
-
- for(Map.Entry<ClusterUID, List<ClusterUpdateOperation>> entry : updates.entrySet()) {
- ClusterUID key = entry.getKey();
- int hash = key.hashCode() & (clusterUpdateThreads.length-1);
- updateSchedules[hash].addAll(entry.getValue());
- }
-
- // final AtomicLong elapsed = new AtomicLong(0);
- int acquireAmount = 0;
- for(int i=0;i<CLUSTER_THREADS;i++) {
- final List<ClusterUpdateOperation> ops = updateSchedules[i];
- if (!ops.isEmpty()) {
- acquireAmount++;
- clusterUpdateThreads[i].submit(new Callable<Object>() {
-
- @Override
- public Object call() throws Exception {
- //long st = System.nanoTime();
- try {
- for(ClusterUpdateOperation op : ops) {
- op.run();
- }
- } finally {
- s.release();
- }
- return null;
-
- // long duration = System.nanoTime()-st;
- // elapsed.addAndGet(duration);
- // double dur = 1e-9*duration;
- // if(dur > 0.05)
- // System.err.println("duration=" + dur + "s. " + ops.size());
- }
- });
- }
- }
-
- s.acquire(acquireAmount);
+ runUpdates(updates);
+ runTasksIfEmpty();
/*
* Here we are actively processing updates from client.
deathBarrier.release();
}
}
-
+
+ @FunctionalInterface
static interface MainProgramRunnable {
-
- public void run() throws Exception;
- public void done();
-
+ void run() throws Exception;
+ default void error(Exception e) {
+ LOGGER.error("An error occured", e);
+ }
+ default void success() {}
}
- public Exception runIdle(MainProgramRunnable runnable) {
- try {
- long startTime = System.currentTimeMillis();
- while (true) {
- boolean hasMutex = false;
+ private void runUpdates(TreeMap<ClusterUID, List<ClusterUpdateOperation>> updates) throws InterruptedException {
+
+ for(int i=0;i<CLUSTER_THREADS;i++)
+ updateSchedules[i].clear();
+
+ final Semaphore s = new Semaphore(0);
+
+ for(Map.Entry<ClusterUID, List<ClusterUpdateOperation>> entry : updates.entrySet()) {
+ ClusterUID key = entry.getKey();
+ int hash = key.hashCode() & (clusterUpdateThreads.length-1);
+ updateSchedules[hash].addAll(entry.getValue());
+ }
+
+ // final AtomicLong elapsed = new AtomicLong(0);
+ int acquireAmount = 0;
+ for(int i=0;i<CLUSTER_THREADS;i++) {
+ final List<ClusterUpdateOperation> ops = updateSchedules[i];
+ if (!ops.isEmpty()) {
+ acquireAmount++;
+ clusterUpdateThreads[i].submit(new Callable<Object>() {
+
+ @Override
+ public Object call() throws Exception {
+ //long st = System.nanoTime();
+ try {
+ for(ClusterUpdateOperation op : ops) {
+ op.run();
+ }
+ } finally {
+ s.release();
+ }
+ return null;
+
+// long duration = System.nanoTime()-st;
+// elapsed.addAndGet(duration);
+// double dur = 1e-9*duration;
+// if(dur > 0.05)
+// System.err.println("duration=" + dur + "s. " + ops.size());
+ }
+ });
+ }
+ }
+
+ s.acquire(acquireAmount);
+
+ }
+
+ /*
+ * This shall be run when no updates are currently available.
+ */
+ private void runTasksIfEmpty() {
+ if(operationQueue.isEmpty()) {
+ List<MainProgramRunnable> todo = new ArrayList<>();
+ operationQueue.pumpTasks(todo);
+ for(MainProgramRunnable runnable : todo) {
try {
- synchronized (MainProgram.this) {
- if (hasMutex = mutex.tryAcquire()) {
- if (operations.isEmpty()) {
- runnable.run();
- return null;
- }
- }
- }
- long endTime = System.currentTimeMillis();
- if ((endTime - startTime) > 100) {
- startTime = endTime;
- LOGGER.info("MainProgram.runIdle() retry mutex acquire!");
- }
+ runnable.run();
+ runnable.success();
} catch (Exception e) {
- return e;
- } finally {
- if (hasMutex)
- mutex.release();
+ runnable.error(e);
}
}
- } finally {
- runnable.done();
}
}
-
+
+ /*
+ * This gets called when an idle period has been detected
+ */
+ private void checkIdle() throws IOException, IllegalAcornStateException, AcornAccessVerificationException {
+
+ // Was this a time-out or a new stream request?
+ if(operationQueue.isEmpty()) {
+
+ /*
+ * We are idling here.
+ * Flush all caches gradually
+ */
+
+ // Write pending cs to disk
+ boolean written = clusters.csLRU.swapForced();
+ while(written) {
+ if(!operationQueue.isEmpty()) break;
+ written = clusters.csLRU.swapForced();
+ }
+ // Write pending chunks to disk
+ written = clusters.streamLRU.swapForced();
+ while(written) {
+ if(!operationQueue.isEmpty()) break;
+ written = clusters.streamLRU.swapForced();
+ }
+ // Write pending files to disk
+ written = clusters.fileLRU.swapForced();
+ while(written) {
+ if(!operationQueue.isEmpty()) break;
+ written = clusters.fileLRU.swapForced();
+ }
+ // Write pending clusters to disk
+ written = clusters.clusterLRU.swapForced();
+ while(written) {
+ if(!operationQueue.isEmpty()) break;
+ written = clusters.clusterLRU.swapForced();
+ }
+
+ client.tryMakeSnapshot();
+
+ }
+
+ }
+
+
+ /*
+ * This schedules tasks to be run in MainProgram thread
+ * Called from other threads than MainProgram thread
+ *
+ */
+ void runIdle(MainProgramRunnable task) {
+ operationQueue.scheduleTask(task);
+ }
+
/*
* Mutex for streamLRU is assumed here
- *
+ *
*/
private void swapChunks() throws AcornAccessVerificationException, IllegalAcornStateException {
while(clusters.csLRU.swap(Long.MAX_VALUE, CHUNK_CACHE_SIZE));
}
- public synchronized void committed() {
+ /*
+ * Called by DB client write threads
+ */
+ void committed() {
- ClusterStreamChunk last = operations.isEmpty() ? null : operations.getLast();
+ ClusterStreamChunk last = operationQueue.commitLast();
if (!alive) {
LOGGER.error("Trying to commit operation after MainProgram is closed! Operation is " + last);
-// return;
}
- if(last != null) {
- last.commit();
- notifyAll();
- }
}
- public synchronized void schedule(ClusterUpdateOperation operation) throws IllegalAcornStateException {
- if (!alive) {
+ /*
+ * Called by DB client write threads
+ */
+ void schedule(ClusterUpdateOperation operation) throws IllegalAcornStateException {
+
+ if (!alive) {
LOGGER.error("Trying to schedule operation after MainProgram is closed! Operation is " + operation);
-// return;
}
+
clusters.streamLRU.acquireMutex();
try {
- ClusterStreamChunk last = operations.isEmpty() ? null : operations.getLast();
- if(last == null || last.isCommitted()) {
- String id = "" + currentChangeSetId + "-" + nextChunkId++;
- last = new ClusterStreamChunk(clusters, clusters.streamLRU, id);
- operations.add(last);
- }
-
- String chunkId = last.getKey();
- int chunkOffset = last.operations.size();
- operation.scheduled(chunkId + "." + chunkOffset);
-
- last.addOperation(operation);
-
+ operationQueue.scheduleUpdate(operation);
swapChunks();
- notifyAll();
} catch (IllegalAcornStateException e) {
throw e;
} catch (Throwable t) {
} finally {
clusters.streamLRU.releaseMutex();
}
+
}
@Override
public void close() {
+
alive = false;
- synchronized (this) {
- notifyAll();
- }
+
+ // This will wake up the sleeping beauty
+ operationQueue.scheduleTask(() -> {});
+
try {
deathBarrier.acquire();
} catch (InterruptedException e) {
}
}
+ void assertMainProgramThread() {
+ assert(Thread.currentThread().equals(mainProgramThread));
+ }
+
+ void assertNoMainProgramThread() {
+ assert(!Thread.currentThread().equals(mainProgramThread));
+ }
+
}
--- /dev/null
+package org.simantics.acorn;
+
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.TreeMap;
+
+import org.simantics.acorn.MainProgram.MainProgramRunnable;
+import org.simantics.acorn.exception.AcornAccessVerificationException;
+import org.simantics.acorn.exception.IllegalAcornStateException;
+import org.simantics.acorn.lru.ClusterStreamChunk;
+import org.simantics.acorn.lru.ClusterUpdateOperation;
+import org.simantics.db.service.ClusterUID;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/*
+ * Provides work for MainProgram thread
+ * -cluster stream updates from client
+ * -tasks from other threads
+ *
+ * Synchronized via monitor.
+ *
+ */
+class OperationQueue {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(OperationQueue.class);
+
+ private final MainProgram mainProgram;
+
+ private final LinkedList<ClusterStreamChunk> operations = new LinkedList<>();
+
+ private final LinkedList<MainProgramRunnable> tasks = new LinkedList<>();
+
+ private long currentChangeSetId = -1;
+ private int nextChunkId = 0;
+
+ OperationQueue(MainProgram mainProgram) {
+ this.mainProgram = mainProgram;
+ }
+
+ /*
+ * Called by other Acorn threads than MainProgram thread
+ */
+ synchronized void scheduleTask(MainProgramRunnable task) {
+
+ mainProgram.assertNoMainProgramThread();
+
+ tasks.add(task);
+
+ // The only thread waiting for this is MainProgram thread
+ notify();
+
+ }
+
+ /*
+ * Called by DB client write threads
+ */
+ synchronized void startTransaction(long id) {
+ currentChangeSetId = id;
+ nextChunkId = 0;
+ }
+
+ /*
+ * Called by DB client write threads
+ */
+ synchronized ClusterStreamChunk commitLast() {
+ ClusterStreamChunk last = operations.isEmpty() ? null : operations.getLast();
+ if(last != null) {
+ last.commit();
+ notifyAll();
+ }
+ return last;
+ }
+
+ /*
+ * Called by DB client write threads
+ */
+ synchronized void scheduleUpdate(ClusterUpdateOperation operation) throws AcornAccessVerificationException, IllegalAcornStateException {
+
+ ClusterStreamChunk last = operations.isEmpty() ? null : operations.getLast();
+ if(last == null || last.isCommitted()) {
+ String id = "" + currentChangeSetId + "-" + nextChunkId++;
+ last = new ClusterStreamChunk(mainProgram.clusters, mainProgram.clusters.streamLRU, id);
+ operations.add(last);
+ }
+
+ String chunkId = last.getKey();
+ int chunkOffset = last.operations.size();
+ operation.scheduled(chunkId + "." + chunkOffset);
+
+ last.addOperation(operation);
+
+ // The only thread waiting for this is MainProgram thread
+ notify();
+
+ }
+
+ /*
+ * Called by MainProgram thread
+ */
+ synchronized void pumpTasks(List<MainProgramRunnable> todo) {
+
+ mainProgram.assertMainProgramThread();
+
+ todo.addAll(tasks);
+ tasks.clear();
+
+ }
+
+ /*
+ * Called by MainProgram thread
+ */
+ synchronized void pumpUpdates(TreeMap<ClusterUID, List<ClusterUpdateOperation>> updates) {
+
+ mainProgram.assertMainProgramThread();
+
+ while(!operations.isEmpty() && updates.size() < 100) {
+
+ ClusterStreamChunk chunk = operations.pollFirst();
+
+ for(int i=chunk.nextToProcess;i<chunk.operations.size();i++) {
+ ClusterUpdateOperation o = chunk.operations.get(i);
+ ClusterUID uid = o.uid;
+ List<ClusterUpdateOperation> ops = updates.get(uid);
+ if(ops == null) {
+ ops = new ArrayList<ClusterUpdateOperation>();
+ updates.put(uid, ops);
+ }
+ ops.add(o);
+ }
+
+ chunk.nextToProcess = chunk.operations.size();
+
+ if(!chunk.isCommitted()) {
+ assert(operations.isEmpty());
+ operations.add(chunk);
+ break;
+ }
+
+ }
+
+ }
+
+ /*
+ * Called by MainProgram thread
+ */
+ synchronized boolean isEmpty() {
+
+ mainProgram.assertMainProgramThread();
+
+ return operations.isEmpty();
+
+ }
+
+ /*
+ * Wake up when new operations are scheduled or the last operation is committed
+ * Called by MainProgram thread
+ */
+ synchronized void waitFor() {
+
+ mainProgram.assertMainProgramThread();
+
+ // One last check within the monitor
+ if(!operations.isEmpty() || !tasks.isEmpty()) return;
+
+ try {
+ wait(5000);
+ } catch (InterruptedException e) {
+ LOGGER.error("Unexpected interruption", e);
+ }
+
+ }
+
+}