1 package org.simantics.acorn;
3 import java.io.IOException;
4 import java.math.BigInteger;
5 import java.nio.file.DirectoryStream;
6 import java.nio.file.Files;
7 import java.nio.file.Path;
8 import java.nio.file.StandardCopyOption;
9 import java.util.ArrayList;
10 import java.util.Collection;
11 import java.util.HashMap;
13 import java.util.concurrent.atomic.AtomicBoolean;
15 import org.simantics.acorn.cluster.ClusterImpl;
16 import org.simantics.acorn.exception.AcornAccessVerificationException;
17 import org.simantics.acorn.exception.IllegalAcornStateException;
18 import org.simantics.acorn.exception.InvalidHeadStateException;
19 import org.simantics.acorn.internal.ClusterSupport2;
20 import org.simantics.acorn.lru.ChangeSetInfo;
21 import org.simantics.acorn.lru.ClusterInfo;
22 import org.simantics.acorn.lru.ClusterLRU;
23 import org.simantics.acorn.lru.ClusterStreamChunk;
24 import org.simantics.acorn.lru.FileInfo;
25 import org.simantics.acorn.lru.LRU;
26 import org.simantics.db.ClusterCreator;
27 import org.simantics.db.Database.Session.ClusterIds;
28 import org.simantics.db.Database.Session.ResourceSegment;
29 import org.simantics.db.ServiceLocator;
30 import org.simantics.db.exception.DatabaseException;
31 import org.simantics.db.impl.ClusterBase;
32 import org.simantics.db.impl.ClusterI;
33 import org.simantics.db.impl.ClusterSupport;
34 import org.simantics.db.procore.cluster.ClusterTraits;
35 import org.simantics.db.service.ClusterSetsSupport;
36 import org.simantics.db.service.ClusterUID;
37 import org.simantics.utils.FileUtils;
38 import org.simantics.utils.threads.logger.ITask;
39 import org.simantics.utils.threads.logger.ThreadLogger;
40 import org.slf4j.Logger;
41 import org.slf4j.LoggerFactory;
// Holds the database folder, the session directories, the four LRU caches and the
// main/head state objects that the methods of this class operate on.
43 public class ClusterManager {
45 final static Logger LOGGER = LoggerFactory.getLogger(ClusterManager.class);
// Ids collected by addIntoCurrentChangeSet(); commitChangeSet() copies the list and replaces it with a fresh one.
47 private ArrayList<String> currentChanges = new ArrayList<String>();
// Root folder; the numbered session directories are resolved against it.
49 public final Path dbFolder;
50 private FileCache fileCache;
// dbFolder/<headDir - 1>, assigned in load().
51 Path lastSessionDirectory;
// dbFolder/<headDir>, assigned in load() and re-assigned in makeSnapshot().
52 Path workingDirectory;
// The four caches below are instantiated in load().
54 public LRU<String, ClusterStreamChunk> streamLRU;
55 public LRU<Long, ChangeSetInfo> csLRU;
56 public ClusterLRU clusterLRU;
57 public LRU<String, FileInfo> fileLRU;
// Loaded by load(); mainState.headDir names the current working directory.
59 public MainState mainState;
// Head state; its clusters/files/stream/cs lists hold '#'-separated entry keys.
60 public HeadState state;
// System.nanoTime() of the latest snapshot; makeSnapshot() uses it to throttle autosaves.
62 private long lastSnapshot = System.nanoTime();
64 final public ClusterSupport2 support = new ClusterSupport2(this);
/**
 * Creates a manager for the given database folder. Only the two references are stored;
 * the caches and state objects are set up later by {@link #load()}.
 *
 * @param dbFolder  root folder of the database
 * @param fileCache file cache handed to the file and change set entries created by this manager
 */
public ClusterManager(Path dbFolder, FileCache fileCache) {
    this.fileCache = fileCache;
    this.dbFolder = dbFolder;
}
// Fetches the ChangeSetInfo for changeSetId from csLRU (via getWithoutMutex) and returns
// the list obtained from its getCCSIds().
// NOTE(review): unlike getMetadata(), no null check on `info` is visible here — confirm that
// an unknown changeSetId cannot reach this method.
76 public ArrayList<String> getChanges(long changeSetId) throws AcornAccessVerificationException, IllegalAcornStateException {
77 ChangeSetInfo info = csLRU.getWithoutMutex(changeSetId);
81 return info.getCCSIds();
87 public ClusterBase getClusterByClusterKey(int clusterKey) throws DatabaseException, AcornAccessVerificationException, IllegalAcornStateException {
88 return clusterLRU.getClusterByClusterKey(clusterKey);
91 public ClusterBase getClusterByClusterUIDOrMake(ClusterUID clusterUID) throws DatabaseException, AcornAccessVerificationException, IllegalAcornStateException {
92 return clusterLRU.getClusterByClusterUIDOrMake(clusterUID);
95 public ClusterImpl getClusterByClusterUIDOrMakeProxy(ClusterUID clusterUID) throws DatabaseException, AcornAccessVerificationException, IllegalAcornStateException {
96 return clusterLRU.getClusterByClusterUIDOrMakeProxy(clusterUID);
99 public int getClusterKeyByClusterUIDOrMake(ClusterUID clusterUID) throws AcornAccessVerificationException {
100 return clusterLRU.getClusterKeyByClusterUIDOrMake(clusterUID);
103 public int getClusterKeyByClusterUIDOrMakeWithoutMutex(ClusterUID clusterUID) throws IllegalAcornStateException, AcornAccessVerificationException {
104 return clusterLRU.getClusterKeyByClusterUIDOrMakeWithoutMutex(clusterUID);
107 public int getClusterKeyByUID(long id1, long id2) throws DatabaseException, IllegalAcornStateException {
108 return clusterLRU.getClusterKeyByUIDWithoutMutex(id1, id2);
111 public <T extends ClusterI> T getClusterProxyByResourceKey(int resourceKey) throws DatabaseException, AcornAccessVerificationException, IllegalAcornStateException {
112 return clusterLRU.getClusterProxyByResourceKey(resourceKey);
115 public ClusterUID getClusterUIDByResourceKey(int resourceKey) throws DatabaseException, AcornAccessVerificationException {
116 return clusterLRU.getClusterUIDByResourceKey(resourceKey);
119 public ClusterUID getClusterUIDByResourceKeyWithoutMutex(int resourceKey) throws DatabaseException, IllegalAcornStateException, AcornAccessVerificationException {
120 return clusterLRU.getClusterUIDByResourceKeyWithoutMutex(resourceKey);
124 * Private implementation
128 private static long countFiles(Path directory) throws IOException {
129 try (DirectoryStream<Path> ds = Files.newDirectoryStream(directory)) {
131 for (@SuppressWarnings("unused") Path p : ds)
137 // Guard checked by makeSnapshot() to decide whether taking a snapshot is still safe (introduced for cancel, which is not yet supported and may corrupt the head.state write)
// Flipped to false by notSafeToMakeSnapshot().
138 private AtomicBoolean safeToMakeSnapshot = new AtomicBoolean(true);
// Exception associated with the unsafe state.
139 private IllegalAcornStateException cause;
// Consolidates data that still lives in older session directories:
//  - cluster and external files whose key names a directory other than the current working
//    directory are copied into the sibling "<currentDir>_baseline" directory and their cached
//    infos get moveTo(baseline);
//  - stream chunks and change sets read from other directories are purged from their LRUs;
//  - the tail change set id is set to the head change set id, a full snapshot is made and
//    every directory directly under dbFolder is offered to tryPurgeDirectory().
// IllegalAcornStateException, IOException and AcornAccessVerificationException all end up in
// notSafeToMakeSnapshot(), the latter two wrapped in an IllegalAcornStateException.
141 public synchronized void purge(ServiceLocator locator) throws IllegalAcornStateException {
145 // Schedule writing of all data to disk
147 // Wait for files to be written
148 synchronizeWorkingDirectory();
150 String currentDir = workingDirectory.getFileName().toString();
151 Path baseline = workingDirectory.resolveSibling(currentDir + "_baseline");
153 Files.createDirectories(baseline);
// Cluster keys are '#'-separated: the part before the first '#' is "<hex first>.<hex second>"
// (the two halves of the ClusterUID), the next part is the directory the cluster is read from.
155 for(String clusterKey : state.clusters) {
156 String[] parts1 = clusterKey.split("#");
157 String[] parts = parts1[0].split("\\.");
158 String readDirName = parts1[1];
159 if(!readDirName.equals(currentDir)) {
160 String fileName = parts[0] + "." + parts[1] + ".cluster";
161 Path from = dbFolder.resolve(readDirName).resolve(fileName);
162 Path to = baseline.resolve(fileName);
163 LOGGER.info("purge copies " + from + " => " + to);
// NOTE(review): without REPLACE_EXISTING this copy throws FileAlreadyExistsException when the
// target already exists (e.g. left over from an interrupted purge) — confirm that is intended.
164 Files.copy(from, to, StandardCopyOption.COPY_ATTRIBUTES);
165 long first = new BigInteger(parts[0], 16).longValue();
166 long second = new BigInteger(parts[1], 16).longValue();
167 ClusterUID uuid = ClusterUID.make(first, second);
168 ClusterInfo info = clusterLRU.getWithoutMutex(uuid);
169 info.moveTo(baseline);
// External files: key is "<name>#<readDir>#..."; the file on disk is "<name>.extFile".
173 for (String fileKey : state.files) {
174 String[] parts = fileKey.split("#");
175 String readDirName = parts[1];
176 if(!readDirName.equals(currentDir)) {
177 String fileName = parts[0] + ".extFile";
178 Path from = dbFolder.resolve(readDirName).resolve(fileName);
179 Path to = baseline.resolve(fileName);
180 LOGGER.info("purge copies " + from + " => " + to);
181 Files.copy(from, to, StandardCopyOption.COPY_ATTRIBUTES);
182 FileInfo info = fileLRU.getWithoutMutex(parts[0]);
183 info.moveTo(baseline);
// Stream chunks from other directories are not copied, only purged from streamLRU.
187 for (String fileKey : state.stream) {
188 String[] parts = fileKey.split("#");
189 String readDirName = parts[1];
190 if(!readDirName.equals(currentDir)) {
191 ClusterStreamChunk chunk = streamLRU.purge(parts[0]);
192 LOGGER.info("purge removes " + chunk);
// Change sets from other directories are likewise purged; the key starts with the revision id.
197 for (String fileKey : state.cs) {
198 String[] parts = fileKey.split("#");
199 String readDirName = parts[1];
200 if(!readDirName.equals(currentDir)) {
201 Long revisionId = Long.parseLong(parts[0]);
202 ChangeSetInfo info = csLRU.purge(revisionId);
203 LOGGER.info("purge removes " + info);
205 // Path readDir = dbFolder.resolve(parts[1]);
206 // Long revisionId = Long.parseLong(parts[0]);
207 // int offset = Integer.parseInt(parts[2]);
208 // int length = Integer.parseInt(parts[3]);
209 // ChangeSetInfo info = new ChangeSetInfo(csLRU, readDir, revisionId, offset, length);
// After the purge the tail and the head refer to the same change set.
213 state.tailChangeSetId = state.headChangeSetId;
215 makeSnapshot(locator, true);
// NOTE(review): the Stream returned by Files.walk is never closed here; its documentation asks
// for try-with-resources so that the open directory is released promptly.
217 Files.walk(dbFolder, 1).filter(Files::isDirectory).forEach(f -> tryPurgeDirectory(f));
219 } catch (IllegalAcornStateException e) {
220 notSafeToMakeSnapshot(e);
222 } catch (IOException e) {
223 IllegalAcornStateException e1 = new IllegalAcornStateException(e);
224 notSafeToMakeSnapshot(e1);
226 } catch (AcornAccessVerificationException e) {
227 IllegalAcornStateException e1 = new IllegalAcornStateException(e);
228 notSafeToMakeSnapshot(e1);
234 void tryPurgeDirectory(Path f) {
237 LOGGER.info("purge deletes " + f);
239 String currentDir = f.getFileName().toString();
240 if(currentDir.endsWith("db"))
243 if(currentDir.endsWith("_baseline"))
244 currentDir = currentDir.replace("_baseline", "");
246 int ordinal = Integer.parseInt(currentDir);
247 if(ordinal < mainState.headDir - 1) {
248 LOGGER.info("purge deletes " + f);
249 FileUtils.deleteDir(f.toFile());
// Takes a snapshot of the current state. Visible steps: check the safeToMakeSnapshot guard,
// throttle non-full saves, skip non-full saves when the working directory is empty, shut down
// the LRUs so pending data is written, re-check the guard, then switch workingDirectory to
// dbFolder/<mainState.headDir> and point ClusterSetsSupport and all four LRUs at it.
// IllegalAcornStateException and IOException are routed to notSafeToMakeSnapshot().
// fullSave == true bypasses both the throttle and the empty-directory shortcut.
254 public synchronized boolean makeSnapshot(ServiceLocator locator, boolean fullSave) throws IllegalAcornStateException {
256 if (!safeToMakeSnapshot.get())
258 // Throttle autosaves: a non-full save is skipped when less than 10 s (10 * 1e9 ns) have passed since lastSnapshot
259 if(!fullSave && System.nanoTime() - lastSnapshot < 10*1000000000L) {
260 // LOGGER.info("lastSnapshot too early");
264 // Cluster files are always there
265 // Nothing has been written => no need to do anything
266 long amountOfFiles = countFiles(workingDirectory);
267 if(!fullSave && amountOfFiles == 0) {
268 //LOGGER.info("makeSnapshot: " + amountOfFiles + " files, skipping snapshot");
272 LOGGER.info("makeSnapshot: start with " + amountOfFiles + " files");
274 // Schedule writing of all data to disk
277 // Wait for all files to be written
278 clusterLRU.shutdown();
280 streamLRU.shutdown();
283 // Lets check if it is still safe to make a snapshot
284 if (!safeToMakeSnapshot.get())
287 ClusterSetsSupport cssi = locator.getService(ClusterSetsSupport.class);
// The second directory scan is only paid for when INFO logging is enabled.
292 if (LOGGER.isInfoEnabled()) {
293 amountOfFiles = countFiles(workingDirectory);
294 LOGGER.info(" -finished: amount of files is {}", amountOfFiles);
// From here on new data goes to the directory named after the current mainState.headDir.
297 workingDirectory = dbFolder.resolve(Integer.toString(mainState.headDir));
298 Files.createDirectories(workingDirectory);
300 cssi.updateWriteDirectory(workingDirectory);
302 clusterLRU.setWriteDir(workingDirectory);
303 fileLRU.setWriteDir(workingDirectory);
304 streamLRU.setWriteDir(workingDirectory);
305 csLRU.setWriteDir(workingDirectory);
// Restart the autosave throttle window.
312 lastSnapshot = System.nanoTime();
315 } catch (IllegalAcornStateException e) {
316 notSafeToMakeSnapshot(e);
318 } catch (IOException e) {
319 IllegalAcornStateException e1 = new IllegalAcornStateException(e);
320 notSafeToMakeSnapshot(e1);
// Rebuilds the key lists of the head state: lists are cleared and each LRU then persists
// its entries into the matching list (clusters, files, stream, cs).
325 private void refreshHeadState() throws IOException, IllegalAcornStateException {
326 state.clusters.clear();
328 state.stream.clear();
331 clusterLRU.persist(state.clusters);
332 fileLRU.persist(state.files);
333 streamLRU.persist(state.stream);
334 csLRU.persist(state.cs);
337 private void synchronizeWorkingDirectory() throws IOException {
338 // Sync current working directory
339 Files.walk(workingDirectory, 1).filter(Files::isRegularFile).forEach(FileIO::uncheckedSyncPath);
342 private void persistHeadState() throws IOException {
343 synchronizeWorkingDirectory();
344 state.save(workingDirectory);
349 // public void save() throws IOException {
351 // refreshHeadState();
353 // clusterLRU.shutdown();
354 // fileLRU.shutdown();
355 // streamLRU.shutdown();
358 // persistHeadState();
360 // mainState.save(getBaseDirectory());
363 // ThreadLogVisualizer visualizer = new ThreadLogVisualizer();
364 // visualizer.read(new DataInputStream(new FileInputStream(
365 // ThreadLogger.LOG_FILE)));
366 // visualizer.visualize3(new PrintStream(ThreadLogger.LOG_FILE
368 // } catch (FileNotFoundException e) {
369 // // TODO Auto-generated catch block
370 // e.printStackTrace();
373 // LOGGER.info("-- load statistics --");
374 // for(Pair<ClusterUID, Integer> entry :
375 // CollectionUtils.valueSortedEntries(histogram)) {
376 // LOGGER.info(" " + entry.second + " " + entry.first);
// Acquires the mutexes of all four LRUs in a fixed order: cluster, file, stream, change set.
// releaseAll() releases them in exactly the reverse order; keep the two in step.
381 private void acquireAll() throws IllegalAcornStateException {
382 clusterLRU.acquireMutex();
383 fileLRU.acquireMutex();
384 streamLRU.acquireMutex();
385 csLRU.acquireMutex();
// Releases the mutexes of all four LRUs: change set, stream, file, cluster — the reverse of
// the order in which acquireAll() takes them.
388 private void releaseAll() {
389 csLRU.releaseMutex();
390 streamLRU.releaseMutex();
391 fileLRU.releaseMutex();
392 clusterLRU.releaseMutex();
395 private AtomicBoolean rollback = new AtomicBoolean(false);
397 boolean rolledback() {
398 return rollback.get();
// Loads the persistent state and (re)creates the caches:
//  1. MainState.load() is given a callback that sets the rollback flag;
//  2. the head state is either fresh (initial database) or loaded from the last session
//     directory dbFolder/<headDir - 1>, with a fallback to the legacy
//     org.simantics.db.javacore.HeadState format;
//  3. the working directory dbFolder/<headDir> is created and the four LRUs are instantiated;
//  4. every key stored in the head state is parsed and turned into a cache entry object.
// IllegalAcornStateException / AcornAccessVerificationException are rethrown as IOException.
401 public void load() throws IOException {
404 mainState = MainState.load(dbFolder, () -> rollback.set(true));
406 lastSessionDirectory = dbFolder.resolve(Integer.toString(mainState.headDir - 1));
409 if (mainState.isInitial()) {
410 state = new HeadState();
413 state = HeadState.load(lastSessionDirectory);
414 } catch (InvalidHeadStateException e) {
415 // For backwards compatibility only!
416 Throwable cause = e.getCause();
// NOTE(review): `instanceof Throwable` on a Throwable-typed variable is only a null check;
// it does not select a particular cause type. Confirm which cause the legacy path was meant for.
417 if (cause instanceof Throwable) {
419 org.simantics.db.javacore.HeadState oldState = org.simantics.db.javacore.HeadState.load(lastSessionDirectory);
// Copy the legacy fields one by one into the current HeadState type.
421 HeadState newState = new HeadState();
422 newState.clusters = oldState.clusters;
423 newState.cs = oldState.cs;
424 newState.files = oldState.files;
425 newState.stream = oldState.stream;
426 newState.headChangeSetId = oldState.headChangeSetId;
427 newState.reservedIds = oldState.reservedIds;
428 newState.transactionId = oldState.transactionId;
430 } catch (InvalidHeadStateException e1) {
431 throw new IOException("Could not load HeadState due to corruption", e1);
434 // This should never happen as MainState.load() checks the integrity
435 // of head.state files and rolls back in cases of corruption until a
436 // consistent state is found (could be case 0 - initial db state)
437 // IF this does happen something is completely wrong
438 throw new IOException("Could not load HeadState due to corruption", e);
443 workingDirectory = dbFolder.resolve(Integer.toString(mainState.headDir));
444 Files.createDirectories(workingDirectory);
446 csLRU = new LRU<Long, ChangeSetInfo>(this, "Change Set", workingDirectory);
447 streamLRU = new LRU<String, ClusterStreamChunk>(this, "Cluster Stream", workingDirectory);
448 clusterLRU = new ClusterLRU(this, "Cluster", workingDirectory);
449 fileLRU = new LRU<String, FileInfo>(this, "External Value", workingDirectory);
// Cluster key layout: "<hex first>.<hex second>#<readDir>#<offset>#<length>".
454 for (String clusterKey : state.clusters) {
455 String[] parts1 = clusterKey.split("#");
456 String[] parts = parts1[0].split("\\.");
457 long first = new BigInteger(parts[0], 16).longValue();
458 long second = new BigInteger(parts[1], 16).longValue();
459 ClusterUID uuid = ClusterUID.make(first, second);
460 Path readDir = dbFolder.resolve(parts1[1]);
461 int offset = Integer.parseInt(parts1[2]);
462 int length = Integer.parseInt(parts1[3]);
463 clusterLRU.map(new ClusterInfo(this, clusterLRU, readDir, uuid, offset, length));
// File, stream and change set keys share the layout "<id>#<readDir>#<offset>#<length>".
// NOTE(review): the objects built in the three loops below are only assigned to a local —
// presumably the constructors register the entry with the LRU they receive; confirm.
466 for (String fileKey : state.files) {
467 // LOGGER.info("loadFile: " + fileKey);
468 String[] parts = fileKey.split("#");
469 Path readDir = dbFolder.resolve(parts[1]);
470 int offset = Integer.parseInt(parts[2]);
471 int length = Integer.parseInt(parts[3]);
472 FileInfo info = new FileInfo(fileLRU, fileCache, readDir, parts[0], offset, length);
476 for (String fileKey : state.stream) {
477 // LOGGER.info("loadStream: " + fileKey);
478 String[] parts = fileKey.split("#");
479 Path readDir = dbFolder.resolve(parts[1]);
480 int offset = Integer.parseInt(parts[2]);
481 int length = Integer.parseInt(parts[3]);
482 ClusterStreamChunk info = new ClusterStreamChunk(this,
483 streamLRU, readDir, parts[0], offset, length);
487 for (String fileKey : state.cs) {
488 String[] parts = fileKey.split("#");
489 Path readDir = dbFolder.resolve(parts[1]);
490 Long revisionId = Long.parseLong(parts[0]);
491 int offset = Integer.parseInt(parts[2]);
492 int length = Integer.parseInt(parts[3]);
493 ChangeSetInfo info = new ChangeSetInfo(csLRU, fileCache, readDir, revisionId, offset, length);
498 } catch (IllegalAcornStateException | AcornAccessVerificationException e) {
499 // ROLLBACK ONE DIR UNTIL WE ARE FINE!
500 throw new IOException(e);
504 public <T> T clone(ClusterUID uid, ClusterCreator creator) throws DatabaseException, AcornAccessVerificationException, IllegalAcornStateException, IOException {
506 clusterLRU.ensureUpdates(uid);
508 ClusterInfo info = clusterLRU.getWithoutMutex(uid);
509 return info.clone(uid, creator);
512 //private int loadCounter = 0;
514 public static void startLog(String msg) {
515 tasks.put(msg, ThreadLogger.getInstance().begin(msg));
// Looks up the task that startLog() stored under the same message.
518 public static void endLog(String msg) {
519 ITask task = tasks.get(msg);
// Message -> running task. Plain HashMap shared by the static startLog()/endLog().
// NOTE(review): no synchronization is visible around this map — confirm callers are single-threaded.
524 static Map<String, ITask> tasks = new HashMap<String, ITask>();
// Starts by fetching the cached ClusterInfo for uid from clusterLRU (getWithoutMutex).
526 public void update(ClusterUID uid, ClusterImpl clu) throws AcornAccessVerificationException, IllegalAcornStateException {
527 ClusterInfo info = clusterLRU.getWithoutMutex(uid);
// Takes a ClusterUID and yields a long id.
// NOTE(review): presumably maps the UID to a numeric cluster id — confirm against the body.
536 public long getClusterIdOrCreate(ClusterUID clusterUID) {
540 public int getResourceKey(ClusterUID uid, int index) throws AcornAccessVerificationException {
541 return clusterLRU.getResourceKey(uid, index);
544 public int getResourceKeyWitoutMutex(ClusterUID uid, int index) throws IllegalAcornStateException {
545 return clusterLRU.getResourceKeyWithoutMutex(uid, index);
// Builds a ClusterIds view over all clusters currently known to clusterLRU. The cluster LRU
// mutex is acquired before the values are read and released again at the end.
// Any Throwable raised while building the result is wrapped in IllegalAcornStateException.
548 public ClusterIds getClusterIds() throws IllegalAcornStateException {
549 clusterLRU.acquireMutex();
552 Collection<ClusterInfo> infos = clusterLRU.values();
// `status` is the number of clusters; it sizes both UID-half arrays.
553 final int status = infos.size();
554 final long[] firsts = new long[status];
555 final long[] seconds = new long[status];
558 for (ClusterInfo info : infos) {
560 seconds[index] = info.getKey().second;
564 return new ClusterIds() {
567 public int getStatus() {
572 public long[] getFirst() {
577 public long[] getSecond() {
583 } catch (Throwable t) {
584 throw new IllegalAcornStateException(t);
586 clusterLRU.releaseMutex();
// Appends ccs to currentChanges, bracketed by acquiring and releasing the csLRU mutex.
// Any Throwable is wrapped in IllegalAcornStateException.
590 public void addIntoCurrentChangeSet(String ccs) throws IllegalAcornStateException {
591 csLRU.acquireMutex();
594 currentChanges.add(ccs);
595 } catch (Throwable t) {
596 throw new IllegalAcornStateException(t);
598 csLRU.releaseMutex();
// Closes the current change set: the ids collected so far are copied, currentChanges is
// replaced by a fresh list and a ChangeSetInfo is constructed from id, data and the copy.
// Runs between csLRU.acquireMutex() and csLRU.releaseMutex(); any Throwable is wrapped in
// IllegalAcornStateException.
602 public void commitChangeSet(long changeSetId, byte[] data) throws IllegalAcornStateException {
603 csLRU.acquireMutex();
605 ArrayList<String> csids = new ArrayList<String>(currentChanges);
606 currentChanges = new ArrayList<String>();
// NOTE(review): the new object is not stored here — presumably the constructor registers it
// with csLRU; confirm.
607 new ChangeSetInfo(csLRU, fileCache, changeSetId, data, csids);
608 } catch (Throwable t) {
609 throw new IllegalAcornStateException(t);
611 csLRU.releaseMutex();
// Returns the metadata bytes of the given change set, or null when csLRU has no entry for it.
// The two Acorn exception types have their own catch clause; every other Throwable is wrapped
// in IllegalAcornStateException.
615 public byte[] getMetadata(long changeSetId) throws AcornAccessVerificationException, IllegalAcornStateException {
617 ChangeSetInfo info = csLRU.getWithoutMutex(changeSetId);
618 if (info == null) return null;
621 return info.getMetadataBytes();
622 } catch (IllegalAcornStateException | AcornAccessVerificationException e) {
624 } catch (Throwable t) {
625 throw new IllegalAcornStateException(t);
// Returns the external file content stored for a resource, or null when fileLRU has no entry.
// The lookup key is "<cluster UID string>_<resourceIndex>"; the UID is built from the given
// bytes with ClusterUID.make(clusterUID, 0).
// Throwables other than the two Acorn exception types are wrapped in IllegalAcornStateException.
631 public byte[] getResourceFile(final byte[] clusterUID, final int resourceIndex) throws AcornAccessVerificationException, IllegalAcornStateException {
633 ClusterUID uid = ClusterUID.make(clusterUID, 0);
634 String key = uid.toString() + "_" + resourceIndex;
635 FileInfo info = fileLRU.getWithoutMutex(key);
636 if(info == null) return null;
639 return info.getResourceFile();
640 } catch (IllegalAcornStateException | AcornAccessVerificationException e) {
642 } catch (Throwable t) {
643 throw new IllegalAcornStateException(t);
// Returns a segment of the external file stored for a resource, or null when fileLRU has no
// entry under the key "<cluster UID string>_<resourceIndex>". Offset and size are passed
// through unchanged to FileInfo.getResourceSegment.
// Any Throwable caught here is wrapped in IllegalAcornStateException.
649 public ResourceSegment getResourceSegment(final byte[] clusterUID, final int resourceIndex, final long segmentOffset, short segmentSize) throws AcornAccessVerificationException, IllegalAcornStateException {
650 ClusterUID uid = ClusterUID.make(clusterUID, 0);
652 String key = uid.toString() + "_" + resourceIndex;
653 FileInfo info = fileLRU.getWithoutMutex(key);
654 if(info == null) return null;
657 return info.getResourceSegment(clusterUID, resourceIndex, segmentOffset, segmentSize);
658 } catch (Throwable t) {
659 throw new IllegalAcornStateException(t);
// Writes a chunk of external file data for a resource. The FileInfo is looked up under the
// key "<uid>_<resource index derived from resourceKey>" between fileLRU.acquireMutex() and
// fileLRU.releaseMutex(); the data update itself follows the release call.
// Throwables from either phase are wrapped in IllegalAcornStateException.
665 public void modiFileEx(ClusterUID uid, int resourceKey, long offset, long size, byte[] bytes, long pos, ClusterSupport support) throws IllegalAcornStateException {
667 String key = uid.toString() + "_" + ClusterTraits.getResourceIndexFromResourceKey(resourceKey);
669 FileInfo info = null;
670 fileLRU.acquireMutex();
672 info = fileLRU.get(key);
// NOTE(review): (offset + size) is narrowed from long to int here — confirm that values
// above Integer.MAX_VALUE cannot occur.
674 info = new FileInfo(fileLRU, fileCache, key, (int) (offset + size));
676 } catch (Throwable t) {
677 throw new IllegalAcornStateException(t);
679 fileLRU.releaseMutex();
684 info.updateData(bytes, offset, pos, size);
685 } catch (Throwable t) {
686 throw new IllegalAcornStateException(t);
690 } catch (DatabaseException e) {
// Calls shutdown() on the cluster LRU and the stream LRU.
695 public void shutdown() {
696 clusterLRU.shutdown();
698 streamLRU.shutdown();
// Marks snapshots as unsafe: safeToMakeSnapshot is switched from true to false (a no-op when
// it is already false). makeSnapshot() consults that flag before and during a snapshot.
702 public void notSafeToMakeSnapshot(IllegalAcornStateException t) {
703 this.safeToMakeSnapshot.compareAndSet(true, false);
707 public long getTailChangeSetId() {
708 return state.tailChangeSetId;
711 public FileCache getFileCache() {