/******************************************************************************* * Copyright (c) 2007, 2010 Association for Decentralized Information Management * in Industry THTH ry. * All rights reserved. This program and the accompanying materials * are made available under the terms of the Eclipse Public License v1.0 * which accompanies this distribution, and is available at * http://www.eclipse.org/legal/epl-v10.html * * Contributors: * VTT Technical Research Centre of Finland - initial API and implementation *******************************************************************************/ package org.simantics.acorn; import java.io.BufferedReader; import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; import java.util.ArrayList; import java.util.LinkedList; import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.Semaphore; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import org.simantics.acorn.MainProgram.MainProgramRunnable; import org.simantics.acorn.backup.AcornBackupProvider; import org.simantics.acorn.backup.AcornBackupProvider.AcornBackupRunnable; import org.simantics.acorn.exception.AcornAccessVerificationException; import org.simantics.acorn.exception.IllegalAcornStateException; import org.simantics.acorn.internal.ClusterChange; import org.simantics.acorn.internal.ClusterUpdateProcessorBase; import org.simantics.acorn.internal.UndoClusterUpdateProcessor; import org.simantics.acorn.lru.ClusterChangeSet.Entry; import org.simantics.acorn.lru.ClusterInfo; import org.simantics.acorn.lru.ClusterStreamChunk; import org.simantics.acorn.lru.ClusterUpdateOperation; import org.simantics.backup.BackupException; import org.simantics.db.ClusterCreator; import org.simantics.db.Database; import org.simantics.db.ServiceLocator; import 
org.simantics.db.exception.DatabaseException; import org.simantics.db.exception.SDBException; import org.simantics.db.server.ProCoreException; import org.simantics.db.service.ClusterSetsSupport; import org.simantics.db.service.ClusterUID; import org.simantics.db.service.EventSupport; import org.simantics.db.service.LifecycleSupport; import org.simantics.utils.datastructures.Pair; import org.simantics.utils.logging.TimeLogger; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import fi.vtt.simantics.procore.internal.EventSupportImpl; import gnu.trove.map.hash.TLongObjectHashMap; /** Database.Session implementation that keeps its cluster data in a ClusterManager and hands cluster update operations to a MainProgram running on a dedicated executor thread. NOTE(review): in this copy of the file the line structure is collapsed, generic type arguments appear to have been stripped and part of the undo code is missing, so several declarations do not parse as written - recover the original from version control before changing any code. */ public class GraphClientImpl2 implements Database.Session { private static final Logger LOGGER = LoggerFactory.getLogger(GraphClientImpl2.class); public static final boolean DEBUG = false; /* Event names passed to eventSupport.fireEvent: CLOSE at the end of close(), PURGE after a purge transaction has been ended. */ public static final String CLOSE = "close"; public static final String PURGE = "purge"; final ClusterManager clusters; private TransactionManager transactionManager = new TransactionManager(); /* Single non-daemon thread; the constructor submits mainProgram to it. */ private ExecutorService executor = Executors.newSingleThreadExecutor(new ClientThreadFactory("Core Main Program", false)); /* Single daemon thread that runs the snapshot and purge tasks. */ private ExecutorService saver = Executors.newSingleThreadExecutor(new ClientThreadFactory("Core Snapshot Saver", true)); private Path dbFolder; private final Database database; private ServiceLocator locator; private FileCache fileCache; private MainProgram mainProgram; private EventSupportImpl eventSupport; /** Thread factory that gives every thread it creates the same fixed name and daemon flag. */ private static class ClientThreadFactory implements ThreadFactory { final String name; final boolean daemon; public ClientThreadFactory(String name, boolean daemon) { this.name = name; this.daemon = daemon; } @Override public Thread newThread(Runnable r) { Thread thread = new Thread(r, name); thread.setDaemon(daemon); return thread; } } /** Creates the session: registers a new FileCache as a service, builds the ClusterManager for dbFolder and loads it, passes the last-session and working directories to ClusterSetsSupport, submits a new MainProgram to the executor and looks up the EventSupport service. @throws IOException declared by load(); whether the ClusterManager constructor can also throw it is not visible here */ public GraphClientImpl2(Database database, Path dbFolder, ServiceLocator locator) throws IOException { this.database = database; this.dbFolder = dbFolder; this.locator = locator; this.fileCache = new FileCache(); // This disposes the cache when the 
session is shut down locator.registerService(FileCache.class, fileCache); this.clusters = new ClusterManager(dbFolder, fileCache); load(); ClusterSetsSupport cssi = locator.getService(ClusterSetsSupport.class); cssi.setReadDirectory(clusters.lastSessionDirectory); cssi.updateWriteDirectory(clusters.workingDirectory); mainProgram = new MainProgram(this, clusters); executor.execute(mainProgram); eventSupport = (EventSupportImpl)locator.getService(EventSupport.class); } /** Returns the database folder that was given to the constructor. */ public Path getDbFolder() { return dbFolder; } /* * This method schedules snapshotting. * No lock and thread restrictions. */ /* Does nothing when isClosing or unexpectedClose is set. Otherwise submits a task to the saver executor that takes a write transaction, runs makeSnapshot(false) through synchronizeWithIdleMainProgram and ends the transaction. On failure the task sets unexpectedClose and asks LifecycleSupport to close the database. */ void tryMakeSnapshot() throws IOException { if (isClosing || unexpectedClose) return; saver.execute(new Runnable() { @Override public void run() { Transaction tr = null; try { // First take a write transaction tr = askWriteTransaction(-1); // Then make sure that MainProgram is idling synchronizeWithIdleMainProgram(() -> makeSnapshot(false)); } catch (IllegalAcornStateException | ProCoreException e) { LOGGER.error("Snapshotting failed", e); unexpectedClose = true; } catch (SDBException e) { LOGGER.error("Snapshotting failed", e); unexpectedClose = true; } finally { try { if(tr != null) endTransaction(tr.getTransactionId()); if (unexpectedClose) { LifecycleSupport support = getServiceLocator().getService(LifecycleSupport.class); try { support.close(); } catch (DatabaseException e1) { LOGGER.error("Failed to close database as a safety measure due to failed snapshotting", e1); } } } catch (ProCoreException e) { LOGGER.error("Failed to end snapshotting write transaction", e); } } } }); } /** Delegates to clusters.makeSnapshot with the session locator. */ private void makeSnapshot(boolean fullSave) throws IllegalAcornStateException { clusters.makeSnapshot(locator, fullSave); } /** Delegates to clusters.clone. On an Acorn or IO exception sets unexpectedClose and rethrows the cause wrapped in DatabaseException. NOTE(review): the type parameter declaration for T seems to be missing in this copy - confirm against the original. */ @Override public T clone(ClusterUID uid, ClusterCreator creator) throws DatabaseException { try { return clusters.clone(uid, creator); } catch (AcornAccessVerificationException | IllegalAcornStateException | IOException e) { unexpectedClose = true; throw new 
DatabaseException(e); } } /** Loads cluster state via clusters.load(). */ private void load() throws IOException { clusters.load(); } /** Returns the Database given to the constructor. */ @Override public Database getDatabase() { return database; } /* Lifecycle flags. They are written in close(), clone() and the saver tasks and read in other methods, with no volatile or synchronization in this class. NOTE(review): confirm that cross-thread visibility of these plain fields is acceptable. */ private boolean closed = false; private boolean isClosing = false; private boolean unexpectedClose = false; /** Closes the session. When neither closed nor isClosing is set: marks isClosing, makes a full snapshot through synchronizeWithIdleMainProgram unless unexpectedClose is set, closes mainProgram, shuts down clusters and both executors, waits up to 500 ms for each executor, saves clusters.mainState into dbFolder, clears the executor references, sets closed and fires the CLOSE event. Otherwise only the initial log line is written. @throws ProCoreException wrapping IllegalAcornStateException, InterruptedException or SDBException. NOTE(review): the IOException from mainState.save is logged without the exception object, the interrupt status is not restored, and isClosing stays true if this method throws. */ @Override public void close() throws ProCoreException { LOGGER.info("Closing " + this + " and mainProgram " + mainProgram); if(!closed && !isClosing) { isClosing = true; try { if (!unexpectedClose) synchronizeWithIdleMainProgram(() -> makeSnapshot(true)); mainProgram.close(); clusters.shutdown(); executor.shutdown(); saver.shutdown(); boolean executorTerminated = executor.awaitTermination(500, TimeUnit.MILLISECONDS); boolean saverTerminated = saver.awaitTermination(500, TimeUnit.MILLISECONDS); LOGGER.info("executorTerminated=" + executorTerminated + ", saverTerminated=" + saverTerminated); try { clusters.mainState.save(dbFolder); } catch (IOException e) { LOGGER.error("Failed to save " + MainState.MAIN_STATE + " file in database folder " + dbFolder); } mainProgram = null; executor = null; saver = null; } catch (IllegalAcornStateException | InterruptedException e) { throw new ProCoreException(e); } catch (SDBException e1) { throw new ProCoreException(e1); } closed = true; eventSupport.fireEvent(CLOSE, null); } //impl.close(); } /** Not supported; always throws UnsupportedOperationException. */ @Override public void open() throws ProCoreException { throw new UnsupportedOperationException(); } /** Returns the closed flag, which close() sets after its shutdown sequence has finished without throwing. */ @Override public boolean isClosed() throws ProCoreException { return closed; } /** Increments clusters.state.headChangeSetId, commits the change set under id changeSetId + 1 with the given metadata, stores transactionId in clusters.state and calls mainProgram.committed(). @throws ProCoreException wrapping IllegalAcornStateException. NOTE(review): the head id is incremented before commitChangeSet runs, so it stays incremented when the commit throws - confirm this is intended. */ @Override public void acceptCommit(long transactionId, long changeSetId, byte[] metadata) throws ProCoreException { clusters.state.headChangeSetId++; long committedChangeSetId = changeSetId + 1; try { clusters.commitChangeSet(committedChangeSetId, metadata); clusters.state.transactionId = transactionId; mainProgram.committed(); TimeLogger.log("Accepted commit"); } catch (IllegalAcornStateException e) { throw new ProCoreException(e); } } /** Cancels a commit by first accepting it and then undoing change set changeSetId + 1; afterwards increments headChangeSetId once more and returns the new value. @throws ProCoreException from acceptCommit, or wrapping an SDBException from undo */ @Override public long cancelCommit(long transactionId, long changeSetId, byte[] 
metadata, OnChangeSetUpdate onChangeSetUpdate) throws ProCoreException { // Accept and finalize current transaction and then undo it acceptCommit(transactionId, changeSetId, metadata); try { undo(new long[] {changeSetId+1}, onChangeSetUpdate); clusters.state.headChangeSetId++; return clusters.state.headChangeSetId; } catch (SDBException e) { LOGGER.error("Failed to undo cancelled transaction", e); throw new ProCoreException(e); } } /** Blocks until the transaction manager grants a read transaction and returns it. */ @Override public Transaction askReadTransaction() throws ProCoreException { return transactionManager.askReadTransaction(); } /* State of the transaction manager; READ and WRITE are also used as the kind of a queued request. */ private enum TransactionState { IDLE,WRITE,READ } /* A queued request for a transaction; its semaphore is released when the request is dispatched. */ private class TransactionRequest { public TransactionState state; public Semaphore semaphore; public TransactionRequest(TransactionState state, Semaphore semaphore) { this.state = state; this.semaphore = semaphore; } } /** Grants transactions from a first-in first-out queue: several reads may run together, a write runs alone, and a request that cannot start yet blocks the requests queued behind it. NOTE(review): requests and requestMap look like they lost their element type argument (TransactionRequest) in this copy. */ private class TransactionManager { private TransactionState currentTransactionState = TransactionState.IDLE; private int reads = 0; private LinkedList requests = new LinkedList<>(); private TLongObjectHashMap requestMap = new TLongObjectHashMap<>(); /* Builds a Transaction whose id is clusters.state.transactionId + 1 and whose head change set id is the current head, and stores the request in requestMap under that id. NOTE(review): transactionId is only assigned in acceptCommit, so read transactions that overlap would get the same id and replace each other in requestMap - confirm whether that can happen. */ private synchronized Transaction makeTransaction(TransactionRequest req) { final int csId = clusters.state.headChangeSetId; final long trId = clusters.state.transactionId+1; requestMap.put(trId, req); return new Transaction() { @Override public long getTransactionId() { return trId; } @Override public long getHeadChangeSetId() { return csId; } }; } /* * This method cannot be synchronized since it waits and must support multiple entries * by query thread(s) and internal transactions such as snapshot saver */ private Transaction askReadTransaction() throws ProCoreException { Semaphore semaphore = new Semaphore(0); TransactionRequest req = queue(TransactionState.READ, semaphore); try { semaphore.acquire(); } catch (InterruptedException e) { throw new ProCoreException(e); } return makeTransaction(req); } /* Removes the head request, counts it when it is a read and releases its semaphore so the waiting thread can continue. */ private synchronized void dispatch() { TransactionRequest r = requests.removeFirst(); if(r.state == 
TransactionState.READ) reads++; r.semaphore.release(); } /* Dispatches requests from the head of the queue for as long as the current state allows it: any request when IDLE, further reads while READ, nothing while WRITE. */ private synchronized void processRequests() { while(true) { if(requests.isEmpty()) return; TransactionRequest req = requests.peek(); if(currentTransactionState == TransactionState.IDLE) { // Accept anything while IDLE currentTransactionState = req.state; dispatch(); } else if (currentTransactionState == TransactionState.READ) { if(req.state == currentTransactionState) { // Allow other reads dispatch(); } else { // Wait return; } } else if (currentTransactionState == TransactionState.WRITE) { // Wait return; } } } /* Appends a new request to the queue and immediately tries to dispatch. */ private synchronized TransactionRequest queue(TransactionState state, Semaphore semaphore) { TransactionRequest req = new TransactionRequest(state, semaphore); requests.addLast(req); processRequests(); return req; } /* * This method cannot be synchronized since it waits and must support multiple entries * by query thread(s) and internal transactions such as snapshot saver */ /* Once the write request has been dispatched, calls mainProgram.startTransaction with headChangeSetId + 1 before the Transaction object is built. */ private Transaction askWriteTransaction() throws IllegalAcornStateException { Semaphore semaphore = new Semaphore(0); TransactionRequest req = queue(TransactionState.WRITE, semaphore); try { semaphore.acquire(); } catch (InterruptedException e) { throw new IllegalAcornStateException(e); } mainProgram.startTransaction(clusters.state.headChangeSetId+1); return makeTransaction(req); } /* Ends the transaction registered under transactionId and returns clusters.state.transactionId. A finished write, or the last finished read, sets the state back to IDLE and dispatches waiting requests. NOTE(review): for an id that is not in requestMap the removed request is presumably null and the state check would throw NullPointerException - confirm callers never pass such an id. */ private synchronized long endTransaction(long transactionId) throws ProCoreException { TransactionRequest req = requestMap.remove(transactionId); if(req.state == TransactionState.WRITE) { currentTransactionState = TransactionState.IDLE; processRequests(); } else { reads--; if(reads == 0) { currentTransactionState = TransactionState.IDLE; processRequests(); } } return clusters.state.transactionId; } } /** Blocks until the transaction manager grants a write transaction; the transactionId argument is not used. @throws ProCoreException when isClosing, unexpectedClose or closed is set, or wrapping IllegalAcornStateException */ @Override public Transaction askWriteTransaction(final long transactionId) throws ProCoreException { try { if (isClosing || unexpectedClose || closed) { throw new ProCoreException("GraphClientImpl2 is already closing so no more write transactions 
allowed!"); } return transactionManager.askWriteTransaction(); } catch (IllegalAcornStateException e) { throw new ProCoreException(e); } } /** Delegates to the transaction manager and returns its result. */ @Override public long endTransaction(long transactionId) throws ProCoreException { return transactionManager.endTransaction(transactionId); } /** Ignores the command and returns an empty string. */ @Override public String execute(String command) throws ProCoreException { // This is called only by WriteGraphImpl.commitAccessorChanges // We can ignore this in Acorn return ""; } /** Returns clusters.getMetadata(changeSetId), wrapping Acorn exceptions in ProCoreException. */ @Override public byte[] getChangeSetMetadata(long changeSetId) throws ProCoreException { try { return clusters.getMetadata(changeSetId); } catch (AcornAccessVerificationException | IllegalAcornStateException e) { throw new ProCoreException(e); } } /** Not implemented: prints a stack trace that names the requested id range and returns null. NOTE(review): this uses printStackTrace instead of LOGGER and hands null to the caller - confirm that no caller dereferences the result. */ @Override public ChangeSetData getChangeSetData(long minChangeSetId, long maxChangeSetId, OnChangeSetUpdate onChangeSetupate) throws ProCoreException { new Exception("GetChangeSetDataFunction " + minChangeSetId + " " + maxChangeSetId).printStackTrace();; return null; } /** Not supported; always throws UnsupportedOperationException. */ @Override public ChangeSetIds getChangeSetIds() throws ProCoreException { throw new UnsupportedOperationException(); } /** Not supported; always throws UnsupportedOperationException. */ @Override public Cluster getCluster(byte[] clusterId) throws ProCoreException { throw new UnsupportedOperationException(); } /** Not supported; always throws UnsupportedOperationException. */ @Override public ClusterChanges getClusterChanges(long changeSetId, byte[] clusterId) throws ProCoreException { throw new UnsupportedOperationException(); } /** Returns clusters.getClusterIds(), wrapping IllegalAcornStateException in ProCoreException. */ @Override public ClusterIds getClusterIds() throws ProCoreException { try { return clusters.getClusterIds(); } catch (IllegalAcornStateException e) { throw new ProCoreException(e); } } /** Returns fixed identification values: the server id is the word server, the protocol id is empty, the database id is the word database and the first change set id is 0. */ @Override public Information getInformation() throws ProCoreException { return new Information() { @Override public String getServerId() { return "server"; } @Override public String getProtocolId() { return ""; } @Override public String getDatabaseId() { return "database"; } @Override public long getFirstChangeSetId() { return 0; } }; } /** Returns a Refresh backed by the cluster ids fetched at call time; its head change set id is read from clusters.state each time it is asked. The changeSetId argument is not used. */ @Override public Refresh getRefresh(long changeSetId) throws ProCoreException { final 
ClusterIds ids = getClusterIds(); return new Refresh() { @Override public long getHeadChangeSetId() { return clusters.state.headChangeSetId; } @Override public long[] getFirst() { return ids.getFirst(); } @Override public long[] getSecond() { return ids.getSecond(); } }; } // public byte[] getResourceFile(final byte[] clusterUID, final int resourceIndex) throws ProCoreException, AcornAccessVerificationException, IllegalAcornStateException { // return clusters.getResourceFile(clusterUID, resourceIndex); // } /** Delegates to clusters.getResourceSegment, wrapping Acorn exceptions in ProCoreException. */ @Override public ResourceSegment getResourceSegment(final byte[] clusterUID, final int resourceIndex, final long segmentOffset, short segmentSize) throws ProCoreException { try { return clusters.getResourceSegment(clusterUID, resourceIndex, segmentOffset, segmentSize); } catch (AcornAccessVerificationException | IllegalAcornStateException e) { throw new ProCoreException(e); } } /** Returns the current value of clusters.state.reservedIds and then increments it by one. NOTE(review): the count argument is ignored - confirm that callers expect the counter to advance by one regardless of count. */ @Override public long reserveIds(int count) throws ProCoreException { return clusters.state.reservedIds++; } /** Wraps the operation bytes in a ClusterUpdateOperation, obtains the ClusterInfo for its uid through clusterLRU.getOrCreate, calls scheduleUpdate on it while holding its mutex and passes the operation to mainProgram.schedule. @throws ProCoreException wrapping Acorn exceptions, including the case where no ClusterInfo is returned. NOTE(review): releaseMutex runs in the finally block whenever info is non-null, also when acquireMutex itself threw - confirm that is safe. */ @Override public void updateCluster(byte[] operations) throws ProCoreException { ClusterInfo info = null; try { ClusterUpdateOperation operation = new ClusterUpdateOperation(clusters, operations); info = clusters.clusterLRU.getOrCreate(operation.uid, true); if(info == null) throw new IllegalAcornStateException("info == null for operation " + operation); info.acquireMutex(); info.scheduleUpdate(); mainProgram.schedule(operation); } catch (IllegalAcornStateException | AcornAccessVerificationException e) { throw new ProCoreException(e); } finally { if (info != null) info.releaseMutex(); } } /** Resolves the undo processor for an id made of a chunk key and a numeric chunk offset separated by a dot: looks the chunk up in streamLRU and, holding the chunk mutex, asks it for the processor at that offset. A DatabaseException is rethrown unchanged and any other Throwable is wrapped in IllegalStateException. @throws IllegalAcornStateException when no chunk exists for the key */ private UndoClusterUpdateProcessor getUndoCSS(String ccsId) throws DatabaseException, AcornAccessVerificationException, IllegalAcornStateException { String[] ss = ccsId.split("\\."); String chunkKey = ss[0]; int chunkOffset = Integer.parseInt(ss[1]); ClusterStreamChunk chunk = clusters.streamLRU.getWithoutMutex(chunkKey); if(chunk == null) throw new IllegalAcornStateException("Cluster Stream Chunk 
" + chunkKey + " was not found."); chunk.acquireMutex(); try { return chunk.getUndoProcessor(clusters, chunkOffset, ccsId); } catch (DatabaseException e) { throw e; } catch (Throwable t) { throw new IllegalStateException(t); } finally { chunk.releaseMutex(); } } /* NOTE(review): the text from here up to getServiceLocator is damaged in this copy. Generic arguments are gone and whatever stood between some comparison operators has been lost, which removes the rest of performUndo, several loop bodies and the declarations of undo(long[], OnChangeSetUpdate) and synchronizeWithIdleMainProgram, both of which are called elsewhere in this class. What survives shows performUndo fetching the undo processor and a cluster key and building a ClusterChange under the clusterLRU mutex, and the undo code reporting each collected cluster change to onChangeSetUpdate and returning false. Recover the original before editing this region. */ private void performUndo(String ccsId, ArrayList> clusterChanges, UndoClusterSupport support) throws ProCoreException, DatabaseException, IllegalAcornStateException, AcornAccessVerificationException { UndoClusterUpdateProcessor proc = getUndoCSS(ccsId); int clusterKey = clusters.getClusterKeyByClusterUIDOrMakeWithoutMutex(proc.getClusterUID()); clusters.clusterLRU.acquireMutex(); try { ClusterChange cs = new ClusterChange(clusterChanges, proc.getClusterUID()); for(int i=0;i> clusterChanges = new ArrayList>(); UndoClusterSupport support = new UndoClusterSupport(clusters); final int changeSetId = clusters.state.headChangeSetId; if(ClusterUpdateProcessorBase.DEBUG) LOGGER.info(" === BEGIN UNDO ==="); for(int i=0;i ccss = clusters.getChanges(id); for(int j=0;j pair = clusterChanges.get(i); final ClusterUID cuid = pair.first; final byte[] data = pair.second; onChangeSetUpdate.onChangeSetUpdate(new ChangeSetUpdate() { @Override public long getChangeSetId() { return changeSetId; } @Override public int getChangeSetIndex() { return 0; } @Override public int getNumberOfClusterChangeSets() { return clusterChanges.size(); } @Override public int getIndexOfClusterChangeSet() { return changeSetIndex; } @Override public byte[] getClusterId() { return cuid.asBytes(); } @Override public boolean getNewCluster() { return false; } @Override public byte[] getData() { return data; } }); } } catch (AcornAccessVerificationException | IllegalAcornStateException e1) { throw new ProCoreException(e1); } } }); return false; } /** Returns the service locator given to the constructor. */ ServiceLocator getServiceLocator() { return locator; } /** Always returns false. */ @Override public boolean refreshEnabled() { return false; } /** Delegates to clusters.rolledback(). */ @Override public boolean rolledback() { return clusters.rolledback(); } /** Delegates to clusters.purge with the session locator. */ private void purge() throws 
IllegalAcornStateException { clusters.purge(locator); } /** Does nothing when isClosing or unexpectedClose is set. Otherwise submits a task to the saver executor that takes a write transaction, runs purge() through synchronizeWithIdleMainProgram, ends the transaction and then fires the PURGE event. On failure the task sets unexpectedClose and asks LifecycleSupport to close the database. */ public void purgeDatabase() { if (isClosing || unexpectedClose) return; saver.execute(new Runnable() { @Override public void run() { Transaction tr = null; try { // First take a write transaction tr = askWriteTransaction(-1); // Then make sure that MainProgram is idling synchronizeWithIdleMainProgram(() -> purge()); } catch (IllegalAcornStateException | ProCoreException e) { LOGGER.error("Purge failed", e); unexpectedClose = true; } catch (SDBException e) { LOGGER.error("Purge failed", e); unexpectedClose = true; } finally { try { if(tr != null) { endTransaction(tr.getTransactionId()); eventSupport.fireEvent(PURGE, null); } if (unexpectedClose) { LifecycleSupport support = getServiceLocator().getService(LifecycleSupport.class); try { support.close(); } catch (DatabaseException e1) { LOGGER.error("Failed to close database as a safety measure due to failed purge", e1); } } } catch (ProCoreException e) { LOGGER.error("Failed to end purge write transaction", e); } } } }); } /** Delegates to clusters.getTailChangeSetId(). */ public long getTailChangeSetId() { return clusters.getTailChangeSetId(); } /** Makes a full snapshot on the calling thread, reads an integer from the first line of the Acorn metadata file when that file exists (latestFolder, otherwise -2), and starts a new thread running an AcornBackupRunnable built from the lock, target path, revision, database folder, latestFolder and newestFolder, where newestFolder is mainState.headDir - 1. The runnable itself is returned as the Future. NOTE(review): Integer.parseInt throws NumberFormatException when the file is empty (readLine returns null) or its first line is not a number - confirm the file is always well formed. */ public Future getBackupRunnable(Semaphore lock, Path targetPath, int revision) throws IllegalAcornStateException, IOException { makeSnapshot(true); Path dbDir = getDbFolder(); int newestFolder = clusters.mainState.headDir - 1; int latestFolder = -2; Path AcornMetadataFile = AcornBackupProvider.getAcornMetadataFile(dbDir); if (Files.exists(AcornMetadataFile)) { try (BufferedReader br = Files.newBufferedReader(AcornMetadataFile)) { latestFolder = Integer.parseInt( br.readLine() ); } } AcornBackupRunnable r = new AcornBackupRunnable( lock, targetPath, revision, dbDir, latestFolder, newestFolder); new Thread(r, "Acorn backup thread").start(); return r; } }