/******************************************************************************* * Copyright (c) 2007, 2010 Association for Decentralized Information Management * in Industry THTH ry. * All rights reserved. This program and the accompanying materials * are made available under the terms of the Eclipse Public License v1.0 * which accompanies this distribution, and is available at * http://www.eclipse.org/legal/epl-v10.html * * Contributors: * VTT Technical Research Centre of Finland - initial API and implementation *******************************************************************************/ package org.simantics.acorn; import java.io.IOException; import java.nio.file.Path; import java.util.ArrayList; import java.util.LinkedList; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Semaphore; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import org.simantics.acorn.exception.AcornAccessVerificationException; import org.simantics.acorn.exception.IllegalAcornStateException; import org.simantics.acorn.internal.ClusterChange; import org.simantics.acorn.internal.ClusterUpdateProcessorBase; import org.simantics.acorn.internal.UndoClusterUpdateProcessor; import org.simantics.acorn.lru.ClusterInfo; import org.simantics.acorn.lru.ClusterStreamChunk; import org.simantics.acorn.lru.ClusterUpdateOperation; import org.simantics.acorn.lru.ClusterChangeSet.Entry; import org.simantics.db.ClusterCreator; import org.simantics.db.Database; import org.simantics.db.ServiceLocator; import org.simantics.db.common.utils.Logger; import org.simantics.db.exception.DatabaseException; import org.simantics.db.exception.SDBException; import org.simantics.db.server.ProCoreException; import org.simantics.db.service.ClusterSetsSupport; import org.simantics.db.service.ClusterUID; import org.simantics.db.service.LifecycleSupport; import org.simantics.utils.datastructures.Pair; import 
org.simantics.utils.logging.TimeLogger;
import gnu.trove.map.hash.TLongObjectHashMap;

/**
 * {@link Database.Session} implementation backed by a {@link ClusterManager}.
 *
 * <p>The session owns two single-threaded executors: a non-daemon thread named
 * "Core Main Program" that runs the {@link MainProgram}, and a daemon thread
 * named "Core Snapshot Saver" that runs snapshot requests submitted through
 * {@link #tryMakeSnapshot()}. Read and write transactions are serialized by
 * the inner {@link TransactionManager}.
 */
public class GraphClientImpl2 implements Database.Session {

    public static final boolean DEBUG = false;

    public final ClusterManager clusters;

    private final TransactionManager transactionManager = new TransactionManager();
    // Both executors are set to null by close(), hence not final.
    private ExecutorService executor = Executors.newSingleThreadExecutor(new ClientThreadFactory("Core Main Program", false));
    private ExecutorService saver = Executors.newSingleThreadExecutor(new ClientThreadFactory("Core Snapshot Saver", true));

    private final Path dbFolder;
    private final Database database;
    private final ServiceLocator locator;
    private MainProgram mainProgram;

    // These flags are written on the snapshot saver thread and read by caller
    // threads without holding a common lock, so they must be volatile.
    private volatile boolean closed = false;
    private volatile boolean isClosing = false;
    private volatile boolean unexpectedClose = false;

    /**
     * Thread factory producing threads with a fixed name and daemon flag.
     */
    static class ClientThreadFactory implements ThreadFactory {

        final String name;
        final boolean daemon;

        public ClientThreadFactory(String name, boolean daemon) {
            this.name = name;
            this.daemon = daemon;
        }

        @Override
        public Thread newThread(Runnable r) {
            Thread thread = new Thread(r, name);
            thread.setDaemon(daemon);
            return thread;
        }
    }

    /**
     * Creates the session: builds the cluster manager for {@code dbFolder},
     * loads it, points the {@link ClusterSetsSupport} service at the last
     * session directory and starts the main program on its executor.
     *
     * @param database the database returned by {@link #getDatabase()}
     * @param dbFolder folder handed to the {@link ClusterManager}
     * @param locator  service locator used to look up supporting services
     * @throws IOException if loading the cluster manager fails
     */
    public GraphClientImpl2(Database database, Path dbFolder, ServiceLocator locator) throws IOException {
        this.database = database;
        this.dbFolder = dbFolder;
        this.locator = locator;
        this.clusters = new ClusterManager(dbFolder);
        load();
        ClusterSetsSupport cssi = locator.getService(ClusterSetsSupport.class);
        cssi.setReadDirectory(clusters.lastSessionDirectory);
        mainProgram = new MainProgram(this, clusters);
        executor.execute(mainProgram);
    }

