import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
+import org.simantics.acorn.MainProgram.MainProgramRunnable;
+import org.simantics.acorn.exception.AcornAccessVerificationException;
+import org.simantics.acorn.exception.IllegalAcornStateException;
import org.simantics.acorn.internal.ClusterChange;
import org.simantics.acorn.internal.ClusterUpdateProcessorBase;
import org.simantics.acorn.internal.UndoClusterUpdateProcessor;
import org.simantics.db.server.ProCoreException;
import org.simantics.db.service.ClusterSetsSupport;
import org.simantics.db.service.ClusterUID;
+import org.simantics.db.service.LifecycleSupport;
import org.simantics.utils.datastructures.Pair;
import org.simantics.utils.logging.TimeLogger;
private ExecutorService executor = Executors.newSingleThreadExecutor(new ClientThreadFactory("Core Main Program", false));
private ExecutorService saver = Executors.newSingleThreadExecutor(new ClientThreadFactory("Core Snapshot Saver", true));
- private static GraphClientImpl2 INSTANCE;
private Path dbFolder;
private final Database database;
private ServiceLocator locator;
this.clusters = new ClusterManager(dbFolder);
load();
ClusterSetsSupport cssi = locator.getService(ClusterSetsSupport.class);
- cssi.updateReadAndWriteDirectories(clusters.lastSessionDirectory, clusters.workingDirectory);
+ cssi.setReadDirectory(clusters.lastSessionDirectory);
+ cssi.updateWriteDirectory(clusters.workingDirectory);
mainProgram = new MainProgram(this, clusters);
executor.execute(mainProgram);
- INSTANCE = this;
}
public Path getDbFolder() {
public void tryMakeSnapshot() throws IOException {
- if (isClosing)
+ if (isClosing || unexpectedClose)
return;
saver.execute(new Runnable() {
} finally {
mainProgram.mutex.release();
}
- } catch (IOException e) {
- Logger.defaultLogError(e);
- } catch (ProCoreException e) {
- Logger.defaultLogError(e);
+ } catch (IllegalAcornStateException | ProCoreException e) {
+ Logger.defaultLogError("Snapshotting failed", e);
+ unexpectedClose = true;
} catch (InterruptedException e) {
- Logger.defaultLogError(e);
+ Logger.defaultLogError("Snapshotting interrupted", e);
} finally {
try {
if(tr != null)
endTransaction(tr.getTransactionId());
+ if (unexpectedClose) {
+ LifecycleSupport support = getServiceLocator().getService(LifecycleSupport.class);
+ try {
+ support.close();
+ } catch (DatabaseException e1) {
+ Logger.defaultLogError("Failed to close database as a safety measure due to failed snapshotting", e1);
+ }
+ }
} catch (ProCoreException e) {
- Logger.defaultLogError(e);
+ Logger.defaultLogError("Failed to end snapshotting write transaction", e);
}
}
}
-
});
}
- public void makeSnapshot(boolean force) throws IOException {
- if (safeToMakeSnapshot)
- clusters.makeSnapshot(locator, force);
+ public void makeSnapshot(boolean fullSave) throws IllegalAcornStateException {
+ clusters.makeSnapshot(locator, fullSave);
}
public <T> T clone(ClusterUID uid, ClusterCreator creator) throws DatabaseException {
- return clusters.clone(uid, creator);
+ try {
+ return clusters.clone(uid, creator);
+ } catch (AcornAccessVerificationException | IllegalAcornStateException | IOException e) {
+ unexpectedClose = true;
+ throw new DatabaseException(e);
+ }
}
// private void save() throws IOException {
private boolean closed = false;
private boolean isClosing = false;
- // Add check to make sure if it safe to make snapshot (used with cancel which is not yet supported and may cause corrupted head.state writing)
- private boolean safeToMakeSnapshot = true;
+ private boolean unexpectedClose = false;
@Override
public void close() throws ProCoreException {
if(!closed && !isClosing) {
isClosing = true;
try {
- makeSnapshot(true);
+ if (!unexpectedClose)
+ makeSnapshot(true);
mainProgram.close();
clusters.shutdown();
System.err.println("executorTerminated=" + executorTerminated + ", saverTerminated=" + saverTerminated);
- INSTANCE = null;
mainProgram = null;
executor = null;
saver = null;
- } catch (IOException | InterruptedException e) {
+ } catch (IllegalAcornStateException | InterruptedException e) {
throw new ProCoreException(e);
}
+ closed = true;
}
- closed = true;
//impl.close();
}
@Override
public void acceptCommit(long transactionId, long changeSetId, byte[] metadata) throws ProCoreException {
-
clusters.state.headChangeSetId++;
-
long committedChangeSetId = changeSetId + 1;
-
- clusters.commitChangeSet(committedChangeSetId, metadata);
-
- clusters.state.transactionId = transactionId;
-
- mainProgram.committed();
-
- TimeLogger.log("Accepted commit");
-
+ try {
+ clusters.commitChangeSet(committedChangeSetId, metadata);
+
+ clusters.state.transactionId = transactionId;
+
+ mainProgram.committed();
+
+ TimeLogger.log("Accepted commit");
+ } catch (IllegalAcornStateException e) {
+ throw new ProCoreException(e);
+ }
}
@Override
- public long cancelCommit(long transactionId, long changeSetId,
- byte[] metadata, OnChangeSetUpdate onChangeSetUpdate)
- throws ProCoreException {
- safeToMakeSnapshot = false;
- throw new UnsupportedOperationException("org.simantics.acorn.GraphClientImpl2.cancelCommit() is not supported operation! Closing down to prevent further havoc");
-// System.err.println("GraphClientImpl2.cancelCommit() called!! this is experimental and might cause havoc!");
-// try {
-// undo(new long[] {changeSetId}, onChangeSetUpdate);
-// } catch (SDBException e) {
-// e.printStackTrace();
-// throw new ProCoreException(e);
-// }
-// clusters.state.headChangeSetId++;
-// return clusters.state.headChangeSetId;
+ public long cancelCommit(long transactionId, long changeSetId, byte[] metadata, OnChangeSetUpdate onChangeSetUpdate) throws ProCoreException {
+ // Accept and finalize current transaction and then undo it
+ acceptCommit(transactionId, changeSetId, metadata);
+
+ try {
+ undo(new long[] {changeSetId+1}, onChangeSetUpdate);
+ clusters.state.headChangeSetId++;
+ return clusters.state.headChangeSetId;
+ } catch (SDBException e) {
+ Logger.defaultLogError("Failed to undo cancelled transaction", e);
+ throw new ProCoreException(e);
+ }
}
@Override
* This method cannot be synchronized since it waits and must support multiple entries
* by query thread(s) and internal transactions such as snapshot saver
*/
- public Transaction askWriteTransaction()
- throws ProCoreException {
+ public Transaction askWriteTransaction() throws IllegalAcornStateException {
Semaphore semaphore = new Semaphore(0);
-
TransactionRequest req = queue(TransactionState.WRITE, semaphore);
try {
semaphore.acquire();
} catch (InterruptedException e) {
- throw new ProCoreException(e);
+ throw new IllegalAcornStateException(e);
}
-
mainProgram.startTransaction(clusters.state.headChangeSetId+1);
-
return makeTransaction(req);
-
}
public synchronized long endTransaction(long transactionId) throws ProCoreException {
}
@Override
- public Transaction askWriteTransaction(final long transactionId)
- throws ProCoreException {
- return transactionManager.askWriteTransaction();
+ public Transaction askWriteTransaction(final long transactionId) throws ProCoreException {
+ try {
+ if (isClosing || unexpectedClose || closed) {
+ throw new ProCoreException("GraphClientImpl2 is already closing so no more write transactions allowed!");
+ }
+ return transactionManager.askWriteTransaction();
+ } catch (IllegalAcornStateException e) {
+ throw new ProCoreException(e);
+ }
}
@Override
}
@Override
- public byte[] getChangeSetMetadata(long changeSetId)
- throws ProCoreException {
- return clusters.getMetadata(changeSetId);
+ public byte[] getChangeSetMetadata(long changeSetId) throws ProCoreException {
+ try {
+ return clusters.getMetadata(changeSetId);
+ } catch (AcornAccessVerificationException | IllegalAcornStateException e) {
+ throw new ProCoreException(e);
+ }
}
@Override
@Override
public ClusterIds getClusterIds() throws ProCoreException {
- return clusters.getClusterIds();
+ try {
+ return clusters.getClusterIds();
+ } catch (IllegalAcornStateException e) {
+ throw new ProCoreException(e);
+ }
}
@Override
}
- public byte[] getResourceFile(final byte[] clusterUID, final int resourceIndex) throws ProCoreException {
+ public byte[] getResourceFile(final byte[] clusterUID, final int resourceIndex) throws ProCoreException, AcornAccessVerificationException, IllegalAcornStateException {
return clusters.getResourceFile(clusterUID, resourceIndex);
}
@Override
- public ResourceSegment getResourceSegment(final byte[] clusterUID,
- final int resourceIndex, final long segmentOffset, short segmentSize) throws ProCoreException {
-
- return clusters.getResourceSegment(clusterUID, resourceIndex, segmentOffset, segmentSize);
-
+ public ResourceSegment getResourceSegment(final byte[] clusterUID, final int resourceIndex, final long segmentOffset, short segmentSize) throws ProCoreException {
+ try {
+ return clusters.getResourceSegment(clusterUID, resourceIndex, segmentOffset, segmentSize);
+ } catch (AcornAccessVerificationException | IllegalAcornStateException e) {
+ throw new ProCoreException(e);
+ }
}
@Override
@Override
public void updateCluster(byte[] operations) throws ProCoreException {
-
- ClusterUpdateOperation operation = new ClusterUpdateOperation(clusters, operations);
- ClusterInfo info = clusters.clusterLRU.getOrCreate(operation.uid, true);
- if(info == null) throw new IllegalStateException();
- info.acquireMutex();
- try {
+ ClusterInfo info = null;
+ try {
+ ClusterUpdateOperation operation = new ClusterUpdateOperation(clusters, operations);
+ info = clusters.clusterLRU.getOrCreate(operation.uid, true);
+ if(info == null)
+ throw new IllegalAcornStateException("info == null for operation " + operation);
+ info.acquireMutex();
info.scheduleUpdate();
mainProgram.schedule(operation);
- } catch (Throwable t) {
- throw new IllegalStateException(t);
- } finally {
- info.releaseMutex();
+ } catch (IllegalAcornStateException | AcornAccessVerificationException e) {
+ throw new ProCoreException(e);
+ } finally {
+ if (info != null)
+ info.releaseMutex();
}
-
}
- private UndoClusterUpdateProcessor getUndoCSS(String ccsId) throws DatabaseException {
+ private UndoClusterUpdateProcessor getUndoCSS(String ccsId) throws DatabaseException, AcornAccessVerificationException, IllegalAcornStateException {
String[] ss = ccsId.split("\\.");
String chunkKey = ss[0];
int chunkOffset = Integer.parseInt(ss[1]);
ClusterStreamChunk chunk = clusters.streamLRU.getWithoutMutex(chunkKey);
- if(chunk == null) throw new IllegalStateException("Cluster Stream Chunk " + chunkKey + " was not found.");
+ if(chunk == null) throw new IllegalAcornStateException("Cluster Stream Chunk " + chunkKey + " was not found.");
chunk.acquireMutex();
try {
return chunk.getUndoProcessor(clusters, chunkOffset, ccsId);
+ } catch (DatabaseException e) {
+ throw e;
} catch (Throwable t) {
throw new IllegalStateException(t);
} finally {
chunk.releaseMutex();
}
-
}
- private void performUndo(String ccsId, ArrayList<Pair<ClusterUID, byte[]>> clusterChanges, UndoClusterSupport support) throws ProCoreException, DatabaseException {
-
+ private void performUndo(String ccsId, ArrayList<Pair<ClusterUID, byte[]>> clusterChanges, UndoClusterSupport support) throws ProCoreException, DatabaseException, IllegalAcornStateException, AcornAccessVerificationException {
UndoClusterUpdateProcessor proc = getUndoCSS(ccsId);
int clusterKey = clusters.getClusterKeyByClusterUIDOrMakeWithoutMutex(proc.getClusterUID());
Entry e = proc.entries.get(proc.entries.size() - 1 - i);
e.process(clusters, cs, clusterKey);
-
}
-
cs.flush();
} finally {
clusters.clusterLRU.releaseMutex();
}
-
}
-
+
@Override
public boolean undo(long[] changeSetIds, OnChangeSetUpdate onChangeSetUpdate) throws SDBException {
-
- final ArrayList<Pair<ClusterUID, byte[]>> clusterChanges = new ArrayList<Pair<ClusterUID, byte[]>>();
-
- UndoClusterSupport support = new UndoClusterSupport(clusters);
-
- final int changeSetId = clusters.state.headChangeSetId;
-
- if(ClusterUpdateProcessorBase.DEBUG)
- System.err.println(" === BEGIN UNDO ===");
-
- for(int i=0;i<changeSetIds.length;i++) {
- final long id = changeSetIds[changeSetIds.length-1-i];
- ArrayList<String> ccss = clusters.getChanges(id);
- for(int j=0;j<ccss.size();j++) {
- try {
- if(ClusterUpdateProcessorBase.DEBUG)
- System.err.println("performUndo " + ccss.get(ccss.size()-j-1));
- performUndo(ccss.get(ccss.size()-j-1), clusterChanges, support);
- } catch (DatabaseException e) {
- e.printStackTrace();
- }
- }
- }
- if(ClusterUpdateProcessorBase.DEBUG)
- System.err.println(" === END UNDO ===");
+ Exception exception = mainProgram.runIdle(new MainProgramRunnable() {
- for(int i=0;i<clusterChanges.size();i++) {
-
- final int changeSetIndex = i;
-
- final Pair<ClusterUID, byte[]> pair = clusterChanges.get(i);
-
- final ClusterUID cuid = pair.first;
- final byte[] data = pair.second;
-
- onChangeSetUpdate.onChangeSetUpdate(new ChangeSetUpdate() {
-
- @Override
- public long getChangeSetId() {
- return changeSetId;
- }
-
- @Override
- public int getChangeSetIndex() {
- return 0;
- }
-
- @Override
- public int getNumberOfClusterChangeSets() {
- return clusterChanges.size();
- }
+ @Override
+ public void run() throws Exception {
- @Override
- public int getIndexOfClusterChangeSet() {
- return changeSetIndex;
- }
+ try {
- @Override
- public byte[] getClusterId() {
- return cuid.asBytes();
- }
+ final ArrayList<Pair<ClusterUID, byte[]>> clusterChanges = new ArrayList<Pair<ClusterUID, byte[]>>();
+
+ UndoClusterSupport support = new UndoClusterSupport(clusters);
+
+ final int changeSetId = clusters.state.headChangeSetId;
+
+ if(ClusterUpdateProcessorBase.DEBUG)
+ System.err.println(" === BEGIN UNDO ===");
+
+ for(int i=0;i<changeSetIds.length;i++) {
+ final long id = changeSetIds[changeSetIds.length-1-i];
+ ArrayList<String> ccss = clusters.getChanges(id);
+
+ for(int j=0;j<ccss.size();j++) {
+ String ccsid = ccss.get(ccss.size()-j-1);
+ try {
+ if(ClusterUpdateProcessorBase.DEBUG)
+ System.err.println("performUndo " + ccsid);
+ performUndo(ccsid, clusterChanges, support);
+ } catch (DatabaseException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+
+ if(ClusterUpdateProcessorBase.DEBUG)
+ System.err.println(" === END UNDO ===");
+
+ for(int i=0;i<clusterChanges.size();i++) {
+
+ final int changeSetIndex = i;
+
+ final Pair<ClusterUID, byte[]> pair = clusterChanges.get(i);
+
+ final ClusterUID cuid = pair.first;
+ final byte[] data = pair.second;
+
+ onChangeSetUpdate.onChangeSetUpdate(new ChangeSetUpdate() {
+
+ @Override
+ public long getChangeSetId() {
+ return changeSetId;
+ }
+
+ @Override
+ public int getChangeSetIndex() {
+ return 0;
+ }
+
+ @Override
+ public int getNumberOfClusterChangeSets() {
+ return clusterChanges.size();
+ }
+
+ @Override
+ public int getIndexOfClusterChangeSet() {
+ return changeSetIndex;
+ }
+
+ @Override
+ public byte[] getClusterId() {
+ return cuid.asBytes();
+ }
+
+ @Override
+ public boolean getNewCluster() {
+ return false;
+ }
+
+ @Override
+ public byte[] getData() {
+ return data;
+ }
+
+ });
+ }
+ } catch (AcornAccessVerificationException | IllegalAcornStateException e1) {
+ throw new ProCoreException(e1);
+ }
- @Override
- public boolean getNewCluster() {
- return false;
- }
+ }
- @Override
- public byte[] getData() {
- return data;
- }
+ @Override
+ public void done() {
- });
+ }
- }
+ });
+ if(exception instanceof SDBException) throw (SDBException)exception;
+ else if(exception != null) throw new IllegalAcornStateException(exception);
return false;
}
- public static GraphClientImpl2 getInstance() {
- return INSTANCE;
- }
-
public ServiceLocator getServiceLocator() {
return locator;
}
return false;
}
+ @Override
+ public boolean rolledback() {
+ return clusters.rolledback();
+ }
+