X-Git-Url: https://gerrit.simantics.org/r/gitweb?p=simantics%2Fplatform.git;a=blobdiff_plain;f=bundles%2Forg.simantics.acorn%2Fsrc%2Forg%2Fsimantics%2Facorn%2FGraphClientImpl2.java;h=7a57053bcf6aa9f1af09548424bfe00c59456751;hp=dce477ebeae0fdf35d1db078acfd7e6e885ea825;hb=751ee12501d220832b672dd433655a4d65806fd9;hpb=c26409b1caf2f1e560d37c5befd11b442399c3fe diff --git a/bundles/org.simantics.acorn/src/org/simantics/acorn/GraphClientImpl2.java b/bundles/org.simantics.acorn/src/org/simantics/acorn/GraphClientImpl2.java index dce477ebe..7a57053bc 100644 --- a/bundles/org.simantics.acorn/src/org/simantics/acorn/GraphClientImpl2.java +++ b/bundles/org.simantics.acorn/src/org/simantics/acorn/GraphClientImpl2.java @@ -11,17 +11,22 @@ *******************************************************************************/ package org.simantics.acorn; +import java.io.BufferedReader; import java.io.IOException; +import java.nio.file.Files; import java.nio.file.Path; import java.util.ArrayList; import java.util.LinkedList; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.concurrent.Semaphore; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import org.simantics.acorn.MainProgram.MainProgramRunnable; +import org.simantics.acorn.backup.AcornBackupProvider; +import org.simantics.acorn.backup.AcornBackupProvider.AcornBackupRunnable; import org.simantics.acorn.exception.AcornAccessVerificationException; import org.simantics.acorn.exception.IllegalAcornStateException; import org.simantics.acorn.internal.ClusterChange; @@ -31,6 +36,7 @@ import org.simantics.acorn.lru.ClusterChangeSet.Entry; import org.simantics.acorn.lru.ClusterInfo; import org.simantics.acorn.lru.ClusterStreamChunk; import org.simantics.acorn.lru.ClusterUpdateOperation; +import org.simantics.backup.BackupException; import org.simantics.db.ClusterCreator; import org.simantics.db.Database; import org.simantics.db.ServiceLocator; @@ -40,6 +46,7 @@ import org.simantics.db.server.ProCoreException; import org.simantics.db.service.ClusterSetsSupport; import org.simantics.db.service.ClusterUID; import org.simantics.db.service.LifecycleSupport; +import org.simantics.utils.DataContainer; import org.simantics.utils.datastructures.Pair; import org.simantics.utils.logging.TimeLogger; import org.slf4j.Logger; @@ -52,8 +59,8 @@ public class GraphClientImpl2 implements Database.Session { private static final Logger LOGGER = LoggerFactory.getLogger(GraphClientImpl2.class); public static final boolean DEBUG = false; - public final ClusterManager clusters; - + final ClusterManager clusters; + private TransactionManager transactionManager = new TransactionManager(); private ExecutorService executor = Executors.newSingleThreadExecutor(new ClientThreadFactory("Core Main Program", false)); private ExecutorService saver = Executors.newSingleThreadExecutor(new ClientThreadFactory("Core Snapshot Saver", true)); @@ -63,16 +70,16 @@ public class GraphClientImpl2 implements Database.Session { private ServiceLocator locator; private MainProgram mainProgram; - static class ClientThreadFactory implements ThreadFactory { - + private static class ClientThreadFactory implements ThreadFactory { + final String name; final boolean daemon; - + public ClientThreadFactory(String name, boolean daemon) { this.name = name; this.daemon = daemon; } - + @Override public Thread newThread(Runnable r) { Thread thread = new Thread(r, name); @@ -87,7 +94,7 @@ public class GraphClientImpl2 implements Database.Session { this.locator = locator; this.clusters = new ClusterManager(dbFolder); load(); - ClusterSetsSupport cssi = locator.getService(ClusterSetsSupport.class); + ClusterSetsSupport cssi = locator.getService(ClusterSetsSupport.class); cssi.setReadDirectory(clusters.lastSessionDirectory); cssi.updateWriteDirectory(clusters.workingDirectory); mainProgram = new MainProgram(this, clusters); @@ -98,11 +105,15 @@ public class GraphClientImpl2 implements Database.Session { return dbFolder; } - public void tryMakeSnapshot() throws IOException { - - if (isClosing || unexpectedClose) - return; - + /* + * This method schedules snapshotting. + * No lock and thread restrictions. + */ + void tryMakeSnapshot() throws IOException { + + if (isClosing || unexpectedClose) + return; + saver.execute(new Runnable() { @Override @@ -112,35 +123,24 @@ public class GraphClientImpl2 implements Database.Session { // First take a write transaction tr = askWriteTransaction(-1); // Then make sure that MainProgram is idling - mainProgram.mutex.acquire(); - try { - synchronized(mainProgram) { - if(mainProgram.operations.isEmpty()) { - makeSnapshot(false); - } else { - // MainProgram is becoming busy again - delay snapshotting - return; - } - } - } finally { - mainProgram.mutex.release(); - } + synchronizeWithIdleMainProgram(() -> makeSnapshot(false)); } catch (IllegalAcornStateException | ProCoreException e) { LOGGER.error("Snapshotting failed", e); unexpectedClose = true; - } catch (InterruptedException e) { - LOGGER.error("Snapshotting interrupted", e); + } catch (SDBException e) { + LOGGER.error("Snapshotting failed", e); + unexpectedClose = true; } finally { try { if(tr != null) endTransaction(tr.getTransactionId()); if (unexpectedClose) { - LifecycleSupport support = getServiceLocator().getService(LifecycleSupport.class); - try { - support.close(); - } catch (DatabaseException e1) { - LOGGER.error("Failed to close database as a safety measure due to failed snapshotting", e1); - } + LifecycleSupport support = getServiceLocator().getService(LifecycleSupport.class); + try { + support.close(); + } catch (DatabaseException e1) { + LOGGER.error("Failed to close database as a safety measure due to failed snapshotting", e1); + } } } catch (ProCoreException e) { LOGGER.error("Failed to end snapshotting write transaction", e); @@ -149,11 +149,12 @@ public class GraphClientImpl2 implements Database.Session { } }); } - - public void makeSnapshot(boolean fullSave) throws IllegalAcornStateException { - clusters.makeSnapshot(locator, fullSave); - } - + + private void makeSnapshot(boolean fullSave) throws IllegalAcornStateException { + clusters.makeSnapshot(locator, fullSave); + } + + @Override public T clone(ClusterUID uid, ClusterCreator creator) throws DatabaseException { try { return clusters.clone(uid, creator); @@ -163,17 +164,9 @@ public class GraphClientImpl2 implements Database.Session { } } -// private void save() throws IOException { -// clusters.save(); -// } - - public void load() throws IOException { + private void load() throws IOException { clusters.load(); } - -// public void modiFileEx(ClusterUID uid, int resourceKey, long offset, long size, byte[] bytes, long pos, ClusterSupport support) { -// clusters.modiFileEx(uid, resourceKey, offset, size, bytes, pos, support); -// } @Override public Database getDatabase() { @@ -183,31 +176,41 @@ public class GraphClientImpl2 implements Database.Session { private boolean closed = false; private boolean isClosing = false; private boolean unexpectedClose = false; - + @Override public void close() throws ProCoreException { - LOGGER.info("Closing " + this + " and mainProgram " + mainProgram); + LOGGER.info("Closing " + this + " and mainProgram " + mainProgram); if(!closed && !isClosing) { - isClosing = true; + isClosing = true; try { - if (!unexpectedClose) - makeSnapshot(true); - + + if (!unexpectedClose) + synchronizeWithIdleMainProgram(() -> makeSnapshot(true)); + mainProgram.close(); clusters.shutdown(); executor.shutdown(); saver.shutdown(); + boolean executorTerminated = executor.awaitTermination(500, TimeUnit.MILLISECONDS); boolean saverTerminated = saver.awaitTermination(500, TimeUnit.MILLISECONDS); - - System.err.println("executorTerminated=" + executorTerminated + ", saverTerminated=" + saverTerminated); - + + LOGGER.info("executorTerminated=" + executorTerminated + ", saverTerminated=" + saverTerminated); + + try { + clusters.mainState.save(dbFolder); + } catch (IOException e) { + LOGGER.error("Failed to save " + MainState.MAIN_STATE + " file in database folder " + dbFolder); + } + mainProgram = null; executor = null; saver = null; - + } catch (IllegalAcornStateException | InterruptedException e) { throw new ProCoreException(e); + } catch (SDBException e1) { + throw new ProCoreException(e1); } closed = true; } @@ -223,19 +226,16 @@ public class GraphClientImpl2 implements Database.Session { public boolean isClosed() throws ProCoreException { return closed; } - + @Override public void acceptCommit(long transactionId, long changeSetId, byte[] metadata) throws ProCoreException { clusters.state.headChangeSetId++; long committedChangeSetId = changeSetId + 1; try { - clusters.commitChangeSet(committedChangeSetId, metadata); - - clusters.state.transactionId = transactionId; - - mainProgram.committed(); - - TimeLogger.log("Accepted commit"); + clusters.commitChangeSet(committedChangeSetId, metadata); + clusters.state.transactionId = transactionId; + mainProgram.committed(); + TimeLogger.log("Accepted commit"); } catch (IllegalAcornStateException e) { throw new ProCoreException(e); } @@ -251,7 +251,7 @@ public class GraphClientImpl2 implements Database.Session { clusters.state.headChangeSetId++; return clusters.state.headChangeSetId; } catch (SDBException e) { - LOGGER.error("Failed to undo cancelled transaction", e); + LOGGER.error("Failed to undo cancelled transaction", e); throw new ProCoreException(e); } } @@ -261,11 +261,11 @@ public class GraphClientImpl2 implements Database.Session { return transactionManager.askReadTransaction(); } - enum TransactionState { + private enum TransactionState { IDLE,WRITE,READ } - - class TransactionRequest { + + private class TransactionRequest { public TransactionState state; public Semaphore semaphore; public TransactionRequest(TransactionState state, Semaphore semaphore) { @@ -274,51 +274,51 @@ public class GraphClientImpl2 implements Database.Session { } } - class TransactionManager { + private class TransactionManager { private TransactionState currentTransactionState = TransactionState.IDLE; - + private int reads = 0; - - LinkedList requests = new LinkedList(); - - TLongObjectHashMap requestMap = new TLongObjectHashMap(); - + + private LinkedList requests = new LinkedList<>(); + + private TLongObjectHashMap requestMap = new TLongObjectHashMap<>(); + private synchronized Transaction makeTransaction(TransactionRequest req) { - + final int csId = clusters.state.headChangeSetId; final long trId = clusters.state.transactionId+1; requestMap.put(trId, req); return new Transaction() { - + @Override public long getTransactionId() { return trId; } - + @Override public long getHeadChangeSetId() { return csId; } }; } - + /* * This method cannot be synchronized since it waits and must support multiple entries * by query thread(s) and internal transactions such as snapshot saver */ - public Transaction askReadTransaction() throws ProCoreException { - + private Transaction askReadTransaction() throws ProCoreException { + Semaphore semaphore = new Semaphore(0); - + TransactionRequest req = queue(TransactionState.READ, semaphore); - + try { semaphore.acquire(); } catch (InterruptedException e) { throw new ProCoreException(e); } - + return makeTransaction(req); } @@ -328,61 +328,61 @@ public class GraphClientImpl2 implements Database.Session { if(r.state == TransactionState.READ) reads++; r.semaphore.release(); } - + private synchronized void processRequests() { - + while(true) { if(requests.isEmpty()) return; TransactionRequest req = requests.peek(); if(currentTransactionState == TransactionState.IDLE) { - + // Accept anything while IDLE currentTransactionState = req.state; dispatch(); - + } else if (currentTransactionState == TransactionState.READ) { - + if(req.state == currentTransactionState) { // Allow other reads dispatch(); } else { - + // Wait return; - + } - + } else if (currentTransactionState == TransactionState.WRITE) { // Wait return; - + } - + } - + } - + private synchronized TransactionRequest queue(TransactionState state, Semaphore semaphore) { - TransactionRequest req = new TransactionRequest(state, semaphore); + TransactionRequest req = new TransactionRequest(state, semaphore); requests.addLast(req); processRequests(); return req; } - + /* * This method cannot be synchronized since it waits and must support multiple entries * by query thread(s) and internal transactions such as snapshot saver */ - public Transaction askWriteTransaction() throws IllegalAcornStateException { - + private Transaction askWriteTransaction() throws IllegalAcornStateException { + Semaphore semaphore = new Semaphore(0); TransactionRequest req = queue(TransactionState.WRITE, semaphore); - + try { semaphore.acquire(); } catch (InterruptedException e) { @@ -391,9 +391,9 @@ public class GraphClientImpl2 implements Database.Session { mainProgram.startTransaction(clusters.state.headChangeSetId+1); return makeTransaction(req); } - - public synchronized long endTransaction(long transactionId) throws ProCoreException { - + + private synchronized long endTransaction(long transactionId) throws ProCoreException { + TransactionRequest req = requestMap.remove(transactionId); if(req.state == TransactionState.WRITE) { currentTransactionState = TransactionState.IDLE; @@ -409,7 +409,7 @@ public class GraphClientImpl2 implements Database.Session { } } - + @Override public Transaction askWriteTransaction(final long transactionId) throws ProCoreException { try { @@ -447,10 +447,10 @@ public class GraphClientImpl2 implements Database.Session { public ChangeSetData getChangeSetData(long minChangeSetId, long maxChangeSetId, OnChangeSetUpdate onChangeSetupate) throws ProCoreException { - + new Exception("GetChangeSetDataFunction " + minChangeSetId + " " + maxChangeSetId).printStackTrace();; return null; - + } @Override @@ -501,15 +501,15 @@ public class GraphClientImpl2 implements Database.Session { public long getFirstChangeSetId() { return 0; } - + }; } @Override public Refresh getRefresh(long changeSetId) throws ProCoreException { - + final ClusterIds ids = getClusterIds(); - + return new Refresh() { @Override @@ -526,15 +526,15 @@ public class GraphClientImpl2 implements Database.Session { public long[] getSecond() { return ids.getSecond(); } - + }; - - } - public byte[] getResourceFile(final byte[] clusterUID, final int resourceIndex) throws ProCoreException, AcornAccessVerificationException, IllegalAcornStateException { - return clusters.getResourceFile(clusterUID, resourceIndex); } +// public byte[] getResourceFile(final byte[] clusterUID, final int resourceIndex) throws ProCoreException, AcornAccessVerificationException, IllegalAcornStateException { +// return clusters.getResourceFile(clusterUID, resourceIndex); +// } + @Override public ResourceSegment getResourceSegment(final byte[] clusterUID, final int resourceIndex, final long segmentOffset, short segmentSize) throws ProCoreException { try { @@ -586,7 +586,7 @@ public class GraphClientImpl2 implements Database.Session { chunk.releaseMutex(); } } - + private void performUndo(String ccsId, ArrayList> clusterChanges, UndoClusterSupport support) throws ProCoreException, DatabaseException, IllegalAcornStateException, AcornAccessVerificationException { UndoClusterUpdateProcessor proc = getUndoCSS(ccsId); @@ -597,7 +597,7 @@ public class GraphClientImpl2 implements Database.Session { ClusterChange cs = new ClusterChange(clusterChanges, proc.getClusterUID()); for(int i=0;i> clusterChanges = new ArrayList>(); - + UndoClusterSupport support = new UndoClusterSupport(clusters); - + final int changeSetId = clusters.state.headChangeSetId; - + if(ClusterUpdateProcessorBase.DEBUG) - System.err.println(" === BEGIN UNDO ==="); - + LOGGER.info(" === BEGIN UNDO ==="); + for(int i=0;i ccss = clusters.getChanges(id); - + for(int j=0;j pair = clusterChanges.get(i); - + final ClusterUID cuid = pair.first; final byte[] data = pair.second; - + onChangeSetUpdate.onChangeSetUpdate(new ChangeSetUpdate() { - + @Override public long getChangeSetId() { return changeSetId; } - + @Override public int getChangeSetIndex() { return 0; } - + @Override public int getNumberOfClusterChangeSets() { return clusterChanges.size(); } - + @Override public int getIndexOfClusterChangeSet() { return changeSetIndex; } - + @Override public byte[] getClusterId() { return cuid.asBytes(); } - + @Override public boolean getNewCluster() { return false; } - + @Override public byte[] getData() { return data; } - + }); } } catch (AcornAccessVerificationException | IllegalAcornStateException e1) { @@ -700,21 +747,13 @@ public class GraphClientImpl2 implements Database.Session { } - @Override - public void done() { - - } - }); - if(exception instanceof SDBException) throw (SDBException)exception; - else if(exception != null) throw new IllegalAcornStateException(exception); - return false; - + } - - public ServiceLocator getServiceLocator() { + + ServiceLocator getServiceLocator() { return locator; } @@ -727,16 +766,16 @@ public class GraphClientImpl2 implements Database.Session { public boolean rolledback() { return clusters.rolledback(); } - - public void purge() throws IllegalAcornStateException { + + private void purge() throws IllegalAcornStateException { clusters.purge(locator); } public void purgeDatabase() { - + if (isClosing || unexpectedClose) return; - + saver.execute(new Runnable() { @Override @@ -746,35 +785,24 @@ public class GraphClientImpl2 implements Database.Session { // First take a write transaction tr = askWriteTransaction(-1); // Then make sure that MainProgram is idling - mainProgram.mutex.acquire(); - try { - synchronized(mainProgram) { - if(mainProgram.operations.isEmpty()) { - purge(); - } else { - // MainProgram is becoming busy again - delay snapshotting - return; - } - } - } finally { - mainProgram.mutex.release(); - } + synchronizeWithIdleMainProgram(() -> purge()); } catch (IllegalAcornStateException | ProCoreException e) { - LOGGER.error("Purge failed", e); + LOGGER.error("Purge failed", e); + unexpectedClose = true; + } catch (SDBException e) { + LOGGER.error("Purge failed", e); unexpectedClose = true; - } catch (InterruptedException e) { - LOGGER.error("Purge interrupted", e); } finally { try { if(tr != null) endTransaction(tr.getTransactionId()); if (unexpectedClose) { - LifecycleSupport support = getServiceLocator().getService(LifecycleSupport.class); - try { - support.close(); - } catch (DatabaseException e1) { - LOGGER.error("Failed to close database as a safety measure due to failed purge", e1); - } + LifecycleSupport support = getServiceLocator().getService(LifecycleSupport.class); + try { + support.close(); + } catch (DatabaseException e1) { + LOGGER.error("Failed to close database as a safety measure due to failed purge", e1); + } } } catch (ProCoreException e) { LOGGER.error("Failed to end purge write transaction", e); @@ -782,12 +810,33 @@ public class GraphClientImpl2 implements Database.Session { } } }); - + } - + public long getTailChangeSetId() { return clusters.getTailChangeSetId(); } - + + public Future getBackupRunnable(Semaphore lock, Path targetPath, int revision) throws IllegalAcornStateException, IOException { + + makeSnapshot(true); + + Path dbDir = getDbFolder(); + int newestFolder = clusters.mainState.headDir - 1; + int latestFolder = -2; + Path AcornMetadataFile = AcornBackupProvider.getAcornMetadataFile(dbDir); + if (Files.exists(AcornMetadataFile)) { + try (BufferedReader br = Files.newBufferedReader(AcornMetadataFile)) { + latestFolder = Integer.parseInt( br.readLine() ); + } + } + + AcornBackupRunnable r = new AcornBackupRunnable( + lock, targetPath, revision, dbDir, latestFolder, newestFolder); + new Thread(r, "Acorn backup thread").start(); + return r; + + } + }