1 package org.simantics.acorn;
3 import java.io.IOException;
4 import java.math.BigInteger;
5 import java.nio.file.DirectoryStream;
6 import java.nio.file.Files;
7 import java.nio.file.Path;
8 import java.util.ArrayList;
9 import java.util.Collection;
10 import java.util.HashMap;
import java.util.Map;
13 import org.simantics.acorn.cluster.ClusterImpl;
14 import org.simantics.acorn.internal.ClusterSupport2;
15 import org.simantics.acorn.lru.ChangeSetInfo;
16 import org.simantics.acorn.lru.ClusterInfo;
17 import org.simantics.acorn.lru.ClusterLRU;
18 import org.simantics.acorn.lru.ClusterStreamChunk;
19 import org.simantics.acorn.lru.FileInfo;
20 import org.simantics.acorn.lru.LRU;
21 import org.simantics.db.ClusterCreator;
22 import org.simantics.db.ServiceLocator;
23 import org.simantics.db.Database.Session.ClusterIds;
24 import org.simantics.db.Database.Session.ResourceSegment;
25 import org.simantics.db.exception.DatabaseException;
26 import org.simantics.db.impl.ClusterBase;
27 import org.simantics.db.impl.ClusterI;
28 import org.simantics.db.impl.ClusterSupport;
29 import org.simantics.db.procore.cluster.ClusterTraits;
30 import org.simantics.db.server.ProCoreException;
31 import org.simantics.db.service.ClusterSetsSupport;
32 import org.simantics.db.service.ClusterUID;
33 import org.simantics.utils.threads.logger.ITask;
34 import org.simantics.utils.threads.logger.ThreadLogger;
36 public class ClusterManager {
38 private ArrayList<String> currentChanges = new ArrayList<String>();
40 public final Path dbFolder;
41 public Path lastSessionDirectory;
42 public Path workingDirectory;
44 public LRU<String, ClusterStreamChunk> streamLRU;
45 public LRU<Long, ChangeSetInfo> csLRU;
46 public ClusterLRU clusterLRU;
47 public LRU<String, FileInfo> fileLRU;
49 public MainState mainState;
50 public HeadState state;
52 private long lastSnapshot = System.nanoTime();
54 final public ClusterSupport2 support = new ClusterSupport2(this);
61 public ClusterManager(Path dbFolder) {
62 this.dbFolder = dbFolder;
65 public ArrayList<String> getChanges(long changeSetId) {
66 ChangeSetInfo info = csLRU.getWithoutMutex(changeSetId);
70 return info.getCSSIds();
71 } catch (Throwable t) {
72 throw new IllegalStateException(t);
78 public ClusterBase getClusterByClusterKey(int clusterKey) throws DatabaseException {
79 return clusterLRU.getClusterByClusterKey(clusterKey);
82 public ClusterBase getClusterByClusterUIDOrMake(ClusterUID clusterUID) throws DatabaseException {
83 return clusterLRU.getClusterByClusterUIDOrMake(clusterUID);
86 public ClusterImpl getClusterByClusterUIDOrMakeProxy(ClusterUID clusterUID) throws DatabaseException {
87 return clusterLRU.getClusterByClusterUIDOrMakeProxy(clusterUID);
90 public int getClusterKeyByClusterUIDOrMake(ClusterUID clusterUID) {
91 return clusterLRU.getClusterKeyByClusterUIDOrMake(clusterUID);
94 public int getClusterKeyByClusterUIDOrMakeWithoutMutex(ClusterUID clusterUID) {
95 return clusterLRU.getClusterKeyByClusterUIDOrMakeWithoutMutex(clusterUID);
98 public int getClusterKeyByUID(long id1, long id2) throws DatabaseException {
99 return clusterLRU.getClusterKeyByUIDWithoutMutex(id1, id2);
102 public <T extends ClusterI> T getClusterProxyByResourceKey(int resourceKey) throws DatabaseException {
103 return clusterLRU.getClusterProxyByResourceKey(resourceKey);
106 public ClusterUID getClusterUIDByResourceKey(int resourceKey) throws DatabaseException {
107 return clusterLRU.getClusterUIDByResourceKey(resourceKey);
110 public ClusterUID getClusterUIDByResourceKeyWithoutMutex(int resourceKey) throws DatabaseException {
111 return clusterLRU.getClusterUIDByResourceKeyWithoutMutex(resourceKey);
115 * Private implementation
119 private static long countFiles(Path directory) throws IOException {
120 try (DirectoryStream<Path> ds = Files.newDirectoryStream(directory)) {
122 for (@SuppressWarnings("unused") Path p : ds)
128 public synchronized boolean makeSnapshot(ServiceLocator locator, boolean force) throws IOException {
130 // Maximum autosave frequency is per 60s
131 if(!force && System.nanoTime() - lastSnapshot < 10*1000000000L) {
132 // System.err.println("lastSnapshot too early");
136 // Cluster files are always there
137 // Nothing has been written => no need to do anything
138 long amountOfFiles = countFiles(workingDirectory);
139 if(!force && amountOfFiles < 3) {
140 // System.err.println("amountOfFiles < 3");
144 System.err.println("makeSnapshot");
146 // Schedule writing of all data to disk
149 // Wait for all files to be written
150 clusterLRU.shutdown();
152 streamLRU.shutdown();
157 mainState.save(dbFolder);
159 ClusterSetsSupport cssi = locator.getService(ClusterSetsSupport.class);
162 amountOfFiles = countFiles(workingDirectory);
164 System.err.println(" -finished: amount of files is " + amountOfFiles);
166 workingDirectory = dbFolder.resolve(Integer.toString(mainState.headDir));
167 if (!Files.exists(workingDirectory)) {
168 Files.createDirectories(workingDirectory);
171 cssi.updateReadAndWriteDirectories(lastSessionDirectory, workingDirectory);
173 clusterLRU.setWriteDir(workingDirectory);
174 fileLRU.setWriteDir(workingDirectory);
175 streamLRU.setWriteDir(workingDirectory);
176 csLRU.setWriteDir(workingDirectory);
183 lastSnapshot = System.nanoTime();
189 public void refreshHeadState() throws IOException {
191 state.clusters.clear();
193 state.stream.clear();
196 clusterLRU.persist(state.clusters);
197 fileLRU.persist(state.files);
198 streamLRU.persist(state.stream);
199 csLRU.persist(state.cs);
203 public void persistHeadState() throws IOException {
205 // Sync current working directory
206 Files.walk(workingDirectory, 1).filter(Files::isRegularFile).forEach(FileIO::uncheckedSyncPath);
207 state.save(workingDirectory);
212 // public void save() throws IOException {
214 // refreshHeadState();
216 // clusterLRU.shutdown();
217 // fileLRU.shutdown();
218 // streamLRU.shutdown();
221 // persistHeadState();
223 // mainState.save(getBaseDirectory());
226 // ThreadLogVisualizer visualizer = new ThreadLogVisualizer();
227 // visualizer.read(new DataInputStream(new FileInputStream(
228 // ThreadLogger.LOG_FILE)));
229 // visualizer.visualize3(new PrintStream(ThreadLogger.LOG_FILE
231 // } catch (FileNotFoundException e) {
232 // // TODO Auto-generated catch block
233 // e.printStackTrace();
236 // System.err.println("-- load statistics --");
237 // for(Pair<ClusterUID, Integer> entry :
238 // CollectionUtils.valueSortedEntries(histogram)) {
239 // System.err.println(" " + entry.second + " " + entry.first);
244 private void acquireAll() {
245 clusterLRU.acquireMutex();
246 fileLRU.acquireMutex();
247 streamLRU.acquireMutex();
248 csLRU.acquireMutex();
251 private void releaseAll() {
252 csLRU.releaseMutex();
253 streamLRU.releaseMutex();
254 fileLRU.releaseMutex();
255 clusterLRU.releaseMutex();
258 public void load() throws IOException {
261 mainState = MainState.load(dbFolder);
263 lastSessionDirectory = dbFolder.resolve(Integer.toString(mainState.headDir - 1));
267 state = HeadState.load(lastSessionDirectory);
268 } catch (InvalidHeadStateException e) {
269 // For backwards compatibility only!
270 Throwable cause = e.getCause();
271 if (cause instanceof Throwable) {
273 org.simantics.db.javacore.HeadState oldState = org.simantics.db.javacore.HeadState.load(lastSessionDirectory);
275 HeadState newState = new HeadState();
276 newState.clusters = oldState.clusters;
277 newState.cs = oldState.cs;
278 newState.files = oldState.files;
279 newState.stream = oldState.stream;
280 newState.headChangeSetId = oldState.headChangeSetId;
281 newState.reservedIds = oldState.reservedIds;
282 newState.transactionId = oldState.transactionId;
284 } catch (InvalidHeadStateException e1) {
285 throw new IOException("Could not load HeadState due to corruption", e1);
288 // This should never happen as MainState.load() checks the integrity
289 // of head.state files and rolls back in cases of corruption until a
290 // consistent state is found (could be case 0 - initial db state)
291 // IF this does happen something is completely wrong
292 throw new IOException("Could not load HeadState due to corruption", e);
296 workingDirectory = dbFolder.resolve(Integer.toString(mainState.headDir));
297 Files.createDirectories(workingDirectory);
299 csLRU = new LRU<Long, ChangeSetInfo>("Change Set", workingDirectory);
300 streamLRU = new LRU<String, ClusterStreamChunk>("Cluster Stream", workingDirectory);
301 clusterLRU = new ClusterLRU(this, "Cluster", workingDirectory);
302 fileLRU = new LRU<String, FileInfo>("External Value", workingDirectory);
307 for (String clusterKey : state.clusters) {
308 String[] parts1 = clusterKey.split("#");
309 String[] parts = parts1[0].split("\\.");
310 long first = new BigInteger(parts[0], 16).longValue();
311 long second = new BigInteger(parts[1], 16).longValue();
312 ClusterUID uuid = ClusterUID.make(first, second);
313 Path readDir = dbFolder.resolve(parts1[1]);
314 int offset = Integer.parseInt(parts1[2]);
315 int length = Integer.parseInt(parts1[3]);
316 clusterLRU.map(new ClusterInfo(this, clusterLRU, readDir, uuid, offset, length));
319 for (String fileKey : state.files) {
320 // System.err.println("loadFile: " + fileKey);
321 String[] parts = fileKey.split("#");
322 Path readDir = dbFolder.resolve(parts[1]);
323 int offset = Integer.parseInt(parts[2]);
324 int length = Integer.parseInt(parts[3]);
325 FileInfo info = new FileInfo(fileLRU, readDir, parts[0], offset, length);
329 for (String fileKey : state.stream) {
330 // System.err.println("loadStream: " + fileKey);
331 String[] parts = fileKey.split("#");
332 Path readDir = dbFolder.resolve(parts[1]);
333 int offset = Integer.parseInt(parts[2]);
334 int length = Integer.parseInt(parts[3]);
335 ClusterStreamChunk info = new ClusterStreamChunk(this,
336 streamLRU, readDir, parts[0], offset, length);
340 for (String fileKey : state.cs) {
341 String[] parts = fileKey.split("#");
342 Path readDir = dbFolder.resolve(parts[1]);
343 Long revisionId = Long.parseLong(parts[0]);
344 int offset = Integer.parseInt(parts[2]);
345 int length = Integer.parseInt(parts[3]);
346 ChangeSetInfo info = new ChangeSetInfo(csLRU, readDir, revisionId, offset, length);
354 public <T> T clone(ClusterUID uid, ClusterCreator creator)
355 throws DatabaseException {
357 clusterLRU.ensureUpdates(uid);
359 ClusterInfo info = clusterLRU.getWithoutMutex(uid);
360 return info.clone(uid, creator);
364 //private int loadCounter = 0;
366 public static void startLog(String msg) {
367 tasks.put(msg, ThreadLogger.getInstance().begin(msg));
370 public static void endLog(String msg) {
371 ITask task = tasks.get(msg);
376 static Map<String, ITask> tasks = new HashMap<String, ITask>();
378 public void update(ClusterUID uid, ClusterImpl clu) {
380 ClusterInfo info = clusterLRU.getWithoutMutex(uid);
384 } catch (Throwable t) {
385 throw new IllegalStateException(t);
392 public long getClusterIdOrCreate(ClusterUID clusterUID) {
396 public int getResourceKey(ClusterUID uid, int index) {
397 return clusterLRU.getResourceKey(uid, index);
400 public int getResourceKeyWitoutMutex(ClusterUID uid, int index) {
401 return clusterLRU.getResourceKeyWithoutMutex(uid, index);
404 public ClusterIds getClusterIds() throws ProCoreException {
406 clusterLRU.acquireMutex();
410 Collection<ClusterInfo> infos = clusterLRU.values();
411 final int status = infos.size();
412 final long[] firsts = new long[status];
413 final long[] seconds = new long[status];
416 for (ClusterInfo info : infos) {
418 seconds[index] = info.getKey().second;
422 return new ClusterIds() {
425 public int getStatus() {
430 public long[] getFirst() {
435 public long[] getSecond() {
441 } catch (Throwable t) {
442 throw new IllegalStateException(t);
444 clusterLRU.releaseMutex();
449 public void addIntoCurrentChangeSet(String ccs) {
451 csLRU.acquireMutex();
455 currentChanges.add(ccs);
457 } catch (Throwable t) {
458 throw new IllegalStateException(t);
461 csLRU.releaseMutex();
467 public void commitChangeSet(long changeSetId, byte[] data) {
468 csLRU.acquireMutex();
470 ArrayList<String> csids = new ArrayList<String>(currentChanges);
471 currentChanges = new ArrayList<String>();
472 new ChangeSetInfo(csLRU, changeSetId, data, csids);
473 } catch (Throwable t) {
474 throw new IllegalStateException(t);
476 csLRU.releaseMutex();
480 public byte[] getMetadata(long changeSetId) {
482 ChangeSetInfo info = csLRU.getWithoutMutex(changeSetId);
483 if (info == null) return null;
486 return info.getMetadataBytes();
487 } catch (Throwable t) {
488 throw new IllegalStateException(t);
495 public byte[] getResourceFile(final byte[] clusterUID,
496 final int resourceIndex) throws ProCoreException {
498 ClusterUID uid = ClusterUID.make(clusterUID, 0);
499 String key = uid.toString() + "_" + resourceIndex;
500 FileInfo info = fileLRU.getWithoutMutex(key);
501 if(info == null) return null;
504 return info.getResourceFile();
505 } catch (Throwable t) {
506 throw new IllegalStateException(t);
513 public ResourceSegment getResourceSegment(final byte[] clusterUID,
514 final int resourceIndex, final long segmentOffset, short segmentSize)
515 throws ProCoreException {
517 ClusterUID uid = ClusterUID.make(clusterUID, 0);
519 String key = uid.toString() + "_" + resourceIndex;
520 FileInfo info = fileLRU.getWithoutMutex(key);
521 if(info == null) return null;
524 return info.getResourceSegment(clusterUID, resourceIndex, segmentOffset, segmentSize);
525 } catch (Throwable t) {
526 throw new IllegalStateException(t);
533 public void modiFileEx(ClusterUID uid, int resourceKey, long offset,
534 long size, byte[] bytes, long pos, ClusterSupport support) {
538 String key = uid.toString()
541 .getResourceIndexFromResourceKey(resourceKey);
543 FileInfo info = null;
545 fileLRU.acquireMutex();
549 info = fileLRU.get(key);
551 info = new FileInfo(fileLRU, key, (int) (offset + size));
554 } catch (Throwable t) {
555 throw new IllegalStateException(t);
558 fileLRU.releaseMutex();
564 info.updateData(bytes, offset, pos, size);
565 } catch (Throwable t) {
566 throw new IllegalStateException(t);
571 } catch (DatabaseException e) {
577 public void shutdown() {
578 clusterLRU.shutdown();
580 streamLRU.shutdown();