Fixed Acorn deadlock during snapshotting 11/1511/9
authorAntti Villberg <antti.villberg@semantum.fi>
Wed, 28 Feb 2018 07:56:08 +0000 (09:56 +0200)
committerTuukka Lehtonen <tuukka.lehtonen@semantum.fi>
Wed, 28 Feb 2018 23:24:20 +0000 (01:24 +0200)
refs #7713

Change-Id: Ibb3e87bb6ab4d13bfb6445bf55e16ca8555bee29

bundles/org.simantics.acorn/src/org/simantics/acorn/GraphClientImpl2.java
bundles/org.simantics.acorn/src/org/simantics/acorn/MainProgram.java
bundles/org.simantics.acorn/src/org/simantics/acorn/OperationQueue.java [new file with mode: 0644]
bundles/org.simantics.acorn/src/org/simantics/acorn/backup/AcornBackupProvider.java

index 09929f775b728ab02a9f808d5b3576d7cb1a6cd2..7a57053bcf6aa9f1af09548424bfe00c59456751 100644 (file)
  *******************************************************************************/
 package org.simantics.acorn;
 
+import java.io.BufferedReader;
 import java.io.IOException;
+import java.nio.file.Files;
 import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.LinkedList;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 
 import org.simantics.acorn.MainProgram.MainProgramRunnable;
+import org.simantics.acorn.backup.AcornBackupProvider;
+import org.simantics.acorn.backup.AcornBackupProvider.AcornBackupRunnable;
 import org.simantics.acorn.exception.AcornAccessVerificationException;
 import org.simantics.acorn.exception.IllegalAcornStateException;
 import org.simantics.acorn.internal.ClusterChange;
@@ -31,6 +36,7 @@ import org.simantics.acorn.lru.ClusterChangeSet.Entry;
 import org.simantics.acorn.lru.ClusterInfo;
 import org.simantics.acorn.lru.ClusterStreamChunk;
 import org.simantics.acorn.lru.ClusterUpdateOperation;
+import org.simantics.backup.BackupException;
 import org.simantics.db.ClusterCreator;
 import org.simantics.db.Database;
 import org.simantics.db.ServiceLocator;
@@ -40,6 +46,7 @@ import org.simantics.db.server.ProCoreException;
 import org.simantics.db.service.ClusterSetsSupport;
 import org.simantics.db.service.ClusterUID;
 import org.simantics.db.service.LifecycleSupport;
+import org.simantics.utils.DataContainer;
 import org.simantics.utils.datastructures.Pair;
 import org.simantics.utils.logging.TimeLogger;
 import org.slf4j.Logger;
