1 package org.simantics.acorn;
3 import java.io.IOException;
4 import java.math.BigInteger;
5 import java.nio.file.DirectoryStream;
6 import java.nio.file.Files;
7 import java.nio.file.Path;
8 import java.nio.file.StandardCopyOption;
9 import java.util.ArrayList;
10 import java.util.Collection;
11 import java.util.HashMap;
13 import java.util.concurrent.atomic.AtomicBoolean;
15 import org.simantics.acorn.cluster.ClusterImpl;
16 import org.simantics.acorn.exception.AcornAccessVerificationException;
17 import org.simantics.acorn.exception.IllegalAcornStateException;
18 import org.simantics.acorn.exception.InvalidHeadStateException;
19 import org.simantics.acorn.internal.ClusterSupport2;
20 import org.simantics.acorn.lru.ChangeSetInfo;
21 import org.simantics.acorn.lru.ClusterInfo;
22 import org.simantics.acorn.lru.ClusterLRU;
23 import org.simantics.acorn.lru.ClusterStreamChunk;
24 import org.simantics.acorn.lru.FileInfo;
25 import org.simantics.acorn.lru.LRU;
26 import org.simantics.db.ClusterCreator;
27 import org.simantics.db.Database.Session.ClusterIds;
28 import org.simantics.db.Database.Session.ResourceSegment;
29 import org.simantics.db.ServiceLocator;
30 import org.simantics.db.exception.DatabaseException;
31 import org.simantics.db.impl.ClusterBase;
32 import org.simantics.db.impl.ClusterI;
33 import org.simantics.db.impl.ClusterSupport;
34 import org.simantics.db.procore.cluster.ClusterTraits;
35 import org.simantics.db.service.ClusterSetsSupport;
36 import org.simantics.db.service.ClusterUID;
37 import org.simantics.utils.FileUtils;
38 import org.simantics.utils.threads.logger.ITask;
39 import org.simantics.utils.threads.logger.ThreadLogger;
40 import org.slf4j.Logger;
41 import org.slf4j.LoggerFactory;
43 public class ClusterManager {
45 final static Logger LOGGER = LoggerFactory.getLogger(ClusterManager.class);
47 private ArrayList<String> currentChanges = new ArrayList<String>();
49 public final Path dbFolder;
50 public Path lastSessionDirectory;
51 public Path workingDirectory;
53 public LRU<String, ClusterStreamChunk> streamLRU;
54 public LRU<Long, ChangeSetInfo> csLRU;
55 public ClusterLRU clusterLRU;
56 public LRU<String, FileInfo> fileLRU;
58 public MainState mainState;
59 public HeadState state;
61 private long lastSnapshot = System.nanoTime();
63 final public ClusterSupport2 support = new ClusterSupport2(this);
70 public ClusterManager(Path dbFolder) {
71 this.dbFolder = dbFolder;
74 public ArrayList<String> getChanges(long changeSetId) throws AcornAccessVerificationException, IllegalAcornStateException {
75 ChangeSetInfo info = csLRU.getWithoutMutex(changeSetId);
79 return info.getCCSIds();
85 public ClusterBase getClusterByClusterKey(int clusterKey) throws DatabaseException, AcornAccessVerificationException, IllegalAcornStateException {
86 return clusterLRU.getClusterByClusterKey(clusterKey);
89 public ClusterBase getClusterByClusterUIDOrMake(ClusterUID clusterUID) throws DatabaseException, AcornAccessVerificationException, IllegalAcornStateException {
90 return clusterLRU.getClusterByClusterUIDOrMake(clusterUID);
93 public ClusterImpl getClusterByClusterUIDOrMakeProxy(ClusterUID clusterUID) throws DatabaseException, AcornAccessVerificationException, IllegalAcornStateException {
94 return clusterLRU.getClusterByClusterUIDOrMakeProxy(clusterUID);
97 public int getClusterKeyByClusterUIDOrMake(ClusterUID clusterUID) throws AcornAccessVerificationException {
98 return clusterLRU.getClusterKeyByClusterUIDOrMake(clusterUID);
101 public int getClusterKeyByClusterUIDOrMakeWithoutMutex(ClusterUID clusterUID) throws IllegalAcornStateException, AcornAccessVerificationException {
102 return clusterLRU.getClusterKeyByClusterUIDOrMakeWithoutMutex(clusterUID);
105 public int getClusterKeyByUID(long id1, long id2) throws DatabaseException, IllegalAcornStateException {
106 return clusterLRU.getClusterKeyByUIDWithoutMutex(id1, id2);
109 public <T extends ClusterI> T getClusterProxyByResourceKey(int resourceKey) throws DatabaseException, AcornAccessVerificationException, IllegalAcornStateException {
110 return clusterLRU.getClusterProxyByResourceKey(resourceKey);
113 public ClusterUID getClusterUIDByResourceKey(int resourceKey) throws DatabaseException, AcornAccessVerificationException {
114 return clusterLRU.getClusterUIDByResourceKey(resourceKey);
117 public ClusterUID getClusterUIDByResourceKeyWithoutMutex(int resourceKey) throws DatabaseException, IllegalAcornStateException, AcornAccessVerificationException {
118 return clusterLRU.getClusterUIDByResourceKeyWithoutMutex(resourceKey);
122 * Private implementation
126 private static long countFiles(Path directory) throws IOException {
127 try (DirectoryStream<Path> ds = Files.newDirectoryStream(directory)) {
129 for (@SuppressWarnings("unused") Path p : ds)
135 // Add check to make sure if it safe to make snapshot (used with cancel which is not yet supported and may cause corrupted head.state writing)
136 private AtomicBoolean safeToMakeSnapshot = new AtomicBoolean(true);
137 private IllegalAcornStateException cause;
139 public synchronized void purge(ServiceLocator locator) throws IllegalAcornStateException {
143 // Schedule writing of all data to disk
145 // Wait for files to be written
146 synchronizeWorkingDirectory();
148 String currentDir = workingDirectory.getFileName().toString();
149 Path baseline = workingDirectory.resolveSibling(currentDir + "_baseline");
151 Files.createDirectories(baseline);
153 for(String clusterKey : state.clusters) {
154 String[] parts1 = clusterKey.split("#");
155 String[] parts = parts1[0].split("\\.");
156 String readDirName = parts1[1];
157 if(!readDirName.equals(currentDir)) {
158 String fileName = parts[0] + "." + parts[1] + ".cluster";
159 Path from = dbFolder.resolve(readDirName).resolve(fileName);
160 Path to = baseline.resolve(fileName);
161 System.err.println("purge copies " + from + " => " + to);
162 Files.copy(from, to, StandardCopyOption.COPY_ATTRIBUTES);
163 long first = new BigInteger(parts[0], 16).longValue();
164 long second = new BigInteger(parts[1], 16).longValue();
165 ClusterUID uuid = ClusterUID.make(first, second);
166 ClusterInfo info = clusterLRU.getWithoutMutex(uuid);
167 info.moveTo(baseline);
171 for (String fileKey : state.files) {
172 String[] parts = fileKey.split("#");
173 String readDirName = parts[1];
174 if(!readDirName.equals(currentDir)) {
175 String fileName = parts[0] + ".extFile";
176 Path from = dbFolder.resolve(readDirName).resolve(fileName);
177 Path to = baseline.resolve(fileName);
178 System.err.println("purge copies " + from + " => " + to);
179 Files.copy(from, to, StandardCopyOption.COPY_ATTRIBUTES);
180 FileInfo info = fileLRU.getWithoutMutex(parts[0]);
181 info.moveTo(baseline);
185 for (String fileKey : state.stream) {
186 String[] parts = fileKey.split("#");
187 String readDirName = parts[1];
188 if(!readDirName.equals(currentDir)) {
189 ClusterStreamChunk chunk = streamLRU.purge(parts[0]);
190 System.err.println("purge removes " + chunk);
195 for (String fileKey : state.cs) {
196 String[] parts = fileKey.split("#");
197 String readDirName = parts[1];
198 if(!readDirName.equals(currentDir)) {
199 Long revisionId = Long.parseLong(parts[0]);
200 ChangeSetInfo info = csLRU.purge(revisionId);
201 System.err.println("purge removes " + info);
203 // Path readDir = dbFolder.resolve(parts[1]);
204 // Long revisionId = Long.parseLong(parts[0]);
205 // int offset = Integer.parseInt(parts[2]);
206 // int length = Integer.parseInt(parts[3]);
207 // ChangeSetInfo info = new ChangeSetInfo(csLRU, readDir, revisionId, offset, length);
211 state.tailChangeSetId = state.headChangeSetId;
213 makeSnapshot(locator, true);
215 Files.walk(dbFolder, 1).filter(Files::isDirectory).forEach(f -> tryPurgeDirectory(f));
217 } catch (IllegalAcornStateException e) {
218 notSafeToMakeSnapshot(e);
220 } catch (IOException e) {
221 IllegalAcornStateException e1 = new IllegalAcornStateException(e);
222 notSafeToMakeSnapshot(e1);
224 } catch (AcornAccessVerificationException e) {
225 IllegalAcornStateException e1 = new IllegalAcornStateException(e);
226 notSafeToMakeSnapshot(e1);
232 void tryPurgeDirectory(Path f) {
235 System.err.println("purge deletes " + f);
237 String currentDir = f.getFileName().toString();
238 if(currentDir.endsWith("db"))
241 if(currentDir.endsWith("_baseline"))
242 currentDir = currentDir.replace("_baseline", "");
244 int ordinal = Integer.parseInt(currentDir);
245 if(ordinal < mainState.headDir - 1) {
246 System.err.println("purge deletes " + f);
247 FileUtils.deleteDir(f.toFile());
252 public synchronized boolean makeSnapshot(ServiceLocator locator, boolean fullSave) throws IllegalAcornStateException {
254 if (!safeToMakeSnapshot.get())
256 // Maximum autosave frequency is per 60s
257 if(!fullSave && System.nanoTime() - lastSnapshot < 10*1000000000L) {
258 // System.err.println("lastSnapshot too early");
262 // Cluster files are always there
263 // Nothing has been written => no need to do anything
264 long amountOfFiles = countFiles(workingDirectory);
265 if(!fullSave && amountOfFiles == 0) {
266 // System.err.println("amountOfFiles < 3");
270 LOGGER.info("makeSnapshot");
272 // Schedule writing of all data to disk
275 // Wait for all files to be written
276 clusterLRU.shutdown();
278 streamLRU.shutdown();
281 // Lets check if it is still safe to make a snapshot
282 if (!safeToMakeSnapshot.get())
285 ClusterSetsSupport cssi = locator.getService(ClusterSetsSupport.class);
291 mainState.save(dbFolder);
293 amountOfFiles = countFiles(workingDirectory);
295 LOGGER.info(" -finished: amount of files is {}", amountOfFiles);
297 workingDirectory = dbFolder.resolve(Integer.toString(mainState.headDir));
298 if (!Files.exists(workingDirectory)) {
299 Files.createDirectories(workingDirectory);
302 cssi.updateWriteDirectory(workingDirectory);
304 clusterLRU.setWriteDir(workingDirectory);
305 fileLRU.setWriteDir(workingDirectory);
306 streamLRU.setWriteDir(workingDirectory);
307 csLRU.setWriteDir(workingDirectory);
314 lastSnapshot = System.nanoTime();
317 } catch (IllegalAcornStateException e) {
318 notSafeToMakeSnapshot(e);
320 } catch (IOException e) {
321 IllegalAcornStateException e1 = new IllegalAcornStateException(e);
322 notSafeToMakeSnapshot(e1);
327 private void refreshHeadState() throws IOException, IllegalAcornStateException {
328 state.clusters.clear();
330 state.stream.clear();
333 clusterLRU.persist(state.clusters);
334 fileLRU.persist(state.files);
335 streamLRU.persist(state.stream);
336 csLRU.persist(state.cs);
339 private void synchronizeWorkingDirectory() throws IOException {
340 // Sync current working directory
341 Files.walk(workingDirectory, 1).filter(Files::isRegularFile).forEach(FileIO::uncheckedSyncPath);
344 private void persistHeadState() throws IOException {
345 synchronizeWorkingDirectory();
346 state.save(workingDirectory);
351 // public void save() throws IOException {
353 // refreshHeadState();
355 // clusterLRU.shutdown();
356 // fileLRU.shutdown();
357 // streamLRU.shutdown();
360 // persistHeadState();
362 // mainState.save(getBaseDirectory());
365 // ThreadLogVisualizer visualizer = new ThreadLogVisualizer();
366 // visualizer.read(new DataInputStream(new FileInputStream(
367 // ThreadLogger.LOG_FILE)));
368 // visualizer.visualize3(new PrintStream(ThreadLogger.LOG_FILE
370 // } catch (FileNotFoundException e) {
371 // // TODO Auto-generated catch block
372 // e.printStackTrace();
375 // System.err.println("-- load statistics --");
376 // for(Pair<ClusterUID, Integer> entry :
377 // CollectionUtils.valueSortedEntries(histogram)) {
378 // System.err.println(" " + entry.second + " " + entry.first);
383 private void acquireAll() throws IllegalAcornStateException {
384 clusterLRU.acquireMutex();
385 fileLRU.acquireMutex();
386 streamLRU.acquireMutex();
387 csLRU.acquireMutex();
390 private void releaseAll() {
391 csLRU.releaseMutex();
392 streamLRU.releaseMutex();
393 fileLRU.releaseMutex();
394 clusterLRU.releaseMutex();
397 private AtomicBoolean rollback = new AtomicBoolean(false);
399 boolean rolledback() {
400 return rollback.get();
403 public void load() throws IOException {
406 mainState = MainState.load(dbFolder, () -> rollback.set(true));
408 lastSessionDirectory = dbFolder.resolve(Integer.toString(mainState.headDir - 1));
412 state = HeadState.load(lastSessionDirectory);
413 } catch (InvalidHeadStateException e) {
414 // For backwards compatibility only!
415 Throwable cause = e.getCause();
416 if (cause instanceof Throwable) {
418 org.simantics.db.javacore.HeadState oldState = org.simantics.db.javacore.HeadState.load(lastSessionDirectory);
420 HeadState newState = new HeadState();
421 newState.clusters = oldState.clusters;
422 newState.cs = oldState.cs;
423 newState.files = oldState.files;
424 newState.stream = oldState.stream;
425 newState.headChangeSetId = oldState.headChangeSetId;
426 newState.reservedIds = oldState.reservedIds;
427 newState.transactionId = oldState.transactionId;
429 } catch (InvalidHeadStateException e1) {
430 throw new IOException("Could not load HeadState due to corruption", e1);
433 // This should never happen as MainState.load() checks the integrity
434 // of head.state files and rolls back in cases of corruption until a
435 // consistent state is found (could be case 0 - initial db state)
436 // IF this does happen something is completely wrong
437 throw new IOException("Could not load HeadState due to corruption", e);
441 workingDirectory = dbFolder.resolve(Integer.toString(mainState.headDir));
442 Files.createDirectories(workingDirectory);
444 csLRU = new LRU<Long, ChangeSetInfo>(this, "Change Set", workingDirectory);
445 streamLRU = new LRU<String, ClusterStreamChunk>(this, "Cluster Stream", workingDirectory);
446 clusterLRU = new ClusterLRU(this, "Cluster", workingDirectory);
447 fileLRU = new LRU<String, FileInfo>(this, "External Value", workingDirectory);
452 for (String clusterKey : state.clusters) {
453 String[] parts1 = clusterKey.split("#");
454 String[] parts = parts1[0].split("\\.");
455 long first = new BigInteger(parts[0], 16).longValue();
456 long second = new BigInteger(parts[1], 16).longValue();
457 ClusterUID uuid = ClusterUID.make(first, second);
458 Path readDir = dbFolder.resolve(parts1[1]);
459 int offset = Integer.parseInt(parts1[2]);
460 int length = Integer.parseInt(parts1[3]);
461 clusterLRU.map(new ClusterInfo(this, clusterLRU, readDir, uuid, offset, length));
464 for (String fileKey : state.files) {
465 // System.err.println("loadFile: " + fileKey);
466 String[] parts = fileKey.split("#");
467 Path readDir = dbFolder.resolve(parts[1]);
468 int offset = Integer.parseInt(parts[2]);
469 int length = Integer.parseInt(parts[3]);
470 FileInfo info = new FileInfo(fileLRU, readDir, parts[0], offset, length);
474 for (String fileKey : state.stream) {
475 // System.err.println("loadStream: " + fileKey);
476 String[] parts = fileKey.split("#");
477 Path readDir = dbFolder.resolve(parts[1]);
478 int offset = Integer.parseInt(parts[2]);
479 int length = Integer.parseInt(parts[3]);
480 ClusterStreamChunk info = new ClusterStreamChunk(this,
481 streamLRU, readDir, parts[0], offset, length);
485 for (String fileKey : state.cs) {
486 String[] parts = fileKey.split("#");
487 Path readDir = dbFolder.resolve(parts[1]);
488 Long revisionId = Long.parseLong(parts[0]);
489 int offset = Integer.parseInt(parts[2]);
490 int length = Integer.parseInt(parts[3]);
491 ChangeSetInfo info = new ChangeSetInfo(csLRU, readDir, revisionId, offset, length);
496 } catch (IllegalAcornStateException | AcornAccessVerificationException e) {
497 // ROLLBACK ONE DIR UNTIL WE ARE FINE!
498 throw new IOException(e);
502 public <T> T clone(ClusterUID uid, ClusterCreator creator) throws DatabaseException, AcornAccessVerificationException, IllegalAcornStateException, IOException {
504 clusterLRU.ensureUpdates(uid);
506 ClusterInfo info = clusterLRU.getWithoutMutex(uid);
507 return info.clone(uid, creator);
510 //private int loadCounter = 0;
512 public static void startLog(String msg) {
513 tasks.put(msg, ThreadLogger.getInstance().begin(msg));
516 public static void endLog(String msg) {
517 ITask task = tasks.get(msg);
522 static Map<String, ITask> tasks = new HashMap<String, ITask>();
524 public void update(ClusterUID uid, ClusterImpl clu) throws AcornAccessVerificationException, IllegalAcornStateException {
525 ClusterInfo info = clusterLRU.getWithoutMutex(uid);
534 public long getClusterIdOrCreate(ClusterUID clusterUID) {
538 public int getResourceKey(ClusterUID uid, int index) throws AcornAccessVerificationException {
539 return clusterLRU.getResourceKey(uid, index);
542 public int getResourceKeyWitoutMutex(ClusterUID uid, int index) throws IllegalAcornStateException {
543 return clusterLRU.getResourceKeyWithoutMutex(uid, index);
546 public ClusterIds getClusterIds() throws IllegalAcornStateException {
547 clusterLRU.acquireMutex();
550 Collection<ClusterInfo> infos = clusterLRU.values();
551 final int status = infos.size();
552 final long[] firsts = new long[status];
553 final long[] seconds = new long[status];
556 for (ClusterInfo info : infos) {
558 seconds[index] = info.getKey().second;
562 return new ClusterIds() {
565 public int getStatus() {
570 public long[] getFirst() {
575 public long[] getSecond() {
581 } catch (Throwable t) {
582 throw new IllegalAcornStateException(t);
584 clusterLRU.releaseMutex();
588 public void addIntoCurrentChangeSet(String ccs) throws IllegalAcornStateException {
589 csLRU.acquireMutex();
592 currentChanges.add(ccs);
593 } catch (Throwable t) {
594 throw new IllegalAcornStateException(t);
596 csLRU.releaseMutex();
600 public void commitChangeSet(long changeSetId, byte[] data) throws IllegalAcornStateException {
601 csLRU.acquireMutex();
603 ArrayList<String> csids = new ArrayList<String>(currentChanges);
604 currentChanges = new ArrayList<String>();
605 new ChangeSetInfo(csLRU, changeSetId, data, csids);
606 } catch (Throwable t) {
607 throw new IllegalAcornStateException(t);
609 csLRU.releaseMutex();
613 public byte[] getMetadata(long changeSetId) throws AcornAccessVerificationException, IllegalAcornStateException {
615 ChangeSetInfo info = csLRU.getWithoutMutex(changeSetId);
616 if (info == null) return null;
619 return info.getMetadataBytes();
620 } catch (IllegalAcornStateException | AcornAccessVerificationException e) {
622 } catch (Throwable t) {
623 throw new IllegalAcornStateException(t);
629 public byte[] getResourceFile(final byte[] clusterUID, final int resourceIndex) throws AcornAccessVerificationException, IllegalAcornStateException {
631 ClusterUID uid = ClusterUID.make(clusterUID, 0);
632 String key = uid.toString() + "_" + resourceIndex;
633 FileInfo info = fileLRU.getWithoutMutex(key);
634 if(info == null) return null;
637 return info.getResourceFile();
638 } catch (IllegalAcornStateException | AcornAccessVerificationException e) {
640 } catch (Throwable t) {
641 throw new IllegalAcornStateException(t);
647 public ResourceSegment getResourceSegment(final byte[] clusterUID, final int resourceIndex, final long segmentOffset, short segmentSize) throws AcornAccessVerificationException, IllegalAcornStateException {
648 ClusterUID uid = ClusterUID.make(clusterUID, 0);
650 String key = uid.toString() + "_" + resourceIndex;
651 FileInfo info = fileLRU.getWithoutMutex(key);
652 if(info == null) return null;
655 return info.getResourceSegment(clusterUID, resourceIndex, segmentOffset, segmentSize);
656 } catch (Throwable t) {
657 throw new IllegalAcornStateException(t);
663 public void modiFileEx(ClusterUID uid, int resourceKey, long offset, long size, byte[] bytes, long pos, ClusterSupport support) throws IllegalAcornStateException {
665 String key = uid.toString() + "_" + ClusterTraits.getResourceIndexFromResourceKey(resourceKey);
667 FileInfo info = null;
668 fileLRU.acquireMutex();
670 info = fileLRU.get(key);
672 info = new FileInfo(fileLRU, key, (int) (offset + size));
674 } catch (Throwable t) {
675 throw new IllegalAcornStateException(t);
677 fileLRU.releaseMutex();
682 info.updateData(bytes, offset, pos, size);
683 } catch (Throwable t) {
684 throw new IllegalAcornStateException(t);
688 } catch (DatabaseException e) {
693 public void shutdown() {
694 clusterLRU.shutdown();
696 streamLRU.shutdown();
700 public void notSafeToMakeSnapshot(IllegalAcornStateException t) {
701 this.safeToMakeSnapshot.compareAndSet(true, false);
705 public long getTailChangeSetId() {
706 return state.tailChangeSetId;