]> gerrit.simantics Code Review - simantics/platform.git/blobdiff - bundles/org.simantics.acorn/src/org/simantics/acorn/GraphClientImpl2.java
Sharing org.simantics.acorn for everyone to use
[simantics/platform.git] / bundles / org.simantics.acorn / src / org / simantics / acorn / GraphClientImpl2.java
diff --git a/bundles/org.simantics.acorn/src/org/simantics/acorn/GraphClientImpl2.java b/bundles/org.simantics.acorn/src/org/simantics/acorn/GraphClientImpl2.java
new file mode 100644 (file)
index 0000000..2796e3e
--- /dev/null
@@ -0,0 +1,708 @@
+/*******************************************************************************
+ * Copyright (c) 2007, 2010 Association for Decentralized Information Management
+ * in Industry THTH ry.
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * which accompanies this distribution, and is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Contributors:
+ *     VTT Technical Research Centre of Finland - initial API and implementation
+ *******************************************************************************/
+package org.simantics.acorn;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+
+import org.simantics.acorn.internal.ClusterChange;
+import org.simantics.acorn.internal.ClusterUpdateProcessorBase;
+import org.simantics.acorn.internal.UndoClusterUpdateProcessor;
+import org.simantics.acorn.lru.ClusterInfo;
+import org.simantics.acorn.lru.ClusterStreamChunk;
+import org.simantics.acorn.lru.ClusterUpdateOperation;
+import org.simantics.acorn.lru.ClusterChangeSet.Entry;
+import org.simantics.db.ClusterCreator;
+import org.simantics.db.Database;
+import org.simantics.db.ServiceLocator;
+import org.simantics.db.common.utils.Logger;
+import org.simantics.db.exception.DatabaseException;
+import org.simantics.db.exception.SDBException;
+import org.simantics.db.server.ProCoreException;
+import org.simantics.db.service.ClusterSetsSupport;
+import org.simantics.db.service.ClusterUID;
+import org.simantics.utils.datastructures.Pair;
+import org.simantics.utils.logging.TimeLogger;
+
+import gnu.trove.map.hash.TLongObjectHashMap;
+
+public class GraphClientImpl2 implements Database.Session {
+       
+       public static final boolean DEBUG = false;
+
+       public final ClusterManager clusters;
+       
+       private TransactionManager transactionManager = new TransactionManager();
+       private ExecutorService executor = Executors.newSingleThreadExecutor(new ClientThreadFactory("Core Main Program", false));
+       private ExecutorService saver = Executors.newSingleThreadExecutor(new ClientThreadFactory("Core Snapshot Saver", true));
+
+       private static GraphClientImpl2 INSTANCE;
+       private Path dbFolder;
+       private final Database database;
+       private ServiceLocator locator;
+       private MainProgram mainProgram;
+
+       static class ClientThreadFactory implements ThreadFactory {
+               
+               final String name;
+               final boolean daemon;
+               
+               public ClientThreadFactory(String name, boolean daemon) {
+                       this.name = name;
+                       this.daemon = daemon;
+               }
+               
+               @Override
+               public Thread newThread(Runnable r) {
+                       Thread thread = new Thread(r, name);
+                       thread.setDaemon(daemon);
+                       return thread;
+               }
+       }
+
+       public GraphClientImpl2(Database database, Path dbFolder, ServiceLocator locator) throws IOException {
+           this.database = database;
+           this.dbFolder = dbFolder;
+           this.locator = locator;
+           this.clusters = new ClusterManager(dbFolder);
+           load();
+           ClusterSetsSupport cssi = locator.getService(ClusterSetsSupport.class); 
+           cssi.updateReadAndWriteDirectories(clusters.lastSessionDirectory, clusters.workingDirectory);
+           mainProgram = new MainProgram(this, clusters);
+           executor.execute(mainProgram);
+           INSTANCE = this;
+       }
+
+       public Path getDbFolder() {
+           return dbFolder;
+       }
+
+       public void tryMakeSnapshot() throws IOException {
+               
+           if (isClosing)
+               return;
+           
+               saver.execute(new Runnable() {
+
+                       @Override
+                       public void run() {
+                               Transaction tr = null;
+                               try {
+                                       // First take a write transaction
+                                       tr = askWriteTransaction(-1);
+                                       // Then make sure that MainProgram is idling
+                                       mainProgram.mutex.acquire();
+                                       try {
+                                               synchronized(mainProgram) {
+                                                       if(mainProgram.operations.isEmpty()) {
+                                                               makeSnapshot(false);
+                                                       } else {
+                                                               // MainProgram is becoming busy again - delay snapshotting
+                                                               return;
+                                                       }
+                                               }
+                                       } finally {
+                                               mainProgram.mutex.release();
+                                       }
+                               } catch (IOException e) {
+                                       Logger.defaultLogError(e);
+                               } catch (ProCoreException e) {
+                                       Logger.defaultLogError(e);
+                               } catch (InterruptedException e) {
+                                       Logger.defaultLogError(e);
+                               } finally {
+                                       try {
+                                               if(tr != null)
+                                                       endTransaction(tr.getTransactionId());
+                                       } catch (ProCoreException e) {
+                                               Logger.defaultLogError(e);
+                                       }
+                               }
+                       }
+                       
+               });
+       }
+       
+    public void makeSnapshot(boolean force) throws IOException {
+        clusters.makeSnapshot(locator, force);
+    }
+       
+       public <T> T clone(ClusterUID uid, ClusterCreator creator) throws DatabaseException {
+               return clusters.clone(uid, creator);
+       }
+
+//     private void save() throws IOException {
+//             clusters.save();
+//     }
+       
+       public void load() throws IOException {
+               clusters.load();
+       }
+       
+//     public void modiFileEx(ClusterUID uid, int resourceKey, long offset, long size, byte[] bytes, long pos, ClusterSupport support) {
+//             clusters.modiFileEx(uid, resourceKey, offset, size, bytes, pos, support);
+//     }
+
+       @Override
+       public Database getDatabase() {
+               return database;
+       }
+
+       private boolean closed = false;
+       private boolean isClosing = false;
+       
+       @Override
+       public void close() throws ProCoreException {
+           System.err.println("Closing " + this + " and mainProgram " + mainProgram);
+               if(!closed && !isClosing) {
+                   isClosing = true;
+                       try {
+                               makeSnapshot(true);
+                               
+                               mainProgram.close();
+                               clusters.shutdown();
+                               executor.shutdown();
+                               saver.shutdown();
+                               boolean executorTerminated = executor.awaitTermination(500, TimeUnit.MILLISECONDS);
+                               boolean saverTerminated = saver.awaitTermination(500, TimeUnit.MILLISECONDS);
+                               
+                               System.err.println("executorTerminated=" + executorTerminated + ", saverTerminated=" + saverTerminated);
+                               
+                               INSTANCE = null;
+                               mainProgram = null;
+                               executor = null;
+                               saver = null;
+                               
+                       } catch (IOException | InterruptedException e) {
+                               throw new ProCoreException(e);
+                       }
+               }
+               closed = true;
+               //impl.close();
+       }
+
+       @Override
+       public void open() throws ProCoreException {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public boolean isClosed() throws ProCoreException {
+               return closed;
+       }
+       
+       @Override
+       public void acceptCommit(long transactionId, long changeSetId, byte[] metadata) throws ProCoreException {
+               
+               clusters.state.headChangeSetId++;
+
+               long committedChangeSetId = changeSetId + 1;
+               
+               clusters.commitChangeSet(committedChangeSetId, metadata);
+               
+               clusters.state.transactionId = transactionId;
+               
+               mainProgram.committed();
+               
+               TimeLogger.log("Accepted commit");
+               
+       }
+
+       @Override
+       public long cancelCommit(long transactionId, long changeSetId,
+                       byte[] metadata, OnChangeSetUpdate onChangeSetUpdate)
+                       throws ProCoreException {
+           System.err.println("GraphClientImpl2.cancelCommit() called!! this is experimental and might cause havoc!");
+           try {
+            undo(new long[] {changeSetId}, onChangeSetUpdate);
+        } catch (SDBException e) {
+            e.printStackTrace();
+            throw new ProCoreException(e);
+        }
+           clusters.state.headChangeSetId++;
+           return clusters.state.headChangeSetId;
+       }
+
+       @Override
+       public Transaction askReadTransaction() throws ProCoreException {
+               return transactionManager.askReadTransaction();
+       }
+
+       enum TransactionState {
+               IDLE,WRITE,READ
+       }
+       
+       class TransactionRequest {
+               public TransactionState state;
+               public Semaphore semaphore;
+               public TransactionRequest(TransactionState state, Semaphore semaphore) {
+                       this.state = state;
+                       this.semaphore = semaphore;
+               }
+       }
+
+       class TransactionManager {
+
+               private TransactionState currentTransactionState = TransactionState.IDLE;
+               
+               private int reads = 0;
+               
+               LinkedList<TransactionRequest> requests = new LinkedList<TransactionRequest>();
+               
+               TLongObjectHashMap<TransactionRequest> requestMap = new TLongObjectHashMap<TransactionRequest>();
+               
+               private synchronized Transaction makeTransaction(TransactionRequest req) {
+                       
+                       final int csId = clusters.state.headChangeSetId;
+                       final long trId = clusters.state.transactionId+1;
+                       requestMap.put(trId, req);
+                       return new Transaction() {
+                               
+                               @Override
+                               public long getTransactionId() {
+                                       return trId;
+                               }
+                               
+                               @Override
+                               public long getHeadChangeSetId() {
+                                       return csId;
+                               }
+                       };
+               }
+               
+               /*
+                * This method cannot be synchronized since it waits and must support multiple entries
+                * by query thread(s) and internal transactions such as snapshot saver
+                */
+               public Transaction askReadTransaction() throws ProCoreException {
+               
+                       Semaphore semaphore = new Semaphore(0);
+                       
+                       TransactionRequest req = queue(TransactionState.READ, semaphore);
+                       
+                       try {
+                               semaphore.acquire();
+                       } catch (InterruptedException e) {
+                               throw new ProCoreException(e);
+                       }
+                       
+                       return makeTransaction(req);
+
+               }
+
+               private synchronized void dispatch() {
+                       TransactionRequest r = requests.removeFirst();
+                       if(r.state == TransactionState.READ) reads++;
+                       r.semaphore.release();
+               }
+               
+               private synchronized void processRequests() {
+                       
+                       while(true) {
+
+                               if(requests.isEmpty()) return;
+                               TransactionRequest req = requests.peek();
+
+                               if(currentTransactionState == TransactionState.IDLE) {
+                               
+                                       // Accept anything while IDLE
+                                       currentTransactionState = req.state;
+                                       dispatch();
+                                       
+                               } else if (currentTransactionState == TransactionState.READ) {
+                                       
+                                       if(req.state == currentTransactionState) {
+
+                                               // Allow other reads
+                                               dispatch();
+
+                                       } else {
+                                               
+                                               // Wait
+                                               return;
+                                               
+                                       }
+                                       
+                               }  else if (currentTransactionState == TransactionState.WRITE) {
+
+                                       // Wait
+                                       return;
+                                       
+                               }
+                               
+                       }
+                       
+               }
+               
+               private synchronized TransactionRequest queue(TransactionState state, Semaphore semaphore) {
+                       TransactionRequest req = new TransactionRequest(state, semaphore); 
+                       requests.addLast(req);
+                       processRequests();
+                       return req;
+               }
+               
+               /*
+                * This method cannot be synchronized since it waits and must support multiple entries
+                * by query thread(s) and internal transactions such as snapshot saver
+                */
+               public Transaction askWriteTransaction()
+                               throws ProCoreException {
+                       
+                       Semaphore semaphore = new Semaphore(0);
+                       
+                       TransactionRequest req = queue(TransactionState.WRITE, semaphore);
+                       
+                       try {
+                               semaphore.acquire();
+                       } catch (InterruptedException e) {
+                               throw new ProCoreException(e);
+                       }
+                       
+                       mainProgram.startTransaction(clusters.state.headChangeSetId+1);
+                       
+                       return makeTransaction(req);
+                       
+               }
+               
+               public synchronized long endTransaction(long transactionId) throws ProCoreException {
+                       
+                       TransactionRequest req = requestMap.remove(transactionId);
+                       if(req.state == TransactionState.WRITE) {
+                               currentTransactionState = TransactionState.IDLE;
+                               processRequests();
+                       } else {
+                               reads--;
+                               if(reads == 0) {
+                                       currentTransactionState = TransactionState.IDLE;
+                                       processRequests();
+                               }
+                       }
+                       return clusters.state.transactionId;
+               }
+
+       }
+       
+       @Override
+       public Transaction askWriteTransaction(final long transactionId)
+                       throws ProCoreException {
+               return transactionManager.askWriteTransaction();
+       }
+
+       @Override
+       public long endTransaction(long transactionId) throws ProCoreException {
+               return transactionManager.endTransaction(transactionId);
+       }
+
+       @Override
+       public String execute(String command) throws ProCoreException {
+               // This is called only by WriteGraphImpl.commitAccessorChanges
+               // We can ignore this in Acorn
+               return "";
+       }
+
+       @Override
+       public byte[] getChangeSetMetadata(long changeSetId)
+                       throws ProCoreException {
+               return clusters.getMetadata(changeSetId);
+       }
+
+       @Override
+       public ChangeSetData getChangeSetData(long minChangeSetId,
+                       long maxChangeSetId, OnChangeSetUpdate onChangeSetupate)
+                       throws ProCoreException {
+               
+               new Exception("GetChangeSetDataFunction " + minChangeSetId + " " + maxChangeSetId).printStackTrace();;
+               return null;
+               
+       }
+
+       @Override
+       public ChangeSetIds getChangeSetIds() throws ProCoreException {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public Cluster getCluster(byte[] clusterId) throws ProCoreException {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public ClusterChanges getClusterChanges(long changeSetId, byte[] clusterId)
+                       throws ProCoreException {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public ClusterIds getClusterIds() throws ProCoreException {
+               return clusters.getClusterIds();
+       }
+
+       @Override
+       public Information getInformation() throws ProCoreException {
+               return new Information() {
+
+                       @Override
+                       public String getServerId() {
+                               return "server";
+                       }
+
+                       @Override
+                       public String getProtocolId() {
+                               return "";
+                       }
+
+                       @Override
+                       public String getDatabaseId() {
+                               return "database";
+                       }
+
+                       @Override
+                       public long getFirstChangeSetId() {
+                               return 0;
+                       }
+                       
+               };
+       }
+
+       @Override
+       public Refresh getRefresh(long changeSetId) throws ProCoreException {
+               
+               final ClusterIds ids = getClusterIds();
+               
+               return new Refresh() {
+
+                       @Override
+                       public long getHeadChangeSetId() {
+                               return clusters.state.headChangeSetId;
+                       }
+
+                       @Override
+                       public long[] getFirst() {
+                               return ids.getFirst();
+                       }
+
+                       @Override
+                       public long[] getSecond() {
+                               return ids.getSecond();
+                       }
+                       
+               };
+               
+       }
+
+       public byte[] getResourceFile(final byte[] clusterUID, final int resourceIndex) throws ProCoreException {
+               return clusters.getResourceFile(clusterUID, resourceIndex);
+       }
+
+       @Override
+       public ResourceSegment getResourceSegment(final byte[] clusterUID,
+                       final int resourceIndex, final long segmentOffset, short segmentSize) throws ProCoreException {
+               
+               return clusters.getResourceSegment(clusterUID, resourceIndex, segmentOffset, segmentSize);
+
+       }
+
+       @Override
+       public long reserveIds(int count) throws ProCoreException {
+               return clusters.state.reservedIds++;
+       }
+
+       @Override
+       public void updateCluster(byte[] operations) throws ProCoreException {
+
+               ClusterUpdateOperation operation = new ClusterUpdateOperation(clusters, operations);
+               ClusterInfo info = clusters.clusterLRU.getOrCreate(operation.uid, true);
+               if(info == null) throw new IllegalStateException();
+               info.acquireMutex();
+               try {
+                       info.scheduleUpdate();
+                       mainProgram.schedule(operation);
+               } catch (Throwable t) {
+                       throw new IllegalStateException(t);
+               } finally {
+                       info.releaseMutex();
+               }
+
+       }
+
+       private UndoClusterUpdateProcessor getUndoCSS(String ccsId) throws DatabaseException {
+
+               String[] ss = ccsId.split("\\.");
+               String chunkKey = ss[0];
+               int chunkOffset = Integer.parseInt(ss[1]);
+               ClusterStreamChunk chunk = clusters.streamLRU.getWithoutMutex(chunkKey);
+               if(chunk == null) throw new IllegalStateException("Cluster Stream Chunk " + chunkKey + " was not found.");
+               chunk.acquireMutex();
+               try {
+                       return chunk.getUndoProcessor(clusters, chunkOffset, ccsId);
+               } catch (Throwable t) {
+                       throw new IllegalStateException(t);
+               } finally {
+                       chunk.releaseMutex();
+               }
+               
+       }
+       
+       private void performUndo(String ccsId, ArrayList<Pair<ClusterUID, byte[]>> clusterChanges, UndoClusterSupport support) throws ProCoreException, DatabaseException {
+
+               UndoClusterUpdateProcessor proc = getUndoCSS(ccsId);
+
+               int clusterKey = clusters.getClusterKeyByClusterUIDOrMakeWithoutMutex(proc.getClusterUID());
+
+               clusters.clusterLRU.acquireMutex();
+               try {
+
+                       ClusterChange cs = new ClusterChange(clusterChanges, proc.getClusterUID());
+                       for(int i=0;i<proc.entries.size();i++) {
+                               
+                               Entry e = proc.entries.get(proc.entries.size() - 1 - i);
+                               e.process(clusters, cs, clusterKey);
+                               
+                       }
+                       
+                       cs.flush();
+
+               } finally {
+                       clusters.clusterLRU.releaseMutex();
+               }
+               
+       }
+       
+       @Override
+       public boolean undo(long[] changeSetIds, OnChangeSetUpdate onChangeSetUpdate) throws SDBException {
+               
+               final ArrayList<Pair<ClusterUID, byte[]>> clusterChanges = new ArrayList<Pair<ClusterUID, byte[]>>();
+               
+               UndoClusterSupport support = new UndoClusterSupport(clusters);
+               
+               final int changeSetId = clusters.state.headChangeSetId;
+               
+               if(ClusterUpdateProcessorBase.DEBUG)
+                       System.err.println(" === BEGIN UNDO ===");
+               
+               for(int i=0;i<changeSetIds.length;i++) {
+                       final long id = changeSetIds[changeSetIds.length-1-i];
+                       ArrayList<String> ccss = clusters.getChanges(id);
+                       for(int j=0;j<ccss.size();j++) {
+                               try {
+                                       if(ClusterUpdateProcessorBase.DEBUG)
+                                               System.err.println("performUndo " + ccss.get(ccss.size()-j-1));
+                                       performUndo(ccss.get(ccss.size()-j-1), clusterChanges, support);
+                               } catch (DatabaseException e) {
+                                       e.printStackTrace();
+                               }
+                       }
+               }
+
+               if(ClusterUpdateProcessorBase.DEBUG)
+                       System.err.println(" === END UNDO ===");
+
+               for(int i=0;i<clusterChanges.size();i++) {
+                       
+                       final int changeSetIndex = i;
+                       
+                       final Pair<ClusterUID, byte[]> pair = clusterChanges.get(i);
+                       
+                       final ClusterUID cuid = pair.first;
+                       final byte[] data = pair.second;
+
+                       onChangeSetUpdate.onChangeSetUpdate(new ChangeSetUpdate() {
+
+                               @Override
+                               public long getChangeSetId() {
+                                       return changeSetId;
+                               }
+
+                               @Override
+                               public int getChangeSetIndex() {
+                                       return 0;
+                               }
+
+                               @Override
+                               public int getNumberOfClusterChangeSets() {
+                                       return clusterChanges.size();
+                               }
+
+                               @Override
+                               public int getIndexOfClusterChangeSet() {
+                                       return changeSetIndex;
+                               }
+
+                               @Override
+                               public byte[] getClusterId() {
+                                       return cuid.asBytes();
+                               }
+
+                               @Override
+                               public boolean getNewCluster() {
+                                       return false;
+                               }
+
+                               @Override
+                               public byte[] getData() {
+                                       return data;
+                               }
+
+                       });
+
+               }
+
+               
+               return false;
+               
+       }
+       
+       public static GraphClientImpl2 getInstance() {
+           return INSTANCE;
+       }
+       
+       public ServiceLocator getServiceLocator() {
+           return locator;
+       }
+
+    @Override
+    public boolean refreshEnabled() {
+        return false;
+    }
+
+       
+       
+       
+       
+       
+       
+       
+       
+       
+       
+       ////////////////////////
+       
+       
+       
+       
+       
+       
+       
+       
+       
+       
+       
+       
+}
+