    /** @return the folder this session was created with */
    public Path getDbFolder() {
        return dbFolder;
    }

    /**
     * Schedules a non-full snapshot on the saver thread. Does nothing when the
     * session is closing or has been marked as unexpectedly closed.
     *
     * <p>The scheduled task takes a write transaction, acquires the main
     * program mutex and makes the snapshot only if the main program has no
     * pending operations. If taking the transaction or making the snapshot
     * fails, the session is marked as unexpectedly closed and the
     * {@link LifecycleSupport} service is asked to close.
     *
     * @throws IOException declared for callers; not thrown by the visible code
     */
    public void tryMakeSnapshot() throws IOException {

        if (isClosing || unexpectedClose)
            return;

        saver.execute(new Runnable() {

            @Override
            public void run() {
                Transaction tr = null;
                boolean interrupted = false;
                try {
                    // First take a write transaction
                    tr = askWriteTransaction(-1);
                    // Then make sure that MainProgram is idling
                    mainProgram.mutex.acquire();
                    try {
                        synchronized (mainProgram) {
                            // If MainProgram is becoming busy again the snapshot is simply skipped.
                            if (mainProgram.operations.isEmpty()) {
                                makeSnapshot(false);
                            }
                        }
                    } finally {
                        mainProgram.mutex.release();
                    }
                } catch (IllegalAcornStateException | ProCoreException e) {
                    Logger.defaultLogError(e);
                    unexpectedClose = true;
                } catch (InterruptedException e) {
                    Logger.defaultLogError(e);
                    interrupted = true;
                } finally {
                    try {
                        if (tr != null)
                            endTransaction(tr.getTransactionId());
                        if (unexpectedClose) {
                            LifecycleSupport support = getServiceLocator().getService(LifecycleSupport.class);
                            try {
                                support.close();
                            } catch (DatabaseException e1) {
                                Logger.defaultLogError(e1);
                            }
                        }
                    } catch (ProCoreException e) {
                        Logger.defaultLogError(e);
                    } finally {
                        // Restore the interrupt flag only after cleanup so that the
                        // cleanup itself is not cut short by the pending interrupt.
                        if (interrupted)
                            Thread.currentThread().interrupt();
                    }
                }
            }

        });
    }

    /**
     * Delegates to {@link ClusterManager#makeSnapshot} with this session's
     * service locator.
     *
     * @param fullSave passed through to the cluster manager
     * @throws IllegalAcornStateException if the cluster manager reports one
     */
    public void makeSnapshot(boolean fullSave) throws IllegalAcornStateException {
        clusters.makeSnapshot(locator, fullSave);
    }

    /**
     * Clones the cluster identified by {@code uid} using {@code creator}.
     * Any failure marks the session as unexpectedly closed and is rethrown
     * wrapped in a {@link DatabaseException}.
     *
     * @param uid     cluster to clone
     * @param creator creator passed through to the cluster manager
     * @return whatever the cluster manager's clone returns
     * @throws DatabaseException wrapping the underlying failure
     */
    public <T> T clone(ClusterUID uid, ClusterCreator creator) throws DatabaseException {
        try {
            return clusters.clone(uid, creator);
        } catch (AcornAccessVerificationException | IllegalAcornStateException | IOException e) {
            unexpectedClose = true;
            throw new DatabaseException(e);
        }
    }

    /**
     * Loads the cluster manager state. Called from the constructor.
     *
     * @throws IOException if loading fails
     */
    public void load() throws IOException {
        clusters.load();
    }

    @Override
    public Database getDatabase() {
        return database;
    }

    /**
     * Closes the session once: makes a full snapshot (unless the session was
     * marked as unexpectedly closed), closes the main program, shuts down the
     * cluster manager and both executors, and waits up to 500 ms for each
     * executor to terminate. Repeated calls, or calls made while a close is in
     * progress, only print the diagnostic line.
     *
     * @throws ProCoreException if the snapshot fails or the wait is interrupted
     */
    @Override
    public void close() throws ProCoreException {
        System.err.println("Closing " + this + " and mainProgram " + mainProgram);
        if (!closed && !isClosing) {
            isClosing = true;
            try {
                if (!unexpectedClose)
                    makeSnapshot(true);

                mainProgram.close();
                clusters.shutdown();
                executor.shutdown();
                saver.shutdown();
                boolean executorTerminated = executor.awaitTermination(500, TimeUnit.MILLISECONDS);
                boolean saverTerminated = saver.awaitTermination(500, TimeUnit.MILLISECONDS);

                System.err.println("executorTerminated=" + executorTerminated + ", saverTerminated=" + saverTerminated);

                mainProgram = null;
                executor = null;
                saver = null;

            } catch (IllegalAcornStateException e) {
                throw new ProCoreException(e);
            } catch (InterruptedException e) {
                // Keep the interrupt visible to the caller's thread.
                Thread.currentThread().interrupt();
                throw new ProCoreException(e);
            }
            closed = true;
        }
    }

