1 package org.simantics.acorn;
3 import java.io.IOException;
4 import java.math.BigInteger;
5 import java.nio.file.DirectoryStream;
6 import java.nio.file.Files;
7 import java.nio.file.Path;
8 import java.util.ArrayList;
9 import java.util.Collection;
10 import java.util.HashMap;
12 import java.util.concurrent.atomic.AtomicBoolean;
14 import org.simantics.acorn.cluster.ClusterImpl;
15 import org.simantics.acorn.exception.AcornAccessVerificationException;
16 import org.simantics.acorn.exception.IllegalAcornStateException;
17 import org.simantics.acorn.exception.InvalidHeadStateException;
18 import org.simantics.acorn.internal.ClusterSupport2;
19 import org.simantics.acorn.lru.ChangeSetInfo;
20 import org.simantics.acorn.lru.ClusterInfo;
21 import org.simantics.acorn.lru.ClusterLRU;
22 import org.simantics.acorn.lru.ClusterStreamChunk;
23 import org.simantics.acorn.lru.FileInfo;
24 import org.simantics.acorn.lru.LRU;
25 import org.simantics.db.ClusterCreator;
26 import org.simantics.db.Database.Session.ClusterIds;
27 import org.simantics.db.Database.Session.ResourceSegment;
28 import org.simantics.db.ServiceLocator;
29 import org.simantics.db.exception.DatabaseException;
30 import org.simantics.db.impl.ClusterBase;
31 import org.simantics.db.impl.ClusterI;
32 import org.simantics.db.impl.ClusterSupport;
33 import org.simantics.db.procore.cluster.ClusterTraits;
34 import org.simantics.db.service.ClusterSetsSupport;
35 import org.simantics.db.service.ClusterUID;
36 import org.simantics.utils.threads.logger.ITask;
37 import org.simantics.utils.threads.logger.ThreadLogger;
38 import org.slf4j.Logger;
39 import org.slf4j.LoggerFactory;
41 public class ClusterManager {
// Logger for this class.
43 final static Logger LOGGER = LoggerFactory.getLogger(ClusterManager.class);
// Ids added by addIntoCurrentChangeSet(); commitChangeSet() copies them into a new
// ChangeSetInfo and replaces this list with a fresh one.
45 private ArrayList<String> currentChanges = new ArrayList<String>();
// Database root folder; the numbered head directories are resolved against it.
47 public final Path dbFolder;
// Set in load() to dbFolder/<mainState.headDir - 1>; HeadState is loaded from it.
48 public Path lastSessionDirectory;
// Set to dbFolder/<mainState.headDir>; used as the write directory of the LRUs.
49 public Path workingDirectory;
// Created in load() under the name "Cluster Stream".
51 public LRU<String, ClusterStreamChunk> streamLRU;
// Created in load() under the name "Change Set"; keyed by change set id.
52 public LRU<Long, ChangeSetInfo> csLRU;
// Created in load() under the name "Cluster".
53 public ClusterLRU clusterLRU;
// Created in load() under the name "External Value"; keys have the form "<clusterUID>_<resourceIndex>".
54 public LRU<String, FileInfo> fileLRU;
// Loaded by MainState.load(dbFolder, ...) in load(); saved in makeSnapshot().
56 public MainState mainState;
// Loaded by HeadState.load(lastSessionDirectory) in load().
57 public HeadState state;
// System.nanoTime() value of the last snapshot; makeSnapshot() compares against it for non-full saves.
59 private long lastSnapshot = System.nanoTime();
// ClusterSupport2 instance constructed with this manager.
61 final public ClusterSupport2 support = new ClusterSupport2(this);
/**
 * Creates a manager for the database located in the given folder.
 * Only the folder is recorded here; the LRUs and states are set up by {@code load()}.
 *
 * @param dbFolder value stored in the {@code dbFolder} field
 */
public ClusterManager(Path dbFolder) {
    this.dbFolder = dbFolder;
}
/**
 * Fetches the ChangeSetInfo for {@code changeSetId} through csLRU.getWithoutMutex
 * and returns the result of its getCCSIds().
 * NOTE(review): the statements between the lookup and the return are not visible
 * in this listing — confirm locking / residency handling against the full file.
 */
72 public ArrayList<String> getChanges(long changeSetId) throws AcornAccessVerificationException, IllegalAcornStateException {
73 ChangeSetInfo info = csLRU.getWithoutMutex(changeSetId);
77 return info.getCCSIds();
83 public ClusterBase getClusterByClusterKey(int clusterKey) throws DatabaseException, AcornAccessVerificationException, IllegalAcornStateException {
84 return clusterLRU.getClusterByClusterKey(clusterKey);
87 public ClusterBase getClusterByClusterUIDOrMake(ClusterUID clusterUID) throws DatabaseException, AcornAccessVerificationException, IllegalAcornStateException {
88 return clusterLRU.getClusterByClusterUIDOrMake(clusterUID);
91 public ClusterImpl getClusterByClusterUIDOrMakeProxy(ClusterUID clusterUID) throws DatabaseException, AcornAccessVerificationException, IllegalAcornStateException {
92 return clusterLRU.getClusterByClusterUIDOrMakeProxy(clusterUID);
95 public int getClusterKeyByClusterUIDOrMake(ClusterUID clusterUID) throws AcornAccessVerificationException {
96 return clusterLRU.getClusterKeyByClusterUIDOrMake(clusterUID);
99 public int getClusterKeyByClusterUIDOrMakeWithoutMutex(ClusterUID clusterUID) throws IllegalAcornStateException, AcornAccessVerificationException {
100 return clusterLRU.getClusterKeyByClusterUIDOrMakeWithoutMutex(clusterUID);
103 public int getClusterKeyByUID(long id1, long id2) throws DatabaseException, IllegalAcornStateException {
104 return clusterLRU.getClusterKeyByUIDWithoutMutex(id1, id2);
107 public <T extends ClusterI> T getClusterProxyByResourceKey(int resourceKey) throws DatabaseException, AcornAccessVerificationException, IllegalAcornStateException {
108 return clusterLRU.getClusterProxyByResourceKey(resourceKey);
111 public ClusterUID getClusterUIDByResourceKey(int resourceKey) throws DatabaseException, AcornAccessVerificationException {
112 return clusterLRU.getClusterUIDByResourceKey(resourceKey);
115 public ClusterUID getClusterUIDByResourceKeyWithoutMutex(int resourceKey) throws DatabaseException, IllegalAcornStateException, AcornAccessVerificationException {
116 return clusterLRU.getClusterUIDByResourceKeyWithoutMutex(resourceKey);
120 * Private implementation
/**
 * Opens a DirectoryStream on {@code directory} (closed by try-with-resources) and
 * iterates over its entries.
 * NOTE(review): the counter and the return statement are not visible in this listing;
 * presumably the number of entries is returned — confirm.
 */
124 private static long countFiles(Path directory) throws IOException {
125 try (DirectoryStream<Path> ds = Files.newDirectoryStream(directory)) {
// The loop variable is intentionally unused; only the iteration itself matters.
127 for (@SuppressWarnings("unused") Path p : ds)
133 // Flag checked to make sure it is safe to make a snapshot (used with cancel, which is not yet supported and may cause corrupted head.state writing)
134 private AtomicBoolean safeToMakeSnapshot = new AtomicBoolean(true);
// NOTE(review): presumably the exception that made snapshotting unsafe; the assignment is not visible in this listing — confirm.
135 private IllegalAcornStateException cause;
/**
 * Writes pending data to disk and moves on to a new working directory.
 * Visible steps: check the safety flag, throttle non-full saves, shut down the cluster
 * and stream LRUs, save the main state, resolve dbFolder/<mainState.headDir> as the new
 * working directory (creating it if absent) and point ClusterSetsSupport and all four
 * LRUs at it. IllegalAcornStateException and IOException are both reported through
 * notSafeToMakeSnapshot (the IOException wrapped first).
 * NOTE(review): several statements, including all return statements, are not visible
 * in this listing; the meaning of the boolean result must be confirmed in the full file.
 *
 * @param locator  used to look up ClusterSetsSupport
 * @param fullSave when true, the time-based and empty-directory checks below are bypassed
 */
137 public synchronized boolean makeSnapshot(ServiceLocator locator, boolean fullSave) throws IllegalAcornStateException {
139 if (!safeToMakeSnapshot.get())
141 // Maximum autosave frequency is once per 10s (System.nanoTime() delta compared against 10 * 10^9 ns)
142 if(!fullSave && System.nanoTime() - lastSnapshot < 10*1000000000L) {
143 // System.err.println("lastSnapshot too early");
147 // Cluster files are always there
148 // Nothing has been written => no need to do anything
149 long amountOfFiles = countFiles(workingDirectory);
150 if(!fullSave && amountOfFiles == 0) {
151 // System.err.println("amountOfFiles < 3");
155 LOGGER.info("makeSnapshot");
157 // Schedule writing of all data to disk
160 // Wait for all files to be written
161 clusterLRU.shutdown();
163 streamLRU.shutdown();
166 // Lets check if it is still safe to make a snapshot
167 if (!safeToMakeSnapshot.get())
170 ClusterSetsSupport cssi = locator.getService(ClusterSetsSupport.class);
176 mainState.save(dbFolder);
178 amountOfFiles = countFiles(workingDirectory);
180 LOGGER.info(" -finished: amount of files is {}", amountOfFiles);
// Switch to the directory named after the current mainState.headDir value.
182 workingDirectory = dbFolder.resolve(Integer.toString(mainState.headDir));
183 if (!Files.exists(workingDirectory)) {
184 Files.createDirectories(workingDirectory);
187 cssi.updateWriteDirectory(workingDirectory);
189 clusterLRU.setWriteDir(workingDirectory);
190 fileLRU.setWriteDir(workingDirectory);
191 streamLRU.setWriteDir(workingDirectory);
192 csLRU.setWriteDir(workingDirectory);
// Restart the throttle window used by the non-full-save check above.
199 lastSnapshot = System.nanoTime();
202 } catch (IllegalAcornStateException e) {
203 notSafeToMakeSnapshot(e);
205 } catch (IOException e) {
206 IllegalAcornStateException e1 = new IllegalAcornStateException(e);
207 notSafeToMakeSnapshot(e1);
/**
 * Rebuilds the HeadState collections from the LRUs: clears them, then has each LRU
 * persist into its matching collection (clusters, files, stream, cs).
 * NOTE(review): only the clear() calls for clusters and stream are visible in this
 * listing; presumably files and cs are cleared as well — confirm.
 */
212 private void refreshHeadState() throws IOException, IllegalAcornStateException {
213 state.clusters.clear();
215 state.stream.clear();
218 clusterLRU.persist(state.clusters);
219 fileLRU.persist(state.files);
220 streamLRU.persist(state.stream);
221 csLRU.persist(state.cs);
/**
 * Passes every regular file found by walking the working directory with max depth 1
 * to FileIO.uncheckedSyncPath, then saves the HeadState into the working directory.
 * NOTE(review): the Stream returned by Files.walk is never closed here; the JDK
 * documentation asks for try-with-resources so the open directory handle is released.
 * NOTE(review): lines following state.save(...) are not visible in this listing.
 */
224 private void persistHeadState() throws IOException {
225 // Sync current working directory
226 Files.walk(workingDirectory, 1).filter(Files::isRegularFile).forEach(FileIO::uncheckedSyncPath);
227 state.save(workingDirectory);
232 // public void save() throws IOException {
234 // refreshHeadState();
236 // clusterLRU.shutdown();
237 // fileLRU.shutdown();
238 // streamLRU.shutdown();
241 // persistHeadState();
243 // mainState.save(getBaseDirectory());
246 // ThreadLogVisualizer visualizer = new ThreadLogVisualizer();
247 // visualizer.read(new DataInputStream(new FileInputStream(
248 // ThreadLogger.LOG_FILE)));
249 // visualizer.visualize3(new PrintStream(ThreadLogger.LOG_FILE
251 // } catch (FileNotFoundException e) {
252 // // TODO Auto-generated catch block
253 // e.printStackTrace();
256 // System.err.println("-- load statistics --");
257 // for(Pair<ClusterUID, Integer> entry :
258 // CollectionUtils.valueSortedEntries(histogram)) {
259 // System.err.println(" " + entry.second + " " + entry.first);
264 private void acquireAll() throws IllegalAcornStateException {
265 clusterLRU.acquireMutex();
266 fileLRU.acquireMutex();
267 streamLRU.acquireMutex();
268 csLRU.acquireMutex();
271 private void releaseAll() {
272 csLRU.releaseMutex();
273 streamLRU.releaseMutex();
274 fileLRU.releaseMutex();
275 clusterLRU.releaseMutex();
278 private AtomicBoolean rollback = new AtomicBoolean(false);
280 boolean rolledback() {
281 return rollback.get();
/**
 * Loads the main state and the head state of the previous session, creates the working
 * directory and the four LRUs, and re-registers the entries recorded in the head state.
 * Head state entries are '#'-separated strings:
 * clusters "<hexFirst>.<hexSecond>#<dir>#<offset>#<length>",
 * files and stream "<key>#<dir>#<offset>#<length>",
 * change sets "<revisionId>#<dir>#<offset>#<length>"; <dir> is resolved against dbFolder.
 * NOTE(review): several statements are not visible in this listing (e.g. what is done
 * with the FileInfo / ClusterStreamChunk / ChangeSetInfo objects after construction).
 *
 * @throws IOException if the head state cannot be loaded, or wrapping an
 *         IllegalAcornStateException / AcornAccessVerificationException raised while loading
 */
284 public void load() throws IOException {
// The callback passed to MainState.load sets the rollback flag.
287 mainState = MainState.load(dbFolder, t -> rollback.set(true));
289 lastSessionDirectory = dbFolder.resolve(Integer.toString(mainState.headDir - 1));
293 state = HeadState.load(lastSessionDirectory);
294 } catch (InvalidHeadStateException e) {
295 // For backwards compatibility only!
296 Throwable cause = e.getCause();
// NOTE(review): cause is declared as Throwable, so this instanceof test is only a null check.
297 if (cause instanceof Throwable) {
// Fall back to the old org.simantics.db.javacore.HeadState format and copy its fields over.
299 org.simantics.db.javacore.HeadState oldState = org.simantics.db.javacore.HeadState.load(lastSessionDirectory);
301 HeadState newState = new HeadState();
302 newState.clusters = oldState.clusters;
303 newState.cs = oldState.cs;
304 newState.files = oldState.files;
305 newState.stream = oldState.stream;
306 newState.headChangeSetId = oldState.headChangeSetId;
307 newState.reservedIds = oldState.reservedIds;
308 newState.transactionId = oldState.transactionId;
310 } catch (InvalidHeadStateException e1) {
311 throw new IOException("Could not load HeadState due to corruption", e1);
314 // This should never happen as MainState.load() checks the integrity
315 // of head.state files and rolls back in cases of corruption until a
316 // consistent state is found (could be case 0 - initial db state)
317 // IF this does happen something is completely wrong
318 throw new IOException("Could not load HeadState due to corruption", e);
322 workingDirectory = dbFolder.resolve(Integer.toString(mainState.headDir));
323 Files.createDirectories(workingDirectory);
325 csLRU = new LRU<Long, ChangeSetInfo>(this, "Change Set", workingDirectory);
326 streamLRU = new LRU<String, ClusterStreamChunk>(this, "Cluster Stream", workingDirectory);
327 clusterLRU = new ClusterLRU(this, "Cluster", workingDirectory);
328 fileLRU = new LRU<String, FileInfo>(this, "External Value", workingDirectory);
333 for (String clusterKey : state.clusters) {
334 String[] parts1 = clusterKey.split("#");
335 String[] parts = parts1[0].split("\\.");
// Both UID halves are hexadecimal; BigInteger.longValue() keeps the low 64 bits
// (presumably so values above Long.MAX_VALUE still parse — confirm).
336 long first = new BigInteger(parts[0], 16).longValue();
337 long second = new BigInteger(parts[1], 16).longValue();
338 ClusterUID uuid = ClusterUID.make(first, second);
339 Path readDir = dbFolder.resolve(parts1[1]);
340 int offset = Integer.parseInt(parts1[2]);
341 int length = Integer.parseInt(parts1[3]);
342 clusterLRU.map(new ClusterInfo(this, clusterLRU, readDir, uuid, offset, length));
345 for (String fileKey : state.files) {
346 // System.err.println("loadFile: " + fileKey);
347 String[] parts = fileKey.split("#");
348 Path readDir = dbFolder.resolve(parts[1]);
349 int offset = Integer.parseInt(parts[2]);
350 int length = Integer.parseInt(parts[3]);
351 FileInfo info = new FileInfo(fileLRU, readDir, parts[0], offset, length);
355 for (String fileKey : state.stream) {
356 // System.err.println("loadStream: " + fileKey);
357 String[] parts = fileKey.split("#");
358 Path readDir = dbFolder.resolve(parts[1]);
359 int offset = Integer.parseInt(parts[2]);
360 int length = Integer.parseInt(parts[3]);
361 ClusterStreamChunk info = new ClusterStreamChunk(this,
362 streamLRU, readDir, parts[0], offset, length);
366 for (String fileKey : state.cs) {
367 String[] parts = fileKey.split("#");
368 Path readDir = dbFolder.resolve(parts[1]);
// The first field of a change set entry is the decimal revision id.
369 Long revisionId = Long.parseLong(parts[0]);
370 int offset = Integer.parseInt(parts[2]);
371 int length = Integer.parseInt(parts[3]);
372 ChangeSetInfo info = new ChangeSetInfo(csLRU, readDir, revisionId, offset, length);
377 } catch (IllegalAcornStateException | AcornAccessVerificationException e) {
378 // ROLLBACK ONE DIR UNTIL WE ARE FINE!
379 throw new IOException(e);
/**
 * Calls clusterLRU.ensureUpdates(uid), fetches the ClusterInfo for {@code uid} via
 * getWithoutMutex and returns the result of its clone(uid, creator).
 * NOTE(review): lines between the visible statements are missing from this listing;
 * they may be blank — confirm.
 *
 * @param uid     cluster to clone
 * @param creator forwarded unchanged to ClusterInfo.clone
 */
383 public <T> T clone(ClusterUID uid, ClusterCreator creator) throws DatabaseException, AcornAccessVerificationException, IllegalAcornStateException, IOException {
385 clusterLRU.ensureUpdates(uid);
387 ClusterInfo info = clusterLRU.getWithoutMutex(uid);
388 return info.clone(uid, creator);
391 //private int loadCounter = 0;
393 public static void startLog(String msg) {
394 tasks.put(msg, ThreadLogger.getInstance().begin(msg));
/**
 * Looks up the task stored under {@code msg} by startLog().
 * NOTE(review): what is done with the task is not visible in this listing;
 * presumably it is finished if present — confirm.
 */
397 public static void endLog(String msg) {
398 ITask task = tasks.get(msg);
// Tasks begun by startLog(), keyed by their message; read back by endLog().
// NOTE(review): plain HashMap shared through static methods — confirm callers are single-threaded.
403 static Map<String, ITask> tasks = new HashMap<String, ITask>();
/**
 * Fetches the ClusterInfo for {@code uid} via clusterLRU.getWithoutMutex.
 * NOTE(review): the rest of the body (how {@code clu} is applied) is not visible in this listing.
 */
405 public void update(ClusterUID uid, ClusterImpl clu) throws AcornAccessVerificationException, IllegalAcornStateException {
406 ClusterInfo info = clusterLRU.getWithoutMutex(uid);
// NOTE(review): the body of this method is not visible in this listing; nothing can be stated about the returned id.
415 public long getClusterIdOrCreate(ClusterUID clusterUID) {
419 public int getResourceKey(ClusterUID uid, int index) throws AcornAccessVerificationException {
420 return clusterLRU.getResourceKey(uid, index);
423 public int getResourceKeyWitoutMutex(ClusterUID uid, int index) throws IllegalAcornStateException {
424 return clusterLRU.getResourceKeyWithoutMutex(uid, index);
/**
 * Builds a ClusterIds view over the clusters currently held by clusterLRU.
 * Takes the clusterLRU mutex, sizes two long arrays by the number of ClusterInfo values,
 * fills them from each info's key and returns an anonymous ClusterIds. Any Throwable is
 * rethrown wrapped in IllegalAcornStateException.
 * NOTE(review): the bodies of the anonymous class, the assignment into {@code firsts}
 * and the try/finally structure around releaseMutex() are not visible in this listing.
 */
427 public ClusterIds getClusterIds() throws IllegalAcornStateException {
428 clusterLRU.acquireMutex();
431 Collection<ClusterInfo> infos = clusterLRU.values();
// "status" is simply the number of clusters; it also sizes both arrays.
432 final int status = infos.size();
433 final long[] firsts = new long[status];
434 final long[] seconds = new long[status];
437 for (ClusterInfo info : infos) {
439 seconds[index] = info.getKey().second;
443 return new ClusterIds() {
446 public int getStatus() {
451 public long[] getFirst() {
456 public long[] getSecond() {
462 } catch (Throwable t) {
463 throw new IllegalAcornStateException(t);
465 clusterLRU.releaseMutex();
/**
 * Appends {@code ccs} to currentChanges while holding the csLRU mutex.
 * Any Throwable is rethrown wrapped in IllegalAcornStateException.
 * NOTE(review): the try/finally lines around releaseMutex() are not visible in this listing.
 */
469 public void addIntoCurrentChangeSet(String ccs) throws IllegalAcornStateException {
470 csLRU.acquireMutex();
473 currentChanges.add(ccs);
474 } catch (Throwable t) {
475 throw new IllegalAcornStateException(t);
477 csLRU.releaseMutex();
/**
 * Closes the current change set while holding the csLRU mutex: copies currentChanges,
 * replaces it with a fresh empty list and constructs a ChangeSetInfo from the copy.
 * Any Throwable is rethrown wrapped in IllegalAcornStateException.
 *
 * @param changeSetId id given to the new ChangeSetInfo
 * @param data        bytes handed to the ChangeSetInfo constructor
 */
481 public void commitChangeSet(long changeSetId, byte[] data) throws IllegalAcornStateException {
482 csLRU.acquireMutex();
484 ArrayList<String> csids = new ArrayList<String>(currentChanges);
485 currentChanges = new ArrayList<String>();
// The new object is not kept in a local; presumably the constructor registers it with csLRU — confirm.
486 new ChangeSetInfo(csLRU, changeSetId, data, csids);
487 } catch (Throwable t) {
488 throw new IllegalAcornStateException(t);
490 csLRU.releaseMutex();
/**
 * Returns the metadata bytes of the change set {@code changeSetId}, or null when csLRU
 * has no ChangeSetInfo for that id. Throwables not handled by the first catch clause
 * are rethrown wrapped in IllegalAcornStateException.
 * NOTE(review): the body of the first catch clause and the statements between the
 * null check and the return are not visible in this listing.
 */
494 public byte[] getMetadata(long changeSetId) throws AcornAccessVerificationException, IllegalAcornStateException {
496 ChangeSetInfo info = csLRU.getWithoutMutex(changeSetId);
497 if (info == null) return null;
500 return info.getMetadataBytes();
501 } catch (IllegalAcornStateException | AcornAccessVerificationException e) {
503 } catch (Throwable t) {
504 throw new IllegalAcornStateException(t);
/**
 * Returns the resource file stored for a resource, or null when fileLRU has no entry.
 * The lookup key is "<clusterUID>_<resourceIndex>", with the UID built from
 * {@code clusterUID} starting at offset 0. Throwables not handled by the first catch
 * clause are rethrown wrapped in IllegalAcornStateException.
 * NOTE(review): the body of the first catch clause and the statements between the
 * null check and the return are not visible in this listing.
 */
510 public byte[] getResourceFile(final byte[] clusterUID, final int resourceIndex) throws AcornAccessVerificationException, IllegalAcornStateException {
512 ClusterUID uid = ClusterUID.make(clusterUID, 0);
513 String key = uid.toString() + "_" + resourceIndex;
514 FileInfo info = fileLRU.getWithoutMutex(key);
515 if(info == null) return null;
518 return info.getResourceFile();
519 } catch (IllegalAcornStateException | AcornAccessVerificationException e) {
521 } catch (Throwable t) {
522 throw new IllegalAcornStateException(t);
/**
 * Returns a segment of the resource file stored for a resource, or null when fileLRU
 * has no entry under "<clusterUID>_<resourceIndex>". The segment itself comes from
 * FileInfo.getResourceSegment, which receives all four arguments unchanged. Any
 * Throwable is rethrown wrapped in IllegalAcornStateException.
 * NOTE(review): statements between the null check and the return are not visible in this listing.
 */
528 public ResourceSegment getResourceSegment(final byte[] clusterUID, final int resourceIndex, final long segmentOffset, short segmentSize) throws AcornAccessVerificationException, IllegalAcornStateException {
529 ClusterUID uid = ClusterUID.make(clusterUID, 0);
531 String key = uid.toString() + "_" + resourceIndex;
532 FileInfo info = fileLRU.getWithoutMutex(key);
533 if(info == null) return null;
536 return info.getResourceSegment(clusterUID, resourceIndex, segmentOffset, segmentSize);
537 } catch (Throwable t) {
538 throw new IllegalAcornStateException(t);
/**
 * Writes {@code bytes} into the external value file of a resource.
 * The file is looked up in fileLRU under "<uid>_<resourceIndex>" while holding the
 * fileLRU mutex; a new FileInfo is constructed in the visible code path as well
 * (presumably only when the lookup finds nothing — the condition is not visible).
 * Afterwards updateData(bytes, offset, pos, size) is called on the FileInfo.
 * Throwables in either step are rethrown wrapped in IllegalAcornStateException.
 * NOTE(review): the try/finally structure, the body of the DatabaseException handler
 * and any use of {@code support} are not visible in this listing.
 */
544 public void modiFileEx(ClusterUID uid, int resourceKey, long offset, long size, byte[] bytes, long pos, ClusterSupport support) throws IllegalAcornStateException {
546 String key = uid.toString() + "_" + ClusterTraits.getResourceIndexFromResourceKey(resourceKey);
548 FileInfo info = null;
549 fileLRU.acquireMutex();
551 info = fileLRU.get(key);
// NOTE(review): (offset + size) is a long narrowed to int; values above Integer.MAX_VALUE would be truncated.
553 info = new FileInfo(fileLRU, key, (int) (offset + size));
555 } catch (Throwable t) {
556 throw new IllegalAcornStateException(t);
558 fileLRU.releaseMutex();
563 info.updateData(bytes, offset, pos, size);
564 } catch (Throwable t) {
565 throw new IllegalAcornStateException(t);
569 } catch (DatabaseException e) {
/**
 * Shuts down the LRUs.
 * NOTE(review): only the cluster and stream LRU calls are visible in this listing;
 * presumably fileLRU and csLRU are shut down as well — confirm.
 */
574 public void shutdown() {
575 clusterLRU.shutdown();
577 streamLRU.shutdown();
/**
 * Marks snapshotting as unsafe by flipping safeToMakeSnapshot from true to false;
 * makeSnapshot() checks that flag before doing any work.
 * NOTE(review): no use of {@code t} is visible in this listing; presumably it is
 * stored in the {@code cause} field — confirm.
 */
581 public void notSafeToMakeSnapshot(IllegalAcornStateException t) {
// compareAndSet leaves the flag untouched when it is already false.
582 this.safeToMakeSnapshot.compareAndSet(true, false);