import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
+import org.simantics.acorn.MainProgram.MainProgramRunnable;
import org.simantics.acorn.exception.AcornAccessVerificationException;
import org.simantics.acorn.exception.IllegalAcornStateException;
import org.simantics.acorn.internal.ClusterChange;
import org.simantics.acorn.internal.ClusterUpdateProcessorBase;
import org.simantics.acorn.internal.UndoClusterUpdateProcessor;
+import org.simantics.acorn.lru.ClusterChangeSet.Entry;
import org.simantics.acorn.lru.ClusterInfo;
import org.simantics.acorn.lru.ClusterStreamChunk;
import org.simantics.acorn.lru.ClusterUpdateOperation;
-import org.simantics.acorn.lru.ClusterChangeSet.Entry;
import org.simantics.db.ClusterCreator;
import org.simantics.db.Database;
import org.simantics.db.ServiceLocator;
-import org.simantics.db.common.utils.Logger;
import org.simantics.db.exception.DatabaseException;
import org.simantics.db.exception.SDBException;
import org.simantics.db.server.ProCoreException;
import org.simantics.db.service.LifecycleSupport;
import org.simantics.utils.datastructures.Pair;
import org.simantics.utils.logging.TimeLogger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import gnu.trove.map.hash.TLongObjectHashMap;
public class GraphClientImpl2 implements Database.Session {
-
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(GraphClientImpl2.class);
public static final boolean DEBUG = false;
public final ClusterManager clusters;
load();
ClusterSetsSupport cssi = locator.getService(ClusterSetsSupport.class);
cssi.setReadDirectory(clusters.lastSessionDirectory);
+ cssi.updateWriteDirectory(clusters.workingDirectory);
mainProgram = new MainProgram(this, clusters);
executor.execute(mainProgram);
}
mainProgram.mutex.release();
}
} catch (IllegalAcornStateException | ProCoreException e) {
- Logger.defaultLogError("Snapshotting failed", e);
+ LOGGER.error("Snapshotting failed", e);
unexpectedClose = true;
} catch (InterruptedException e) {
- Logger.defaultLogError("Snapshotting interrupted", e);
+ LOGGER.error("Snapshotting interrupted", e);
} finally {
try {
if(tr != null)
try {
support.close();
} catch (DatabaseException e1) {
- Logger.defaultLogError("Failed to close database as a safety measure due to failed snapshotting", e1);
+ LOGGER.error("Failed to close database as a safety measure due to failed snapshotting", e1);
}
}
} catch (ProCoreException e) {
- Logger.defaultLogError("Failed to end snapshotting write transaction", e);
+ LOGGER.error("Failed to end snapshotting write transaction", e);
}
}
}
@Override
public void close() throws ProCoreException {
- System.err.println("Closing " + this + " and mainProgram " + mainProgram);
+ LOGGER.info("Closing " + this + " and mainProgram " + mainProgram);
if(!closed && !isClosing) {
isClosing = true;
try {
clusters.state.headChangeSetId++;
return clusters.state.headChangeSetId;
} catch (SDBException e) {
- Logger.defaultLogError("Failed to undo cancelled transaction", e);
+ LOGGER.error("Failed to undo cancelled transaction", e);
throw new ProCoreException(e);
}
}
clusters.clusterLRU.releaseMutex();
}
}
-
+
@Override
public boolean undo(long[] changeSetIds, OnChangeSetUpdate onChangeSetUpdate) throws SDBException {
- try {
- final ArrayList<Pair<ClusterUID, byte[]>> clusterChanges = new ArrayList<Pair<ClusterUID, byte[]>>();
-
- UndoClusterSupport support = new UndoClusterSupport(clusters);
-
- final int changeSetId = clusters.state.headChangeSetId;
-
- if(ClusterUpdateProcessorBase.DEBUG)
- System.err.println(" === BEGIN UNDO ===");
-
- for(int i=0;i<changeSetIds.length;i++) {
- final long id = changeSetIds[changeSetIds.length-1-i];
- ArrayList<String> ccss = clusters.getChanges(id);
-
- for(int j=0;j<ccss.size();j++) {
- String ccsid = ccss.get(ccss.size()-j-1);
- try {
- if(ClusterUpdateProcessorBase.DEBUG)
- System.err.println("performUndo " + ccsid);
- performUndo(ccsid, clusterChanges, support);
- } catch (DatabaseException e) {
- e.printStackTrace();
- }
- }
- }
-
- if(ClusterUpdateProcessorBase.DEBUG)
- System.err.println(" === END UNDO ===");
-
- for(int i=0;i<clusterChanges.size();i++) {
-
- final int changeSetIndex = i;
-
- final Pair<ClusterUID, byte[]> pair = clusterChanges.get(i);
-
- final ClusterUID cuid = pair.first;
- final byte[] data = pair.second;
-
- onChangeSetUpdate.onChangeSetUpdate(new ChangeSetUpdate() {
-
- @Override
- public long getChangeSetId() {
- return changeSetId;
- }
-
- @Override
- public int getChangeSetIndex() {
- return 0;
- }
-
- @Override
- public int getNumberOfClusterChangeSets() {
- return clusterChanges.size();
- }
-
- @Override
- public int getIndexOfClusterChangeSet() {
- return changeSetIndex;
- }
-
- @Override
- public byte[] getClusterId() {
- return cuid.asBytes();
- }
-
- @Override
- public boolean getNewCluster() {
- return false;
- }
-
- @Override
- public byte[] getData() {
- return data;
- }
-
- });
- }
- } catch (AcornAccessVerificationException | IllegalAcornStateException e1) {
- throw new ProCoreException(e1);
- }
+
+ Exception exception = mainProgram.runIdle(new MainProgramRunnable() {
+
+ @Override
+ public void run() throws Exception {
+
+ try {
+
+ final ArrayList<Pair<ClusterUID, byte[]>> clusterChanges = new ArrayList<Pair<ClusterUID, byte[]>>();
+
+ UndoClusterSupport support = new UndoClusterSupport(clusters);
+
+ final int changeSetId = clusters.state.headChangeSetId;
+
+ if(ClusterUpdateProcessorBase.DEBUG)
+ System.err.println(" === BEGIN UNDO ===");
+
+ for(int i=0;i<changeSetIds.length;i++) {
+ final long id = changeSetIds[changeSetIds.length-1-i];
+ ArrayList<String> ccss = clusters.getChanges(id);
+
+ for(int j=0;j<ccss.size();j++) {
+ String ccsid = ccss.get(ccss.size()-j-1);
+ try {
+ if(ClusterUpdateProcessorBase.DEBUG)
+ System.err.println("performUndo " + ccsid);
+ performUndo(ccsid, clusterChanges, support);
+ } catch (DatabaseException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+
+ if(ClusterUpdateProcessorBase.DEBUG)
+ System.err.println(" === END UNDO ===");
+
+ for(int i=0;i<clusterChanges.size();i++) {
+
+ final int changeSetIndex = i;
+
+ final Pair<ClusterUID, byte[]> pair = clusterChanges.get(i);
+
+ final ClusterUID cuid = pair.first;
+ final byte[] data = pair.second;
+
+ onChangeSetUpdate.onChangeSetUpdate(new ChangeSetUpdate() {
+
+ @Override
+ public long getChangeSetId() {
+ return changeSetId;
+ }
+
+ @Override
+ public int getChangeSetIndex() {
+ return 0;
+ }
+
+ @Override
+ public int getNumberOfClusterChangeSets() {
+ return clusterChanges.size();
+ }
+
+ @Override
+ public int getIndexOfClusterChangeSet() {
+ return changeSetIndex;
+ }
+
+ @Override
+ public byte[] getClusterId() {
+ return cuid.asBytes();
+ }
+
+ @Override
+ public boolean getNewCluster() {
+ return false;
+ }
+
+ @Override
+ public byte[] getData() {
+ return data;
+ }
+
+ });
+ }
+ } catch (AcornAccessVerificationException | IllegalAcornStateException e1) {
+ throw new ProCoreException(e1);
+ }
+
+ }
+
+ @Override
+ public void done() {
+
+ }
+
+ });
+
+ if(exception instanceof SDBException) throw (SDBException)exception;
+ else if(exception != null) throw new IllegalAcornStateException(exception);
+
return false;
+
}
public ServiceLocator getServiceLocator() {
public boolean rolledback() {
return clusters.rolledback();
}
+
+ public void purge() throws IllegalAcornStateException {
+ clusters.purge(locator);
+ }
-
-
-
-
-
-
-
-
-
-
- ////////////////////////
-
-
-
-
-
-
-
-
-
-
-
-
+ public void purgeDatabase() {
+
+ if (isClosing || unexpectedClose)
+ return;
+
+ saver.execute(new Runnable() {
+
+ @Override
+ public void run() {
+ Transaction tr = null;
+ try {
+ // First take a write transaction
+ tr = askWriteTransaction(-1);
+ // Then make sure that MainProgram is idling
+ mainProgram.mutex.acquire();
+ try {
+ synchronized(mainProgram) {
+ if(mainProgram.operations.isEmpty()) {
+ purge();
+ } else {
+ // MainProgram is becoming busy again - delay snapshotting
+ return;
+ }
+ }
+ } finally {
+ mainProgram.mutex.release();
+ }
+ } catch (IllegalAcornStateException | ProCoreException e) {
+ LOGGER.error("Purge failed", e);
+ unexpectedClose = true;
+ } catch (InterruptedException e) {
+ LOGGER.error("Purge interrupted", e);
+ } finally {
+ try {
+ if(tr != null)
+ endTransaction(tr.getTransactionId());
+ if (unexpectedClose) {
+ LifecycleSupport support = getServiceLocator().getService(LifecycleSupport.class);
+ try {
+ support.close();
+ } catch (DatabaseException e1) {
+ LOGGER.error("Failed to close database as a safety measure due to failed purge", e1);
+ }
+ }
+ } catch (ProCoreException e) {
+ LOGGER.error("Failed to end purge write transaction", e);
+ }
+ }
+ }
+ });
+
+ }
+
+ public long getTailChangeSetId() {
+ return clusters.getTailChangeSetId();
+ }
+
}