    /** Not supported; always throws {@link UnsupportedOperationException}. */
    @Override
    public void open() throws ProCoreException {
        throw new UnsupportedOperationException();
    }

    @Override
    public boolean isClosed() throws ProCoreException {
        return closed;
    }

    /**
     * Accepts a commit: increments the head change set id, commits the change
     * set under id {@code changeSetId + 1} with the given metadata, records the
     * transaction id and notifies the main program.
     *
     * @throws ProCoreException wrapping an {@link IllegalAcornStateException}
     */
    @Override
    public void acceptCommit(long transactionId, long changeSetId, byte[] metadata) throws ProCoreException {
        // NOTE(review): the head id is advanced before the commit is attempted and
        // is not rolled back if commitChangeSet fails - confirm this is intended.
        clusters.state.headChangeSetId++;
        long committedChangeSetId = changeSetId + 1;
        try {
            clusters.commitChangeSet(committedChangeSetId, metadata);
            clusters.state.transactionId = transactionId;
            mainProgram.committed();
            TimeLogger.log("Accepted commit");
        } catch (IllegalAcornStateException e) {
            throw new ProCoreException(e);
        }
    }

    /**
     * Not supported. Flags the cluster manager as not safe to snapshot and
     * throws {@link UnsupportedOperationException}.
     */
    @Override
    public long cancelCommit(long transactionId, long changeSetId, byte[] metadata, OnChangeSetUpdate onChangeSetUpdate) throws ProCoreException {
        UnsupportedOperationException e = new UnsupportedOperationException("org.simantics.acorn.GraphClientImpl2.cancelCommit() is not supported operation! Closing down to prevent further havoc");
        clusters.notSafeToMakeSnapshot(new IllegalAcornStateException(e));
        throw e;
    }

    @Override
    public Transaction askReadTransaction() throws ProCoreException {
        return transactionManager.askReadTransaction();
    }

    /** State of the transaction manager, also used as the kind of a request. */
    enum TransactionState {
        IDLE, WRITE, READ
    }

    /** A queued transaction request and the semaphore its requester waits on. */
    class TransactionRequest {

        public TransactionState state;
        public Semaphore semaphore;

        public TransactionRequest(TransactionState state, Semaphore semaphore) {
            this.state = state;
            this.semaphore = semaphore;
        }
    }

    /**
     * Serializes transactions: any number of concurrent reads, or one write.
     * Requests are queued in arrival order; a request is granted by releasing
     * its semaphore.
     */
    class TransactionManager {

        private TransactionState currentTransactionState = TransactionState.IDLE;

        // Number of read requests dispatched and not yet ended.
        private int reads = 0;

        LinkedList<TransactionRequest> requests = new LinkedList<TransactionRequest>();

        TLongObjectHashMap<TransactionRequest> requestMap = new TLongObjectHashMap<TransactionRequest>();

        /**
         * Creates the transaction handle for a granted request and registers
         * the request under the new transaction id.
         */
        private synchronized Transaction makeTransaction(TransactionRequest req) {

            final int csId = clusters.state.headChangeSetId;
            // NOTE(review): the id is derived from clusters.state.transactionId, which the
            // visible code only changes in acceptCommit. Concurrent reads therefore appear
            // to receive the same id and overwrite each other in requestMap - confirm.
            final long trId = clusters.state.transactionId + 1;
            requestMap.put(trId, req);
            return new Transaction() {

                @Override
                public long getTransactionId() {
                    return trId;
                }

                @Override
                public long getHeadChangeSetId() {
                    return csId;
                }
            };
        }

        /*
         * This method cannot be synchronized since it waits and must support multiple entries
         * by query thread(s) and internal transactions such as snapshot saver
         */
        public Transaction askReadTransaction() throws ProCoreException {

            Semaphore semaphore = new Semaphore(0);

            TransactionRequest req = queue(TransactionState.READ, semaphore);

            try {
                semaphore.acquire();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new ProCoreException(e);
            }

            return makeTransaction(req);
        }

