X-Git-Url: https://gerrit.simantics.org/r/gitweb?p=simantics%2Fplatform.git;a=blobdiff_plain;f=bundles%2Forg.simantics.acorn%2Fsrc%2Forg%2Fsimantics%2Facorn%2FGraphClientImpl2.java;h=172fdd103150f28b88fac4ce604a74723e49b504;hp=05f9c8de00b66c315f9a6c5317026f28c0c664cb;hb=0df63f38249cbbc584d3a130773680c309fb1f71;hpb=e67e00d01d30f362e7a4f599cbcf24770ce13e22 diff --git a/bundles/org.simantics.acorn/src/org/simantics/acorn/GraphClientImpl2.java b/bundles/org.simantics.acorn/src/org/simantics/acorn/GraphClientImpl2.java index 05f9c8de0..172fdd103 100644 --- a/bundles/org.simantics.acorn/src/org/simantics/acorn/GraphClientImpl2.java +++ b/bundles/org.simantics.acorn/src/org/simantics/acorn/GraphClientImpl2.java @@ -11,46 +11,62 @@ *******************************************************************************/ package org.simantics.acorn; +import java.io.BufferedReader; import java.io.IOException; +import java.nio.file.Files; import java.nio.file.Path; import java.util.ArrayList; import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.concurrent.Semaphore; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; +import org.simantics.acorn.MainProgram.MainProgramRunnable; +import org.simantics.acorn.backup.AcornBackupProvider; +import org.simantics.acorn.backup.AcornBackupProvider.AcornBackupRunnable; import org.simantics.acorn.exception.AcornAccessVerificationException; import org.simantics.acorn.exception.IllegalAcornStateException; import org.simantics.acorn.internal.ClusterChange; import org.simantics.acorn.internal.ClusterUpdateProcessorBase; import org.simantics.acorn.internal.UndoClusterUpdateProcessor; +import org.simantics.acorn.lru.ClusterChangeSet.Entry; import org.simantics.acorn.lru.ClusterInfo; import org.simantics.acorn.lru.ClusterStreamChunk; import org.simantics.acorn.lru.ClusterUpdateOperation; -import org.simantics.acorn.lru.ClusterChangeSet.Entry; +import org.simantics.backup.BackupException; import org.simantics.db.ClusterCreator; import org.simantics.db.Database; import org.simantics.db.ServiceLocator; -import org.simantics.db.common.utils.Logger; import org.simantics.db.exception.DatabaseException; import org.simantics.db.exception.SDBException; import org.simantics.db.server.ProCoreException; import org.simantics.db.service.ClusterSetsSupport; import org.simantics.db.service.ClusterUID; +import org.simantics.db.service.EventSupport; import org.simantics.db.service.LifecycleSupport; import org.simantics.utils.datastructures.Pair; import org.simantics.utils.logging.TimeLogger; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import fi.vtt.simantics.procore.internal.EventSupportImpl; import gnu.trove.map.hash.TLongObjectHashMap; public class GraphClientImpl2 implements Database.Session { - - public static final boolean DEBUG = false; - public final ClusterManager clusters; + private static final Logger LOGGER = LoggerFactory.getLogger(GraphClientImpl2.class); + public static final boolean DEBUG = false; + public static final String CLOSE = "close"; + public static final String PURGE = "purge"; + + final ClusterManager clusters; + private TransactionManager transactionManager = new TransactionManager(); private ExecutorService executor = Executors.newSingleThreadExecutor(new ClientThreadFactory("Core Main Program", false)); private ExecutorService saver = Executors.newSingleThreadExecutor(new ClientThreadFactory("Core Snapshot Saver", true)); @@ -58,18 +74,20 @@ public class GraphClientImpl2 implements Database.Session { private Path dbFolder; private final Database database; private ServiceLocator locator; + private FileCache fileCache; private MainProgram mainProgram; + private EventSupportImpl eventSupport; + + private static class ClientThreadFactory implements ThreadFactory { - static class ClientThreadFactory implements ThreadFactory { - final String name; final boolean daemon; - + public ClientThreadFactory(String name, boolean daemon) { this.name = name; this.daemon = daemon; } - + @Override public Thread newThread(Runnable r) { Thread thread = new Thread(r, name); @@ -82,23 +100,33 @@ public class GraphClientImpl2 implements Database.Session { this.database = database; this.dbFolder = dbFolder; this.locator = locator; - this.clusters = new ClusterManager(dbFolder); + this.fileCache = new FileCache(); + // This disposes the cache when the session is shut down + locator.registerService(FileCache.class, fileCache); + this.clusters = new ClusterManager(dbFolder, fileCache); load(); - ClusterSetsSupport cssi = locator.getService(ClusterSetsSupport.class); + ClusterSetsSupport cssi = locator.getService(ClusterSetsSupport.class); cssi.setReadDirectory(clusters.lastSessionDirectory); + cssi.updateWriteDirectory(clusters.workingDirectory); mainProgram = new MainProgram(this, clusters); executor.execute(mainProgram); + eventSupport = (EventSupportImpl)locator.getService(EventSupport.class); + } public Path getDbFolder() { return dbFolder; } - public void tryMakeSnapshot() throws IOException { - - if (isClosing || unexpectedClose) - return; - + /* + * This method schedules snapshotting. + * No lock and thread restrictions. + */ + void tryMakeSnapshot() throws IOException { + + if (isClosing || unexpectedClose) + return; + saver.execute(new Runnable() { @Override @@ -108,48 +136,38 @@ public class GraphClientImpl2 implements Database.Session { // First take a write transaction tr = askWriteTransaction(-1); // Then make sure that MainProgram is idling - mainProgram.mutex.acquire(); - try { - synchronized(mainProgram) { - if(mainProgram.operations.isEmpty()) { - makeSnapshot(false); - } else { - // MainProgram is becoming busy again - delay snapshotting - return; - } - } - } finally { - mainProgram.mutex.release(); - } + synchronizeWithIdleMainProgram(() -> makeSnapshot(false)); } catch (IllegalAcornStateException | ProCoreException e) { - Logger.defaultLogError(e); + LOGGER.error("Snapshotting failed", e); + unexpectedClose = true; + } catch (SDBException e) { + LOGGER.error("Snapshotting failed", e); unexpectedClose = true; - } catch (InterruptedException e) { - Logger.defaultLogError(e); } finally { try { if(tr != null) endTransaction(tr.getTransactionId()); if (unexpectedClose) { - LifecycleSupport support = getServiceLocator().getService(LifecycleSupport.class); - try { - support.close(); - } catch (DatabaseException e1) { - Logger.defaultLogError(e1); - } + LifecycleSupport support = getServiceLocator().getService(LifecycleSupport.class); + try { + support.close(); + } catch (DatabaseException e1) { + LOGGER.error("Failed to close database as a safety measure due to failed snapshotting", e1); + } } } catch (ProCoreException e) { - Logger.defaultLogError(e); + LOGGER.error("Failed to end snapshotting write transaction", e); } } } }); } - - public void makeSnapshot(boolean fullSave) throws IllegalAcornStateException { - clusters.makeSnapshot(locator, fullSave); - } - + + private void makeSnapshot(boolean fullSave) throws IllegalAcornStateException { + clusters.makeSnapshot(locator, fullSave); + } + + @Override public T clone(ClusterUID uid, ClusterCreator creator) throws DatabaseException { try { return clusters.clone(uid, creator); @@ -159,17 +177,9 @@ public class GraphClientImpl2 implements Database.Session { } } -// private void save() throws IOException { -// clusters.save(); -// } - - public void load() throws IOException { + private void load() throws IOException { clusters.load(); } - -// public void modiFileEx(ClusterUID uid, int resourceKey, long offset, long size, byte[] bytes, long pos, ClusterSupport support) { -// clusters.modiFileEx(uid, resourceKey, offset, size, bytes, pos, support); -// } @Override public Database getDatabase() { @@ -179,33 +189,44 @@ public class GraphClientImpl2 implements Database.Session { private boolean closed = false; private boolean isClosing = false; private boolean unexpectedClose = false; - + @Override public void close() throws ProCoreException { - System.err.println("Closing " + this + " and mainProgram " + mainProgram); + LOGGER.info("Closing " + this + " and mainProgram " + mainProgram); if(!closed && !isClosing) { - isClosing = true; + isClosing = true; try { - if (!unexpectedClose) - makeSnapshot(true); - + + if (!unexpectedClose) + synchronizeWithIdleMainProgram(() -> makeSnapshot(true)); + mainProgram.close(); clusters.shutdown(); executor.shutdown(); saver.shutdown(); + boolean executorTerminated = executor.awaitTermination(500, TimeUnit.MILLISECONDS); boolean saverTerminated = saver.awaitTermination(500, TimeUnit.MILLISECONDS); - - System.err.println("executorTerminated=" + executorTerminated + ", saverTerminated=" + saverTerminated); - + + LOGGER.info("executorTerminated=" + executorTerminated + ", saverTerminated=" + saverTerminated); + + try { + clusters.mainState.save(dbFolder); + } catch (IOException e) { + LOGGER.error("Failed to save " + MainState.MAIN_STATE + " file in database folder " + dbFolder); + } + mainProgram = null; executor = null; saver = null; - + } catch (IllegalAcornStateException | InterruptedException e) { throw new ProCoreException(e); + } catch (SDBException e1) { + throw new ProCoreException(e1); } closed = true; + eventSupport.fireEvent(CLOSE, null); } //impl.close(); } @@ -219,19 +240,16 @@ public class GraphClientImpl2 implements Database.Session { public boolean isClosed() throws ProCoreException { return closed; } - + @Override public void acceptCommit(long transactionId, long changeSetId, byte[] metadata) throws ProCoreException { clusters.state.headChangeSetId++; long committedChangeSetId = changeSetId + 1; try { - clusters.commitChangeSet(committedChangeSetId, metadata); - - clusters.state.transactionId = transactionId; - - mainProgram.committed(); - - TimeLogger.log("Accepted commit"); + clusters.commitChangeSet(committedChangeSetId, metadata); + clusters.state.transactionId = transactionId; + mainProgram.committed(); + TimeLogger.log("Accepted commit"); } catch (IllegalAcornStateException e) { throw new ProCoreException(e); } @@ -239,18 +257,17 @@ public class GraphClientImpl2 implements Database.Session { @Override public long cancelCommit(long transactionId, long changeSetId, byte[] metadata, OnChangeSetUpdate onChangeSetUpdate) throws ProCoreException { - UnsupportedOperationException e = new UnsupportedOperationException("org.simantics.acorn.GraphClientImpl2.cancelCommit() is not supported operation! Closing down to prevent further havoc"); - clusters.notSafeToMakeSnapshot(new IllegalAcornStateException(e)); - throw e; -// System.err.println("GraphClientImpl2.cancelCommit() called!! this is experimental and might cause havoc!"); -// try { -// undo(new long[] {changeSetId}, onChangeSetUpdate); -// } catch (SDBException e) { -// e.printStackTrace(); -// throw new ProCoreException(e); -// } -// clusters.state.headChangeSetId++; -// return clusters.state.headChangeSetId; + // Accept and finalize current transaction and then undo it + acceptCommit(transactionId, changeSetId, metadata); + + try { + undo(new long[] {changeSetId+1}, onChangeSetUpdate); + clusters.state.headChangeSetId++; + return clusters.state.headChangeSetId; + } catch (SDBException e) { + LOGGER.error("Failed to undo cancelled transaction", e); + throw new ProCoreException(e); + } } @Override @@ -258,11 +275,11 @@ public class GraphClientImpl2 implements Database.Session { return transactionManager.askReadTransaction(); } - enum TransactionState { + private enum TransactionState { IDLE,WRITE,READ } - - class TransactionRequest { + + private class TransactionRequest { public TransactionState state; public Semaphore semaphore; public TransactionRequest(TransactionState state, Semaphore semaphore) { @@ -271,51 +288,51 @@ public class GraphClientImpl2 implements Database.Session { } } - class TransactionManager { + private class TransactionManager { private TransactionState currentTransactionState = TransactionState.IDLE; - + private int reads = 0; - - LinkedList requests = new LinkedList(); - - TLongObjectHashMap requestMap = new TLongObjectHashMap(); - + + private LinkedList requests = new LinkedList<>(); + + private TLongObjectHashMap requestMap = new TLongObjectHashMap<>(); + private synchronized Transaction makeTransaction(TransactionRequest req) { - + final int csId = clusters.state.headChangeSetId; final long trId = clusters.state.transactionId+1; requestMap.put(trId, req); return new Transaction() { - + @Override public long getTransactionId() { return trId; } - + @Override public long getHeadChangeSetId() { return csId; } }; } - + /* * This method cannot be synchronized since it waits and must support multiple entries * by query thread(s) and internal transactions such as snapshot saver */ - public Transaction askReadTransaction() throws ProCoreException { - + private Transaction askReadTransaction() throws ProCoreException { + Semaphore semaphore = new Semaphore(0); - + TransactionRequest req = queue(TransactionState.READ, semaphore); - + try { semaphore.acquire(); } catch (InterruptedException e) { throw new ProCoreException(e); } - + return makeTransaction(req); } @@ -325,61 +342,61 @@ public class GraphClientImpl2 implements Database.Session { if(r.state == TransactionState.READ) reads++; r.semaphore.release(); } - + private synchronized void processRequests() { - + while(true) { if(requests.isEmpty()) return; TransactionRequest req = requests.peek(); if(currentTransactionState == TransactionState.IDLE) { - + // Accept anything while IDLE currentTransactionState = req.state; dispatch(); - + } else if (currentTransactionState == TransactionState.READ) { - + if(req.state == currentTransactionState) { // Allow other reads dispatch(); } else { - + // Wait return; - + } - + } else if (currentTransactionState == TransactionState.WRITE) { // Wait return; - + } - + } - + } - + private synchronized TransactionRequest queue(TransactionState state, Semaphore semaphore) { - TransactionRequest req = new TransactionRequest(state, semaphore); + TransactionRequest req = new TransactionRequest(state, semaphore); requests.addLast(req); processRequests(); return req; } - + /* * This method cannot be synchronized since it waits and must support multiple entries * by query thread(s) and internal transactions such as snapshot saver */ - public Transaction askWriteTransaction() throws IllegalAcornStateException { - + private Transaction askWriteTransaction() throws IllegalAcornStateException { + Semaphore semaphore = new Semaphore(0); TransactionRequest req = queue(TransactionState.WRITE, semaphore); - + try { semaphore.acquire(); } catch (InterruptedException e) { @@ -388,9 +405,9 @@ public class GraphClientImpl2 implements Database.Session { mainProgram.startTransaction(clusters.state.headChangeSetId+1); return makeTransaction(req); } - - public synchronized long endTransaction(long transactionId) throws ProCoreException { - + + private synchronized long endTransaction(long transactionId) throws ProCoreException { + TransactionRequest req = requestMap.remove(transactionId); if(req.state == TransactionState.WRITE) { currentTransactionState = TransactionState.IDLE; @@ -406,7 +423,7 @@ public class GraphClientImpl2 implements Database.Session { } } - + @Override public Transaction askWriteTransaction(final long transactionId) throws ProCoreException { try { @@ -444,10 +461,10 @@ public class GraphClientImpl2 implements Database.Session { public ChangeSetData getChangeSetData(long minChangeSetId, long maxChangeSetId, OnChangeSetUpdate onChangeSetupate) throws ProCoreException { - + new Exception("GetChangeSetDataFunction " + minChangeSetId + " " + maxChangeSetId).printStackTrace();; return null; - + } @Override @@ -498,15 +515,15 @@ public class GraphClientImpl2 implements Database.Session { public long getFirstChangeSetId() { return 0; } - + }; } @Override public Refresh getRefresh(long changeSetId) throws ProCoreException { - + final ClusterIds ids = getClusterIds(); - + return new Refresh() { @Override @@ -523,15 +540,15 @@ public class GraphClientImpl2 implements Database.Session { public long[] getSecond() { return ids.getSecond(); } - + }; - - } - public byte[] getResourceFile(final byte[] clusterUID, final int resourceIndex) throws ProCoreException, AcornAccessVerificationException, IllegalAcornStateException { - return clusters.getResourceFile(clusterUID, resourceIndex); } +// public byte[] getResourceFile(final byte[] clusterUID, final int resourceIndex) throws ProCoreException, AcornAccessVerificationException, IllegalAcornStateException { +// return clusters.getResourceFile(clusterUID, resourceIndex); +// } + @Override public ResourceSegment getResourceSegment(final byte[] clusterUID, final int resourceIndex, final long segmentOffset, short segmentSize) throws ProCoreException { try { @@ -583,7 +600,7 @@ public class GraphClientImpl2 implements Database.Session { chunk.releaseMutex(); } } - + private void performUndo(String ccsId, ArrayList> clusterChanges, UndoClusterSupport support) throws ProCoreException, DatabaseException, IllegalAcornStateException, AcornAccessVerificationException { UndoClusterUpdateProcessor proc = getUndoCSS(ccsId); @@ -594,7 +611,7 @@ public class GraphClientImpl2 implements Database.Session { ClusterChange cs = new ClusterChange(clusterChanges, proc.getClusterUID()); for(int i=0;i> clusterChanges = new ArrayList>(); - - UndoClusterSupport support = new UndoClusterSupport(clusters); - - final int changeSetId = clusters.state.headChangeSetId; - - if(ClusterUpdateProcessorBase.DEBUG) - System.err.println(" === BEGIN UNDO ==="); - - for(int i=0;i ccss = clusters.getChanges(id); - - for(int j=0;j pair = clusterChanges.get(i); - - final ClusterUID cuid = pair.first; - final byte[] data = pair.second; - - onChangeSetUpdate.onChangeSetUpdate(new ChangeSetUpdate() { - - @Override - public long getChangeSetId() { - return changeSetId; - } - - @Override - public int getChangeSetIndex() { - return 0; - } - - @Override - public int getNumberOfClusterChangeSets() { - return clusterChanges.size(); - } - - @Override - public int getIndexOfClusterChangeSet() { - return changeSetIndex; - } - - @Override - public byte[] getClusterId() { - return cuid.asBytes(); - } - - @Override - public boolean getNewCluster() { - return false; - } - - @Override - public byte[] getData() { - return data; - } - - }); - } - } catch (AcornAccessVerificationException | IllegalAcornStateException e1) { - throw new ProCoreException(e1); - } + + synchronizeWithIdleMainProgram(new MainProgramRunnable() { + + @Override + public void run() throws Exception { + + try { + + final ArrayList> clusterChanges = new ArrayList>(); + + UndoClusterSupport support = new UndoClusterSupport(clusters); + + final int changeSetId = clusters.state.headChangeSetId; + + if(ClusterUpdateProcessorBase.DEBUG) + LOGGER.info(" === BEGIN UNDO ==="); + + for(int i=0;i ccss = clusters.getChanges(id); + + for(int j=0;j pair = clusterChanges.get(i); + + final ClusterUID cuid = pair.first; + final byte[] data = pair.second; + + onChangeSetUpdate.onChangeSetUpdate(new ChangeSetUpdate() { + + @Override + public long getChangeSetId() { + return changeSetId; + } + + @Override + public int getChangeSetIndex() { + return 0; + } + + @Override + public int getNumberOfClusterChangeSets() { + return clusterChanges.size(); + } + + @Override + public int getIndexOfClusterChangeSet() { + return changeSetIndex; + } + + @Override + public byte[] getClusterId() { + return cuid.asBytes(); + } + + @Override + public boolean getNewCluster() { + return false; + } + + @Override + public byte[] getData() { + return data; + } + + }); + } + } catch (AcornAccessVerificationException | IllegalAcornStateException e1) { + throw new ProCoreException(e1); + } + + } + + }); + return false; + } - - public ServiceLocator getServiceLocator() { + + ServiceLocator getServiceLocator() { return locator; } @@ -703,28 +781,78 @@ public class GraphClientImpl2 implements Database.Session { return clusters.rolledback(); } - - - - - - - - - - - //////////////////////// - - - - - - - - - - - - + private void purge() throws IllegalAcornStateException { + clusters.purge(locator); + } + + public void purgeDatabase() { + + if (isClosing || unexpectedClose) + return; + + saver.execute(new Runnable() { + + @Override + public void run() { + Transaction tr = null; + try { + // First take a write transaction + tr = askWriteTransaction(-1); + // Then make sure that MainProgram is idling + synchronizeWithIdleMainProgram(() -> purge()); + } catch (IllegalAcornStateException | ProCoreException e) { + LOGGER.error("Purge failed", e); + unexpectedClose = true; + } catch (SDBException e) { + LOGGER.error("Purge failed", e); + unexpectedClose = true; + } finally { + try { + if(tr != null) { + endTransaction(tr.getTransactionId()); + eventSupport.fireEvent(PURGE, null); + } + if (unexpectedClose) { + LifecycleSupport support = getServiceLocator().getService(LifecycleSupport.class); + try { + support.close(); + } catch (DatabaseException e1) { + LOGGER.error("Failed to close database as a safety measure due to failed purge", e1); + } + } + } catch (ProCoreException e) { + LOGGER.error("Failed to end purge write transaction", e); + } + } + } + }); + + } + + public long getTailChangeSetId() { + return clusters.getTailChangeSetId(); + } + + public Future getBackupRunnable(Semaphore lock, Path targetPath, int revision) throws IllegalAcornStateException, IOException { + + makeSnapshot(true); + + Path dbDir = getDbFolder(); + int newestFolder = clusters.mainState.headDir - 1; + int latestFolder = -2; + Path AcornMetadataFile = AcornBackupProvider.getAcornMetadataFile(dbDir); + if (Files.exists(AcornMetadataFile)) { + try (BufferedReader br = Files.newBufferedReader(AcornMetadataFile)) { + latestFolder = Integer.parseInt( br.readLine() ); + } + } + + AcornBackupRunnable r = new AcornBackupRunnable( + lock, targetPath, revision, dbDir, latestFolder, newestFolder); + new Thread(r, "Acorn backup thread").start(); + return r; + + } + }