package org.simantics.acorn; import java.io.IOException; import java.math.BigInteger; import java.nio.file.DirectoryStream; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.StandardCopyOption; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; import org.simantics.acorn.cluster.ClusterImpl; import org.simantics.acorn.exception.AcornAccessVerificationException; import org.simantics.acorn.exception.IllegalAcornStateException; import org.simantics.acorn.exception.InvalidHeadStateException; import org.simantics.acorn.internal.ClusterSupport2; import org.simantics.acorn.lru.ChangeSetInfo; import org.simantics.acorn.lru.ClusterInfo; import org.simantics.acorn.lru.ClusterLRU; import org.simantics.acorn.lru.ClusterStreamChunk; import org.simantics.acorn.lru.FileInfo; import org.simantics.acorn.lru.LRU; import org.simantics.db.ClusterCreator; import org.simantics.db.Database.Session.ClusterIds; import org.simantics.db.Database.Session.ResourceSegment; import org.simantics.db.ServiceLocator; import org.simantics.db.exception.DatabaseException; import org.simantics.db.impl.ClusterBase; import org.simantics.db.impl.ClusterI; import org.simantics.db.impl.ClusterSupport; import org.simantics.db.procore.cluster.ClusterTraits; import org.simantics.db.service.ClusterSetsSupport; import org.simantics.db.service.ClusterUID; import org.simantics.utils.FileUtils; import org.simantics.utils.threads.logger.ITask; import org.simantics.utils.threads.logger.ThreadLogger; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class ClusterManager { final static Logger LOGGER = LoggerFactory.getLogger(ClusterManager.class); private ArrayList currentChanges = new ArrayList(); public final Path dbFolder; public Path lastSessionDirectory; public Path workingDirectory; public LRU streamLRU; public LRU csLRU; public ClusterLRU clusterLRU; public LRU fileLRU; public MainState mainState; public HeadState state; private long lastSnapshot = System.nanoTime(); final public ClusterSupport2 support = new ClusterSupport2(this); /* * Public interface * */ public ClusterManager(Path dbFolder) { this.dbFolder = dbFolder; } public ArrayList getChanges(long changeSetId) throws AcornAccessVerificationException, IllegalAcornStateException { ChangeSetInfo info = csLRU.getWithoutMutex(changeSetId); info.acquireMutex(); try { info.makeResident(); return info.getCCSIds(); } finally { info.releaseMutex(); } } public ClusterBase getClusterByClusterKey(int clusterKey) throws DatabaseException, AcornAccessVerificationException, IllegalAcornStateException { return clusterLRU.getClusterByClusterKey(clusterKey); } public ClusterBase getClusterByClusterUIDOrMake(ClusterUID clusterUID) throws DatabaseException, AcornAccessVerificationException, IllegalAcornStateException { return clusterLRU.getClusterByClusterUIDOrMake(clusterUID); } public ClusterImpl getClusterByClusterUIDOrMakeProxy(ClusterUID clusterUID) throws DatabaseException, AcornAccessVerificationException, IllegalAcornStateException { return clusterLRU.getClusterByClusterUIDOrMakeProxy(clusterUID); } public int getClusterKeyByClusterUIDOrMake(ClusterUID clusterUID) throws AcornAccessVerificationException { return clusterLRU.getClusterKeyByClusterUIDOrMake(clusterUID); } public int getClusterKeyByClusterUIDOrMakeWithoutMutex(ClusterUID clusterUID) throws IllegalAcornStateException, AcornAccessVerificationException { return clusterLRU.getClusterKeyByClusterUIDOrMakeWithoutMutex(clusterUID); } public int getClusterKeyByUID(long id1, long id2) throws DatabaseException, IllegalAcornStateException { return clusterLRU.getClusterKeyByUIDWithoutMutex(id1, id2); } public T getClusterProxyByResourceKey(int resourceKey) throws DatabaseException, AcornAccessVerificationException, IllegalAcornStateException { return clusterLRU.getClusterProxyByResourceKey(resourceKey); } public ClusterUID getClusterUIDByResourceKey(int resourceKey) throws DatabaseException, AcornAccessVerificationException { return clusterLRU.getClusterUIDByResourceKey(resourceKey); } public ClusterUID getClusterUIDByResourceKeyWithoutMutex(int resourceKey) throws DatabaseException, IllegalAcornStateException, AcornAccessVerificationException { return clusterLRU.getClusterUIDByResourceKeyWithoutMutex(resourceKey); } /* * Private implementation * */ private static long countFiles(Path directory) throws IOException { try (DirectoryStream ds = Files.newDirectoryStream(directory)) { int count = 0; for (@SuppressWarnings("unused") Path p : ds) ++count; return count; } } // Add check to make sure if it safe to make snapshot (used with cancel which is not yet supported and may cause corrupted head.state writing) private AtomicBoolean safeToMakeSnapshot = new AtomicBoolean(true); private IllegalAcornStateException cause; public synchronized void purge(ServiceLocator locator) throws IllegalAcornStateException { try { // Schedule writing of all data to disk refreshHeadState(); // Wait for files to be written synchronizeWorkingDirectory(); String currentDir = workingDirectory.getFileName().toString(); Path baseline = workingDirectory.resolveSibling(currentDir + "_baseline"); Files.createDirectories(baseline); for(String clusterKey : state.clusters) { String[] parts1 = clusterKey.split("#"); String[] parts = parts1[0].split("\\."); String readDirName = parts1[1]; if(!readDirName.equals(currentDir)) { String fileName = parts[0] + "." + parts[1] + ".cluster"; Path from = dbFolder.resolve(readDirName).resolve(fileName); Path to = baseline.resolve(fileName); System.err.println("purge copies " + from + " => " + to); Files.copy(from, to, StandardCopyOption.COPY_ATTRIBUTES); long first = new BigInteger(parts[0], 16).longValue(); long second = new BigInteger(parts[1], 16).longValue(); ClusterUID uuid = ClusterUID.make(first, second); ClusterInfo info = clusterLRU.getWithoutMutex(uuid); info.moveTo(baseline); } } for (String fileKey : state.files) { String[] parts = fileKey.split("#"); String readDirName = parts[1]; if(!readDirName.equals(currentDir)) { String fileName = parts[0] + ".extFile"; Path from = dbFolder.resolve(readDirName).resolve(fileName); Path to = baseline.resolve(fileName); System.err.println("purge copies " + from + " => " + to); Files.copy(from, to, StandardCopyOption.COPY_ATTRIBUTES); FileInfo info = fileLRU.getWithoutMutex(parts[0]); info.moveTo(baseline); } } for (String fileKey : state.stream) { String[] parts = fileKey.split("#"); String readDirName = parts[1]; if(!readDirName.equals(currentDir)) { ClusterStreamChunk chunk = streamLRU.purge(parts[0]); System.err.println("purge removes " + chunk); } } // Change sets for (String fileKey : state.cs) { String[] parts = fileKey.split("#"); String readDirName = parts[1]; if(!readDirName.equals(currentDir)) { Long revisionId = Long.parseLong(parts[0]); ChangeSetInfo info = csLRU.purge(revisionId); System.err.println("purge removes " + info); } // Path readDir = dbFolder.resolve(parts[1]); // Long revisionId = Long.parseLong(parts[0]); // int offset = Integer.parseInt(parts[2]); // int length = Integer.parseInt(parts[3]); // ChangeSetInfo info = new ChangeSetInfo(csLRU, readDir, revisionId, offset, length); // csLRU.map(info); } state.tailChangeSetId = state.headChangeSetId; makeSnapshot(locator, true); Files.walk(dbFolder, 1).filter(Files::isDirectory).forEach(f -> tryPurgeDirectory(f)); } catch (IllegalAcornStateException e) { notSafeToMakeSnapshot(e); throw e; } catch (IOException e) { IllegalAcornStateException e1 = new IllegalAcornStateException(e); notSafeToMakeSnapshot(e1); throw e1; } catch (AcornAccessVerificationException e) { IllegalAcornStateException e1 = new IllegalAcornStateException(e); notSafeToMakeSnapshot(e1); throw e1; } } void tryPurgeDirectory(Path f) { System.err.println("purge deletes " + f); String currentDir = f.getFileName().toString(); if(currentDir.endsWith("db")) return; if(currentDir.endsWith("_baseline")) currentDir = currentDir.replace("_baseline", ""); int ordinal = Integer.parseInt(currentDir); if(ordinal < mainState.headDir - 1) { System.err.println("purge deletes " + f); FileUtils.deleteDir(f.toFile()); } } public synchronized boolean makeSnapshot(ServiceLocator locator, boolean fullSave) throws IllegalAcornStateException { try { if (!safeToMakeSnapshot.get()) throw cause; // Maximum autosave frequency is per 60s if(!fullSave && System.nanoTime() - lastSnapshot < 10*1000000000L) { // System.err.println("lastSnapshot too early"); return false; } // Cluster files are always there // Nothing has been written => no need to do anything long amountOfFiles = countFiles(workingDirectory); if(!fullSave && amountOfFiles == 0) { // System.err.println("amountOfFiles < 3"); return false; } LOGGER.info("makeSnapshot"); // Schedule writing of all data to disk refreshHeadState(); // Wait for all files to be written clusterLRU.shutdown(); fileLRU.shutdown(); streamLRU.shutdown(); csLRU.shutdown(); // Lets check if it is still safe to make a snapshot if (!safeToMakeSnapshot.get()) throw cause; ClusterSetsSupport cssi = locator.getService(ClusterSetsSupport.class); cssi.save(); persistHeadState(); if (fullSave) mainState.save(dbFolder); amountOfFiles = countFiles(workingDirectory); LOGGER.info(" -finished: amount of files is {}", amountOfFiles); workingDirectory = dbFolder.resolve(Integer.toString(mainState.headDir)); if (!Files.exists(workingDirectory)) { Files.createDirectories(workingDirectory); } cssi.updateWriteDirectory(workingDirectory); clusterLRU.setWriteDir(workingDirectory); fileLRU.setWriteDir(workingDirectory); streamLRU.setWriteDir(workingDirectory); csLRU.setWriteDir(workingDirectory); clusterLRU.resume(); fileLRU.resume(); streamLRU.resume(); csLRU.resume(); lastSnapshot = System.nanoTime(); return true; } catch (IllegalAcornStateException e) { notSafeToMakeSnapshot(e); throw e; } catch (IOException e) { IllegalAcornStateException e1 = new IllegalAcornStateException(e); notSafeToMakeSnapshot(e1); throw e1; } } private void refreshHeadState() throws IOException, IllegalAcornStateException { state.clusters.clear(); state.files.clear(); state.stream.clear(); state.cs.clear(); clusterLRU.persist(state.clusters); fileLRU.persist(state.files); streamLRU.persist(state.stream); csLRU.persist(state.cs); } private void synchronizeWorkingDirectory() throws IOException { // Sync current working directory Files.walk(workingDirectory, 1).filter(Files::isRegularFile).forEach(FileIO::uncheckedSyncPath); } private void persistHeadState() throws IOException { synchronizeWorkingDirectory(); state.save(workingDirectory); mainState.headDir++; } // public void save() throws IOException { // // refreshHeadState(); // // clusterLRU.shutdown(); // fileLRU.shutdown(); // streamLRU.shutdown(); // csLRU.shutdown(); // // persistHeadState(); // // mainState.save(getBaseDirectory()); // try { // ThreadLogVisualizer visualizer = new ThreadLogVisualizer(); // visualizer.read(new DataInputStream(new FileInputStream( // ThreadLogger.LOG_FILE))); // visualizer.visualize3(new PrintStream(ThreadLogger.LOG_FILE // + ".svg")); // } catch (FileNotFoundException e) { // // TODO Auto-generated catch block // e.printStackTrace(); // } // System.err.println("-- load statistics --"); // for(Pair entry : // CollectionUtils.valueSortedEntries(histogram)) { // System.err.println(" " + entry.second + " " + entry.first); // } // } private void acquireAll() throws IllegalAcornStateException { clusterLRU.acquireMutex(); fileLRU.acquireMutex(); streamLRU.acquireMutex(); csLRU.acquireMutex(); } private void releaseAll() { csLRU.releaseMutex(); streamLRU.releaseMutex(); fileLRU.releaseMutex(); clusterLRU.releaseMutex(); } private AtomicBoolean rollback = new AtomicBoolean(false); boolean rolledback() { return rollback.get(); } public void load() throws IOException { // Main state mainState = MainState.load(dbFolder, () -> rollback.set(true)); lastSessionDirectory = dbFolder.resolve(Integer.toString(mainState.headDir - 1)); // Head State try { state = HeadState.load(lastSessionDirectory); } catch (InvalidHeadStateException e) { // For backwards compatibility only! Throwable cause = e.getCause(); if (cause instanceof Throwable) { try { org.simantics.db.javacore.HeadState oldState = org.simantics.db.javacore.HeadState.load(lastSessionDirectory); HeadState newState = new HeadState(); newState.clusters = oldState.clusters; newState.cs = oldState.cs; newState.files = oldState.files; newState.stream = oldState.stream; newState.headChangeSetId = oldState.headChangeSetId; newState.reservedIds = oldState.reservedIds; newState.transactionId = oldState.transactionId; state = newState; } catch (InvalidHeadStateException e1) { throw new IOException("Could not load HeadState due to corruption", e1); } } else { // This should never happen as MainState.load() checks the integrity // of head.state files and rolls back in cases of corruption until a // consistent state is found (could be case 0 - initial db state) // IF this does happen something is completely wrong throw new IOException("Could not load HeadState due to corruption", e); } } try { workingDirectory = dbFolder.resolve(Integer.toString(mainState.headDir)); Files.createDirectories(workingDirectory); csLRU = new LRU(this, "Change Set", workingDirectory); streamLRU = new LRU(this, "Cluster Stream", workingDirectory); clusterLRU = new ClusterLRU(this, "Cluster", workingDirectory); fileLRU = new LRU(this, "External Value", workingDirectory); acquireAll(); // Clusters for (String clusterKey : state.clusters) { String[] parts1 = clusterKey.split("#"); String[] parts = parts1[0].split("\\."); long first = new BigInteger(parts[0], 16).longValue(); long second = new BigInteger(parts[1], 16).longValue(); ClusterUID uuid = ClusterUID.make(first, second); Path readDir = dbFolder.resolve(parts1[1]); int offset = Integer.parseInt(parts1[2]); int length = Integer.parseInt(parts1[3]); clusterLRU.map(new ClusterInfo(this, clusterLRU, readDir, uuid, offset, length)); } // Files for (String fileKey : state.files) { // System.err.println("loadFile: " + fileKey); String[] parts = fileKey.split("#"); Path readDir = dbFolder.resolve(parts[1]); int offset = Integer.parseInt(parts[2]); int length = Integer.parseInt(parts[3]); FileInfo info = new FileInfo(fileLRU, readDir, parts[0], offset, length); fileLRU.map(info); } // Update chunks for (String fileKey : state.stream) { // System.err.println("loadStream: " + fileKey); String[] parts = fileKey.split("#"); Path readDir = dbFolder.resolve(parts[1]); int offset = Integer.parseInt(parts[2]); int length = Integer.parseInt(parts[3]); ClusterStreamChunk info = new ClusterStreamChunk(this, streamLRU, readDir, parts[0], offset, length); streamLRU.map(info); } // Change sets for (String fileKey : state.cs) { String[] parts = fileKey.split("#"); Path readDir = dbFolder.resolve(parts[1]); Long revisionId = Long.parseLong(parts[0]); int offset = Integer.parseInt(parts[2]); int length = Integer.parseInt(parts[3]); ChangeSetInfo info = new ChangeSetInfo(csLRU, readDir, revisionId, offset, length); csLRU.map(info); } releaseAll(); } catch (IllegalAcornStateException | AcornAccessVerificationException e) { // ROLLBACK ONE DIR UNTIL WE ARE FINE! throw new IOException(e); } } public T clone(ClusterUID uid, ClusterCreator creator) throws DatabaseException, AcornAccessVerificationException, IllegalAcornStateException, IOException { clusterLRU.ensureUpdates(uid); ClusterInfo info = clusterLRU.getWithoutMutex(uid); return info.clone(uid, creator); } //private int loadCounter = 0; public static void startLog(String msg) { tasks.put(msg, ThreadLogger.getInstance().begin(msg)); } public static void endLog(String msg) { ITask task = tasks.get(msg); if (task != null) task.finish(); } static Map tasks = new HashMap(); public void update(ClusterUID uid, ClusterImpl clu) throws AcornAccessVerificationException, IllegalAcornStateException { ClusterInfo info = clusterLRU.getWithoutMutex(uid); info.acquireMutex(); try { info.update(clu); } finally { info.releaseMutex(); } } public long getClusterIdOrCreate(ClusterUID clusterUID) { return 1; } public int getResourceKey(ClusterUID uid, int index) throws AcornAccessVerificationException { return clusterLRU.getResourceKey(uid, index); } public int getResourceKeyWitoutMutex(ClusterUID uid, int index) throws IllegalAcornStateException { return clusterLRU.getResourceKeyWithoutMutex(uid, index); } public ClusterIds getClusterIds() throws IllegalAcornStateException { clusterLRU.acquireMutex(); try { Collection infos = clusterLRU.values(); final int status = infos.size(); final long[] firsts = new long[status]; final long[] seconds = new long[status]; int index = 0; for (ClusterInfo info : infos) { firsts[index] = 0; seconds[index] = info.getKey().second; index++; } return new ClusterIds() { @Override public int getStatus() { return status; } @Override public long[] getFirst() { return firsts; } @Override public long[] getSecond() { return seconds; } }; } catch (Throwable t) { throw new IllegalAcornStateException(t); } finally { clusterLRU.releaseMutex(); } } public void addIntoCurrentChangeSet(String ccs) throws IllegalAcornStateException { csLRU.acquireMutex(); try { currentChanges.add(ccs); } catch (Throwable t) { throw new IllegalAcornStateException(t); } finally { csLRU.releaseMutex(); } } public void commitChangeSet(long changeSetId, byte[] data) throws IllegalAcornStateException { csLRU.acquireMutex(); try { ArrayList csids = new ArrayList(currentChanges); currentChanges = new ArrayList(); new ChangeSetInfo(csLRU, changeSetId, data, csids); } catch (Throwable t) { throw new IllegalAcornStateException(t); } finally { csLRU.releaseMutex(); } } public byte[] getMetadata(long changeSetId) throws AcornAccessVerificationException, IllegalAcornStateException { ChangeSetInfo info = csLRU.getWithoutMutex(changeSetId); if (info == null) return null; info.acquireMutex(); try { return info.getMetadataBytes(); } catch (IllegalAcornStateException | AcornAccessVerificationException e) { throw e; } catch (Throwable t) { throw new IllegalAcornStateException(t); } finally { info.releaseMutex(); } } public byte[] getResourceFile(final byte[] clusterUID, final int resourceIndex) throws AcornAccessVerificationException, IllegalAcornStateException { ClusterUID uid = ClusterUID.make(clusterUID, 0); String key = uid.toString() + "_" + resourceIndex; FileInfo info = fileLRU.getWithoutMutex(key); if(info == null) return null; info.acquireMutex(); try { return info.getResourceFile(); } catch (IllegalAcornStateException | AcornAccessVerificationException e) { throw e; } catch (Throwable t) { throw new IllegalAcornStateException(t); } finally { info.releaseMutex(); } } public ResourceSegment getResourceSegment(final byte[] clusterUID, final int resourceIndex, final long segmentOffset, short segmentSize) throws AcornAccessVerificationException, IllegalAcornStateException { ClusterUID uid = ClusterUID.make(clusterUID, 0); String key = uid.toString() + "_" + resourceIndex; FileInfo info = fileLRU.getWithoutMutex(key); if(info == null) return null; info.acquireMutex(); try { return info.getResourceSegment(clusterUID, resourceIndex, segmentOffset, segmentSize); } catch (Throwable t) { throw new IllegalAcornStateException(t); } finally { info.releaseMutex(); } } public void modiFileEx(ClusterUID uid, int resourceKey, long offset, long size, byte[] bytes, long pos, ClusterSupport support) throws IllegalAcornStateException { try { String key = uid.toString() + "_" + ClusterTraits.getResourceIndexFromResourceKey(resourceKey); FileInfo info = null; fileLRU.acquireMutex(); try { info = fileLRU.get(key); if (info == null) { info = new FileInfo(fileLRU, key, (int) (offset + size)); } } catch (Throwable t) { throw new IllegalAcornStateException(t); } finally { fileLRU.releaseMutex(); } info.acquireMutex(); try { info.updateData(bytes, offset, pos, size); } catch (Throwable t) { throw new IllegalAcornStateException(t); } finally { info.releaseMutex(); } } catch (DatabaseException e) { e.printStackTrace(); } } public void shutdown() { clusterLRU.shutdown(); fileLRU.shutdown(); streamLRU.shutdown(); csLRU.shutdown(); } public void notSafeToMakeSnapshot(IllegalAcornStateException t) { this.safeToMakeSnapshot.compareAndSet(true, false); this.cause = t; } public long getTailChangeSetId() { return state.tailChangeSetId; } }