        /** Grants the request at the head of the queue. */
        private synchronized void dispatch() {
            TransactionRequest r = requests.removeFirst();
            if (r.state == TransactionState.READ)
                reads++;
            r.semaphore.release();
        }

        /** Grants as many queued requests as the current state allows. */
        private synchronized void processRequests() {

            while (true) {

                if (requests.isEmpty())
                    return;
                TransactionRequest req = requests.peek();

                if (currentTransactionState == TransactionState.IDLE) {

                    // Accept anything while IDLE
                    currentTransactionState = req.state;
                    dispatch();

                } else if (currentTransactionState == TransactionState.READ) {

                    if (req.state == currentTransactionState) {
                        // Allow other reads
                        dispatch();
                    } else {
                        // Wait
                        return;
                    }

                } else if (currentTransactionState == TransactionState.WRITE) {

                    // Wait
                    return;
                }
            }
        }

        /** Appends a new request to the queue and tries to grant requests. */
        private synchronized TransactionRequest queue(TransactionState state, Semaphore semaphore) {
            TransactionRequest req = new TransactionRequest(state, semaphore);
            requests.addLast(req);
            processRequests();
            return req;
        }

        /*
         * This method cannot be synchronized since it waits and must support multiple entries
         * by query thread(s) and internal transactions such as snapshot saver
         */
        public Transaction askWriteTransaction() throws IllegalAcornStateException {

            Semaphore semaphore = new Semaphore(0);
            TransactionRequest req = queue(TransactionState.WRITE, semaphore);

            try {
                semaphore.acquire();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalAcornStateException(e);
            }
            mainProgram.startTransaction(clusters.state.headChangeSetId + 1);
            return makeTransaction(req);
        }

        /**
         * Ends the transaction registered under {@code transactionId}. Ending
         * a write, or the last outstanding read, returns the manager to IDLE
         * and grants queued requests.
         *
         * @return the current {@code clusters.state.transactionId}
         * @throws ProCoreException if no transaction is registered under the id
         */
        public synchronized long endTransaction(long transactionId) throws ProCoreException {

            TransactionRequest req = requestMap.remove(transactionId);
            if (req == null)
                throw new ProCoreException("No active transaction with id " + transactionId);

            if (req.state == TransactionState.WRITE) {
                currentTransactionState = TransactionState.IDLE;
                processRequests();
            } else {
                reads--;
                if (reads == 0) {
                    currentTransactionState = TransactionState.IDLE;
                    processRequests();
                }
            }
            return clusters.state.transactionId;
        }
    }

    /**
     * Takes a write transaction, blocking until it is granted.
     *
     * @param transactionId not used by this implementation
     * @throws ProCoreException if the session is closing or closed, or if
     *         waiting for the transaction fails
     */
    @Override
    public Transaction askWriteTransaction(final long transactionId) throws ProCoreException {
        try {
            if (isClosing || unexpectedClose || closed) {
                throw new ProCoreException("GraphClientImpl2 is already closing so no more write transactions allowed!");
            }
            return transactionManager.askWriteTransaction();
        } catch (IllegalAcornStateException e) {
            throw new ProCoreException(e);
        }
    }

    @Override
    public long endTransaction(long transactionId) throws ProCoreException {
        return transactionManager.endTransaction(transactionId);
    }

    @Override
    public String execute(String command) throws ProCoreException {
        // This is called only by WriteGraphImpl.commitAccessorChanges
        // We can ignore this in Acorn
        return "";
    }

    @Override
    public byte[] getChangeSetMetadata(long changeSetId) throws ProCoreException {
        try {
            return clusters.getMetadata(changeSetId);
        } catch (AcornAccessVerificationException | IllegalAcornStateException e) {
            throw new ProCoreException(e);
        }
    }

    /**
     * Not implemented: prints a stack trace identifying the call and returns
     * {@code null}.
     */
    @Override
    public ChangeSetData getChangeSetData(long minChangeSetId, long maxChangeSetId, OnChangeSetUpdate onChangeSetupate) throws ProCoreException {
        new Exception("GetChangeSetDataFunction " + minChangeSetId + " " + maxChangeSetId).printStackTrace();
        return null;
    }

    /** Not supported; always throws {@link UnsupportedOperationException}. */
    @Override
    public ChangeSetIds getChangeSetIds() throws ProCoreException {
        throw new UnsupportedOperationException();
    }

