--- /dev/null
+package org.simantics.acorn;
+
+import java.io.IOException;
+import java.math.BigInteger;
+import java.nio.file.DirectoryStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Stream;
+
+import org.simantics.acorn.cluster.ClusterImpl;
+import org.simantics.acorn.internal.ClusterSupport2;
+import org.simantics.acorn.lru.ChangeSetInfo;
+import org.simantics.acorn.lru.ClusterInfo;
+import org.simantics.acorn.lru.ClusterLRU;
+import org.simantics.acorn.lru.ClusterStreamChunk;
+import org.simantics.acorn.lru.FileInfo;
+import org.simantics.acorn.lru.LRU;
+import org.simantics.db.ClusterCreator;
+import org.simantics.db.ServiceLocator;
+import org.simantics.db.Database.Session.ClusterIds;
+import org.simantics.db.Database.Session.ResourceSegment;
+import org.simantics.db.exception.DatabaseException;
+import org.simantics.db.impl.ClusterBase;
+import org.simantics.db.impl.ClusterI;
+import org.simantics.db.impl.ClusterSupport;
+import org.simantics.db.procore.cluster.ClusterTraits;
+import org.simantics.db.server.ProCoreException;
+import org.simantics.db.service.ClusterSetsSupport;
+import org.simantics.db.service.ClusterUID;
+import org.simantics.utils.threads.logger.ITask;
+import org.simantics.utils.threads.logger.ThreadLogger;
+
+public class ClusterManager {
+
+ private ArrayList<String> currentChanges = new ArrayList<String>();
+
+ public final Path dbFolder;
+ public Path lastSessionDirectory;
+ public Path workingDirectory;
+
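+ // LRU caches for cluster stream chunks, change sets, clusters and external value files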
+ public LRU<String, ClusterStreamChunk> streamLRU;
+ public LRU<Long, ChangeSetInfo> csLRU;
+ public ClusterLRU clusterLRU;
+ public LRU<String, FileInfo> fileLRU;
+
+ public MainState mainState;
+ public HeadState state;
+
+ private long lastSnapshot = System.nanoTime();
+
+ final public ClusterSupport2 support = new ClusterSupport2(this);
+
+ /*
+ * Public interface
+ *
+ */
+
+ public ClusterManager(Path dbFolder) {
+ this.dbFolder = dbFolder;
+ }
+
+ public ArrayList<String> getChanges(long changeSetId) {
+ ChangeSetInfo info = csLRU.getWithoutMutex(changeSetId);
+ info.acquireMutex();
+ try {
+ info.makeResident();
+ return info.getCSSIds();
+ } catch (Throwable t) {
+ throw new IllegalStateException(t);
+ } finally {
+ info.releaseMutex();
+ }
+ }
+
+ public ClusterBase getClusterByClusterKey(int clusterKey) throws DatabaseException {
+ return clusterLRU.getClusterByClusterKey(clusterKey);
+ }
+
+ public ClusterBase getClusterByClusterUIDOrMake(ClusterUID clusterUID) throws DatabaseException {
+ return clusterLRU.getClusterByClusterUIDOrMake(clusterUID);
+ }
+
+ public ClusterImpl getClusterByClusterUIDOrMakeProxy(ClusterUID clusterUID) throws DatabaseException {
+ return clusterLRU.getClusterByClusterUIDOrMakeProxy(clusterUID);
+ }
+
+ public int getClusterKeyByClusterUIDOrMake(ClusterUID clusterUID) {
+ return clusterLRU.getClusterKeyByClusterUIDOrMake(clusterUID);
+ }
+
+ public int getClusterKeyByClusterUIDOrMakeWithoutMutex(ClusterUID clusterUID) {
+ return clusterLRU.getClusterKeyByClusterUIDOrMakeWithoutMutex(clusterUID);
+ }
+
+ public int getClusterKeyByUID(long id1, long id2) throws DatabaseException {
+ return clusterLRU.getClusterKeyByUIDWithoutMutex(id1, id2);
+ }
+
+ public <T extends ClusterI> T getClusterProxyByResourceKey(int resourceKey) throws DatabaseException {
+ return clusterLRU.getClusterProxyByResourceKey(resourceKey);
+ }
+
+ public ClusterUID getClusterUIDByResourceKey(int resourceKey) throws DatabaseException {
+ return clusterLRU.getClusterUIDByResourceKey(resourceKey);
+ }
+
+ public ClusterUID getClusterUIDByResourceKeyWithoutMutex(int resourceKey) throws DatabaseException {
+ return clusterLRU.getClusterUIDByResourceKeyWithoutMutex(resourceKey);
+ }
+
+ /*
+ * Private implementation
+ *
+ */
+
+ private static long countFiles(Path directory) throws IOException {
+ try (DirectoryStream<Path> ds = Files.newDirectoryStream(directory)) {
+ int count = 0;
+ for (@SuppressWarnings("unused") Path p : ds)
+ ++count;
+ return count;
+ }
+ }
+
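+ /*
+ * Flushes all cached data to disk and starts a new head directory.
+ * Unless force is set, returns false without doing anything if the
+ * previous snapshot was taken less than 10 seconds ago or if nothing
+ * has been written into the current working directory.
+ */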
+ public synchronized boolean makeSnapshot(ServiceLocator locator, boolean force) throws IOException {
+
+ // Cap autosave frequency: skip if the previous snapshot was taken less than 10 seconds ago
+ if(!force && System.nanoTime() - lastSnapshot < 10*1000000000L) {
+// System.err.println("lastSnapshot too early");
+ return false;
+ }
+
+ // Cluster files are always present in the working directory;
+ // if nothing else has been written there is no need to do anything
+ long amountOfFiles = countFiles(workingDirectory);
+ if(!force && amountOfFiles < 3) {
+// System.err.println("amountOfFiles < 3");
+ return false;
+ }
+
+ System.err.println("makeSnapshot");
+
+ // Schedule writing of all data to disk
+ refreshHeadState();
+
+ // Wait for all files to be written
+ clusterLRU.shutdown();
+ fileLRU.shutdown();
+ streamLRU.shutdown();
+ csLRU.shutdown();
+
+ persistHeadState();
+
+ mainState.save(dbFolder);
+
+ ClusterSetsSupport cssi = locator.getService(ClusterSetsSupport.class);
+ cssi.save();
+
+ amountOfFiles = countFiles(workingDirectory);
+
+ System.err.println(" -finished: amount of files is " + amountOfFiles);
+
+ workingDirectory = dbFolder.resolve(Integer.toString(mainState.headDir));
+ if (!Files.exists(workingDirectory)) {
+ Files.createDirectories(workingDirectory);
+ }
+
+ cssi.updateReadAndWriteDirectories(lastSessionDirectory, workingDirectory);
+
+ clusterLRU.setWriteDir(workingDirectory);
+ fileLRU.setWriteDir(workingDirectory);
+ streamLRU.setWriteDir(workingDirectory);
+ csLRU.setWriteDir(workingDirectory);
+
+ clusterLRU.resume();
+ fileLRU.resume();
+ streamLRU.resume();
+ csLRU.resume();
+
+ lastSnapshot = System.nanoTime();
+
+ return true;
+
+ }
+
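+ // Clears the head state lists and repopulates them from the current contents of the LRU caches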
+ public void refreshHeadState() throws IOException {
+
+ state.clusters.clear();
+ state.files.clear();
+ state.stream.clear();
+ state.cs.clear();
+
+ clusterLRU.persist(state.clusters);
+ fileLRU.persist(state.files);
+ streamLRU.persist(state.stream);
+ csLRU.persist(state.cs);
+
+ }
+
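+ // Syncs all files in the working directory, saves the head state there and advances the head directory counter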
+ public void persistHeadState() throws IOException {
+
+ // Sync current working directory
+ try (Stream<Path> files = Files.walk(workingDirectory, 1)) {
+ files.filter(Files::isRegularFile).forEach(FileIO::uncheckedSyncPath);
+ }
+ state.save(workingDirectory);
+ mainState.headDir++;
+ }
+
+
+ private void acquireAll() {
+ clusterLRU.acquireMutex();
+ fileLRU.acquireMutex();
+ streamLRU.acquireMutex();
+ csLRU.acquireMutex();
+ }
+
+ private void releaseAll() {
+ csLRU.releaseMutex();
+ streamLRU.releaseMutex();
+ fileLRU.releaseMutex();
+ clusterLRU.releaseMutex();
+ }
+
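+ /*
+ * Loads the main state, restores the newest consistent head state,
+ * creates the working directory for the next head and rebuilds the
+ * cluster, file, stream and change set LRU caches from the persisted
+ * entries.
+ */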
+ public void load() throws IOException {
+
+ // Main state
+ mainState = MainState.load(dbFolder);
+
+ lastSessionDirectory = dbFolder.resolve(Integer.toString(mainState.headDir - 1));
+
+ // Head State
+ try {
+ state = HeadState.load(lastSessionDirectory);
+ } catch (InvalidHeadStateException e) {
+ // For backwards compatibility only!
+ Throwable cause = e.getCause();
+ if (cause != null) {
+ try {
+ org.simantics.db.javacore.HeadState oldState = org.simantics.db.javacore.HeadState.load(lastSessionDirectory);
+
+ HeadState newState = new HeadState();
+ newState.clusters = oldState.clusters;
+ newState.cs = oldState.cs;
+ newState.files = oldState.files;
+ newState.stream = oldState.stream;
+ newState.headChangeSetId = oldState.headChangeSetId;
+ newState.reservedIds = oldState.reservedIds;
+ newState.transactionId = oldState.transactionId;
+ state = newState;
+ } catch (InvalidHeadStateException e1) {
+ throw new IOException("Could not load HeadState due to corruption", e1);
+ }
+ } else {
+ // This should never happen: MainState.load() checks the integrity of the
+ // head.state files and rolls back on corruption until a consistent state
+ // is found (possibly the initial, empty database state). If this branch
+ // is reached, something is seriously wrong.
+ throw new IOException("Could not load HeadState due to corruption", e);
+ }
+ }
+
+ workingDirectory = dbFolder.resolve(Integer.toString(mainState.headDir));
+ Files.createDirectories(workingDirectory);
+
+ csLRU = new LRU<Long, ChangeSetInfo>("Change Set", workingDirectory);
+ streamLRU = new LRU<String, ClusterStreamChunk>("Cluster Stream", workingDirectory);
+ clusterLRU = new ClusterLRU(this, "Cluster", workingDirectory);
+ fileLRU = new LRU<String, FileInfo>("External Value", workingDirectory);
+
+ acquireAll();
+
+ // Clusters
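+ // Persisted cluster keys have the form "<first>.<second>#<directory>#<offset>#<length>",
+ // where <first> and <second> are the hexadecimal halves of the ClusterUID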
+ for (String clusterKey : state.clusters) {
+ String[] parts1 = clusterKey.split("#");
+ String[] parts = parts1[0].split("\\.");
+ long first = new BigInteger(parts[0], 16).longValue();
+ long second = new BigInteger(parts[1], 16).longValue();
+ ClusterUID uuid = ClusterUID.make(first, second);
+ Path readDir = dbFolder.resolve(parts1[1]);
+ int offset = Integer.parseInt(parts1[2]);
+ int length = Integer.parseInt(parts1[3]);
+ clusterLRU.map(new ClusterInfo(this, clusterLRU, readDir, uuid, offset, length));
+ }
+ // Files
+ for (String fileKey : state.files) {
+// System.err.println("loadFile: " + fileKey);
+ String[] parts = fileKey.split("#");
+ Path readDir = dbFolder.resolve(parts[1]);
+ int offset = Integer.parseInt(parts[2]);
+ int length = Integer.parseInt(parts[3]);
+ FileInfo info = new FileInfo(fileLRU, readDir, parts[0], offset, length);
+ fileLRU.map(info);
+ }
+ // Update chunks
+ for (String fileKey : state.stream) {
+// System.err.println("loadStream: " + fileKey);
+ String[] parts = fileKey.split("#");
+ Path readDir = dbFolder.resolve(parts[1]);
+ int offset = Integer.parseInt(parts[2]);
+ int length = Integer.parseInt(parts[3]);
+ ClusterStreamChunk info = new ClusterStreamChunk(this,
+ streamLRU, readDir, parts[0], offset, length);
+ streamLRU.map(info);
+ }
+ // Change sets
+ for (String fileKey : state.cs) {
+ String[] parts = fileKey.split("#");
+ Path readDir = dbFolder.resolve(parts[1]);
+ Long revisionId = Long.parseLong(parts[0]);
+ int offset = Integer.parseInt(parts[2]);
+ int length = Integer.parseInt(parts[3]);
+ ChangeSetInfo info = new ChangeSetInfo(csLRU, readDir, revisionId, offset, length);
+ csLRU.map(info);
+ }
+
+ releaseAll();
+
+ }
+
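+ // Applies any pending stream updates to the cluster and then clones it with the given creator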
+ public <T> T clone(ClusterUID uid, ClusterCreator creator)
+ throws DatabaseException {
+
+ clusterLRU.ensureUpdates(uid);
+
+ ClusterInfo info = clusterLRU.getWithoutMutex(uid);
+ return info.clone(uid, creator);
+
+ }
+
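+ // Lightweight instrumentation: startLog/endLog bracket named tasks with ThreadLogger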
+ public static void startLog(String msg) {
+ tasks.put(msg, ThreadLogger.getInstance().begin(msg));
+ }
+
+ public static void endLog(String msg) {
+ ITask task = tasks.get(msg);
+ if (task != null)
+ task.finish();
+ }
+
+ static Map<String, ITask> tasks = new HashMap<String, ITask>();
+
+ public void update(ClusterUID uid, ClusterImpl clu) {
+
+ ClusterInfo info = clusterLRU.getWithoutMutex(uid);
+ info.acquireMutex();
+ try {
+ info.update(clu);
+ } catch (Throwable t) {
+ throw new IllegalStateException(t);
+ } finally {
+ info.releaseMutex();
+ }
+
+ }
+
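+ // Note: cluster id allocation is not implemented here; a constant id is returned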
+ public long getClusterIdOrCreate(ClusterUID clusterUID) {
+ return 1;
+ }
+
+ public int getResourceKey(ClusterUID uid, int index) {
+ return clusterLRU.getResourceKey(uid, index);
+ }
+
+ public int getResourceKeyWitoutMutex(ClusterUID uid, int index) {
+ return clusterLRU.getResourceKeyWithoutMutex(uid, index);
+ }
+
+ public ClusterIds getClusterIds() throws ProCoreException {
+
+ clusterLRU.acquireMutex();
+
+ try {
+
+ Collection<ClusterInfo> infos = clusterLRU.values();
+ final int status = infos.size();
+ final long[] firsts = new long[status];
+ final long[] seconds = new long[status];
+
+ int index = 0;
+ for (ClusterInfo info : infos) {
+ firsts[index] = 0;
+ seconds[index] = info.getKey().second;
+ index++;
+ }
+
+ return new ClusterIds() {
+
+ @Override
+ public int getStatus() {
+ return status;
+ }
+
+ @Override
+ public long[] getFirst() {
+ return firsts;
+ }
+
+ @Override
+ public long[] getSecond() {
+ return seconds;
+ }
+
+ };
+
+ } catch (Throwable t) {
+ throw new IllegalStateException(t);
+ } finally {
+ clusterLRU.releaseMutex();
+ }
+
+ }
+
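+ // Change set ids are accumulated here and turned into a ChangeSetInfo by commitChangeSet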
+ public void addIntoCurrentChangeSet(String ccs) {
+
+ csLRU.acquireMutex();
+
+ try {
+
+ currentChanges.add(ccs);
+
+ } catch (Throwable t) {
+ throw new IllegalStateException(t);
+ } finally {
+
+ csLRU.releaseMutex();
+
+ }
+
+ }
+
+ public void commitChangeSet(long changeSetId, byte[] data) {
+ csLRU.acquireMutex();
+ try {
+ ArrayList<String> csids = new ArrayList<String>(currentChanges);
+ currentChanges = new ArrayList<String>();
+ new ChangeSetInfo(csLRU, changeSetId, data, csids);
+ } catch (Throwable t) {
+ throw new IllegalStateException(t);
+ } finally {
+ csLRU.releaseMutex();
+ }
+ }
+
+ public byte[] getMetadata(long changeSetId) {
+
+ ChangeSetInfo info = csLRU.getWithoutMutex(changeSetId);
+ if (info == null) return null;
+ info.acquireMutex();
+ try {
+ return info.getMetadataBytes();
+ } catch (Throwable t) {
+ throw new IllegalStateException(t);
+ } finally {
+ info.releaseMutex();
+ }
+
+ }
+
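+ // Returns the external value bytes for the given cluster/resource pair, or null if the file LRU has no entry for it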
+ public byte[] getResourceFile(final byte[] clusterUID,
+ final int resourceIndex) throws ProCoreException {
+
+ ClusterUID uid = ClusterUID.make(clusterUID, 0);
+ String key = uid.toString() + "_" + resourceIndex;
+ FileInfo info = fileLRU.getWithoutMutex(key);
+ if(info == null) return null;
+ info.acquireMutex();
+ try {
+ return info.getResourceFile();
+ } catch (Throwable t) {
+ throw new IllegalStateException(t);
+ } finally {
+ info.releaseMutex();
+ }
+
+ }
+
+ public ResourceSegment getResourceSegment(final byte[] clusterUID,
+ final int resourceIndex, final long segmentOffset, short segmentSize)
+ throws ProCoreException {
+
+ ClusterUID uid = ClusterUID.make(clusterUID, 0);
+
+ String key = uid.toString() + "_" + resourceIndex;
+ FileInfo info = fileLRU.getWithoutMutex(key);
+ if(info == null) return null;
+ info.acquireMutex();
+ try {
+ return info.getResourceSegment(clusterUID, resourceIndex, segmentOffset, segmentSize);
+ } catch (Throwable t) {
+ throw new IllegalStateException(t);
+ } finally {
+ info.releaseMutex();
+ }
+
+ }
+
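+ // Writes the given bytes into the external value entry of the resource, creating the FileInfo on first use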
+ public void modiFileEx(ClusterUID uid, int resourceKey, long offset,
+ long size, byte[] bytes, long pos, ClusterSupport support) {
+
+ try {
+
+ String key = uid.toString() + "_" + ClusterTraits.getResourceIndexFromResourceKey(resourceKey);
+
+ FileInfo info = null;
+
+ fileLRU.acquireMutex();
+
+ try {
+
+ info = fileLRU.get(key);
+ if (info == null)
+ info = new FileInfo(fileLRU, key, (int) (offset + size));
+ } catch (Throwable t) {
+ throw new IllegalStateException(t);
+ } finally {
+
+ fileLRU.releaseMutex();
+
+ }
+
+ info.acquireMutex();
+ try {
+ info.updateData(bytes, offset, pos, size);
+ } catch (Throwable t) {
+ throw new IllegalStateException(t);
+ } finally {
+ info.releaseMutex();
+ }
+
+ } catch (DatabaseException e) {
+ e.printStackTrace();
+ }
+
+ }
+
+ public void shutdown() {
+ clusterLRU.shutdown();
+ fileLRU.shutdown();
+ streamLRU.shutdown();
+ csLRU.shutdown();
+ }
+
+}