1 /*******************************************************************************
2 * Copyright (c) 2007, 2010 Association for Decentralized Information Management
4 * All rights reserved. This program and the accompanying materials
5 * are made available under the terms of the Eclipse Public License v1.0
6 * which accompanies this distribution, and is available at
7 * http://www.eclipse.org/legal/epl-v10.html
10 * VTT Technical Research Centre of Finland - initial API and implementation
11 *******************************************************************************/
12 package org.simantics.acorn;
14 import java.io.BufferedReader;
15 import java.io.IOException;
16 import java.nio.file.Files;
17 import java.nio.file.Path;
18 import java.util.ArrayList;
19 import java.util.LinkedList;
20 import java.util.concurrent.ExecutorService;
21 import java.util.concurrent.Executors;
22 import java.util.concurrent.Future;
23 import java.util.concurrent.Semaphore;
24 import java.util.concurrent.ThreadFactory;
25 import java.util.concurrent.TimeUnit;
27 import org.simantics.acorn.MainProgram.MainProgramRunnable;
28 import org.simantics.acorn.backup.AcornBackupProvider;
29 import org.simantics.acorn.backup.AcornBackupProvider.AcornBackupRunnable;
30 import org.simantics.acorn.exception.AcornAccessVerificationException;
31 import org.simantics.acorn.exception.IllegalAcornStateException;
32 import org.simantics.acorn.internal.ClusterChange;
33 import org.simantics.acorn.internal.ClusterUpdateProcessorBase;
34 import org.simantics.acorn.internal.UndoClusterUpdateProcessor;
35 import org.simantics.acorn.lru.ClusterChangeSet.Entry;
36 import org.simantics.acorn.lru.ClusterInfo;
37 import org.simantics.acorn.lru.ClusterStreamChunk;
38 import org.simantics.acorn.lru.ClusterUpdateOperation;
39 import org.simantics.backup.BackupException;
40 import org.simantics.db.ClusterCreator;
41 import org.simantics.db.Database;
42 import org.simantics.db.ServiceLocator;
43 import org.simantics.db.exception.DatabaseException;
44 import org.simantics.db.exception.SDBException;
45 import org.simantics.db.server.ProCoreException;
46 import org.simantics.db.service.ClusterSetsSupport;
47 import org.simantics.db.service.ClusterUID;
48 import org.simantics.db.service.LifecycleSupport;
49 import org.simantics.utils.DataContainer;
50 import org.simantics.utils.datastructures.Pair;
51 import org.simantics.utils.logging.TimeLogger;
52 import org.slf4j.Logger;
53 import org.slf4j.LoggerFactory;
55 import gnu.trove.map.hash.TLongObjectHashMap;
// Acorn implementation of Database.Session. In the visible code most operations delegate to
// the ClusterManager ("clusters"), the MainProgram and the inner TransactionManager.
57 public class GraphClientImpl2 implements Database.Session {
59 private static final Logger LOGGER = LoggerFactory.getLogger(GraphClientImpl2.class);
// Debug switch; not read anywhere in the visible part of this file.
60 public static final boolean DEBUG = false;
// Cluster storage; created in the constructor from dbFolder and fileCache.
62 final ClusterManager clusters;
// Grants read/write transactions (see the inner TransactionManager class).
64 private TransactionManager transactionManager = new TransactionManager();
// Single-thread executor (daemon flag false) on which the constructor starts mainProgram.
65 private ExecutorService executor = Executors.newSingleThreadExecutor(new ClientThreadFactory("Core Main Program", false));
// Single-thread executor (daemon flag true) used for the snapshot and purge jobs.
66 private ExecutorService saver = Executors.newSingleThreadExecutor(new ClientThreadFactory("Core Snapshot Saver", true));
// Database folder handed to the constructor; also the target of mainState.save in close().
68 private Path dbFolder;
69 private final Database database;
// Service registry; receives the FileCache and supplies ClusterSetsSupport / LifecycleSupport.
70 private ServiceLocator locator;
71 private FileCache fileCache;
// Runnable executed on "executor"; receives scheduled cluster updates and idle-time jobs.
72 private MainProgram mainProgram;
// ThreadFactory that gives each created thread a name and a daemon flag.
// The field declarations and assignments are not visible in this listing; "name" and "daemon"
// used in newThread are presumably the constructor arguments stored in fields.
74 private static class ClientThreadFactory implements ThreadFactory {
79 public ClientThreadFactory(String name, boolean daemon) {
85 public Thread newThread(Runnable r) {
// Every thread produced by one factory instance gets the same name.
86 Thread thread = new Thread(r, name);
87 thread.setDaemon(daemon);
// Creates the session: builds the file cache and cluster manager for dbFolder, points the
// ClusterSetsSupport service at the cluster manager's directories and starts the main program
// on the "executor" thread.
// Declared to throw IOException; which of the calls below raises it is not visible here.
92 public GraphClientImpl2(Database database, Path dbFolder, ServiceLocator locator) throws IOException {
93 this.database = database;
94 this.dbFolder = dbFolder;
95 this.locator = locator;
96 this.fileCache = new FileCache();
97 // This disposes the cache when the session is shut down
98 locator.registerService(FileCache.class, fileCache);
99 this.clusters = new ClusterManager(dbFolder, fileCache);
// NOTE(review): cssi is dereferenced without a null check; assumes ClusterSetsSupport is
// already registered in the locator -- confirm.
101 ClusterSetsSupport cssi = locator.getService(ClusterSetsSupport.class);
102 cssi.setReadDirectory(clusters.lastSessionDirectory);
103 cssi.updateWriteDirectory(clusters.workingDirectory);
// "this" is passed to MainProgram and the program is started before the constructor returns.
104 mainProgram = new MainProgram(this, clusters);
105 executor.execute(mainProgram);
// Accessor for the database folder. The body is not visible in this listing; presumably it
// returns the dbFolder field -- confirm.
108 public Path getDbFolder() {
113 * This method schedules snapshotting.
114 * It imposes no lock or thread restrictions on the caller.
// Submits a snapshot job to the "saver" executor. The job takes a write transaction, runs
// makeSnapshot(false) while the main program is idle, and flags unexpectedClose on failure.
// The method declares IOException, but no visible statement here throws it.
116 void tryMakeSnapshot() throws IOException {
// Guard on the closing/failed state; the guarded statement is not visible (presumably "return").
118 if (isClosing || unexpectedClose)
121 saver.execute(new Runnable() {
125 Transaction tr = null;
127 // First take a write transaction
// The argument -1 is not used by askWriteTransaction(long) in its visible lines.
128 tr = askWriteTransaction(-1);
129 // Then make sure that MainProgram is idling
130 synchronizeWithIdleMainProgram(() -> makeSnapshot(false));
131 } catch (IllegalAcornStateException | ProCoreException e) {
132 LOGGER.error("Snapshotting failed", e);
133 unexpectedClose = true;
// Both handlers do the same thing: log the failure and mark the session as failed.
134 } catch (SDBException e) {
135 LOGGER.error("Snapshotting failed", e);
136 unexpectedClose = true;
// NOTE(review): tr is still null here if askWriteTransaction threw. A null guard may exist
// on a line that is not visible in this listing -- confirm.
140 endTransaction(tr.getTransactionId());
141 if (unexpectedClose) {
// After a failure the LifecycleSupport service is fetched; per the log message below it is
// used to close the database as a safety measure (the call itself is not visible).
142 LifecycleSupport support = getServiceLocator().getService(LifecycleSupport.class);
145 } catch (DatabaseException e1) {
146 LOGGER.error("Failed to close database as a safety measure due to failed snapshotting", e1);
149 } catch (ProCoreException e) {
150 LOGGER.error("Failed to end snapshotting write transaction", e);
// Delegates to ClusterManager.makeSnapshot with this session's service locator.
// Called with fullSave=false from tryMakeSnapshot and fullSave=true from close().
157 private void makeSnapshot(boolean fullSave) throws IllegalAcornStateException {
158 clusters.makeSnapshot(locator, fullSave);
// Delegates to ClusterManager.clone for the given cluster UID and creator.
// An Acorn or I/O failure marks the session as unexpectedly closed and is rethrown wrapped in
// a DatabaseException.
162 public <T> T clone(ClusterUID uid, ClusterCreator creator) throws DatabaseException {
164 return clusters.clone(uid, creator);
165 } catch (AcornAccessVerificationException | IllegalAcornStateException | IOException e) {
// Once set, this flag blocks further snapshots, purges and write transactions.
166 unexpectedClose = true;
167 throw new DatabaseException(e);
// NOTE(review): the body is not visible in this listing and no caller appears in the visible
// code -- confirm whether this method is still used.
171 private void load() throws IOException {
// Accessor for the owning Database. The body is not visible in this listing; presumably it
// returns the "database" field -- confirm.
176 public Database getDatabase() {
// Lifecycle flags. close() proceeds only when both "closed" and "isClosing" are false;
// askWriteTransaction(long) rejects new write transactions when any of the three is set.
// NOTE(review): these are plain (non-volatile) fields, yet unexpectedClose is written from jobs
// running on the saver executor and read by other callers. The declarations alone do not
// guarantee cross-thread visibility -- confirm.
180 private boolean closed = false;
181 private boolean isClosing = false;
// Set when a snapshot, purge or clone fails.
182 private boolean unexpectedClose = false;
// Closes the session: takes a full snapshot (unless the session already failed), waits briefly
// for both executors and saves the main state file. Several statements of this method,
// including the flag updates and executor shutdown calls, are not visible in this listing.
185 public void close() throws ProCoreException {
186 LOGGER.info("Closing " + this + " and mainProgram " + mainProgram);
// Nothing is done in this branch if the session is already closed or closing.
187 if(!closed && !isClosing) {
// The final full snapshot is skipped after an unexpected failure.
191 if (!unexpectedClose)
192 synchronizeWithIdleMainProgram(() -> makeSnapshot(true));
// Each executor gets at most 500 ms to terminate; the outcome is only logged.
199 boolean executorTerminated = executor.awaitTermination(500, TimeUnit.MILLISECONDS);
200 boolean saverTerminated = saver.awaitTermination(500, TimeUnit.MILLISECONDS);
202 LOGGER.info("executorTerminated=" + executorTerminated + ", saverTerminated=" + saverTerminated);
205 clusters.mainState.save(dbFolder);
206 } catch (IOException e) {
// NOTE(review): the caught exception is not passed to the logger, so its message and stack
// trace are lost; the failure is also not propagated by this handler.
207 LOGGER.error("Failed to save " + MainState.MAIN_STATE + " file in database folder " + dbFolder);
// NOTE(review): InterruptedException is wrapped without restoring the thread's interrupt flag.
214 } catch (IllegalAcornStateException | InterruptedException e) {
215 throw new ProCoreException(e);
216 } catch (SDBException e1) {
217 throw new ProCoreException(e1);
// Not supported by this implementation: always throws UnsupportedOperationException.
225 public void open() throws ProCoreException {
226 throw new UnsupportedOperationException();
// Reports whether the session is closed. The body is not visible in this listing; presumably
// it returns the "closed" flag -- confirm.
230 public boolean isClosed() throws ProCoreException {
// Finalises a commit: advances the head change set id, stores the change set metadata under
// changeSetId + 1, records the transaction id and notifies the main program.
// An IllegalAcornStateException is rethrown wrapped in a ProCoreException.
235 public void acceptCommit(long transactionId, long changeSetId, byte[] metadata) throws ProCoreException {
// The head id is bumped before commitChangeSet is attempted, so it stays bumped if that
// call throws.
236 clusters.state.headChangeSetId++;
237 long committedChangeSetId = changeSetId + 1;
239 clusters.commitChangeSet(committedChangeSetId, metadata);
240 clusters.state.transactionId = transactionId;
241 mainProgram.committed();
242 TimeLogger.log("Accepted commit");
243 } catch (IllegalAcornStateException e) {
244 throw new ProCoreException(e);
// Cancels a commit by first accepting it and then undoing the resulting change set.
// Returns the head change set id after the undo. An SDBException from undo is logged and
// rethrown wrapped in a ProCoreException.
249 public long cancelCommit(long transactionId, long changeSetId, byte[] metadata, OnChangeSetUpdate onChangeSetUpdate) throws ProCoreException {
250 // Accept and finalize current transaction and then undo it
251 acceptCommit(transactionId, changeSetId, metadata);
// changeSetId + 1 is the id under which acceptCommit stored the change set.
254 undo(new long[] {changeSetId+1}, onChangeSetUpdate);
// The head id is advanced once more after the undo (acceptCommit already advanced it once).
255 clusters.state.headChangeSetId++;
256 return clusters.state.headChangeSetId;
257 } catch (SDBException e) {
258 LOGGER.error("Failed to undo cancelled transaction", e);
259 throw new ProCoreException(e);
// Requests a read transaction from the TransactionManager. Unlike askWriteTransaction(long),
// no closing-state check is made here.
264 public Transaction askReadTransaction() throws ProCoreException {
265 return transactionManager.askReadTransaction();
// Transaction mode used by TransactionManager. The constants are not visible on this line;
// IDLE, READ and WRITE are the values referenced elsewhere in this file.
268 private enum TransactionState {
// A queued transaction request: the requested mode plus the semaphore that
// TransactionManager.dispatch() releases when the request is taken off the queue.
272 private class TransactionRequest {
273 public TransactionState state;
274 public Semaphore semaphore;
// The assignment of "state" is not visible in this listing (presumably this.state = state).
275 public TransactionRequest(TransactionState state, Semaphore semaphore) {
277 this.semaphore = semaphore;
// Grants read and write transactions through a FIFO queue of requests, each paired with a
// semaphore. Several statements of this class (the waits on the semaphores, most branch bodies
// of processRequests, the anonymous Transaction accessors) are not visible in this listing.
281 private class TransactionManager {
// Mode of the transaction(s) currently granted; starts as IDLE.
283 private TransactionState currentTransactionState = TransactionState.IDLE;
// Counter incremented in dispatch() for every dispatched READ request.
285 private int reads = 0;
// Pending requests in arrival order (addLast in queue(), removeFirst in dispatch()).
287 private LinkedList<TransactionRequest> requests = new LinkedList<>();
// Granted requests keyed by transaction id; filled in makeTransaction, cleared in endTransaction.
289 private TLongObjectHashMap<TransactionRequest> requestMap = new TLongObjectHashMap<>();
// Creates the Transaction handle for a request and registers the request under its id.
291 private synchronized Transaction makeTransaction(TransactionRequest req) {
293 final int csId = clusters.state.headChangeSetId;
// NOTE(review): the id is derived from clusters.state.transactionId, which in the visible code
// is only assigned in acceptCommit. Two transactions created between commits would therefore
// get the same key and the later put would replace the earlier entry -- confirm.
294 final long trId = clusters.state.transactionId+1;
295 requestMap.put(trId, req);
// Accessor bodies are not visible; presumably they return trId and csId respectively.
296 return new Transaction() {
299 public long getTransactionId() {
304 public long getHeadChangeSetId() {
311 * This method cannot be synchronized since it waits and must support multiple entries
312 * by query thread(s) and internal transactions such as snapshot saver
314 private Transaction askReadTransaction() throws ProCoreException {
// Starts with zero permits so that a later acquire (not visible here) blocks until
// dispatch() releases the semaphore.
316 Semaphore semaphore = new Semaphore(0);
318 TransactionRequest req = queue(TransactionState.READ, semaphore);
// NOTE(review): the interruption is wrapped without restoring the thread's interrupt flag.
322 } catch (InterruptedException e) {
323 throw new ProCoreException(e);
326 return makeTransaction(req);
// Takes the oldest pending request off the queue and releases its semaphore.
330 private synchronized void dispatch() {
// removeFirst throws NoSuchElementException on an empty queue; the visible caller pattern
// (processRequests) checks isEmpty() first.
331 TransactionRequest r = requests.removeFirst();
332 if(r.state == TransactionState.READ) reads++;
333 r.semaphore.release();
// Decides, from the current mode and the oldest pending request, whether that request can be
// granted. Most branch bodies are not visible in this listing.
336 private synchronized void processRequests() {
340 if(requests.isEmpty()) return;
// peek() inspects the head without removing it.
341 TransactionRequest req = requests.peek();
343 if(currentTransactionState == TransactionState.IDLE) {
345 // Accept anything while IDLE
346 currentTransactionState = req.state;
349 } else if (currentTransactionState == TransactionState.READ) {
// A pending READ matches the current READ mode; the body of this branch is not visible.
351 if(req.state == currentTransactionState) {
363 } else if (currentTransactionState == TransactionState.WRITE) {
// Appends a new request to the queue. The remaining lines (presumably a processRequests()
// call and the return of req) are not visible.
374 private synchronized TransactionRequest queue(TransactionState state, Semaphore semaphore) {
375 TransactionRequest req = new TransactionRequest(state, semaphore);
376 requests.addLast(req);
382 * This method cannot be synchronized since it waits and must support multiple entries
383 * by query thread(s) and internal transactions such as snapshot saver
385 private Transaction askWriteTransaction() throws IllegalAcornStateException {
387 Semaphore semaphore = new Semaphore(0);
388 TransactionRequest req = queue(TransactionState.WRITE, semaphore);
// NOTE(review): the interruption is wrapped without restoring the thread's interrupt flag.
392 } catch (InterruptedException e) {
393 throw new IllegalAcornStateException(e);
// Tells the main program which change set id the new write transaction will produce.
395 mainProgram.startTransaction(clusters.state.headChangeSetId+1);
396 return makeTransaction(req);
// Ends the transaction registered under transactionId and returns the current
// clusters.state.transactionId.
399 private synchronized long endTransaction(long transactionId) throws ProCoreException {
// NOTE(review): req is dereferenced on the next line without a null check; presumably remove()
// returns null for an unknown id, which would raise a NullPointerException -- confirm.
401 TransactionRequest req = requestMap.remove(transactionId);
402 if(req.state == TransactionState.WRITE) {
403 currentTransactionState = TransactionState.IDLE;
// Second reset to IDLE sits in a branch whose condition is not visible in this listing.
408 currentTransactionState = TransactionState.IDLE;
412 return clusters.state.transactionId;
// Requests a write transaction. Fails with a ProCoreException when the session is closing,
// closed or flagged as unexpectedly closed. An IllegalAcornStateException from the
// TransactionManager is rethrown wrapped in a ProCoreException.
// The transactionId argument is not used in the visible lines; callers in this file pass -1.
418 public Transaction askWriteTransaction(final long transactionId) throws ProCoreException {
420 if (isClosing || unexpectedClose || closed) {
421 throw new ProCoreException("GraphClientImpl2 is already closing so no more write transactions allowed!");
423 return transactionManager.askWriteTransaction();
424 } catch (IllegalAcornStateException e) {
425 throw new ProCoreException(e);
// Ends the given transaction by delegating to TransactionManager.endTransaction and returns
// its result.
430 public long endTransaction(long transactionId) throws ProCoreException {
431 return transactionManager.endTransaction(transactionId);
// Command execution hook; per the comments below it is intentionally a no-op in Acorn.
// The returned value is not visible in this listing.
435 public String execute(String command) throws ProCoreException {
436 // This is called only by WriteGraphImpl.commitAccessorChanges
437 // We can ignore this in Acorn
// Returns the metadata of the given change set by delegating to ClusterManager.getMetadata.
// Acorn exceptions are rethrown wrapped in a ProCoreException.
442 public byte[] getChangeSetMetadata(long changeSetId) throws ProCoreException {
444 return clusters.getMetadata(changeSetId);
445 } catch (AcornAccessVerificationException | IllegalAcornStateException e) {
446 throw new ProCoreException(e);
// Apparently not implemented: the only visible statement dumps a stack trace identifying the
// caller. The returned value is not visible in this listing.
451 public ChangeSetData getChangeSetData(long minChangeSetId,
452 long maxChangeSetId, OnChangeSetUpdate onChangeSetupate)
453 throws ProCoreException {
// NOTE(review): this writes to stderr via printStackTrace() instead of using LOGGER like the
// rest of the class, and the statement ends with a stray second semicolon.
455 new Exception("GetChangeSetDataFunction " + minChangeSetId + " " + maxChangeSetId).printStackTrace();;
// Not supported by this implementation: always throws UnsupportedOperationException.
461 public ChangeSetIds getChangeSetIds() throws ProCoreException {
462 throw new UnsupportedOperationException();
// Not supported by this implementation: always throws UnsupportedOperationException.
466 public Cluster getCluster(byte[] clusterId) throws ProCoreException {
467 throw new UnsupportedOperationException();
// Not supported by this implementation: always throws UnsupportedOperationException.
471 public ClusterChanges getClusterChanges(long changeSetId, byte[] clusterId)
472 throws ProCoreException {
473 throw new UnsupportedOperationException();
// Returns the cluster ids by delegating to ClusterManager.getClusterIds.
// An IllegalAcornStateException is rethrown wrapped in a ProCoreException.
477 public ClusterIds getClusterIds() throws ProCoreException {
479 return clusters.getClusterIds();
480 } catch (IllegalAcornStateException e) {
481 throw new ProCoreException(e);
// Returns an anonymous Information object. The bodies of its accessors are not visible in
// this listing, so the returned values cannot be documented from here.
486 public Information getInformation() throws ProCoreException {
487 return new Information() {
490 public String getServerId() {
495 public String getProtocolId() {
500 public String getDatabaseId() {
505 public long getFirstChangeSetId() {
// Builds a Refresh view over the current cluster ids.
// The changeSetId argument is not used in the visible lines.
513 public Refresh getRefresh(long changeSetId) throws ProCoreException {
// The cluster ids are captured once, when getRefresh is called.
515 final ClusterIds ids = getClusterIds();
517 return new Refresh() {
520 public long getHeadChangeSetId() {
// Read from clusters.state on every call, unlike the ids captured above.
521 return clusters.state.headChangeSetId;
525 public long[] getFirst() {
526 return ids.getFirst();
530 public long[] getSecond() {
531 return ids.getSecond();
538 // public byte[] getResourceFile(final byte[] clusterUID, final int resourceIndex) throws ProCoreException, AcornAccessVerificationException, IllegalAcornStateException {
539 // return clusters.getResourceFile(clusterUID, resourceIndex);
// Returns a segment of a resource's data by delegating to ClusterManager.getResourceSegment
// with the same arguments. Acorn exceptions are rethrown wrapped in a ProCoreException.
543 public ResourceSegment getResourceSegment(final byte[] clusterUID, final int resourceIndex, final long segmentOffset, short segmentSize) throws ProCoreException {
545 return clusters.getResourceSegment(clusterUID, resourceIndex, segmentOffset, segmentSize);
546 } catch (AcornAccessVerificationException | IllegalAcornStateException e) {
547 throw new ProCoreException(e);
// Returns the current value of clusters.state.reservedIds and then increments it by one.
// NOTE(review): "count" is not used, so the counter advances by 1 regardless of how many ids
// were requested -- confirm this is intended.
552 public long reserveIds(int count) throws ProCoreException {
553 return clusters.state.reservedIds++;
// Wraps the raw operation bytes in a ClusterUpdateOperation, obtains the ClusterInfo for its
// cluster UID, and hands the operation to the main program.
// Acorn exceptions are rethrown wrapped in a ProCoreException.
557 public void updateCluster(byte[] operations) throws ProCoreException {
558 ClusterInfo info = null;
560 ClusterUpdateOperation operation = new ClusterUpdateOperation(clusters, operations);
561 info = clusters.clusterLRU.getOrCreate(operation.uid, true);
// The condition guarding this throw is not visible; per the message it is a null check on info.
563 throw new IllegalAcornStateException("info == null for operation " + operation);
// The cluster is marked as having a pending update before the operation is scheduled.
565 info.scheduleUpdate();
566 mainProgram.schedule(operation);
567 } catch (IllegalAcornStateException | AcornAccessVerificationException e) {
568 throw new ProCoreException(e);
// Resolves a cluster change set id of the form "<chunkKey>.<chunkOffset>" to its undo
// processor: looks up the stream chunk by key and asks it for the processor at the offset,
// holding the chunk's mutex during that call.
575 private UndoClusterUpdateProcessor getUndoCSS(String ccsId) throws DatabaseException, AcornAccessVerificationException, IllegalAcornStateException {
// NOTE(review): ss[1] is read without checking the array length, and parseInt is not guarded;
// a malformed id raises ArrayIndexOutOfBoundsException or NumberFormatException here.
577 String[] ss = ccsId.split("\\.");
578 String chunkKey = ss[0];
579 int chunkOffset = Integer.parseInt(ss[1]);
580 ClusterStreamChunk chunk = clusters.streamLRU.getWithoutMutex(chunkKey);
581 if(chunk == null) throw new IllegalAcornStateException("Cluster Stream Chunk " + chunkKey + " was not found.");
582 chunk.acquireMutex();
584 return chunk.getUndoProcessor(clusters, chunkOffset, ccsId);
// The body of this handler is not visible (presumably it rethrows e).
585 } catch (DatabaseException e) {
// NOTE(review): catching Throwable also captures Errors, and everything is converted to an
// unchecked IllegalStateException rather than one of the declared exception types.
587 } catch (Throwable t) {
588 throw new IllegalStateException(t);
// Releases the mutex acquired above; presumably inside a finally block not visible here.
590 chunk.releaseMutex();
// Undoes one cluster change set: fetches its undo processor and replays the processor's
// entries in reverse order into a ClusterChange that collects into clusterChanges.
// The "support" argument is not used in the visible lines.
594 private void performUndo(String ccsId, ArrayList<Pair<ClusterUID, byte[]>> clusterChanges, UndoClusterSupport support) throws ProCoreException, DatabaseException, IllegalAcornStateException, AcornAccessVerificationException {
595 UndoClusterUpdateProcessor proc = getUndoCSS(ccsId);
// The cluster key is resolved before the clusterLRU mutex is taken.
597 int clusterKey = clusters.getClusterKeyByClusterUIDOrMakeWithoutMutex(proc.getClusterUID());
599 clusters.clusterLRU.acquireMutex();
602 ClusterChange cs = new ClusterChange(clusterChanges, proc.getClusterUID());
603 for(int i=0;i<proc.entries.size();i++) {
// Index size-1-i walks the entries from last to first.
605 Entry e = proc.entries.get(proc.entries.size() - 1 - i);
606 e.process(clusters, cs, clusterKey);
// Releases the mutex acquired above; presumably inside a finally block not visible here.
611 clusters.clusterLRU.releaseMutex();
// Runs the given job through mainProgram.runIdle and rethrows any exception it recorded.
// An SDBException is rethrown as is; any other exception is wrapped in an
// IllegalAcornStateException. The callback bodies and the wait on the semaphore are not
// visible in this listing.
615 private void synchronizeWithIdleMainProgram(MainProgramRunnable runnable) throws SDBException {
// One-element array so the anonymous class below can store an exception for this method.
617 Exception[] exception = { null };
// Starts with zero permits; presumably released by the callbacks and acquired below.
618 Semaphore s = new Semaphore(0);
620 mainProgram.runIdle(new MainProgramRunnable() {
623 public void success() {
632 public void error(Exception e) {
642 public void run() throws Exception {
// NOTE(review): the interruption is wrapped without restoring the thread's interrupt flag.
650 } catch (InterruptedException e) {
651 throw new IllegalAcornStateException("Unhandled interruption.", e);
654 Exception e = exception[0];
656 if(e instanceof SDBException) throw (SDBException)e;
657 else if(e != null) throw new IllegalAcornStateException(e);
// Undoes the given change sets while the main program is idle, then reports every resulting
// cluster change to onChangeSetUpdate. The boolean return value and several handler bodies
// are not visible in this listing.
663 public boolean undo(long[] changeSetIds, OnChangeSetUpdate onChangeSetUpdate) throws SDBException {
665 synchronizeWithIdleMainProgram(new MainProgramRunnable() {
668 public void run() throws Exception {
// Collects one (cluster UID, data) pair per undone cluster change.
672 final ArrayList<Pair<ClusterUID, byte[]>> clusterChanges = new ArrayList<Pair<ClusterUID, byte[]>>();
674 UndoClusterSupport support = new UndoClusterSupport(clusters);
// Head change set id captured before the undo starts.
676 final int changeSetId = clusters.state.headChangeSetId;
678 if(ClusterUpdateProcessorBase.DEBUG)
679 LOGGER.info(" === BEGIN UNDO ===");
// Change sets are processed from the last array element to the first.
681 for(int i=0;i<changeSetIds.length;i++) {
682 final long id = changeSetIds[changeSetIds.length-1-i];
683 ArrayList<String> ccss = clusters.getChanges(id);
// Within a change set, the cluster change sets are also processed last to first.
685 for(int j=0;j<ccss.size();j++) {
686 String ccsid = ccss.get(ccss.size()-j-1);
688 if(ClusterUpdateProcessorBase.DEBUG)
689 LOGGER.info("performUndo " + ccsid);
690 performUndo(ccsid, clusterChanges, support);
// NOTE(review): the body of this handler is not visible; confirm the exception is not swallowed.
691 } catch (DatabaseException e) {
697 if(ClusterUpdateProcessorBase.DEBUG)
698 LOGGER.info(" === END UNDO ===");
// One callback per collected cluster change, in collection order.
700 for(int i=0;i<clusterChanges.size();i++) {
// Final copy of the loop index so the anonymous class below can capture it.
702 final int changeSetIndex = i;
704 final Pair<ClusterUID, byte[]> pair = clusterChanges.get(i);
706 final ClusterUID cuid = pair.first;
707 final byte[] data = pair.second;
709 onChangeSetUpdate.onChangeSetUpdate(new ChangeSetUpdate() {
// Bodies of getChangeSetId, getChangeSetIndex, getNewCluster and getData are not visible.
712 public long getChangeSetId() {
717 public int getChangeSetIndex() {
722 public int getNumberOfClusterChangeSets() {
723 return clusterChanges.size();
727 public int getIndexOfClusterChangeSet() {
728 return changeSetIndex;
732 public byte[] getClusterId() {
733 return cuid.asBytes();
737 public boolean getNewCluster() {
742 public byte[] getData() {
748 } catch (AcornAccessVerificationException | IllegalAcornStateException e1) {
749 throw new ProCoreException(e1);
// Package-private accessor for the service locator. The body is not visible in this listing;
// presumably it returns the "locator" field -- confirm.
760 ServiceLocator getServiceLocator() {
// NOTE(review): the body is not visible in this listing, so the returned value cannot be
// documented from here.
765 public boolean refreshEnabled() {
// Delegates to ClusterManager.rolledback() and returns its result.
770 public boolean rolledback() {
771 return clusters.rolledback();
// Delegates to ClusterManager.purge with this session's service locator.
// Invoked from the job that purgeDatabase() schedules.
774 private void purge() throws IllegalAcornStateException {
775 clusters.purge(locator);
// Submits a purge job to the "saver" executor. The job takes a write transaction, runs
// purge() while the main program is idle, and flags unexpectedClose on failure.
// It follows the same structure as tryMakeSnapshot().
778 public void purgeDatabase() {
// Guard on the closing/failed state; the guarded statement is not visible (presumably "return").
780 if (isClosing || unexpectedClose)
783 saver.execute(new Runnable() {
787 Transaction tr = null;
789 // First take a write transaction
// The argument -1 is not used by askWriteTransaction(long) in its visible lines.
790 tr = askWriteTransaction(-1);
791 // Then make sure that MainProgram is idling
792 synchronizeWithIdleMainProgram(() -> purge());
793 } catch (IllegalAcornStateException | ProCoreException e) {
794 LOGGER.error("Purge failed", e);
795 unexpectedClose = true;
// Both handlers do the same thing: log the failure and mark the session as failed.
796 } catch (SDBException e) {
797 LOGGER.error("Purge failed", e);
798 unexpectedClose = true;
// NOTE(review): tr is still null here if askWriteTransaction threw. A null guard may exist
// on a line that is not visible in this listing -- confirm.
802 endTransaction(tr.getTransactionId());
803 if (unexpectedClose) {
// After a failure the LifecycleSupport service is fetched; per the log message below it is
// used to close the database as a safety measure (the call itself is not visible).
804 LifecycleSupport support = getServiceLocator().getService(LifecycleSupport.class);
807 } catch (DatabaseException e1) {
808 LOGGER.error("Failed to close database as a safety measure due to failed purge", e1);
811 } catch (ProCoreException e) {
812 LOGGER.error("Failed to end purge write transaction", e);
// Delegates to ClusterManager.getTailChangeSetId() and returns its result.
820 public long getTailChangeSetId() {
821 return clusters.getTailChangeSetId();
// Creates an AcornBackupRunnable for the given target and starts it on a new thread named
// "Acorn backup thread". The rest of the method, including the returned Future, is not
// visible in this listing.
824 public Future<BackupException> getBackupRunnable(Semaphore lock, Path targetPath, int revision) throws IllegalAcornStateException, IOException {
828 Path dbDir = getDbFolder();
// One less than the main state's current head directory number.
829 int newestFolder = clusters.mainState.headDir - 1;
// Stays at -2 when the metadata file does not exist.
830 int latestFolder = -2;
831 Path AcornMetadataFile = AcornBackupProvider.getAcornMetadataFile(dbDir);
832 if (Files.exists(AcornMetadataFile)) {
833 try (BufferedReader br = Files.newBufferedReader(AcornMetadataFile)) {
// NOTE(review): readLine() returns null for an empty file, and parseInt then throws
// NumberFormatException (unchecked, not among the declared exceptions) -- confirm the file
// always holds an integer on its first line.
834 latestFolder = Integer.parseInt( br.readLine() );
838 AcornBackupRunnable r = new AcornBackupRunnable(
839 lock, targetPath, revision, dbDir, latestFolder, newestFolder);
// A dedicated thread is started per call rather than reusing one of the class's executors.
840 new Thread(r, "Acorn backup thread").start();