    /** Not supported; always throws {@link UnsupportedOperationException}. */
    @Override
    public Cluster getCluster(byte[] clusterId) throws ProCoreException {
        throw new UnsupportedOperationException();
    }

    /** Not supported; always throws {@link UnsupportedOperationException}. */
    @Override
    public ClusterChanges getClusterChanges(long changeSetId, byte[] clusterId) throws ProCoreException {
        throw new UnsupportedOperationException();
    }

    @Override
    public ClusterIds getClusterIds() throws ProCoreException {
        try {
            return clusters.getClusterIds();
        } catch (IllegalAcornStateException e) {
            throw new ProCoreException(e);
        }
    }

    /** Returns fixed identification values for this session. */
    @Override
    public Information getInformation() throws ProCoreException {
        return new Information() {

            @Override
            public String getServerId() {
                return "server";
            }

            @Override
            public String getProtocolId() {
                return "";
            }

            @Override
            public String getDatabaseId() {
                return "database";
            }

            @Override
            public long getFirstChangeSetId() {
                return 0;
            }
        };
    }

    /**
     * Returns a refresh view over the cluster ids as they are at call time;
     * the head change set id is read each time it is asked for.
     *
     * @param changeSetId not used by this implementation
     */
    @Override
    public Refresh getRefresh(long changeSetId) throws ProCoreException {

        final ClusterIds ids = getClusterIds();

        return new Refresh() {

            @Override
            public long getHeadChangeSetId() {
                return clusters.state.headChangeSetId;
            }

            @Override
            public long[] getFirst() {
                return ids.getFirst();
            }

            @Override
            public long[] getSecond() {
                return ids.getSecond();
            }
        };
    }

    public byte[] getResourceFile(final byte[] clusterUID, final int resourceIndex) throws ProCoreException, AcornAccessVerificationException, IllegalAcornStateException {
        return clusters.getResourceFile(clusterUID, resourceIndex);
    }

    @Override
    public ResourceSegment getResourceSegment(final byte[] clusterUID, final int resourceIndex, final long segmentOffset, short segmentSize) throws ProCoreException {
        try {
            return clusters.getResourceSegment(clusterUID, resourceIndex, segmentOffset, segmentSize);
        } catch (AcornAccessVerificationException | IllegalAcornStateException e) {
            throw new ProCoreException(e);
        }
    }

    /**
     * Returns the current reserved id counter and increments it by one.
     */
    @Override
    public long reserveIds(int count) throws ProCoreException {
        // NOTE(review): count is ignored; the counter always advances by one - confirm.
        return clusters.state.reservedIds++;
    }

    /**
     * Decodes a cluster update operation and schedules it on the main program
     * while holding the mutex of the affected cluster's {@link ClusterInfo}.
     *
     * @param operations encoded update operation
     * @throws ProCoreException wrapping any Acorn state or access failure
     */
    @Override
    public void updateCluster(byte[] operations) throws ProCoreException {
        try {
            ClusterUpdateOperation operation = new ClusterUpdateOperation(clusters, operations);
            ClusterInfo info = clusters.clusterLRU.getOrCreate(operation.uid, true);
            if (info == null)
                throw new IllegalAcornStateException("info == null for operation " + operation);
            info.acquireMutex();
            // Release only what was actually acquired: the mutex is released solely
            // when acquireMutex() has returned normally.
            try {
                info.scheduleUpdate();
                mainProgram.schedule(operation);
            } finally {
                info.releaseMutex();
            }
        } catch (IllegalAcornStateException | AcornAccessVerificationException e) {
            throw new ProCoreException(e);
        }
    }

    /**
     * Resolves an undo processor from an id of the form
     * {@code <chunkKey>.<chunkOffset>}.
     *
     * @throws IllegalAcornStateException if the chunk is not found
     * @throws IllegalStateException wrapping any non-{@link DatabaseException}
     *         failure raised while building the processor
     */
    private UndoClusterUpdateProcessor getUndoCSS(String ccsId) throws DatabaseException, AcornAccessVerificationException, IllegalAcornStateException {

        String[] ss = ccsId.split("\\.");
        String chunkKey = ss[0];
        int chunkOffset = Integer.parseInt(ss[1]);
        ClusterStreamChunk chunk = clusters.streamLRU.getWithoutMutex(chunkKey);
        if (chunk == null)
            throw new IllegalAcornStateException("Cluster Stream Chunk " + chunkKey + " was not found.");
        chunk.acquireMutex();
        try {
            return chunk.getUndoProcessor(clusters, chunkOffset, ccsId);
        } catch (DatabaseException e) {
            throw e;
        } catch (Throwable t) {
            throw new IllegalStateException(t);
        } finally {
            chunk.releaseMutex();
        }
    }

