import java.io.IOException;
import java.math.BigInteger;
+import java.nio.file.CopyOption;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
+import java.nio.file.StandardCopyOption;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
+import java.util.stream.Stream;
import org.simantics.acorn.lru.ClusterStreamChunk;
import org.simantics.acorn.lru.FileInfo;
import org.simantics.acorn.lru.LRU;
+import org.simantics.databoard.file.RuntimeIOException;
import org.simantics.db.ClusterCreator;
import org.simantics.db.Database.Session.ClusterIds;
import org.simantics.db.Database.Session.ResourceSegment;
import org.simantics.db.procore.cluster.ClusterTraits;
import org.simantics.db.service.ClusterSetsSupport;
import org.simantics.db.service.ClusterUID;
+import org.simantics.utils.FileUtils;
import org.simantics.utils.threads.logger.ITask;
import org.simantics.utils.threads.logger.ThreadLogger;
import org.slf4j.Logger;
private AtomicBoolean safeToMakeSnapshot = new AtomicBoolean(true);
private IllegalAcornStateException cause;
+    /**
+     * Purges the database: data still referenced from directories older than
+     * the current working directory is copied into a sibling
+     * "&lt;current&gt;_baseline" directory, stale stream and change-set entries are
+     * dropped from the LRUs, a full snapshot is taken and finally obsolete
+     * directories are deleted.
+     *
+     * @param locator service locator passed on to makeSnapshot
+     * @throws IllegalAcornStateException if purging fails; the failure is also
+     *         recorded via notSafeToMakeSnapshot so that no further snapshots
+     *         are attempted
+     */
+    public synchronized void purge(ServiceLocator locator) throws IllegalAcornStateException {
+
+        try {
+
+            // Schedule writing of all data to disk
+            refreshHeadState();
+            // Wait for files to be written
+            synchronizeWorkingDirectory();
+
+            String currentDir = workingDirectory.getFileName().toString();
+            Path baseline = workingDirectory.resolveSibling(currentDir + "_baseline");
+
+            Files.createDirectories(baseline);
+
+            // Clusters: keys look like "<first>.<second>#<dir>" where the id
+            // parts are hexadecimal longs.
+            for(String clusterKey : state.clusters) {
+                String[] parts1 = clusterKey.split("#");
+                String[] parts = parts1[0].split("\\.");
+                String readDirName = parts1[1];
+                if(!readDirName.equals(currentDir)) {
+                    String fileName = parts[0] + "." + parts[1] + ".cluster";
+                    Path from = dbFolder.resolve(readDirName).resolve(fileName);
+                    Path to = baseline.resolve(fileName);
+                    System.err.println("purge copies " + from + " => " + to);
+                    Files.copy(from, to, StandardCopyOption.COPY_ATTRIBUTES);
+                    long first = new BigInteger(parts[0], 16).longValue();
+                    long second = new BigInteger(parts[1], 16).longValue();
+                    ClusterUID uuid = ClusterUID.make(first, second);
+                    ClusterInfo info = clusterLRU.getWithoutMutex(uuid);
+                    info.moveTo(baseline);
+                }
+            }
+
+            // External files: keys look like "<name>#<dir>".
+            for (String fileKey : state.files) {
+                String[] parts = fileKey.split("#");
+                String readDirName = parts[1];
+                if(!readDirName.equals(currentDir)) {
+                    String fileName = parts[0] + ".extFile";
+                    Path from = dbFolder.resolve(readDirName).resolve(fileName);
+                    Path to = baseline.resolve(fileName);
+                    System.err.println("purge copies " + from + " => " + to);
+                    Files.copy(from, to, StandardCopyOption.COPY_ATTRIBUTES);
+                    FileInfo info = fileLRU.getWithoutMutex(parts[0]);
+                    info.moveTo(baseline);
+                }
+            }
+
+            // Stream chunks residing in older directories are simply dropped.
+            for (String fileKey : state.stream) {
+                String[] parts = fileKey.split("#");
+                String readDirName = parts[1];
+                if(!readDirName.equals(currentDir)) {
+                    ClusterStreamChunk chunk = streamLRU.purge(parts[0]);
+                    System.err.println("purge removes " + chunk);
+                }
+            }
+
+            // Change sets residing in older directories are dropped as well.
+            for (String fileKey : state.cs) {
+                String[] parts = fileKey.split("#");
+                String readDirName = parts[1];
+                if(!readDirName.equals(currentDir)) {
+                    Long revisionId = Long.parseLong(parts[0]);
+                    ChangeSetInfo info = csLRU.purge(revisionId);
+                    System.err.println("purge removes " + info);
+                }
+            }
+
+            // After purging, the retained history starts at the current head.
+            state.tailChangeSetId = state.headChangeSetId;
+
+            makeSnapshot(locator, true);
+
+            // Files.walk returns a Stream that holds open directory handles;
+            // close it with try-with-resources to avoid a resource leak.
+            try (Stream<Path> dirs = Files.walk(dbFolder, 1)) {
+                dirs.filter(Files::isDirectory).forEach(f -> tryPurgeDirectory(f));
+            }
+
+        } catch (IllegalAcornStateException e) {
+            notSafeToMakeSnapshot(e);
+            throw e;
+        } catch (IOException e) {
+            IllegalAcornStateException e1 = new IllegalAcornStateException(e);
+            notSafeToMakeSnapshot(e1);
+            throw e1;
+        } catch (AcornAccessVerificationException e) {
+            IllegalAcornStateException e1 = new IllegalAcornStateException(e);
+            notSafeToMakeSnapshot(e1);
+            throw e1;
+        }
+
+    }
+
+    /**
+     * Deletes the given directory if it is an obsolete database directory,
+     * i.e. its numeric ordinal is older than the previous head directory.
+     * Directories whose name ends with "db" are never touched; "_baseline"
+     * directories are compared by their numeric prefix.
+     *
+     * @param f candidate directory under the database folder
+     */
+    void tryPurgeDirectory(Path f) {
+
+        String currentDir = f.getFileName().toString();
+        if(currentDir.endsWith("db"))
+            return;
+
+        if(currentDir.endsWith("_baseline"))
+            currentDir = currentDir.replace("_baseline", "");
+
+        int ordinal;
+        try {
+            ordinal = Integer.parseInt(currentDir);
+        } catch (NumberFormatException e) {
+            // Not a numbered database directory (e.g. the walk root itself) -
+            // leave it alone instead of crashing the whole purge.
+            return;
+        }
+        if(ordinal < mainState.headDir - 1) {
+            System.err.println("purge deletes " + f);
+            FileUtils.deleteDir(f.toFile());
+        }
+
+    }
+
public synchronized boolean makeSnapshot(ServiceLocator locator, boolean fullSave) throws IllegalAcornStateException {
try {
if (!safeToMakeSnapshot.get())
csLRU.persist(state.cs);
}
-	private void persistHeadState() throws IOException {
+	/**
+	 * Forces all regular files in the current working directory to disk.
+	 */
+	private void synchronizeWorkingDirectory() throws IOException {
		// Sync current working directory
-		Files.walk(workingDirectory, 1).filter(Files::isRegularFile).forEach(FileIO::uncheckedSyncPath);
+		// Files.walk returns a Stream holding open directory handles; close it
+		// with try-with-resources to avoid a resource leak.
+		try (Stream<Path> files = Files.walk(workingDirectory, 1)) {
+			files.filter(Files::isRegularFile).forEach(FileIO::uncheckedSyncPath);
+		}
+	}
+
+	/**
+	 * Persists the head state: syncs the working directory, writes the head
+	 * state file and advances the head directory ordinal.
+	 */
+	private void persistHeadState() throws IOException {
+		synchronizeWorkingDirectory();
		state.save(workingDirectory);
		mainState.headDir++;
	}
this.cause = t;
}
+    /**
+     * @return the current tail change set id; purge advances this to the
+     *         head change set id
+     */
+    public long getTailChangeSetId() {
+        return state.tailChangeSetId;
+    }
+
}
public boolean rolledback() {
return clusters.rolledback();
}
+
+    /**
+     * Purges this database via the cluster manager.
+     *
+     * @throws IllegalAcornStateException if the purge fails
+     */
+    public void purge() throws IllegalAcornStateException {
+        clusters.purge(locator);
+    }
-
-
-
-
-
-
-
-
-
-
- ////////////////////////
-
-
-
-
-
-
-
-
-
-
-
-
+    /**
+     * Asynchronously purges the database on the saver executor. The purge is
+     * only performed when a write transaction can be taken and MainProgram has
+     * no pending operations; otherwise it is skipped. If the purge fails, the
+     * database is closed as a safety measure.
+     */
+    public void purgeDatabase() {
+
+        if (isClosing || unexpectedClose)
+            return;
+
+        saver.execute(new Runnable() {
+
+            @Override
+            public void run() {
+                Transaction tr = null;
+                try {
+                    // First take a write transaction
+                    tr = askWriteTransaction(-1);
+                    // Then make sure that MainProgram is idling
+                    mainProgram.mutex.acquire();
+                    try {
+                        synchronized(mainProgram) {
+                            if(mainProgram.operations.isEmpty()) {
+                                purge();
+                            } else {
+                                // MainProgram is becoming busy again - delay snapshotting
+                                return;
+                            }
+                        }
+                    } finally {
+                        mainProgram.mutex.release();
+                    }
+                } catch (IllegalAcornStateException | ProCoreException e) {
+                    Logger.defaultLogError("Purge failed", e);
+                    unexpectedClose = true;
+                } catch (InterruptedException e) {
+                    // Restore the interrupt status so the executor thread can
+                    // observe the interruption later.
+                    Thread.currentThread().interrupt();
+                    Logger.defaultLogError("Purge interrupted", e);
+                } finally {
+                    try {
+                        if(tr != null)
+                            endTransaction(tr.getTransactionId());
+                        if (unexpectedClose) {
+                            LifecycleSupport support = getServiceLocator().getService(LifecycleSupport.class);
+                            try {
+                                support.close();
+                            } catch (DatabaseException e1) {
+                                Logger.defaultLogError("Failed to close database as a safety measure due to failed purge", e1);
+                            }
+                        }
+                    } catch (ProCoreException e) {
+                        Logger.defaultLogError("Failed to end purge write transaction", e);
+                    }
+                }
+            }
+        });
+
+    }
+
+    /**
+     * @return the tail change set id, delegated to the cluster manager
+     */
+    public long getTailChangeSetId() {
+        return clusters.getTailChangeSetId();
+    }
+
}
import org.simantics.acorn.exception.InvalidHeadStateException;
import org.simantics.databoard.Bindings;
+import org.simantics.databoard.adapter.AdapterConstructionException;
import org.simantics.databoard.binding.mutable.MutableVariant;
import org.simantics.databoard.serialization.Serializer;
import org.simantics.databoard.util.binary.BinaryMemory;
public ArrayList<String> cs = new ArrayList<>();
// public ArrayList<String> ccs = new ArrayList<String>();
+ public long tailChangeSetId = 1;
+
public static HeadState load(Path directory) throws InvalidHeadStateException {
Path f = directory.resolve(HEAD_STATE);
return object;
}
} catch (IOException i) {
+ Throwable cause = i.getCause();
+ if(cause instanceof AdapterConstructionException) {
+ HeadState1 old = HeadState1.load(directory);
+ return old.migrate();
+ }
return new HeadState();
// throw new InvalidHeadStateException(i);
} catch (NoSuchAlgorithmException e) {
--- /dev/null
+package org.simantics.acorn;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.ArrayList;
+import java.util.Arrays;
+
+import org.simantics.acorn.exception.InvalidHeadStateException;
+import org.simantics.databoard.Bindings;
+import org.simantics.databoard.binding.mutable.MutableVariant;
+import org.simantics.databoard.serialization.Serializer;
+import org.simantics.databoard.util.binary.BinaryMemory;
+
+public class HeadState1 {
+
+ public static final String HEAD_STATE = "head.state";
+ public static final String SHA_1 = "SHA-1";
+
+ public int headChangeSetId = 0;
+ public long transactionId = 1;
+ public long reservedIds = 3;
+
+ public ArrayList<String> clusters = new ArrayList<>();
+ public ArrayList<String> files = new ArrayList<>();
+ public ArrayList<String> stream = new ArrayList<>();
+ public ArrayList<String> cs = new ArrayList<>();
+
+ public HeadState migrate() {
+ HeadState state = new HeadState();
+ state.headChangeSetId = headChangeSetId;
+ state.transactionId = transactionId;
+ state.reservedIds = reservedIds;
+ state.clusters = clusters;
+ state.files = files;
+ state.stream = stream;
+ state.cs = cs;
+ return state;
+ }
+
+ public static HeadState1 load(Path directory) throws InvalidHeadStateException {
+ Path f = directory.resolve(HEAD_STATE);
+
+ try {
+ byte[] bytes = Files.readAllBytes(f);
+ MessageDigest sha1 = MessageDigest.getInstance(SHA_1);
+ int digestLength = sha1.getDigestLength();
+
+ sha1.update(bytes, digestLength, bytes.length - digestLength);
+ byte[] newChecksum = sha1.digest();
+ if (!Arrays.equals(newChecksum, Arrays.copyOfRange(bytes, 0, digestLength))) {
+ throw new InvalidHeadStateException(
+ "Checksum " + Arrays.toString(newChecksum) + " does not match excpected "
+ + Arrays.toString(Arrays.copyOfRange(bytes, 0, digestLength)) + " for " + f.toAbsolutePath());
+ }
+ try (ByteArrayInputStream bais = new ByteArrayInputStream(bytes, digestLength, bytes.length - digestLength)) {
+ HeadState1 object = (HeadState1) org.simantics.databoard.Files.readFile(bais, Bindings.getBindingUnchecked(HeadState1.class));
+ return object;
+ }
+ } catch (IOException i) {
+ return new HeadState1();
+ } catch (NoSuchAlgorithmException e) {
+ throw new Error("SHA-1 Algorithm not found", e);
+ } catch (Throwable t) {
+ throw new InvalidHeadStateException(t);
+ }
+ }
+
+ public void save(Path directory) throws IOException {
+ Path f = directory.resolve(HEAD_STATE);
+ try {
+ BinaryMemory rf = new BinaryMemory(4096);
+ try {
+ MutableVariant v = new MutableVariant(Bindings.getBindingUnchecked(HeadState1.class), this);
+ Serializer s = Bindings.getSerializerUnchecked( Bindings.VARIANT );
+ s.serialize(rf, v);
+ } finally {
+ rf.close();
+ }
+
+ byte[] bytes = rf.toByteBuffer().array();
+
+ MessageDigest sha1 = MessageDigest.getInstance(SHA_1);
+ sha1.update(bytes);
+ byte[] checksum = sha1.digest();
+
+ try (OutputStream out = Files.newOutputStream(f)) {
+ out.write(checksum);
+ out.write(bytes);
+ }
+ FileIO.syncPath(f);
+ } catch (NoSuchAlgorithmException e) {
+ throw new Error("SHA-1 digest not found, should not happen", e);
+ }
+ }
+
+ public static void validateHeadStateIntegrity(Path headState) throws InvalidHeadStateException, IOException {
+ try {
+ byte[] bytes = Files.readAllBytes(headState);
+ MessageDigest sha1 = MessageDigest.getInstance(SHA_1);
+ int digestLength = sha1.getDigestLength();
+ sha1.update(bytes, digestLength, bytes.length - digestLength);
+ byte[] newChecksum = sha1.digest();
+ if (!Arrays.equals(newChecksum, Arrays.copyOfRange(bytes, 0, digestLength))) {
+ throw new InvalidHeadStateException(
+ "Checksum " + Arrays.toString(newChecksum) + " does not match excpected "
+ + Arrays.toString(Arrays.copyOfRange(bytes, 0, digestLength)) + " for " + headState.toAbsolutePath());
+ }
+ } catch (NoSuchAlgorithmException e) {
+ throw new Error("SHA-1 digest not found, should not happen", e);
+ }
+ }
+}
import org.simantics.db.DatabaseUserAgent;
import org.simantics.db.ServiceLocator;
import org.simantics.db.common.utils.Logger;
+import org.simantics.db.exception.SDBException;
+import org.simantics.db.server.DatabaseStartException;
import org.simantics.db.server.ProCoreException;
+import org.simantics.db.server.internal.InternalException;
/**
* @author Tuukka Lehtonen
public class AcornDatabase implements Database {
private final Path folder;
+
+ private GraphClientImpl2 currentClient;
private DatabaseUserAgent userAgent;
@Override
public void purgeDatabase() throws ProCoreException {
-    // TODO: implement
-    throw new UnsupportedOperationException();
+    // NOTE(review): throws unchecked IllegalStateException although the
+    // signature declares ProCoreException; callers catching ProCoreException
+    // will not see it - confirm this is intended.
+    if(currentClient == null) throw new IllegalStateException("No current session.");
+    currentClient.purgeDatabase();
}
@Override
public long serverGetTailChangeSetId() throws ProCoreException {
-    // "We have it all"
-    // But after purging we don't so beware.
-    // TODO: beware for purge
-    return 1;
+    // After a purge the tail is no longer 1; delegate to the running session.
+    if(currentClient == null) throw new IllegalStateException("No current session.");
+    return currentClient.getTailChangeSetId();
}
@Override
public Session newSession(ServiceLocator locator) throws ProCoreException {
try {
- return new GraphClientImpl2(this, folder, locator);
+ if(currentClient != null) throw new DatabaseStartException(folder.toFile(), "A session is already running. Only one session is supported.");
+ currentClient = new GraphClientImpl2(this, folder, locator);
+ return currentClient;
} catch (IOException e) {
throw new ProCoreException(e);
}
}
}
+
+
+    /**
+     * Removes the mapping for the given key and returns its value, or null
+     * if there was no mapping.
+     */
+    public MapValue purge(MapKey id) {
+        return map.remove(id);
+    }
+
public MapValue get(MapKey key) throws AcornAccessVerificationException {
if(VERIFY) verifyAccess();
if(VERIFY) verifyAccess();
return readDirectory;
}
+
+    /**
+     * Points this entry's read directory at the given path; used by purge
+     * when backing files are relocated to the baseline directory.
+     */
+    public void moveTo(Path path) {
+        readDirectory = path;
+    }
}
\ No newline at end of file
import java.util.TreeSet;
import org.eclipse.core.runtime.IProgressMonitor;
+import org.eclipse.core.runtime.NullProgressMonitor;
import org.eclipse.core.runtime.SubMonitor;
import org.simantics.databoard.Bindings;
import org.simantics.databoard.Datatypes;
return Layer0.getInstance(graph).String;
}
+    /**
+     * Empties the trash bin without progress reporting.
+     */
+    public static void emptyTrashBin() throws ServiceException {
+        emptyTrashBin(new NullProgressMonitor());
+    }
+
+    /**
+     * Empties the trash bin of the current session and project.
+     */
public static void emptyTrashBin(IProgressMonitor monitor) throws ServiceException {
    emptyTrashBin(monitor, SimanticsInternal.getSession(), SimanticsInternal.getProject());
}
return;
mon.subTask("Purging Database");
mon.newChild(1000);
- XSupport xs = session.getService(XSupport.class);
- xs.purge();
+ purgeDatabase(monitor, session);
} catch (CancelTransactionException e) {
// Ignore.
} catch (DatabaseException e) {
throw new ServiceException(e);
}
}
-
+
+    /**
+     * Purges the database without progress reporting.
+     */
+    public static void purgeDatabase() throws ServiceException {
+        purgeDatabase(new NullProgressMonitor());
+    }
+
+    /**
+     * Purges the database of the current session.
+     */
+    public static void purgeDatabase(final IProgressMonitor monitor) throws ServiceException {
+        purgeDatabase(monitor, SimanticsInternal.getSession());
+    }
+
+    /**
+     * Purges the database of the given session via the XSupport service.
+     *
+     * @throws ServiceException if the purge fails
+     */
+    public static void purgeDatabase(final IProgressMonitor monitor, Session session) throws ServiceException {
+        try {
+            XSupport xs = session.getService(XSupport.class);
+            xs.purge();
+        } catch (DatabaseException e) {
+            throw new ServiceException(e);
+        }
+    }
+
public static Resource getSingleDomainOf(ReadGraph graph, Resource type, Resource target) throws DatabaseException {
Resource result = null;
for(Resource candidate : getDomainOf(graph, type).values()) {
sortByCluster :: [a] -> (a->Resource) -> <ReadGraph> [a]
makeSynchronous :: Boolean -> <ReadGraph> ()
listOntologies :: () -> <ReadGraph> [Resource]
+ emptyTrashBin :: () -> <Proc> ()
+ purgeDatabase :: () -> <Proc> ()
@private
@JavaName copyTo