*******************************************************************************/
package org.simantics.acorn;
import java.io.BufferedReader;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

import org.simantics.acorn.MainProgram.MainProgramRunnable;
import org.simantics.acorn.backup.AcornBackupProvider;
import org.simantics.acorn.backup.AcornBackupProvider.AcornBackupRunnable;
import org.simantics.acorn.exception.AcornAccessVerificationException;
import org.simantics.acorn.exception.IllegalAcornStateException;
import org.simantics.acorn.internal.ClusterChange;
import org.simantics.acorn.internal.ClusterUpdateProcessorBase;
import org.simantics.acorn.internal.UndoClusterUpdateProcessor;
import org.simantics.acorn.lru.ClusterChangeSet.Entry;
import org.simantics.acorn.lru.ClusterInfo;
import org.simantics.acorn.lru.ClusterStreamChunk;
import org.simantics.acorn.lru.ClusterUpdateOperation;
import org.simantics.backup.BackupException;
import org.simantics.db.ClusterCreator;
import org.simantics.db.Database;
import org.simantics.db.ServiceLocator;
import org.simantics.db.exception.DatabaseException;
import org.simantics.db.exception.SDBException;
import org.simantics.db.server.ProCoreException;
import org.simantics.db.service.ClusterSetsSupport;
import org.simantics.db.service.ClusterUID;
import org.simantics.db.service.LifecycleSupport;
import org.simantics.utils.DataContainer;
import org.simantics.utils.datastructures.Pair;
import org.simantics.utils.logging.TimeLogger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import gnu.trove.map.hash.TLongObjectHashMap;
public class GraphClientImpl2 implements Database.Session {
-
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(GraphClientImpl2.class);
public static final boolean DEBUG = false;
- public final ClusterManager clusters;
-
+ final ClusterManager clusters;
+
private TransactionManager transactionManager = new TransactionManager();
private ExecutorService executor = Executors.newSingleThreadExecutor(new ClientThreadFactory("Core Main Program", false));
private ExecutorService saver = Executors.newSingleThreadExecutor(new ClientThreadFactory("Core Snapshot Saver", true));
private Path dbFolder;
private final Database database;
private ServiceLocator locator;
+ private FileCache fileCache;
private MainProgram mainProgram;
- static class ClientThreadFactory implements ThreadFactory {
-
+ private static class ClientThreadFactory implements ThreadFactory {
+
final String name;
final boolean daemon;
-
+
public ClientThreadFactory(String name, boolean daemon) {
this.name = name;
this.daemon = daemon;
}
-
+
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r, name);
this.database = database;
this.dbFolder = dbFolder;
this.locator = locator;
- this.clusters = new ClusterManager(dbFolder);
+ this.fileCache = new FileCache();
+ // This disposes the cache when the session is shut down
+ locator.registerService(FileCache.class, fileCache);
+ this.clusters = new ClusterManager(dbFolder, fileCache);
load();
- ClusterSetsSupport cssi = locator.getService(ClusterSetsSupport.class);
+ ClusterSetsSupport cssi = locator.getService(ClusterSetsSupport.class);
cssi.setReadDirectory(clusters.lastSessionDirectory);
cssi.updateWriteDirectory(clusters.workingDirectory);
mainProgram = new MainProgram(this, clusters);
return dbFolder;
}
- public void tryMakeSnapshot() throws IOException {
-
- if (isClosing || unexpectedClose)
- return;
-
+ /*
+ * This method schedules snapshotting.
+ * No lock and thread restrictions.
+ */
+ void tryMakeSnapshot() throws IOException {
+
+ if (isClosing || unexpectedClose)
+ return;
+
saver.execute(new Runnable() {
@Override
// First take a write transaction
tr = askWriteTransaction(-1);
// Then make sure that MainProgram is idling
- mainProgram.mutex.acquire();
- try {
- synchronized(mainProgram) {
- if(mainProgram.operations.isEmpty()) {
- makeSnapshot(false);
- } else {
- // MainProgram is becoming busy again - delay snapshotting
- return;
- }
- }
- } finally {
- mainProgram.mutex.release();
- }
+ synchronizeWithIdleMainProgram(() -> makeSnapshot(false));
} catch (IllegalAcornStateException | ProCoreException e) {
- Logger.defaultLogError("Snapshotting failed", e);
+ LOGGER.error("Snapshotting failed", e);
+ unexpectedClose = true;
+ } catch (SDBException e) {
+ LOGGER.error("Snapshotting failed", e);
unexpectedClose = true;
- } catch (InterruptedException e) {
- Logger.defaultLogError("Snapshotting interrupted", e);
} finally {
try {
if(tr != null)
endTransaction(tr.getTransactionId());
if (unexpectedClose) {
- LifecycleSupport support = getServiceLocator().getService(LifecycleSupport.class);
- try {
- support.close();
- } catch (DatabaseException e1) {
- Logger.defaultLogError("Failed to close database as a safety measure due to failed snapshotting", e1);
- }
+ LifecycleSupport support = getServiceLocator().getService(LifecycleSupport.class);
+ try {
+ support.close();
+ } catch (DatabaseException e1) {
+ LOGGER.error("Failed to close database as a safety measure due to failed snapshotting", e1);
+ }
}
} catch (ProCoreException e) {
- Logger.defaultLogError("Failed to end snapshotting write transaction", e);
+ LOGGER.error("Failed to end snapshotting write transaction", e);
}
}
}
});
}
-
- public void makeSnapshot(boolean fullSave) throws IllegalAcornStateException {
- clusters.makeSnapshot(locator, fullSave);
- }
-
+
+ private void makeSnapshot(boolean fullSave) throws IllegalAcornStateException {
+ clusters.makeSnapshot(locator, fullSave);
+ }
+
+ @Override
public <T> T clone(ClusterUID uid, ClusterCreator creator) throws DatabaseException {
try {
return clusters.clone(uid, creator);
}
}
-// private void save() throws IOException {
-// clusters.save();
-// }
-
- public void load() throws IOException {
+ private void load() throws IOException {
clusters.load();
}
-
-// public void modiFileEx(ClusterUID uid, int resourceKey, long offset, long size, byte[] bytes, long pos, ClusterSupport support) {
-// clusters.modiFileEx(uid, resourceKey, offset, size, bytes, pos, support);
-// }
@Override
public Database getDatabase() {
private boolean closed = false;
private boolean isClosing = false;
private boolean unexpectedClose = false;
-
+
@Override
public void close() throws ProCoreException {
- System.err.println("Closing " + this + " and mainProgram " + mainProgram);
+ LOGGER.info("Closing " + this + " and mainProgram " + mainProgram);
if(!closed && !isClosing) {
- isClosing = true;
+ isClosing = true;
try {
- if (!unexpectedClose)
- makeSnapshot(true);
-
+
+ if (!unexpectedClose)
+ synchronizeWithIdleMainProgram(() -> makeSnapshot(true));
+
mainProgram.close();
clusters.shutdown();
executor.shutdown();
saver.shutdown();
+
boolean executorTerminated = executor.awaitTermination(500, TimeUnit.MILLISECONDS);
boolean saverTerminated = saver.awaitTermination(500, TimeUnit.MILLISECONDS);
-
- System.err.println("executorTerminated=" + executorTerminated + ", saverTerminated=" + saverTerminated);
-
+
+ LOGGER.info("executorTerminated=" + executorTerminated + ", saverTerminated=" + saverTerminated);
+
+ try {
+ clusters.mainState.save(dbFolder);
+ } catch (IOException e) {
+ LOGGER.error("Failed to save " + MainState.MAIN_STATE + " file in database folder " + dbFolder);
+ }
+
mainProgram = null;
executor = null;
saver = null;
-
+
} catch (IllegalAcornStateException | InterruptedException e) {
throw new ProCoreException(e);
+ } catch (SDBException e1) {
+ throw new ProCoreException(e1);
}
closed = true;
}
    // Reports whether this session has completed its shutdown sequence.
    public boolean isClosed() throws ProCoreException {
        return closed;
    }
-
+
@Override
public void acceptCommit(long transactionId, long changeSetId, byte[] metadata) throws ProCoreException {
clusters.state.headChangeSetId++;
long committedChangeSetId = changeSetId + 1;
try {
- clusters.commitChangeSet(committedChangeSetId, metadata);
-
- clusters.state.transactionId = transactionId;
-
- mainProgram.committed();
-
- TimeLogger.log("Accepted commit");
+ clusters.commitChangeSet(committedChangeSetId, metadata);
+ clusters.state.transactionId = transactionId;
+ mainProgram.committed();
+ TimeLogger.log("Accepted commit");
} catch (IllegalAcornStateException e) {
throw new ProCoreException(e);
}
clusters.state.headChangeSetId++;
return clusters.state.headChangeSetId;
} catch (SDBException e) {
- Logger.defaultLogError("Failed to undo cancelled transaction", e);
+ LOGGER.error("Failed to undo cancelled transaction", e);
throw new ProCoreException(e);
}
}
return transactionManager.askReadTransaction();
}
- enum TransactionState {
+ private enum TransactionState {
IDLE,WRITE,READ
}
-
- class TransactionRequest {
+
+ private class TransactionRequest {
public TransactionState state;
public Semaphore semaphore;
public TransactionRequest(TransactionState state, Semaphore semaphore) {
}
}
- class TransactionManager {
+ private class TransactionManager {
private TransactionState currentTransactionState = TransactionState.IDLE;
-
+
private int reads = 0;
-
- LinkedList<TransactionRequest> requests = new LinkedList<TransactionRequest>();
-
- TLongObjectHashMap<TransactionRequest> requestMap = new TLongObjectHashMap<TransactionRequest>();
-
+
+ private LinkedList<TransactionRequest> requests = new LinkedList<>();
+
+ private TLongObjectHashMap<TransactionRequest> requestMap = new TLongObjectHashMap<>();
+
private synchronized Transaction makeTransaction(TransactionRequest req) {
-
+
final int csId = clusters.state.headChangeSetId;
final long trId = clusters.state.transactionId+1;
requestMap.put(trId, req);
return new Transaction() {
-
+
@Override
public long getTransactionId() {
return trId;
}
-
+
@Override
public long getHeadChangeSetId() {
return csId;
}
};
}
-
+
/*
* This method cannot be synchronized since it waits and must support multiple entries
* by query thread(s) and internal transactions such as snapshot saver
*/
- public Transaction askReadTransaction() throws ProCoreException {
-
+ private Transaction askReadTransaction() throws ProCoreException {
+
Semaphore semaphore = new Semaphore(0);
-
+
TransactionRequest req = queue(TransactionState.READ, semaphore);
-
+
try {
semaphore.acquire();
} catch (InterruptedException e) {
throw new ProCoreException(e);
}
-
+
return makeTransaction(req);
}
if(r.state == TransactionState.READ) reads++;
r.semaphore.release();
}
-
+
private synchronized void processRequests() {
-
+
while(true) {
if(requests.isEmpty()) return;
TransactionRequest req = requests.peek();
if(currentTransactionState == TransactionState.IDLE) {
-
+
// Accept anything while IDLE
currentTransactionState = req.state;
dispatch();
-
+
} else if (currentTransactionState == TransactionState.READ) {
-
+
if(req.state == currentTransactionState) {
// Allow other reads
dispatch();
} else {
-
+
// Wait
return;
-
+
}
-
+
} else if (currentTransactionState == TransactionState.WRITE) {
// Wait
return;
-
+
}
-
+
}
-
+
}
-
+
private synchronized TransactionRequest queue(TransactionState state, Semaphore semaphore) {
- TransactionRequest req = new TransactionRequest(state, semaphore);
+ TransactionRequest req = new TransactionRequest(state, semaphore);
requests.addLast(req);
processRequests();
return req;
}
-
+
/*
* This method cannot be synchronized since it waits and must support multiple entries
* by query thread(s) and internal transactions such as snapshot saver
*/
- public Transaction askWriteTransaction() throws IllegalAcornStateException {
-
+ private Transaction askWriteTransaction() throws IllegalAcornStateException {
+
Semaphore semaphore = new Semaphore(0);
TransactionRequest req = queue(TransactionState.WRITE, semaphore);
-
+
try {
semaphore.acquire();
} catch (InterruptedException e) {
mainProgram.startTransaction(clusters.state.headChangeSetId+1);
return makeTransaction(req);
}
-
- public synchronized long endTransaction(long transactionId) throws ProCoreException {
-
+
+ private synchronized long endTransaction(long transactionId) throws ProCoreException {
+
TransactionRequest req = requestMap.remove(transactionId);
if(req.state == TransactionState.WRITE) {
currentTransactionState = TransactionState.IDLE;
}
}
-
+
@Override
public Transaction askWriteTransaction(final long transactionId) throws ProCoreException {
try {
public ChangeSetData getChangeSetData(long minChangeSetId,
long maxChangeSetId, OnChangeSetUpdate onChangeSetupate)
throws ProCoreException {
-
+
new Exception("GetChangeSetDataFunction " + minChangeSetId + " " + maxChangeSetId).printStackTrace();;
return null;
-
+
}
@Override
public long getFirstChangeSetId() {
return 0;
}
-
+
};
}
@Override
public Refresh getRefresh(long changeSetId) throws ProCoreException {
-
+
final ClusterIds ids = getClusterIds();
-
+
return new Refresh() {
@Override
public long[] getSecond() {
return ids.getSecond();
}
-
+
};
-
- }
- public byte[] getResourceFile(final byte[] clusterUID, final int resourceIndex) throws ProCoreException, AcornAccessVerificationException, IllegalAcornStateException {
- return clusters.getResourceFile(clusterUID, resourceIndex);
}
+// public byte[] getResourceFile(final byte[] clusterUID, final int resourceIndex) throws ProCoreException, AcornAccessVerificationException, IllegalAcornStateException {
+// return clusters.getResourceFile(clusterUID, resourceIndex);
+// }
+
@Override
public ResourceSegment getResourceSegment(final byte[] clusterUID, final int resourceIndex, final long segmentOffset, short segmentSize) throws ProCoreException {
try {
chunk.releaseMutex();
}
}
-
+
private void performUndo(String ccsId, ArrayList<Pair<ClusterUID, byte[]>> clusterChanges, UndoClusterSupport support) throws ProCoreException, DatabaseException, IllegalAcornStateException, AcornAccessVerificationException {
UndoClusterUpdateProcessor proc = getUndoCSS(ccsId);
ClusterChange cs = new ClusterChange(clusterChanges, proc.getClusterUID());
for(int i=0;i<proc.entries.size();i++) {
-
+
Entry e = proc.entries.get(proc.entries.size() - 1 - i);
e.process(clusters, cs, clusterKey);
}
}
}
+ private void synchronizeWithIdleMainProgram(MainProgramRunnable runnable) throws SDBException {
+
+ Exception[] exception = { null };
+ Semaphore s = new Semaphore(0);
+
+ mainProgram.runIdle(new MainProgramRunnable() {
+
+ @Override
+ public void success() {
+ try {
+ runnable.success();
+ } finally {
+ s.release();
+ }
+ }
+
+ @Override
+ public void error(Exception e) {
+ exception[0] = e;
+ try {
+ runnable.error(e);
+ } finally {
+ s.release();
+ }
+ }
+
+ @Override
+ public void run() throws Exception {
+ runnable.run();
+ }
+
+ });
+
+ try {
+ s.acquire();
+ } catch (InterruptedException e) {
+ throw new IllegalAcornStateException("Unhandled interruption.", e);
+ }
+
+ Exception e = exception[0];
+ if(e != null) {
+ if(e instanceof SDBException) throw (SDBException)e;
+ else if(e != null) throw new IllegalAcornStateException(e);
+ }
+
+ }
+
@Override
public boolean undo(long[] changeSetIds, OnChangeSetUpdate onChangeSetUpdate) throws SDBException {
- Exception exception = mainProgram.runIdle(new MainProgramRunnable() {
+ synchronizeWithIdleMainProgram(new MainProgramRunnable() {
@Override
public void run() throws Exception {
try {
final ArrayList<Pair<ClusterUID, byte[]>> clusterChanges = new ArrayList<Pair<ClusterUID, byte[]>>();
-
+
UndoClusterSupport support = new UndoClusterSupport(clusters);
-
+
final int changeSetId = clusters.state.headChangeSetId;
-
+
if(ClusterUpdateProcessorBase.DEBUG)
- System.err.println(" === BEGIN UNDO ===");
-
+ LOGGER.info(" === BEGIN UNDO ===");
+
for(int i=0;i<changeSetIds.length;i++) {
final long id = changeSetIds[changeSetIds.length-1-i];
ArrayList<String> ccss = clusters.getChanges(id);
-
+
for(int j=0;j<ccss.size();j++) {
String ccsid = ccss.get(ccss.size()-j-1);
try {
if(ClusterUpdateProcessorBase.DEBUG)
- System.err.println("performUndo " + ccsid);
+ LOGGER.info("performUndo " + ccsid);
performUndo(ccsid, clusterChanges, support);
} catch (DatabaseException e) {
e.printStackTrace();
}
}
}
-
+
if(ClusterUpdateProcessorBase.DEBUG)
- System.err.println(" === END UNDO ===");
-
+ LOGGER.info(" === END UNDO ===");
+
for(int i=0;i<clusterChanges.size();i++) {
-
+
final int changeSetIndex = i;
-
+
final Pair<ClusterUID, byte[]> pair = clusterChanges.get(i);
-
+
final ClusterUID cuid = pair.first;
final byte[] data = pair.second;
-
+
onChangeSetUpdate.onChangeSetUpdate(new ChangeSetUpdate() {
-
+
@Override
public long getChangeSetId() {
return changeSetId;
}
-
+
@Override
public int getChangeSetIndex() {
return 0;
}
-
+
@Override
public int getNumberOfClusterChangeSets() {
return clusterChanges.size();
}
-
+
@Override
public int getIndexOfClusterChangeSet() {
return changeSetIndex;
}
-
+
@Override
public byte[] getClusterId() {
return cuid.asBytes();
}
-
+
@Override
public boolean getNewCluster() {
return false;
}
-
+
@Override
public byte[] getData() {
return data;
}
-
+
});
}
} catch (AcornAccessVerificationException | IllegalAcornStateException e1) {
}
- @Override
- public void done() {
-
- }
-
});
- if(exception instanceof SDBException) throw (SDBException)exception;
- else if(exception != null) throw new IllegalAcornStateException(exception);
-
return false;
-
+
}
-
- public ServiceLocator getServiceLocator() {
+
+ ServiceLocator getServiceLocator() {
return locator;
}
    // Reports the cluster manager's rollback status for this session.
    public boolean rolledback() {
        return clusters.rolledback();
    }
-
- public void purge() throws IllegalAcornStateException {
+
+ private void purge() throws IllegalAcornStateException {
clusters.purge(locator);
}
public void purgeDatabase() {
-
+
if (isClosing || unexpectedClose)
return;
-
+
saver.execute(new Runnable() {
@Override
// First take a write transaction
tr = askWriteTransaction(-1);
// Then make sure that MainProgram is idling
- mainProgram.mutex.acquire();
- try {
- synchronized(mainProgram) {
- if(mainProgram.operations.isEmpty()) {
- purge();
- } else {
- // MainProgram is becoming busy again - delay snapshotting
- return;
- }
- }
- } finally {
- mainProgram.mutex.release();
- }
+ synchronizeWithIdleMainProgram(() -> purge());
} catch (IllegalAcornStateException | ProCoreException e) {
- Logger.defaultLogError("Purge failed", e);
+ LOGGER.error("Purge failed", e);
+ unexpectedClose = true;
+ } catch (SDBException e) {
+ LOGGER.error("Purge failed", e);
unexpectedClose = true;
- } catch (InterruptedException e) {
- Logger.defaultLogError("Purge interrupted", e);
} finally {
try {
if(tr != null)
endTransaction(tr.getTransactionId());
if (unexpectedClose) {
- LifecycleSupport support = getServiceLocator().getService(LifecycleSupport.class);
- try {
- support.close();
- } catch (DatabaseException e1) {
- Logger.defaultLogError("Failed to close database as a safety measure due to failed purge", e1);
- }
+ LifecycleSupport support = getServiceLocator().getService(LifecycleSupport.class);
+ try {
+ support.close();
+ } catch (DatabaseException e1) {
+ LOGGER.error("Failed to close database as a safety measure due to failed purge", e1);
+ }
}
} catch (ProCoreException e) {
- Logger.defaultLogError("Failed to end purge write transaction", e);
+ LOGGER.error("Failed to end purge write transaction", e);
}
}
}
});
-
+
}
-
+
    // Exposes the tail change set id tracked by the cluster manager.
    public long getTailChangeSetId() {
        return clusters.getTailChangeSetId();
    }
-
+
+ public Future<BackupException> getBackupRunnable(Semaphore lock, Path targetPath, int revision) throws IllegalAcornStateException, IOException {
+
+ makeSnapshot(true);
+
+ Path dbDir = getDbFolder();
+ int newestFolder = clusters.mainState.headDir - 1;
+ int latestFolder = -2;
+ Path AcornMetadataFile = AcornBackupProvider.getAcornMetadataFile(dbDir);
+ if (Files.exists(AcornMetadataFile)) {
+ try (BufferedReader br = Files.newBufferedReader(AcornMetadataFile)) {
+ latestFolder = Integer.parseInt( br.readLine() );
+ }
+ }
+
+ AcornBackupRunnable r = new AcornBackupRunnable(
+ lock, targetPath, revision, dbDir, latestFolder, newestFolder);
+ new Thread(r, "Acorn backup thread").start();
+ return r;
+
+ }
+
}