]> gerrit.simantics Code Review - simantics/platform.git/blob - bundles/org.simantics.acorn/src/org/simantics/acorn/GraphClientImpl2.java
251a80314c32f9dc37f1565597cf061070280f11
[simantics/platform.git] / bundles / org.simantics.acorn / src / org / simantics / acorn / GraphClientImpl2.java
1 /*******************************************************************************
2  * Copyright (c) 2007, 2010 Association for Decentralized Information Management
3  * in Industry THTH ry.
4  * All rights reserved. This program and the accompanying materials
5  * are made available under the terms of the Eclipse Public License v1.0
6  * which accompanies this distribution, and is available at
7  * http://www.eclipse.org/legal/epl-v10.html
8  *
9  * Contributors:
10  *     VTT Technical Research Centre of Finland - initial API and implementation
11  *******************************************************************************/
12 package org.simantics.acorn;
13
14 import java.io.BufferedReader;
15 import java.io.IOException;
16 import java.nio.file.Files;
17 import java.nio.file.Path;
18 import java.util.ArrayList;
19 import java.util.LinkedList;
20 import java.util.List;
21 import java.util.concurrent.CopyOnWriteArrayList;
22 import java.util.concurrent.ExecutorService;
23 import java.util.concurrent.Executors;
24 import java.util.concurrent.Future;
25 import java.util.concurrent.Semaphore;
26 import java.util.concurrent.ThreadFactory;
27 import java.util.concurrent.TimeUnit;
28
29 import org.simantics.acorn.MainProgram.MainProgramRunnable;
30 import org.simantics.acorn.backup.AcornBackupProvider;
31 import org.simantics.acorn.backup.AcornBackupProvider.AcornBackupRunnable;
32 import org.simantics.acorn.exception.AcornAccessVerificationException;
33 import org.simantics.acorn.exception.IllegalAcornStateException;
34 import org.simantics.acorn.internal.ClusterChange;
35 import org.simantics.acorn.internal.ClusterUpdateProcessorBase;
36 import org.simantics.acorn.internal.UndoClusterUpdateProcessor;
37 import org.simantics.acorn.lru.ClusterChangeSet.Entry;
38 import org.simantics.acorn.lru.ClusterInfo;
39 import org.simantics.acorn.lru.ClusterStreamChunk;
40 import org.simantics.acorn.lru.ClusterUpdateOperation;
41 import org.simantics.backup.BackupException;
42 import org.simantics.db.ClusterCreator;
43 import org.simantics.db.Database;
44 import org.simantics.db.ServiceLocator;
45 import org.simantics.db.exception.DatabaseException;
46 import org.simantics.db.exception.SDBException;
47 import org.simantics.db.server.ProCoreException;
48 import org.simantics.db.service.ClusterSetsSupport;
49 import org.simantics.db.service.ClusterUID;
50 import org.simantics.db.service.EventSupport;
51 import org.simantics.db.service.LifecycleSupport;
52 import org.simantics.utils.DataContainer;
53 import org.simantics.utils.datastructures.Pair;
54 import org.simantics.utils.logging.TimeLogger;
55 import org.slf4j.Logger;
56 import org.slf4j.LoggerFactory;
57
58 import fi.vtt.simantics.procore.internal.EventSupportImpl;
59 import gnu.trove.map.hash.TLongObjectHashMap;
60
61 public class GraphClientImpl2 implements Database.Session {
62
63     private static final Logger LOGGER = LoggerFactory.getLogger(GraphClientImpl2.class);
64         public static final boolean DEBUG = false;
65         
66         public static final String CLOSE = "close";
67         public static final String PURGE = "purge";
68
69         final ClusterManager clusters;
70
71         private TransactionManager transactionManager = new TransactionManager();
72         private ExecutorService executor = Executors.newSingleThreadExecutor(new ClientThreadFactory("Core Main Program", false));
73         private ExecutorService saver = Executors.newSingleThreadExecutor(new ClientThreadFactory("Core Snapshot Saver", true));
74
75         private Path dbFolder;
76         private final Database database;
77         private ServiceLocator locator;
78         private FileCache fileCache;
79         private MainProgram mainProgram;
80         private EventSupportImpl eventSupport;
81
82         private static class ClientThreadFactory implements ThreadFactory {
83
84                 final String name;
85                 final boolean daemon;
86
87                 public ClientThreadFactory(String name, boolean daemon) {
88                         this.name = name;
89                         this.daemon = daemon;
90                 }
91
92                 @Override
93                 public Thread newThread(Runnable r) {
94                         Thread thread = new Thread(r, name);
95                         thread.setDaemon(daemon);
96                         return thread;
97                 }
98         }
99
100         public GraphClientImpl2(Database database, Path dbFolder, ServiceLocator locator) throws IOException {
101             this.database = database;
102             this.dbFolder = dbFolder;
103             this.locator = locator;
104             this.fileCache = new FileCache();
105             // This disposes the cache when the session is shut down 
106             locator.registerService(FileCache.class, fileCache);
107             this.clusters = new ClusterManager(dbFolder, fileCache);
108             load();
109             ClusterSetsSupport cssi = locator.getService(ClusterSetsSupport.class);
110             cssi.setReadDirectory(clusters.lastSessionDirectory);
111             cssi.updateWriteDirectory(clusters.workingDirectory);
112             mainProgram = new MainProgram(this, clusters);
113             executor.execute(mainProgram);
114             eventSupport = (EventSupportImpl)locator.getService(EventSupport.class);
115             
116         }
117
118         public Path getDbFolder() {
119             return dbFolder;
120         }
121
122         /*
123          * This method schedules snapshotting.
124          * No lock and thread restrictions.
125          */
126         void tryMakeSnapshot() throws IOException {
127
128                 if (isClosing || unexpectedClose)
129                         return;
130
131                 saver.execute(new Runnable() {
132
133                         @Override
134                         public void run() {
135                                 Transaction tr = null;
136                                 try {
137                                         // First take a write transaction
138                                         tr = askWriteTransaction(-1);
139                                         // Then make sure that MainProgram is idling
140                                         synchronizeWithIdleMainProgram(() -> makeSnapshot(false));
141                                 } catch (IllegalAcornStateException | ProCoreException e) {
142                                         LOGGER.error("Snapshotting failed", e);
143                                         unexpectedClose = true;
144                                 } catch (SDBException e) {
145                                         LOGGER.error("Snapshotting failed", e);
146                                         unexpectedClose = true;
147                                 } finally {
148                                         try {
149                                                 if(tr != null)
150                                                         endTransaction(tr.getTransactionId());
151                                                 if (unexpectedClose) {
152                                                         LifecycleSupport support = getServiceLocator().getService(LifecycleSupport.class);
153                                                         try {
154                                                                 support.close();
155                                                         } catch (DatabaseException e1) {
156                                                                 LOGGER.error("Failed to close database as a safety measure due to failed snapshotting", e1);
157                                                         }
158                                                 }
159                                         } catch (ProCoreException e) {
160                                             LOGGER.error("Failed to end snapshotting write transaction", e);
161                                         }
162                                 }
163                         }
164                 });
165         }
166
167         private void makeSnapshot(boolean fullSave) throws IllegalAcornStateException {
168                 clusters.makeSnapshot(locator, fullSave);
169         }
170
171         @Override
172         public <T> T clone(ClusterUID uid, ClusterCreator creator) throws DatabaseException {
173             try {
174             return clusters.clone(uid, creator);
175         } catch (AcornAccessVerificationException | IllegalAcornStateException | IOException e) {
176             unexpectedClose = true;
177             throw new DatabaseException(e);
178         }
179         }
180
181         private void load() throws IOException {
182                 clusters.load();
183         }
184
185         @Override
186         public Database getDatabase() {
187                 return database;
188         }
189
190         private boolean closed = false;
191         private boolean isClosing = false;
192         private boolean unexpectedClose = false;
193
194         @Override
195         public void close() throws ProCoreException {
196                 LOGGER.info("Closing " + this + " and mainProgram " + mainProgram);
197                 if(!closed && !isClosing) {
198                         isClosing = true;
199                         try {
200
201                                 if (!unexpectedClose)
202                                         synchronizeWithIdleMainProgram(() -> makeSnapshot(true));
203
204                                 mainProgram.close();
205                                 clusters.shutdown();
206                                 executor.shutdown();
207                                 saver.shutdown();
208
209                                 boolean executorTerminated = executor.awaitTermination(500, TimeUnit.MILLISECONDS);
210                                 boolean saverTerminated = saver.awaitTermination(500, TimeUnit.MILLISECONDS);
211
212                                 LOGGER.info("executorTerminated=" + executorTerminated + ", saverTerminated=" + saverTerminated);
213
214                                 try {
215                                         clusters.mainState.save(dbFolder);
216                                 } catch (IOException e) {
217                                         LOGGER.error("Failed to save " + MainState.MAIN_STATE + " file in database folder " + dbFolder);
218                                 }
219
220                                 mainProgram = null;
221                                 executor = null;
222                                 saver = null;
223
224                         } catch (IllegalAcornStateException | InterruptedException e) {
225                                 throw new ProCoreException(e);
226                         } catch (SDBException e1) {
227                                 throw new ProCoreException(e1);
228                         }
229                         closed = true;
230                         eventSupport.fireEvent(CLOSE, null);
231                 }
232                 //impl.close();
233         }
234
235         @Override
236         public void open() throws ProCoreException {
237                 throw new UnsupportedOperationException();
238         }
239
240         @Override
241         public boolean isClosed() throws ProCoreException {
242                 return closed;
243         }
244
245         @Override
246         public void acceptCommit(long transactionId, long changeSetId, byte[] metadata) throws ProCoreException {
247                 clusters.state.headChangeSetId++;
248                 long committedChangeSetId = changeSetId + 1;
249                 try {
250                         clusters.commitChangeSet(committedChangeSetId, metadata);
251                         clusters.state.transactionId = transactionId;
252                         mainProgram.committed();
253                         TimeLogger.log("Accepted commit");
254                 } catch (IllegalAcornStateException e) {
255                     throw new ProCoreException(e);
256                 }
257         }
258
259         @Override
260         public long cancelCommit(long transactionId, long changeSetId, byte[] metadata, OnChangeSetUpdate onChangeSetUpdate) throws ProCoreException {
261                 // Accept and finalize current transaction and then undo it
262                 acceptCommit(transactionId, changeSetId, metadata);
263
264                 try {
265                         undo(new long[] {changeSetId+1}, onChangeSetUpdate);
266                         clusters.state.headChangeSetId++;
267                         return clusters.state.headChangeSetId;
268                 } catch (SDBException e) {
269                         LOGGER.error("Failed to undo cancelled transaction", e);
270                         throw new ProCoreException(e);
271                 }
272         }
273
274         @Override
275         public Transaction askReadTransaction() throws ProCoreException {
276                 return transactionManager.askReadTransaction();
277         }
278
279         private enum TransactionState {
280                 IDLE,WRITE,READ
281         }
282
283         private class TransactionRequest {
284                 public TransactionState state;
285                 public Semaphore semaphore;
286                 public TransactionRequest(TransactionState state, Semaphore semaphore) {
287                         this.state = state;
288                         this.semaphore = semaphore;
289                 }
290         }
291
292         private class TransactionManager {
293
294                 private TransactionState currentTransactionState = TransactionState.IDLE;
295
296                 private int reads = 0;
297
298                 private LinkedList<TransactionRequest> requests = new LinkedList<>();
299
300                 private TLongObjectHashMap<TransactionRequest> requestMap = new TLongObjectHashMap<>();
301
302                 private synchronized Transaction makeTransaction(TransactionRequest req) {
303
304                         final int csId = clusters.state.headChangeSetId;
305                         final long trId = clusters.state.transactionId+1;
306                         requestMap.put(trId, req);
307                         return new Transaction() {
308
309                                 @Override
310                                 public long getTransactionId() {
311                                         return trId;
312                                 }
313
314                                 @Override
315                                 public long getHeadChangeSetId() {
316                                         return csId;
317                                 }
318                         };
319                 }
320
321                 /*
322                  * This method cannot be synchronized since it waits and must support multiple entries
323                  * by query thread(s) and internal transactions such as snapshot saver
324                  */
325                 private Transaction askReadTransaction() throws ProCoreException {
326
327                         Semaphore semaphore = new Semaphore(0);
328
329                         TransactionRequest req = queue(TransactionState.READ, semaphore);
330
331                         try {
332                                 semaphore.acquire();
333                         } catch (InterruptedException e) {
334                                 throw new ProCoreException(e);
335                         }
336
337                         return makeTransaction(req);
338
339                 }
340
341                 private synchronized void dispatch() {
342                         TransactionRequest r = requests.removeFirst();
343                         if(r.state == TransactionState.READ) reads++;
344                         r.semaphore.release();
345                 }
346
347                 private synchronized void processRequests() {
348
349                         while(true) {
350
351                                 if(requests.isEmpty()) return;
352                                 TransactionRequest req = requests.peek();
353
354                                 if(currentTransactionState == TransactionState.IDLE) {
355
356                                         // Accept anything while IDLE
357                                         currentTransactionState = req.state;
358                                         dispatch();
359
360                                 } else if (currentTransactionState == TransactionState.READ) {
361
362                                         if(req.state == currentTransactionState) {
363
364                                                 // Allow other reads
365                                                 dispatch();
366
367                                         } else {
368
369                                                 // Wait
370                                                 return;
371
372                                         }
373
374                                 }  else if (currentTransactionState == TransactionState.WRITE) {
375
376                                         // Wait
377                                         return;
378
379                                 }
380
381                         }
382
383                 }
384
385                 private synchronized TransactionRequest queue(TransactionState state, Semaphore semaphore) {
386                         TransactionRequest req = new TransactionRequest(state, semaphore);
387                         requests.addLast(req);
388                         processRequests();
389                         return req;
390                 }
391
392                 /*
393                  * This method cannot be synchronized since it waits and must support multiple entries
394                  * by query thread(s) and internal transactions such as snapshot saver
395                  */
396                 private Transaction askWriteTransaction() throws IllegalAcornStateException {
397
398                         Semaphore semaphore = new Semaphore(0);
399                         TransactionRequest req = queue(TransactionState.WRITE, semaphore);
400
401                         try {
402                                 semaphore.acquire();
403                         } catch (InterruptedException e) {
404                                 throw new IllegalAcornStateException(e);
405                         }
406                         mainProgram.startTransaction(clusters.state.headChangeSetId+1);
407                         return makeTransaction(req);
408                 }
409
410                 private synchronized long endTransaction(long transactionId) throws ProCoreException {
411
412                         TransactionRequest req = requestMap.remove(transactionId);
413                         if(req.state == TransactionState.WRITE) {
414                                 currentTransactionState = TransactionState.IDLE;
415                                 processRequests();
416                         } else {
417                                 reads--;
418                                 if(reads == 0) {
419                                         currentTransactionState = TransactionState.IDLE;
420                                         processRequests();
421                                 }
422                         }
423                         return clusters.state.transactionId;
424                 }
425
426         }
427
428         @Override
429         public Transaction askWriteTransaction(final long transactionId) throws ProCoreException {
430                 try {
431                     if (isClosing || unexpectedClose || closed) {
432                         throw new ProCoreException("GraphClientImpl2 is already closing so no more write transactions allowed!");
433                     }
434             return transactionManager.askWriteTransaction();
435         } catch (IllegalAcornStateException e) {
436             throw new ProCoreException(e);
437         }
438         }
439
440         @Override
441         public long endTransaction(long transactionId) throws ProCoreException {
442                 return transactionManager.endTransaction(transactionId);
443         }
444
445         @Override
446         public String execute(String command) throws ProCoreException {
447                 // This is called only by WriteGraphImpl.commitAccessorChanges
448                 // We can ignore this in Acorn
449                 return "";
450         }
451
452         @Override
453         public byte[] getChangeSetMetadata(long changeSetId) throws ProCoreException {
454                 try {
455             return clusters.getMetadata(changeSetId);
456         } catch (AcornAccessVerificationException | IllegalAcornStateException e) {
457             throw new ProCoreException(e);
458         }
459         }
460
461         @Override
462         public ChangeSetData getChangeSetData(long minChangeSetId,
463                         long maxChangeSetId, OnChangeSetUpdate onChangeSetupate)
464                         throws ProCoreException {
465
466                 new Exception("GetChangeSetDataFunction " + minChangeSetId + " " + maxChangeSetId).printStackTrace();;
467                 return null;
468
469         }
470
471         @Override
472         public ChangeSetIds getChangeSetIds() throws ProCoreException {
473                 throw new UnsupportedOperationException();
474         }
475
476         @Override
477         public Cluster getCluster(byte[] clusterId) throws ProCoreException {
478                 throw new UnsupportedOperationException();
479         }
480
481         @Override
482         public ClusterChanges getClusterChanges(long changeSetId, byte[] clusterId)
483                         throws ProCoreException {
484                 throw new UnsupportedOperationException();
485         }
486
487         @Override
488         public ClusterIds getClusterIds() throws ProCoreException {
489                 try {
490             return clusters.getClusterIds();
491         } catch (IllegalAcornStateException e) {
492             throw new ProCoreException(e);
493         }
494         }
495
496         @Override
497         public Information getInformation() throws ProCoreException {
498                 return new Information() {
499
500                         @Override
501                         public String getServerId() {
502                                 return "server";
503                         }
504
505                         @Override
506                         public String getProtocolId() {
507                                 return "";
508                         }
509
510                         @Override
511                         public String getDatabaseId() {
512                                 return "database";
513                         }
514
515                         @Override
516                         public long getFirstChangeSetId() {
517                                 return 0;
518                         }
519
520                 };
521         }
522
523         @Override
524         public Refresh getRefresh(long changeSetId) throws ProCoreException {
525
526                 final ClusterIds ids = getClusterIds();
527
528                 return new Refresh() {
529
530                         @Override
531                         public long getHeadChangeSetId() {
532                                 return clusters.state.headChangeSetId;
533                         }
534
535                         @Override
536                         public long[] getFirst() {
537                                 return ids.getFirst();
538                         }
539
540                         @Override
541                         public long[] getSecond() {
542                                 return ids.getSecond();
543                         }
544
545                 };
546
547         }
548
549 //      public byte[] getResourceFile(final byte[] clusterUID, final int resourceIndex) throws ProCoreException, AcornAccessVerificationException, IllegalAcornStateException {
550 //              return clusters.getResourceFile(clusterUID, resourceIndex);
551 //      }
552
553         @Override
554         public ResourceSegment getResourceSegment(final byte[] clusterUID, final int resourceIndex, final long segmentOffset, short segmentSize) throws ProCoreException {
555                 try {
556             return clusters.getResourceSegment(clusterUID, resourceIndex, segmentOffset, segmentSize);
557         } catch (AcornAccessVerificationException | IllegalAcornStateException e) {
558             throw new ProCoreException(e);
559         }
560         }
561
562         @Override
563         public long reserveIds(int count) throws ProCoreException {
564                 return clusters.state.reservedIds++;
565         }
566
567         @Override
568         public void updateCluster(byte[] operations) throws ProCoreException {
569             ClusterInfo info = null;
570             try {
571                 ClusterUpdateOperation operation = new ClusterUpdateOperation(clusters, operations);
572                 info = clusters.clusterLRU.getOrCreate(operation.uid, true);
573                 if(info == null)
574                     throw new IllegalAcornStateException("info == null for operation " + operation);
575                 info.acquireMutex();
576                         info.scheduleUpdate();
577                         mainProgram.schedule(operation);
578                 } catch (IllegalAcornStateException | AcornAccessVerificationException e) {
579             throw new ProCoreException(e);
580         } finally {
581             if (info != null)
582                 info.releaseMutex();
583                 }
584         }
585
586         private UndoClusterUpdateProcessor getUndoCSS(String ccsId) throws DatabaseException, AcornAccessVerificationException, IllegalAcornStateException {
587
588                 String[] ss = ccsId.split("\\.");
589                 String chunkKey = ss[0];
590                 int chunkOffset = Integer.parseInt(ss[1]);
591                 ClusterStreamChunk chunk = clusters.streamLRU.getWithoutMutex(chunkKey);
592                 if(chunk == null) throw new IllegalAcornStateException("Cluster Stream Chunk " + chunkKey + " was not found.");
593                 chunk.acquireMutex();
594                 try {
595                         return chunk.getUndoProcessor(clusters, chunkOffset, ccsId);
596                 } catch (DatabaseException e) {
597                     throw e;
598                 } catch (Throwable t) {
599                         throw new IllegalStateException(t);
600                 } finally {
601                         chunk.releaseMutex();
602                 }
603         }
604
605         private void performUndo(String ccsId, ArrayList<Pair<ClusterUID, byte[]>> clusterChanges, UndoClusterSupport support) throws ProCoreException, DatabaseException, IllegalAcornStateException, AcornAccessVerificationException {
606                 UndoClusterUpdateProcessor proc = getUndoCSS(ccsId);
607
608                 int clusterKey = clusters.getClusterKeyByClusterUIDOrMakeWithoutMutex(proc.getClusterUID());
609
610                 clusters.clusterLRU.acquireMutex();
611                 try {
612
613                         ClusterChange cs = new ClusterChange(clusterChanges, proc.getClusterUID());
614                         for(int i=0;i<proc.entries.size();i++) {
615
616                                 Entry e = proc.entries.get(proc.entries.size() - 1 - i);
617                                 e.process(clusters, cs, clusterKey);
618                         }
619                         cs.flush();
620
621                 } finally {
622                         clusters.clusterLRU.releaseMutex();
623                 }
624         }
625
626         private void synchronizeWithIdleMainProgram(MainProgramRunnable runnable) throws SDBException {
627
628                 Exception[] exception = { null };
629                 Semaphore s = new Semaphore(0);
630
631                 mainProgram.runIdle(new MainProgramRunnable() {
632
633                         @Override
634                         public void success() {
635                                 try {
636                                     runnable.success();
637                                 } finally {
638                                     s.release();
639                                 }
640                         }
641
642                         @Override
643                         public void error(Exception e) {
644                                 exception[0] = e;
645                                 try {
646                                     runnable.error(e);
647                                 } finally {
648                                     s.release();
649                                 }
650                         }
651
652                         @Override
653                         public void run() throws Exception {
654                                 runnable.run();
655                         }
656
657                 });
658
659                 try {
660                         s.acquire();
661                 } catch (InterruptedException e) {
662                         throw new IllegalAcornStateException("Unhandled interruption.", e);
663                 }
664
665                 Exception e = exception[0];
666                 if(e != null) {
667                         if(e instanceof SDBException) throw (SDBException)e;
668                         else if(e != null) throw new IllegalAcornStateException(e);
669                 }
670
671         }
672
673         @Override
674         public boolean undo(long[] changeSetIds, OnChangeSetUpdate onChangeSetUpdate) throws SDBException {
675
676                 synchronizeWithIdleMainProgram(new MainProgramRunnable() {
677
678                         @Override
679                         public void run() throws Exception {
680
681                                 try {
682
683                                 final ArrayList<Pair<ClusterUID, byte[]>> clusterChanges = new ArrayList<Pair<ClusterUID, byte[]>>();
684
685                                 UndoClusterSupport support = new UndoClusterSupport(clusters);
686
687                                 final int changeSetId = clusters.state.headChangeSetId;
688
689                                 if(ClusterUpdateProcessorBase.DEBUG)
690                                         LOGGER.info(" === BEGIN UNDO ===");
691
692                                 for(int i=0;i<changeSetIds.length;i++) {
693                                         final long id = changeSetIds[changeSetIds.length-1-i];
694                                         ArrayList<String> ccss = clusters.getChanges(id);
695
696                                         for(int j=0;j<ccss.size();j++) {
697                                                 String ccsid = ccss.get(ccss.size()-j-1);
698                                                 try {
699                                                         if(ClusterUpdateProcessorBase.DEBUG)
700                                                                 LOGGER.info("performUndo " + ccsid);
701                                                         performUndo(ccsid, clusterChanges, support);
702                                                 } catch (DatabaseException e) {
703                                                         e.printStackTrace();
704                                                 }
705                                         }
706                                 }
707
708                                 if(ClusterUpdateProcessorBase.DEBUG)
709                                         LOGGER.info(" === END UNDO ===");
710
711                                 for(int i=0;i<clusterChanges.size();i++) {
712
713                                         final int changeSetIndex = i;
714
715                                         final Pair<ClusterUID, byte[]> pair = clusterChanges.get(i);
716
717                                         final ClusterUID cuid = pair.first;
718                                         final byte[] data = pair.second;
719
720                                         onChangeSetUpdate.onChangeSetUpdate(new ChangeSetUpdate() {
721
722                                                 @Override
723                                                 public long getChangeSetId() {
724                                                         return changeSetId;
725                                                 }
726
727                                                 @Override
728                                                 public int getChangeSetIndex() {
729                                                         return 0;
730                                                 }
731
732                                                 @Override
733                                                 public int getNumberOfClusterChangeSets() {
734                                                         return clusterChanges.size();
735                                                 }
736
737                                                 @Override
738                                                 public int getIndexOfClusterChangeSet() {
739                                                         return changeSetIndex;
740                                                 }
741
742                                                 @Override
743                                                 public byte[] getClusterId() {
744                                                         return cuid.asBytes();
745                                                 }
746
747                                                 @Override
748                                                 public boolean getNewCluster() {
749                                                         return false;
750                                                 }
751
752                                                 @Override
753                                                 public byte[] getData() {
754                                                         return data;
755                                                 }
756
757                                         });
758                                 }
759                         } catch (AcornAccessVerificationException | IllegalAcornStateException e1) {
760                             throw new ProCoreException(e1);
761                         }
762
763                         }
764
765                 });
766
767                 return false;
768
769         }
770
771         ServiceLocator getServiceLocator() {
772             return locator;
773         }
774
775     @Override
776     public boolean refreshEnabled() {
777         return false;
778     }
779
780     @Override
781     public boolean rolledback() {
782         return clusters.rolledback();
783     }
784
785     private void purge() throws IllegalAcornStateException {
786         clusters.purge(locator);
787     }
788
789     public void purgeDatabase() {
790
791             if (isClosing || unexpectedClose)
792                 return;
793
794                 saver.execute(new Runnable() {
795
796                         @Override
797                         public void run() {
798                                 Transaction tr = null;
799                                 try {
800                                         // First take a write transaction
801                                         tr = askWriteTransaction(-1);
802                                         // Then make sure that MainProgram is idling
803                                         synchronizeWithIdleMainProgram(() -> purge());
804                                 } catch (IllegalAcornStateException | ProCoreException e) {
805                                         LOGGER.error("Purge failed", e);
806                                         unexpectedClose = true;
807                                 } catch (SDBException e) {
808                                         LOGGER.error("Purge failed", e);
809                                         unexpectedClose = true;
810                                 } finally {
811                                         try {
812                                                 if(tr != null) {
813                                                         endTransaction(tr.getTransactionId());
814                                                         eventSupport.fireEvent(PURGE, null);
815                                                 }
816                                                 if (unexpectedClose) {
817                                                         LifecycleSupport support = getServiceLocator().getService(LifecycleSupport.class);
818                                                         try {
819                                                                 support.close();
820                                                         } catch (DatabaseException e1) {
821                                                                 LOGGER.error("Failed to close database as a safety measure due to failed purge", e1);
822                                                         }
823                                                 }
824                                         } catch (ProCoreException e) {
825                                             LOGGER.error("Failed to end purge write transaction", e);
826                                         }
827                                 }
828                         }
829                 });
830
831     }
832
833     public long getTailChangeSetId() {
834         return clusters.getTailChangeSetId();
835     }
836
837         public Future<BackupException> getBackupRunnable(Semaphore lock, Path targetPath, int revision) throws IllegalAcornStateException, IOException {
838
839                 makeSnapshot(true);
840
841                 Path dbDir = getDbFolder();
842                 int newestFolder = clusters.mainState.headDir - 1;
843                 int latestFolder = -2;
844                 Path AcornMetadataFile = AcornBackupProvider.getAcornMetadataFile(dbDir);
845                 if (Files.exists(AcornMetadataFile)) {
846                         try (BufferedReader br = Files.newBufferedReader(AcornMetadataFile)) {
847                                 latestFolder = Integer.parseInt( br.readLine() );
848                         }
849                 }
850
851                 AcornBackupRunnable r = new AcornBackupRunnable(
852                                 lock, targetPath, revision, dbDir, latestFolder, newestFolder);
853                 new Thread(r, "Acorn backup thread").start();
854                 return r;
855
856         }
857
858 }
859