@@ -52,8 +59,8 @@ public class GraphClientImpl2 implements Database.Session {
     private static final Logger LOGGER = LoggerFactory.getLogger(GraphClientImpl2.class);
        public static final boolean DEBUG = false;
 
-       public final ClusterManager clusters;
-       
+       final ClusterManager clusters;
+
        private TransactionManager transactionManager = new TransactionManager();
        private ExecutorService executor = Executors.newSingleThreadExecutor(new ClientThreadFactory("Core Main Program", false));
        private ExecutorService saver = Executors.newSingleThreadExecutor(new ClientThreadFactory("Core Snapshot Saver", true));
@@ -63,16 +70,16 @@ public class GraphClientImpl2 implements Database.Session {
        private ServiceLocator locator;
        private MainProgram mainProgram;
 
-       static class ClientThreadFactory implements ThreadFactory {
-               
+       private static class ClientThreadFactory implements ThreadFactory {
+
                final String name;
                final boolean daemon;
-               
+
                public ClientThreadFactory(String name, boolean daemon) {
                        this.name = name;
                        this.daemon = daemon;
                }
-               
+
                @Override
                public Thread newThread(Runnable r) {
                        Thread thread = new Thread(r, name);
@@ -87,7 +94,7 @@ public class GraphClientImpl2 implements Database.Session {
            this.locator = locator;
            this.clusters = new ClusterManager(dbFolder);
            load();
-           ClusterSetsSupport cssi = locator.getService(ClusterSetsSupport.class); 
+           ClusterSetsSupport cssi = locator.getService(ClusterSetsSupport.class);
            cssi.setReadDirectory(clusters.lastSessionDirectory);
            cssi.updateWriteDirectory(clusters.workingDirectory);
            mainProgram = new MainProgram(this, clusters);
@@ -98,11 +105,15 @@ public class GraphClientImpl2 implements Database.Session {
            return dbFolder;
        }
 
-       public void tryMakeSnapshot() throws IOException {
-               
-           if (isClosing || unexpectedClose)
-               return;
-           
+       /*
+        * This method schedules snapshotting.
+        * No lock and thread restrictions.
+        */
+       void tryMakeSnapshot() throws IOException {
+
+               if (isClosing || unexpectedClose)
+                       return;
+
                saver.execute(new Runnable() {
 
                        @Override
@@ -112,35 +123,24 @@ public class GraphClientImpl2 implements Database.Session {
                                        // First take a write transaction
                                        tr = askWriteTransaction(-1);
                                        // Then make sure that MainProgram is idling
-                                       mainProgram.mutex.acquire();
-                                       try {
-                                               synchronized(mainProgram) {
-                                                       if(mainProgram.operations.isEmpty()) {
-                                                               makeSnapshot(false);
-                                                       } else {
-                                                               // MainProgram is becoming busy again - delay snapshotting
-                                                               return;
-                                                       }
-                                               }
-                                       } finally {
-                                               mainProgram.mutex.release();
-                                       }
+                                       synchronizeWithIdleMainProgram(() -> makeSnapshot(false));
                                } catch (IllegalAcornStateException | ProCoreException e) {
                                        LOGGER.error("Snapshotting failed", e);
                                        unexpectedClose = true;
-                               } catch (InterruptedException e) {
-                                   LOGGER.error("Snapshotting interrupted", e);
+                               } catch (SDBException e) {
+                                       LOGGER.error("Snapshotting failed", e);
+                                       unexpectedClose = true;
                                } finally {
                                        try {
                                                if(tr != null)
                                                        endTransaction(tr.getTransactionId());
                                                if (unexpectedClose) {
-                                               LifecycleSupport support = getServiceLocator().getService(LifecycleSupport.class);
-                           try {
-                               support.close();
-                           } catch (DatabaseException e1) {
-                               LOGGER.error("Failed to close database as a safety measure due to failed snapshotting", e1);
-                           }
+                                                       LifecycleSupport support = getServiceLocator().getService(LifecycleSupport.class);
+                                                       try {
+                                                               support.close();
+                                                       } catch (DatabaseException e1) {
+                                                               LOGGER.error("Failed to close database as a safety measure due to failed snapshotting", e1);
+                                                       }
                                                }
                                        } catch (ProCoreException e) {
                                            LOGGER.error("Failed to end snapshotting write transaction", e);
@@ -149,11 +149,12 @@ public class GraphClientImpl2 implements Database.Session {
                        }
                });
        }
-       
-    public void makeSnapshot(boolean fullSave) throws IllegalAcornStateException {
-        clusters.makeSnapshot(locator, fullSave);
-    }
-       
+
+       private void makeSnapshot(boolean fullSave) throws IllegalAcornStateException {
+               clusters.makeSnapshot(locator, fullSave);
+       }
+
+       @Override
        public <T> T clone(ClusterUID uid, ClusterCreator creator) throws DatabaseException {
            try {
             return clusters.clone(uid, creator);
@@ -163,17 +164,9 @@ public class GraphClientImpl2 implements Database.Session {
         }
        }
 
-//     private void save() throws IOException {
-//             clusters.save();
-//     }
-       
-       public void load() throws IOException {
+       private void load() throws IOException {
                clusters.load();
        }
-       
-//     public void modiFileEx(ClusterUID uid, int resourceKey, long offset, long size, byte[] bytes, long pos, ClusterSupport support) {
-//             clusters.modiFileEx(uid, resourceKey, offset, size, bytes, pos, support);
-//     }
 
        @Override
        public Database getDatabase() {
@@ -183,23 +176,25 @@ public class GraphClientImpl2 implements Database.Session {
        private boolean closed = false;
        private boolean isClosing = false;
        private boolean unexpectedClose = false;
-       
+
        @Override
        public void close() throws ProCoreException {
-           LOGGER.info("Closing " + this + " and mainProgram " + mainProgram);
+               LOGGER.info("Closing " + this + " and mainProgram " + mainProgram);
                if(!closed && !isClosing) {
-                   isClosing = true;
+                       isClosing = true;
                        try {
-                           if (!unexpectedClose)
-                               makeSnapshot(true);
+
+                               if (!unexpectedClose)
+                                       synchronizeWithIdleMainProgram(() -> makeSnapshot(true));
 
                                mainProgram.close();
                                clusters.shutdown();
                                executor.shutdown();
                                saver.shutdown();
+
                                boolean executorTerminated = executor.awaitTermination(500, TimeUnit.MILLISECONDS);
                                boolean saverTerminated = saver.awaitTermination(500, TimeUnit.MILLISECONDS);
-                               
+
                                LOGGER.info("executorTerminated=" + executorTerminated + ", saverTerminated=" + saverTerminated);
 
                                try {
@@ -211,9 +206,11 @@ public class GraphClientImpl2 implements Database.Session {
                                mainProgram = null;
                                executor = null;
                                saver = null;
-                               
+
                        } catch (IllegalAcornStateException | InterruptedException e) {
                                throw new ProCoreException(e);
+                       } catch (SDBException e1) {
+                               throw new ProCoreException(e1);
                        }
                        closed = true;
                }
@@ -229,19 +226,16 @@ public class GraphClientImpl2 implements Database.Session {
        public boolean isClosed() throws ProCoreException {
                return closed;
        }
-       
+
        @Override
        public void acceptCommit(long transactionId, long changeSetId, byte[] metadata) throws ProCoreException {
                clusters.state.headChangeSetId++;
                long committedChangeSetId = changeSetId + 1;
                try {
-               clusters.commitChangeSet(committedChangeSetId, metadata);
-               
-               clusters.state.transactionId = transactionId;
-               
-               mainProgram.committed();
-               
-               TimeLogger.log("Accepted commit");
+                       clusters.commitChangeSet(committedChangeSetId, metadata);
+                       clusters.state.transactionId = transactionId;
+                       mainProgram.committed();
+                       TimeLogger.log("Accepted commit");
                } catch (IllegalAcornStateException e) {
                    throw new ProCoreException(e);
                }
@@ -257,7 +251,7 @@ public class GraphClientImpl2 implements Database.Session {
                        clusters.state.headChangeSetId++;
                        return clusters.state.headChangeSetId;
                } catch (SDBException e) {
-                   LOGGER.error("Failed to undo cancelled transaction", e);
+                       LOGGER.error("Failed to undo cancelled transaction", e);
                        throw new ProCoreException(e);
                }
        }
@@ -267,11 +261,11 @@ public class GraphClientImpl2 implements Database.Session {
                return transactionManager.askReadTransaction();
        }
 
-       enum TransactionState {
+       private enum TransactionState {
                IDLE,WRITE,READ
        }
-       
-       class TransactionRequest {
+
+       private class TransactionRequest {
                public TransactionState state;
                public Semaphore semaphore;
                public TransactionRequest(TransactionState state, Semaphore semaphore) {
@@ -280,51 +274,51 @@ public class GraphClientImpl2 implements Database.Session {
                }
        }
 
-       class TransactionManager {
+       private class TransactionManager {
 
                private TransactionState currentTransactionState = TransactionState.IDLE;
-               
+
                private int reads = 0;
-               
-               LinkedList<TransactionRequest> requests = new LinkedList<TransactionRequest>();
-               
-               TLongObjectHashMap<TransactionRequest> requestMap = new TLongObjectHashMap<TransactionRequest>();
-               
+
+               private LinkedList<TransactionRequest> requests = new LinkedList<>();
+
+               private TLongObjectHashMap<TransactionRequest> requestMap = new TLongObjectHashMap<>();
+
                private synchronized Transaction makeTransaction(TransactionRequest req) {
-                       
+
                        final int csId = clusters.state.headChangeSetId;
                        final long trId = clusters.state.transactionId+1;
                        requestMap.put(trId, req);
                        return new Transaction() {
-                               
+
                                @Override
                                public long getTransactionId() {
                                        return trId;
                                }
-                               
+
                                @Override
                                public long getHeadChangeSetId() {
                                        return csId;
                                }
                        };
                }
-               
+
                /*
                 * This method cannot be synchronized since it waits and must support multiple entries
                 * by query thread(s) and internal transactions such as snapshot saver
                 */
-               public Transaction askReadTransaction() throws ProCoreException {
-               
+               private Transaction askReadTransaction() throws ProCoreException {
+
                        Semaphore semaphore = new Semaphore(0);
-                       
+
                        TransactionRequest req = queue(TransactionState.READ, semaphore);
-                       
+
                        try {
                                semaphore.acquire();
                        } catch (InterruptedException e) {
                                throw new ProCoreException(e);
                        }
-                       
+
                        return makeTransaction(req);
 
                }
@@ -334,61 +328,61 @@ public class GraphClientImpl2 implements Database.Session {
                        if(r.state == TransactionState.READ) reads++;
                        r.semaphore.release();
                }
-               
+
                private synchronized void processRequests() {
-                       
+
                        while(true) {
 
                                if(requests.isEmpty()) return;
                                TransactionRequest req = requests.peek();
 
                                if(currentTransactionState == TransactionState.IDLE) {
-                               
+
                                        // Accept anything while IDLE
                                        currentTransactionState = req.state;
                                        dispatch();
-                                       
+
                                } else if (currentTransactionState == TransactionState.READ) {
-                                       
+
                                        if(req.state == currentTransactionState) {
 
                                                // Allow other reads
                                                dispatch();
 
                                        } else {
-                                               
+
                                                // Wait
                                                return;
-                                               
+
                                        }
-                                       
+
                                }  else if (currentTransactionState == TransactionState.WRITE) {
 
                                        // Wait
                                        return;
-                                       
+
                                }
-                               
+
                        }
-                       
+
                }
-               
+
                private synchronized TransactionRequest queue(TransactionState state, Semaphore semaphore) {
-                       TransactionRequest req = new TransactionRequest(state, semaphore); 
+                       TransactionRequest req = new TransactionRequest(state, semaphore);
                        requests.addLast(req);
                        processRequests();
                        return req;
                }
-               
+
                /*
                 * This method cannot be synchronized since it waits and must support multiple entries
                 * by query thread(s) and internal transactions such as snapshot saver
                 */
-               public Transaction askWriteTransaction() throws IllegalAcornStateException {
-                       
+               private Transaction askWriteTransaction() throws IllegalAcornStateException {
+
                        Semaphore semaphore = new Semaphore(0);
                        TransactionRequest req = queue(TransactionState.WRITE, semaphore);
-                       
+
                        try {
                                semaphore.acquire();
                        } catch (InterruptedException e) {
@@ -397,9 +391,9 @@ public class GraphClientImpl2 implements Database.Session {
                        mainProgram.startTransaction(clusters.state.headChangeSetId+1);
                        return makeTransaction(req);
                }
-               
-               public synchronized long endTransaction(long transactionId) throws ProCoreException {
-                       
+
+               private synchronized long endTransaction(long transactionId) throws ProCoreException {
+
                        TransactionRequest req = requestMap.remove(transactionId);
                        if(req.state == TransactionState.WRITE) {
                                currentTransactionState = TransactionState.IDLE;
@@ -415,7 +409,7 @@ public class GraphClientImpl2 implements Database.Session {
                }
 
        }
-       
+
        @Override
        public Transaction askWriteTransaction(final long transactionId) throws ProCoreException {
                try {
@@ -453,10 +447,10 @@ public class GraphClientImpl2 implements Database.Session {
        public ChangeSetData getChangeSetData(long minChangeSetId,
                        long maxChangeSetId, OnChangeSetUpdate onChangeSetupate)
                        throws ProCoreException {
-               
+
                new Exception("GetChangeSetDataFunction " + minChangeSetId + " " + maxChangeSetId).printStackTrace();;
                return null;
-               
+
        }
 
        @Override
@@ -507,15 +501,15 @@ public class GraphClientImpl2 implements Database.Session {
                        public long getFirstChangeSetId() {
                                return 0;
                        }
-                       
+
                };
        }
 
        @Override
        public Refresh getRefresh(long changeSetId) throws ProCoreException {
-               
+
                final ClusterIds ids = getClusterIds();
-               
+
                return new Refresh() {
 
                        @Override
@@ -532,15 +526,15 @@ public class GraphClientImpl2 implements Database.Session {
                        public long[] getSecond() {
                                return ids.getSecond();
                        }
-                       
+
                };
-               
-       }
 
-       public byte[] getResourceFile(final byte[] clusterUID, final int resourceIndex) throws ProCoreException, AcornAccessVerificationException, IllegalAcornStateException {
-               return clusters.getResourceFile(clusterUID, resourceIndex);
        }
 
+//     public byte[] getResourceFile(final byte[] clusterUID, final int resourceIndex) throws ProCoreException, AcornAccessVerificationException, IllegalAcornStateException {
+//             return clusters.getResourceFile(clusterUID, resourceIndex);
+//     }
+
        @Override
        public ResourceSegment getResourceSegment(final byte[] clusterUID, final int resourceIndex, final long segmentOffset, short segmentSize) throws ProCoreException {
                try {
@@ -592,7 +586,7 @@ public class GraphClientImpl2 implements Database.Session {
                        chunk.releaseMutex();
                }
        }
-       
+
        private void performUndo(String ccsId, ArrayList<Pair<ClusterUID, byte[]>> clusterChanges, UndoClusterSupport support) throws ProCoreException, DatabaseException, IllegalAcornStateException, AcornAccessVerificationException {
                UndoClusterUpdateProcessor proc = getUndoCSS(ccsId);
 
@@ -603,7 +597,7 @@ public class GraphClientImpl2 implements Database.Session {
 
                        ClusterChange cs = new ClusterChange(clusterChanges, proc.getClusterUID());
                        for(int i=0;i<proc.entries.size();i++) {
-                               
+
                                Entry e = proc.entries.get(proc.entries.size() - 1 - i);
                                e.process(clusters, cs, clusterKey);
                        }
@@ -614,10 +608,57 @@ public class GraphClientImpl2 implements Database.Session {
                }
        }
 
+       private void synchronizeWithIdleMainProgram(MainProgramRunnable runnable) throws SDBException {
+
+               Exception[] exception = { null };
+               Semaphore s = new Semaphore(0);
+
+               mainProgram.runIdle(new MainProgramRunnable() {
+
+                       @Override
+                       public void success() {
+                               try {
+                                   runnable.success();
+                               } finally {
+                                   s.release();
+                               }
+                       }
+
+                       @Override
+                       public void error(Exception e) {
+                               exception[0] = e;
+                               try {
+                                   runnable.error(e);
+                               } finally {
+                                   s.release();
+                               }
+                       }
+
+                       @Override
+                       public void run() throws Exception {
+                               runnable.run();
+                       }
+
+               });
+
+               try {
+                       s.acquire();
+               } catch (InterruptedException e) {
+                       throw new IllegalAcornStateException("Unhandled interruption.", e);
+               }
+
+               Exception e = exception[0];
+               if(e != null) {
+                       if(e instanceof SDBException) throw (SDBException)e;
+                       else if(e != null) throw new IllegalAcornStateException(e);
+               }
+
+       }
+
        @Override
        public boolean undo(long[] changeSetIds, OnChangeSetUpdate onChangeSetUpdate) throws SDBException {
 
-               Exception exception = mainProgram.runIdle(new MainProgramRunnable() {
+               synchronizeWithIdleMainProgram(new MainProgramRunnable() {
 
                        @Override
                        public void run() throws Exception {
@@ -625,18 +666,18 @@ public class GraphClientImpl2 implements Database.Session {
                                try {
 
                                final ArrayList<Pair<ClusterUID, byte[]>> clusterChanges = new ArrayList<Pair<ClusterUID, byte[]>>();
-                               
+
                                UndoClusterSupport support = new UndoClusterSupport(clusters);
-                               
+
                                final int changeSetId = clusters.state.headChangeSetId;
-                               
+
                                if(ClusterUpdateProcessorBase.DEBUG)
                                        LOGGER.info(" === BEGIN UNDO ===");
-                               
+
                                for(int i=0;i<changeSetIds.length;i++) {
                                        final long id = changeSetIds[changeSetIds.length-1-i];
                                        ArrayList<String> ccss = clusters.getChanges(id);
-                   
+
                                        for(int j=0;j<ccss.size();j++) {
                                                String ccsid = ccss.get(ccss.size()-j-1);
                                                try {
@@ -648,56 +689,56 @@ public class GraphClientImpl2 implements Database.Session {
                                                }
                                        }
                                }
-                   
+
                                if(ClusterUpdateProcessorBase.DEBUG)
                                        LOGGER.info(" === END UNDO ===");
-                   
+
                                for(int i=0;i<clusterChanges.size();i++) {
-                                       
+
                                        final int changeSetIndex = i;
-                                       
+
                                        final Pair<ClusterUID, byte[]> pair = clusterChanges.get(i);
-                                       
+
                                        final ClusterUID cuid = pair.first;
                                        final byte[] data = pair.second;
-                   
+
                                        onChangeSetUpdate.onChangeSetUpdate(new ChangeSetUpdate() {
-                   
+
                                                @Override
                                                public long getChangeSetId() {
                                                        return changeSetId;
                                                }
-                   
+
                                                @Override
                                                public int getChangeSetIndex() {
                                                        return 0;
                                                }
-                   
+
                                                @Override
                                                public int getNumberOfClusterChangeSets() {
                                                        return clusterChanges.size();
                                                }
-                   
+
                                                @Override
                                                public int getIndexOfClusterChangeSet() {
                                                        return changeSetIndex;
                                                }
-                   
+
                                                @Override
                                                public byte[] getClusterId() {
                                                        return cuid.asBytes();
                                                }
-                   
+
                                                @Override
                                                public boolean getNewCluster() {
                                                        return false;
                                                }
-                   
+
                                                @Override
                                                public byte[] getData() {
                                                        return data;
                                                }
-                   
+
                                        });
                                }
                        } catch (AcornAccessVerificationException | IllegalAcornStateException e1) {
@@ -706,21 +747,13 @@ public class GraphClientImpl2 implements Database.Session {
 
                        }
 
-                       @Override
-                       public void done() {
-
-                       }
-
                });
 
-               if(exception instanceof SDBException) throw (SDBException)exception;
-               else if(exception != null) throw new IllegalAcornStateException(exception);
-               
                return false;
-               
+
        }
-       
-       public ServiceLocator getServiceLocator() {
+
+       ServiceLocator getServiceLocator() {
            return locator;
        }
 
@@ -733,16 +766,16 @@ public class GraphClientImpl2 implements Database.Session {
     public boolean rolledback() {
         return clusters.rolledback();
     }
-    
-    public void purge() throws IllegalAcornStateException {
+
+    private void purge() throws IllegalAcornStateException {
         clusters.purge(locator);
     }
 
     public void purgeDatabase() {
-       
+
            if (isClosing || unexpectedClose)
                return;
-           
+
                saver.execute(new Runnable() {
 
                        @Override
@@ -752,35 +785,24 @@ public class GraphClientImpl2 implements Database.Session {
                                        // First take a write transaction
                                        tr = askWriteTransaction(-1);
                                        // Then make sure that MainProgram is idling
-                                       mainProgram.mutex.acquire();
-                                       try {
-                                               synchronized(mainProgram) {
-                                                       if(mainProgram.operations.isEmpty()) {
-                                                               purge();
-                                                       } else {
-                                                               // MainProgram is becoming busy again - delay snapshotting
-                                                               return;
-                                                       }
-                                               }
-                                       } finally {
-                                               mainProgram.mutex.release();
-                                       }
+                                       synchronizeWithIdleMainProgram(() -> purge());
                                } catch (IllegalAcornStateException | ProCoreException e) {
-                                   LOGGER.error("Purge failed", e);
+                                       LOGGER.error("Purge failed", e);
+                                       unexpectedClose = true;
+                               } catch (SDBException e) {
+                                       LOGGER.error("Purge failed", e);
                                        unexpectedClose = true;
-                               } catch (InterruptedException e) {
-                                   LOGGER.error("Purge interrupted", e);
                                } finally {
                                        try {
                                                if(tr != null)
                                                        endTransaction(tr.getTransactionId());
                                                if (unexpectedClose) {
-                                               LifecycleSupport support = getServiceLocator().getService(LifecycleSupport.class);
-                           try {
-                               support.close();
-                           } catch (DatabaseException e1) {
-                               LOGGER.error("Failed to close database as a safety measure due to failed purge", e1);
-                           }
+                                                       LifecycleSupport support = getServiceLocator().getService(LifecycleSupport.class);
+                                                       try {
+                                                               support.close();
+                                                       } catch (DatabaseException e1) {
+                                                               LOGGER.error("Failed to close database as a safety measure due to failed purge", e1);
+                                                       }
                                                }
                                        } catch (ProCoreException e) {
                                            LOGGER.error("Failed to end purge write transaction", e);
@@ -788,12 +810,33 @@ public class GraphClientImpl2 implements Database.Session {
                                }
                        }
                });
-       
+
     }
-    
+
     public long getTailChangeSetId() {
        return clusters.getTailChangeSetId();
     }
-    
+
+       public Future<BackupException> getBackupRunnable(Semaphore lock, Path targetPath, int revision) throws IllegalAcornStateException, IOException {
+
+               makeSnapshot(true);
+
+               Path dbDir = getDbFolder();
+               int newestFolder = clusters.mainState.headDir - 1;
+               int latestFolder = -2;
+               Path AcornMetadataFile = AcornBackupProvider.getAcornMetadataFile(dbDir);
+               if (Files.exists(AcornMetadataFile)) {
+                       try (BufferedReader br = Files.newBufferedReader(AcornMetadataFile)) {
+                               latestFolder = Integer.parseInt( br.readLine() );
+                       }
+               }
+
+               AcornBackupRunnable r = new AcornBackupRunnable(
+                               lock, targetPath, revision, dbDir, latestFolder, newestFolder);
+               new Thread(r, "Acorn backup thread").start();
+               return r;
+
+       }
+
 }
 
index b5ba2471b486b10ce91f169c2797cc0673ddc276..8dea16d7fe73a5cc0234a6e00998ad964f184513 100644 (file)
@@ -1,9 +1,9 @@
 package org.simantics.acorn;
 
 import java.io.Closeable;
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Comparator;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
@@ -26,23 +26,22 @@ import org.slf4j.LoggerFactory;
 public class MainProgram implements Runnable, Closeable {
 
        private static final Logger LOGGER = LoggerFactory.getLogger(MainProgram.class);
-       
+
        private static final int CLUSTER_THREADS = 4;
        private static final int CHUNK_CACHE_SIZE = 100;
 
        private final GraphClientImpl2 client;
-       private final ClusterManager clusters;
        private final ExecutorService[] clusterUpdateThreads;
     private final List<ClusterUpdateOperation>[] updateSchedules;
-       
-       private int residentOperationBytes = 0;
-       private long currentChangeSetId = -1;
-       private int nextChunkId = 0;
+
+       private Thread mainProgramThread;
+
        private boolean alive = true;
        private Semaphore deathBarrier = new Semaphore(0);
 
-       final Semaphore mutex = new Semaphore(1);
-       final LinkedList<ClusterStreamChunk> operations = new LinkedList<>();
+       final ClusterManager clusters;
+
+       private final OperationQueue operationQueue = new OperationQueue(this);
 
        static class ClusterThreadFactory implements ThreadFactory {
 
@@ -62,7 +61,7 @@ public class MainProgram implements Runnable, Closeable {
                }
        }
 
-       public MainProgram(GraphClientImpl2 client, ClusterManager clusters) {
+       MainProgram(GraphClientImpl2 client, ClusterManager clusters) {
 
                this.client = client;
                this.clusters = clusters;
@@ -74,9 +73,8 @@ public class MainProgram implements Runnable, Closeable {
                }
        }
 
-       public void startTransaction(long id) {
-               currentChangeSetId = id;
-               nextChunkId = 0;
+       void startTransaction(long id) {
+               operationQueue.startTransaction(id);
        }
 
        private static Comparator<ClusterUID> clusterComparator = new Comparator<ClusterUID>() {
@@ -89,138 +87,37 @@ public class MainProgram implements Runnable, Closeable {
 
        @Override
        public void run() {
+
+               mainProgramThread = Thread.currentThread();
+
                try {
 
-                       mutex.acquire();
                        main:
                        while(alive) {
 
                                TreeMap<ClusterUID, List<ClusterUpdateOperation>> updates = new TreeMap<ClusterUID, List<ClusterUpdateOperation>>(clusterComparator);
 
-                               synchronized(MainProgram.this) {
-
-                                       while(!operations.isEmpty() && updates.size() < 100) {
+                               operationQueue.pumpUpdates(updates);
 
-                                               ClusterStreamChunk chunk = operations.pollFirst();
+                               if(updates.isEmpty()) {
 
-                                               for(int i=chunk.nextToProcess;i<chunk.operations.size();i++) {
-                                                       ClusterUpdateOperation o = chunk.operations.get(i);
-                                                       ClusterUID uid = o.uid;
-                                                       List<ClusterUpdateOperation> ops = updates.get(uid);
-                                                       if(ops == null) {
-                                                               ops = new ArrayList<ClusterUpdateOperation>();
-                                                               updates.put(uid, ops);
-                                                       }
-                                                       ops.add(o);
-                                               }
+                                       long start = System.nanoTime();
 
-                                               chunk.nextToProcess = chunk.operations.size();
+                                       operationQueue.waitFor();
 
-                                               if(!chunk.isCommitted()) {
-                                                       assert(operations.isEmpty());
-                                                       operations.add(chunk);
-                                                       break;
-                                               }
+                                       if (!alive)
+                                               break main;
 
-                                       }
-
-                                       if(updates.isEmpty()) {
-                                               try {
-                                                       long start = System.nanoTime();
-                                                       mutex.release();
-                                                       MainProgram.this.wait(5000); // wake up when new operations are scheduled or the last operation is committed  
-                                                       mutex.acquire();
-                                                       if (!alive)
-                                                               break main;
-                                                       long duration = System.nanoTime()-start;
-                                                       if(duration > 4000000000L) {
-
-                                                               // Was this a time-out or a new stream request?
-                                                               if(operations.isEmpty()) {
-
-                                                                       /*
-                                                                        * We are idling here.
-                                                                        * Flush all caches gradually
-                                                                        */
-
-                                                                       // Write pending cs to disk
-                                                                       boolean written = clusters.csLRU.swapForced();
-                                                                       while(written) {
-                                                                               if(!updates.isEmpty()) break;
-                                                                               written = clusters.csLRU.swapForced();
-                                                                       }
-                                                                       // Write pending chunks to disk
-                                                                       written = clusters.streamLRU.swapForced();
-                                                                       while(written) {
-                                                                               if(!updates.isEmpty()) break;
-                                                                               written = clusters.streamLRU.swapForced();
-                                                                       }
-                                                                       // Write pending files to disk
-                                                                       written = clusters.fileLRU.swapForced();
-                                                                       while(written) {
-                                                                               if(!updates.isEmpty()) break;
-                                                                               written = clusters.fileLRU.swapForced();
-                                                                       }
-                                                                       // Write pending clusters to disk
-                                                                       written = clusters.clusterLRU.swapForced();
-                                                                       while(written) {
-                                                                               if(!updates.isEmpty()) break;
-                                                                               written = clusters.clusterLRU.swapForced();
-                                                                       }
-
-                                                                       client.tryMakeSnapshot();
-                                                               }
-                                                       }
-                                               } catch (InterruptedException e) {
-                                                       e.printStackTrace();
-                                               }
+                                       long duration = System.nanoTime()-start;
+                                       if(duration > 4000000000L) {
+                                               checkIdle();
                                        }
                                }
 
 //                             long sss = System.nanoTime();
 
-                               for(int i=0;i<CLUSTER_THREADS;i++)
-                                       updateSchedules[i].clear();
-
-                               final Semaphore s = new Semaphore(0);
-
-                               for(Map.Entry<ClusterUID, List<ClusterUpdateOperation>> entry : updates.entrySet()) {
-                                       ClusterUID key = entry.getKey();
-                                       int hash = key.hashCode() & (clusterUpdateThreads.length-1);
-                                       updateSchedules[hash].addAll(entry.getValue());
-                               }
-
-                               //                              final AtomicLong elapsed = new AtomicLong(0);
-                               int acquireAmount = 0;
-                               for(int i=0;i<CLUSTER_THREADS;i++) {
-                                       final List<ClusterUpdateOperation> ops = updateSchedules[i];
-                                       if (!ops.isEmpty()) {
-                                               acquireAmount++;
-                                               clusterUpdateThreads[i].submit(new Callable<Object>() {
-
-                            @Override
-                            public Object call() throws Exception {
-                                //long st = System.nanoTime();
-                                try {
-                                    for(ClusterUpdateOperation op : ops) {
-                                        op.run();
-                                    }
-                                } finally {
-                                    s.release();
-                                }
-                                return null;
-                                
-    //                          long duration = System.nanoTime()-st;
-    //                          elapsed.addAndGet(duration);
-    //                          double dur = 1e-9*duration;
-    //                          if(dur > 0.05)
-    //                              System.err.println("duration=" + dur + "s. " + ops.size());
-                            }
-                        });
-                                       }
-                               }
-
-                               s.acquire(acquireAmount);
+                               runUpdates(updates);
+                               runTasksIfEmpty();
 
                                /*
                                 * Here we are actively processing updates from client.
@@ -254,48 +151,138 @@ public class MainProgram implements Runnable, Closeable {
                        deathBarrier.release();
                }
        }
-       
+
+       @FunctionalInterface
        static interface MainProgramRunnable {
-               
-               public void run() throws Exception;
-               public void done();
-               
+               void run() throws Exception;
+               default void error(Exception e) {
+                       LOGGER.error("An error occured", e);
+               }
+               default void success() {}
        }
 
-       public Exception runIdle(MainProgramRunnable runnable) {
-               try {
-                       long startTime = System.currentTimeMillis();
-                       while (true) {
-                               boolean hasMutex = false;
+       private void runUpdates(TreeMap<ClusterUID, List<ClusterUpdateOperation>> updates) throws InterruptedException {
+
+               for(int i=0;i<CLUSTER_THREADS;i++)
+                       updateSchedules[i].clear();
+
+               final Semaphore s = new Semaphore(0);
+
+               for(Map.Entry<ClusterUID, List<ClusterUpdateOperation>> entry : updates.entrySet()) {
+                       ClusterUID key = entry.getKey();
+                       int hash = key.hashCode() & (clusterUpdateThreads.length-1);
+                       updateSchedules[hash].addAll(entry.getValue());
+               }
+
+               //                              final AtomicLong elapsed = new AtomicLong(0);
+               int acquireAmount = 0;
+               for(int i=0;i<CLUSTER_THREADS;i++) {
+                       final List<ClusterUpdateOperation> ops = updateSchedules[i];
+                       if (!ops.isEmpty()) {
+                               acquireAmount++;
+                               clusterUpdateThreads[i].submit(new Callable<Object>() {
+
+                    @Override
+                    public Object call() throws Exception {
+                        //long st = System.nanoTime();
+                        try {
+                            for(ClusterUpdateOperation op : ops) {
+                                op.run();
+                            }
+                        } finally {
+                            s.release();
+                        }
+                        return null;
+
+//                          long duration = System.nanoTime()-st;
+//                          elapsed.addAndGet(duration);
+//                          double dur = 1e-9*duration;
+//                          if(dur > 0.05)
+//                              System.err.println("duration=" + dur + "s. " + ops.size());
+                    }
+                });
+                       }
+               }
+
+               s.acquire(acquireAmount);
+
+       }
+
+       /*
+        * This shall be run when no updates are currently available.
+        */
+       private void runTasksIfEmpty() {
+               if(operationQueue.isEmpty()) {
+                       List<MainProgramRunnable> todo = new ArrayList<>();
+                       operationQueue.pumpTasks(todo);
+                       for(MainProgramRunnable runnable : todo) {
                                try {
-                                       synchronized (MainProgram.this) {
-                                               if (hasMutex = mutex.tryAcquire()) {
-                                                       if (operations.isEmpty()) {
-                                                               runnable.run();
-                                                               return null;
-                                                       }
-                                               }
-                                       }
-                                       long endTime = System.currentTimeMillis(); 
-                                       if ((endTime - startTime) > 100) {
-                                               startTime = endTime; 
-                                               LOGGER.info("MainProgram.runIdle() retry mutex acquire!");
-                                       }
+                                       runnable.run();
+                                       runnable.success();
                                } catch (Exception e) {
-                                       return e;
-                               } finally {
-                                       if (hasMutex)
-                                               mutex.release();
+                                       runnable.error(e);
                                }
                        }
-               } finally {
-                       runnable.done();
                }
        }
-       
+
+       /*
+        * This gets called when an idle period has been detected
+        */
+       private void checkIdle() throws IOException, IllegalAcornStateException, AcornAccessVerificationException {
+
+               // Was this a time-out or a new stream request?
+               if(operationQueue.isEmpty()) {
+
+                       /*
+                        * We are idling here.
+                        * Flush all caches gradually
+                        */
+
+                       // Write pending cs to disk
+                       boolean written = clusters.csLRU.swapForced();
+                       while(written) {
+                               if(!operationQueue.isEmpty()) break;
+                               written = clusters.csLRU.swapForced();
+                       }
+                       // Write pending chunks to disk
+                       written = clusters.streamLRU.swapForced();
+                       while(written) {
+                               if(!operationQueue.isEmpty()) break;
+                               written = clusters.streamLRU.swapForced();
+                       }
+                       // Write pending files to disk
+                       written = clusters.fileLRU.swapForced();
+                       while(written) {
+                               if(!operationQueue.isEmpty()) break;
+                               written = clusters.fileLRU.swapForced();
+                       }
+                       // Write pending clusters to disk
+                       written = clusters.clusterLRU.swapForced();
+                       while(written) {
+                               if(!operationQueue.isEmpty()) break;
+                               written = clusters.clusterLRU.swapForced();
+                       }
+
+                       client.tryMakeSnapshot();
+
+               }
+
+       }
+
+
+       /*
+        * This schedules tasks to be run in MainProgram thread
+        * Called from other threads than MainProgram thread
+        *
+        */
+       void runIdle(MainProgramRunnable task) {
+               operationQueue.scheduleTask(task);
+       }
+
        /*
         * Mutex for streamLRU is assumed here
-        * 
+        *
         */
        private void swapChunks() throws AcornAccessVerificationException, IllegalAcornStateException {
 
@@ -309,45 +296,34 @@ public class MainProgram implements Runnable, Closeable {
                while(clusters.csLRU.swap(Long.MAX_VALUE, CHUNK_CACHE_SIZE));
        }
 
-       public synchronized void committed() {
+       /*
+        * Called by DB client write threads
+        */
+       void committed() {
 
-               ClusterStreamChunk last = operations.isEmpty() ? null : operations.getLast();
+               ClusterStreamChunk last = operationQueue.commitLast();
         if (!alive) {
             LOGGER.error("Trying to commit operation after MainProgram is closed! Operation is " + last);
-//          return;
         }
-               if(last != null) {
-                       last.commit();
-                       notifyAll();
-               }
 
        }
 
-       public synchronized void schedule(ClusterUpdateOperation operation) throws IllegalAcornStateException {
-           if (!alive) {
+       /*
+        * Called by DB client write threads
+        */
+       void schedule(ClusterUpdateOperation operation) throws IllegalAcornStateException {
+
+               if (!alive) {
                LOGGER.error("Trying to schedule operation after MainProgram is closed! Operation is " + operation);
-//             return;
            }
+
                clusters.streamLRU.acquireMutex();
 
                try {
 
-                       ClusterStreamChunk last = operations.isEmpty() ? null : operations.getLast();
-                       if(last == null || last.isCommitted()) {
-                               String id = "" + currentChangeSetId + "-" + nextChunkId++;
-                               last = new ClusterStreamChunk(clusters, clusters.streamLRU, id);
-                               operations.add(last);
-                       }
-
-                       String chunkId = last.getKey();
-                       int chunkOffset = last.operations.size();
-                       operation.scheduled(chunkId + "." + chunkOffset);
-
-                       last.addOperation(operation);
-
+                       operationQueue.scheduleUpdate(operation);
                        swapChunks();
 
-                       notifyAll();
                } catch (IllegalAcornStateException e) {
                    throw e;
                } catch (Throwable t) {
@@ -355,14 +331,17 @@ public class MainProgram implements Runnable, Closeable {
                } finally {
                        clusters.streamLRU.releaseMutex();
                }
+
        }
 
     @Override
     public void close() {
+
         alive = false;
-        synchronized (this) {
-            notifyAll();
-        }
+
+        // This will wake up the sleeping beauty
+        operationQueue.scheduleTask(() -> {});
+
         try {
             deathBarrier.acquire();
         } catch (InterruptedException e) {
@@ -382,4 +361,12 @@ public class MainProgram implements Runnable, Closeable {
         }
     }
 
+    void assertMainProgramThread() {
+        assert(Thread.currentThread().equals(mainProgramThread));
+    }
+
+    void assertNoMainProgramThread() {
+        assert(!Thread.currentThread().equals(mainProgramThread));
+    }
+
 }
diff --git a/bundles/org.simantics.acorn/src/org/simantics/acorn/OperationQueue.java b/bundles/org.simantics.acorn/src/org/simantics/acorn/OperationQueue.java
new file mode 100644 (file)
index 0000000..7a9d8ab
--- /dev/null
@@ -0,0 +1,175 @@
+package org.simantics.acorn;
+
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.TreeMap;
+
+import org.simantics.acorn.MainProgram.MainProgramRunnable;
+import org.simantics.acorn.exception.AcornAccessVerificationException;
+import org.simantics.acorn.exception.IllegalAcornStateException;
+import org.simantics.acorn.lru.ClusterStreamChunk;
+import org.simantics.acorn.lru.ClusterUpdateOperation;
+import org.simantics.db.service.ClusterUID;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/*
+ * Provides work for MainProgram thread
+ * -cluster stream updates from client
+ * -tasks from other threads
+ *
+ * Synchronized via monitor.
+ *
+ */
+class OperationQueue {
+
+       private static final Logger LOGGER = LoggerFactory.getLogger(OperationQueue.class);
+
+       private final MainProgram mainProgram;
+
+       private final LinkedList<ClusterStreamChunk> operations = new LinkedList<>();
+
+       private final LinkedList<MainProgramRunnable> tasks = new LinkedList<>();
+
+       private long currentChangeSetId = -1;
+       private int nextChunkId = 0;
+
+       OperationQueue(MainProgram mainProgram) {
+               this.mainProgram = mainProgram;
+       }
+
+       /*
+        * Called by other Acorn threads than MainProgram thread
+        */
+       synchronized void scheduleTask(MainProgramRunnable task) {
+
+               mainProgram.assertNoMainProgramThread();
+
+               tasks.add(task);
+
+               // The only thread waiting for this is MainProgram thread
+               notify();
+
+       }
+
+       /*
+        * Called by DB client write threads
+        */
+       synchronized void startTransaction(long id) {
+               currentChangeSetId = id;
+               nextChunkId = 0;
+       }
+
+       /*
+        * Called by DB client write threads
+        */
+       synchronized ClusterStreamChunk commitLast() {
+               ClusterStreamChunk last = operations.isEmpty() ? null : operations.getLast();
+               if(last != null) {
+                       last.commit();
+                       notifyAll();
+               }
+               return last;
+       }
+
+       /*
+        * Called by DB client write threads
+        */
+       synchronized void scheduleUpdate(ClusterUpdateOperation operation) throws AcornAccessVerificationException, IllegalAcornStateException {
+
+               ClusterStreamChunk last = operations.isEmpty() ? null : operations.getLast();
+               if(last == null || last.isCommitted()) {
+                       String id = "" + currentChangeSetId + "-" + nextChunkId++;
+                       last = new ClusterStreamChunk(mainProgram.clusters, mainProgram.clusters.streamLRU, id);
+                       operations.add(last);
+               }
+
+               String chunkId = last.getKey();
+               int chunkOffset = last.operations.size();
+               operation.scheduled(chunkId + "." + chunkOffset);
+
+               last.addOperation(operation);
+
+               // The only thread waiting for this is MainProgram thread
+               notify();
+
+       }
+
+       /*
+        * Called by MainProgram thread
+        */
+       synchronized void pumpTasks(List<MainProgramRunnable> todo) {
+
+               mainProgram.assertMainProgramThread();
+
+               todo.addAll(tasks);
+               tasks.clear();
+
+       }
+
+       /*
+        * Called by MainProgram thread
+        */
+       synchronized void pumpUpdates(TreeMap<ClusterUID, List<ClusterUpdateOperation>> updates) {
+
+               mainProgram.assertMainProgramThread();
+
+               while(!operations.isEmpty() && updates.size() < 100) {
+
+                       ClusterStreamChunk chunk = operations.pollFirst();
+
+                       for(int i=chunk.nextToProcess;i<chunk.operations.size();i++) {
+                               ClusterUpdateOperation o = chunk.operations.get(i);
+                               ClusterUID uid = o.uid;
+                               List<ClusterUpdateOperation> ops = updates.get(uid);
+                               if(ops == null) {
+                                       ops = new ArrayList<ClusterUpdateOperation>();
+                                       updates.put(uid, ops);
+                               }
+                               ops.add(o);
+                       }
+
+                       chunk.nextToProcess = chunk.operations.size();
+
+                       if(!chunk.isCommitted()) {
+                               assert(operations.isEmpty());
+                               operations.add(chunk);
+                               break;
+                       }
+
+               }
+
+       }
+
+       /*
+        * Called by MainProgram thread
+        */
+       synchronized boolean isEmpty() {
+
+               mainProgram.assertMainProgramThread();
+
+               return operations.isEmpty();
+
+       }
+
+       /*
+        * Wake up when new operations are scheduled or the last operation is committed
+        * Called by MainProgram thread
+        */
+       synchronized void waitFor() {
+
+               mainProgram.assertMainProgramThread();
+
+               // One last check within the monitor 
+               if(!operations.isEmpty() || !tasks.isEmpty()) return;
+
+               try {
+                       wait(5000);
+               } catch (InterruptedException e) {
+                       LOGGER.error("Unexpected interruption", e);
+               }
+
+       }
+
+}
index 3977ad73d0d86bb1e2a11dba50899ea3955494a2..a5aa4f2ef6bcfc7b8d19c70d2a0d7effeca9c4ab 100644 (file)
@@ -39,11 +39,11 @@ public class AcornBackupProvider implements IBackupProvider {
     public AcornBackupProvider() {
         this.client = AcornSessionManagerImpl.getInstance().getClient();
     }
-    
-    private static Path getAcornMetadataFile(Path dbFolder) {
+
+    public static Path getAcornMetadataFile(Path dbFolder) {
         return dbFolder.getParent().resolve(IDENTIFIER);
     }
-    
+
     @Override
     public void lock() throws BackupException {
         try {
@@ -60,25 +60,9 @@ public class AcornBackupProvider implements IBackupProvider {
         boolean releaseLock = true;
         try {
             lock.acquire();
-
-            client.makeSnapshot(true);
-
-            Path dbDir = client.getDbFolder();
-            int newestFolder = client.clusters.mainState.headDir - 1;
-            int latestFolder = -2;
-            Path AcornMetadataFile = getAcornMetadataFile(dbDir);
-            if (Files.exists(AcornMetadataFile)) {
-                try (BufferedReader br = Files.newBufferedReader(AcornMetadataFile)) {
-                    latestFolder = Integer.parseInt( br.readLine() );
-                }
-            }
-
-            AcornBackupRunnable r = new AcornBackupRunnable(
-                    lock, targetPath, revision, dbDir, latestFolder, newestFolder);
-            new Thread(r, "Acorn backup thread").start();
-
-             releaseLock = false;
-             return r;
+            Future<BackupException> r = client.getBackupRunnable(lock, targetPath, revision);
+            releaseLock = false;
+            return r;
         } catch (InterruptedException e) {
             releaseLock = false;
             throw new BackupException("Failed to lock Acorn for backup.", e);
@@ -127,7 +111,7 @@ public class AcornBackupProvider implements IBackupProvider {
             if (dbRoot != restorePath) {
                 FileUtils.deleteAll(dbRoot.toFile());
                 Files.move(restorePath, dbRoot);
-            } 
+            }
         } catch (IOException e) {
             e.printStackTrace();
         }
@@ -178,7 +162,7 @@ public class AcornBackupProvider implements IBackupProvider {
         }
     }
 
-    private static class AcornBackupRunnable implements Runnable, Future<BackupException> {
+    public static class AcornBackupRunnable implements Runnable, Future<BackupException> {
 
         private final Semaphore lock;
         private final Path targetPath;