]> gerrit.simantics Code Review - simantics/platform.git/blobdiff - bundles/org.simantics.acorn/src/org/simantics/acorn/ClusterManager.java
Sharing org.simantics.acorn for everyone to use
[simantics/platform.git] / bundles / org.simantics.acorn / src / org / simantics / acorn / ClusterManager.java
diff --git a/bundles/org.simantics.acorn/src/org/simantics/acorn/ClusterManager.java b/bundles/org.simantics.acorn/src/org/simantics/acorn/ClusterManager.java
new file mode 100644 (file)
index 0000000..5b8e5ab
--- /dev/null
@@ -0,0 +1,584 @@
+package org.simantics.acorn;
+
+import java.io.IOException;
+import java.math.BigInteger;
+import java.nio.file.DirectoryStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.simantics.acorn.cluster.ClusterImpl;
+import org.simantics.acorn.internal.ClusterSupport2;
+import org.simantics.acorn.lru.ChangeSetInfo;
+import org.simantics.acorn.lru.ClusterInfo;
+import org.simantics.acorn.lru.ClusterLRU;
+import org.simantics.acorn.lru.ClusterStreamChunk;
+import org.simantics.acorn.lru.FileInfo;
+import org.simantics.acorn.lru.LRU;
+import org.simantics.db.ClusterCreator;
+import org.simantics.db.ServiceLocator;
+import org.simantics.db.Database.Session.ClusterIds;
+import org.simantics.db.Database.Session.ResourceSegment;
+import org.simantics.db.exception.DatabaseException;
+import org.simantics.db.impl.ClusterBase;
+import org.simantics.db.impl.ClusterI;
+import org.simantics.db.impl.ClusterSupport;
+import org.simantics.db.procore.cluster.ClusterTraits;
+import org.simantics.db.server.ProCoreException;
+import org.simantics.db.service.ClusterSetsSupport;
+import org.simantics.db.service.ClusterUID;
+import org.simantics.utils.threads.logger.ITask;
+import org.simantics.utils.threads.logger.ThreadLogger;
+
+public class ClusterManager {
+
+       private ArrayList<String> currentChanges = new ArrayList<String>();
+
+       public final Path dbFolder;
+       public Path lastSessionDirectory;
+       public Path workingDirectory;
+
+       public LRU<String, ClusterStreamChunk> streamLRU;
+       public LRU<Long, ChangeSetInfo> csLRU;
+       public ClusterLRU clusterLRU;
+       public LRU<String, FileInfo> fileLRU;
+
+       public MainState mainState;
+       public HeadState state;
+
+       private long lastSnapshot = System.nanoTime();
+
+       final public ClusterSupport2 support = new ClusterSupport2(this);
+
+       /*
+        * Public interface
+        * 
+        */
+
+       public ClusterManager(Path dbFolder) {
+               this.dbFolder = dbFolder;
+       }
+
+       public ArrayList<String> getChanges(long changeSetId) {
+               ChangeSetInfo info = csLRU.getWithoutMutex(changeSetId);
+               info.acquireMutex();
+               try {
+                       info.makeResident();
+                       return info.getCSSIds();
+               } catch (Throwable t) {
+                       throw new IllegalStateException(t);
+               } finally {
+                       info.releaseMutex();
+               }
+       }
+
+       public ClusterBase getClusterByClusterKey(int clusterKey) throws DatabaseException {
+               return clusterLRU.getClusterByClusterKey(clusterKey);
+       }
+       
+       public ClusterBase getClusterByClusterUIDOrMake(ClusterUID clusterUID) throws DatabaseException {
+               return clusterLRU.getClusterByClusterUIDOrMake(clusterUID);
+       }
+
+       public ClusterImpl getClusterByClusterUIDOrMakeProxy(ClusterUID clusterUID) throws DatabaseException {
+               return clusterLRU.getClusterByClusterUIDOrMakeProxy(clusterUID);
+       }
+
+       public int getClusterKeyByClusterUIDOrMake(ClusterUID clusterUID) {
+               return clusterLRU.getClusterKeyByClusterUIDOrMake(clusterUID);
+       }
+
+       public int getClusterKeyByClusterUIDOrMakeWithoutMutex(ClusterUID clusterUID) {
+               return clusterLRU.getClusterKeyByClusterUIDOrMakeWithoutMutex(clusterUID);
+       }
+
+       public int getClusterKeyByUID(long id1, long id2) throws DatabaseException {
+               return clusterLRU.getClusterKeyByUIDWithoutMutex(id1, id2);
+       }
+       
+       public <T extends ClusterI> T getClusterProxyByResourceKey(int resourceKey) throws DatabaseException {
+               return clusterLRU.getClusterProxyByResourceKey(resourceKey);
+       }
+
+       public ClusterUID getClusterUIDByResourceKey(int resourceKey) throws DatabaseException {
+               return clusterLRU.getClusterUIDByResourceKey(resourceKey);
+       }
+
+       public ClusterUID getClusterUIDByResourceKeyWithoutMutex(int resourceKey) throws DatabaseException {
+               return clusterLRU.getClusterUIDByResourceKeyWithoutMutex(resourceKey);
+       }
+
+       /*
+        * Private implementation
+        * 
+        */
+
+       private static long countFiles(Path directory) throws IOException {
+               try (DirectoryStream<Path> ds = Files.newDirectoryStream(directory)) {
+                       int count = 0;
+                       for (@SuppressWarnings("unused") Path p : ds)
+                               ++count;
+                       return count;
+               }
+       }
+
+       public synchronized boolean makeSnapshot(ServiceLocator locator, boolean force) throws IOException {
+
+               // Maximum autosave frequency is per 60s
+               if(!force && System.nanoTime() - lastSnapshot < 10*1000000000L) {
+//                 System.err.println("lastSnapshot too early");
+                   return false;
+               }
+
+               // Cluster files are always there 
+               // Nothing has been written => no need to do anything
+               long amountOfFiles = countFiles(workingDirectory);
+               if(!force && amountOfFiles < 3) {
+//                 System.err.println("amountOfFiles < 3");
+                   return false;
+               }
+
+               System.err.println("makeSnapshot");
+
+               // Schedule writing of all data to disk
+               refreshHeadState();
+
+               // Wait for all files to be written
+               clusterLRU.shutdown();
+               fileLRU.shutdown();
+               streamLRU.shutdown();
+               csLRU.shutdown();
+               
+               persistHeadState();
+               
+               mainState.save(dbFolder);
+
+               ClusterSetsSupport cssi = locator.getService(ClusterSetsSupport.class); 
+               cssi.save();
+
+               amountOfFiles = countFiles(workingDirectory);
+               
+               System.err.println(" -finished: amount of files is " + amountOfFiles);
+
+               workingDirectory = dbFolder.resolve(Integer.toString(mainState.headDir));
+               if (!Files.exists(workingDirectory)) {
+                   Files.createDirectories(workingDirectory);
+               }
+
+               cssi.updateReadAndWriteDirectories(lastSessionDirectory, workingDirectory);
+
+               clusterLRU.setWriteDir(workingDirectory);
+               fileLRU.setWriteDir(workingDirectory);
+               streamLRU.setWriteDir(workingDirectory);
+               csLRU.setWriteDir(workingDirectory);
+
+               clusterLRU.resume();
+               fileLRU.resume();
+               streamLRU.resume();
+               csLRU.resume();
+
+               lastSnapshot = System.nanoTime();
+               
+               return true;
+               
+       }
+       
+       public void refreshHeadState() throws IOException {
+
+               state.clusters.clear();
+               state.files.clear();
+               state.stream.clear();
+               state.cs.clear();
+
+               clusterLRU.persist(state.clusters);
+               fileLRU.persist(state.files);
+               streamLRU.persist(state.stream);
+               csLRU.persist(state.cs);
+
+       }
+       
+       public void persistHeadState() throws IOException {
+
+               // Sync current working directory
+               Files.walk(workingDirectory, 1).filter(Files::isRegularFile).forEach(FileIO::uncheckedSyncPath);
+               state.save(workingDirectory);
+               mainState.headDir++;
+       }
+
+       
+//     public void save() throws IOException {
+//
+//             refreshHeadState();
+//             
+//             clusterLRU.shutdown();
+//             fileLRU.shutdown();
+//             streamLRU.shutdown();
+//             csLRU.shutdown();
+//
+//             persistHeadState();
+//
+//             mainState.save(getBaseDirectory());
+
+//             try {
+//                     ThreadLogVisualizer visualizer = new ThreadLogVisualizer();
+//                     visualizer.read(new DataInputStream(new FileInputStream(
+//                                     ThreadLogger.LOG_FILE)));
+//                     visualizer.visualize3(new PrintStream(ThreadLogger.LOG_FILE
+//                                     + ".svg"));
+//             } catch (FileNotFoundException e) {
+//                     // TODO Auto-generated catch block
+//                     e.printStackTrace();
+//             }
+
+               // System.err.println("-- load statistics --");
+               // for(Pair<ClusterUID, Integer> entry :
+               // CollectionUtils.valueSortedEntries(histogram)) {
+               // System.err.println(" " + entry.second + " " + entry.first);
+               // }
+
+//     }
+       
+       private void acquireAll() {
+               clusterLRU.acquireMutex();
+               fileLRU.acquireMutex();
+               streamLRU.acquireMutex();
+               csLRU.acquireMutex();
+       }
+       
+       private void releaseAll() {
+               csLRU.releaseMutex();
+               streamLRU.releaseMutex();
+               fileLRU.releaseMutex();
+               clusterLRU.releaseMutex();
+       }
+
+       public void load() throws IOException {
+
+               // Main state
+               mainState = MainState.load(dbFolder);
+
+               lastSessionDirectory = dbFolder.resolve(Integer.toString(mainState.headDir - 1));
+               
+               // Head State
+               try {
+            state = HeadState.load(lastSessionDirectory);
+        } catch (InvalidHeadStateException e) {
+            // For backwards compatibility only!
+            Throwable cause = e.getCause();
+            if (cause instanceof Throwable) {
+                try {
+                    org.simantics.db.javacore.HeadState oldState = org.simantics.db.javacore.HeadState.load(lastSessionDirectory);
+                    
+                    HeadState newState = new HeadState();
+                    newState.clusters = oldState.clusters;
+                    newState.cs = oldState.cs;
+                    newState.files = oldState.files;
+                    newState.stream = oldState.stream;
+                    newState.headChangeSetId = oldState.headChangeSetId;
+                    newState.reservedIds = oldState.reservedIds;
+                    newState.transactionId = oldState.transactionId;
+                    state = newState;
+                } catch (InvalidHeadStateException e1) {
+                    throw new IOException("Could not load HeadState due to corruption", e1);
+                }
+            } else {
+                // This should never happen as MainState.load() checks the integrity
+                // of head.state files and rolls back in cases of corruption until a
+                // consistent state is found (could be case 0 - initial db state)
+                // IF this does happen something is completely wrong
+                throw new IOException("Could not load HeadState due to corruption", e);
+            }
+        }
+
+               workingDirectory = dbFolder.resolve(Integer.toString(mainState.headDir));
+               Files.createDirectories(workingDirectory);
+
+               csLRU = new LRU<Long, ChangeSetInfo>("Change Set", workingDirectory);
+               streamLRU = new LRU<String, ClusterStreamChunk>("Cluster Stream", workingDirectory);
+               clusterLRU = new ClusterLRU(this, "Cluster", workingDirectory);
+               fileLRU = new LRU<String, FileInfo>("External Value", workingDirectory);
+
+               acquireAll();
+               
+               // Clusters
+               for (String clusterKey : state.clusters) {
+                       String[] parts1 = clusterKey.split("#");
+                       String[] parts = parts1[0].split("\\.");
+                       long first = new BigInteger(parts[0], 16).longValue();
+                       long second = new BigInteger(parts[1], 16).longValue();
+                       ClusterUID uuid = ClusterUID.make(first, second);
+                       Path readDir = dbFolder.resolve(parts1[1]);
+                       int offset = Integer.parseInt(parts1[2]);
+                       int length = Integer.parseInt(parts1[3]);
+                       clusterLRU.map(new ClusterInfo(this, clusterLRU, readDir, uuid, offset, length));
+               }
+               // Files
+               for (String fileKey : state.files) {
+//                     System.err.println("loadFile: " + fileKey);
+                       String[] parts = fileKey.split("#");
+                       Path readDir = dbFolder.resolve(parts[1]);
+                       int offset = Integer.parseInt(parts[2]);
+                       int length = Integer.parseInt(parts[3]);
+                       FileInfo info = new FileInfo(fileLRU, readDir, parts[0], offset, length);
+                       fileLRU.map(info);
+               }
+               // Update chunks
+               for (String fileKey : state.stream) {
+//                     System.err.println("loadStream: " + fileKey);
+                       String[] parts = fileKey.split("#");
+                       Path readDir = dbFolder.resolve(parts[1]);
+                       int offset = Integer.parseInt(parts[2]);
+                       int length = Integer.parseInt(parts[3]);
+                       ClusterStreamChunk info = new ClusterStreamChunk(this,
+                                       streamLRU, readDir, parts[0], offset, length);
+                       streamLRU.map(info);
+               }
+               // Change sets
+               for (String fileKey : state.cs) {
+                       String[] parts = fileKey.split("#");
+                       Path readDir = dbFolder.resolve(parts[1]);
+                       Long revisionId = Long.parseLong(parts[0]);
+                       int offset = Integer.parseInt(parts[2]);
+                       int length = Integer.parseInt(parts[3]);
+                       ChangeSetInfo info = new ChangeSetInfo(csLRU, readDir, revisionId, offset, length);
+                       csLRU.map(info);
+               }
+               
+               releaseAll();
+
+       }
+
+       public <T> T clone(ClusterUID uid, ClusterCreator creator)
+                       throws DatabaseException {
+               
+               clusterLRU.ensureUpdates(uid);
+               
+               ClusterInfo info = clusterLRU.getWithoutMutex(uid);
+               return info.clone(uid, creator);
+
+       }
+
+       //private int loadCounter = 0;
+
+       public static void startLog(String msg) {
+               tasks.put(msg, ThreadLogger.getInstance().begin(msg));
+       }
+
+       public static void endLog(String msg) {
+               ITask task = tasks.get(msg);
+               if (task != null)
+                       task.finish();
+       }
+
+       static Map<String, ITask> tasks = new HashMap<String, ITask>();
+
+       public void update(ClusterUID uid, ClusterImpl clu) {
+
+               ClusterInfo info = clusterLRU.getWithoutMutex(uid);
+               info.acquireMutex();
+               try {
+                       info.update(clu);
+               } catch (Throwable t) {
+                       throw new IllegalStateException(t);
+               } finally {
+                       info.releaseMutex();
+               }
+               
+       }
+
+       public long getClusterIdOrCreate(ClusterUID clusterUID) {
+               return 1;
+       }
+
+       public int getResourceKey(ClusterUID uid, int index) {
+               return clusterLRU.getResourceKey(uid, index);
+       }
+
+       public int getResourceKeyWitoutMutex(ClusterUID uid, int index) {
+               return clusterLRU.getResourceKeyWithoutMutex(uid, index);
+       }
+
+       public ClusterIds getClusterIds() throws ProCoreException {
+
+               clusterLRU.acquireMutex();
+
+               try {
+
+                       Collection<ClusterInfo> infos = clusterLRU.values();
+                       final int status = infos.size();
+                       final long[] firsts = new long[status];
+                       final long[] seconds = new long[status];
+
+                       int index = 0;
+                       for (ClusterInfo info : infos) {
+                               firsts[index] = 0;
+                               seconds[index] = info.getKey().second;
+                               index++;
+                       }
+
+                       return new ClusterIds() {
+
+                               @Override
+                               public int getStatus() {
+                                       return status;
+                               }
+
+                               @Override
+                               public long[] getFirst() {
+                                       return firsts;
+                               }
+
+                               @Override
+                               public long[] getSecond() {
+                                       return seconds;
+                               }
+
+                       };
+
+               } catch (Throwable t) {
+                       throw new IllegalStateException(t);
+               } finally {
+                       clusterLRU.releaseMutex();
+               }
+
+       }
+
+       public void addIntoCurrentChangeSet(String ccs) {
+               
+               csLRU.acquireMutex();
+
+               try {
+                       
+                       currentChanges.add(ccs);
+                       
+               } catch (Throwable t) {
+                       throw new IllegalStateException(t);
+               } finally {
+                       
+                       csLRU.releaseMutex();
+                       
+               }
+
+       }
+
+       public void commitChangeSet(long changeSetId, byte[] data) {
+               csLRU.acquireMutex();
+               try {
+                       ArrayList<String> csids = new ArrayList<String>(currentChanges);
+                       currentChanges = new ArrayList<String>();
+                       new ChangeSetInfo(csLRU, changeSetId, data, csids);
+               } catch (Throwable t) {
+                       throw new IllegalStateException(t);
+               } finally {
+                       csLRU.releaseMutex();
+               }
+       }
+
+       public byte[] getMetadata(long changeSetId) {
+               
+               ChangeSetInfo info = csLRU.getWithoutMutex(changeSetId);
+               if (info == null) return null;
+               info.acquireMutex();
+               try {
+                       return info.getMetadataBytes();
+               } catch (Throwable t) {
+                       throw new IllegalStateException(t);
+               } finally {
+                       info.releaseMutex();
+               }
+               
+       }
+
+       public byte[] getResourceFile(final byte[] clusterUID,
+                       final int resourceIndex) throws ProCoreException {
+
+               ClusterUID uid = ClusterUID.make(clusterUID, 0);
+               String key = uid.toString() + "_" + resourceIndex;
+               FileInfo info = fileLRU.getWithoutMutex(key);
+               if(info == null) return null;
+               info.acquireMutex();
+               try {
+                       return info.getResourceFile();
+               } catch (Throwable t) {
+                       throw new IllegalStateException(t);
+               } finally {
+                       info.releaseMutex();
+               }
+
+       }
+
+       public ResourceSegment getResourceSegment(final byte[] clusterUID,
+                       final int resourceIndex, final long segmentOffset, short segmentSize)
+                       throws ProCoreException {
+
+               ClusterUID uid = ClusterUID.make(clusterUID, 0);
+
+               String key = uid.toString() + "_" + resourceIndex;
+               FileInfo info = fileLRU.getWithoutMutex(key);
+               if(info == null) return null;
+               info.acquireMutex();
+               try {
+                       return info.getResourceSegment(clusterUID, resourceIndex, segmentOffset, segmentSize);
+               } catch (Throwable t) {
+                       throw new IllegalStateException(t);
+               } finally {
+                       info.releaseMutex();
+               }
+
+       }
+
+       public void modiFileEx(ClusterUID uid, int resourceKey, long offset,
+                       long size, byte[] bytes, long pos, ClusterSupport support) {
+
+               try {
+
+                       String key = uid.toString()
+                                       + "_"
+                                       + ClusterTraits
+                                                       .getResourceIndexFromResourceKey(resourceKey);
+
+                       FileInfo info = null;
+
+                       fileLRU.acquireMutex();
+
+                       try {
+
+                               info = fileLRU.get(key);
+                               if (info == null)
+                                       info = new FileInfo(fileLRU, key, (int) (offset + size));
+                               
+                               
+                       } catch (Throwable t) {
+                               throw new IllegalStateException(t);
+                       } finally {
+                               
+                               fileLRU.releaseMutex();
+                               
+                       }
+                       
+                       info.acquireMutex();
+                       try {
+                               info.updateData(bytes, offset, pos, size);
+                       } catch (Throwable t) {
+                               throw new IllegalStateException(t);
+                       } finally {
+                               info.releaseMutex();
+                       }
+
+               } catch (DatabaseException e) {
+                       e.printStackTrace();
+               }
+
+       }
+
+    public void shutdown() {
+        clusterLRU.shutdown();
+        fileLRU.shutdown();
+        streamLRU.shutdown();
+        csLRU.shutdown();
+    }
+
+}