]> gerrit.simantics Code Review - simantics/platform.git/blob - bundles/org.simantics.acorn/src/org/simantics/acorn/ClusterManager.java
Make Write-interfaces as @FunctionalInterface for lambdas
[simantics/platform.git] / bundles / org.simantics.acorn / src / org / simantics / acorn / ClusterManager.java
1 package org.simantics.acorn;
2
3 import java.io.IOException;
4 import java.math.BigInteger;
5 import java.nio.file.DirectoryStream;
6 import java.nio.file.Files;
7 import java.nio.file.Path;
8 import java.util.ArrayList;
9 import java.util.Collection;
10 import java.util.HashMap;
11 import java.util.Map;
12 import java.util.concurrent.atomic.AtomicBoolean;
13
14 import org.simantics.acorn.cluster.ClusterImpl;
15 import org.simantics.acorn.exception.AcornAccessVerificationException;
16 import org.simantics.acorn.exception.IllegalAcornStateException;
17 import org.simantics.acorn.exception.InvalidHeadStateException;
18 import org.simantics.acorn.internal.ClusterSupport2;
19 import org.simantics.acorn.lru.ChangeSetInfo;
20 import org.simantics.acorn.lru.ClusterInfo;
21 import org.simantics.acorn.lru.ClusterLRU;
22 import org.simantics.acorn.lru.ClusterStreamChunk;
23 import org.simantics.acorn.lru.FileInfo;
24 import org.simantics.acorn.lru.LRU;
25 import org.simantics.db.ClusterCreator;
26 import org.simantics.db.Database.Session.ClusterIds;
27 import org.simantics.db.Database.Session.ResourceSegment;
28 import org.simantics.db.ServiceLocator;
29 import org.simantics.db.exception.DatabaseException;
30 import org.simantics.db.impl.ClusterBase;
31 import org.simantics.db.impl.ClusterI;
32 import org.simantics.db.impl.ClusterSupport;
33 import org.simantics.db.procore.cluster.ClusterTraits;
34 import org.simantics.db.service.ClusterSetsSupport;
35 import org.simantics.db.service.ClusterUID;
36 import org.simantics.utils.threads.logger.ITask;
37 import org.simantics.utils.threads.logger.ThreadLogger;
38 import org.slf4j.Logger;
39 import org.slf4j.LoggerFactory;
40
41 public class ClusterManager {
42     
43     final static Logger LOGGER = LoggerFactory.getLogger(ClusterManager.class); 
44
45         private ArrayList<String> currentChanges = new ArrayList<String>();
46
47         public final Path dbFolder;
48         public Path lastSessionDirectory;
49         public Path workingDirectory;
50
51         public LRU<String, ClusterStreamChunk> streamLRU;
52         public LRU<Long, ChangeSetInfo> csLRU;
53         public ClusterLRU clusterLRU;
54         public LRU<String, FileInfo> fileLRU;
55
56         public MainState mainState;
57         public HeadState state;
58
59         private long lastSnapshot = System.nanoTime();
60
61         final public ClusterSupport2 support = new ClusterSupport2(this);
62
63         /*
64          * Public interface
65          * 
66          */
67
68         public ClusterManager(Path dbFolder) {
69                 this.dbFolder = dbFolder;
70         }
71
72         public ArrayList<String> getChanges(long changeSetId) throws AcornAccessVerificationException, IllegalAcornStateException {
73                 ChangeSetInfo info = csLRU.getWithoutMutex(changeSetId);
74                 info.acquireMutex();
75                 try {
76                         info.makeResident();
77                         return info.getCCSIds();
78                 } finally {
79                         info.releaseMutex();
80                 }
81         }
82
83         public ClusterBase getClusterByClusterKey(int clusterKey) throws DatabaseException, AcornAccessVerificationException, IllegalAcornStateException {
84                 return clusterLRU.getClusterByClusterKey(clusterKey);
85         }
86         
87         public ClusterBase getClusterByClusterUIDOrMake(ClusterUID clusterUID) throws DatabaseException, AcornAccessVerificationException, IllegalAcornStateException {
88                 return clusterLRU.getClusterByClusterUIDOrMake(clusterUID);
89         }
90
91         public ClusterImpl getClusterByClusterUIDOrMakeProxy(ClusterUID clusterUID) throws DatabaseException, AcornAccessVerificationException, IllegalAcornStateException {
92                 return clusterLRU.getClusterByClusterUIDOrMakeProxy(clusterUID);
93         }
94
95         public int getClusterKeyByClusterUIDOrMake(ClusterUID clusterUID) throws AcornAccessVerificationException {
96                 return clusterLRU.getClusterKeyByClusterUIDOrMake(clusterUID);
97         }
98
99         public int getClusterKeyByClusterUIDOrMakeWithoutMutex(ClusterUID clusterUID) throws IllegalAcornStateException, AcornAccessVerificationException {
100                 return clusterLRU.getClusterKeyByClusterUIDOrMakeWithoutMutex(clusterUID);
101         }
102
103         public int getClusterKeyByUID(long id1, long id2) throws DatabaseException, IllegalAcornStateException {
104                 return clusterLRU.getClusterKeyByUIDWithoutMutex(id1, id2);
105         }
106         
107         public <T extends ClusterI> T getClusterProxyByResourceKey(int resourceKey) throws DatabaseException, AcornAccessVerificationException, IllegalAcornStateException {
108                 return clusterLRU.getClusterProxyByResourceKey(resourceKey);
109         }
110
111         public ClusterUID getClusterUIDByResourceKey(int resourceKey) throws DatabaseException, AcornAccessVerificationException {
112                 return clusterLRU.getClusterUIDByResourceKey(resourceKey);
113         }
114
115         public ClusterUID getClusterUIDByResourceKeyWithoutMutex(int resourceKey) throws DatabaseException, IllegalAcornStateException, AcornAccessVerificationException {
116                 return clusterLRU.getClusterUIDByResourceKeyWithoutMutex(resourceKey);
117         }
118
119         /*
120          * Private implementation
121          * 
122          */
123
124         private static long countFiles(Path directory) throws IOException {
125                 try (DirectoryStream<Path> ds = Files.newDirectoryStream(directory)) {
126                         int count = 0;
127                         for (@SuppressWarnings("unused") Path p : ds)
128                                 ++count;
129                         return count;
130                 }
131         }
132
133         // Add check to make sure if it safe to make snapshot (used with cancel which is not yet supported and may cause corrupted head.state writing)
134         private AtomicBoolean safeToMakeSnapshot = new AtomicBoolean(true);
135         private IllegalAcornStateException cause;
136         
137         public synchronized boolean makeSnapshot(ServiceLocator locator, boolean fullSave) throws IllegalAcornStateException {
138             try {
139             if (!safeToMakeSnapshot.get())
140                 throw cause;
141                 // Maximum autosave frequency is per 60s
142                 if(!fullSave && System.nanoTime() - lastSnapshot < 10*1000000000L) {
143     //              System.err.println("lastSnapshot too early");
144                     return false;
145                 }
146     
147                 // Cluster files are always there 
148                 // Nothing has been written => no need to do anything
149                 long amountOfFiles = countFiles(workingDirectory);
150                 if(!fullSave && amountOfFiles == 0) {
151     //              System.err.println("amountOfFiles < 3");
152                     return false;
153                 }
154     
155                 LOGGER.info("makeSnapshot");
156     
157                 // Schedule writing of all data to disk
158             refreshHeadState();
159     
160                 // Wait for all files to be written
161                 clusterLRU.shutdown();
162                 fileLRU.shutdown();
163                 streamLRU.shutdown();
164                 csLRU.shutdown();
165                 
166                 // Lets check if it is still safe to make a snapshot
167                 if (!safeToMakeSnapshot.get())
168                     throw cause;
169                 
170                 ClusterSetsSupport cssi = locator.getService(ClusterSetsSupport.class); 
171                 cssi.save();
172
173                 persistHeadState();
174
175                 if (fullSave)
176                     mainState.save(dbFolder);
177     
178                 amountOfFiles = countFiles(workingDirectory);
179                 
180                 LOGGER.info(" -finished: amount of files is {}", amountOfFiles);
181     
182                 workingDirectory = dbFolder.resolve(Integer.toString(mainState.headDir));
183                 if (!Files.exists(workingDirectory)) {
184                     Files.createDirectories(workingDirectory);
185                 }
186     
187                 cssi.updateWriteDirectory(workingDirectory);
188     
189                 clusterLRU.setWriteDir(workingDirectory);
190                 fileLRU.setWriteDir(workingDirectory);
191                 streamLRU.setWriteDir(workingDirectory);
192                 csLRU.setWriteDir(workingDirectory);
193     
194                 clusterLRU.resume();
195                 fileLRU.resume();
196                 streamLRU.resume();
197                 csLRU.resume();
198     
199                 lastSnapshot = System.nanoTime();
200                 
201                 return true;
202             } catch (IllegalAcornStateException e) {
203                 notSafeToMakeSnapshot(e);
204                 throw e;
205             } catch (IOException e) {
206                 IllegalAcornStateException e1 = new IllegalAcornStateException(e);
207                 notSafeToMakeSnapshot(e1);
208                 throw e1;
209             }
210         }
211         
212         private void refreshHeadState() throws IOException, IllegalAcornStateException {
213                 state.clusters.clear();
214                 state.files.clear();
215                 state.stream.clear();
216                 state.cs.clear();
217
218                 clusterLRU.persist(state.clusters);
219                 fileLRU.persist(state.files);
220                 streamLRU.persist(state.stream);
221                 csLRU.persist(state.cs);
222         }
223         
224         private void persistHeadState() throws IOException {
225                 // Sync current working directory
226                 Files.walk(workingDirectory, 1).filter(Files::isRegularFile).forEach(FileIO::uncheckedSyncPath);
227                 state.save(workingDirectory);
228                 mainState.headDir++;
229         }
230
231         
232 //      public void save() throws IOException {
233 //
234 //              refreshHeadState();
235 //              
236 //              clusterLRU.shutdown();
237 //              fileLRU.shutdown();
238 //              streamLRU.shutdown();
239 //              csLRU.shutdown();
240 //
241 //              persistHeadState();
242 //
243 //              mainState.save(getBaseDirectory());
244
245 //              try {
246 //                      ThreadLogVisualizer visualizer = new ThreadLogVisualizer();
247 //                      visualizer.read(new DataInputStream(new FileInputStream(
248 //                                      ThreadLogger.LOG_FILE)));
249 //                      visualizer.visualize3(new PrintStream(ThreadLogger.LOG_FILE
250 //                                      + ".svg"));
251 //              } catch (FileNotFoundException e) {
252 //                      // TODO Auto-generated catch block
253 //                      e.printStackTrace();
254 //              }
255
256                 // System.err.println("-- load statistics --");
257                 // for(Pair<ClusterUID, Integer> entry :
258                 // CollectionUtils.valueSortedEntries(histogram)) {
259                 // System.err.println(" " + entry.second + " " + entry.first);
260                 // }
261
262 //      }
263         
264         private void acquireAll() throws IllegalAcornStateException {
265                 clusterLRU.acquireMutex();
266                 fileLRU.acquireMutex();
267                 streamLRU.acquireMutex();
268                 csLRU.acquireMutex();
269         }
270         
271         private void releaseAll() {
272                 csLRU.releaseMutex();
273                 streamLRU.releaseMutex();
274                 fileLRU.releaseMutex();
275                 clusterLRU.releaseMutex();
276         }
277
278         private AtomicBoolean rollback = new AtomicBoolean(false);
279         
280         boolean rolledback() {
281             return rollback.get();
282         }
283         
284         public void load() throws IOException {
285
286                 // Main state
287                 mainState = MainState.load(dbFolder, t -> rollback.set(true));
288
289                 lastSessionDirectory = dbFolder.resolve(Integer.toString(mainState.headDir - 1));
290                 
291                 // Head State
292                 try {
293             state = HeadState.load(lastSessionDirectory);
294         } catch (InvalidHeadStateException e) {
295             // For backwards compatibility only!
296             Throwable cause = e.getCause();
297             if (cause instanceof Throwable) {
298                 try {
299                     org.simantics.db.javacore.HeadState oldState = org.simantics.db.javacore.HeadState.load(lastSessionDirectory);
300                     
301                     HeadState newState = new HeadState();
302                     newState.clusters = oldState.clusters;
303                     newState.cs = oldState.cs;
304                     newState.files = oldState.files;
305                     newState.stream = oldState.stream;
306                     newState.headChangeSetId = oldState.headChangeSetId;
307                     newState.reservedIds = oldState.reservedIds;
308                     newState.transactionId = oldState.transactionId;
309                     state = newState;
310                 } catch (InvalidHeadStateException e1) {
311                     throw new IOException("Could not load HeadState due to corruption", e1);
312                 }
313             } else {
314                 // This should never happen as MainState.load() checks the integrity
315                 // of head.state files and rolls back in cases of corruption until a
316                 // consistent state is found (could be case 0 - initial db state)
317                 // IF this does happen something is completely wrong
318                 throw new IOException("Could not load HeadState due to corruption", e);
319             }
320         }
321                 try {
322                 workingDirectory = dbFolder.resolve(Integer.toString(mainState.headDir));
323                 Files.createDirectories(workingDirectory);
324     
325                 csLRU = new LRU<Long, ChangeSetInfo>(this, "Change Set", workingDirectory);
326                 streamLRU = new LRU<String, ClusterStreamChunk>(this, "Cluster Stream", workingDirectory);
327                 clusterLRU = new ClusterLRU(this, "Cluster", workingDirectory);
328                 fileLRU = new LRU<String, FileInfo>(this, "External Value", workingDirectory);
329     
330                 acquireAll();
331                 
332                 // Clusters
333                 for (String clusterKey : state.clusters) {
334                         String[] parts1 = clusterKey.split("#");
335                         String[] parts = parts1[0].split("\\.");
336                         long first = new BigInteger(parts[0], 16).longValue();
337                         long second = new BigInteger(parts[1], 16).longValue();
338                         ClusterUID uuid = ClusterUID.make(first, second);
339                         Path readDir = dbFolder.resolve(parts1[1]);
340                         int offset = Integer.parseInt(parts1[2]);
341                         int length = Integer.parseInt(parts1[3]);
342                         clusterLRU.map(new ClusterInfo(this, clusterLRU, readDir, uuid, offset, length));
343                 }
344                 // Files
345                 for (String fileKey : state.files) {
346     //                  System.err.println("loadFile: " + fileKey);
347                         String[] parts = fileKey.split("#");
348                         Path readDir = dbFolder.resolve(parts[1]);
349                         int offset = Integer.parseInt(parts[2]);
350                         int length = Integer.parseInt(parts[3]);
351                         FileInfo info = new FileInfo(fileLRU, readDir, parts[0], offset, length);
352                         fileLRU.map(info);
353                 }
354                 // Update chunks
355                 for (String fileKey : state.stream) {
356     //                  System.err.println("loadStream: " + fileKey);
357                         String[] parts = fileKey.split("#");
358                         Path readDir = dbFolder.resolve(parts[1]);
359                         int offset = Integer.parseInt(parts[2]);
360                         int length = Integer.parseInt(parts[3]);
361                         ClusterStreamChunk info = new ClusterStreamChunk(this,
362                                         streamLRU, readDir, parts[0], offset, length);
363                         streamLRU.map(info);
364                 }
365                 // Change sets
366                 for (String fileKey : state.cs) {
367                         String[] parts = fileKey.split("#");
368                         Path readDir = dbFolder.resolve(parts[1]);
369                         Long revisionId = Long.parseLong(parts[0]);
370                         int offset = Integer.parseInt(parts[2]);
371                         int length = Integer.parseInt(parts[3]);
372                         ChangeSetInfo info = new ChangeSetInfo(csLRU, readDir, revisionId, offset, length);
373                         csLRU.map(info);
374                 }
375                 
376                 releaseAll();
377                 } catch (IllegalAcornStateException | AcornAccessVerificationException e) {
378                     // ROLLBACK ONE DIR UNTIL WE ARE FINE!
379                     throw new IOException(e);
380                 }
381         }
382
383         public <T> T clone(ClusterUID uid, ClusterCreator creator) throws DatabaseException, AcornAccessVerificationException, IllegalAcornStateException, IOException {
384                 
385                 clusterLRU.ensureUpdates(uid);
386                 
387                 ClusterInfo info = clusterLRU.getWithoutMutex(uid);
388                 return info.clone(uid, creator);
389         }
390
391         //private int loadCounter = 0;
392
393         public static void startLog(String msg) {
394                 tasks.put(msg, ThreadLogger.getInstance().begin(msg));
395         }
396
397         public static void endLog(String msg) {
398                 ITask task = tasks.get(msg);
399                 if (task != null)
400                         task.finish();
401         }
402
403         static Map<String, ITask> tasks = new HashMap<String, ITask>();
404
405         public void update(ClusterUID uid, ClusterImpl clu) throws AcornAccessVerificationException, IllegalAcornStateException {
406                 ClusterInfo info = clusterLRU.getWithoutMutex(uid);
407                 info.acquireMutex();
408                 try {
409                         info.update(clu);
410                 } finally {
411                         info.releaseMutex();
412                 }
413         }
414
415         public long getClusterIdOrCreate(ClusterUID clusterUID) {
416                 return 1;
417         }
418
419         public int getResourceKey(ClusterUID uid, int index) throws AcornAccessVerificationException {
420                 return clusterLRU.getResourceKey(uid, index);
421         }
422
423         public int getResourceKeyWitoutMutex(ClusterUID uid, int index) throws IllegalAcornStateException {
424                 return clusterLRU.getResourceKeyWithoutMutex(uid, index);
425         }
426
427         public ClusterIds getClusterIds() throws IllegalAcornStateException {
428                 clusterLRU.acquireMutex();
429
430                 try {
431                         Collection<ClusterInfo> infos = clusterLRU.values();
432                         final int status = infos.size();
433                         final long[] firsts = new long[status];
434                         final long[] seconds = new long[status];
435
436                         int index = 0;
437                         for (ClusterInfo info : infos) {
438                                 firsts[index] = 0;
439                                 seconds[index] = info.getKey().second;
440                                 index++;
441                         }
442
443                         return new ClusterIds() {
444
445                                 @Override
446                                 public int getStatus() {
447                                         return status;
448                                 }
449
450                                 @Override
451                                 public long[] getFirst() {
452                                         return firsts;
453                                 }
454
455                                 @Override
456                                 public long[] getSecond() {
457                                         return seconds;
458                                 }
459
460                         };
461
462                 } catch (Throwable t) {
463                         throw new IllegalAcornStateException(t);
464                 } finally {
465                         clusterLRU.releaseMutex();
466                 }
467         }
468
469         public void addIntoCurrentChangeSet(String ccs) throws IllegalAcornStateException {
470                 csLRU.acquireMutex();
471
472                 try {
473                         currentChanges.add(ccs);
474                 } catch (Throwable t) {
475                         throw new IllegalAcornStateException(t);
476                 } finally {
477                         csLRU.releaseMutex();
478                 }
479         }
480
481         public void commitChangeSet(long changeSetId, byte[] data) throws IllegalAcornStateException {
482                 csLRU.acquireMutex();
483                 try {
484                         ArrayList<String> csids = new ArrayList<String>(currentChanges);
485                         currentChanges = new ArrayList<String>();
486                         new ChangeSetInfo(csLRU, changeSetId, data, csids);
487                 } catch (Throwable t) {
488                         throw new IllegalAcornStateException(t);
489                 } finally {
490                         csLRU.releaseMutex();
491                 }
492         }
493
494         public byte[] getMetadata(long changeSetId) throws AcornAccessVerificationException, IllegalAcornStateException {
495                 
496                 ChangeSetInfo info = csLRU.getWithoutMutex(changeSetId);
497                 if (info == null) return null;
498         info.acquireMutex();
499         try {
500             return info.getMetadataBytes();
501         } catch (IllegalAcornStateException | AcornAccessVerificationException e) {
502             throw e;
503         } catch (Throwable t) {
504                         throw new IllegalAcornStateException(t);
505                 } finally {
506                         info.releaseMutex();
507                 }
508         }
509
510         public byte[] getResourceFile(final byte[] clusterUID, final int resourceIndex) throws AcornAccessVerificationException, IllegalAcornStateException {
511
512                 ClusterUID uid = ClusterUID.make(clusterUID, 0);
513                 String key = uid.toString() + "_" + resourceIndex;
514                 FileInfo info = fileLRU.getWithoutMutex(key);
515                 if(info == null) return null;
516                 info.acquireMutex();
517                 try {
518                         return info.getResourceFile();
519                 } catch (IllegalAcornStateException | AcornAccessVerificationException e) {
520                     throw e;
521                 } catch (Throwable t) {
522                         throw new IllegalAcornStateException(t);
523                 } finally {
524                         info.releaseMutex();
525                 }
526         }
527
528         public ResourceSegment getResourceSegment(final byte[] clusterUID, final int resourceIndex, final long segmentOffset, short segmentSize) throws AcornAccessVerificationException, IllegalAcornStateException {
529                 ClusterUID uid = ClusterUID.make(clusterUID, 0);
530
531                 String key = uid.toString() + "_" + resourceIndex;
532                 FileInfo info = fileLRU.getWithoutMutex(key);
533                 if(info == null) return null;
534                 info.acquireMutex();
535                 try {
536                         return info.getResourceSegment(clusterUID, resourceIndex, segmentOffset, segmentSize);
537                 } catch (Throwable t) {
538                         throw new IllegalAcornStateException(t);
539                 } finally {
540                         info.releaseMutex();
541                 }
542         }
543
544         public void modiFileEx(ClusterUID uid, int resourceKey, long offset, long size, byte[] bytes, long pos, ClusterSupport support) throws IllegalAcornStateException {
545                 try {
546                         String key = uid.toString() + "_" + ClusterTraits.getResourceIndexFromResourceKey(resourceKey);
547
548                         FileInfo info = null;
549                         fileLRU.acquireMutex();
550                         try {
551                                 info = fileLRU.get(key);
552                                 if (info == null) {
553                                         info = new FileInfo(fileLRU, key, (int) (offset + size));
554                                 }
555                         } catch (Throwable t) {
556                                 throw new IllegalAcornStateException(t);
557                         } finally {
558                                 fileLRU.releaseMutex();
559                         }
560                         
561                         info.acquireMutex();
562                         try {
563                                 info.updateData(bytes, offset, pos, size);
564                         } catch (Throwable t) {
565                                 throw new IllegalAcornStateException(t);
566                         } finally {
567                                 info.releaseMutex();
568                         }
569                 } catch (DatabaseException e) {
570                         e.printStackTrace();
571                 }
572         }
573
574     public void shutdown() {
575         clusterLRU.shutdown();
576         fileLRU.shutdown();
577         streamLRU.shutdown();
578         csLRU.shutdown();
579     }
580
581     public void notSafeToMakeSnapshot(IllegalAcornStateException t) {
582         this.safeToMakeSnapshot.compareAndSet(true, false);
583         this.cause = t;
584     }
585
586 }