gerrit.simantics Code Review - simantics/platform.git/blob - bundles/org.simantics.acorn/src/org/simantics/acorn/ClusterManager.java
Ensure GetElementClassRequest is not constructed without elementFactory
[simantics/platform.git] / bundles / org.simantics.acorn / src / org / simantics / acorn / ClusterManager.java
1 package org.simantics.acorn;
2
3 import java.io.IOException;
4 import java.math.BigInteger;
5 import java.nio.file.DirectoryStream;
6 import java.nio.file.Files;
7 import java.nio.file.Path;
8 import java.util.ArrayList;
9 import java.util.Collection;
10 import java.util.HashMap;
11 import java.util.Map;
12 import java.util.concurrent.atomic.AtomicBoolean;
13
14 import org.simantics.acorn.cluster.ClusterImpl;
15 import org.simantics.acorn.exception.AcornAccessVerificationException;
16 import org.simantics.acorn.exception.IllegalAcornStateException;
17 import org.simantics.acorn.exception.InvalidHeadStateException;
18 import org.simantics.acorn.internal.ClusterSupport2;
19 import org.simantics.acorn.lru.ChangeSetInfo;
20 import org.simantics.acorn.lru.ClusterInfo;
21 import org.simantics.acorn.lru.ClusterLRU;
22 import org.simantics.acorn.lru.ClusterStreamChunk;
23 import org.simantics.acorn.lru.FileInfo;
24 import org.simantics.acorn.lru.LRU;
25 import org.simantics.db.ClusterCreator;
26 import org.simantics.db.Database.Session.ClusterIds;
27 import org.simantics.db.Database.Session.ResourceSegment;
28 import org.simantics.db.ServiceLocator;
29 import org.simantics.db.exception.DatabaseException;
30 import org.simantics.db.impl.ClusterBase;
31 import org.simantics.db.impl.ClusterI;
32 import org.simantics.db.impl.ClusterSupport;
33 import org.simantics.db.procore.cluster.ClusterTraits;
34 import org.simantics.db.service.ClusterSetsSupport;
35 import org.simantics.db.service.ClusterUID;
36 import org.simantics.utils.threads.logger.ITask;
37 import org.simantics.utils.threads.logger.ThreadLogger;
38
39 public class ClusterManager {
40
41         private ArrayList<String> currentChanges = new ArrayList<String>();
42
43         public final Path dbFolder;
44         public Path lastSessionDirectory;
45         public Path workingDirectory;
46
47         public LRU<String, ClusterStreamChunk> streamLRU;
48         public LRU<Long, ChangeSetInfo> csLRU;
49         public ClusterLRU clusterLRU;
50         public LRU<String, FileInfo> fileLRU;
51
52         public MainState mainState;
53         public HeadState state;
54
55         private long lastSnapshot = System.nanoTime();
56
57         final public ClusterSupport2 support = new ClusterSupport2(this);
58
59         /*
60          * Public interface
61          * 
62          */
63
64         public ClusterManager(Path dbFolder) {
65                 this.dbFolder = dbFolder;
66         }
67
68         public ArrayList<String> getChanges(long changeSetId) throws AcornAccessVerificationException, IllegalAcornStateException {
69                 ChangeSetInfo info = csLRU.getWithoutMutex(changeSetId);
70                 info.acquireMutex();
71                 try {
72                         info.makeResident();
73                         return info.getCSSIds();
74                 } finally {
75                         info.releaseMutex();
76                 }
77         }
78
79         public ClusterBase getClusterByClusterKey(int clusterKey) throws DatabaseException, AcornAccessVerificationException, IllegalAcornStateException {
80                 return clusterLRU.getClusterByClusterKey(clusterKey);
81         }
82         
83         public ClusterBase getClusterByClusterUIDOrMake(ClusterUID clusterUID) throws DatabaseException, AcornAccessVerificationException, IllegalAcornStateException {
84                 return clusterLRU.getClusterByClusterUIDOrMake(clusterUID);
85         }
86
87         public ClusterImpl getClusterByClusterUIDOrMakeProxy(ClusterUID clusterUID) throws DatabaseException, AcornAccessVerificationException, IllegalAcornStateException {
88                 return clusterLRU.getClusterByClusterUIDOrMakeProxy(clusterUID);
89         }
90
91         public int getClusterKeyByClusterUIDOrMake(ClusterUID clusterUID) throws AcornAccessVerificationException {
92                 return clusterLRU.getClusterKeyByClusterUIDOrMake(clusterUID);
93         }
94
95         public int getClusterKeyByClusterUIDOrMakeWithoutMutex(ClusterUID clusterUID) throws IllegalAcornStateException, AcornAccessVerificationException {
96                 return clusterLRU.getClusterKeyByClusterUIDOrMakeWithoutMutex(clusterUID);
97         }
98
99         public int getClusterKeyByUID(long id1, long id2) throws DatabaseException, IllegalAcornStateException {
100                 return clusterLRU.getClusterKeyByUIDWithoutMutex(id1, id2);
101         }
102         
103         public <T extends ClusterI> T getClusterProxyByResourceKey(int resourceKey) throws DatabaseException, AcornAccessVerificationException, IllegalAcornStateException {
104                 return clusterLRU.getClusterProxyByResourceKey(resourceKey);
105         }
106
107         public ClusterUID getClusterUIDByResourceKey(int resourceKey) throws DatabaseException, AcornAccessVerificationException {
108                 return clusterLRU.getClusterUIDByResourceKey(resourceKey);
109         }
110
111         public ClusterUID getClusterUIDByResourceKeyWithoutMutex(int resourceKey) throws DatabaseException, IllegalAcornStateException, AcornAccessVerificationException {
112                 return clusterLRU.getClusterUIDByResourceKeyWithoutMutex(resourceKey);
113         }
114
115         /*
116          * Private implementation
117          * 
118          */
119
120         private static long countFiles(Path directory) throws IOException {
121                 try (DirectoryStream<Path> ds = Files.newDirectoryStream(directory)) {
122                         int count = 0;
123                         for (@SuppressWarnings("unused") Path p : ds)
124                                 ++count;
125                         return count;
126                 }
127         }
128
129         // Add check to make sure if it safe to make snapshot (used with cancel which is not yet supported and may cause corrupted head.state writing)
130         private AtomicBoolean safeToMakeSnapshot = new AtomicBoolean(true);
131         private IllegalAcornStateException cause;
132         
133         public synchronized boolean makeSnapshot(ServiceLocator locator, boolean fullSave) throws IllegalAcornStateException {
134             try {
135             if (!safeToMakeSnapshot.get())
136                 throw cause;
137                 // Maximum autosave frequency is per 60s
138                 if(!fullSave && System.nanoTime() - lastSnapshot < 10*1000000000L) {
139     //              System.err.println("lastSnapshot too early");
140                     return false;
141                 }
142     
143                 // Cluster files are always there 
144                 // Nothing has been written => no need to do anything
145                 long amountOfFiles = countFiles(workingDirectory);
146                 if(!fullSave && amountOfFiles < 3) {
147     //              System.err.println("amountOfFiles < 3");
148                     return false;
149                 }
150     
151                 System.err.println("makeSnapshot");
152     
153                 // Schedule writing of all data to disk
154             refreshHeadState();
155     
156                 // Wait for all files to be written
157                 clusterLRU.shutdown();
158                 fileLRU.shutdown();
159                 streamLRU.shutdown();
160                 csLRU.shutdown();
161                 
162                 // Lets check if it is still safe to make a snapshot
163                 if (!safeToMakeSnapshot.get())
164                     throw cause;
165                 
166                 persistHeadState();
167                 
168                 if (fullSave)
169                     mainState.save(dbFolder);
170     
171                 ClusterSetsSupport cssi = locator.getService(ClusterSetsSupport.class); 
172                 cssi.save();
173     
174                 amountOfFiles = countFiles(workingDirectory);
175                 
176                 System.err.println(" -finished: amount of files is " + amountOfFiles);
177     
178                 workingDirectory = dbFolder.resolve(Integer.toString(mainState.headDir));
179                 if (!Files.exists(workingDirectory)) {
180                     Files.createDirectories(workingDirectory);
181                 }
182     
183                 cssi.updateWriteDirectory(workingDirectory);
184     
185                 clusterLRU.setWriteDir(workingDirectory);
186                 fileLRU.setWriteDir(workingDirectory);
187                 streamLRU.setWriteDir(workingDirectory);
188                 csLRU.setWriteDir(workingDirectory);
189     
190                 clusterLRU.resume();
191                 fileLRU.resume();
192                 streamLRU.resume();
193                 csLRU.resume();
194     
195                 lastSnapshot = System.nanoTime();
196                 
197                 return true;
198             } catch (IllegalAcornStateException e) {
199                 notSafeToMakeSnapshot(e);
200                 throw e;
201             } catch (IOException e) {
202                 IllegalAcornStateException e1 = new IllegalAcornStateException(e);
203                 notSafeToMakeSnapshot(e1);
204                 throw e1;
205             }
206         }
207         
208         private void refreshHeadState() throws IOException, IllegalAcornStateException {
209                 state.clusters.clear();
210                 state.files.clear();
211                 state.stream.clear();
212                 state.cs.clear();
213
214                 clusterLRU.persist(state.clusters);
215                 fileLRU.persist(state.files);
216                 streamLRU.persist(state.stream);
217                 csLRU.persist(state.cs);
218         }
219         
220         private void persistHeadState() throws IOException {
221                 // Sync current working directory
222                 Files.walk(workingDirectory, 1).filter(Files::isRegularFile).forEach(FileIO::uncheckedSyncPath);
223                 state.save(workingDirectory);
224                 mainState.headDir++;
225         }
226
227         
228 //      public void save() throws IOException {
229 //
230 //              refreshHeadState();
231 //              
232 //              clusterLRU.shutdown();
233 //              fileLRU.shutdown();
234 //              streamLRU.shutdown();
235 //              csLRU.shutdown();
236 //
237 //              persistHeadState();
238 //
239 //              mainState.save(getBaseDirectory());
240
241 //              try {
242 //                      ThreadLogVisualizer visualizer = new ThreadLogVisualizer();
243 //                      visualizer.read(new DataInputStream(new FileInputStream(
244 //                                      ThreadLogger.LOG_FILE)));
245 //                      visualizer.visualize3(new PrintStream(ThreadLogger.LOG_FILE
246 //                                      + ".svg"));
247 //              } catch (FileNotFoundException e) {
248 //                      // TODO Auto-generated catch block
249 //                      e.printStackTrace();
250 //              }
251
252                 // System.err.println("-- load statistics --");
253                 // for(Pair<ClusterUID, Integer> entry :
254                 // CollectionUtils.valueSortedEntries(histogram)) {
255                 // System.err.println(" " + entry.second + " " + entry.first);
256                 // }
257
258 //      }
259         
260         private void acquireAll() throws IllegalAcornStateException {
261                 clusterLRU.acquireMutex();
262                 fileLRU.acquireMutex();
263                 streamLRU.acquireMutex();
264                 csLRU.acquireMutex();
265         }
266         
267         private void releaseAll() {
268                 csLRU.releaseMutex();
269                 streamLRU.releaseMutex();
270                 fileLRU.releaseMutex();
271                 clusterLRU.releaseMutex();
272         }
273
274         private AtomicBoolean rollback = new AtomicBoolean(false);
275         
276         boolean rolledback() {
277             return rollback.get();
278         }
279         
280         public void load() throws IOException {
281
282                 // Main state
283                 mainState = MainState.load(dbFolder, t -> rollback.set(true));
284
285                 lastSessionDirectory = dbFolder.resolve(Integer.toString(mainState.headDir - 1));
286                 
287                 // Head State
288                 try {
289             state = HeadState.load(lastSessionDirectory);
290         } catch (InvalidHeadStateException e) {
291             // For backwards compatibility only!
292             Throwable cause = e.getCause();
293             if (cause instanceof Throwable) {
294                 try {
295                     org.simantics.db.javacore.HeadState oldState = org.simantics.db.javacore.HeadState.load(lastSessionDirectory);
296                     
297                     HeadState newState = new HeadState();
298                     newState.clusters = oldState.clusters;
299                     newState.cs = oldState.cs;
300                     newState.files = oldState.files;
301                     newState.stream = oldState.stream;
302                     newState.headChangeSetId = oldState.headChangeSetId;
303                     newState.reservedIds = oldState.reservedIds;
304                     newState.transactionId = oldState.transactionId;
305                     state = newState;
306                 } catch (InvalidHeadStateException e1) {
307                     throw new IOException("Could not load HeadState due to corruption", e1);
308                 }
309             } else {
310                 // This should never happen as MainState.load() checks the integrity
311                 // of head.state files and rolls back in cases of corruption until a
312                 // consistent state is found (could be case 0 - initial db state)
313                 // IF this does happen something is completely wrong
314                 throw new IOException("Could not load HeadState due to corruption", e);
315             }
316         }
317                 try {
318                 workingDirectory = dbFolder.resolve(Integer.toString(mainState.headDir));
319                 Files.createDirectories(workingDirectory);
320     
321                 csLRU = new LRU<Long, ChangeSetInfo>(this, "Change Set", workingDirectory);
322                 streamLRU = new LRU<String, ClusterStreamChunk>(this, "Cluster Stream", workingDirectory);
323                 clusterLRU = new ClusterLRU(this, "Cluster", workingDirectory);
324                 fileLRU = new LRU<String, FileInfo>(this, "External Value", workingDirectory);
325     
326                 acquireAll();
327                 
328                 // Clusters
329                 for (String clusterKey : state.clusters) {
330                         String[] parts1 = clusterKey.split("#");
331                         String[] parts = parts1[0].split("\\.");
332                         long first = new BigInteger(parts[0], 16).longValue();
333                         long second = new BigInteger(parts[1], 16).longValue();
334                         ClusterUID uuid = ClusterUID.make(first, second);
335                         Path readDir = dbFolder.resolve(parts1[1]);
336                         int offset = Integer.parseInt(parts1[2]);
337                         int length = Integer.parseInt(parts1[3]);
338                         clusterLRU.map(new ClusterInfo(this, clusterLRU, readDir, uuid, offset, length));
339                 }
340                 // Files
341                 for (String fileKey : state.files) {
342     //                  System.err.println("loadFile: " + fileKey);
343                         String[] parts = fileKey.split("#");
344                         Path readDir = dbFolder.resolve(parts[1]);
345                         int offset = Integer.parseInt(parts[2]);
346                         int length = Integer.parseInt(parts[3]);
347                         FileInfo info = new FileInfo(fileLRU, readDir, parts[0], offset, length);
348                         fileLRU.map(info);
349                 }
350                 // Update chunks
351                 for (String fileKey : state.stream) {
352     //                  System.err.println("loadStream: " + fileKey);
353                         String[] parts = fileKey.split("#");
354                         Path readDir = dbFolder.resolve(parts[1]);
355                         int offset = Integer.parseInt(parts[2]);
356                         int length = Integer.parseInt(parts[3]);
357                         ClusterStreamChunk info = new ClusterStreamChunk(this,
358                                         streamLRU, readDir, parts[0], offset, length);
359                         streamLRU.map(info);
360                 }
361                 // Change sets
362                 for (String fileKey : state.cs) {
363                         String[] parts = fileKey.split("#");
364                         Path readDir = dbFolder.resolve(parts[1]);
365                         Long revisionId = Long.parseLong(parts[0]);
366                         int offset = Integer.parseInt(parts[2]);
367                         int length = Integer.parseInt(parts[3]);
368                         ChangeSetInfo info = new ChangeSetInfo(csLRU, readDir, revisionId, offset, length);
369                         csLRU.map(info);
370                 }
371                 
372                 releaseAll();
373                 } catch (IllegalAcornStateException | AcornAccessVerificationException e) {
374                     // ROLLBACK ONE DIR UNTIL WE ARE FINE!
375                     throw new IOException(e);
376                 }
377         }
378
379         public <T> T clone(ClusterUID uid, ClusterCreator creator) throws DatabaseException, AcornAccessVerificationException, IllegalAcornStateException, IOException {
380                 
381                 clusterLRU.ensureUpdates(uid);
382                 
383                 ClusterInfo info = clusterLRU.getWithoutMutex(uid);
384                 return info.clone(uid, creator);
385         }
386
387         //private int loadCounter = 0;
388
389         public static void startLog(String msg) {
390                 tasks.put(msg, ThreadLogger.getInstance().begin(msg));
391         }
392
393         public static void endLog(String msg) {
394                 ITask task = tasks.get(msg);
395                 if (task != null)
396                         task.finish();
397         }
398
399         static Map<String, ITask> tasks = new HashMap<String, ITask>();
400
401         public void update(ClusterUID uid, ClusterImpl clu) throws AcornAccessVerificationException, IllegalAcornStateException {
402                 ClusterInfo info = clusterLRU.getWithoutMutex(uid);
403                 info.acquireMutex();
404                 try {
405                         info.update(clu);
406                 } finally {
407                         info.releaseMutex();
408                 }
409         }
410
411         public long getClusterIdOrCreate(ClusterUID clusterUID) {
412                 return 1;
413         }
414
415         public int getResourceKey(ClusterUID uid, int index) throws AcornAccessVerificationException {
416                 return clusterLRU.getResourceKey(uid, index);
417         }
418
419         public int getResourceKeyWitoutMutex(ClusterUID uid, int index) throws IllegalAcornStateException {
420                 return clusterLRU.getResourceKeyWithoutMutex(uid, index);
421         }
422
423         public ClusterIds getClusterIds() throws IllegalAcornStateException {
424                 clusterLRU.acquireMutex();
425
426                 try {
427                         Collection<ClusterInfo> infos = clusterLRU.values();
428                         final int status = infos.size();
429                         final long[] firsts = new long[status];
430                         final long[] seconds = new long[status];
431
432                         int index = 0;
433                         for (ClusterInfo info : infos) {
434                                 firsts[index] = 0;
435                                 seconds[index] = info.getKey().second;
436                                 index++;
437                         }
438
439                         return new ClusterIds() {
440
441                                 @Override
442                                 public int getStatus() {
443                                         return status;
444                                 }
445
446                                 @Override
447                                 public long[] getFirst() {
448                                         return firsts;
449                                 }
450
451                                 @Override
452                                 public long[] getSecond() {
453                                         return seconds;
454                                 }
455
456                         };
457
458                 } catch (Throwable t) {
459                         throw new IllegalAcornStateException(t);
460                 } finally {
461                         clusterLRU.releaseMutex();
462                 }
463         }
464
465         public void addIntoCurrentChangeSet(String ccs) throws IllegalAcornStateException {
466                 csLRU.acquireMutex();
467
468                 try {
469                         currentChanges.add(ccs);
470                 } catch (Throwable t) {
471                         throw new IllegalAcornStateException(t);
472                 } finally {
473                         csLRU.releaseMutex();
474                 }
475         }
476
477         public void commitChangeSet(long changeSetId, byte[] data) throws IllegalAcornStateException {
478                 csLRU.acquireMutex();
479                 try {
480                         ArrayList<String> csids = new ArrayList<String>(currentChanges);
481                         currentChanges = new ArrayList<String>();
482                         new ChangeSetInfo(csLRU, changeSetId, data, csids);
483                 } catch (Throwable t) {
484                         throw new IllegalAcornStateException(t);
485                 } finally {
486                         csLRU.releaseMutex();
487                 }
488         }
489
490         public byte[] getMetadata(long changeSetId) throws AcornAccessVerificationException, IllegalAcornStateException {
491                 
492                 ChangeSetInfo info = csLRU.getWithoutMutex(changeSetId);
493                 if (info == null) return null;
494         info.acquireMutex();
495         try {
496             return info.getMetadataBytes();
497         } catch (IllegalAcornStateException | AcornAccessVerificationException e) {
498             throw e;
499         } catch (Throwable t) {
500                         throw new IllegalAcornStateException(t);
501                 } finally {
502                         info.releaseMutex();
503                 }
504         }
505
506         public byte[] getResourceFile(final byte[] clusterUID, final int resourceIndex) throws AcornAccessVerificationException, IllegalAcornStateException {
507
508                 ClusterUID uid = ClusterUID.make(clusterUID, 0);
509                 String key = uid.toString() + "_" + resourceIndex;
510                 FileInfo info = fileLRU.getWithoutMutex(key);
511                 if(info == null) return null;
512                 info.acquireMutex();
513                 try {
514                         return info.getResourceFile();
515                 } catch (IllegalAcornStateException | AcornAccessVerificationException e) {
516                     throw e;
517                 } catch (Throwable t) {
518                         throw new IllegalAcornStateException(t);
519                 } finally {
520                         info.releaseMutex();
521                 }
522         }
523
524         public ResourceSegment getResourceSegment(final byte[] clusterUID, final int resourceIndex, final long segmentOffset, short segmentSize) throws AcornAccessVerificationException, IllegalAcornStateException {
525                 ClusterUID uid = ClusterUID.make(clusterUID, 0);
526
527                 String key = uid.toString() + "_" + resourceIndex;
528                 FileInfo info = fileLRU.getWithoutMutex(key);
529                 if(info == null) return null;
530                 info.acquireMutex();
531                 try {
532                         return info.getResourceSegment(clusterUID, resourceIndex, segmentOffset, segmentSize);
533                 } catch (Throwable t) {
534                         throw new IllegalAcornStateException(t);
535                 } finally {
536                         info.releaseMutex();
537                 }
538         }
539
540         public void modiFileEx(ClusterUID uid, int resourceKey, long offset, long size, byte[] bytes, long pos, ClusterSupport support) throws IllegalAcornStateException {
541                 try {
542                         String key = uid.toString() + "_" + ClusterTraits.getResourceIndexFromResourceKey(resourceKey);
543
544                         FileInfo info = null;
545                         fileLRU.acquireMutex();
546                         try {
547                                 info = fileLRU.get(key);
548                                 if (info == null) {
549                                         info = new FileInfo(fileLRU, key, (int) (offset + size));
550                                 }
551                         } catch (Throwable t) {
552                                 throw new IllegalAcornStateException(t);
553                         } finally {
554                                 fileLRU.releaseMutex();
555                         }
556                         
557                         info.acquireMutex();
558                         try {
559                                 info.updateData(bytes, offset, pos, size);
560                         } catch (Throwable t) {
561                                 throw new IllegalAcornStateException(t);
562                         } finally {
563                                 info.releaseMutex();
564                         }
565                 } catch (DatabaseException e) {
566                         e.printStackTrace();
567                 }
568         }
569
570     public void shutdown() {
571         clusterLRU.shutdown();
572         fileLRU.shutdown();
573         streamLRU.shutdown();
574         csLRU.shutdown();
575     }
576
577     public void notSafeToMakeSnapshot(IllegalAcornStateException t) {
578         this.safeToMakeSnapshot.compareAndSet(true, false);
579         this.cause = t;
580     }
581
582 }