+import java.math.BigInteger;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
+import java.nio.file.StandardCopyOption;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
+import java.util.stream.Stream;
import org.simantics.db.procore.cluster.ClusterTraits;
import org.simantics.db.service.ClusterSetsSupport;
import org.simantics.db.service.ClusterUID;
+import org.simantics.utils.FileUtils;
import org.simantics.utils.threads.logger.ITask;
import org.simantics.utils.threads.logger.ThreadLogger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class ClusterManager {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(ClusterManager.class);
private ArrayList<String> currentChanges = new ArrayList<String>();
public final Path dbFolder;
- public Path lastSessionDirectory;
- public Path workingDirectory;
+ private FileCache fileCache;
+ Path lastSessionDirectory;
+ Path workingDirectory;
public LRU<String, ClusterStreamChunk> streamLRU;
public LRU<Long, ChangeSetInfo> csLRU;
- public ClusterManager(Path dbFolder) {
+ public ClusterManager(Path dbFolder, FileCache fileCache) {
this.dbFolder = dbFolder;
+ this.fileCache = fileCache;
}
public ArrayList<String> getChanges(long changeSetId) throws AcornAccessVerificationException, IllegalAcornStateException {
private AtomicBoolean safeToMakeSnapshot = new AtomicBoolean(true);
private IllegalAcornStateException cause;
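+ /*
+ * Purges the database: clusters, external files and change sets that are
+ * still referenced from older session directories are copied into a
+ * "<workingDirectory>_baseline" directory, stale stream chunks and change
+ * sets are evicted from the LRUs, a full snapshot is taken, and session
+ * directories that are no longer needed are deleted.
+ */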
+ public synchronized void purge(ServiceLocator locator) throws IllegalAcornStateException {
+
+ try {
+
+ // Schedule writing of all data to disk
+ refreshHeadState();
+ // Wait for files to be written
+ synchronizeWorkingDirectory();
+
+ String currentDir = workingDirectory.getFileName().toString();
+ Path baseline = workingDirectory.resolveSibling(currentDir + "_baseline");
+
+ Files.createDirectories(baseline);
+
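+ // Cluster keys have the form "<high>.<low>#<directory>", i.e. the two hex
+ // parts of the ClusterUID plus the directory that holds the cluster file.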
+ for(String clusterKey : state.clusters) {
+ String[] parts1 = clusterKey.split("#");
+ String[] parts = parts1[0].split("\\.");
+ String readDirName = parts1[1];
+ if(!readDirName.equals(currentDir)) {
+ String fileName = parts[0] + "." + parts[1] + ".cluster";
+ Path from = dbFolder.resolve(readDirName).resolve(fileName);
+ Path to = baseline.resolve(fileName);
+ LOGGER.info("purge copies " + from + " => " + to);
+ Files.copy(from, to, StandardCopyOption.COPY_ATTRIBUTES);
+ long first = new BigInteger(parts[0], 16).longValue();
+ long second = new BigInteger(parts[1], 16).longValue();
+ ClusterUID uuid = ClusterUID.make(first, second);
+ ClusterInfo info = clusterLRU.getWithoutMutex(uuid);
+ info.moveTo(baseline);
+ }
+ }
+
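+ // External file keys have the form "<name>#<directory>"; files outside the
+ // current working directory are likewise copied into the baseline.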
+ for (String fileKey : state.files) {
+ String[] parts = fileKey.split("#");
+ String readDirName = parts[1];
+ if(!readDirName.equals(currentDir)) {
+ String fileName = parts[0] + ".extFile";
+ Path from = dbFolder.resolve(readDirName).resolve(fileName);
+ Path to = baseline.resolve(fileName);
+ LOGGER.info("purge copies " + from + " => " + to);
+ Files.copy(from, to, StandardCopyOption.COPY_ATTRIBUTES);
+ FileInfo info = fileLRU.getWithoutMutex(parts[0]);
+ info.moveTo(baseline);
+ }
+ }
+
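+ // Stream chunks living in older directories are no longer needed once the
+ // data is baselined, so they are simply evicted.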
+ for (String fileKey : state.stream) {
+ String[] parts = fileKey.split("#");
+ String readDirName = parts[1];
+ if(!readDirName.equals(currentDir)) {
+ ClusterStreamChunk chunk = streamLRU.purge(parts[0]);
+ LOGGER.info("purge removes " + chunk);
+ }
+ }
+
+ // Change sets
+ for (String fileKey : state.cs) {
+ String[] parts = fileKey.split("#");
+ String readDirName = parts[1];
+ if(!readDirName.equals(currentDir)) {
+ Long revisionId = Long.parseLong(parts[0]);
+ ChangeSetInfo info = csLRU.purge(revisionId);
+ LOGGER.info("purge removes " + info);
+ }
+ }
+
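+ // Everything up to the head is now baselined, so the tail catches up with
+ // the head before the snapshot is taken.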
+ state.tailChangeSetId = state.headChangeSetId;
+
+ makeSnapshot(locator, true);
+
+ // The stream returned by Files.walk must be closed to release its directory handle
+ try (Stream<Path> dirs = Files.walk(dbFolder, 1)) {
+ dirs.filter(Files::isDirectory).forEach(this::tryPurgeDirectory);
+ }
+
+ } catch (IllegalAcornStateException e) {
+ notSafeToMakeSnapshot(e);
+ throw e;
+ } catch (IOException | AcornAccessVerificationException e) {
+ IllegalAcornStateException e1 = new IllegalAcornStateException(e);
+ notSafeToMakeSnapshot(e1);
+ throw e1;
+ }
+
+ }
+
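+ /*
+ * Deletes a numbered session directory (or its baseline) once its ordinal
+ * falls behind mainState.headDir - 1. Directories whose name ends with
+ * "db" are never touched.
+ */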
+ void tryPurgeDirectory(Path f) {
+
+ String currentDir = f.getFileName().toString();
+ if(currentDir.endsWith("db"))
+ return;
+
+ if(currentDir.endsWith("_baseline"))
+ currentDir = currentDir.replace("_baseline", "");
+
+ final int ordinal;
+ try {
+ ordinal = Integer.parseInt(currentDir);
+ } catch (NumberFormatException e) {
+ return; // not a numbered session directory (e.g. the db folder itself)
+ }
+ if(ordinal < mainState.headDir - 1) {
+ LOGGER.info("purge deletes {}", f);
+ FileUtils.deleteDir(f.toFile());
+ }
+
+ }
+
public synchronized boolean makeSnapshot(ServiceLocator locator, boolean fullSave) throws IllegalAcornStateException {
try {
if (!safeToMakeSnapshot.get())
throw cause;
- // Maximum autosave frequency is per 60s
+ // Maximum autosave frequency: at most one snapshot per 10 s
if(!fullSave && System.nanoTime() - lastSnapshot < 10*1000000000L) {
- // System.err.println("lastSnapshot too early");
+ // LOGGER.info("lastSnapshot too early");
return false;
}
// Cluster files are always there
// Nothing has been written => no need to do anything
long amountOfFiles = countFiles(workingDirectory);
- if(!fullSave && amountOfFiles < 3) {
- // System.err.println("amountOfFiles < 3");
+ if(!fullSave && amountOfFiles == 0) {
+ // LOGGER.info("makeSnapshot: " + amountOfFiles + " files, skipping snapshot");
return false;
}
- System.err.println("makeSnapshot");
+ LOGGER.info("makeSnapshot: start with " + amountOfFiles + " files");
// Schedule writing of all data to disk
refreshHeadState();
if (!safeToMakeSnapshot.get())
throw cause;
- persistHeadState();
-
- if (fullSave)
- mainState.save(dbFolder);
-
ClusterSetsSupport cssi = locator.getService(ClusterSetsSupport.class);
cssi.save();
-
- amountOfFiles = countFiles(workingDirectory);
-
- System.err.println(" -finished: amount of files is " + amountOfFiles);
-
- workingDirectory = dbFolder.resolve(Integer.toString(mainState.headDir));
- if (!Files.exists(workingDirectory)) {
- Files.createDirectories(workingDirectory);
+
+ persistHeadState();
+
+ if (LOGGER.isInfoEnabled()) {
+ amountOfFiles = countFiles(workingDirectory);
+ LOGGER.info(" -finished: amount of files is {}", amountOfFiles);
}
+
+ workingDirectory = dbFolder.resolve(Integer.toString(mainState.headDir));
+ Files.createDirectories(workingDirectory);
cssi.updateWriteDirectory(workingDirectory);
csLRU.persist(state.cs);
}
- private void persistHeadState() throws IOException {
+ private void synchronizeWorkingDirectory() throws IOException {
// Sync current working directory
- Files.walk(workingDirectory, 1).filter(Files::isRegularFile).forEach(FileIO::uncheckedSyncPath);
+ try (Stream<Path> files = Files.walk(workingDirectory, 1)) {
+ files.filter(Files::isRegularFile).forEach(FileIO::uncheckedSyncPath);
+ }
+ }
+
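+ // Syncs the working directory, saves the head state there and then moves
+ // the head ordinal forward.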
+ private void persistHeadState() throws IOException {
+ synchronizeWorkingDirectory();
state.save(workingDirectory);
mainState.headDir++;
}
// e.printStackTrace();
// }
- // System.err.println("-- load statistics --");
+ // LOGGER.info("-- load statistics --");
// for(Pair<ClusterUID, Integer> entry :
// CollectionUtils.valueSortedEntries(histogram)) {
- // System.err.println(" " + entry.second + " " + entry.first);
+ // LOGGER.info(" " + entry.second + " " + entry.first);
// }
// }
public void load() throws IOException {
// Main state
- mainState = MainState.load(dbFolder, t -> rollback.set(true));
+ mainState = MainState.load(dbFolder, () -> rollback.set(true));
lastSessionDirectory = dbFolder.resolve(Integer.toString(mainState.headDir - 1));
// Head State
- try {
- state = HeadState.load(lastSessionDirectory);
- } catch (InvalidHeadStateException e) {
- // For backwards compatibility only!
- Throwable cause = e.getCause();
- if (cause instanceof Throwable) {
- try {
- org.simantics.db.javacore.HeadState oldState = org.simantics.db.javacore.HeadState.load(lastSessionDirectory);
-
- HeadState newState = new HeadState();
- newState.clusters = oldState.clusters;
- newState.cs = oldState.cs;
- newState.files = oldState.files;
- newState.stream = oldState.stream;
- newState.headChangeSetId = oldState.headChangeSetId;
- newState.reservedIds = oldState.reservedIds;
- newState.transactionId = oldState.transactionId;
- state = newState;
- } catch (InvalidHeadStateException e1) {
- throw new IOException("Could not load HeadState due to corruption", e1);
- }
- } else {
- // This should never happen as MainState.load() checks the integrity
- // of head.state files and rolls back in cases of corruption until a
- // consistent state is found (could be case 0 - initial db state)
- // IF this does happen something is completely wrong
- throw new IOException("Could not load HeadState due to corruption", e);
- }
- }
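+ // A freshly created database has no previous session directory, so begin
+ // from an empty head state instead of loading one.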
+ if (mainState.isInitial()) {
+ state = new HeadState();
+ } else {
+ try {
+ state = HeadState.load(lastSessionDirectory);
+ } catch (InvalidHeadStateException e) {
+ // For backwards compatibility only!
+ Throwable cause = e.getCause();
+ if (cause != null) {
+ try {
+ org.simantics.db.javacore.HeadState oldState = org.simantics.db.javacore.HeadState.load(lastSessionDirectory);
+
+ HeadState newState = new HeadState();
+ newState.clusters = oldState.clusters;
+ newState.cs = oldState.cs;
+ newState.files = oldState.files;
+ newState.stream = oldState.stream;
+ newState.headChangeSetId = oldState.headChangeSetId;
+ newState.reservedIds = oldState.reservedIds;
+ newState.transactionId = oldState.transactionId;
+ state = newState;
+ } catch (InvalidHeadStateException e1) {
+ throw new IOException("Could not load HeadState due to corruption", e1);
+ }
+ } else {
+ // This should never happen as MainState.load() checks the integrity
+ // of head.state files and rolls back in cases of corruption until a
+ // consistent state is found (could be case 0 - initial db state)
+ // IF this does happen something is completely wrong
+ throw new IOException("Could not load HeadState due to corruption", e);
+ }
+ }
+ }
try {
workingDirectory = dbFolder.resolve(Integer.toString(mainState.headDir));
Files.createDirectories(workingDirectory);
}
// Files
for (String fileKey : state.files) {
- // System.err.println("loadFile: " + fileKey);
+ // LOGGER.info("loadFile: " + fileKey);
String[] parts = fileKey.split("#");
Path readDir = dbFolder.resolve(parts[1]);
int offset = Integer.parseInt(parts[2]);
int length = Integer.parseInt(parts[3]);
- FileInfo info = new FileInfo(fileLRU, readDir, parts[0], offset, length);
+ FileInfo info = new FileInfo(fileLRU, fileCache, readDir, parts[0], offset, length);
fileLRU.map(info);
}
// Update chunks
for (String fileKey : state.stream) {
- // System.err.println("loadStream: " + fileKey);
+ // LOGGER.info("loadStream: " + fileKey);
String[] parts = fileKey.split("#");
Path readDir = dbFolder.resolve(parts[1]);
int offset = Integer.parseInt(parts[2]);
int length = Integer.parseInt(parts[3]);
// Re-map the chunk into the stream LRU (constructor mirrors the FileInfo mapping above)
ClusterStreamChunk chunk = new ClusterStreamChunk(this, streamLRU, readDir, parts[0], offset, length);
streamLRU.map(chunk);
}
// Change sets
for (String fileKey : state.cs) {
String[] parts = fileKey.split("#");
Path readDir = dbFolder.resolve(parts[1]);
Long revisionId = Long.parseLong(parts[0]);
int offset = Integer.parseInt(parts[2]);
int length = Integer.parseInt(parts[3]);
- ChangeSetInfo info = new ChangeSetInfo(csLRU, readDir, revisionId, offset, length);
+ ChangeSetInfo info = new ChangeSetInfo(csLRU, fileCache, readDir, revisionId, offset, length);
csLRU.map(info);
}
try {
ArrayList<String> csids = new ArrayList<String>(currentChanges);
currentChanges = new ArrayList<String>();
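+ // The ChangeSetInfo constructor is expected to register the new dirty
+ // change set with csLRU itself, so the instance is deliberately not stored.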
- new ChangeSetInfo(csLRU, changeSetId, data, csids);
+ new ChangeSetInfo(csLRU, fileCache, changeSetId, data, csids);
} catch (Throwable t) {
throw new IllegalAcornStateException(t);
} finally {
try {
info = fileLRU.get(key);
if (info == null) {
- info = new FileInfo(fileLRU, key, (int) (offset + size));
+ info = new FileInfo(fileLRU, fileCache, key, (int) (offset + size));
}
} catch (Throwable t) {
throw new IllegalAcornStateException(t);
}
}
// Marks snapshots unsafe and records the cause, which makeSnapshot() rethrows.
private void notSafeToMakeSnapshot(IllegalAcornStateException t) {
safeToMakeSnapshot.set(false);
this.cause = t;
}
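+ // Oldest change set id still available; purge() advances this to the head.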
+ public long getTailChangeSetId() {
+ return state.tailChangeSetId;
+ }
+
+ public FileCache getFileCache() {
+ return fileCache;
+ }
+
}