1 /*******************************************************************************
2 * Copyright (c) 2007, 2010 Association for Decentralized Information Management
4 * All rights reserved. This program and the accompanying materials
5 * are made available under the terms of the Eclipse Public License v1.0
6 * which accompanies this distribution, and is available at
7 * http://www.eclipse.org/legal/epl-v10.html
10 * VTT Technical Research Centre of Finland - initial API and implementation
11 *******************************************************************************/
12 package org.simantics.acorn;
14 import java.io.IOException;
15 import java.nio.file.Path;
16 import java.util.ArrayList;
17 import java.util.LinkedList;
18 import java.util.concurrent.ExecutorService;
19 import java.util.concurrent.Executors;
20 import java.util.concurrent.Semaphore;
21 import java.util.concurrent.ThreadFactory;
22 import java.util.concurrent.TimeUnit;
24 import org.simantics.acorn.MainProgram.MainProgramRunnable;
25 import org.simantics.acorn.exception.AcornAccessVerificationException;
26 import org.simantics.acorn.exception.IllegalAcornStateException;
27 import org.simantics.acorn.internal.ClusterChange;
28 import org.simantics.acorn.internal.ClusterUpdateProcessorBase;
29 import org.simantics.acorn.internal.UndoClusterUpdateProcessor;
30 import org.simantics.acorn.lru.ClusterInfo;
31 import org.simantics.acorn.lru.ClusterStreamChunk;
32 import org.simantics.acorn.lru.ClusterUpdateOperation;
33 import org.simantics.acorn.lru.ClusterChangeSet.Entry;
34 import org.simantics.db.ClusterCreator;
35 import org.simantics.db.Database;
36 import org.simantics.db.ServiceLocator;
37 import org.simantics.db.common.utils.Logger;
38 import org.simantics.db.exception.DatabaseException;
39 import org.simantics.db.exception.SDBException;
40 import org.simantics.db.server.ProCoreException;
41 import org.simantics.db.service.ClusterSetsSupport;
42 import org.simantics.db.service.ClusterUID;
43 import org.simantics.db.service.LifecycleSupport;
44 import org.simantics.utils.datastructures.Pair;
45 import org.simantics.utils.logging.TimeLogger;
47 import gnu.trove.map.hash.TLongObjectHashMap;
/**
 * Acorn implementation of {@link Database.Session}.
 * Holds the cluster manager for one database folder, a transaction manager and
 * two single-threaded executors: one runs the main program, the other runs
 * snapshot saving tasks.
 */
