X-Git-Url: https://gerrit.simantics.org/r/gitweb?p=simantics%2Fplatform.git;a=blobdiff_plain;f=bundles%2Forg.simantics.acorn%2Fsrc%2Forg%2Fsimantics%2Facorn%2FClusterManager.java;h=cb9ac578e1aade416162beffde3ad27b2051d4f9;hp=b2a30953cca9e0bff26e72dc69a788139e52a817;hb=e5c006a3e29dcb1f29ae5bcc21ac28573bd37648;hpb=ca40974f87c9db00eb77aaf1acc1e9937b37261b diff --git a/bundles/org.simantics.acorn/src/org/simantics/acorn/ClusterManager.java b/bundles/org.simantics.acorn/src/org/simantics/acorn/ClusterManager.java index b2a30953c..cb9ac578e 100644 --- a/bundles/org.simantics.acorn/src/org/simantics/acorn/ClusterManager.java +++ b/bundles/org.simantics.acorn/src/org/simantics/acorn/ClusterManager.java @@ -5,6 +5,7 @@ import java.math.BigInteger; import java.nio.file.DirectoryStream; import java.nio.file.Files; import java.nio.file.Path; +import java.nio.file.StandardCopyOption; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; @@ -33,16 +34,22 @@ import org.simantics.db.impl.ClusterSupport; import org.simantics.db.procore.cluster.ClusterTraits; import org.simantics.db.service.ClusterSetsSupport; import org.simantics.db.service.ClusterUID; +import org.simantics.utils.FileUtils; import org.simantics.utils.threads.logger.ITask; import org.simantics.utils.threads.logger.ThreadLogger; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class ClusterManager { + + final static Logger LOGGER = LoggerFactory.getLogger(ClusterManager.class); private ArrayList currentChanges = new ArrayList(); public final Path dbFolder; - public Path lastSessionDirectory; - public Path workingDirectory; + private FileCache fileCache; + Path lastSessionDirectory; + Path workingDirectory; public LRU streamLRU; public LRU csLRU; @@ -61,8 +68,9 @@ public class ClusterManager { * */ - public ClusterManager(Path dbFolder) { + public ClusterManager(Path dbFolder, FileCache fileCache) { this.dbFolder = dbFolder; + this.fileCache = fileCache; } public ArrayList getChanges(long changeSetId) throws AcornAccessVerificationException, IllegalAcornStateException { @@ -130,25 +138,138 @@ public class ClusterManager { private AtomicBoolean safeToMakeSnapshot = new AtomicBoolean(true); private IllegalAcornStateException cause; + public synchronized void purge(ServiceLocator locator) throws IllegalAcornStateException { + + try { + + // Schedule writing of all data to disk + refreshHeadState(); + // Wait for files to be written + synchronizeWorkingDirectory(); + + String currentDir = workingDirectory.getFileName().toString(); + Path baseline = workingDirectory.resolveSibling(currentDir + "_baseline"); + + Files.createDirectories(baseline); + + for(String clusterKey : state.clusters) { + String[] parts1 = clusterKey.split("#"); + String[] parts = parts1[0].split("\\."); + String readDirName = parts1[1]; + if(!readDirName.equals(currentDir)) { + String fileName = parts[0] + "." + parts[1] + ".cluster"; + Path from = dbFolder.resolve(readDirName).resolve(fileName); + Path to = baseline.resolve(fileName); + LOGGER.info("purge copies " + from + " => " + to); + Files.copy(from, to, StandardCopyOption.COPY_ATTRIBUTES); + long first = new BigInteger(parts[0], 16).longValue(); + long second = new BigInteger(parts[1], 16).longValue(); + ClusterUID uuid = ClusterUID.make(first, second); + ClusterInfo info = clusterLRU.getWithoutMutex(uuid); + info.moveTo(baseline); + } + } + + for (String fileKey : state.files) { + String[] parts = fileKey.split("#"); + String readDirName = parts[1]; + if(!readDirName.equals(currentDir)) { + String fileName = parts[0] + ".extFile"; + Path from = dbFolder.resolve(readDirName).resolve(fileName); + Path to = baseline.resolve(fileName); + LOGGER.info("purge copies " + from + " => " + to); + Files.copy(from, to, StandardCopyOption.COPY_ATTRIBUTES); + FileInfo info = fileLRU.getWithoutMutex(parts[0]); + info.moveTo(baseline); + } + } + + for (String fileKey : state.stream) { + String[] parts = fileKey.split("#"); + String readDirName = parts[1]; + if(!readDirName.equals(currentDir)) { + ClusterStreamChunk chunk = streamLRU.purge(parts[0]); + LOGGER.info("purge removes " + chunk); + } + } + + // Change sets + for (String fileKey : state.cs) { + String[] parts = fileKey.split("#"); + String readDirName = parts[1]; + if(!readDirName.equals(currentDir)) { + Long revisionId = Long.parseLong(parts[0]); + ChangeSetInfo info = csLRU.purge(revisionId); + LOGGER.info("purge removes " + info); + } +// Path readDir = dbFolder.resolve(parts[1]); +// Long revisionId = Long.parseLong(parts[0]); +// int offset = Integer.parseInt(parts[2]); +// int length = Integer.parseInt(parts[3]); +// ChangeSetInfo info = new ChangeSetInfo(csLRU, readDir, revisionId, offset, length); +// csLRU.map(info); + } + + state.tailChangeSetId = state.headChangeSetId; + + makeSnapshot(locator, true); + + Files.walk(dbFolder, 1).filter(Files::isDirectory).forEach(f -> tryPurgeDirectory(f)); + + } catch (IllegalAcornStateException e) { + notSafeToMakeSnapshot(e); + throw e; + } catch (IOException e) { + IllegalAcornStateException e1 = new IllegalAcornStateException(e); + notSafeToMakeSnapshot(e1); + throw e1; + } catch (AcornAccessVerificationException e) { + IllegalAcornStateException e1 = new IllegalAcornStateException(e); + notSafeToMakeSnapshot(e1); + throw e1; + } + + } + + void tryPurgeDirectory(Path f) { + + + LOGGER.info("purge deletes " + f); + + String currentDir = f.getFileName().toString(); + if(currentDir.endsWith("db")) + return; + + if(currentDir.endsWith("_baseline")) + currentDir = currentDir.replace("_baseline", ""); + + int ordinal = Integer.parseInt(currentDir); + if(ordinal < mainState.headDir - 1) { + LOGGER.info("purge deletes " + f); + FileUtils.deleteDir(f.toFile()); + } + + } + public synchronized boolean makeSnapshot(ServiceLocator locator, boolean fullSave) throws IllegalAcornStateException { try { if (!safeToMakeSnapshot.get()) throw cause; // Maximum autosave frequency is per 60s if(!fullSave && System.nanoTime() - lastSnapshot < 10*1000000000L) { - // System.err.println("lastSnapshot too early"); + // LOGGER.info("lastSnapshot too early"); return false; } // Cluster files are always there // Nothing has been written => no need to do anything long amountOfFiles = countFiles(workingDirectory); - if(!fullSave && amountOfFiles < 3) { - // System.err.println("amountOfFiles < 3"); + if(!fullSave && amountOfFiles == 0) { + //LOGGER.info("makeSnapshot: " + amountOfFiles + " files, skipping snapshot"); return false; } - System.err.println("makeSnapshot"); + LOGGER.info("makeSnapshot: start with " + amountOfFiles + " files"); // Schedule writing of all data to disk refreshHeadState(); @@ -163,22 +284,18 @@ public class ClusterManager { if (!safeToMakeSnapshot.get()) throw cause; - persistHeadState(); - - if (fullSave) - mainState.save(dbFolder); - ClusterSetsSupport cssi = locator.getService(ClusterSetsSupport.class); cssi.save(); - - amountOfFiles = countFiles(workingDirectory); - - System.err.println(" -finished: amount of files is " + amountOfFiles); - - workingDirectory = dbFolder.resolve(Integer.toString(mainState.headDir)); - if (!Files.exists(workingDirectory)) { - Files.createDirectories(workingDirectory); + + persistHeadState(); + + if (LOGGER.isInfoEnabled()) { + amountOfFiles = countFiles(workingDirectory); + LOGGER.info(" -finished: amount of files is {}", amountOfFiles); } + + workingDirectory = dbFolder.resolve(Integer.toString(mainState.headDir)); + Files.createDirectories(workingDirectory); cssi.updateWriteDirectory(workingDirectory); @@ -217,9 +334,13 @@ public class ClusterManager { csLRU.persist(state.cs); } - private void persistHeadState() throws IOException { + private void synchronizeWorkingDirectory() throws IOException { // Sync current working directory Files.walk(workingDirectory, 1).filter(Files::isRegularFile).forEach(FileIO::uncheckedSyncPath); + } + + private void persistHeadState() throws IOException { + synchronizeWorkingDirectory(); state.save(workingDirectory); mainState.headDir++; } @@ -249,10 +370,10 @@ public class ClusterManager { // e.printStackTrace(); // } - // System.err.println("-- load statistics --"); + // LOGGER.info("-- load statistics --"); // for(Pair entry : // CollectionUtils.valueSortedEntries(histogram)) { - // System.err.println(" " + entry.second + " " + entry.first); + // LOGGER.info(" " + entry.second + " " + entry.first); // } // } @@ -280,40 +401,44 @@ public class ClusterManager { public void load() throws IOException { // Main state - mainState = MainState.load(dbFolder, t -> rollback.set(true)); + mainState = MainState.load(dbFolder, () -> rollback.set(true)); lastSessionDirectory = dbFolder.resolve(Integer.toString(mainState.headDir - 1)); // Head State - try { - state = HeadState.load(lastSessionDirectory); - } catch (InvalidHeadStateException e) { - // For backwards compatibility only! - Throwable cause = e.getCause(); - if (cause instanceof Throwable) { - try { - org.simantics.db.javacore.HeadState oldState = org.simantics.db.javacore.HeadState.load(lastSessionDirectory); - - HeadState newState = new HeadState(); - newState.clusters = oldState.clusters; - newState.cs = oldState.cs; - newState.files = oldState.files; - newState.stream = oldState.stream; - newState.headChangeSetId = oldState.headChangeSetId; - newState.reservedIds = oldState.reservedIds; - newState.transactionId = oldState.transactionId; - state = newState; - } catch (InvalidHeadStateException e1) { - throw new IOException("Could not load HeadState due to corruption", e1); - } - } else { - // This should never happen as MainState.load() checks the integrity - // of head.state files and rolls back in cases of corruption until a - // consistent state is found (could be case 0 - initial db state) - // IF this does happen something is completely wrong - throw new IOException("Could not load HeadState due to corruption", e); - } - } + if (mainState.isInitial()) { + state = new HeadState(); + } else { + try { + state = HeadState.load(lastSessionDirectory); + } catch (InvalidHeadStateException e) { + // For backwards compatibility only! + Throwable cause = e.getCause(); + if (cause instanceof Throwable) { + try { + org.simantics.db.javacore.HeadState oldState = org.simantics.db.javacore.HeadState.load(lastSessionDirectory); + + HeadState newState = new HeadState(); + newState.clusters = oldState.clusters; + newState.cs = oldState.cs; + newState.files = oldState.files; + newState.stream = oldState.stream; + newState.headChangeSetId = oldState.headChangeSetId; + newState.reservedIds = oldState.reservedIds; + newState.transactionId = oldState.transactionId; + state = newState; + } catch (InvalidHeadStateException e1) { + throw new IOException("Could not load HeadState due to corruption", e1); + } + } else { + // This should never happen as MainState.load() checks the integrity + // of head.state files and rolls back in cases of corruption until a + // consistent state is found (could be case 0 - initial db state) + // IF this does happen something is completely wrong + throw new IOException("Could not load HeadState due to corruption", e); + } + } + } try { workingDirectory = dbFolder.resolve(Integer.toString(mainState.headDir)); Files.createDirectories(workingDirectory); @@ -339,17 +464,17 @@ public class ClusterManager { } // Files for (String fileKey : state.files) { - // System.err.println("loadFile: " + fileKey); + // LOGGER.info("loadFile: " + fileKey); String[] parts = fileKey.split("#"); Path readDir = dbFolder.resolve(parts[1]); int offset = Integer.parseInt(parts[2]); int length = Integer.parseInt(parts[3]); - FileInfo info = new FileInfo(fileLRU, readDir, parts[0], offset, length); + FileInfo info = new FileInfo(fileLRU, fileCache, readDir, parts[0], offset, length); fileLRU.map(info); } // Update chunks for (String fileKey : state.stream) { - // System.err.println("loadStream: " + fileKey); + // LOGGER.info("loadStream: " + fileKey); String[] parts = fileKey.split("#"); Path readDir = dbFolder.resolve(parts[1]); int offset = Integer.parseInt(parts[2]); @@ -365,7 +490,7 @@ public class ClusterManager { Long revisionId = Long.parseLong(parts[0]); int offset = Integer.parseInt(parts[2]); int length = Integer.parseInt(parts[3]); - ChangeSetInfo info = new ChangeSetInfo(csLRU, readDir, revisionId, offset, length); + ChangeSetInfo info = new ChangeSetInfo(csLRU, fileCache, readDir, revisionId, offset, length); csLRU.map(info); } @@ -479,7 +604,7 @@ public class ClusterManager { try { ArrayList csids = new ArrayList(currentChanges); currentChanges = new ArrayList(); - new ChangeSetInfo(csLRU, changeSetId, data, csids); + new ChangeSetInfo(csLRU, fileCache, changeSetId, data, csids); } catch (Throwable t) { throw new IllegalAcornStateException(t); } finally { @@ -546,7 +671,7 @@ public class ClusterManager { try { info = fileLRU.get(key); if (info == null) { - info = new FileInfo(fileLRU, key, (int) (offset + size)); + info = new FileInfo(fileLRU, fileCache, key, (int) (offset + size)); } } catch (Throwable t) { throw new IllegalAcornStateException(t); @@ -579,4 +704,12 @@ public class ClusterManager { this.cause = t; } + public long getTailChangeSetId() { + return state.tailChangeSetId; + } + + public FileCache getFileCache() { + return fileCache; + } + }