]> gerrit.simantics Code Review - simantics/platform.git/blob - bundles/org.simantics.acorn/src/org/simantics/acorn/GraphClientImpl2.java
Merge branch 'feature/funcwrite'
[simantics/platform.git] / bundles / org.simantics.acorn / src / org / simantics / acorn / GraphClientImpl2.java
1 /*******************************************************************************
2  * Copyright (c) 2007, 2010 Association for Decentralized Information Management
3  * in Industry THTH ry.
4  * All rights reserved. This program and the accompanying materials
5  * are made available under the terms of the Eclipse Public License v1.0
6  * which accompanies this distribution, and is available at
7  * http://www.eclipse.org/legal/epl-v10.html
8  *
9  * Contributors:
10  *     VTT Technical Research Centre of Finland - initial API and implementation
11  *******************************************************************************/
12 package org.simantics.acorn;
13
14 import java.io.IOException;
15 import java.nio.file.Path;
16 import java.util.ArrayList;
17 import java.util.LinkedList;
18 import java.util.concurrent.ExecutorService;
19 import java.util.concurrent.Executors;
20 import java.util.concurrent.Semaphore;
21 import java.util.concurrent.ThreadFactory;
22 import java.util.concurrent.TimeUnit;
23
24 import org.simantics.acorn.MainProgram.MainProgramRunnable;
25 import org.simantics.acorn.exception.AcornAccessVerificationException;
26 import org.simantics.acorn.exception.IllegalAcornStateException;
27 import org.simantics.acorn.internal.ClusterChange;
28 import org.simantics.acorn.internal.ClusterUpdateProcessorBase;
29 import org.simantics.acorn.internal.UndoClusterUpdateProcessor;
30 import org.simantics.acorn.lru.ClusterChangeSet.Entry;
31 import org.simantics.acorn.lru.ClusterInfo;
32 import org.simantics.acorn.lru.ClusterStreamChunk;
33 import org.simantics.acorn.lru.ClusterUpdateOperation;
34 import org.simantics.db.ClusterCreator;
35 import org.simantics.db.Database;
36 import org.simantics.db.ServiceLocator;
37 import org.simantics.db.exception.DatabaseException;
38 import org.simantics.db.exception.SDBException;
39 import org.simantics.db.server.ProCoreException;
40 import org.simantics.db.service.ClusterSetsSupport;
41 import org.simantics.db.service.ClusterUID;
42 import org.simantics.db.service.LifecycleSupport;
43 import org.simantics.utils.datastructures.Pair;
44 import org.simantics.utils.logging.TimeLogger;
45 import org.slf4j.Logger;
46 import org.slf4j.LoggerFactory;
47
48 import gnu.trove.map.hash.TLongObjectHashMap;
49
50 public class GraphClientImpl2 implements Database.Session {
51
52     private static final Logger LOGGER = LoggerFactory.getLogger(GraphClientImpl2.class);
53         public static final boolean DEBUG = false;
54
55         public final ClusterManager clusters;
56         
57         private TransactionManager transactionManager = new TransactionManager();
58         private ExecutorService executor = Executors.newSingleThreadExecutor(new ClientThreadFactory("Core Main Program", false));
59         private ExecutorService saver = Executors.newSingleThreadExecutor(new ClientThreadFactory("Core Snapshot Saver", true));
60
61         private Path dbFolder;
62         private final Database database;
63         private ServiceLocator locator;
64         private MainProgram mainProgram;
65
66         static class ClientThreadFactory implements ThreadFactory {
67                 
68                 final String name;
69                 final boolean daemon;
70                 
71                 public ClientThreadFactory(String name, boolean daemon) {
72                         this.name = name;
73                         this.daemon = daemon;
74                 }
75                 
76                 @Override
77                 public Thread newThread(Runnable r) {
78                         Thread thread = new Thread(r, name);
79                         thread.setDaemon(daemon);
80                         return thread;
81                 }
82         }
83
84         public GraphClientImpl2(Database database, Path dbFolder, ServiceLocator locator) throws IOException {
85             this.database = database;
86             this.dbFolder = dbFolder;
87             this.locator = locator;
88             this.clusters = new ClusterManager(dbFolder);
89             load();
90             ClusterSetsSupport cssi = locator.getService(ClusterSetsSupport.class); 
91             cssi.setReadDirectory(clusters.lastSessionDirectory);
92             cssi.updateWriteDirectory(clusters.workingDirectory);
93             mainProgram = new MainProgram(this, clusters);
94             executor.execute(mainProgram);
95         }
96
97         public Path getDbFolder() {
98             return dbFolder;
99         }
100
101         public void tryMakeSnapshot() throws IOException {
102                 
103             if (isClosing || unexpectedClose)
104                 return;
105             
106                 saver.execute(new Runnable() {
107
108                         @Override
109                         public void run() {
110                                 Transaction tr = null;
111                                 try {
112                                         // First take a write transaction
113                                         tr = askWriteTransaction(-1);
114                                         // Then make sure that MainProgram is idling
115                                         mainProgram.mutex.acquire();
116                                         try {
117                                                 synchronized(mainProgram) {
118                                                         if(mainProgram.operations.isEmpty()) {
119                                                                 makeSnapshot(false);
120                                                         } else {
121                                                                 // MainProgram is becoming busy again - delay snapshotting
122                                                                 return;
123                                                         }
124                                                 }
125                                         } finally {
126                                                 mainProgram.mutex.release();
127                                         }
128                                 } catch (IllegalAcornStateException | ProCoreException e) {
129                                         LOGGER.error("Snapshotting failed", e);
130                                         unexpectedClose = true;
131                                 } catch (InterruptedException e) {
132                                     LOGGER.error("Snapshotting interrupted", e);
133                                 } finally {
134                                         try {
135                                                 if(tr != null)
136                                                         endTransaction(tr.getTransactionId());
137                                                 if (unexpectedClose) {
138                                                 LifecycleSupport support = getServiceLocator().getService(LifecycleSupport.class);
139                             try {
140                                 support.close();
141                             } catch (DatabaseException e1) {
142                                 LOGGER.error("Failed to close database as a safety measure due to failed snapshotting", e1);
143                             }
144                                                 }
145                                         } catch (ProCoreException e) {
146                                             LOGGER.error("Failed to end snapshotting write transaction", e);
147                                         }
148                                 }
149                         }
150                 });
151         }
152         
153     public void makeSnapshot(boolean fullSave) throws IllegalAcornStateException {
154         clusters.makeSnapshot(locator, fullSave);
155     }
156         
157         public <T> T clone(ClusterUID uid, ClusterCreator creator) throws DatabaseException {
158             try {
159             return clusters.clone(uid, creator);
160         } catch (AcornAccessVerificationException | IllegalAcornStateException | IOException e) {
161             unexpectedClose = true;
162             throw new DatabaseException(e);
163         }
164         }
165
166 //      private void save() throws IOException {
167 //              clusters.save();
168 //      }
169         
170         public void load() throws IOException {
171                 clusters.load();
172         }
173         
174 //      public void modiFileEx(ClusterUID uid, int resourceKey, long offset, long size, byte[] bytes, long pos, ClusterSupport support) {
175 //              clusters.modiFileEx(uid, resourceKey, offset, size, bytes, pos, support);
176 //      }
177
178         @Override
179         public Database getDatabase() {
180                 return database;
181         }
182
183         private boolean closed = false;
184         private boolean isClosing = false;
185         private boolean unexpectedClose = false;
186         
187         @Override
188         public void close() throws ProCoreException {
189             LOGGER.info("Closing " + this + " and mainProgram " + mainProgram);
190                 if(!closed && !isClosing) {
191                     isClosing = true;
192                         try {
193                             if (!unexpectedClose)
194                                 makeSnapshot(true);
195                                 
196                                 mainProgram.close();
197                                 clusters.shutdown();
198                                 executor.shutdown();
199                                 saver.shutdown();
200                                 boolean executorTerminated = executor.awaitTermination(500, TimeUnit.MILLISECONDS);
201                                 boolean saverTerminated = saver.awaitTermination(500, TimeUnit.MILLISECONDS);
202                                 
203                                 System.err.println("executorTerminated=" + executorTerminated + ", saverTerminated=" + saverTerminated);
204                                 
205                                 mainProgram = null;
206                                 executor = null;
207                                 saver = null;
208                                 
209                         } catch (IllegalAcornStateException | InterruptedException e) {
210                                 throw new ProCoreException(e);
211                         }
212                         closed = true;
213                 }
214                 //impl.close();
215         }
216
217         @Override
218         public void open() throws ProCoreException {
219                 throw new UnsupportedOperationException();
220         }
221
222         @Override
223         public boolean isClosed() throws ProCoreException {
224                 return closed;
225         }
226         
227         @Override
228         public void acceptCommit(long transactionId, long changeSetId, byte[] metadata) throws ProCoreException {
229                 clusters.state.headChangeSetId++;
230                 long committedChangeSetId = changeSetId + 1;
231                 try {
232                 clusters.commitChangeSet(committedChangeSetId, metadata);
233                 
234                 clusters.state.transactionId = transactionId;
235                 
236                 mainProgram.committed();
237                 
238                 TimeLogger.log("Accepted commit");
239                 } catch (IllegalAcornStateException e) {
240                     throw new ProCoreException(e);
241                 }
242         }
243
244         @Override
245         public long cancelCommit(long transactionId, long changeSetId, byte[] metadata, OnChangeSetUpdate onChangeSetUpdate) throws ProCoreException {
246                 // Accept and finalize current transaction and then undo it
247                 acceptCommit(transactionId, changeSetId, metadata);
248
249                 try {
250                         undo(new long[] {changeSetId+1}, onChangeSetUpdate);
251                         clusters.state.headChangeSetId++;
252                         return clusters.state.headChangeSetId;
253                 } catch (SDBException e) {
254                     LOGGER.error("Failed to undo cancelled transaction", e);
255                         throw new ProCoreException(e);
256                 }
257         }
258
259         @Override
260         public Transaction askReadTransaction() throws ProCoreException {
261                 return transactionManager.askReadTransaction();
262         }
263
/** State of the transaction manager, also used as the kind of a queued request. */
enum TransactionState {
    /** No transaction is active. */
    IDLE,
    /** A write transaction is active / requested. */
    WRITE,
    /** Read transactions are active / a read is requested. */
    READ
}
267         
268         class TransactionRequest {
269                 public TransactionState state;
270                 public Semaphore semaphore;
271                 public TransactionRequest(TransactionState state, Semaphore semaphore) {
272                         this.state = state;
273                         this.semaphore = semaphore;
274                 }
275         }
276
277         class TransactionManager {
278
279                 private TransactionState currentTransactionState = TransactionState.IDLE;
280                 
281                 private int reads = 0;
282                 
283                 LinkedList<TransactionRequest> requests = new LinkedList<TransactionRequest>();
284                 
285                 TLongObjectHashMap<TransactionRequest> requestMap = new TLongObjectHashMap<TransactionRequest>();
286                 
287                 private synchronized Transaction makeTransaction(TransactionRequest req) {
288                         
289                         final int csId = clusters.state.headChangeSetId;
290                         final long trId = clusters.state.transactionId+1;
291                         requestMap.put(trId, req);
292                         return new Transaction() {
293                                 
294                                 @Override
295                                 public long getTransactionId() {
296                                         return trId;
297                                 }
298                                 
299                                 @Override
300                                 public long getHeadChangeSetId() {
301                                         return csId;
302                                 }
303                         };
304                 }
305                 
306                 /*
307                  * This method cannot be synchronized since it waits and must support multiple entries
308                  * by query thread(s) and internal transactions such as snapshot saver
309                  */
310                 public Transaction askReadTransaction() throws ProCoreException {
311                 
312                         Semaphore semaphore = new Semaphore(0);
313                         
314                         TransactionRequest req = queue(TransactionState.READ, semaphore);
315                         
316                         try {
317                                 semaphore.acquire();
318                         } catch (InterruptedException e) {
319                                 throw new ProCoreException(e);
320                         }
321                         
322                         return makeTransaction(req);
323
324                 }
325
326                 private synchronized void dispatch() {
327                         TransactionRequest r = requests.removeFirst();
328                         if(r.state == TransactionState.READ) reads++;
329                         r.semaphore.release();
330                 }
331                 
332                 private synchronized void processRequests() {
333                         
334                         while(true) {
335
336                                 if(requests.isEmpty()) return;
337                                 TransactionRequest req = requests.peek();
338
339                                 if(currentTransactionState == TransactionState.IDLE) {
340                                 
341                                         // Accept anything while IDLE
342                                         currentTransactionState = req.state;
343                                         dispatch();
344                                         
345                                 } else if (currentTransactionState == TransactionState.READ) {
346                                         
347                                         if(req.state == currentTransactionState) {
348
349                                                 // Allow other reads
350                                                 dispatch();
351
352                                         } else {
353                                                 
354                                                 // Wait
355                                                 return;
356                                                 
357                                         }
358                                         
359                                 }  else if (currentTransactionState == TransactionState.WRITE) {
360
361                                         // Wait
362                                         return;
363                                         
364                                 }
365                                 
366                         }
367                         
368                 }
369                 
370                 private synchronized TransactionRequest queue(TransactionState state, Semaphore semaphore) {
371                         TransactionRequest req = new TransactionRequest(state, semaphore); 
372                         requests.addLast(req);
373                         processRequests();
374                         return req;
375                 }
376                 
377                 /*
378                  * This method cannot be synchronized since it waits and must support multiple entries
379                  * by query thread(s) and internal transactions such as snapshot saver
380                  */
381                 public Transaction askWriteTransaction() throws IllegalAcornStateException {
382                         
383                         Semaphore semaphore = new Semaphore(0);
384                         TransactionRequest req = queue(TransactionState.WRITE, semaphore);
385                         
386                         try {
387                                 semaphore.acquire();
388                         } catch (InterruptedException e) {
389                                 throw new IllegalAcornStateException(e);
390                         }
391                         mainProgram.startTransaction(clusters.state.headChangeSetId+1);
392                         return makeTransaction(req);
393                 }
394                 
395                 public synchronized long endTransaction(long transactionId) throws ProCoreException {
396                         
397                         TransactionRequest req = requestMap.remove(transactionId);
398                         if(req.state == TransactionState.WRITE) {
399                                 currentTransactionState = TransactionState.IDLE;
400                                 processRequests();
401                         } else {
402                                 reads--;
403                                 if(reads == 0) {
404                                         currentTransactionState = TransactionState.IDLE;
405                                         processRequests();
406                                 }
407                         }
408                         return clusters.state.transactionId;
409                 }
410
411         }
412         
413         @Override
414         public Transaction askWriteTransaction(final long transactionId) throws ProCoreException {
415                 try {
416                     if (isClosing || unexpectedClose || closed) {
417                         throw new ProCoreException("GraphClientImpl2 is already closing so no more write transactions allowed!");
418                     }
419             return transactionManager.askWriteTransaction();
420         } catch (IllegalAcornStateException e) {
421             throw new ProCoreException(e);
422         }
423         }
424
425         @Override
426         public long endTransaction(long transactionId) throws ProCoreException {
427                 return transactionManager.endTransaction(transactionId);
428         }
429
430         @Override
431         public String execute(String command) throws ProCoreException {
432                 // This is called only by WriteGraphImpl.commitAccessorChanges
433                 // We can ignore this in Acorn
434                 return "";
435         }
436
437         @Override
438         public byte[] getChangeSetMetadata(long changeSetId) throws ProCoreException {
439                 try {
440             return clusters.getMetadata(changeSetId);
441         } catch (AcornAccessVerificationException | IllegalAcornStateException e) {
442             throw new ProCoreException(e);
443         }
444         }
445
446         @Override
447         public ChangeSetData getChangeSetData(long minChangeSetId,
448                         long maxChangeSetId, OnChangeSetUpdate onChangeSetupate)
449                         throws ProCoreException {
450                 
451                 new Exception("GetChangeSetDataFunction " + minChangeSetId + " " + maxChangeSetId).printStackTrace();;
452                 return null;
453                 
454         }
455
456         @Override
457         public ChangeSetIds getChangeSetIds() throws ProCoreException {
458                 throw new UnsupportedOperationException();
459         }
460
461         @Override
462         public Cluster getCluster(byte[] clusterId) throws ProCoreException {
463                 throw new UnsupportedOperationException();
464         }
465
466         @Override
467         public ClusterChanges getClusterChanges(long changeSetId, byte[] clusterId)
468                         throws ProCoreException {
469                 throw new UnsupportedOperationException();
470         }
471
472         @Override
473         public ClusterIds getClusterIds() throws ProCoreException {
474                 try {
475             return clusters.getClusterIds();
476         } catch (IllegalAcornStateException e) {
477             throw new ProCoreException(e);
478         }
479         }
480
481         @Override
482         public Information getInformation() throws ProCoreException {
483                 return new Information() {
484
485                         @Override
486                         public String getServerId() {
487                                 return "server";
488                         }
489
490                         @Override
491                         public String getProtocolId() {
492                                 return "";
493                         }
494
495                         @Override
496                         public String getDatabaseId() {
497                                 return "database";
498                         }
499
500                         @Override
501                         public long getFirstChangeSetId() {
502                                 return 0;
503                         }
504                         
505                 };
506         }
507
508         @Override
509         public Refresh getRefresh(long changeSetId) throws ProCoreException {
510                 
511                 final ClusterIds ids = getClusterIds();
512                 
513                 return new Refresh() {
514
515                         @Override
516                         public long getHeadChangeSetId() {
517                                 return clusters.state.headChangeSetId;
518                         }
519
520                         @Override
521                         public long[] getFirst() {
522                                 return ids.getFirst();
523                         }
524
525                         @Override
526                         public long[] getSecond() {
527                                 return ids.getSecond();
528                         }
529                         
530                 };
531                 
532         }
533
534         public byte[] getResourceFile(final byte[] clusterUID, final int resourceIndex) throws ProCoreException, AcornAccessVerificationException, IllegalAcornStateException {
535                 return clusters.getResourceFile(clusterUID, resourceIndex);
536         }
537
538         @Override
539         public ResourceSegment getResourceSegment(final byte[] clusterUID, final int resourceIndex, final long segmentOffset, short segmentSize) throws ProCoreException {
540                 try {
541             return clusters.getResourceSegment(clusterUID, resourceIndex, segmentOffset, segmentSize);
542         } catch (AcornAccessVerificationException | IllegalAcornStateException e) {
543             throw new ProCoreException(e);
544         }
545         }
546
547         @Override
548         public long reserveIds(int count) throws ProCoreException {
549                 return clusters.state.reservedIds++;
550         }
551
552         @Override
553         public void updateCluster(byte[] operations) throws ProCoreException {
554             ClusterInfo info = null;
555             try {
556                 ClusterUpdateOperation operation = new ClusterUpdateOperation(clusters, operations);
557                 info = clusters.clusterLRU.getOrCreate(operation.uid, true);
558                 if(info == null)
559                     throw new IllegalAcornStateException("info == null for operation " + operation);
560                 info.acquireMutex();
561                         info.scheduleUpdate();
562                         mainProgram.schedule(operation);
563                 } catch (IllegalAcornStateException | AcornAccessVerificationException e) {
564             throw new ProCoreException(e);
565         } finally {
566             if (info != null)
567                 info.releaseMutex();
568                 }
569         }
570
571         private UndoClusterUpdateProcessor getUndoCSS(String ccsId) throws DatabaseException, AcornAccessVerificationException, IllegalAcornStateException {
572
573                 String[] ss = ccsId.split("\\.");
574                 String chunkKey = ss[0];
575                 int chunkOffset = Integer.parseInt(ss[1]);
576                 ClusterStreamChunk chunk = clusters.streamLRU.getWithoutMutex(chunkKey);
577                 if(chunk == null) throw new IllegalAcornStateException("Cluster Stream Chunk " + chunkKey + " was not found.");
578                 chunk.acquireMutex();
579                 try {
580                         return chunk.getUndoProcessor(clusters, chunkOffset, ccsId);
581                 } catch (DatabaseException e) {
582                     throw e;
583                 } catch (Throwable t) {
584                         throw new IllegalStateException(t);
585                 } finally {
586                         chunk.releaseMutex();
587                 }
588         }
589         
590         private void performUndo(String ccsId, ArrayList<Pair<ClusterUID, byte[]>> clusterChanges, UndoClusterSupport support) throws ProCoreException, DatabaseException, IllegalAcornStateException, AcornAccessVerificationException {
591                 UndoClusterUpdateProcessor proc = getUndoCSS(ccsId);
592
593                 int clusterKey = clusters.getClusterKeyByClusterUIDOrMakeWithoutMutex(proc.getClusterUID());
594
595                 clusters.clusterLRU.acquireMutex();
596                 try {
597
598                         ClusterChange cs = new ClusterChange(clusterChanges, proc.getClusterUID());
599                         for(int i=0;i<proc.entries.size();i++) {
600                                 
601                                 Entry e = proc.entries.get(proc.entries.size() - 1 - i);
602                                 e.process(clusters, cs, clusterKey);
603                         }
604                         cs.flush();
605
606                 } finally {
607                         clusters.clusterLRU.releaseMutex();
608                 }
609         }
610
611         @Override
612         public boolean undo(long[] changeSetIds, OnChangeSetUpdate onChangeSetUpdate) throws SDBException {
613
614                 Exception exception = mainProgram.runIdle(new MainProgramRunnable() {
615
616                         @Override
617                         public void run() throws Exception {
618
619                                 try {
620
621                                 final ArrayList<Pair<ClusterUID, byte[]>> clusterChanges = new ArrayList<Pair<ClusterUID, byte[]>>();
622                                 
623                                 UndoClusterSupport support = new UndoClusterSupport(clusters);
624                                 
625                                 final int changeSetId = clusters.state.headChangeSetId;
626                                 
627                                 if(ClusterUpdateProcessorBase.DEBUG)
628                                         System.err.println(" === BEGIN UNDO ===");
629                                 
630                                 for(int i=0;i<changeSetIds.length;i++) {
631                                         final long id = changeSetIds[changeSetIds.length-1-i];
632                                         ArrayList<String> ccss = clusters.getChanges(id);
633                     
634                                         for(int j=0;j<ccss.size();j++) {
635                                                 String ccsid = ccss.get(ccss.size()-j-1);
636                                                 try {
637                                                         if(ClusterUpdateProcessorBase.DEBUG)
638                                                                 System.err.println("performUndo " + ccsid);
639                                                         performUndo(ccsid, clusterChanges, support);
640                                                 } catch (DatabaseException e) {
641                                                         e.printStackTrace();
642                                                 }
643                                         }
644                                 }
645                     
646                                 if(ClusterUpdateProcessorBase.DEBUG)
647                                         System.err.println(" === END UNDO ===");
648                     
649                                 for(int i=0;i<clusterChanges.size();i++) {
650                                         
651                                         final int changeSetIndex = i;
652                                         
653                                         final Pair<ClusterUID, byte[]> pair = clusterChanges.get(i);
654                                         
655                                         final ClusterUID cuid = pair.first;
656                                         final byte[] data = pair.second;
657                     
658                                         onChangeSetUpdate.onChangeSetUpdate(new ChangeSetUpdate() {
659                     
660                                                 @Override
661                                                 public long getChangeSetId() {
662                                                         return changeSetId;
663                                                 }
664                     
665                                                 @Override
666                                                 public int getChangeSetIndex() {
667                                                         return 0;
668                                                 }
669                     
670                                                 @Override
671                                                 public int getNumberOfClusterChangeSets() {
672                                                         return clusterChanges.size();
673                                                 }
674                     
675                                                 @Override
676                                                 public int getIndexOfClusterChangeSet() {
677                                                         return changeSetIndex;
678                                                 }
679                     
680                                                 @Override
681                                                 public byte[] getClusterId() {
682                                                         return cuid.asBytes();
683                                                 }
684                     
685                                                 @Override
686                                                 public boolean getNewCluster() {
687                                                         return false;
688                                                 }
689                     
690                                                 @Override
691                                                 public byte[] getData() {
692                                                         return data;
693                                                 }
694                     
695                                         });
696                                 }
697                         } catch (AcornAccessVerificationException | IllegalAcornStateException e1) {
698                             throw new ProCoreException(e1);
699                         }
700
701                         }
702
703                         @Override
704                         public void done() {
705
706                         }
707
708                 });
709
710                 if(exception instanceof SDBException) throw (SDBException)exception;
711                 else if(exception != null) throw new IllegalAcornStateException(exception);
712                 
713                 return false;
714                 
715         }
716         
717         public ServiceLocator getServiceLocator() {
718             return locator;
719         }
720
721     @Override
722     public boolean refreshEnabled() {
723         return false;
724     }
725
726     @Override
727     public boolean rolledback() {
728         return clusters.rolledback();
729     }
730     
731     public void purge() throws IllegalAcornStateException {
732         clusters.purge(locator);
733     }
734
735     public void purgeDatabase() {
736         
737             if (isClosing || unexpectedClose)
738                 return;
739             
740                 saver.execute(new Runnable() {
741
742                         @Override
743                         public void run() {
744                                 Transaction tr = null;
745                                 try {
746                                         // First take a write transaction
747                                         tr = askWriteTransaction(-1);
748                                         // Then make sure that MainProgram is idling
749                                         mainProgram.mutex.acquire();
750                                         try {
751                                                 synchronized(mainProgram) {
752                                                         if(mainProgram.operations.isEmpty()) {
753                                                                 purge();
754                                                         } else {
755                                                                 // MainProgram is becoming busy again - delay snapshotting
756                                                                 return;
757                                                         }
758                                                 }
759                                         } finally {
760                                                 mainProgram.mutex.release();
761                                         }
762                                 } catch (IllegalAcornStateException | ProCoreException e) {
763                                     LOGGER.error("Purge failed", e);
764                                         unexpectedClose = true;
765                                 } catch (InterruptedException e) {
766                                     LOGGER.error("Purge interrupted", e);
767                                 } finally {
768                                         try {
769                                                 if(tr != null)
770                                                         endTransaction(tr.getTransactionId());
771                                                 if (unexpectedClose) {
772                                                 LifecycleSupport support = getServiceLocator().getService(LifecycleSupport.class);
773                             try {
774                                 support.close();
775                             } catch (DatabaseException e1) {
776                                 LOGGER.error("Failed to close database as a safety measure due to failed purge", e1);
777                             }
778                                                 }
779                                         } catch (ProCoreException e) {
780                                             LOGGER.error("Failed to end purge write transaction", e);
781                                         }
782                                 }
783                         }
784                 });
785         
786     }
787     
788     public long getTailChangeSetId() {
789         return clusters.getTailChangeSetId();
790     }
791     
792 }
793