*******************************************************************************/
package org.simantics.acorn;
+import java.io.BufferedReader;
import java.io.IOException;
+import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
+import org.simantics.acorn.MainProgram.MainProgramRunnable;
+import org.simantics.acorn.backup.AcornBackupProvider;
+import org.simantics.acorn.backup.AcornBackupProvider.AcornBackupRunnable;
+import org.simantics.acorn.exception.AcornAccessVerificationException;
+import org.simantics.acorn.exception.IllegalAcornStateException;
import org.simantics.acorn.internal.ClusterChange;
import org.simantics.acorn.internal.ClusterUpdateProcessorBase;
import org.simantics.acorn.internal.UndoClusterUpdateProcessor;
+import org.simantics.acorn.lru.ClusterChangeSet.Entry;
import org.simantics.acorn.lru.ClusterInfo;
import org.simantics.acorn.lru.ClusterStreamChunk;
import org.simantics.acorn.lru.ClusterUpdateOperation;
-import org.simantics.acorn.lru.ClusterChangeSet.Entry;
+import org.simantics.backup.BackupException;
import org.simantics.db.ClusterCreator;
import org.simantics.db.Database;
import org.simantics.db.ServiceLocator;
-import org.simantics.db.common.utils.Logger;
import org.simantics.db.exception.DatabaseException;
import org.simantics.db.exception.SDBException;
import org.simantics.db.server.ProCoreException;
import org.simantics.db.service.ClusterSetsSupport;
import org.simantics.db.service.ClusterUID;
+import org.simantics.db.service.LifecycleSupport;
+import org.simantics.utils.DataContainer;
import org.simantics.utils.datastructures.Pair;
import org.simantics.utils.logging.TimeLogger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import gnu.trove.map.hash.TLongObjectHashMap;
public class GraphClientImpl2 implements Database.Session {
-
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(GraphClientImpl2.class);
public static final boolean DEBUG = false;
- public final ClusterManager clusters;
-
+ final ClusterManager clusters;
+
private TransactionManager transactionManager = new TransactionManager();
private ExecutorService executor = Executors.newSingleThreadExecutor(new ClientThreadFactory("Core Main Program", false));
private ExecutorService saver = Executors.newSingleThreadExecutor(new ClientThreadFactory("Core Snapshot Saver", true));
- private static GraphClientImpl2 INSTANCE;
private Path dbFolder;
private final Database database;
private ServiceLocator locator;
+ private FileCache fileCache;
private MainProgram mainProgram;
- static class ClientThreadFactory implements ThreadFactory {
-
+ private static class ClientThreadFactory implements ThreadFactory {
+
final String name;
final boolean daemon;
-
+
public ClientThreadFactory(String name, boolean daemon) {
this.name = name;
this.daemon = daemon;
}
-
+
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r, name);
this.database = database;
this.dbFolder = dbFolder;
this.locator = locator;
- this.clusters = new ClusterManager(dbFolder);
+ this.fileCache = new FileCache();
+ // This disposes the cache when the session is shut down
+ locator.registerService(FileCache.class, fileCache);
+ this.clusters = new ClusterManager(dbFolder, fileCache);
load();
- ClusterSetsSupport cssi = locator.getService(ClusterSetsSupport.class);
- cssi.updateReadAndWriteDirectories(clusters.lastSessionDirectory, clusters.workingDirectory);
+ ClusterSetsSupport cssi = locator.getService(ClusterSetsSupport.class);
+ cssi.setReadDirectory(clusters.lastSessionDirectory);
+ cssi.updateWriteDirectory(clusters.workingDirectory);
mainProgram = new MainProgram(this, clusters);
executor.execute(mainProgram);
- INSTANCE = this;
}
public Path getDbFolder() {
return dbFolder;
}
- public void tryMakeSnapshot() throws IOException {
-
- if (isClosing)
- return;
-
+ /*
+ * This method schedules snapshotting.
+ * No lock and thread restrictions.
+ */
+ void tryMakeSnapshot() throws IOException {
+
+ if (isClosing || unexpectedClose)
+ return;
+
saver.execute(new Runnable() {
@Override
// First take a write transaction
tr = askWriteTransaction(-1);
// Then make sure that MainProgram is idling
- mainProgram.mutex.acquire();
- try {
- synchronized(mainProgram) {
- if(mainProgram.operations.isEmpty()) {
- makeSnapshot(false);
- } else {
- // MainProgram is becoming busy again - delay snapshotting
- return;
- }
- }
- } finally {
- mainProgram.mutex.release();
- }
- } catch (IOException e) {
- Logger.defaultLogError(e);
- } catch (ProCoreException e) {
- Logger.defaultLogError(e);
- } catch (InterruptedException e) {
- Logger.defaultLogError(e);
+ synchronizeWithIdleMainProgram(() -> makeSnapshot(false));
+ } catch (IllegalAcornStateException | ProCoreException e) {
+ LOGGER.error("Snapshotting failed", e);
+ unexpectedClose = true;
+ } catch (SDBException e) {
+ LOGGER.error("Snapshotting failed", e);
+ unexpectedClose = true;
} finally {
try {
if(tr != null)
endTransaction(tr.getTransactionId());
+ if (unexpectedClose) {
+ LifecycleSupport support = getServiceLocator().getService(LifecycleSupport.class);
+ try {
+ support.close();
+ } catch (DatabaseException e1) {
+ LOGGER.error("Failed to close database as a safety measure due to failed snapshotting", e1);
+ }
+ }
} catch (ProCoreException e) {
- Logger.defaultLogError(e);
+ LOGGER.error("Failed to end snapshotting write transaction", e);
}
}
}
-
});
}
-
- public void makeSnapshot(boolean force) throws IOException {
- clusters.makeSnapshot(locator, force);
- }
-
+
+ private void makeSnapshot(boolean fullSave) throws IllegalAcornStateException {
+ clusters.makeSnapshot(locator, fullSave);
+ }
+
+ @Override
public <T> T clone(ClusterUID uid, ClusterCreator creator) throws DatabaseException {
- return clusters.clone(uid, creator);
+ try {
+ return clusters.clone(uid, creator);
+ } catch (AcornAccessVerificationException | IllegalAcornStateException | IOException e) {
+ unexpectedClose = true;
+ throw new DatabaseException(e);
+ }
}
-// private void save() throws IOException {
-// clusters.save();
-// }
-
- public void load() throws IOException {
+ private void load() throws IOException {
clusters.load();
}
-
-// public void modiFileEx(ClusterUID uid, int resourceKey, long offset, long size, byte[] bytes, long pos, ClusterSupport support) {
-// clusters.modiFileEx(uid, resourceKey, offset, size, bytes, pos, support);
-// }
@Override
public Database getDatabase() {
private boolean closed = false;
private boolean isClosing = false;
-
+ private boolean unexpectedClose = false;
+
@Override
public void close() throws ProCoreException {
- System.err.println("Closing " + this + " and mainProgram " + mainProgram);
+ LOGGER.info("Closing " + this + " and mainProgram " + mainProgram);
if(!closed && !isClosing) {
- isClosing = true;
+ isClosing = true;
try {
- makeSnapshot(true);
-
+
+ if (!unexpectedClose)
+ synchronizeWithIdleMainProgram(() -> makeSnapshot(true));
+
mainProgram.close();
clusters.shutdown();
executor.shutdown();
saver.shutdown();
+
boolean executorTerminated = executor.awaitTermination(500, TimeUnit.MILLISECONDS);
boolean saverTerminated = saver.awaitTermination(500, TimeUnit.MILLISECONDS);
-
- System.err.println("executorTerminated=" + executorTerminated + ", saverTerminated=" + saverTerminated);
-
- INSTANCE = null;
+
+ LOGGER.info("executorTerminated=" + executorTerminated + ", saverTerminated=" + saverTerminated);
+
+ try {
+ clusters.mainState.save(dbFolder);
+ } catch (IOException e) {
+ LOGGER.error("Failed to save " + MainState.MAIN_STATE + " file in database folder " + dbFolder);
+ }
+
mainProgram = null;
executor = null;
saver = null;
-
- } catch (IOException | InterruptedException e) {
+
+ } catch (IllegalAcornStateException | InterruptedException e) {
throw new ProCoreException(e);
+ } catch (SDBException e1) {
+ throw new ProCoreException(e1);
}
+ closed = true;
}
- closed = true;
//impl.close();
}
public boolean isClosed() throws ProCoreException {
return closed;
}
-
+
@Override
public void acceptCommit(long transactionId, long changeSetId, byte[] metadata) throws ProCoreException {
-
clusters.state.headChangeSetId++;
-
long committedChangeSetId = changeSetId + 1;
-
- clusters.commitChangeSet(committedChangeSetId, metadata);
-
- clusters.state.transactionId = transactionId;
-
- mainProgram.committed();
-
- TimeLogger.log("Accepted commit");
-
+ try {
+ clusters.commitChangeSet(committedChangeSetId, metadata);
+ clusters.state.transactionId = transactionId;
+ mainProgram.committed();
+ TimeLogger.log("Accepted commit");
+ } catch (IllegalAcornStateException e) {
+ throw new ProCoreException(e);
+ }
}
@Override
- public long cancelCommit(long transactionId, long changeSetId,
- byte[] metadata, OnChangeSetUpdate onChangeSetUpdate)
- throws ProCoreException {
- System.err.println("GraphClientImpl2.cancelCommit() called!! this is experimental and might cause havoc!");
- try {
- undo(new long[] {changeSetId}, onChangeSetUpdate);
- } catch (SDBException e) {
- e.printStackTrace();
- throw new ProCoreException(e);
- }
- clusters.state.headChangeSetId++;
- return clusters.state.headChangeSetId;
+ public long cancelCommit(long transactionId, long changeSetId, byte[] metadata, OnChangeSetUpdate onChangeSetUpdate) throws ProCoreException {
+ // Accept and finalize current transaction and then undo it
+ acceptCommit(transactionId, changeSetId, metadata);
+
+ try {
+ undo(new long[] {changeSetId+1}, onChangeSetUpdate);
+ clusters.state.headChangeSetId++;
+ return clusters.state.headChangeSetId;
+ } catch (SDBException e) {
+ LOGGER.error("Failed to undo cancelled transaction", e);
+ throw new ProCoreException(e);
+ }
}
@Override
return transactionManager.askReadTransaction();
}
- enum TransactionState {
+ private enum TransactionState {
IDLE,WRITE,READ
}
-
- class TransactionRequest {
+
+ private class TransactionRequest {
public TransactionState state;
public Semaphore semaphore;
public TransactionRequest(TransactionState state, Semaphore semaphore) {
}
}
- class TransactionManager {
+ private class TransactionManager {
private TransactionState currentTransactionState = TransactionState.IDLE;
-
+
private int reads = 0;
-
- LinkedList<TransactionRequest> requests = new LinkedList<TransactionRequest>();
-
- TLongObjectHashMap<TransactionRequest> requestMap = new TLongObjectHashMap<TransactionRequest>();
-
+
+ private LinkedList<TransactionRequest> requests = new LinkedList<>();
+
+ private TLongObjectHashMap<TransactionRequest> requestMap = new TLongObjectHashMap<>();
+
private synchronized Transaction makeTransaction(TransactionRequest req) {
-
+
final int csId = clusters.state.headChangeSetId;
final long trId = clusters.state.transactionId+1;
requestMap.put(trId, req);
return new Transaction() {
-
+
@Override
public long getTransactionId() {
return trId;
}
-
+
@Override
public long getHeadChangeSetId() {
return csId;
}
};
}
-
+
/*
* This method cannot be synchronized since it waits and must support multiple entries
* by query thread(s) and internal transactions such as snapshot saver
*/
- public Transaction askReadTransaction() throws ProCoreException {
-
+ private Transaction askReadTransaction() throws ProCoreException {
+
Semaphore semaphore = new Semaphore(0);
-
+
TransactionRequest req = queue(TransactionState.READ, semaphore);
-
+
try {
semaphore.acquire();
} catch (InterruptedException e) {
throw new ProCoreException(e);
}
-
+
return makeTransaction(req);
}
if(r.state == TransactionState.READ) reads++;
r.semaphore.release();
}
-
+
private synchronized void processRequests() {
-
+
while(true) {
if(requests.isEmpty()) return;
TransactionRequest req = requests.peek();
if(currentTransactionState == TransactionState.IDLE) {
-
+
// Accept anything while IDLE
currentTransactionState = req.state;
dispatch();
-
+
} else if (currentTransactionState == TransactionState.READ) {
-
+
if(req.state == currentTransactionState) {
// Allow other reads
dispatch();
} else {
-
+
// Wait
return;
-
+
}
-
+
} else if (currentTransactionState == TransactionState.WRITE) {
// Wait
return;
-
+
}
-
+
}
-
+
}
-
+
private synchronized TransactionRequest queue(TransactionState state, Semaphore semaphore) {
- TransactionRequest req = new TransactionRequest(state, semaphore);
+ TransactionRequest req = new TransactionRequest(state, semaphore);
requests.addLast(req);
processRequests();
return req;
}
-
+
/*
* This method cannot be synchronized since it waits and must support multiple entries
* by query thread(s) and internal transactions such as snapshot saver
*/
- public Transaction askWriteTransaction()
- throws ProCoreException {
-
+ private Transaction askWriteTransaction() throws IllegalAcornStateException {
+
Semaphore semaphore = new Semaphore(0);
-
TransactionRequest req = queue(TransactionState.WRITE, semaphore);
-
+
try {
semaphore.acquire();
} catch (InterruptedException e) {
- throw new ProCoreException(e);
+ throw new IllegalAcornStateException(e);
}
-
mainProgram.startTransaction(clusters.state.headChangeSetId+1);
-
return makeTransaction(req);
-
}
-
- public synchronized long endTransaction(long transactionId) throws ProCoreException {
-
+
+ private synchronized long endTransaction(long transactionId) throws ProCoreException {
+
TransactionRequest req = requestMap.remove(transactionId);
if(req.state == TransactionState.WRITE) {
currentTransactionState = TransactionState.IDLE;
}
}
-
+
@Override
- public Transaction askWriteTransaction(final long transactionId)
- throws ProCoreException {
- return transactionManager.askWriteTransaction();
+ public Transaction askWriteTransaction(final long transactionId) throws ProCoreException {
+ try {
+ if (isClosing || unexpectedClose || closed) {
+ throw new ProCoreException("GraphClientImpl2 is already closing so no more write transactions allowed!");
+ }
+ return transactionManager.askWriteTransaction();
+ } catch (IllegalAcornStateException e) {
+ throw new ProCoreException(e);
+ }
}
@Override
}
@Override
- public byte[] getChangeSetMetadata(long changeSetId)
- throws ProCoreException {
- return clusters.getMetadata(changeSetId);
+ public byte[] getChangeSetMetadata(long changeSetId) throws ProCoreException {
+ try {
+ return clusters.getMetadata(changeSetId);
+ } catch (AcornAccessVerificationException | IllegalAcornStateException e) {
+ throw new ProCoreException(e);
+ }
}
@Override
public ChangeSetData getChangeSetData(long minChangeSetId,
long maxChangeSetId, OnChangeSetUpdate onChangeSetupate)
throws ProCoreException {
-
+
new Exception("GetChangeSetDataFunction " + minChangeSetId + " " + maxChangeSetId).printStackTrace();;
return null;
-
+
}
@Override
@Override
public ClusterIds getClusterIds() throws ProCoreException {
- return clusters.getClusterIds();
+ try {
+ return clusters.getClusterIds();
+ } catch (IllegalAcornStateException e) {
+ throw new ProCoreException(e);
+ }
}
@Override
public long getFirstChangeSetId() {
return 0;
}
-
+
};
}
@Override
public Refresh getRefresh(long changeSetId) throws ProCoreException {
-
+
final ClusterIds ids = getClusterIds();
-
+
return new Refresh() {
@Override
public long[] getSecond() {
return ids.getSecond();
}
-
+
};
-
- }
- public byte[] getResourceFile(final byte[] clusterUID, final int resourceIndex) throws ProCoreException {
- return clusters.getResourceFile(clusterUID, resourceIndex);
}
- @Override
- public ResourceSegment getResourceSegment(final byte[] clusterUID,
- final int resourceIndex, final long segmentOffset, short segmentSize) throws ProCoreException {
-
- return clusters.getResourceSegment(clusterUID, resourceIndex, segmentOffset, segmentSize);
+// public byte[] getResourceFile(final byte[] clusterUID, final int resourceIndex) throws ProCoreException, AcornAccessVerificationException, IllegalAcornStateException {
+// return clusters.getResourceFile(clusterUID, resourceIndex);
+// }
+ @Override
+ public ResourceSegment getResourceSegment(final byte[] clusterUID, final int resourceIndex, final long segmentOffset, short segmentSize) throws ProCoreException {
+ try {
+ return clusters.getResourceSegment(clusterUID, resourceIndex, segmentOffset, segmentSize);
+ } catch (AcornAccessVerificationException | IllegalAcornStateException e) {
+ throw new ProCoreException(e);
+ }
}
@Override
@Override
public void updateCluster(byte[] operations) throws ProCoreException {
-
- ClusterUpdateOperation operation = new ClusterUpdateOperation(clusters, operations);
- ClusterInfo info = clusters.clusterLRU.getOrCreate(operation.uid, true);
- if(info == null) throw new IllegalStateException();
- info.acquireMutex();
- try {
+ ClusterInfo info = null;
+ try {
+ ClusterUpdateOperation operation = new ClusterUpdateOperation(clusters, operations);
+ info = clusters.clusterLRU.getOrCreate(operation.uid, true);
+ if(info == null)
+ throw new IllegalAcornStateException("info == null for operation " + operation);
+ info.acquireMutex();
info.scheduleUpdate();
mainProgram.schedule(operation);
- } catch (Throwable t) {
- throw new IllegalStateException(t);
- } finally {
- info.releaseMutex();
+ } catch (IllegalAcornStateException | AcornAccessVerificationException e) {
+ throw new ProCoreException(e);
+ } finally {
+ if (info != null)
+ info.releaseMutex();
}
-
}
- private UndoClusterUpdateProcessor getUndoCSS(String ccsId) throws DatabaseException {
+ private UndoClusterUpdateProcessor getUndoCSS(String ccsId) throws DatabaseException, AcornAccessVerificationException, IllegalAcornStateException {
String[] ss = ccsId.split("\\.");
String chunkKey = ss[0];
int chunkOffset = Integer.parseInt(ss[1]);
ClusterStreamChunk chunk = clusters.streamLRU.getWithoutMutex(chunkKey);
- if(chunk == null) throw new IllegalStateException("Cluster Stream Chunk " + chunkKey + " was not found.");
+ if(chunk == null) throw new IllegalAcornStateException("Cluster Stream Chunk " + chunkKey + " was not found.");
chunk.acquireMutex();
try {
return chunk.getUndoProcessor(clusters, chunkOffset, ccsId);
+ } catch (DatabaseException e) {
+ throw e;
} catch (Throwable t) {
throw new IllegalStateException(t);
} finally {
chunk.releaseMutex();
}
-
}
-
- private void performUndo(String ccsId, ArrayList<Pair<ClusterUID, byte[]>> clusterChanges, UndoClusterSupport support) throws ProCoreException, DatabaseException {
+ private void performUndo(String ccsId, ArrayList<Pair<ClusterUID, byte[]>> clusterChanges, UndoClusterSupport support) throws ProCoreException, DatabaseException, IllegalAcornStateException, AcornAccessVerificationException {
UndoClusterUpdateProcessor proc = getUndoCSS(ccsId);
int clusterKey = clusters.getClusterKeyByClusterUIDOrMakeWithoutMutex(proc.getClusterUID());
ClusterChange cs = new ClusterChange(clusterChanges, proc.getClusterUID());
for(int i=0;i<proc.entries.size();i++) {
-
+
Entry e = proc.entries.get(proc.entries.size() - 1 - i);
e.process(clusters, cs, clusterKey);
-
}
-
cs.flush();
} finally {
clusters.clusterLRU.releaseMutex();
}
-
}
-
- @Override
- public boolean undo(long[] changeSetIds, OnChangeSetUpdate onChangeSetUpdate) throws SDBException {
-
- final ArrayList<Pair<ClusterUID, byte[]>> clusterChanges = new ArrayList<Pair<ClusterUID, byte[]>>();
-
- UndoClusterSupport support = new UndoClusterSupport(clusters);
-
- final int changeSetId = clusters.state.headChangeSetId;
-
- if(ClusterUpdateProcessorBase.DEBUG)
- System.err.println(" === BEGIN UNDO ===");
-
- for(int i=0;i<changeSetIds.length;i++) {
- final long id = changeSetIds[changeSetIds.length-1-i];
- ArrayList<String> ccss = clusters.getChanges(id);
- for(int j=0;j<ccss.size();j++) {
+
+ private void synchronizeWithIdleMainProgram(MainProgramRunnable runnable) throws SDBException {
+
+ Exception[] exception = { null };
+ Semaphore s = new Semaphore(0);
+
+ mainProgram.runIdle(new MainProgramRunnable() {
+
+ @Override
+ public void success() {
try {
- if(ClusterUpdateProcessorBase.DEBUG)
- System.err.println("performUndo " + ccss.get(ccss.size()-j-1));
- performUndo(ccss.get(ccss.size()-j-1), clusterChanges, support);
- } catch (DatabaseException e) {
- e.printStackTrace();
+ runnable.success();
+ } finally {
+ s.release();
}
}
+
+ @Override
+ public void error(Exception e) {
+ exception[0] = e;
+ try {
+ runnable.error(e);
+ } finally {
+ s.release();
+ }
+ }
+
+ @Override
+ public void run() throws Exception {
+ runnable.run();
+ }
+
+ });
+
+ try {
+ s.acquire();
+ } catch (InterruptedException e) {
+ throw new IllegalAcornStateException("Unhandled interruption.", e);
+ }
+
+ Exception e = exception[0];
+ if(e != null) {
+ if(e instanceof SDBException) throw (SDBException)e;
+ else if(e != null) throw new IllegalAcornStateException(e);
}
- if(ClusterUpdateProcessorBase.DEBUG)
- System.err.println(" === END UNDO ===");
+ }
- for(int i=0;i<clusterChanges.size();i++) {
-
- final int changeSetIndex = i;
-
- final Pair<ClusterUID, byte[]> pair = clusterChanges.get(i);
-
- final ClusterUID cuid = pair.first;
- final byte[] data = pair.second;
+ @Override
+ public boolean undo(long[] changeSetIds, OnChangeSetUpdate onChangeSetUpdate) throws SDBException {
- onChangeSetUpdate.onChangeSetUpdate(new ChangeSetUpdate() {
+ synchronizeWithIdleMainProgram(new MainProgramRunnable() {
- @Override
- public long getChangeSetId() {
- return changeSetId;
- }
+ @Override
+ public void run() throws Exception {
- @Override
- public int getChangeSetIndex() {
- return 0;
- }
+ try {
- @Override
- public int getNumberOfClusterChangeSets() {
- return clusterChanges.size();
- }
+ final ArrayList<Pair<ClusterUID, byte[]>> clusterChanges = new ArrayList<Pair<ClusterUID, byte[]>>();
- @Override
- public int getIndexOfClusterChangeSet() {
- return changeSetIndex;
- }
+ UndoClusterSupport support = new UndoClusterSupport(clusters);
- @Override
- public byte[] getClusterId() {
- return cuid.asBytes();
- }
+ final int changeSetId = clusters.state.headChangeSetId;
- @Override
- public boolean getNewCluster() {
- return false;
- }
+ if(ClusterUpdateProcessorBase.DEBUG)
+ LOGGER.info(" === BEGIN UNDO ===");
- @Override
- public byte[] getData() {
- return data;
- }
+ for(int i=0;i<changeSetIds.length;i++) {
+ final long id = changeSetIds[changeSetIds.length-1-i];
+ ArrayList<String> ccss = clusters.getChanges(id);
- });
+ for(int j=0;j<ccss.size();j++) {
+ String ccsid = ccss.get(ccss.size()-j-1);
+ try {
+ if(ClusterUpdateProcessorBase.DEBUG)
+ System.err.println("performUndo " + ccsid);
+ performUndo(ccsid, clusterChanges, support);
+ } catch (DatabaseException e) {
+ e.printStackTrace();
+ }
+ }
+ }
- }
+ if(ClusterUpdateProcessorBase.DEBUG)
+ LOGGER.info(" === END UNDO ===");
+
+ for(int i=0;i<clusterChanges.size();i++) {
+
+ final int changeSetIndex = i;
+
+ final Pair<ClusterUID, byte[]> pair = clusterChanges.get(i);
+
+ final ClusterUID cuid = pair.first;
+ final byte[] data = pair.second;
+
+ onChangeSetUpdate.onChangeSetUpdate(new ChangeSetUpdate() {
+
+ @Override
+ public long getChangeSetId() {
+ return changeSetId;
+ }
+
+ @Override
+ public int getChangeSetIndex() {
+ return 0;
+ }
+
+ @Override
+ public int getNumberOfClusterChangeSets() {
+ return clusterChanges.size();
+ }
+
+ @Override
+ public int getIndexOfClusterChangeSet() {
+ return changeSetIndex;
+ }
+
+ @Override
+ public byte[] getClusterId() {
+ return cuid.asBytes();
+ }
+
+ @Override
+ public boolean getNewCluster() {
+ return false;
+ }
+
+ @Override
+ public byte[] getData() {
+ return data;
+ }
+
+ });
+ }
+ } catch (AcornAccessVerificationException | IllegalAcornStateException e1) {
+ throw new ProCoreException(e1);
+ }
+
+ }
+
+ });
-
return false;
-
- }
-
- public static GraphClientImpl2 getInstance() {
- return INSTANCE;
+
}
-
- public ServiceLocator getServiceLocator() {
+
+ ServiceLocator getServiceLocator() {
return locator;
}
return false;
}
-
-
-
-
-
-
-
-
-
-
- ////////////////////////
-
-
-
-
-
-
-
-
-
-
-
-
+ @Override
+ public boolean rolledback() {
+ return clusters.rolledback();
+ }
+
+ private void purge() throws IllegalAcornStateException {
+ clusters.purge(locator);
+ }
+
+ public void purgeDatabase() {
+
+ if (isClosing || unexpectedClose)
+ return;
+
+ saver.execute(new Runnable() {
+
+ @Override
+ public void run() {
+ Transaction tr = null;
+ try {
+ // First take a write transaction
+ tr = askWriteTransaction(-1);
+ // Then make sure that MainProgram is idling
+ synchronizeWithIdleMainProgram(() -> purge());
+ } catch (IllegalAcornStateException | ProCoreException e) {
+ LOGGER.error("Purge failed", e);
+ unexpectedClose = true;
+ } catch (SDBException e) {
+ LOGGER.error("Purge failed", e);
+ unexpectedClose = true;
+ } finally {
+ try {
+ if(tr != null)
+ endTransaction(tr.getTransactionId());
+ if (unexpectedClose) {
+ LifecycleSupport support = getServiceLocator().getService(LifecycleSupport.class);
+ try {
+ support.close();
+ } catch (DatabaseException e1) {
+ LOGGER.error("Failed to close database as a safety measure due to failed purge", e1);
+ }
+ }
+ } catch (ProCoreException e) {
+ LOGGER.error("Failed to end purge write transaction", e);
+ }
+ }
+ }
+ });
+
+ }
+
+ public long getTailChangeSetId() {
+ return clusters.getTailChangeSetId();
+ }
+
+ public Future<BackupException> getBackupRunnable(Semaphore lock, Path targetPath, int revision) throws IllegalAcornStateException, IOException {
+
+ makeSnapshot(true);
+
+ Path dbDir = getDbFolder();
+ int newestFolder = clusters.mainState.headDir - 1;
+ int latestFolder = -2;
+ Path AcornMetadataFile = AcornBackupProvider.getAcornMetadataFile(dbDir);
+ if (Files.exists(AcornMetadataFile)) {
+ try (BufferedReader br = Files.newBufferedReader(AcornMetadataFile)) {
+ latestFolder = Integer.parseInt( br.readLine() );
+ }
+ }
+
+ AcornBackupRunnable r = new AcornBackupRunnable(
+ lock, targetPath, revision, dbDir, latestFolder, newestFolder);
+ new Thread(r, "Acorn backup thread").start();
+ return r;
+
+ }
+
}