--- /dev/null
+/*******************************************************************************
+ * Copyright (c) 2007, 2010 Association for Decentralized Information Management
+ * in Industry THTH ry.
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * which accompanies this distribution, and is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Contributors:
+ * VTT Technical Research Centre of Finland - initial API and implementation
+ *******************************************************************************/
+package org.simantics.acorn;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+
+import org.simantics.acorn.internal.ClusterChange;
+import org.simantics.acorn.internal.ClusterUpdateProcessorBase;
+import org.simantics.acorn.internal.UndoClusterUpdateProcessor;
+import org.simantics.acorn.lru.ClusterInfo;
+import org.simantics.acorn.lru.ClusterStreamChunk;
+import org.simantics.acorn.lru.ClusterUpdateOperation;
+import org.simantics.acorn.lru.ClusterChangeSet.Entry;
+import org.simantics.db.ClusterCreator;
+import org.simantics.db.Database;
+import org.simantics.db.ServiceLocator;
+import org.simantics.db.common.utils.Logger;
+import org.simantics.db.exception.DatabaseException;
+import org.simantics.db.exception.SDBException;
+import org.simantics.db.server.ProCoreException;
+import org.simantics.db.service.ClusterSetsSupport;
+import org.simantics.db.service.ClusterUID;
+import org.simantics.utils.datastructures.Pair;
+import org.simantics.utils.logging.TimeLogger;
+
+import gnu.trove.map.hash.TLongObjectHashMap;
+
+public class GraphClientImpl2 implements Database.Session {
+
+ public static final boolean DEBUG = false;
+
+ public final ClusterManager clusters;
+
+ private TransactionManager transactionManager = new TransactionManager();
+ private ExecutorService executor = Executors.newSingleThreadExecutor(new ClientThreadFactory("Core Main Program", false));
+ private ExecutorService saver = Executors.newSingleThreadExecutor(new ClientThreadFactory("Core Snapshot Saver", true));
+
+ private static GraphClientImpl2 INSTANCE;
+ private Path dbFolder;
+ private final Database database;
+ private ServiceLocator locator;
+ private MainProgram mainProgram;
+
+ static class ClientThreadFactory implements ThreadFactory {
+
+ final String name;
+ final boolean daemon;
+
+ public ClientThreadFactory(String name, boolean daemon) {
+ this.name = name;
+ this.daemon = daemon;
+ }
+
+ @Override
+ public Thread newThread(Runnable r) {
+ Thread thread = new Thread(r, name);
+ thread.setDaemon(daemon);
+ return thread;
+ }
+ }
+
+ public GraphClientImpl2(Database database, Path dbFolder, ServiceLocator locator) throws IOException {
+ this.database = database;
+ this.dbFolder = dbFolder;
+ this.locator = locator;
+ this.clusters = new ClusterManager(dbFolder);
+ load();
+ ClusterSetsSupport cssi = locator.getService(ClusterSetsSupport.class);
+ cssi.updateReadAndWriteDirectories(clusters.lastSessionDirectory, clusters.workingDirectory);
+ mainProgram = new MainProgram(this, clusters);
+ executor.execute(mainProgram);
+ INSTANCE = this;
+ }
+
+ public Path getDbFolder() {
+ return dbFolder;
+ }
+
+ public void tryMakeSnapshot() throws IOException {
+
+ if (isClosing)
+ return;
+
+ saver.execute(new Runnable() {
+
+ @Override
+ public void run() {
+ Transaction tr = null;
+ try {
+ // First take a write transaction
+ tr = askWriteTransaction(-1);
+ // Then make sure that MainProgram is idling
+ mainProgram.mutex.acquire();
+ try {
+ synchronized(mainProgram) {
+ if(mainProgram.operations.isEmpty()) {
+ makeSnapshot(false);
+ } else {
+ // MainProgram is becoming busy again - delay snapshotting
+ return;
+ }
+ }
+ } finally {
+ mainProgram.mutex.release();
+ }
+ } catch (IOException e) {
+ Logger.defaultLogError(e);
+ } catch (ProCoreException e) {
+ Logger.defaultLogError(e);
+ } catch (InterruptedException e) {
+ Logger.defaultLogError(e);
+ } finally {
+ try {
+ if(tr != null)
+ endTransaction(tr.getTransactionId());
+ } catch (ProCoreException e) {
+ Logger.defaultLogError(e);
+ }
+ }
+ }
+
+ });
+ }
+
+ public void makeSnapshot(boolean force) throws IOException {
+ clusters.makeSnapshot(locator, force);
+ }
+
+ public <T> T clone(ClusterUID uid, ClusterCreator creator) throws DatabaseException {
+ return clusters.clone(uid, creator);
+ }
+
+// private void save() throws IOException {
+// clusters.save();
+// }
+
+ public void load() throws IOException {
+ clusters.load();
+ }
+
+// public void modiFileEx(ClusterUID uid, int resourceKey, long offset, long size, byte[] bytes, long pos, ClusterSupport support) {
+// clusters.modiFileEx(uid, resourceKey, offset, size, bytes, pos, support);
+// }
+
+ @Override
+ public Database getDatabase() {
+ return database;
+ }
+
+ private boolean closed = false;
+ private boolean isClosing = false;
+
+ @Override
+ public void close() throws ProCoreException {
+ System.err.println("Closing " + this + " and mainProgram " + mainProgram);
+ if(!closed && !isClosing) {
+ isClosing = true;
+ try {
+ makeSnapshot(true);
+
+ mainProgram.close();
+ clusters.shutdown();
+ executor.shutdown();
+ saver.shutdown();
+ boolean executorTerminated = executor.awaitTermination(500, TimeUnit.MILLISECONDS);
+ boolean saverTerminated = saver.awaitTermination(500, TimeUnit.MILLISECONDS);
+
+ System.err.println("executorTerminated=" + executorTerminated + ", saverTerminated=" + saverTerminated);
+
+ INSTANCE = null;
+ mainProgram = null;
+ executor = null;
+ saver = null;
+
+ } catch (IOException | InterruptedException e) {
+ throw new ProCoreException(e);
+ }
+ }
+ closed = true;
+ //impl.close();
+ }
+
+ @Override
+ public void open() throws ProCoreException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean isClosed() throws ProCoreException {
+ return closed;
+ }
+
+ @Override
+ public void acceptCommit(long transactionId, long changeSetId, byte[] metadata) throws ProCoreException {
+
+ clusters.state.headChangeSetId++;
+
+ long committedChangeSetId = changeSetId + 1;
+
+ clusters.commitChangeSet(committedChangeSetId, metadata);
+
+ clusters.state.transactionId = transactionId;
+
+ mainProgram.committed();
+
+ TimeLogger.log("Accepted commit");
+
+ }
+
+ @Override
+ public long cancelCommit(long transactionId, long changeSetId,
+ byte[] metadata, OnChangeSetUpdate onChangeSetUpdate)
+ throws ProCoreException {
+ System.err.println("GraphClientImpl2.cancelCommit() called!! this is experimental and might cause havoc!");
+ try {
+ undo(new long[] {changeSetId}, onChangeSetUpdate);
+ } catch (SDBException e) {
+ e.printStackTrace();
+ throw new ProCoreException(e);
+ }
+ clusters.state.headChangeSetId++;
+ return clusters.state.headChangeSetId;
+ }
+
+ @Override
+ public Transaction askReadTransaction() throws ProCoreException {
+ return transactionManager.askReadTransaction();
+ }
+
+ enum TransactionState {
+ IDLE,WRITE,READ
+ }
+
+ class TransactionRequest {
+ public TransactionState state;
+ public Semaphore semaphore;
+ public TransactionRequest(TransactionState state, Semaphore semaphore) {
+ this.state = state;
+ this.semaphore = semaphore;
+ }
+ }
+
+ class TransactionManager {
+
+ private TransactionState currentTransactionState = TransactionState.IDLE;
+
+ private int reads = 0;
+
+ LinkedList<TransactionRequest> requests = new LinkedList<TransactionRequest>();
+
+ TLongObjectHashMap<TransactionRequest> requestMap = new TLongObjectHashMap<TransactionRequest>();
+
+ private synchronized Transaction makeTransaction(TransactionRequest req) {
+
+ final int csId = clusters.state.headChangeSetId;
+ final long trId = clusters.state.transactionId+1;
+ requestMap.put(trId, req);
+ return new Transaction() {
+
+ @Override
+ public long getTransactionId() {
+ return trId;
+ }
+
+ @Override
+ public long getHeadChangeSetId() {
+ return csId;
+ }
+ };
+ }
+
+ /*
+ * This method cannot be synchronized since it waits and must support multiple entries
+ * by query thread(s) and internal transactions such as snapshot saver
+ */
+ public Transaction askReadTransaction() throws ProCoreException {
+
+ Semaphore semaphore = new Semaphore(0);
+
+ TransactionRequest req = queue(TransactionState.READ, semaphore);
+
+ try {
+ semaphore.acquire();
+ } catch (InterruptedException e) {
+ throw new ProCoreException(e);
+ }
+
+ return makeTransaction(req);
+
+ }
+
+ private synchronized void dispatch() {
+ TransactionRequest r = requests.removeFirst();
+ if(r.state == TransactionState.READ) reads++;
+ r.semaphore.release();
+ }
+
+ private synchronized void processRequests() {
+
+ while(true) {
+
+ if(requests.isEmpty()) return;
+ TransactionRequest req = requests.peek();
+
+ if(currentTransactionState == TransactionState.IDLE) {
+
+ // Accept anything while IDLE
+ currentTransactionState = req.state;
+ dispatch();
+
+ } else if (currentTransactionState == TransactionState.READ) {
+
+ if(req.state == currentTransactionState) {
+
+ // Allow other reads
+ dispatch();
+
+ } else {
+
+ // Wait
+ return;
+
+ }
+
+ } else if (currentTransactionState == TransactionState.WRITE) {
+
+ // Wait
+ return;
+
+ }
+
+ }
+
+ }
+
+ private synchronized TransactionRequest queue(TransactionState state, Semaphore semaphore) {
+ TransactionRequest req = new TransactionRequest(state, semaphore);
+ requests.addLast(req);
+ processRequests();
+ return req;
+ }
+
+ /*
+ * This method cannot be synchronized since it waits and must support multiple entries
+ * by query thread(s) and internal transactions such as snapshot saver
+ */
+ public Transaction askWriteTransaction()
+ throws ProCoreException {
+
+ Semaphore semaphore = new Semaphore(0);
+
+ TransactionRequest req = queue(TransactionState.WRITE, semaphore);
+
+ try {
+ semaphore.acquire();
+ } catch (InterruptedException e) {
+ throw new ProCoreException(e);
+ }
+
+ mainProgram.startTransaction(clusters.state.headChangeSetId+1);
+
+ return makeTransaction(req);
+
+ }
+
+ public synchronized long endTransaction(long transactionId) throws ProCoreException {
+
+ TransactionRequest req = requestMap.remove(transactionId);
+ if(req.state == TransactionState.WRITE) {
+ currentTransactionState = TransactionState.IDLE;
+ processRequests();
+ } else {
+ reads--;
+ if(reads == 0) {
+ currentTransactionState = TransactionState.IDLE;
+ processRequests();
+ }
+ }
+ return clusters.state.transactionId;
+ }
+
+ }
+
+ @Override
+ public Transaction askWriteTransaction(final long transactionId)
+ throws ProCoreException {
+ return transactionManager.askWriteTransaction();
+ }
+
+ @Override
+ public long endTransaction(long transactionId) throws ProCoreException {
+ return transactionManager.endTransaction(transactionId);
+ }
+
+ @Override
+ public String execute(String command) throws ProCoreException {
+ // This is called only by WriteGraphImpl.commitAccessorChanges
+ // We can ignore this in Acorn
+ return "";
+ }
+
+ @Override
+ public byte[] getChangeSetMetadata(long changeSetId)
+ throws ProCoreException {
+ return clusters.getMetadata(changeSetId);
+ }
+
+ @Override
+ public ChangeSetData getChangeSetData(long minChangeSetId,
+ long maxChangeSetId, OnChangeSetUpdate onChangeSetupate)
+ throws ProCoreException {
+
+ new Exception("GetChangeSetDataFunction " + minChangeSetId + " " + maxChangeSetId).printStackTrace();;
+ return null;
+
+ }
+
+ @Override
+ public ChangeSetIds getChangeSetIds() throws ProCoreException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Cluster getCluster(byte[] clusterId) throws ProCoreException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public ClusterChanges getClusterChanges(long changeSetId, byte[] clusterId)
+ throws ProCoreException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public ClusterIds getClusterIds() throws ProCoreException {
+ return clusters.getClusterIds();
+ }
+
+ @Override
+ public Information getInformation() throws ProCoreException {
+ return new Information() {
+
+ @Override
+ public String getServerId() {
+ return "server";
+ }
+
+ @Override
+ public String getProtocolId() {
+ return "";
+ }
+
+ @Override
+ public String getDatabaseId() {
+ return "database";
+ }
+
+ @Override
+ public long getFirstChangeSetId() {
+ return 0;
+ }
+
+ };
+ }
+
+ @Override
+ public Refresh getRefresh(long changeSetId) throws ProCoreException {
+
+ final ClusterIds ids = getClusterIds();
+
+ return new Refresh() {
+
+ @Override
+ public long getHeadChangeSetId() {
+ return clusters.state.headChangeSetId;
+ }
+
+ @Override
+ public long[] getFirst() {
+ return ids.getFirst();
+ }
+
+ @Override
+ public long[] getSecond() {
+ return ids.getSecond();
+ }
+
+ };
+
+ }
+
+ public byte[] getResourceFile(final byte[] clusterUID, final int resourceIndex) throws ProCoreException {
+ return clusters.getResourceFile(clusterUID, resourceIndex);
+ }
+
+ @Override
+ public ResourceSegment getResourceSegment(final byte[] clusterUID,
+ final int resourceIndex, final long segmentOffset, short segmentSize) throws ProCoreException {
+
+ return clusters.getResourceSegment(clusterUID, resourceIndex, segmentOffset, segmentSize);
+
+ }
+
+ @Override
+ public long reserveIds(int count) throws ProCoreException {
+ return clusters.state.reservedIds++;
+ }
+
+ @Override
+ public void updateCluster(byte[] operations) throws ProCoreException {
+
+ ClusterUpdateOperation operation = new ClusterUpdateOperation(clusters, operations);
+ ClusterInfo info = clusters.clusterLRU.getOrCreate(operation.uid, true);
+ if(info == null) throw new IllegalStateException();
+ info.acquireMutex();
+ try {
+ info.scheduleUpdate();
+ mainProgram.schedule(operation);
+ } catch (Throwable t) {
+ throw new IllegalStateException(t);
+ } finally {
+ info.releaseMutex();
+ }
+
+ }
+
+ private UndoClusterUpdateProcessor getUndoCSS(String ccsId) throws DatabaseException {
+
+ String[] ss = ccsId.split("\\.");
+ String chunkKey = ss[0];
+ int chunkOffset = Integer.parseInt(ss[1]);
+ ClusterStreamChunk chunk = clusters.streamLRU.getWithoutMutex(chunkKey);
+ if(chunk == null) throw new IllegalStateException("Cluster Stream Chunk " + chunkKey + " was not found.");
+ chunk.acquireMutex();
+ try {
+ return chunk.getUndoProcessor(clusters, chunkOffset, ccsId);
+ } catch (Throwable t) {
+ throw new IllegalStateException(t);
+ } finally {
+ chunk.releaseMutex();
+ }
+
+ }
+
+ private void performUndo(String ccsId, ArrayList<Pair<ClusterUID, byte[]>> clusterChanges, UndoClusterSupport support) throws ProCoreException, DatabaseException {
+
+ UndoClusterUpdateProcessor proc = getUndoCSS(ccsId);
+
+ int clusterKey = clusters.getClusterKeyByClusterUIDOrMakeWithoutMutex(proc.getClusterUID());
+
+ clusters.clusterLRU.acquireMutex();
+ try {
+
+ ClusterChange cs = new ClusterChange(clusterChanges, proc.getClusterUID());
+ for(int i=0;i<proc.entries.size();i++) {
+
+ Entry e = proc.entries.get(proc.entries.size() - 1 - i);
+ e.process(clusters, cs, clusterKey);
+
+ }
+
+ cs.flush();
+
+ } finally {
+ clusters.clusterLRU.releaseMutex();
+ }
+
+ }
+
+ @Override
+ public boolean undo(long[] changeSetIds, OnChangeSetUpdate onChangeSetUpdate) throws SDBException {
+
+ final ArrayList<Pair<ClusterUID, byte[]>> clusterChanges = new ArrayList<Pair<ClusterUID, byte[]>>();
+
+ UndoClusterSupport support = new UndoClusterSupport(clusters);
+
+ final int changeSetId = clusters.state.headChangeSetId;
+
+ if(ClusterUpdateProcessorBase.DEBUG)
+ System.err.println(" === BEGIN UNDO ===");
+
+ for(int i=0;i<changeSetIds.length;i++) {
+ final long id = changeSetIds[changeSetIds.length-1-i];
+ ArrayList<String> ccss = clusters.getChanges(id);
+ for(int j=0;j<ccss.size();j++) {
+ try {
+ if(ClusterUpdateProcessorBase.DEBUG)
+ System.err.println("performUndo " + ccss.get(ccss.size()-j-1));
+ performUndo(ccss.get(ccss.size()-j-1), clusterChanges, support);
+ } catch (DatabaseException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+
+ if(ClusterUpdateProcessorBase.DEBUG)
+ System.err.println(" === END UNDO ===");
+
+ for(int i=0;i<clusterChanges.size();i++) {
+
+ final int changeSetIndex = i;
+
+ final Pair<ClusterUID, byte[]> pair = clusterChanges.get(i);
+
+ final ClusterUID cuid = pair.first;
+ final byte[] data = pair.second;
+
+ onChangeSetUpdate.onChangeSetUpdate(new ChangeSetUpdate() {
+
+ @Override
+ public long getChangeSetId() {
+ return changeSetId;
+ }
+
+ @Override
+ public int getChangeSetIndex() {
+ return 0;
+ }
+
+ @Override
+ public int getNumberOfClusterChangeSets() {
+ return clusterChanges.size();
+ }
+
+ @Override
+ public int getIndexOfClusterChangeSet() {
+ return changeSetIndex;
+ }
+
+ @Override
+ public byte[] getClusterId() {
+ return cuid.asBytes();
+ }
+
+ @Override
+ public boolean getNewCluster() {
+ return false;
+ }
+
+ @Override
+ public byte[] getData() {
+ return data;
+ }
+
+ });
+
+ }
+
+
+ return false;
+
+ }
+
+ public static GraphClientImpl2 getInstance() {
+ return INSTANCE;
+ }
+
+ public ServiceLocator getServiceLocator() {
+ return locator;
+ }
+
+ @Override
+ public boolean refreshEnabled() {
+ return false;
+ }
+
+
+
+
+
+
+
+
+
+
+
+ ////////////////////////
+
+
+
+
+
+
+
+
+
+
+
+
+}
+