]> gerrit.simantics Code Review - simantics/platform.git/blob - bundles/org.simantics.acorn/src/org/simantics/acorn/GraphClientImpl2.java
Fixing problems in the database unit testing environment with Acorn
[simantics/platform.git] / bundles / org.simantics.acorn / src / org / simantics / acorn / GraphClientImpl2.java
1 /*******************************************************************************
2  * Copyright (c) 2007, 2010 Association for Decentralized Information Management
3  * in Industry THTH ry.
4  * All rights reserved. This program and the accompanying materials
5  * are made available under the terms of the Eclipse Public License v1.0
6  * which accompanies this distribution, and is available at
7  * http://www.eclipse.org/legal/epl-v10.html
8  *
9  * Contributors:
10  *     VTT Technical Research Centre of Finland - initial API and implementation
11  *******************************************************************************/
12 package org.simantics.acorn;
13
14 import java.io.IOException;
15 import java.nio.file.Path;
16 import java.util.ArrayList;
17 import java.util.LinkedList;
18 import java.util.concurrent.ExecutorService;
19 import java.util.concurrent.Executors;
20 import java.util.concurrent.Semaphore;
21 import java.util.concurrent.ThreadFactory;
22 import java.util.concurrent.TimeUnit;
23
24 import org.simantics.acorn.MainProgram.MainProgramRunnable;
25 import org.simantics.acorn.exception.AcornAccessVerificationException;
26 import org.simantics.acorn.exception.IllegalAcornStateException;
27 import org.simantics.acorn.internal.ClusterChange;
28 import org.simantics.acorn.internal.ClusterUpdateProcessorBase;
29 import org.simantics.acorn.internal.UndoClusterUpdateProcessor;
30 import org.simantics.acorn.lru.ClusterChangeSet.Entry;
31 import org.simantics.acorn.lru.ClusterInfo;
32 import org.simantics.acorn.lru.ClusterStreamChunk;
33 import org.simantics.acorn.lru.ClusterUpdateOperation;
34 import org.simantics.db.ClusterCreator;
35 import org.simantics.db.Database;
36 import org.simantics.db.ServiceLocator;
37 import org.simantics.db.exception.DatabaseException;
38 import org.simantics.db.exception.SDBException;
39 import org.simantics.db.server.ProCoreException;
40 import org.simantics.db.service.ClusterSetsSupport;
41 import org.simantics.db.service.ClusterUID;
42 import org.simantics.db.service.LifecycleSupport;
43 import org.simantics.utils.datastructures.Pair;
44 import org.simantics.utils.logging.TimeLogger;
45 import org.slf4j.Logger;
46 import org.slf4j.LoggerFactory;
47
48 import gnu.trove.map.hash.TLongObjectHashMap;
49
50 public class GraphClientImpl2 implements Database.Session {
51
52     private static final Logger LOGGER = LoggerFactory.getLogger(GraphClientImpl2.class);
53         public static final boolean DEBUG = false;
54
55         public final ClusterManager clusters;
56         
57         private TransactionManager transactionManager = new TransactionManager();
58         private ExecutorService executor = Executors.newSingleThreadExecutor(new ClientThreadFactory("Core Main Program", false));
59         private ExecutorService saver = Executors.newSingleThreadExecutor(new ClientThreadFactory("Core Snapshot Saver", true));
60
61         private Path dbFolder;
62         private final Database database;
63         private ServiceLocator locator;
64         private FileCache fileCache;
65         private MainProgram mainProgram;
66
67         static class ClientThreadFactory implements ThreadFactory {
68                 
69                 final String name;
70                 final boolean daemon;
71                 
72                 public ClientThreadFactory(String name, boolean daemon) {
73                         this.name = name;
74                         this.daemon = daemon;
75                 }
76                 
77                 @Override
78                 public Thread newThread(Runnable r) {
79                         Thread thread = new Thread(r, name);
80                         thread.setDaemon(daemon);
81                         return thread;
82                 }
83         }
84
85         public GraphClientImpl2(Database database, Path dbFolder, ServiceLocator locator) throws IOException {
86             this.database = database;
87             this.dbFolder = dbFolder;
88             this.locator = locator;
89             this.fileCache = new FileCache();
90             // This disposes the cache when the session is shut down 
91             locator.registerService(FileCache.class, fileCache);
92             this.clusters = new ClusterManager(dbFolder, fileCache);
93             load();
94             ClusterSetsSupport cssi = locator.getService(ClusterSetsSupport.class); 
95             cssi.setReadDirectory(clusters.lastSessionDirectory);
96             cssi.updateWriteDirectory(clusters.workingDirectory);
97             mainProgram = new MainProgram(this, clusters);
98             executor.execute(mainProgram);
99         }
100
101         public Path getDbFolder() {
102             return dbFolder;
103         }
104
105         public void tryMakeSnapshot() throws IOException {
106                 
107             if (isClosing || unexpectedClose)
108                 return;
109             
110                 saver.execute(new Runnable() {
111
112                         @Override
113                         public void run() {
114                                 Transaction tr = null;
115                                 try {
116                                         // First take a write transaction
117                                         tr = askWriteTransaction(-1);
118                                         // Then make sure that MainProgram is idling
119                                         mainProgram.mutex.acquire();
120                                         try {
121                                                 synchronized(mainProgram) {
122                                                         if(mainProgram.operations.isEmpty()) {
123                                                                 makeSnapshot(false);
124                                                         } else {
125                                                                 // MainProgram is becoming busy again - delay snapshotting
126                                                                 return;
127                                                         }
128                                                 }
129                                         } finally {
130                                                 mainProgram.mutex.release();
131                                         }
132                                 } catch (IllegalAcornStateException | ProCoreException e) {
133                                         LOGGER.error("Snapshotting failed", e);
134                                         unexpectedClose = true;
135                                 } catch (InterruptedException e) {
136                                     LOGGER.error("Snapshotting interrupted", e);
137                                 } finally {
138                                         try {
139                                                 if(tr != null)
140                                                         endTransaction(tr.getTransactionId());
141                                                 if (unexpectedClose) {
142                                                 LifecycleSupport support = getServiceLocator().getService(LifecycleSupport.class);
143                             try {
144                                 support.close();
145                             } catch (DatabaseException e1) {
146                                 LOGGER.error("Failed to close database as a safety measure due to failed snapshotting", e1);
147                             }
148                                                 }
149                                         } catch (ProCoreException e) {
150                                             LOGGER.error("Failed to end snapshotting write transaction", e);
151                                         }
152                                 }
153                         }
154                 });
155         }
156         
157     public void makeSnapshot(boolean fullSave) throws IllegalAcornStateException {
158         clusters.makeSnapshot(locator, fullSave);
159     }
160         
161         public <T> T clone(ClusterUID uid, ClusterCreator creator) throws DatabaseException {
162             try {
163             return clusters.clone(uid, creator);
164         } catch (AcornAccessVerificationException | IllegalAcornStateException | IOException e) {
165             unexpectedClose = true;
166             throw new DatabaseException(e);
167         }
168         }
169
170 //      private void save() throws IOException {
171 //              clusters.save();
172 //      }
173         
174         public void load() throws IOException {
175                 clusters.load();
176         }
177         
178 //      public void modiFileEx(ClusterUID uid, int resourceKey, long offset, long size, byte[] bytes, long pos, ClusterSupport support) {
179 //              clusters.modiFileEx(uid, resourceKey, offset, size, bytes, pos, support);
180 //      }
181
182         @Override
183         public Database getDatabase() {
184                 return database;
185         }
186
187         private boolean closed = false;
188         private boolean isClosing = false;
189         private boolean unexpectedClose = false;
190         
191         @Override
192         public void close() throws ProCoreException {
193             LOGGER.info("Closing " + this + " and mainProgram " + mainProgram);
194                 if(!closed && !isClosing) {
195                     isClosing = true;
196                         try {
197                             if (!unexpectedClose)
198                                 makeSnapshot(true);
199
200                                 mainProgram.close();
201                                 clusters.shutdown();
202                                 executor.shutdown();
203                                 saver.shutdown();
204                                 boolean executorTerminated = executor.awaitTermination(500, TimeUnit.MILLISECONDS);
205                                 boolean saverTerminated = saver.awaitTermination(500, TimeUnit.MILLISECONDS);
206                                 
207                                 System.err.println("executorTerminated=" + executorTerminated + ", saverTerminated=" + saverTerminated);
208
209                                 try {
210                                         clusters.mainState.save(dbFolder);
211                                 } catch (IOException e) {
212                                         LOGGER.error("Failed to save " + MainState.MAIN_STATE + " file in database folder " + dbFolder);
213                                 }
214
215                                 mainProgram = null;
216                                 executor = null;
217                                 saver = null;
218                                 
219                         } catch (IllegalAcornStateException | InterruptedException e) {
220                                 throw new ProCoreException(e);
221                         }
222                         closed = true;
223                 }
224                 //impl.close();
225         }
226
227         @Override
228         public void open() throws ProCoreException {
229                 throw new UnsupportedOperationException();
230         }
231
232         @Override
233         public boolean isClosed() throws ProCoreException {
234                 return closed;
235         }
236         
237         @Override
238         public void acceptCommit(long transactionId, long changeSetId, byte[] metadata) throws ProCoreException {
239                 clusters.state.headChangeSetId++;
240                 long committedChangeSetId = changeSetId + 1;
241                 try {
242                 clusters.commitChangeSet(committedChangeSetId, metadata);
243                 
244                 clusters.state.transactionId = transactionId;
245                 
246                 mainProgram.committed();
247                 
248                 TimeLogger.log("Accepted commit");
249                 } catch (IllegalAcornStateException e) {
250                     throw new ProCoreException(e);
251                 }
252         }
253
254         @Override
255         public long cancelCommit(long transactionId, long changeSetId, byte[] metadata, OnChangeSetUpdate onChangeSetUpdate) throws ProCoreException {
256                 // Accept and finalize current transaction and then undo it
257                 acceptCommit(transactionId, changeSetId, metadata);
258
259                 try {
260                         undo(new long[] {changeSetId+1}, onChangeSetUpdate);
261                         clusters.state.headChangeSetId++;
262                         return clusters.state.headChangeSetId;
263                 } catch (SDBException e) {
264                     LOGGER.error("Failed to undo cancelled transaction", e);
265                         throw new ProCoreException(e);
266                 }
267         }
268
269         @Override
270         public Transaction askReadTransaction() throws ProCoreException {
271                 return transactionManager.askReadTransaction();
272         }
273
274         enum TransactionState {
275                 IDLE,WRITE,READ
276         }
277         
278         class TransactionRequest {
279                 public TransactionState state;
280                 public Semaphore semaphore;
281                 public TransactionRequest(TransactionState state, Semaphore semaphore) {
282                         this.state = state;
283                         this.semaphore = semaphore;
284                 }
285         }
286
287         class TransactionManager {
288
289                 private TransactionState currentTransactionState = TransactionState.IDLE;
290                 
291                 private int reads = 0;
292                 
293                 LinkedList<TransactionRequest> requests = new LinkedList<TransactionRequest>();
294                 
295                 TLongObjectHashMap<TransactionRequest> requestMap = new TLongObjectHashMap<TransactionRequest>();
296                 
297                 private synchronized Transaction makeTransaction(TransactionRequest req) {
298                         
299                         final int csId = clusters.state.headChangeSetId;
300                         final long trId = clusters.state.transactionId+1;
301                         requestMap.put(trId, req);
302                         return new Transaction() {
303                                 
304                                 @Override
305                                 public long getTransactionId() {
306                                         return trId;
307                                 }
308                                 
309                                 @Override
310                                 public long getHeadChangeSetId() {
311                                         return csId;
312                                 }
313                         };
314                 }
315                 
316                 /*
317                  * This method cannot be synchronized since it waits and must support multiple entries
318                  * by query thread(s) and internal transactions such as snapshot saver
319                  */
320                 public Transaction askReadTransaction() throws ProCoreException {
321                 
322                         Semaphore semaphore = new Semaphore(0);
323                         
324                         TransactionRequest req = queue(TransactionState.READ, semaphore);
325                         
326                         try {
327                                 semaphore.acquire();
328                         } catch (InterruptedException e) {
329                                 throw new ProCoreException(e);
330                         }
331                         
332                         return makeTransaction(req);
333
334                 }
335
336                 private synchronized void dispatch() {
337                         TransactionRequest r = requests.removeFirst();
338                         if(r.state == TransactionState.READ) reads++;
339                         r.semaphore.release();
340                 }
341                 
342                 private synchronized void processRequests() {
343                         
344                         while(true) {
345
346                                 if(requests.isEmpty()) return;
347                                 TransactionRequest req = requests.peek();
348
349                                 if(currentTransactionState == TransactionState.IDLE) {
350                                 
351                                         // Accept anything while IDLE
352                                         currentTransactionState = req.state;
353                                         dispatch();
354                                         
355                                 } else if (currentTransactionState == TransactionState.READ) {
356                                         
357                                         if(req.state == currentTransactionState) {
358
359                                                 // Allow other reads
360                                                 dispatch();
361
362                                         } else {
363                                                 
364                                                 // Wait
365                                                 return;
366                                                 
367                                         }
368                                         
369                                 }  else if (currentTransactionState == TransactionState.WRITE) {
370
371                                         // Wait
372                                         return;
373                                         
374                                 }
375                                 
376                         }
377                         
378                 }
379                 
380                 private synchronized TransactionRequest queue(TransactionState state, Semaphore semaphore) {
381                         TransactionRequest req = new TransactionRequest(state, semaphore); 
382                         requests.addLast(req);
383                         processRequests();
384                         return req;
385                 }
386                 
387                 /*
388                  * This method cannot be synchronized since it waits and must support multiple entries
389                  * by query thread(s) and internal transactions such as snapshot saver
390                  */
391                 public Transaction askWriteTransaction() throws IllegalAcornStateException {
392                         
393                         Semaphore semaphore = new Semaphore(0);
394                         TransactionRequest req = queue(TransactionState.WRITE, semaphore);
395                         
396                         try {
397                                 semaphore.acquire();
398                         } catch (InterruptedException e) {
399                                 throw new IllegalAcornStateException(e);
400                         }
401                         mainProgram.startTransaction(clusters.state.headChangeSetId+1);
402                         return makeTransaction(req);
403                 }
404                 
405                 public synchronized long endTransaction(long transactionId) throws ProCoreException {
406                         
407                         TransactionRequest req = requestMap.remove(transactionId);
408                         if(req.state == TransactionState.WRITE) {
409                                 currentTransactionState = TransactionState.IDLE;
410                                 processRequests();
411                         } else {
412                                 reads--;
413                                 if(reads == 0) {
414                                         currentTransactionState = TransactionState.IDLE;
415                                         processRequests();
416                                 }
417                         }
418                         return clusters.state.transactionId;
419                 }
420
421         }
422         
423         @Override
424         public Transaction askWriteTransaction(final long transactionId) throws ProCoreException {
425                 try {
426                     if (isClosing || unexpectedClose || closed) {
427                         throw new ProCoreException("GraphClientImpl2 is already closing so no more write transactions allowed!");
428                     }
429             return transactionManager.askWriteTransaction();
430         } catch (IllegalAcornStateException e) {
431             throw new ProCoreException(e);
432         }
433         }
434
435         @Override
436         public long endTransaction(long transactionId) throws ProCoreException {
437                 return transactionManager.endTransaction(transactionId);
438         }
439
440         @Override
441         public String execute(String command) throws ProCoreException {
442                 // This is called only by WriteGraphImpl.commitAccessorChanges
443                 // We can ignore this in Acorn
444                 return "";
445         }
446
447         @Override
448         public byte[] getChangeSetMetadata(long changeSetId) throws ProCoreException {
449                 try {
450             return clusters.getMetadata(changeSetId);
451         } catch (AcornAccessVerificationException | IllegalAcornStateException e) {
452             throw new ProCoreException(e);
453         }
454         }
455
456         @Override
457         public ChangeSetData getChangeSetData(long minChangeSetId,
458                         long maxChangeSetId, OnChangeSetUpdate onChangeSetupate)
459                         throws ProCoreException {
460                 
461                 new Exception("GetChangeSetDataFunction " + minChangeSetId + " " + maxChangeSetId).printStackTrace();;
462                 return null;
463                 
464         }
465
466         @Override
467         public ChangeSetIds getChangeSetIds() throws ProCoreException {
468                 throw new UnsupportedOperationException();
469         }
470
471         @Override
472         public Cluster getCluster(byte[] clusterId) throws ProCoreException {
473                 throw new UnsupportedOperationException();
474         }
475
476         @Override
477         public ClusterChanges getClusterChanges(long changeSetId, byte[] clusterId)
478                         throws ProCoreException {
479                 throw new UnsupportedOperationException();
480         }
481
482         @Override
483         public ClusterIds getClusterIds() throws ProCoreException {
484                 try {
485             return clusters.getClusterIds();
486         } catch (IllegalAcornStateException e) {
487             throw new ProCoreException(e);
488         }
489         }
490
491         @Override
492         public Information getInformation() throws ProCoreException {
493                 return new Information() {
494
495                         @Override
496                         public String getServerId() {
497                                 return "server";
498                         }
499
500                         @Override
501                         public String getProtocolId() {
502                                 return "";
503                         }
504
505                         @Override
506                         public String getDatabaseId() {
507                                 return "database";
508                         }
509
510                         @Override
511                         public long getFirstChangeSetId() {
512                                 return 0;
513                         }
514                         
515                 };
516         }
517
518         @Override
519         public Refresh getRefresh(long changeSetId) throws ProCoreException {
520                 
521                 final ClusterIds ids = getClusterIds();
522                 
523                 return new Refresh() {
524
525                         @Override
526                         public long getHeadChangeSetId() {
527                                 return clusters.state.headChangeSetId;
528                         }
529
530                         @Override
531                         public long[] getFirst() {
532                                 return ids.getFirst();
533                         }
534
535                         @Override
536                         public long[] getSecond() {
537                                 return ids.getSecond();
538                         }
539                         
540                 };
541                 
542         }
543
544         public byte[] getResourceFile(final byte[] clusterUID, final int resourceIndex) throws ProCoreException, AcornAccessVerificationException, IllegalAcornStateException {
545                 return clusters.getResourceFile(clusterUID, resourceIndex);
546         }
547
548         @Override
549         public ResourceSegment getResourceSegment(final byte[] clusterUID, final int resourceIndex, final long segmentOffset, short segmentSize) throws ProCoreException {
550                 try {
551             return clusters.getResourceSegment(clusterUID, resourceIndex, segmentOffset, segmentSize);
552         } catch (AcornAccessVerificationException | IllegalAcornStateException e) {
553             throw new ProCoreException(e);
554         }
555         }
556
557         @Override
558         public long reserveIds(int count) throws ProCoreException {
559                 return clusters.state.reservedIds++;
560         }
561
562         @Override
563         public void updateCluster(byte[] operations) throws ProCoreException {
564             ClusterInfo info = null;
565             try {
566                 ClusterUpdateOperation operation = new ClusterUpdateOperation(clusters, operations);
567                 info = clusters.clusterLRU.getOrCreate(operation.uid, true);
568                 if(info == null)
569                     throw new IllegalAcornStateException("info == null for operation " + operation);
570                 info.acquireMutex();
571                         info.scheduleUpdate();
572                         mainProgram.schedule(operation);
573                 } catch (IllegalAcornStateException | AcornAccessVerificationException e) {
574             throw new ProCoreException(e);
575         } finally {
576             if (info != null)
577                 info.releaseMutex();
578                 }
579         }
580
581         private UndoClusterUpdateProcessor getUndoCSS(String ccsId) throws DatabaseException, AcornAccessVerificationException, IllegalAcornStateException {
582
583                 String[] ss = ccsId.split("\\.");
584                 String chunkKey = ss[0];
585                 int chunkOffset = Integer.parseInt(ss[1]);
586                 ClusterStreamChunk chunk = clusters.streamLRU.getWithoutMutex(chunkKey);
587                 if(chunk == null) throw new IllegalAcornStateException("Cluster Stream Chunk " + chunkKey + " was not found.");
588                 chunk.acquireMutex();
589                 try {
590                         return chunk.getUndoProcessor(clusters, chunkOffset, ccsId);
591                 } catch (DatabaseException e) {
592                     throw e;
593                 } catch (Throwable t) {
594                         throw new IllegalStateException(t);
595                 } finally {
596                         chunk.releaseMutex();
597                 }
598         }
599         
600         private void performUndo(String ccsId, ArrayList<Pair<ClusterUID, byte[]>> clusterChanges, UndoClusterSupport support) throws ProCoreException, DatabaseException, IllegalAcornStateException, AcornAccessVerificationException {
601                 UndoClusterUpdateProcessor proc = getUndoCSS(ccsId);
602
603                 int clusterKey = clusters.getClusterKeyByClusterUIDOrMakeWithoutMutex(proc.getClusterUID());
604
605                 clusters.clusterLRU.acquireMutex();
606                 try {
607
608                         ClusterChange cs = new ClusterChange(clusterChanges, proc.getClusterUID());
609                         for(int i=0;i<proc.entries.size();i++) {
610                                 
611                                 Entry e = proc.entries.get(proc.entries.size() - 1 - i);
612                                 e.process(clusters, cs, clusterKey);
613                         }
614                         cs.flush();
615
616                 } finally {
617                         clusters.clusterLRU.releaseMutex();
618                 }
619         }
620
621         @Override
622         public boolean undo(long[] changeSetIds, OnChangeSetUpdate onChangeSetUpdate) throws SDBException {
623
624                 Exception exception = mainProgram.runIdle(new MainProgramRunnable() {
625
626                         @Override
627                         public void run() throws Exception {
628
629                                 try {
630
631                                 final ArrayList<Pair<ClusterUID, byte[]>> clusterChanges = new ArrayList<Pair<ClusterUID, byte[]>>();
632                                 
633                                 UndoClusterSupport support = new UndoClusterSupport(clusters);
634                                 
635                                 final int changeSetId = clusters.state.headChangeSetId;
636                                 
637                                 if(ClusterUpdateProcessorBase.DEBUG)
638                                         System.err.println(" === BEGIN UNDO ===");
639                                 
640                                 for(int i=0;i<changeSetIds.length;i++) {
641                                         final long id = changeSetIds[changeSetIds.length-1-i];
642                                         ArrayList<String> ccss = clusters.getChanges(id);
643                     
644                                         for(int j=0;j<ccss.size();j++) {
645                                                 String ccsid = ccss.get(ccss.size()-j-1);
646                                                 try {
647                                                         if(ClusterUpdateProcessorBase.DEBUG)
648                                                                 System.err.println("performUndo " + ccsid);
649                                                         performUndo(ccsid, clusterChanges, support);
650                                                 } catch (DatabaseException e) {
651                                                         e.printStackTrace();
652                                                 }
653                                         }
654                                 }
655                     
656                                 if(ClusterUpdateProcessorBase.DEBUG)
657                                         System.err.println(" === END UNDO ===");
658                     
659                                 for(int i=0;i<clusterChanges.size();i++) {
660                                         
661                                         final int changeSetIndex = i;
662                                         
663                                         final Pair<ClusterUID, byte[]> pair = clusterChanges.get(i);
664                                         
665                                         final ClusterUID cuid = pair.first;
666                                         final byte[] data = pair.second;
667                     
668                                         onChangeSetUpdate.onChangeSetUpdate(new ChangeSetUpdate() {
669                     
670                                                 @Override
671                                                 public long getChangeSetId() {
672                                                         return changeSetId;
673                                                 }
674                     
675                                                 @Override
676                                                 public int getChangeSetIndex() {
677                                                         return 0;
678                                                 }
679                     
680                                                 @Override
681                                                 public int getNumberOfClusterChangeSets() {
682                                                         return clusterChanges.size();
683                                                 }
684                     
685                                                 @Override
686                                                 public int getIndexOfClusterChangeSet() {
687                                                         return changeSetIndex;
688                                                 }
689                     
690                                                 @Override
691                                                 public byte[] getClusterId() {
692                                                         return cuid.asBytes();
693                                                 }
694                     
695                                                 @Override
696                                                 public boolean getNewCluster() {
697                                                         return false;
698                                                 }
699                     
700                                                 @Override
701                                                 public byte[] getData() {
702                                                         return data;
703                                                 }
704                     
705                                         });
706                                 }
707                         } catch (AcornAccessVerificationException | IllegalAcornStateException e1) {
708                             throw new ProCoreException(e1);
709                         }
710
711                         }
712
713                         @Override
714                         public void done() {
715
716                         }
717
718                 });
719
720                 if(exception instanceof SDBException) throw (SDBException)exception;
721                 else if(exception != null) throw new IllegalAcornStateException(exception);
722                 
723                 return false;
724                 
725         }
726         
727         public ServiceLocator getServiceLocator() {
728             return locator;
729         }
730
731     @Override
732     public boolean refreshEnabled() {
733         return false;
734     }
735
736     @Override
737     public boolean rolledback() {
738         return clusters.rolledback();
739     }
740     
741     public void purge() throws IllegalAcornStateException {
742         clusters.purge(locator);
743     }
744
745     public void purgeDatabase() {
746         
747             if (isClosing || unexpectedClose)
748                 return;
749             
750                 saver.execute(new Runnable() {
751
752                         @Override
753                         public void run() {
754                                 Transaction tr = null;
755                                 try {
756                                         // First take a write transaction
757                                         tr = askWriteTransaction(-1);
758                                         // Then make sure that MainProgram is idling
759                                         mainProgram.mutex.acquire();
760                                         try {
761                                                 synchronized(mainProgram) {
762                                                         if(mainProgram.operations.isEmpty()) {
763                                                                 purge();
764                                                         } else {
765                                                                 // MainProgram is becoming busy again - delay snapshotting
766                                                                 return;
767                                                         }
768                                                 }
769                                         } finally {
770                                                 mainProgram.mutex.release();
771                                         }
772                                 } catch (IllegalAcornStateException | ProCoreException e) {
773                                     LOGGER.error("Purge failed", e);
774                                         unexpectedClose = true;
775                                 } catch (InterruptedException e) {
776                                     LOGGER.error("Purge interrupted", e);
777                                 } finally {
778                                         try {
779                                                 if(tr != null)
780                                                         endTransaction(tr.getTransactionId());
781                                                 if (unexpectedClose) {
782                                                 LifecycleSupport support = getServiceLocator().getService(LifecycleSupport.class);
783                             try {
784                                 support.close();
785                             } catch (DatabaseException e1) {
786                                 LOGGER.error("Failed to close database as a safety measure due to failed purge", e1);
787                             }
788                                                 }
789                                         } catch (ProCoreException e) {
790                                             LOGGER.error("Failed to end purge write transaction", e);
791                                         }
792                                 }
793                         }
794                 });
795         
796     }
797     
798     public long getTailChangeSetId() {
799         return clusters.getTailChangeSetId();
800     }
801     
802 }
803