]> gerrit.simantics Code Review - simantics/platform.git/blob - bundles/org.simantics.acorn/src/org/simantics/acorn/ClusterManager.java
5b8e5abb8fb8589d08bcd5e0a4b03577e8c078ba
[simantics/platform.git] / bundles / org.simantics.acorn / src / org / simantics / acorn / ClusterManager.java
1 package org.simantics.acorn;
2
3 import java.io.IOException;
4 import java.math.BigInteger;
5 import java.nio.file.DirectoryStream;
6 import java.nio.file.Files;
7 import java.nio.file.Path;
8 import java.util.ArrayList;
9 import java.util.Collection;
10 import java.util.HashMap;
11 import java.util.Map;
12
13 import org.simantics.acorn.cluster.ClusterImpl;
14 import org.simantics.acorn.internal.ClusterSupport2;
15 import org.simantics.acorn.lru.ChangeSetInfo;
16 import org.simantics.acorn.lru.ClusterInfo;
17 import org.simantics.acorn.lru.ClusterLRU;
18 import org.simantics.acorn.lru.ClusterStreamChunk;
19 import org.simantics.acorn.lru.FileInfo;
20 import org.simantics.acorn.lru.LRU;
21 import org.simantics.db.ClusterCreator;
22 import org.simantics.db.ServiceLocator;
23 import org.simantics.db.Database.Session.ClusterIds;
24 import org.simantics.db.Database.Session.ResourceSegment;
25 import org.simantics.db.exception.DatabaseException;
26 import org.simantics.db.impl.ClusterBase;
27 import org.simantics.db.impl.ClusterI;
28 import org.simantics.db.impl.ClusterSupport;
29 import org.simantics.db.procore.cluster.ClusterTraits;
30 import org.simantics.db.server.ProCoreException;
31 import org.simantics.db.service.ClusterSetsSupport;
32 import org.simantics.db.service.ClusterUID;
33 import org.simantics.utils.threads.logger.ITask;
34 import org.simantics.utils.threads.logger.ThreadLogger;
35
36 public class ClusterManager {
37
38         private ArrayList<String> currentChanges = new ArrayList<String>();
39
40         public final Path dbFolder;
41         public Path lastSessionDirectory;
42         public Path workingDirectory;
43
44         public LRU<String, ClusterStreamChunk> streamLRU;
45         public LRU<Long, ChangeSetInfo> csLRU;
46         public ClusterLRU clusterLRU;
47         public LRU<String, FileInfo> fileLRU;
48
49         public MainState mainState;
50         public HeadState state;
51
52         private long lastSnapshot = System.nanoTime();
53
54         final public ClusterSupport2 support = new ClusterSupport2(this);
55
56         /*
57          * Public interface
58          * 
59          */
60
61         public ClusterManager(Path dbFolder) {
62                 this.dbFolder = dbFolder;
63         }
64
65         public ArrayList<String> getChanges(long changeSetId) {
66                 ChangeSetInfo info = csLRU.getWithoutMutex(changeSetId);
67                 info.acquireMutex();
68                 try {
69                         info.makeResident();
70                         return info.getCSSIds();
71                 } catch (Throwable t) {
72                         throw new IllegalStateException(t);
73                 } finally {
74                         info.releaseMutex();
75                 }
76         }
77
78         public ClusterBase getClusterByClusterKey(int clusterKey) throws DatabaseException {
79                 return clusterLRU.getClusterByClusterKey(clusterKey);
80         }
81         
82         public ClusterBase getClusterByClusterUIDOrMake(ClusterUID clusterUID) throws DatabaseException {
83                 return clusterLRU.getClusterByClusterUIDOrMake(clusterUID);
84         }
85
86         public ClusterImpl getClusterByClusterUIDOrMakeProxy(ClusterUID clusterUID) throws DatabaseException {
87                 return clusterLRU.getClusterByClusterUIDOrMakeProxy(clusterUID);
88         }
89
90         public int getClusterKeyByClusterUIDOrMake(ClusterUID clusterUID) {
91                 return clusterLRU.getClusterKeyByClusterUIDOrMake(clusterUID);
92         }
93
94         public int getClusterKeyByClusterUIDOrMakeWithoutMutex(ClusterUID clusterUID) {
95                 return clusterLRU.getClusterKeyByClusterUIDOrMakeWithoutMutex(clusterUID);
96         }
97
98         public int getClusterKeyByUID(long id1, long id2) throws DatabaseException {
99                 return clusterLRU.getClusterKeyByUIDWithoutMutex(id1, id2);
100         }
101         
102         public <T extends ClusterI> T getClusterProxyByResourceKey(int resourceKey) throws DatabaseException {
103                 return clusterLRU.getClusterProxyByResourceKey(resourceKey);
104         }
105
106         public ClusterUID getClusterUIDByResourceKey(int resourceKey) throws DatabaseException {
107                 return clusterLRU.getClusterUIDByResourceKey(resourceKey);
108         }
109
110         public ClusterUID getClusterUIDByResourceKeyWithoutMutex(int resourceKey) throws DatabaseException {
111                 return clusterLRU.getClusterUIDByResourceKeyWithoutMutex(resourceKey);
112         }
113
114         /*
115          * Private implementation
116          * 
117          */
118
119         private static long countFiles(Path directory) throws IOException {
120                 try (DirectoryStream<Path> ds = Files.newDirectoryStream(directory)) {
121                         int count = 0;
122                         for (@SuppressWarnings("unused") Path p : ds)
123                                 ++count;
124                         return count;
125                 }
126         }
127
128         public synchronized boolean makeSnapshot(ServiceLocator locator, boolean force) throws IOException {
129
130                 // Maximum autosave frequency is per 60s
131                 if(!force && System.nanoTime() - lastSnapshot < 10*1000000000L) {
132 //                  System.err.println("lastSnapshot too early");
133                     return false;
134                 }
135
136                 // Cluster files are always there 
137                 // Nothing has been written => no need to do anything
138                 long amountOfFiles = countFiles(workingDirectory);
139                 if(!force && amountOfFiles < 3) {
140 //                  System.err.println("amountOfFiles < 3");
141                     return false;
142                 }
143
144                 System.err.println("makeSnapshot");
145
146                 // Schedule writing of all data to disk
147                 refreshHeadState();
148
149                 // Wait for all files to be written
150                 clusterLRU.shutdown();
151                 fileLRU.shutdown();
152                 streamLRU.shutdown();
153                 csLRU.shutdown();
154                 
155                 persistHeadState();
156                 
157                 mainState.save(dbFolder);
158
159                 ClusterSetsSupport cssi = locator.getService(ClusterSetsSupport.class); 
160                 cssi.save();
161
162                 amountOfFiles = countFiles(workingDirectory);
163                 
164                 System.err.println(" -finished: amount of files is " + amountOfFiles);
165
166                 workingDirectory = dbFolder.resolve(Integer.toString(mainState.headDir));
167                 if (!Files.exists(workingDirectory)) {
168                     Files.createDirectories(workingDirectory);
169                 }
170
171                 cssi.updateReadAndWriteDirectories(lastSessionDirectory, workingDirectory);
172
173                 clusterLRU.setWriteDir(workingDirectory);
174                 fileLRU.setWriteDir(workingDirectory);
175                 streamLRU.setWriteDir(workingDirectory);
176                 csLRU.setWriteDir(workingDirectory);
177
178                 clusterLRU.resume();
179                 fileLRU.resume();
180                 streamLRU.resume();
181                 csLRU.resume();
182
183                 lastSnapshot = System.nanoTime();
184                 
185                 return true;
186                 
187         }
188         
189         public void refreshHeadState() throws IOException {
190
191                 state.clusters.clear();
192                 state.files.clear();
193                 state.stream.clear();
194                 state.cs.clear();
195
196                 clusterLRU.persist(state.clusters);
197                 fileLRU.persist(state.files);
198                 streamLRU.persist(state.stream);
199                 csLRU.persist(state.cs);
200
201         }
202         
203         public void persistHeadState() throws IOException {
204
205                 // Sync current working directory
206                 Files.walk(workingDirectory, 1).filter(Files::isRegularFile).forEach(FileIO::uncheckedSyncPath);
207                 state.save(workingDirectory);
208                 mainState.headDir++;
209         }
210
211         
212 //      public void save() throws IOException {
213 //
214 //              refreshHeadState();
215 //              
216 //              clusterLRU.shutdown();
217 //              fileLRU.shutdown();
218 //              streamLRU.shutdown();
219 //              csLRU.shutdown();
220 //
221 //              persistHeadState();
222 //
223 //              mainState.save(getBaseDirectory());
224
225 //              try {
226 //                      ThreadLogVisualizer visualizer = new ThreadLogVisualizer();
227 //                      visualizer.read(new DataInputStream(new FileInputStream(
228 //                                      ThreadLogger.LOG_FILE)));
229 //                      visualizer.visualize3(new PrintStream(ThreadLogger.LOG_FILE
230 //                                      + ".svg"));
231 //              } catch (FileNotFoundException e) {
232 //                      // TODO Auto-generated catch block
233 //                      e.printStackTrace();
234 //              }
235
236                 // System.err.println("-- load statistics --");
237                 // for(Pair<ClusterUID, Integer> entry :
238                 // CollectionUtils.valueSortedEntries(histogram)) {
239                 // System.err.println(" " + entry.second + " " + entry.first);
240                 // }
241
242 //      }
243         
244         private void acquireAll() {
245                 clusterLRU.acquireMutex();
246                 fileLRU.acquireMutex();
247                 streamLRU.acquireMutex();
248                 csLRU.acquireMutex();
249         }
250         
251         private void releaseAll() {
252                 csLRU.releaseMutex();
253                 streamLRU.releaseMutex();
254                 fileLRU.releaseMutex();
255                 clusterLRU.releaseMutex();
256         }
257
258         public void load() throws IOException {
259
260                 // Main state
261                 mainState = MainState.load(dbFolder);
262
263                 lastSessionDirectory = dbFolder.resolve(Integer.toString(mainState.headDir - 1));
264                 
265                 // Head State
266                 try {
267             state = HeadState.load(lastSessionDirectory);
268         } catch (InvalidHeadStateException e) {
269             // For backwards compatibility only!
270             Throwable cause = e.getCause();
271             if (cause instanceof Throwable) {
272                 try {
273                     org.simantics.db.javacore.HeadState oldState = org.simantics.db.javacore.HeadState.load(lastSessionDirectory);
274                     
275                     HeadState newState = new HeadState();
276                     newState.clusters = oldState.clusters;
277                     newState.cs = oldState.cs;
278                     newState.files = oldState.files;
279                     newState.stream = oldState.stream;
280                     newState.headChangeSetId = oldState.headChangeSetId;
281                     newState.reservedIds = oldState.reservedIds;
282                     newState.transactionId = oldState.transactionId;
283                     state = newState;
284                 } catch (InvalidHeadStateException e1) {
285                     throw new IOException("Could not load HeadState due to corruption", e1);
286                 }
287             } else {
288                 // This should never happen as MainState.load() checks the integrity
289                 // of head.state files and rolls back in cases of corruption until a
290                 // consistent state is found (could be case 0 - initial db state)
291                 // IF this does happen something is completely wrong
292                 throw new IOException("Could not load HeadState due to corruption", e);
293             }
294         }
295
296                 workingDirectory = dbFolder.resolve(Integer.toString(mainState.headDir));
297                 Files.createDirectories(workingDirectory);
298
299                 csLRU = new LRU<Long, ChangeSetInfo>("Change Set", workingDirectory);
300                 streamLRU = new LRU<String, ClusterStreamChunk>("Cluster Stream", workingDirectory);
301                 clusterLRU = new ClusterLRU(this, "Cluster", workingDirectory);
302                 fileLRU = new LRU<String, FileInfo>("External Value", workingDirectory);
303
304                 acquireAll();
305                 
306                 // Clusters
307                 for (String clusterKey : state.clusters) {
308                         String[] parts1 = clusterKey.split("#");
309                         String[] parts = parts1[0].split("\\.");
310                         long first = new BigInteger(parts[0], 16).longValue();
311                         long second = new BigInteger(parts[1], 16).longValue();
312                         ClusterUID uuid = ClusterUID.make(first, second);
313                         Path readDir = dbFolder.resolve(parts1[1]);
314                         int offset = Integer.parseInt(parts1[2]);
315                         int length = Integer.parseInt(parts1[3]);
316                         clusterLRU.map(new ClusterInfo(this, clusterLRU, readDir, uuid, offset, length));
317                 }
318                 // Files
319                 for (String fileKey : state.files) {
320 //                      System.err.println("loadFile: " + fileKey);
321                         String[] parts = fileKey.split("#");
322                         Path readDir = dbFolder.resolve(parts[1]);
323                         int offset = Integer.parseInt(parts[2]);
324                         int length = Integer.parseInt(parts[3]);
325                         FileInfo info = new FileInfo(fileLRU, readDir, parts[0], offset, length);
326                         fileLRU.map(info);
327                 }
328                 // Update chunks
329                 for (String fileKey : state.stream) {
330 //                      System.err.println("loadStream: " + fileKey);
331                         String[] parts = fileKey.split("#");
332                         Path readDir = dbFolder.resolve(parts[1]);
333                         int offset = Integer.parseInt(parts[2]);
334                         int length = Integer.parseInt(parts[3]);
335                         ClusterStreamChunk info = new ClusterStreamChunk(this,
336                                         streamLRU, readDir, parts[0], offset, length);
337                         streamLRU.map(info);
338                 }
339                 // Change sets
340                 for (String fileKey : state.cs) {
341                         String[] parts = fileKey.split("#");
342                         Path readDir = dbFolder.resolve(parts[1]);
343                         Long revisionId = Long.parseLong(parts[0]);
344                         int offset = Integer.parseInt(parts[2]);
345                         int length = Integer.parseInt(parts[3]);
346                         ChangeSetInfo info = new ChangeSetInfo(csLRU, readDir, revisionId, offset, length);
347                         csLRU.map(info);
348                 }
349                 
350                 releaseAll();
351
352         }
353
354         public <T> T clone(ClusterUID uid, ClusterCreator creator)
355                         throws DatabaseException {
356                 
357                 clusterLRU.ensureUpdates(uid);
358                 
359                 ClusterInfo info = clusterLRU.getWithoutMutex(uid);
360                 return info.clone(uid, creator);
361
362         }
363
364         //private int loadCounter = 0;
365
366         public static void startLog(String msg) {
367                 tasks.put(msg, ThreadLogger.getInstance().begin(msg));
368         }
369
370         public static void endLog(String msg) {
371                 ITask task = tasks.get(msg);
372                 if (task != null)
373                         task.finish();
374         }
375
376         static Map<String, ITask> tasks = new HashMap<String, ITask>();
377
378         public void update(ClusterUID uid, ClusterImpl clu) {
379
380                 ClusterInfo info = clusterLRU.getWithoutMutex(uid);
381                 info.acquireMutex();
382                 try {
383                         info.update(clu);
384                 } catch (Throwable t) {
385                         throw new IllegalStateException(t);
386                 } finally {
387                         info.releaseMutex();
388                 }
389                 
390         }
391
392         public long getClusterIdOrCreate(ClusterUID clusterUID) {
393                 return 1;
394         }
395
396         public int getResourceKey(ClusterUID uid, int index) {
397                 return clusterLRU.getResourceKey(uid, index);
398         }
399
400         public int getResourceKeyWitoutMutex(ClusterUID uid, int index) {
401                 return clusterLRU.getResourceKeyWithoutMutex(uid, index);
402         }
403
404         public ClusterIds getClusterIds() throws ProCoreException {
405
406                 clusterLRU.acquireMutex();
407
408                 try {
409
410                         Collection<ClusterInfo> infos = clusterLRU.values();
411                         final int status = infos.size();
412                         final long[] firsts = new long[status];
413                         final long[] seconds = new long[status];
414
415                         int index = 0;
416                         for (ClusterInfo info : infos) {
417                                 firsts[index] = 0;
418                                 seconds[index] = info.getKey().second;
419                                 index++;
420                         }
421
422                         return new ClusterIds() {
423
424                                 @Override
425                                 public int getStatus() {
426                                         return status;
427                                 }
428
429                                 @Override
430                                 public long[] getFirst() {
431                                         return firsts;
432                                 }
433
434                                 @Override
435                                 public long[] getSecond() {
436                                         return seconds;
437                                 }
438
439                         };
440
441                 } catch (Throwable t) {
442                         throw new IllegalStateException(t);
443                 } finally {
444                         clusterLRU.releaseMutex();
445                 }
446
447         }
448
449         public void addIntoCurrentChangeSet(String ccs) {
450                 
451                 csLRU.acquireMutex();
452
453                 try {
454                         
455                         currentChanges.add(ccs);
456                         
457                 } catch (Throwable t) {
458                         throw new IllegalStateException(t);
459                 } finally {
460                         
461                         csLRU.releaseMutex();
462                         
463                 }
464
465         }
466
467         public void commitChangeSet(long changeSetId, byte[] data) {
468                 csLRU.acquireMutex();
469                 try {
470                         ArrayList<String> csids = new ArrayList<String>(currentChanges);
471                         currentChanges = new ArrayList<String>();
472                         new ChangeSetInfo(csLRU, changeSetId, data, csids);
473                 } catch (Throwable t) {
474                         throw new IllegalStateException(t);
475                 } finally {
476                         csLRU.releaseMutex();
477                 }
478         }
479
480         public byte[] getMetadata(long changeSetId) {
481                 
482                 ChangeSetInfo info = csLRU.getWithoutMutex(changeSetId);
483                 if (info == null) return null;
484                 info.acquireMutex();
485                 try {
486                         return info.getMetadataBytes();
487                 } catch (Throwable t) {
488                         throw new IllegalStateException(t);
489                 } finally {
490                         info.releaseMutex();
491                 }
492                 
493         }
494
495         public byte[] getResourceFile(final byte[] clusterUID,
496                         final int resourceIndex) throws ProCoreException {
497
498                 ClusterUID uid = ClusterUID.make(clusterUID, 0);
499                 String key = uid.toString() + "_" + resourceIndex;
500                 FileInfo info = fileLRU.getWithoutMutex(key);
501                 if(info == null) return null;
502                 info.acquireMutex();
503                 try {
504                         return info.getResourceFile();
505                 } catch (Throwable t) {
506                         throw new IllegalStateException(t);
507                 } finally {
508                         info.releaseMutex();
509                 }
510
511         }
512
513         public ResourceSegment getResourceSegment(final byte[] clusterUID,
514                         final int resourceIndex, final long segmentOffset, short segmentSize)
515                         throws ProCoreException {
516
517                 ClusterUID uid = ClusterUID.make(clusterUID, 0);
518
519                 String key = uid.toString() + "_" + resourceIndex;
520                 FileInfo info = fileLRU.getWithoutMutex(key);
521                 if(info == null) return null;
522                 info.acquireMutex();
523                 try {
524                         return info.getResourceSegment(clusterUID, resourceIndex, segmentOffset, segmentSize);
525                 } catch (Throwable t) {
526                         throw new IllegalStateException(t);
527                 } finally {
528                         info.releaseMutex();
529                 }
530
531         }
532
533         public void modiFileEx(ClusterUID uid, int resourceKey, long offset,
534                         long size, byte[] bytes, long pos, ClusterSupport support) {
535
536                 try {
537
538                         String key = uid.toString()
539                                         + "_"
540                                         + ClusterTraits
541                                                         .getResourceIndexFromResourceKey(resourceKey);
542
543                         FileInfo info = null;
544
545                         fileLRU.acquireMutex();
546
547                         try {
548
549                                 info = fileLRU.get(key);
550                                 if (info == null)
551                                         info = new FileInfo(fileLRU, key, (int) (offset + size));
552                                 
553                                 
554                         } catch (Throwable t) {
555                                 throw new IllegalStateException(t);
556                         } finally {
557                                 
558                                 fileLRU.releaseMutex();
559                                 
560                         }
561                         
562                         info.acquireMutex();
563                         try {
564                                 info.updateData(bytes, offset, pos, size);
565                         } catch (Throwable t) {
566                                 throw new IllegalStateException(t);
567                         } finally {
568                                 info.releaseMutex();
569                         }
570
571                 } catch (DatabaseException e) {
572                         e.printStackTrace();
573                 }
574
575         }
576
577     public void shutdown() {
578         clusterLRU.shutdown();
579         fileLRU.shutdown();
580         streamLRU.shutdown();
581         csLRU.shutdown();
582     }
583
584 }