    /**
     * Applies the undo processor for {@code ccsId} while holding the cluster
     * LRU mutex, collecting the resulting changes into {@code clusterChanges}.
     */
    private void performUndo(String ccsId, ArrayList<Pair<ClusterUID, byte[]>> clusterChanges, UndoClusterSupport support) throws ProCoreException, DatabaseException, IllegalAcornStateException, AcornAccessVerificationException {

        UndoClusterUpdateProcessor proc = getUndoCSS(ccsId);

        int clusterKey = clusters.getClusterKeyByClusterUIDOrMakeWithoutMutex(proc.getClusterUID());

        clusters.clusterLRU.acquireMutex();
        try {

            ClusterChange cs = new ClusterChange(clusterChanges, proc.getClusterUID());
            // NOTE(review): this loop, the flush and the mutex release were lost from the
            // damaged source text and are reconstructed (entries replayed in reverse
            // order) - confirm against version control before merging.
            for (int i = 0; i < proc.entries.size(); i++) {
                Entry e = proc.entries.get(proc.entries.size() - 1 - i);
                e.process(clusters, cs, clusterKey);
            }
            cs.flush();

        } finally {
            clusters.clusterLRU.releaseMutex();
        }
    }

    /**
     * Undoes the given change sets and reports each resulting cluster change
     * to {@code onChangeSetUpdate}. Always returns {@code false}.
     *
     * @throws SDBException wrapping any Acorn state or access failure
     */
    // NOTE(review): the signature and the two nested loops below were lost from the
    // damaged source text and are reconstructed from the surviving identifiers
    // (id, ccss, changeSetIndex, pair) - confirm against version control.
    @Override
    public boolean undo(long[] changeSetIds, OnChangeSetUpdate onChangeSetUpdate) throws SDBException {
        try {
            final ArrayList<Pair<ClusterUID, byte[]>> clusterChanges = new ArrayList<Pair<ClusterUID, byte[]>>();

            UndoClusterSupport support = new UndoClusterSupport(clusters);

            final int changeSetId = clusters.state.headChangeSetId;

            if (ClusterUpdateProcessorBase.DEBUG)
                System.err.println(" === BEGIN UNDO ===");

            for (int i = 0; i < changeSetIds.length; i++) {
                final long id = changeSetIds[changeSetIds.length - 1 - i];
                ArrayList<String> ccss = clusters.getChanges(id);

                for (int j = 0; j < ccss.size(); j++) {
                    String ccsid = ccss.get(ccss.size() - j - 1);
                    try {
                        if (ClusterUpdateProcessorBase.DEBUG)
                            System.err.println("performUndo " + ccsid);
                        performUndo(ccsid, clusterChanges, support);
                    } catch (DatabaseException e) {
                        e.printStackTrace();
                    }
                }
            }

            if (ClusterUpdateProcessorBase.DEBUG)
                System.err.println(" === END UNDO ===");

            for (int i = 0; i < clusterChanges.size(); i++) {

                final int changeSetIndex = i;

                final Pair<ClusterUID, byte[]> pair = clusterChanges.get(i);

                final ClusterUID cuid = pair.first;
                final byte[] data = pair.second;

                onChangeSetUpdate.onChangeSetUpdate(new ChangeSetUpdate() {

                    @Override
                    public long getChangeSetId() {
                        return changeSetId;
                    }

                    @Override
                    public int getChangeSetIndex() {
                        return 0;
                    }

                    @Override
                    public int getNumberOfClusterChangeSets() {
                        return clusterChanges.size();
                    }

                    @Override
                    public int getIndexOfClusterChangeSet() {
                        return changeSetIndex;
                    }

                    @Override
                    public byte[] getClusterId() {
                        return cuid.asBytes();
                    }

                    @Override
                    public boolean getNewCluster() {
                        return false;
                    }

                    @Override
                    public byte[] getData() {
                        return data;
                    }
                });
            }
        } catch (AcornAccessVerificationException | IllegalAcornStateException e1) {
            throw new ProCoreException(e1);
        }
        return false;
    }

    /** @return the service locator this session was created with */
    public ServiceLocator getServiceLocator() {
        return locator;
    }

    @Override
    public boolean refreshEnabled() {
        return false;
    }

    @Override
    public boolean rolledback() {
        return clusters.rolledback();
    }

}