From: Antti Villberg Date: Wed, 28 Feb 2018 07:56:08 +0000 (+0200) Subject: Fixed Acorn deadlock during snapshotting X-Git-Url: https://gerrit.simantics.org/r/gitweb?a=commitdiff_plain;h=11518dc555e151d6009620a7d236c7886934d977;p=simantics%2Fplatform.git Fixed Acorn deadlock during snapshotting Backported from original change, conflicts occurred in cherry-pick due to commit 1dfeb7d5. redmine #7713 gitlab #229 Change-Id: Ibb3e87bb6ab4d13bfb6445bf55e16ca8555bee29 (cherry picked from commit 751ee12501d220832b672dd433655a4d65806fd9) --- diff --git a/bundles/org.simantics.acorn/src/org/simantics/acorn/GraphClientImpl2.java b/bundles/org.simantics.acorn/src/org/simantics/acorn/GraphClientImpl2.java index 20abf68db..8c7094ed8 100644 --- a/bundles/org.simantics.acorn/src/org/simantics/acorn/GraphClientImpl2.java +++ b/bundles/org.simantics.acorn/src/org/simantics/acorn/GraphClientImpl2.java @@ -11,17 +11,22 @@ *******************************************************************************/ package org.simantics.acorn; +import java.io.BufferedReader; import java.io.IOException; +import java.nio.file.Files; import java.nio.file.Path; import java.util.ArrayList; import java.util.LinkedList; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.concurrent.Semaphore; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import org.simantics.acorn.MainProgram.MainProgramRunnable; +import org.simantics.acorn.backup.AcornBackupProvider; +import org.simantics.acorn.backup.AcornBackupProvider.AcornBackupRunnable; import org.simantics.acorn.exception.AcornAccessVerificationException; import org.simantics.acorn.exception.IllegalAcornStateException; import org.simantics.acorn.internal.ClusterChange; @@ -31,6 +36,7 @@ import org.simantics.acorn.lru.ClusterChangeSet.Entry; import org.simantics.acorn.lru.ClusterInfo; import org.simantics.acorn.lru.ClusterStreamChunk; import org.simantics.acorn.lru.ClusterUpdateOperation; +import org.simantics.backup.BackupException; import org.simantics.db.ClusterCreator; import org.simantics.db.Database; import org.simantics.db.ServiceLocator; @@ -40,6 +46,7 @@ import org.simantics.db.server.ProCoreException; import org.simantics.db.service.ClusterSetsSupport; import org.simantics.db.service.ClusterUID; import org.simantics.db.service.LifecycleSupport; +import org.simantics.utils.DataContainer; import org.simantics.utils.datastructures.Pair; import org.simantics.utils.logging.TimeLogger; import org.slf4j.Logger; @@ -52,8 +59,8 @@ public class GraphClientImpl2 implements Database.Session { private static final Logger LOGGER = LoggerFactory.getLogger(GraphClientImpl2.class); public static final boolean DEBUG = false; - public final ClusterManager clusters; - + final ClusterManager clusters; + private TransactionManager transactionManager = new TransactionManager(); private ExecutorService executor = Executors.newSingleThreadExecutor(new ClientThreadFactory("Core Main Program", false)); private ExecutorService saver = Executors.newSingleThreadExecutor(new ClientThreadFactory("Core Snapshot Saver", true)); @@ -64,16 +71,16 @@ public class GraphClientImpl2 implements Database.Session { private FileCache fileCache; private MainProgram mainProgram; - static class ClientThreadFactory implements ThreadFactory { - + private static class ClientThreadFactory implements ThreadFactory { + final String name; final boolean daemon; - + public ClientThreadFactory(String name, boolean daemon) { this.name = name; this.daemon = daemon; } - + @Override public Thread newThread(Runnable r) { Thread thread = new Thread(r, name); @@ -91,7 +98,7 @@ public class GraphClientImpl2 implements Database.Session { locator.registerService(FileCache.class, fileCache); this.clusters = new ClusterManager(dbFolder, fileCache); load(); - ClusterSetsSupport cssi = locator.getService(ClusterSetsSupport.class); + ClusterSetsSupport cssi = locator.getService(ClusterSetsSupport.class); cssi.setReadDirectory(clusters.lastSessionDirectory); cssi.updateWriteDirectory(clusters.workingDirectory); mainProgram = new MainProgram(this, clusters); @@ -102,11 +109,15 @@ public class GraphClientImpl2 implements Database.Session { return dbFolder; } - public void tryMakeSnapshot() throws IOException { - - if (isClosing || unexpectedClose) - return; - + /* + * This method schedules snapshotting. + * No lock and thread restrictions. + */ + void tryMakeSnapshot() throws IOException { + + if (isClosing || unexpectedClose) + return; + saver.execute(new Runnable() { @Override @@ -116,35 +127,24 @@ public class GraphClientImpl2 implements Database.Session { // First take a write transaction tr = askWriteTransaction(-1); // Then make sure that MainProgram is idling - mainProgram.mutex.acquire(); - try { - synchronized(mainProgram) { - if(mainProgram.operations.isEmpty()) { - makeSnapshot(false); - } else { - // MainProgram is becoming busy again - delay snapshotting - return; - } - } - } finally { - mainProgram.mutex.release(); - } + synchronizeWithIdleMainProgram(() -> makeSnapshot(false)); } catch (IllegalAcornStateException | ProCoreException e) { LOGGER.error("Snapshotting failed", e); unexpectedClose = true; - } catch (InterruptedException e) { - LOGGER.error("Snapshotting interrupted", e); + } catch (SDBException e) { + LOGGER.error("Snapshotting failed", e); + unexpectedClose = true; } finally { try { if(tr != null) endTransaction(tr.getTransactionId()); if (unexpectedClose) { - LifecycleSupport support = getServiceLocator().getService(LifecycleSupport.class); - try { - support.close(); - } catch (DatabaseException e1) { - LOGGER.error("Failed to close database as a safety measure due to failed snapshotting", e1); - } + LifecycleSupport support = getServiceLocator().getService(LifecycleSupport.class); + try { + support.close(); + } catch (DatabaseException e1) { + LOGGER.error("Failed to close database as a safety measure due to failed snapshotting", e1); + } } } catch (ProCoreException e) { LOGGER.error("Failed to end snapshotting write transaction", e); @@ -153,11 +153,12 @@ public class GraphClientImpl2 implements Database.Session { } }); } - - public void makeSnapshot(boolean fullSave) throws IllegalAcornStateException { - clusters.makeSnapshot(locator, fullSave); - } - + + private void makeSnapshot(boolean fullSave) throws IllegalAcornStateException { + clusters.makeSnapshot(locator, fullSave); + } + + @Override public T clone(ClusterUID uid, ClusterCreator creator) throws DatabaseException { try { return clusters.clone(uid, creator); @@ -167,17 +168,9 @@ public class GraphClientImpl2 implements Database.Session { } } -// private void save() throws IOException { -// clusters.save(); -// } - - public void load() throws IOException { + private void load() throws IOException { clusters.load(); } - -// public void modiFileEx(ClusterUID uid, int resourceKey, long offset, long size, byte[] bytes, long pos, ClusterSupport support) { -// clusters.modiFileEx(uid, resourceKey, offset, size, bytes, pos, support); -// } @Override public Database getDatabase() { @@ -187,24 +180,26 @@ public class GraphClientImpl2 implements Database.Session { private boolean closed = false; private boolean isClosing = false; private boolean unexpectedClose = false; - + @Override public void close() throws ProCoreException { - LOGGER.info("Closing " + this + " and mainProgram " + mainProgram); + LOGGER.info("Closing " + this + " and mainProgram " + mainProgram); if(!closed && !isClosing) { - isClosing = true; + isClosing = true; try { - if (!unexpectedClose) - makeSnapshot(true); + + if (!unexpectedClose) + synchronizeWithIdleMainProgram(() -> makeSnapshot(true)); mainProgram.close(); clusters.shutdown(); executor.shutdown(); saver.shutdown(); + boolean executorTerminated = executor.awaitTermination(500, TimeUnit.MILLISECONDS); boolean saverTerminated = saver.awaitTermination(500, TimeUnit.MILLISECONDS); - - System.err.println("executorTerminated=" + executorTerminated + ", saverTerminated=" + saverTerminated); + + LOGGER.info("executorTerminated=" + executorTerminated + ", saverTerminated=" + saverTerminated); try { clusters.mainState.save(dbFolder); @@ -215,9 +210,11 @@ public class GraphClientImpl2 implements Database.Session { mainProgram = null; executor = null; saver = null; - + } catch (IllegalAcornStateException | InterruptedException e) { throw new ProCoreException(e); + } catch (SDBException e1) { + throw new ProCoreException(e1); } closed = true; } @@ -233,19 +230,16 @@ public class GraphClientImpl2 implements Database.Session { public boolean isClosed() throws ProCoreException { return closed; } - + @Override public void acceptCommit(long transactionId, long changeSetId, byte[] metadata) throws ProCoreException { clusters.state.headChangeSetId++; long committedChangeSetId = changeSetId + 1; try { - clusters.commitChangeSet(committedChangeSetId, metadata); - - clusters.state.transactionId = transactionId; - - mainProgram.committed(); - - TimeLogger.log("Accepted commit"); + clusters.commitChangeSet(committedChangeSetId, metadata); + clusters.state.transactionId = transactionId; + mainProgram.committed(); + TimeLogger.log("Accepted commit"); } catch (IllegalAcornStateException e) { throw new ProCoreException(e); } @@ -261,7 +255,7 @@ public class GraphClientImpl2 implements Database.Session { clusters.state.headChangeSetId++; return clusters.state.headChangeSetId; } catch (SDBException e) { - LOGGER.error("Failed to undo cancelled transaction", e); + LOGGER.error("Failed to undo cancelled transaction", e); throw new ProCoreException(e); } } @@ -271,11 +265,11 @@ public class GraphClientImpl2 implements Database.Session { return transactionManager.askReadTransaction(); } - enum TransactionState { + private enum TransactionState { IDLE,WRITE,READ } - - class TransactionRequest { + + private class TransactionRequest { public TransactionState state; public Semaphore semaphore; public TransactionRequest(TransactionState state, Semaphore semaphore) { @@ -284,51 +278,51 @@ public class GraphClientImpl2 implements Database.Session { } } - class TransactionManager { + private class TransactionManager { private TransactionState currentTransactionState = TransactionState.IDLE; - + private int reads = 0; - - LinkedList requests = new LinkedList(); - - TLongObjectHashMap requestMap = new TLongObjectHashMap(); - + + private LinkedList requests = new LinkedList<>(); + + private TLongObjectHashMap requestMap = new TLongObjectHashMap<>(); + private synchronized Transaction makeTransaction(TransactionRequest req) { - + final int csId = clusters.state.headChangeSetId; final long trId = clusters.state.transactionId+1; requestMap.put(trId, req); return new Transaction() { - + @Override public long getTransactionId() { return trId; } - + @Override public long getHeadChangeSetId() { return csId; } }; } - + /* * This method cannot be synchronized since it waits and must support multiple entries * by query thread(s) and internal transactions such as snapshot saver */ - public Transaction askReadTransaction() throws ProCoreException { - + private Transaction askReadTransaction() throws ProCoreException { + Semaphore semaphore = new Semaphore(0); - + TransactionRequest req = queue(TransactionState.READ, semaphore); - + try { semaphore.acquire(); } catch (InterruptedException e) { throw new ProCoreException(e); } - + return makeTransaction(req); } @@ -338,61 +332,61 @@ public class GraphClientImpl2 implements Database.Session { if(r.state == TransactionState.READ) reads++; r.semaphore.release(); } - + private synchronized void processRequests() { - + while(true) { if(requests.isEmpty()) return; TransactionRequest req = requests.peek(); if(currentTransactionState == TransactionState.IDLE) { - + // Accept anything while IDLE currentTransactionState = req.state; dispatch(); - + } else if (currentTransactionState == TransactionState.READ) { - + if(req.state == currentTransactionState) { // Allow other reads dispatch(); } else { - + // Wait return; - + } - + } else if (currentTransactionState == TransactionState.WRITE) { // Wait return; - + } - + } - + } - + private synchronized TransactionRequest queue(TransactionState state, Semaphore semaphore) { - TransactionRequest req = new TransactionRequest(state, semaphore); + TransactionRequest req = new TransactionRequest(state, semaphore); requests.addLast(req); processRequests(); return req; } - + /* * This method cannot be synchronized since it waits and must support multiple entries * by query thread(s) and internal transactions such as snapshot saver */ - public Transaction askWriteTransaction() throws IllegalAcornStateException { - + private Transaction askWriteTransaction() throws IllegalAcornStateException { + Semaphore semaphore = new Semaphore(0); TransactionRequest req = queue(TransactionState.WRITE, semaphore); - + try { semaphore.acquire(); } catch (InterruptedException e) { @@ -401,9 +395,9 @@ public class GraphClientImpl2 implements Database.Session { mainProgram.startTransaction(clusters.state.headChangeSetId+1); return makeTransaction(req); } - - public synchronized long endTransaction(long transactionId) throws ProCoreException { - + + private synchronized long endTransaction(long transactionId) throws ProCoreException { + TransactionRequest req = requestMap.remove(transactionId); if(req.state == TransactionState.WRITE) { currentTransactionState = TransactionState.IDLE; @@ -419,7 +413,7 @@ public class GraphClientImpl2 implements Database.Session { } } - + @Override public Transaction askWriteTransaction(final long transactionId) throws ProCoreException { try { @@ -457,10 +451,10 @@ public class GraphClientImpl2 implements Database.Session { public ChangeSetData getChangeSetData(long minChangeSetId, long maxChangeSetId, OnChangeSetUpdate onChangeSetupate) throws ProCoreException { - + new Exception("GetChangeSetDataFunction " + minChangeSetId + " " + maxChangeSetId).printStackTrace();; return null; - + } @Override @@ -511,15 +505,15 @@ public class GraphClientImpl2 implements Database.Session { public long getFirstChangeSetId() { return 0; } - + }; } @Override public Refresh getRefresh(long changeSetId) throws ProCoreException { - + final ClusterIds ids = getClusterIds(); - + return new Refresh() { @Override @@ -536,15 +530,15 @@ public class GraphClientImpl2 implements Database.Session { public long[] getSecond() { return ids.getSecond(); } - + }; - - } - public byte[] getResourceFile(final byte[] clusterUID, final int resourceIndex) throws ProCoreException, AcornAccessVerificationException, IllegalAcornStateException { - return clusters.getResourceFile(clusterUID, resourceIndex); } +// public byte[] getResourceFile(final byte[] clusterUID, final int resourceIndex) throws ProCoreException, AcornAccessVerificationException, IllegalAcornStateException { +// return clusters.getResourceFile(clusterUID, resourceIndex); +// } + @Override public ResourceSegment getResourceSegment(final byte[] clusterUID, final int resourceIndex, final long segmentOffset, short segmentSize) throws ProCoreException { try { @@ -596,7 +590,7 @@ public class GraphClientImpl2 implements Database.Session { chunk.releaseMutex(); } } - + private void performUndo(String ccsId, ArrayList> clusterChanges, UndoClusterSupport support) throws ProCoreException, DatabaseException, IllegalAcornStateException, AcornAccessVerificationException { UndoClusterUpdateProcessor proc = getUndoCSS(ccsId); @@ -607,7 +601,7 @@ public class GraphClientImpl2 implements Database.Session { ClusterChange cs = new ClusterChange(clusterChanges, proc.getClusterUID()); for(int i=0;i> clusterChanges = new ArrayList>(); - + UndoClusterSupport support = new UndoClusterSupport(clusters); - + final int changeSetId = clusters.state.headChangeSetId; - + if(ClusterUpdateProcessorBase.DEBUG) - System.err.println(" === BEGIN UNDO ==="); - + LOGGER.info(" === BEGIN UNDO ==="); + for(int i=0;i ccss = clusters.getChanges(id); - + for(int j=0;j pair = clusterChanges.get(i); - + final ClusterUID cuid = pair.first; final byte[] data = pair.second; - + onChangeSetUpdate.onChangeSetUpdate(new ChangeSetUpdate() { - + @Override public long getChangeSetId() { return changeSetId; } - + @Override public int getChangeSetIndex() { return 0; } - + @Override public int getNumberOfClusterChangeSets() { return clusterChanges.size(); } - + @Override public int getIndexOfClusterChangeSet() { return changeSetIndex; } - + @Override public byte[] getClusterId() { return cuid.asBytes(); } - + @Override public boolean getNewCluster() { return false; } - + @Override public byte[] getData() { return data; } - + }); } } catch (AcornAccessVerificationException | IllegalAcornStateException e1) { @@ -710,21 +751,13 @@ public class GraphClientImpl2 implements Database.Session { } - @Override - public void done() { - - } - }); - if(exception instanceof SDBException) throw (SDBException)exception; - else if(exception != null) throw new IllegalAcornStateException(exception); - return false; - + } - - public ServiceLocator getServiceLocator() { + + ServiceLocator getServiceLocator() { return locator; } @@ -737,16 +770,16 @@ public class GraphClientImpl2 implements Database.Session { public boolean rolledback() { return clusters.rolledback(); } - - public void purge() throws IllegalAcornStateException { + + private void purge() throws IllegalAcornStateException { clusters.purge(locator); } public void purgeDatabase() { - + if (isClosing || unexpectedClose) return; - + saver.execute(new Runnable() { @Override @@ -756,35 +789,24 @@ public class GraphClientImpl2 implements Database.Session { // First take a write transaction tr = askWriteTransaction(-1); // Then make sure that MainProgram is idling - mainProgram.mutex.acquire(); - try { - synchronized(mainProgram) { - if(mainProgram.operations.isEmpty()) { - purge(); - } else { - // MainProgram is becoming busy again - delay snapshotting - return; - } - } - } finally { - mainProgram.mutex.release(); - } + synchronizeWithIdleMainProgram(() -> purge()); } catch (IllegalAcornStateException | ProCoreException e) { - LOGGER.error("Purge failed", e); + LOGGER.error("Purge failed", e); + unexpectedClose = true; + } catch (SDBException e) { + LOGGER.error("Purge failed", e); unexpectedClose = true; - } catch (InterruptedException e) { - LOGGER.error("Purge interrupted", e); } finally { try { if(tr != null) endTransaction(tr.getTransactionId()); if (unexpectedClose) { - LifecycleSupport support = getServiceLocator().getService(LifecycleSupport.class); - try { - support.close(); - } catch (DatabaseException e1) { - LOGGER.error("Failed to close database as a safety measure due to failed purge", e1); - } + LifecycleSupport support = getServiceLocator().getService(LifecycleSupport.class); + try { + support.close(); + } catch (DatabaseException e1) { + LOGGER.error("Failed to close database as a safety measure due to failed purge", e1); + } } } catch (ProCoreException e) { LOGGER.error("Failed to end purge write transaction", e); @@ -792,12 +814,33 @@ public class GraphClientImpl2 implements Database.Session { } } }); - + } - + public long getTailChangeSetId() { return clusters.getTailChangeSetId(); } - + + public Future getBackupRunnable(Semaphore lock, Path targetPath, int revision) throws IllegalAcornStateException, IOException { + + makeSnapshot(true); + + Path dbDir = getDbFolder(); + int newestFolder = clusters.mainState.headDir - 1; + int latestFolder = -2; + Path AcornMetadataFile = AcornBackupProvider.getAcornMetadataFile(dbDir); + if (Files.exists(AcornMetadataFile)) { + try (BufferedReader br = Files.newBufferedReader(AcornMetadataFile)) { + latestFolder = Integer.parseInt( br.readLine() ); + } + } + + AcornBackupRunnable r = new AcornBackupRunnable( + lock, targetPath, revision, dbDir, latestFolder, newestFolder); + new Thread(r, "Acorn backup thread").start(); + return r; + + } + } diff --git a/bundles/org.simantics.acorn/src/org/simantics/acorn/MainProgram.java b/bundles/org.simantics.acorn/src/org/simantics/acorn/MainProgram.java index b5ba2471b..8dea16d7f 100644 --- a/bundles/org.simantics.acorn/src/org/simantics/acorn/MainProgram.java +++ b/bundles/org.simantics.acorn/src/org/simantics/acorn/MainProgram.java @@ -1,9 +1,9 @@ package org.simantics.acorn; import java.io.Closeable; +import java.io.IOException; import java.util.ArrayList; import java.util.Comparator; -import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.TreeMap; @@ -26,23 +26,22 @@ import org.slf4j.LoggerFactory; public class MainProgram implements Runnable, Closeable { private static final Logger LOGGER = LoggerFactory.getLogger(MainProgram.class); - + private static final int CLUSTER_THREADS = 4; private static final int CHUNK_CACHE_SIZE = 100; private final GraphClientImpl2 client; - private final ClusterManager clusters; private final ExecutorService[] clusterUpdateThreads; private final List[] updateSchedules; - - private int residentOperationBytes = 0; - private long currentChangeSetId = -1; - private int nextChunkId = 0; + + private Thread mainProgramThread; + private boolean alive = true; private Semaphore deathBarrier = new Semaphore(0); - final Semaphore mutex = new Semaphore(1); - final LinkedList operations = new LinkedList<>(); + final ClusterManager clusters; + + private final OperationQueue operationQueue = new OperationQueue(this); static class ClusterThreadFactory implements ThreadFactory { @@ -62,7 +61,7 @@ public class MainProgram implements Runnable, Closeable { } } - public MainProgram(GraphClientImpl2 client, ClusterManager clusters) { + MainProgram(GraphClientImpl2 client, ClusterManager clusters) { this.client = client; this.clusters = clusters; @@ -74,9 +73,8 @@ public class MainProgram implements Runnable, Closeable { } } - public void startTransaction(long id) { - currentChangeSetId = id; - nextChunkId = 0; + void startTransaction(long id) { + operationQueue.startTransaction(id); } private static Comparator clusterComparator = new Comparator() { @@ -89,138 +87,37 @@ public class MainProgram implements Runnable, Closeable { @Override public void run() { + + mainProgramThread = Thread.currentThread(); + try { - mutex.acquire(); main: while(alive) { TreeMap> updates = new TreeMap>(clusterComparator); - synchronized(MainProgram.this) { - - while(!operations.isEmpty() && updates.size() < 100) { + operationQueue.pumpUpdates(updates); - ClusterStreamChunk chunk = operations.pollFirst(); + if(updates.isEmpty()) { - for(int i=chunk.nextToProcess;i ops = updates.get(uid); - if(ops == null) { - ops = new ArrayList(); - updates.put(uid, ops); - } - ops.add(o); - } + long start = System.nanoTime(); - chunk.nextToProcess = chunk.operations.size(); + operationQueue.waitFor(); - if(!chunk.isCommitted()) { - assert(operations.isEmpty()); - operations.add(chunk); - break; - } + if (!alive) + break main; - } - - if(updates.isEmpty()) { - try { - long start = System.nanoTime(); - mutex.release(); - MainProgram.this.wait(5000); // wake up when new operations are scheduled or the last operation is committed - mutex.acquire(); - if (!alive) - break main; - long duration = System.nanoTime()-start; - if(duration > 4000000000L) { - - // Was this a time-out or a new stream request? - if(operations.isEmpty()) { - - /* - * We are idling here. - * Flush all caches gradually - */ - - // Write pending cs to disk - boolean written = clusters.csLRU.swapForced(); - while(written) { - if(!updates.isEmpty()) break; - written = clusters.csLRU.swapForced(); - } - // Write pending chunks to disk - written = clusters.streamLRU.swapForced(); - while(written) { - if(!updates.isEmpty()) break; - written = clusters.streamLRU.swapForced(); - } - // Write pending files to disk - written = clusters.fileLRU.swapForced(); - while(written) { - if(!updates.isEmpty()) break; - written = clusters.fileLRU.swapForced(); - } - // Write pending clusters to disk - written = clusters.clusterLRU.swapForced(); - while(written) { - if(!updates.isEmpty()) break; - written = clusters.clusterLRU.swapForced(); - } - - client.tryMakeSnapshot(); - } - } - } catch (InterruptedException e) { - e.printStackTrace(); - } + long duration = System.nanoTime()-start; + if(duration > 4000000000L) { + checkIdle(); } } // long sss = System.nanoTime(); - for(int i=0;i> entry : updates.entrySet()) { - ClusterUID key = entry.getKey(); - int hash = key.hashCode() & (clusterUpdateThreads.length-1); - updateSchedules[hash].addAll(entry.getValue()); - } - - // final AtomicLong elapsed = new AtomicLong(0); - int acquireAmount = 0; - for(int i=0;i ops = updateSchedules[i]; - if (!ops.isEmpty()) { - acquireAmount++; - clusterUpdateThreads[i].submit(new Callable() { - - @Override - public Object call() throws Exception { - //long st = System.nanoTime(); - try { - for(ClusterUpdateOperation op : ops) { - op.run(); - } - } finally { - s.release(); - } - return null; - - // long duration = System.nanoTime()-st; - // elapsed.addAndGet(duration); - // double dur = 1e-9*duration; - // if(dur > 0.05) - // System.err.println("duration=" + dur + "s. " + ops.size()); - } - }); - } - } - - s.acquire(acquireAmount); + runUpdates(updates); + runTasksIfEmpty(); /* * Here we are actively processing updates from client. @@ -254,48 +151,138 @@ public class MainProgram implements Runnable, Closeable { deathBarrier.release(); } } - + + @FunctionalInterface static interface MainProgramRunnable { - - public void run() throws Exception; - public void done(); - + void run() throws Exception; + default void error(Exception e) { + LOGGER.error("An error occured", e); + } + default void success() {} } - public Exception runIdle(MainProgramRunnable runnable) { - try { - long startTime = System.currentTimeMillis(); - while (true) { - boolean hasMutex = false; + private void runUpdates(TreeMap> updates) throws InterruptedException { + + for(int i=0;i> entry : updates.entrySet()) { + ClusterUID key = entry.getKey(); + int hash = key.hashCode() & (clusterUpdateThreads.length-1); + updateSchedules[hash].addAll(entry.getValue()); + } + + // final AtomicLong elapsed = new AtomicLong(0); + int acquireAmount = 0; + for(int i=0;i ops = updateSchedules[i]; + if (!ops.isEmpty()) { + acquireAmount++; + clusterUpdateThreads[i].submit(new Callable() { + + @Override + public Object call() throws Exception { + //long st = System.nanoTime(); + try { + for(ClusterUpdateOperation op : ops) { + op.run(); + } + } finally { + s.release(); + } + return null; + +// long duration = System.nanoTime()-st; +// elapsed.addAndGet(duration); +// double dur = 1e-9*duration; +// if(dur > 0.05) +// System.err.println("duration=" + dur + "s. " + ops.size()); + } + }); + } + } + + s.acquire(acquireAmount); + + } + + /* + * This shall be run when no updates are currently available. + */ + private void runTasksIfEmpty() { + if(operationQueue.isEmpty()) { + List todo = new ArrayList<>(); + operationQueue.pumpTasks(todo); + for(MainProgramRunnable runnable : todo) { try { - synchronized (MainProgram.this) { - if (hasMutex = mutex.tryAcquire()) { - if (operations.isEmpty()) { - runnable.run(); - return null; - } - } - } - long endTime = System.currentTimeMillis(); - if ((endTime - startTime) > 100) { - startTime = endTime; - LOGGER.info("MainProgram.runIdle() retry mutex acquire!"); - } + runnable.run(); + runnable.success(); } catch (Exception e) { - return e; - } finally { - if (hasMutex) - mutex.release(); + runnable.error(e); } } - } finally { - runnable.done(); } } - + + /* + * This gets called when an idle period has been detected + */ + private void checkIdle() throws IOException, IllegalAcornStateException, AcornAccessVerificationException { + + // Was this a time-out or a new stream request? + if(operationQueue.isEmpty()) { + + /* + * We are idling here. + * Flush all caches gradually + */ + + // Write pending cs to disk + boolean written = clusters.csLRU.swapForced(); + while(written) { + if(!operationQueue.isEmpty()) break; + written = clusters.csLRU.swapForced(); + } + // Write pending chunks to disk + written = clusters.streamLRU.swapForced(); + while(written) { + if(!operationQueue.isEmpty()) break; + written = clusters.streamLRU.swapForced(); + } + // Write pending files to disk + written = clusters.fileLRU.swapForced(); + while(written) { + if(!operationQueue.isEmpty()) break; + written = clusters.fileLRU.swapForced(); + } + // Write pending clusters to disk + written = clusters.clusterLRU.swapForced(); + while(written) { + if(!operationQueue.isEmpty()) break; + written = clusters.clusterLRU.swapForced(); + } + + client.tryMakeSnapshot(); + + } + + } + + + /* + * This schedules tasks to be run in MainProgram thread + * Called from other threads than MainProgram thread + * + */ + void runIdle(MainProgramRunnable task) { + operationQueue.scheduleTask(task); + } + /* * Mutex for streamLRU is assumed here - * + * */ private void swapChunks() throws AcornAccessVerificationException, IllegalAcornStateException { @@ -309,45 +296,34 @@ public class MainProgram implements Runnable, Closeable { while(clusters.csLRU.swap(Long.MAX_VALUE, CHUNK_CACHE_SIZE)); } - public synchronized void committed() { + /* + * Called by DB client write threads + */ + void committed() { - ClusterStreamChunk last = operations.isEmpty() ? null : operations.getLast(); + ClusterStreamChunk last = operationQueue.commitLast(); if (!alive) { LOGGER.error("Trying to commit operation after MainProgram is closed! Operation is " + last); -// return; } - if(last != null) { - last.commit(); - notifyAll(); - } } - public synchronized void schedule(ClusterUpdateOperation operation) throws IllegalAcornStateException { - if (!alive) { + /* + * Called by DB client write threads + */ + void schedule(ClusterUpdateOperation operation) throws IllegalAcornStateException { + + if (!alive) { LOGGER.error("Trying to schedule operation after MainProgram is closed! Operation is " + operation); -// return; } + clusters.streamLRU.acquireMutex(); try { - ClusterStreamChunk last = operations.isEmpty() ? null : operations.getLast(); - if(last == null || last.isCommitted()) { - String id = "" + currentChangeSetId + "-" + nextChunkId++; - last = new ClusterStreamChunk(clusters, clusters.streamLRU, id); - operations.add(last); - } - - String chunkId = last.getKey(); - int chunkOffset = last.operations.size(); - operation.scheduled(chunkId + "." + chunkOffset); - - last.addOperation(operation); - + operationQueue.scheduleUpdate(operation); swapChunks(); - notifyAll(); } catch (IllegalAcornStateException e) { throw e; } catch (Throwable t) { @@ -355,14 +331,17 @@ public class MainProgram implements Runnable, Closeable { } finally { clusters.streamLRU.releaseMutex(); } + } @Override public void close() { + alive = false; - synchronized (this) { - notifyAll(); - } + + // This will wake up the sleeping beauty + operationQueue.scheduleTask(() -> {}); + try { deathBarrier.acquire(); } catch (InterruptedException e) { @@ -382,4 +361,12 @@ public class MainProgram implements Runnable, Closeable { } } + void assertMainProgramThread() { + assert(Thread.currentThread().equals(mainProgramThread)); + } + + void assertNoMainProgramThread() { + assert(!Thread.currentThread().equals(mainProgramThread)); + } + } diff --git a/bundles/org.simantics.acorn/src/org/simantics/acorn/OperationQueue.java b/bundles/org.simantics.acorn/src/org/simantics/acorn/OperationQueue.java new file mode 100644 index 000000000..7a9d8aba2 --- /dev/null +++ b/bundles/org.simantics.acorn/src/org/simantics/acorn/OperationQueue.java @@ -0,0 +1,175 @@ +package org.simantics.acorn; + +import java.util.ArrayList; +import java.util.LinkedList; +import java.util.List; +import java.util.TreeMap; + +import org.simantics.acorn.MainProgram.MainProgramRunnable; +import org.simantics.acorn.exception.AcornAccessVerificationException; +import org.simantics.acorn.exception.IllegalAcornStateException; +import org.simantics.acorn.lru.ClusterStreamChunk; +import org.simantics.acorn.lru.ClusterUpdateOperation; +import org.simantics.db.service.ClusterUID; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/* + * Provides work for MainProgram thread + * -cluster stream updates from client + * -tasks from other threads + * + * Synchronized via monitor. + * + */ +class OperationQueue { + + private static final Logger LOGGER = LoggerFactory.getLogger(OperationQueue.class); + + private final MainProgram mainProgram; + + private final LinkedList operations = new LinkedList<>(); + + private final LinkedList tasks = new LinkedList<>(); + + private long currentChangeSetId = -1; + private int nextChunkId = 0; + + OperationQueue(MainProgram mainProgram) { + this.mainProgram = mainProgram; + } + + /* + * Called by other Acorn threads than MainProgram thread + */ + synchronized void scheduleTask(MainProgramRunnable task) { + + mainProgram.assertNoMainProgramThread(); + + tasks.add(task); + + // The only thread waiting for this is MainProgram thread + notify(); + + } + + /* + * Called by DB client write threads + */ + synchronized void startTransaction(long id) { + currentChangeSetId = id; + nextChunkId = 0; + } + + /* + * Called by DB client write threads + */ + synchronized ClusterStreamChunk commitLast() { + ClusterStreamChunk last = operations.isEmpty() ? null : operations.getLast(); + if(last != null) { + last.commit(); + notifyAll(); + } + return last; + } + + /* + * Called by DB client write threads + */ + synchronized void scheduleUpdate(ClusterUpdateOperation operation) throws AcornAccessVerificationException, IllegalAcornStateException { + + ClusterStreamChunk last = operations.isEmpty() ? null : operations.getLast(); + if(last == null || last.isCommitted()) { + String id = "" + currentChangeSetId + "-" + nextChunkId++; + last = new ClusterStreamChunk(mainProgram.clusters, mainProgram.clusters.streamLRU, id); + operations.add(last); + } + + String chunkId = last.getKey(); + int chunkOffset = last.operations.size(); + operation.scheduled(chunkId + "." + chunkOffset); + + last.addOperation(operation); + + // The only thread waiting for this is MainProgram thread + notify(); + + } + + /* + * Called by MainProgram thread + */ + synchronized void pumpTasks(List todo) { + + mainProgram.assertMainProgramThread(); + + todo.addAll(tasks); + tasks.clear(); + + } + + /* + * Called by MainProgram thread + */ + synchronized void pumpUpdates(TreeMap> updates) { + + mainProgram.assertMainProgramThread(); + + while(!operations.isEmpty() && updates.size() < 100) { + + ClusterStreamChunk chunk = operations.pollFirst(); + + for(int i=chunk.nextToProcess;i ops = updates.get(uid); + if(ops == null) { + ops = new ArrayList(); + updates.put(uid, ops); + } + ops.add(o); + } + + chunk.nextToProcess = chunk.operations.size(); + + if(!chunk.isCommitted()) { + assert(operations.isEmpty()); + operations.add(chunk); + break; + } + + } + + } + + /* + * Called by MainProgram thread + */ + synchronized boolean isEmpty() { + + mainProgram.assertMainProgramThread(); + + return operations.isEmpty(); + + } + + /* + * Wake up when new operations are scheduled or the last operation is committed + * Called by MainProgram thread + */ + synchronized void waitFor() { + + mainProgram.assertMainProgramThread(); + + // One last check within the monitor + if(!operations.isEmpty() || !tasks.isEmpty()) return; + + try { + wait(5000); + } catch (InterruptedException e) { + LOGGER.error("Unexpected interruption", e); + } + + } + +} diff --git a/bundles/org.simantics.acorn/src/org/simantics/acorn/backup/AcornBackupProvider.java b/bundles/org.simantics.acorn/src/org/simantics/acorn/backup/AcornBackupProvider.java index 3977ad73d..a5aa4f2ef 100644 --- a/bundles/org.simantics.acorn/src/org/simantics/acorn/backup/AcornBackupProvider.java +++ b/bundles/org.simantics.acorn/src/org/simantics/acorn/backup/AcornBackupProvider.java @@ -39,11 +39,11 @@ public class AcornBackupProvider implements IBackupProvider { public AcornBackupProvider() { this.client = AcornSessionManagerImpl.getInstance().getClient(); } - - private static Path getAcornMetadataFile(Path dbFolder) { + + public static Path getAcornMetadataFile(Path dbFolder) { return dbFolder.getParent().resolve(IDENTIFIER); } - + @Override public void lock() throws BackupException { try { @@ -60,25 +60,9 @@ public class AcornBackupProvider implements IBackupProvider { boolean releaseLock = true; try { lock.acquire(); - - client.makeSnapshot(true); - - Path dbDir = client.getDbFolder(); - int newestFolder = client.clusters.mainState.headDir - 1; - int latestFolder = -2; - Path AcornMetadataFile = getAcornMetadataFile(dbDir); - if (Files.exists(AcornMetadataFile)) { - try (BufferedReader br = Files.newBufferedReader(AcornMetadataFile)) { - latestFolder = Integer.parseInt( br.readLine() ); - } - } - - AcornBackupRunnable r = new AcornBackupRunnable( - lock, targetPath, revision, dbDir, latestFolder, newestFolder); - new Thread(r, "Acorn backup thread").start(); - - releaseLock = false; - return r; + Future r = client.getBackupRunnable(lock, targetPath, revision); + releaseLock = false; + return r; } catch (InterruptedException e) { releaseLock = false; throw new BackupException("Failed to lock Acorn for backup.", e); @@ -127,7 +111,7 @@ public class AcornBackupProvider implements IBackupProvider { if (dbRoot != restorePath) { FileUtils.deleteAll(dbRoot.toFile()); Files.move(restorePath, dbRoot); - } + } } catch (IOException e) { e.printStackTrace(); } @@ -178,7 +162,7 @@ public class AcornBackupProvider implements IBackupProvider { } } - private static class AcornBackupRunnable implements Runnable, Future { + public static class AcornBackupRunnable implements Runnable, Future { private final Semaphore lock; private final Path targetPath;