]> gerrit.simantics Code Review - simantics/platform.git/blob - bundles/org.simantics.acorn/src/org/simantics/acorn/GraphClientImpl2.java
07fe18e61201cc16fd4e62bd62b808d8d6b5041d
[simantics/platform.git] / bundles / org.simantics.acorn / src / org / simantics / acorn / GraphClientImpl2.java
1 /*******************************************************************************
2  * Copyright (c) 2007, 2010 Association for Decentralized Information Management
3  * in Industry THTH ry.
4  * All rights reserved. This program and the accompanying materials
5  * are made available under the terms of the Eclipse Public License v1.0
6  * which accompanies this distribution, and is available at
7  * http://www.eclipse.org/legal/epl-v10.html
8  *
9  * Contributors:
10  *     VTT Technical Research Centre of Finland - initial API and implementation
11  *******************************************************************************/
12 package org.simantics.acorn;
13
14 import java.io.IOException;
15 import java.nio.file.Path;
16 import java.util.ArrayList;
17 import java.util.LinkedList;
18 import java.util.concurrent.ExecutorService;
19 import java.util.concurrent.Executors;
20 import java.util.concurrent.Semaphore;
21 import java.util.concurrent.ThreadFactory;
22 import java.util.concurrent.TimeUnit;
23
24 import org.simantics.acorn.MainProgram.MainProgramRunnable;
25 import org.simantics.acorn.exception.AcornAccessVerificationException;
26 import org.simantics.acorn.exception.IllegalAcornStateException;
27 import org.simantics.acorn.internal.ClusterChange;
28 import org.simantics.acorn.internal.ClusterUpdateProcessorBase;
29 import org.simantics.acorn.internal.UndoClusterUpdateProcessor;
30 import org.simantics.acorn.lru.ClusterChangeSet.Entry;
31 import org.simantics.acorn.lru.ClusterInfo;
32 import org.simantics.acorn.lru.ClusterStreamChunk;
33 import org.simantics.acorn.lru.ClusterUpdateOperation;
34 import org.simantics.db.ClusterCreator;
35 import org.simantics.db.Database;
36 import org.simantics.db.ServiceLocator;
37 import org.simantics.db.exception.DatabaseException;
38 import org.simantics.db.exception.SDBException;
39 import org.simantics.db.server.ProCoreException;
40 import org.simantics.db.service.ClusterSetsSupport;
41 import org.simantics.db.service.ClusterUID;
42 import org.simantics.db.service.LifecycleSupport;
43 import org.simantics.utils.datastructures.Pair;
44 import org.simantics.utils.logging.TimeLogger;
45 import org.slf4j.Logger;
46 import org.slf4j.LoggerFactory;
47
48 import gnu.trove.map.hash.TLongObjectHashMap;
49
50 public class GraphClientImpl2 implements Database.Session {
51
    /** SLF4J logger for this class. */
    private static final Logger LOGGER = LoggerFactory.getLogger(GraphClientImpl2.class);
        /** Compile-time debug switch; disabled. */
        public static final boolean DEBUG = false;

        /** Cluster manager created over the database folder in the constructor. */
        public final ClusterManager clusters;
        
        /** Queues and dispatches the read and write transaction requests of this session. */
        private TransactionManager transactionManager = new TransactionManager();
        /** Single non-daemon thread named "Core Main Program"; the constructor runs {@code mainProgram} on it. */
        private ExecutorService executor = Executors.newSingleThreadExecutor(new ClientThreadFactory("Core Main Program", false));
        /** Single daemon thread named "Core Snapshot Saver"; runs the tasks submitted by {@code tryMakeSnapshot()}. */
        private ExecutorService saver = Executors.newSingleThreadExecutor(new ClientThreadFactory("Core Snapshot Saver", true));

        /** Database folder given to the constructor; {@code close()} saves the main state into it. */
        private Path dbFolder;
        /** Database handed back by {@code getDatabase()}. */
        private final Database database;
        /** Service locator given to the constructor; forwarded to {@code clusters.makeSnapshot}. */
        private ServiceLocator locator;
        /** Main program started by the constructor; set to {@code null} by {@code close()}. */
        private MainProgram mainProgram;
65
66         static class ClientThreadFactory implements ThreadFactory {
67                 
68                 final String name;
69                 final boolean daemon;
70                 
71                 public ClientThreadFactory(String name, boolean daemon) {
72                         this.name = name;
73                         this.daemon = daemon;
74                 }
75                 
76                 @Override
77                 public Thread newThread(Runnable r) {
78                         Thread thread = new Thread(r, name);
79                         thread.setDaemon(daemon);
80                         return thread;
81                 }
82         }
83
84         public GraphClientImpl2(Database database, Path dbFolder, ServiceLocator locator) throws IOException {
85             this.database = database;
86             this.dbFolder = dbFolder;
87             this.locator = locator;
88             this.clusters = new ClusterManager(dbFolder);
89             load();
90             ClusterSetsSupport cssi = locator.getService(ClusterSetsSupport.class); 
91             cssi.setReadDirectory(clusters.lastSessionDirectory);
92             cssi.updateWriteDirectory(clusters.workingDirectory);
93             mainProgram = new MainProgram(this, clusters);
94             executor.execute(mainProgram);
95         }
96
97         public Path getDbFolder() {
98             return dbFolder;
99         }
100
101         public void tryMakeSnapshot() throws IOException {
102                 
103             if (isClosing || unexpectedClose)
104                 return;
105             
106                 saver.execute(new Runnable() {
107
108                         @Override
109                         public void run() {
110                                 Transaction tr = null;
111                                 try {
112                                         // First take a write transaction
113                                         tr = askWriteTransaction(-1);
114                                         // Then make sure that MainProgram is idling
115                                         mainProgram.mutex.acquire();
116                                         try {
117                                                 synchronized(mainProgram) {
118                                                         if(mainProgram.operations.isEmpty()) {
119                                                                 makeSnapshot(false);
120                                                         } else {
121                                                                 // MainProgram is becoming busy again - delay snapshotting
122                                                                 return;
123                                                         }
124                                                 }
125                                         } finally {
126                                                 mainProgram.mutex.release();
127                                         }
128                                 } catch (IllegalAcornStateException | ProCoreException e) {
129                                         LOGGER.error("Snapshotting failed", e);
130                                         unexpectedClose = true;
131                                 } catch (InterruptedException e) {
132                                     LOGGER.error("Snapshotting interrupted", e);
133                                 } finally {
134                                         try {
135                                                 if(tr != null)
136                                                         endTransaction(tr.getTransactionId());
137                                                 if (unexpectedClose) {
138                                                 LifecycleSupport support = getServiceLocator().getService(LifecycleSupport.class);
139                             try {
140                                 support.close();
141                             } catch (DatabaseException e1) {
142                                 LOGGER.error("Failed to close database as a safety measure due to failed snapshotting", e1);
143                             }
144                                                 }
145                                         } catch (ProCoreException e) {
146                                             LOGGER.error("Failed to end snapshotting write transaction", e);
147                                         }
148                                 }
149                         }
150                 });
151         }
152         
153     public void makeSnapshot(boolean fullSave) throws IllegalAcornStateException {
154         clusters.makeSnapshot(locator, fullSave);
155     }
156         
157         public <T> T clone(ClusterUID uid, ClusterCreator creator) throws DatabaseException {
158             try {
159             return clusters.clone(uid, creator);
160         } catch (AcornAccessVerificationException | IllegalAcornStateException | IOException e) {
161             unexpectedClose = true;
162             throw new DatabaseException(e);
163         }
164         }
165
166 //      private void save() throws IOException {
167 //              clusters.save();
168 //      }
169         
170         public void load() throws IOException {
171                 clusters.load();
172         }
173         
174 //      public void modiFileEx(ClusterUID uid, int resourceKey, long offset, long size, byte[] bytes, long pos, ClusterSupport support) {
175 //              clusters.modiFileEx(uid, resourceKey, offset, size, bytes, pos, support);
176 //      }
177
178         @Override
179         public Database getDatabase() {
180                 return database;
181         }
182
183         private boolean closed = false;
184         private boolean isClosing = false;
185         private boolean unexpectedClose = false;
186         
187         @Override
188         public void close() throws ProCoreException {
189             LOGGER.info("Closing " + this + " and mainProgram " + mainProgram);
190                 if(!closed && !isClosing) {
191                     isClosing = true;
192                         try {
193                             if (!unexpectedClose)
194                                 makeSnapshot(true);
195
196                                 mainProgram.close();
197                                 clusters.shutdown();
198                                 executor.shutdown();
199                                 saver.shutdown();
200                                 boolean executorTerminated = executor.awaitTermination(500, TimeUnit.MILLISECONDS);
201                                 boolean saverTerminated = saver.awaitTermination(500, TimeUnit.MILLISECONDS);
202                                 
203                                 System.err.println("executorTerminated=" + executorTerminated + ", saverTerminated=" + saverTerminated);
204
205                                 try {
206                                         clusters.mainState.save(dbFolder);
207                                 } catch (IOException e) {
208                                         LOGGER.error("Failed to save " + MainState.MAIN_STATE + " file in database folder " + dbFolder);
209                                 }
210
211                                 mainProgram = null;
212                                 executor = null;
213                                 saver = null;
214                                 
215                         } catch (IllegalAcornStateException | InterruptedException e) {
216                                 throw new ProCoreException(e);
217                         }
218                         closed = true;
219                 }
220                 //impl.close();
221         }
222
223         @Override
224         public void open() throws ProCoreException {
225                 throw new UnsupportedOperationException();
226         }
227
228         @Override
229         public boolean isClosed() throws ProCoreException {
230                 return closed;
231         }
232         
233         @Override
234         public void acceptCommit(long transactionId, long changeSetId, byte[] metadata) throws ProCoreException {
235                 clusters.state.headChangeSetId++;
236                 long committedChangeSetId = changeSetId + 1;
237                 try {
238                 clusters.commitChangeSet(committedChangeSetId, metadata);
239                 
240                 clusters.state.transactionId = transactionId;
241                 
242                 mainProgram.committed();
243                 
244                 TimeLogger.log("Accepted commit");
245                 } catch (IllegalAcornStateException e) {
246                     throw new ProCoreException(e);
247                 }
248         }
249
250         @Override
251         public long cancelCommit(long transactionId, long changeSetId, byte[] metadata, OnChangeSetUpdate onChangeSetUpdate) throws ProCoreException {
252                 // Accept and finalize current transaction and then undo it
253                 acceptCommit(transactionId, changeSetId, metadata);
254
255                 try {
256                         undo(new long[] {changeSetId+1}, onChangeSetUpdate);
257                         clusters.state.headChangeSetId++;
258                         return clusters.state.headChangeSetId;
259                 } catch (SDBException e) {
260                     LOGGER.error("Failed to undo cancelled transaction", e);
261                         throw new ProCoreException(e);
262                 }
263         }
264
265         @Override
266         public Transaction askReadTransaction() throws ProCoreException {
267                 return transactionManager.askReadTransaction();
268         }
269
270         enum TransactionState {
271                 IDLE,WRITE,READ
272         }
273         
274         class TransactionRequest {
275                 public TransactionState state;
276                 public Semaphore semaphore;
277                 public TransactionRequest(TransactionState state, Semaphore semaphore) {
278                         this.state = state;
279                         this.semaphore = semaphore;
280                 }
281         }
282
283         class TransactionManager {
284
285                 private TransactionState currentTransactionState = TransactionState.IDLE;
286                 
287                 private int reads = 0;
288                 
289                 LinkedList<TransactionRequest> requests = new LinkedList<TransactionRequest>();
290                 
291                 TLongObjectHashMap<TransactionRequest> requestMap = new TLongObjectHashMap<TransactionRequest>();
292                 
293                 private synchronized Transaction makeTransaction(TransactionRequest req) {
294                         
295                         final int csId = clusters.state.headChangeSetId;
296                         final long trId = clusters.state.transactionId+1;
297                         requestMap.put(trId, req);
298                         return new Transaction() {
299                                 
300                                 @Override
301                                 public long getTransactionId() {
302                                         return trId;
303                                 }
304                                 
305                                 @Override
306                                 public long getHeadChangeSetId() {
307                                         return csId;
308                                 }
309                         };
310                 }
311                 
312                 /*
313                  * This method cannot be synchronized since it waits and must support multiple entries
314                  * by query thread(s) and internal transactions such as snapshot saver
315                  */
316                 public Transaction askReadTransaction() throws ProCoreException {
317                 
318                         Semaphore semaphore = new Semaphore(0);
319                         
320                         TransactionRequest req = queue(TransactionState.READ, semaphore);
321                         
322                         try {
323                                 semaphore.acquire();
324                         } catch (InterruptedException e) {
325                                 throw new ProCoreException(e);
326                         }
327                         
328                         return makeTransaction(req);
329
330                 }
331
332                 private synchronized void dispatch() {
333                         TransactionRequest r = requests.removeFirst();
334                         if(r.state == TransactionState.READ) reads++;
335                         r.semaphore.release();
336                 }
337                 
338                 private synchronized void processRequests() {
339                         
340                         while(true) {
341
342                                 if(requests.isEmpty()) return;
343                                 TransactionRequest req = requests.peek();
344
345                                 if(currentTransactionState == TransactionState.IDLE) {
346                                 
347                                         // Accept anything while IDLE
348                                         currentTransactionState = req.state;
349                                         dispatch();
350                                         
351                                 } else if (currentTransactionState == TransactionState.READ) {
352                                         
353                                         if(req.state == currentTransactionState) {
354
355                                                 // Allow other reads
356                                                 dispatch();
357
358                                         } else {
359                                                 
360                                                 // Wait
361                                                 return;
362                                                 
363                                         }
364                                         
365                                 }  else if (currentTransactionState == TransactionState.WRITE) {
366
367                                         // Wait
368                                         return;
369                                         
370                                 }
371                                 
372                         }
373                         
374                 }
375                 
376                 private synchronized TransactionRequest queue(TransactionState state, Semaphore semaphore) {
377                         TransactionRequest req = new TransactionRequest(state, semaphore); 
378                         requests.addLast(req);
379                         processRequests();
380                         return req;
381                 }
382                 
383                 /*
384                  * This method cannot be synchronized since it waits and must support multiple entries
385                  * by query thread(s) and internal transactions such as snapshot saver
386                  */
387                 public Transaction askWriteTransaction() throws IllegalAcornStateException {
388                         
389                         Semaphore semaphore = new Semaphore(0);
390                         TransactionRequest req = queue(TransactionState.WRITE, semaphore);
391                         
392                         try {
393                                 semaphore.acquire();
394                         } catch (InterruptedException e) {
395                                 throw new IllegalAcornStateException(e);
396                         }
397                         mainProgram.startTransaction(clusters.state.headChangeSetId+1);
398                         return makeTransaction(req);
399                 }
400                 
401                 public synchronized long endTransaction(long transactionId) throws ProCoreException {
402                         
403                         TransactionRequest req = requestMap.remove(transactionId);
404                         if(req.state == TransactionState.WRITE) {
405                                 currentTransactionState = TransactionState.IDLE;
406                                 processRequests();
407                         } else {
408                                 reads--;
409                                 if(reads == 0) {
410                                         currentTransactionState = TransactionState.IDLE;
411                                         processRequests();
412                                 }
413                         }
414                         return clusters.state.transactionId;
415                 }
416
417         }
418         
419         @Override
420         public Transaction askWriteTransaction(final long transactionId) throws ProCoreException {
421                 try {
422                     if (isClosing || unexpectedClose || closed) {
423                         throw new ProCoreException("GraphClientImpl2 is already closing so no more write transactions allowed!");
424                     }
425             return transactionManager.askWriteTransaction();
426         } catch (IllegalAcornStateException e) {
427             throw new ProCoreException(e);
428         }
429         }
430
431         @Override
432         public long endTransaction(long transactionId) throws ProCoreException {
433                 return transactionManager.endTransaction(transactionId);
434         }
435
436         @Override
437         public String execute(String command) throws ProCoreException {
438                 // This is called only by WriteGraphImpl.commitAccessorChanges
439                 // We can ignore this in Acorn
440                 return "";
441         }
442
443         @Override
444         public byte[] getChangeSetMetadata(long changeSetId) throws ProCoreException {
445                 try {
446             return clusters.getMetadata(changeSetId);
447         } catch (AcornAccessVerificationException | IllegalAcornStateException e) {
448             throw new ProCoreException(e);
449         }
450         }
451
452         @Override
453         public ChangeSetData getChangeSetData(long minChangeSetId,
454                         long maxChangeSetId, OnChangeSetUpdate onChangeSetupate)
455                         throws ProCoreException {
456                 
457                 new Exception("GetChangeSetDataFunction " + minChangeSetId + " " + maxChangeSetId).printStackTrace();;
458                 return null;
459                 
460         }
461
462         @Override
463         public ChangeSetIds getChangeSetIds() throws ProCoreException {
464                 throw new UnsupportedOperationException();
465         }
466
467         @Override
468         public Cluster getCluster(byte[] clusterId) throws ProCoreException {
469                 throw new UnsupportedOperationException();
470         }
471
472         @Override
473         public ClusterChanges getClusterChanges(long changeSetId, byte[] clusterId)
474                         throws ProCoreException {
475                 throw new UnsupportedOperationException();
476         }
477
478         @Override
479         public ClusterIds getClusterIds() throws ProCoreException {
480                 try {
481             return clusters.getClusterIds();
482         } catch (IllegalAcornStateException e) {
483             throw new ProCoreException(e);
484         }
485         }
486
487         @Override
488         public Information getInformation() throws ProCoreException {
489                 return new Information() {
490
491                         @Override
492                         public String getServerId() {
493                                 return "server";
494                         }
495
496                         @Override
497                         public String getProtocolId() {
498                                 return "";
499                         }
500
501                         @Override
502                         public String getDatabaseId() {
503                                 return "database";
504                         }
505
506                         @Override
507                         public long getFirstChangeSetId() {
508                                 return 0;
509                         }
510                         
511                 };
512         }
513
514         @Override
515         public Refresh getRefresh(long changeSetId) throws ProCoreException {
516                 
517                 final ClusterIds ids = getClusterIds();
518                 
519                 return new Refresh() {
520
521                         @Override
522                         public long getHeadChangeSetId() {
523                                 return clusters.state.headChangeSetId;
524                         }
525
526                         @Override
527                         public long[] getFirst() {
528                                 return ids.getFirst();
529                         }
530
531                         @Override
532                         public long[] getSecond() {
533                                 return ids.getSecond();
534                         }
535                         
536                 };
537                 
538         }
539
540         public byte[] getResourceFile(final byte[] clusterUID, final int resourceIndex) throws ProCoreException, AcornAccessVerificationException, IllegalAcornStateException {
541                 return clusters.getResourceFile(clusterUID, resourceIndex);
542         }
543
544         @Override
545         public ResourceSegment getResourceSegment(final byte[] clusterUID, final int resourceIndex, final long segmentOffset, short segmentSize) throws ProCoreException {
546                 try {
547             return clusters.getResourceSegment(clusterUID, resourceIndex, segmentOffset, segmentSize);
548         } catch (AcornAccessVerificationException | IllegalAcornStateException e) {
549             throw new ProCoreException(e);
550         }
551         }
552
553         @Override
554         public long reserveIds(int count) throws ProCoreException {
555                 return clusters.state.reservedIds++;
556         }
557
558         @Override
559         public void updateCluster(byte[] operations) throws ProCoreException {
560             ClusterInfo info = null;
561             try {
562                 ClusterUpdateOperation operation = new ClusterUpdateOperation(clusters, operations);
563                 info = clusters.clusterLRU.getOrCreate(operation.uid, true);
564                 if(info == null)
565                     throw new IllegalAcornStateException("info == null for operation " + operation);
566                 info.acquireMutex();
567                         info.scheduleUpdate();
568                         mainProgram.schedule(operation);
569                 } catch (IllegalAcornStateException | AcornAccessVerificationException e) {
570             throw new ProCoreException(e);
571         } finally {
572             if (info != null)
573                 info.releaseMutex();
574                 }
575         }
576
577         private UndoClusterUpdateProcessor getUndoCSS(String ccsId) throws DatabaseException, AcornAccessVerificationException, IllegalAcornStateException {
578
579                 String[] ss = ccsId.split("\\.");
580                 String chunkKey = ss[0];
581                 int chunkOffset = Integer.parseInt(ss[1]);
582                 ClusterStreamChunk chunk = clusters.streamLRU.getWithoutMutex(chunkKey);
583                 if(chunk == null) throw new IllegalAcornStateException("Cluster Stream Chunk " + chunkKey + " was not found.");
584                 chunk.acquireMutex();
585                 try {
586                         return chunk.getUndoProcessor(clusters, chunkOffset, ccsId);
587                 } catch (DatabaseException e) {
588                     throw e;
589                 } catch (Throwable t) {
590                         throw new IllegalStateException(t);
591                 } finally {
592                         chunk.releaseMutex();
593                 }
594         }
595         
596         private void performUndo(String ccsId, ArrayList<Pair<ClusterUID, byte[]>> clusterChanges, UndoClusterSupport support) throws ProCoreException, DatabaseException, IllegalAcornStateException, AcornAccessVerificationException {
597                 UndoClusterUpdateProcessor proc = getUndoCSS(ccsId);
598
599                 int clusterKey = clusters.getClusterKeyByClusterUIDOrMakeWithoutMutex(proc.getClusterUID());
600
601                 clusters.clusterLRU.acquireMutex();
602                 try {
603
604                         ClusterChange cs = new ClusterChange(clusterChanges, proc.getClusterUID());
605                         for(int i=0;i<proc.entries.size();i++) {
606                                 
607                                 Entry e = proc.entries.get(proc.entries.size() - 1 - i);
608                                 e.process(clusters, cs, clusterKey);
609                         }
610                         cs.flush();
611
612                 } finally {
613                         clusters.clusterLRU.releaseMutex();
614                 }
615         }
616
617         @Override
618         public boolean undo(long[] changeSetIds, OnChangeSetUpdate onChangeSetUpdate) throws SDBException {
619
620                 Exception exception = mainProgram.runIdle(new MainProgramRunnable() {
621
622                         @Override
623                         public void run() throws Exception {
624
625                                 try {
626
627                                 final ArrayList<Pair<ClusterUID, byte[]>> clusterChanges = new ArrayList<Pair<ClusterUID, byte[]>>();
628                                 
629                                 UndoClusterSupport support = new UndoClusterSupport(clusters);
630                                 
631                                 final int changeSetId = clusters.state.headChangeSetId;
632                                 
633                                 if(ClusterUpdateProcessorBase.DEBUG)
634                                         System.err.println(" === BEGIN UNDO ===");
635                                 
636                                 for(int i=0;i<changeSetIds.length;i++) {
637                                         final long id = changeSetIds[changeSetIds.length-1-i];
638                                         ArrayList<String> ccss = clusters.getChanges(id);
639                     
640                                         for(int j=0;j<ccss.size();j++) {
641                                                 String ccsid = ccss.get(ccss.size()-j-1);
642                                                 try {
643                                                         if(ClusterUpdateProcessorBase.DEBUG)
644                                                                 System.err.println("performUndo " + ccsid);
645                                                         performUndo(ccsid, clusterChanges, support);
646                                                 } catch (DatabaseException e) {
647                                                         e.printStackTrace();
648                                                 }
649                                         }
650                                 }
651                     
652                                 if(ClusterUpdateProcessorBase.DEBUG)
653                                         System.err.println(" === END UNDO ===");
654                     
655                                 for(int i=0;i<clusterChanges.size();i++) {
656                                         
657                                         final int changeSetIndex = i;
658                                         
659                                         final Pair<ClusterUID, byte[]> pair = clusterChanges.get(i);
660                                         
661                                         final ClusterUID cuid = pair.first;
662                                         final byte[] data = pair.second;
663                     
664                                         onChangeSetUpdate.onChangeSetUpdate(new ChangeSetUpdate() {
665                     
666                                                 @Override
667                                                 public long getChangeSetId() {
668                                                         return changeSetId;
669                                                 }
670                     
671                                                 @Override
672                                                 public int getChangeSetIndex() {
673                                                         return 0;
674                                                 }
675                     
676                                                 @Override
677                                                 public int getNumberOfClusterChangeSets() {
678                                                         return clusterChanges.size();
679                                                 }
680                     
681                                                 @Override
682                                                 public int getIndexOfClusterChangeSet() {
683                                                         return changeSetIndex;
684                                                 }
685                     
686                                                 @Override
687                                                 public byte[] getClusterId() {
688                                                         return cuid.asBytes();
689                                                 }
690                     
691                                                 @Override
692                                                 public boolean getNewCluster() {
693                                                         return false;
694                                                 }
695                     
696                                                 @Override
697                                                 public byte[] getData() {
698                                                         return data;
699                                                 }
700                     
701                                         });
702                                 }
703                         } catch (AcornAccessVerificationException | IllegalAcornStateException e1) {
704                             throw new ProCoreException(e1);
705                         }
706
707                         }
708
709                         @Override
710                         public void done() {
711
712                         }
713
714                 });
715
716                 if(exception instanceof SDBException) throw (SDBException)exception;
717                 else if(exception != null) throw new IllegalAcornStateException(exception);
718                 
719                 return false;
720                 
721         }
722         
723         public ServiceLocator getServiceLocator() {
724             return locator;
725         }
726
727     @Override
728     public boolean refreshEnabled() {
729         return false;
730     }
731
732     @Override
733     public boolean rolledback() {
734         return clusters.rolledback();
735     }
736     
737     public void purge() throws IllegalAcornStateException {
738         clusters.purge(locator);
739     }
740
741     public void purgeDatabase() {
742         
743             if (isClosing || unexpectedClose)
744                 return;
745             
746                 saver.execute(new Runnable() {
747
748                         @Override
749                         public void run() {
750                                 Transaction tr = null;
751                                 try {
752                                         // First take a write transaction
753                                         tr = askWriteTransaction(-1);
754                                         // Then make sure that MainProgram is idling
755                                         mainProgram.mutex.acquire();
756                                         try {
757                                                 synchronized(mainProgram) {
758                                                         if(mainProgram.operations.isEmpty()) {
759                                                                 purge();
760                                                         } else {
761                                                                 // MainProgram is becoming busy again - delay snapshotting
762                                                                 return;
763                                                         }
764                                                 }
765                                         } finally {
766                                                 mainProgram.mutex.release();
767                                         }
768                                 } catch (IllegalAcornStateException | ProCoreException e) {
769                                     LOGGER.error("Purge failed", e);
770                                         unexpectedClose = true;
771                                 } catch (InterruptedException e) {
772                                     LOGGER.error("Purge interrupted", e);
773                                 } finally {
774                                         try {
775                                                 if(tr != null)
776                                                         endTransaction(tr.getTransactionId());
777                                                 if (unexpectedClose) {
778                                                 LifecycleSupport support = getServiceLocator().getService(LifecycleSupport.class);
779                             try {
780                                 support.close();
781                             } catch (DatabaseException e1) {
782                                 LOGGER.error("Failed to close database as a safety measure due to failed purge", e1);
783                             }
784                                                 }
785                                         } catch (ProCoreException e) {
786                                             LOGGER.error("Failed to end purge write transaction", e);
787                                         }
788                                 }
789                         }
790                 });
791         
792     }
793     
794     public long getTailChangeSetId() {
795         return clusters.getTailChangeSetId();
796     }
797     
798 }
799