49 public class GraphClientImpl2 implements Database.Session {
// Debug switch; disabled.
51 public static final boolean DEBUG = false;
// Cluster storage manager, created in the constructor from the database folder.
53 public final ClusterManager clusters;
// Hands out read and write transactions for this session.
55 private TransactionManager transactionManager = new TransactionManager();
// Single non-daemon thread that executes the main program.
56 private ExecutorService executor = Executors.newSingleThreadExecutor(new ClientThreadFactory("Core Main Program", false));
// Single daemon thread that executes the snapshot tasks submitted by tryMakeSnapshot.
57 private ExecutorService saver = Executors.newSingleThreadExecutor(new ClientThreadFactory("Core Snapshot Saver", true));
// Constructor arguments kept for later use.
59 private Path dbFolder;
60 private final Database database;
61 private ServiceLocator locator;
// Created and submitted to the executor at the end of the constructor.
62 private MainProgram mainProgram;
/**
 * Thread factory whose threads are created with a given name and daemon flag.
 * NOTE(review): the field declarations and constructor assignments are not visible
 * in this excerpt; presumably the constructor stores both arguments - confirm.
 */
64 static class ClientThreadFactory implements ThreadFactory {
69 public ClientThreadFactory(String name, boolean daemon) {
75 public Thread newThread(Runnable r) {
// The thread is named with the name field and gets the daemon flag applied.
76 Thread thread = new Thread(r, name);
77 thread.setDaemon(daemon);
/**
 * Creates the session: stores the arguments, builds the cluster manager for the
 * database folder, configures the cluster set support service with the directories
 * of the cluster manager and submits the main program to the executor.
 *
 * @param database stored in the database field
 * @param dbFolder folder handed to the ClusterManager constructor
 * @param locator service locator; used here to look up ClusterSetsSupport
 * @throws IOException declared; NOTE(review): presumably raised by the ClusterManager
 *         constructor - confirm
 */
82 public GraphClientImpl2(Database database, Path dbFolder, ServiceLocator locator) throws IOException {
83 this.database = database;
84 this.dbFolder = dbFolder;
85 this.locator = locator;
86 this.clusters = new ClusterManager(dbFolder);
// No null check on the looked up service: a missing ClusterSetsSupport fails on the next line.
88 ClusterSetsSupport cssi = locator.getService(ClusterSetsSupport.class);
// Reads come from the last session directory, writes go to the working directory.
89 cssi.setReadDirectory(clusters.lastSessionDirectory);
90 cssi.updateWriteDirectory(clusters.workingDirectory);
// The main program starts running on the executor thread from here on.
91 mainProgram = new MainProgram(this, clusters);
92 executor.execute(mainProgram);
// NOTE(review): the body is not visible in this excerpt; presumably returns the
// dbFolder field assigned in the constructor - confirm.
95 public Path getDbFolder() {
/**
 * Submits a snapshot attempt to the saver executor. The submitted task takes a
 * write transaction, acquires the main program mutex and inspects the pending
 * operations of the main program; the transaction is ended afterwards.
 * NOTE(review): the statements that actually write the snapshot, the try and
 * finally keywords and the early return are not visible in this excerpt.
 *
 * @throws IOException declared; no visible statement raises it
 */
99 public void tryMakeSnapshot() throws IOException {
// Nothing is scheduled once the session is closing or has failed unexpectedly
// (NOTE(review): the guarded statement is not visible; presumably a return).
101 if (isClosing || unexpectedClose)
// Everything below runs on the saver thread, not on the caller thread.
104 saver.execute(new Runnable() {
108 Transaction tr = null;
110 // First take a write transaction
111 tr = askWriteTransaction(-1);
112 // Then make sure that MainProgram is idling
113 mainProgram.mutex.acquire();
// The operation queue is inspected while holding the monitor of the main program.
115 synchronized(mainProgram) {
116 if(mainProgram.operations.isEmpty()) {
119 // MainProgram is becoming busy again - delay snapshotting
124 mainProgram.mutex.release();
// A failed snapshot marks the session as unexpectedly closed; see the check below.
126 } catch (IllegalAcornStateException | ProCoreException e) {
127 Logger.defaultLogError("Snapshotting failed", e);
128 unexpectedClose = true;
// Interruption is only logged; unexpectedClose is left unchanged.
129 } catch (InterruptedException e) {
130 Logger.defaultLogError("Snapshotting interrupted", e);
// NOTE(review): tr is still null if askWriteTransaction threw; confirm that the
// line not shown before this call guards against that.
134 endTransaction(tr.getTransactionId());
// After a failed snapshot the lifecycle service is looked up
// (NOTE(review): the call made on it is not visible; the log text below suggests a database close).
135 if (unexpectedClose) {
136 LifecycleSupport support = getServiceLocator().getService(LifecycleSupport.class);
139 } catch (DatabaseException e1) {
140 Logger.defaultLogError("Failed to close database as a safety measure due to failed snapshotting", e1);
143 } catch (ProCoreException e) {
144 Logger.defaultLogError("Failed to end snapshotting write transaction", e);
/**
 * Delegates snapshot creation to the cluster manager, passing the service locator
 * of this session.
 *
 * @param fullSave forwarded unchanged to ClusterManager.makeSnapshot
 * @throws IllegalAcornStateException propagated from the cluster manager
 */
151 public void makeSnapshot(boolean fullSave) throws IllegalAcornStateException {
152 clusters.makeSnapshot(locator, fullSave);
/**
 * Delegates to ClusterManager.clone for the given cluster.
 *
 * @param uid cluster to clone
 * @param creator forwarded unchanged to the cluster manager
 * @return the value returned by the cluster manager
 * @throws DatabaseException wrapping any access verification, illegal state or IO failure
 */
155 public <T> T clone(ClusterUID uid, ClusterCreator creator) throws DatabaseException {
157 return clusters.clone(uid, creator);
// A failed clone flags the session as unexpectedly closed before rethrowing.
158 } catch (AcornAccessVerificationException | IllegalAcornStateException | IOException e) {
159 unexpectedClose = true;
160 throw new DatabaseException(e);
164 // private void save() throws IOException {
// NOTE(review): the body is not visible in this excerpt; presumably delegates
// loading to the cluster manager - confirm.
168 public void load() throws IOException {
172 // public void modiFileEx(ClusterUID uid, int resourceKey, long offset, long size, byte[] bytes, long pos, ClusterSupport support) {
173 // clusters.modiFileEx(uid, resourceKey, offset, size, bytes, pos, support);
// NOTE(review): the body is not visible in this excerpt; presumably returns the
// database field assigned in the constructor - confirm.
177 public Database getDatabase() {
// Lifecycle flags. All three are checked before a write transaction is granted;
// isClosing and unexpectedClose also suppress snapshot scheduling.
// NOTE(review): these are plain non-volatile fields although unexpectedClose is
// written from the task run on the saver executor; confirm visibility requirements.
181 private boolean closed = false;
182 private boolean isClosing = false;
// Set when snapshotting or cluster cloning fails.
183 private boolean unexpectedClose = false;
/**
 * Closes the session once. Waits up to 500 ms each for the main program executor
 * and the saver executor to terminate and reports the outcome on standard error.
 * NOTE(review): the shutdown calls and the statement guarded by the unexpectedClose
 * check are not visible in this excerpt.
 *
 * @throws ProCoreException wrapping an illegal state failure or an interruption
 */
186 public void close() throws ProCoreException {
187 System.err.println("Closing " + this + " and mainProgram " + mainProgram);
// Repeated or concurrent close calls skip the shutdown sequence.
188 if(!closed && !isClosing) {
// Only taken when no earlier failure flagged the session
// (NOTE(review): guarded statement not visible; presumably a final snapshot).
191 if (!unexpectedClose)
// The two waits run one after the other, so close can block for about one second in total.
198 boolean executorTerminated = executor.awaitTermination(500, TimeUnit.MILLISECONDS);
199 boolean saverTerminated = saver.awaitTermination(500, TimeUnit.MILLISECONDS);
201 System.err.println("executorTerminated=" + executorTerminated + ", saverTerminated=" + saverTerminated);
// The interrupt status is not restored here; the interruption is reported as a ProCoreException.
207 } catch (IllegalAcornStateException | InterruptedException e) {
208 throw new ProCoreException(e);
/**
 * Not supported by this implementation.
 *
 * @throws UnsupportedOperationException always
 */
216 public void open() throws ProCoreException {
217 throw new UnsupportedOperationException();
// NOTE(review): the body is not visible in this excerpt; presumably returns the
// closed flag - confirm.
221 public boolean isClosed() throws ProCoreException {
/**
 * Accepts a commit: advances the head change set id, stores the change set
 * metadata under changeSetId + 1, records the transaction id and notifies the
 * main program.
 *
 * @param transactionId stored as the current transaction id of the cluster state
 * @param changeSetId the committed change set is stored under this id plus one
 * @param metadata forwarded to ClusterManager.commitChangeSet
 * @throws ProCoreException wrapping an illegal state failure
 */
226 public void acceptCommit(long transactionId, long changeSetId, byte[] metadata) throws ProCoreException {
// The head id is advanced before the commit is attempted and is not rolled back
// by the catch clause below.
227 clusters.state.headChangeSetId++;
228 long committedChangeSetId = changeSetId + 1;
230 clusters.commitChangeSet(committedChangeSetId, metadata);
232 clusters.state.transactionId = transactionId;
234 mainProgram.committed();
236 TimeLogger.log("Accepted commit");
237 } catch (IllegalAcornStateException e) {
238 throw new ProCoreException(e);
/**
 * Cancels a commit by first accepting it and then undoing the change set that
 * was just committed.
 *
 * @param transactionId forwarded to acceptCommit
 * @param changeSetId forwarded to acceptCommit; change set changeSetId + 1 is undone
 * @param metadata forwarded to acceptCommit
 * @param onChangeSetUpdate callback forwarded to undo
 * @return the head change set id after the undo
 * @throws ProCoreException if accepting fails, or wrapping any SDBException from undo
 */
243 public long cancelCommit(long transactionId, long changeSetId, byte[] metadata, OnChangeSetUpdate onChangeSetUpdate) throws ProCoreException {
244 // Accept and finalize current transaction and then undo it
245 acceptCommit(transactionId, changeSetId, metadata);
// changeSetId + 1 is the id under which acceptCommit stored the change set.
248 undo(new long[] {changeSetId+1}, onChangeSetUpdate);
// Second increment of the head id for this call; acceptCommit already advanced it once.
249 clusters.state.headChangeSetId++;
250 return clusters.state.headChangeSetId;
251 } catch (SDBException e) {
252 Logger.defaultLogError("Failed to undo cancelled transaction", e);
253 throw new ProCoreException(e);
/**
 * Requests a read transaction from the transaction manager.
 *
 * @return the transaction created by the transaction manager
 * @throws ProCoreException propagated from the transaction manager
 */
258 public Transaction askReadTransaction() throws ProCoreException {
259 return transactionManager.askReadTransaction();
// State of the transaction manager and kind of a queued request.
// NOTE(review): the constants are not visible in this excerpt; the code in this
// file refers to IDLE, READ and WRITE.
262 enum TransactionState {
/**
 * A queued transaction request: the requested state together with the semaphore
 * that the transaction manager releases when the request is dispatched.
 */
266 class TransactionRequest {
267 public TransactionState state;
268 public Semaphore semaphore;
// NOTE(review): the assignment of state is not visible in this excerpt.
269 public TransactionRequest(TransactionState state, Semaphore semaphore) {
271 this.semaphore = semaphore;
/**
 * Queues read and write transaction requests and releases their semaphores
 * according to the current transaction state.
 * NOTE(review): several statements of this class are not visible in this excerpt
 * (semaphore acquisition, calls to dispatch and processRequests, the WRITE branch
 * of processRequests); the comments below describe only what is shown.
 */
275 class TransactionManager {
// State of the transactions currently running; starts out idle.
277 private TransactionState currentTransactionState = TransactionState.IDLE;
// Counter incremented for every dispatched read request.
279 private int reads = 0;
// Pending requests in arrival order.
281 LinkedList<TransactionRequest> requests = new LinkedList<TransactionRequest>();
// Requests of created transactions, keyed by transaction id; entries are removed in endTransaction.
283 TLongObjectHashMap<TransactionRequest> requestMap = new TLongObjectHashMap<TransactionRequest>();
// Builds the Transaction handed to the caller and registers its request.
285 private synchronized Transaction makeTransaction(TransactionRequest req) {
287 final int csId = clusters.state.headChangeSetId;
// NOTE(review): the id is derived from the stored transaction id, which this
// method does not advance; two transactions created before that field changes
// would get the same id and overwrite each other in requestMap - confirm.
288 final long trId = clusters.state.transactionId+1;
289 requestMap.put(trId, req);
// NOTE(review): the accessor bodies are not visible; presumably they return trId and csId.
290 return new Transaction() {
293 public long getTransactionId() {
298 public long getHeadChangeSetId() {
305 * This method cannot be synchronized since it waits and must support multiple entries
306 * by query thread(s) and internal transactions such as snapshot saver
308 public Transaction askReadTransaction() throws ProCoreException {
// Zero permits: an acquire blocks until dispatch releases the semaphore.
310 Semaphore semaphore = new Semaphore(0);
312 TransactionRequest req = queue(TransactionState.READ, semaphore);
// An interrupted wait is reported as a ProCoreException; the interrupt status is not restored here.
316 } catch (InterruptedException e) {
317 throw new ProCoreException(e);
320 return makeTransaction(req);
// Removes the oldest request, counts it if it is a read and wakes up its requester.
324 private synchronized void dispatch() {
325 TransactionRequest r = requests.removeFirst();
326 if(r.state == TransactionState.READ) reads++;
327 r.semaphore.release();
// Decides whether the oldest pending request may start in the current state.
330 private synchronized void processRequests() {
334 if(requests.isEmpty()) return;
// Only the head of the queue is examined.
335 TransactionRequest req = requests.peek();
337 if(currentTransactionState == TransactionState.IDLE) {
339 // Accept anything while IDLE
340 currentTransactionState = req.state;
343 } else if (currentTransactionState == TransactionState.READ) {
// A request of the same kind as the running transactions, i.e. another read.
345 if(req.state == currentTransactionState) {
357 } else if (currentTransactionState == TransactionState.WRITE) {
// Appends a new request to the pending queue.
// NOTE(review): the remaining statements and the return are not visible in this excerpt.
368 private synchronized TransactionRequest queue(TransactionState state, Semaphore semaphore) {
369 TransactionRequest req = new TransactionRequest(state, semaphore);
370 requests.addLast(req);
376 * This method cannot be synchronized since it waits and must support multiple entries
377 * by query thread(s) and internal transactions such as snapshot saver
379 public Transaction askWriteTransaction() throws IllegalAcornStateException {
381 Semaphore semaphore = new Semaphore(0);
382 TransactionRequest req = queue(TransactionState.WRITE, semaphore);
// An interrupted wait is reported as an IllegalAcornStateException.
386 } catch (InterruptedException e) {
387 throw new IllegalAcornStateException(e);
// The main program is told the id of the change set this write transaction will produce.
389 mainProgram.startTransaction(clusters.state.headChangeSetId+1);
390 return makeTransaction(req);
// Ends the transaction with the given id and returns the stored transaction id.
393 public synchronized long endTransaction(long transactionId) throws ProCoreException {
// NOTE(review): no null check on req is visible; an id that is not in requestMap
// would fail on the next line - confirm.
395 TransactionRequest req = requestMap.remove(transactionId);
396 if(req.state == TransactionState.WRITE) {
397 currentTransactionState = TransactionState.IDLE;
// NOTE(review): the condition guarding this second reset is not visible;
// presumably it applies once the last read has ended - confirm.
402 currentTransactionState = TransactionState.IDLE;
406 return clusters.state.transactionId;
/**
 * Requests a write transaction unless the session is closing, closed or has
 * failed unexpectedly.
 *
 * @param transactionId not used by the visible statements
 * @return the transaction created by the transaction manager
 * @throws ProCoreException if the session no longer accepts write transactions,
 *         or wrapping an illegal state failure from the transaction manager
 */
412 public Transaction askWriteTransaction(final long transactionId) throws ProCoreException {
414 if (isClosing || unexpectedClose || closed) {
415 throw new ProCoreException("GraphClientImpl2 is already closing so no more write transactions allowed!");
417 return transactionManager.askWriteTransaction();
418 } catch (IllegalAcornStateException e) {
419 throw new ProCoreException(e);
/**
 * Ends the given transaction through the transaction manager.
 *
 * @param transactionId id of the transaction to end
 * @return the value returned by the transaction manager
 * @throws ProCoreException propagated from the transaction manager
 */
424 public long endTransaction(long transactionId) throws ProCoreException {
425 return transactionManager.endTransaction(transactionId);
// Command execution is ignored by this implementation, as the comments below state.
// NOTE(review): the return statement is not visible in this excerpt.
429 public String execute(String command) throws ProCoreException {
430 // This is called only by WriteGraphImpl.commitAccessorChanges
431 // We can ignore this in Acorn
/**
 * Returns the metadata stored by the cluster manager for a change set.
 *
 * @param changeSetId id of the change set
 * @return the value returned by ClusterManager.getMetadata
 * @throws ProCoreException wrapping an access verification or illegal state failure
 */
436 public byte[] getChangeSetMetadata(long changeSetId) throws ProCoreException {
438 return clusters.getMetadata(changeSetId);
439 } catch (AcornAccessVerificationException | IllegalAcornStateException e) {
440 throw new ProCoreException(e);
// Prints a stack trace that identifies the caller and the requested id range.
// NOTE(review): the rest of the body, including the return value, is not visible in this excerpt.
445 public ChangeSetData getChangeSetData(long minChangeSetId,
446 long maxChangeSetId, OnChangeSetUpdate onChangeSetupate)
447 throws ProCoreException {
// The exception is created only for its stack trace; it is never thrown.
449 new Exception("GetChangeSetDataFunction " + minChangeSetId + " " + maxChangeSetId).printStackTrace();;
/**
 * Not supported by this implementation.
 *
 * @throws UnsupportedOperationException always
 */
455 public ChangeSetIds getChangeSetIds() throws ProCoreException {
456 throw new UnsupportedOperationException();
/**
 * Not supported by this implementation.
 *
 * @throws UnsupportedOperationException always
 */
460 public Cluster getCluster(byte[] clusterId) throws ProCoreException {
461 throw new UnsupportedOperationException();
/**
 * Not supported by this implementation.
 *
 * @throws UnsupportedOperationException always
 */
465 public ClusterChanges getClusterChanges(long changeSetId, byte[] clusterId)
466 throws ProCoreException {
467 throw new UnsupportedOperationException();
/**
 * Returns the cluster ids reported by the cluster manager.
 *
 * @return the value returned by ClusterManager.getClusterIds
 * @throws ProCoreException wrapping an illegal state failure
 */
471 public ClusterIds getClusterIds() throws ProCoreException {
473 return clusters.getClusterIds();
474 } catch (IllegalAcornStateException e) {
475 throw new ProCoreException(e);
// Returns a new Information object on every call.
// NOTE(review): the values returned by the four accessors are not visible in this excerpt.
480 public Information getInformation() throws ProCoreException {
481 return new Information() {
484 public String getServerId() {
489 public String getProtocolId() {
494 public String getDatabaseId() {
499 public long getFirstChangeSetId() {
/**
 * Builds a Refresh view over the current cluster ids.
 *
 * @param changeSetId not used by the visible statements
 * @return a Refresh backed by the cluster ids fetched during this call
 * @throws ProCoreException propagated from getClusterIds
 */
507 public Refresh getRefresh(long changeSetId) throws ProCoreException {
// The ids are fetched once, when getRefresh is called.
509 final ClusterIds ids = getClusterIds();
511 return new Refresh() {
// Unlike the ids, the head change set id is read each time the accessor is called.
514 public long getHeadChangeSetId() {
515 return clusters.state.headChangeSetId;
519 public long[] getFirst() {
520 return ids.getFirst();
524 public long[] getSecond() {
525 return ids.getSecond();
/**
 * Delegates to ClusterManager.getResourceFile. Unlike getResourceSegment, the
 * Acorn exceptions are declared and propagated instead of being wrapped.
 *
 * @param clusterUID forwarded unchanged to the cluster manager
 * @param resourceIndex forwarded unchanged to the cluster manager
 * @return the value returned by the cluster manager
 */
532 public byte[] getResourceFile(final byte[] clusterUID, final int resourceIndex) throws ProCoreException, AcornAccessVerificationException, IllegalAcornStateException {
533 return clusters.getResourceFile(clusterUID, resourceIndex);
/**
 * Delegates to ClusterManager.getResourceSegment.
 *
 * @param clusterUID forwarded unchanged to the cluster manager
 * @param resourceIndex forwarded unchanged to the cluster manager
 * @param segmentOffset forwarded unchanged to the cluster manager
 * @param segmentSize forwarded unchanged to the cluster manager
 * @return the value returned by the cluster manager
 * @throws ProCoreException wrapping an access verification or illegal state failure
 */
537 public ResourceSegment getResourceSegment(final byte[] clusterUID, final int resourceIndex, final long segmentOffset, short segmentSize) throws ProCoreException {
539 return clusters.getResourceSegment(clusterUID, resourceIndex, segmentOffset, segmentSize);
540 } catch (AcornAccessVerificationException | IllegalAcornStateException e) {
541 throw new ProCoreException(e);
/**
 * Returns the current reserved id counter of the cluster state and then
 * increments the counter by one.
 * NOTE(review): count is not used by the visible statement, so the counter
 * advances by one regardless of how many ids are requested - confirm this is intended.
 *
 * @param count requested number of ids; unused here
 * @return the counter value before the increment
 */
546 public long reserveIds(int count) throws ProCoreException {
547 return clusters.state.reservedIds++;
/**
 * Wraps the given bytes into a cluster update operation, marks the target
 * cluster as having a scheduled update and hands the operation to the main program.
 *
 * @param operations serialized update passed to the ClusterUpdateOperation constructor
 * @throws ProCoreException wrapping an illegal state or access verification failure
 */
551 public void updateCluster(byte[] operations) throws ProCoreException {
552 ClusterInfo info = null;
554 ClusterUpdateOperation operation = new ClusterUpdateOperation(clusters, operations);
// Looks up the cluster named by the operation in the cluster LRU.
555 info = clusters.clusterLRU.getOrCreate(operation.uid, true);
// NOTE(review): the condition is not visible; the message indicates a null check on info.
557 throw new IllegalAcornStateException("info == null for operation " + operation);
559 info.scheduleUpdate();
560 mainProgram.schedule(operation);
561 } catch (IllegalAcornStateException | AcornAccessVerificationException e) {
562 throw new ProCoreException(e);
/**
 * Resolves a cluster change set id to its undo processor.
 *
 * @param ccsId id made of a chunk key and an integer offset separated by a dot
 * @return the undo processor obtained from the stream chunk
 * @throws IllegalAcornStateException if the chunk is not found in the stream LRU
 * @throws IllegalStateException wrapping any non-DatabaseException failure raised
 *         while obtaining the processor
 */
569 private UndoClusterUpdateProcessor getUndoCSS(String ccsId) throws DatabaseException, AcornAccessVerificationException, IllegalAcornStateException {
// No validation of the id format: a missing dot or a non-numeric offset fails on the lines below.
571 String[] ss = ccsId.split("\\.");
572 String chunkKey = ss[0];
573 int chunkOffset = Integer.parseInt(ss[1]);
574 ClusterStreamChunk chunk = clusters.streamLRU.getWithoutMutex(chunkKey);
575 if(chunk == null) throw new IllegalAcornStateException("Cluster Stream Chunk " + chunkKey + " was not found.");
// The chunk mutex is held while the undo processor is obtained.
576 chunk.acquireMutex();
578 return chunk.getUndoProcessor(clusters, chunkOffset, ccsId);
// NOTE(review): the handler body is not visible; presumably the exception is rethrown.
579 } catch (DatabaseException e) {
581 } catch (Throwable t) {
582 throw new IllegalStateException(t);
// NOTE(review): presumably inside a finally block that is not visible in this excerpt.
584 chunk.releaseMutex();
/**
 * Replays the entries of one cluster change set in reverse order into a
 * ClusterChange, while holding the cluster LRU mutex.
 *
 * @param ccsId id of the cluster change set to undo
 * @param clusterChanges list handed to the ClusterChange constructor
 * @param support not used by the visible statements
 */
588 private void performUndo(String ccsId, ArrayList<Pair<ClusterUID, byte[]>> clusterChanges, UndoClusterSupport support) throws ProCoreException, DatabaseException, IllegalAcornStateException, AcornAccessVerificationException {
589 UndoClusterUpdateProcessor proc = getUndoCSS(ccsId);
// The cluster key is resolved before the LRU mutex is taken.
591 int clusterKey = clusters.getClusterKeyByClusterUIDOrMakeWithoutMutex(proc.getClusterUID());
593 clusters.clusterLRU.acquireMutex();
596 ClusterChange cs = new ClusterChange(clusterChanges, proc.getClusterUID());
597 for(int i=0;i<proc.entries.size();i++) {
// Entries are visited from last to first.
599 Entry e = proc.entries.get(proc.entries.size() - 1 - i);
600 e.process(clusters, cs, clusterKey);
// NOTE(review): presumably inside a finally block that is not visible in this excerpt.
605 clusters.clusterLRU.releaseMutex();
/**
 * Undoes the given change sets inside MainProgram.runIdle and reports every
 * resulting cluster change to the callback.
 * NOTE(review): the return statement and several accessor bodies of the
 * anonymous ChangeSetUpdate are not visible in this excerpt.
 *
 * @param changeSetIds ids of the change sets to undo; processed from last to first
 * @param onChangeSetUpdate receives one ChangeSetUpdate per collected cluster change
 * @throws SDBException rethrown as is when the runnable reports one; any other
 *         reported exception is wrapped in an IllegalAcornStateException
 */
610 public boolean undo(long[] changeSetIds, OnChangeSetUpdate onChangeSetUpdate) throws SDBException {
// runIdle hands back the failure of the runnable instead of throwing it; see the end of the method.
612 Exception exception = mainProgram.runIdle(new MainProgramRunnable() {
615 public void run() throws Exception {
// Collects the cluster changes produced by all performUndo calls.
619 final ArrayList<Pair<ClusterUID, byte[]>> clusterChanges = new ArrayList<Pair<ClusterUID, byte[]>>();
621 UndoClusterSupport support = new UndoClusterSupport(clusters);
// Head change set id at the time the undo starts.
623 final int changeSetId = clusters.state.headChangeSetId;
625 if(ClusterUpdateProcessorBase.DEBUG)
626 System.err.println(" === BEGIN UNDO ===");
// Change sets are undone from last to first.
628 for(int i=0;i<changeSetIds.length;i++) {
629 final long id = changeSetIds[changeSetIds.length-1-i];
630 ArrayList<String> ccss = clusters.getChanges(id);
// Within a change set the cluster change sets are also undone from last to first.
632 for(int j=0;j<ccss.size();j++) {
633 String ccsid = ccss.get(ccss.size()-j-1);
635 if(ClusterUpdateProcessorBase.DEBUG)
636 System.err.println("performUndo " + ccsid);
637 performUndo(ccsid, clusterChanges, support);
// NOTE(review): the handler body is not visible in this excerpt.
638 } catch (DatabaseException e) {
644 if(ClusterUpdateProcessorBase.DEBUG)
645 System.err.println(" === END UNDO ===");
// One callback invocation per collected cluster change.
647 for(int i=0;i<clusterChanges.size();i++) {
// Final copies so the anonymous class below can capture them.
649 final int changeSetIndex = i;
651 final Pair<ClusterUID, byte[]> pair = clusterChanges.get(i);
653 final ClusterUID cuid = pair.first;
654 final byte[] data = pair.second;
656 onChangeSetUpdate.onChangeSetUpdate(new ChangeSetUpdate() {
659 public long getChangeSetId() {
664 public int getChangeSetIndex() {
669 public int getNumberOfClusterChangeSets() {
670 return clusterChanges.size();
674 public int getIndexOfClusterChangeSet() {
675 return changeSetIndex;
679 public byte[] getClusterId() {
680 return cuid.asBytes();
684 public boolean getNewCluster() {
689 public byte[] getData() {
695 } catch (AcornAccessVerificationException | IllegalAcornStateException e1) {
696 throw new ProCoreException(e1);
// Failures reported by runIdle are surfaced to the caller here.
708 if(exception instanceof SDBException) throw (SDBException)exception;
709 else if(exception != null) throw new IllegalAcornStateException(exception);
// NOTE(review): the body is not visible in this excerpt; presumably returns the
// locator field assigned in the constructor - confirm.
715 public ServiceLocator getServiceLocator() {
// NOTE(review): the body is not visible in this excerpt, so the returned value
// cannot be documented from here - confirm.
720 public boolean refreshEnabled() {
/**
 * Reports whether the cluster manager has been rolled back.
 *
 * @return the value returned by ClusterManager.rolledback
 */
725 public boolean rolledback() {
726 return clusters.rolledback();
739 ////////////////////////