1 /*******************************************************************************
2 * Copyright (c) 2007, 2010 Association for Decentralized Information Management
4 * All rights reserved. This program and the accompanying materials
5 * are made available under the terms of the Eclipse Public License v1.0
6 * which accompanies this distribution, and is available at
7 * http://www.eclipse.org/legal/epl-v10.html
10 * VTT Technical Research Centre of Finland - initial API and implementation
11 *******************************************************************************/
12 package org.simantics.acorn;
14 import java.io.BufferedReader;
15 import java.io.IOException;
16 import java.nio.file.Files;
17 import java.nio.file.Path;
18 import java.util.ArrayList;
19 import java.util.LinkedList;
20 import java.util.concurrent.ExecutorService;
21 import java.util.concurrent.Executors;
22 import java.util.concurrent.Future;
23 import java.util.concurrent.Semaphore;
24 import java.util.concurrent.ThreadFactory;
25 import java.util.concurrent.TimeUnit;
27 import org.simantics.acorn.MainProgram.MainProgramRunnable;
28 import org.simantics.acorn.backup.AcornBackupProvider;
29 import org.simantics.acorn.backup.AcornBackupProvider.AcornBackupRunnable;
30 import org.simantics.acorn.exception.AcornAccessVerificationException;
31 import org.simantics.acorn.exception.IllegalAcornStateException;
32 import org.simantics.acorn.internal.ClusterChange;
33 import org.simantics.acorn.internal.ClusterUpdateProcessorBase;
34 import org.simantics.acorn.internal.UndoClusterUpdateProcessor;
35 import org.simantics.acorn.lru.ClusterChangeSet.Entry;
36 import org.simantics.acorn.lru.ClusterInfo;
37 import org.simantics.acorn.lru.ClusterStreamChunk;
38 import org.simantics.acorn.lru.ClusterUpdateOperation;
39 import org.simantics.backup.BackupException;
40 import org.simantics.db.ClusterCreator;
41 import org.simantics.db.Database;
42 import org.simantics.db.ServiceLocator;
43 import org.simantics.db.exception.DatabaseException;
44 import org.simantics.db.exception.SDBException;
45 import org.simantics.db.server.ProCoreException;
46 import org.simantics.db.service.ClusterSetsSupport;
47 import org.simantics.db.service.ClusterUID;
48 import org.simantics.db.service.EventSupport;
49 import org.simantics.db.service.LifecycleSupport;
50 import org.simantics.utils.datastructures.Pair;
51 import org.simantics.utils.logging.TimeLogger;
52 import org.slf4j.Logger;
53 import org.slf4j.LoggerFactory;
55 import fi.vtt.simantics.procore.internal.EventSupportImpl;
56 import gnu.trove.map.hash.TLongObjectHashMap;
58 public class GraphClientImpl2 implements Database.Session {
// Class-wide SLF4J logger and a compile-time debug switch.
60 private static final Logger LOGGER = LoggerFactory.getLogger(GraphClientImpl2.class);
61 public static final boolean DEBUG = false;
// Event names handed to eventSupport.fireEvent(...) by close() and purgeDatabase().
63 public static final String CLOSE = "close";
64 public static final String PURGE = "purge";
// Cluster storage manager; created in the constructor from dbFolder and fileCache.
66 final ClusterManager clusters;
// Grants read/write transactions; see the inner TransactionManager class.
68 private TransactionManager transactionManager = new TransactionManager();
// Single-thread executor that runs mainProgram (submitted in the constructor); factory daemon argument is false.
69 private ExecutorService executor = Executors.newSingleThreadExecutor(new ClientThreadFactory("Core Main Program", false));
// Single-thread executor used by tryMakeSnapshot() and purgeDatabase(); factory daemon argument is true.
70 private ExecutorService saver = Executors.newSingleThreadExecutor(new ClientThreadFactory("Core Snapshot Saver", true));
// Constructor arguments and the services derived from them.
72 private Path dbFolder;
73 private final Database database;
74 private ServiceLocator locator;
75 private FileCache fileCache;
76 private MainProgram mainProgram;
77 private EventSupportImpl eventSupport;
// Thread factory used for the two single-thread executors of this class.
79 private static class ClientThreadFactory implements ThreadFactory {
// name: thread name; daemon: daemon flag. The field assignments are not visible in this excerpt.
84 public ClientThreadFactory(String name, boolean daemon) {
// Creates a thread for r that is named `name` and whose daemon status is set from `daemon`.
90 public Thread newThread(Runnable r) {
91 Thread thread = new Thread(r, name);
92 thread.setDaemon(daemon);
// Creates the session: stores the arguments, builds the file cache and cluster manager,
// points ClusterSetsSupport at the session directories and starts MainProgram on the executor.
// Declared to throw IOException; which call raises it is not visible in this excerpt.
97 public GraphClientImpl2(Database database, Path dbFolder, ServiceLocator locator) throws IOException {
98 this.database = database;
99 this.dbFolder = dbFolder;
100 this.locator = locator;
101 this.fileCache = new FileCache();
102 // This disposes the cache when the session is shut down
103 locator.registerService(FileCache.class, fileCache);
104 this.clusters = new ClusterManager(dbFolder, fileCache);
// Cluster sets are read from the last session directory and written to the working directory.
106 ClusterSetsSupport cssi = locator.getService(ClusterSetsSupport.class);
107 cssi.setReadDirectory(clusters.lastSessionDirectory);
108 cssi.updateWriteDirectory(clusters.workingDirectory);
// MainProgram is a task for the "Core Main Program" executor thread.
109 mainProgram = new MainProgram(this, clusters);
110 executor.execute(mainProgram);
// The EventSupport service is cast to its implementation class; a different implementation would fail with ClassCastException.
111 eventSupport = (EventSupportImpl)locator.getService(EventSupport.class);
// Accessor for the database folder; body not visible in this excerpt (presumably returns dbFolder — confirm).
115 public Path getDbFolder() {
120 * This method schedules snapshotting.
121 * No lock and thread restrictions.
// Submits an asynchronous snapshot job to the saver executor.
// The job takes a write transaction, runs makeSnapshot(false) while MainProgram is idle, then ends the transaction.
123 void tryMakeSnapshot() throws IOException {
// Guard for a closing or failed session; the guarded statement is not visible here (presumably an early return — confirm).
125 if (isClosing || unexpectedClose)
128 saver.execute(new Runnable() {
132 Transaction tr = null;
134 // First take a write transaction
135 tr = askWriteTransaction(-1);
136 // Then make sure that MainProgram is idling
137 synchronizeWithIdleMainProgram(() -> makeSnapshot(false));
// Any failure is logged and flags the session as unexpectedly closed; askWriteTransaction(long) refuses requests once the flag is set.
138 } catch (IllegalAcornStateException | ProCoreException e) {
139 LOGGER.error("Snapshotting failed", e);
140 unexpectedClose = true;
141 } catch (SDBException e) {
142 LOGGER.error("Snapshotting failed", e);
143 unexpectedClose = true;
// NOTE(review): tr is still null if askWriteTransaction threw; confirm a null check guards this call in the lines not shown here.
147 endTransaction(tr.getTransactionId());
// After a failure the LifecycleSupport service is looked up; the call made on it is not visible (the log message below suggests a database close).
148 if (unexpectedClose) {
149 LifecycleSupport support = getServiceLocator().getService(LifecycleSupport.class);
152 } catch (DatabaseException e1) {
153 LOGGER.error("Failed to close database as a safety measure due to failed snapshotting", e1);
156 } catch (ProCoreException e) {
157 LOGGER.error("Failed to end snapshotting write transaction", e);
// Delegates to ClusterManager.makeSnapshot with this session's service locator.
// fullSave is false for the background snapshot job and true when called from close().
164 private void makeSnapshot(boolean fullSave) throws IllegalAcornStateException {
165 clusters.makeSnapshot(locator, fullSave);
// Clones the cluster identified by uid through the given creator by delegating to ClusterManager.clone.
// On an Acorn or I/O failure the session is flagged as unexpectedly closed and the cause is rethrown as DatabaseException.
169 public <T> T clone(ClusterUID uid, ClusterCreator creator) throws DatabaseException {
171 return clusters.clone(uid, creator);
172 } catch (AcornAccessVerificationException | IllegalAcornStateException | IOException e) {
173 unexpectedClose = true;
174 throw new DatabaseException(e);
// Body not visible in this excerpt; no caller of load() appears in the visible lines either.
178 private void load() throws IOException {
// Accessor for the owning Database; body not visible in this excerpt (presumably returns the database field — confirm).
183 public Database getDatabase() {
187 private boolean closed = false;
188 private boolean isClosing = false;
189 private boolean unexpectedClose = false;
// Shuts the session down once: takes a final full snapshot (unless the session already failed),
// waits briefly for both executors, saves the main state and fires the CLOSE event.
// Several statements of this method are not visible in this excerpt.
192 public void close() throws ProCoreException {
193 LOGGER.info("Closing " + this + " and mainProgram " + mainProgram);
// Nothing is done when the session is already closed or in the middle of closing.
194 if(!closed && !isClosing) {
// The final snapshot is a full save and runs while MainProgram is idle.
198 if (!unexpectedClose)
199 synchronizeWithIdleMainProgram(() -> makeSnapshot(true));
// Each executor gets at most 500 ms to terminate; the results are only logged.
206 boolean executorTerminated = executor.awaitTermination(500, TimeUnit.MILLISECONDS);
207 boolean saverTerminated = saver.awaitTermination(500, TimeUnit.MILLISECONDS);
209 LOGGER.info("executorTerminated=" + executorTerminated + ", saverTerminated=" + saverTerminated);
212 clusters.mainState.save(dbFolder);
213 } catch (IOException e) {
// NOTE(review): the caught IOException is not passed to the logger, so its stack trace is lost.
214 LOGGER.error("Failed to save " + MainState.MAIN_STATE + " file in database folder " + dbFolder);
// NOTE(review): InterruptedException is wrapped without restoring the thread's interrupt status.
221 } catch (IllegalAcornStateException | InterruptedException e) {
222 throw new ProCoreException(e);
223 } catch (SDBException e1) {
224 throw new ProCoreException(e1);
227 eventSupport.fireEvent(CLOSE, null);
// Not supported by this implementation; always throws UnsupportedOperationException.
233 public void open() throws ProCoreException {
234 throw new UnsupportedOperationException();
// Reports whether the session is closed; body not visible in this excerpt (presumably returns the closed flag — confirm).
238 public boolean isClosed() throws ProCoreException {
// Finalizes a commit: advances the head change set id, stores the metadata under changeSetId + 1,
// records the transaction id and notifies MainProgram.
// An IllegalAcornStateException is rethrown wrapped in ProCoreException.
243 public void acceptCommit(long transactionId, long changeSetId, byte[] metadata) throws ProCoreException {
// NOTE(review): no rollback of this increment is visible if commitChangeSet below throws.
244 clusters.state.headChangeSetId++;
245 long committedChangeSetId = changeSetId + 1;
247 clusters.commitChangeSet(committedChangeSetId, metadata);
248 clusters.state.transactionId = transactionId;
249 mainProgram.committed();
250 TimeLogger.log("Accepted commit");
251 } catch (IllegalAcornStateException e) {
252 throw new ProCoreException(e);
// Cancels a transaction by first committing it and then undoing the change set that commit produced.
// Returns the head change set id after the undo; undo failures are logged and rethrown as ProCoreException.
257 public long cancelCommit(long transactionId, long changeSetId, byte[] metadata, OnChangeSetUpdate onChangeSetUpdate) throws ProCoreException {
258 // Accept and finalize current transaction and then undo it
259 acceptCommit(transactionId, changeSetId, metadata);
// acceptCommit stored the change set under changeSetId + 1, so that is the id to undo.
262 undo(new long[] {changeSetId+1}, onChangeSetUpdate);
// The head change set id is advanced once more after the undo.
263 clusters.state.headChangeSetId++;
264 return clusters.state.headChangeSetId;
265 } catch (SDBException e) {
266 LOGGER.error("Failed to undo cancelled transaction", e);
267 throw new ProCoreException(e);
// Requests a read transaction from the transaction manager.
272 public Transaction askReadTransaction() throws ProCoreException {
273 return transactionManager.askReadTransaction();
// Transaction states used by TransactionManager. The constant list is not visible in this excerpt;
// IDLE, READ and WRITE are the values referenced elsewhere in this file.
276 private enum TransactionState {
// A queued transaction request: the requested state plus the semaphore its requester waits on.
280 private class TransactionRequest {
281 public TransactionState state;
282 public Semaphore semaphore;
// The assignment of state is not visible in this excerpt.
283 public TransactionRequest(TransactionState state, Semaphore semaphore) {
285 this.semaphore = semaphore;
// Diagnostic form; askWriteTransaction logs it when a write request times out.
289 public String toString() {
290 return getClass().getSimpleName() + "[state=" + state + ", semaphore=" + semaphore + "]";
// Queues transaction requests and tracks the current transaction state.
294 private class TransactionManager {
// State of the transaction(s) currently granted; starts as IDLE.
296 private TransactionState currentTransactionState = TransactionState.IDLE;
// Counter incremented by dispatch() for every READ request it releases.
298 private int reads = 0;
// Pending requests in arrival order (added at the tail by queue(), removed from the head by dispatch()).
300 private LinkedList<TransactionRequest> requests = new LinkedList<>();
// Granted requests keyed by transaction id; filled by makeTransaction(), emptied by endTransaction().
302 private TLongObjectHashMap<TransactionRequest> requestMap = new TLongObjectHashMap<>();
// Builds the Transaction handle for a granted request and registers the request under its transaction id.
304 private synchronized Transaction makeTransaction(TransactionRequest req) {
// Ids are captured at grant time: the current head change set id and the next transaction id.
306 final int csId = clusters.state.headChangeSetId;
307 final long trId = clusters.state.transactionId+1;
308 requestMap.put(trId, req);
// The accessor bodies are not visible in this excerpt (presumably they return trId and csId — confirm).
309 return new Transaction() {
312 public long getTransactionId() {
317 public long getHeadChangeSetId() {
324 * This method cannot be synchronized since it waits and must support multiple entries
325 * by query thread(s) and internal transactions such as snapshot saver
// Queues a READ request with a zero-permit semaphore and returns a Transaction for it.
// The wait on the semaphore is not visible in this excerpt; an InterruptedException from it becomes a ProCoreException.
327 private Transaction askReadTransaction() throws ProCoreException {
329 Semaphore semaphore = new Semaphore(0);
331 TransactionRequest req = queue(TransactionState.READ, semaphore);
// NOTE(review): the interrupt status is not restored before wrapping the exception.
335 } catch (InterruptedException e) {
336 throw new ProCoreException(e);
339 return makeTransaction(req);
// Grants the oldest pending request: removes it from the queue, counts it if it is a READ
// and releases its semaphore.
// removeFirst() throws NoSuchElementException on an empty queue, so callers must check first.
343 private synchronized void dispatch() {
344 TransactionRequest r = requests.removeFirst();
345 if(r.state == TransactionState.READ) reads++;
346 r.semaphore.release();
// Inspects the head of the request queue and decides, based on the current transaction state,
// whether it can be granted. Most branch bodies are not visible in this excerpt.
349 private synchronized void processRequests() {
353 if(requests.isEmpty()) return;
// peek() leaves the request in the queue.
354 TransactionRequest req = requests.peek();
356 if(currentTransactionState == TransactionState.IDLE) {
358 // Accept anything while IDLE
359 currentTransactionState = req.state;
// While reading, the head request is compared against the current state (branch body not shown).
362 } else if (currentTransactionState == TransactionState.READ) {
364 if(req.state == currentTransactionState) {
// Handling while a write transaction is active is not visible in this excerpt.
376 } else if (currentTransactionState == TransactionState.WRITE) {
// Wraps state and semaphore into a TransactionRequest and appends it to the pending queue.
// The remaining statements (including the return) are not visible in this excerpt.
387 private synchronized TransactionRequest queue(TransactionState state, Semaphore semaphore) {
388 TransactionRequest req = new TransactionRequest(state, semaphore);
389 requests.addLast(req);
// Counter used by askWriteTransaction(): the detailed timeout diagnostics are logged only while it is below 10.
394 private long askWriteTransactionCount = 0;
396 * This method cannot be synchronized since it waits and must support multiple entries
397 * by query thread(s) and internal transactions such as snapshot saver
// Queues a WRITE request, waits for its semaphore, then tells MainProgram that a transaction
// starts and returns the Transaction handle.
// The control flow around the wait (loop/branches) is only partly visible in this excerpt.
399 private Transaction askWriteTransaction() throws IllegalAcornStateException {
401 Semaphore semaphore = new Semaphore(0);
402 TransactionRequest req = queue(TransactionState.WRITE, semaphore);
// Waits at most one minute per attempt.
406 boolean acquired = semaphore.tryAcquire(1, TimeUnit.MINUTES);
// Diagnostic dump of the transaction state, limited by askWriteTransactionCount.
// NOTE(review): requests and currentTransactionState are read here without the monitor that the synchronized methods hold.
408 if (askWriteTransactionCount < 10) {
409 LOGGER.error("Could not acquire semaphore for askWriteTransaction for TransactionRequest {}", req);
410 LOGGER.error("Current clusters.state.headChangeSetId is {}", clusters.state.headChangeSetId);
411 LOGGER.error("Current clusters.state.transactionId is {}", clusters.state.transactionId);
412 LOGGER.error("Current amount of requests is {}", requests.size());
// The queue contents are logged only when there are fewer than 100 entries.
413 if (requests.size() < 100) {
414 LOGGER.error("Current requests {}", requests);
416 LOGGER.error("Current transaction state is {}", currentTransactionState);
417 askWriteTransactionCount++;
// The counter is reset on a branch whose condition is not visible (presumably after a successful acquire — confirm).
420 askWriteTransactionCount = 0;
// NOTE(review): the interrupt status is not restored before wrapping the exception.
424 } catch (InterruptedException e) {
425 throw new IllegalAcornStateException(e);
// The transaction being started will produce the change set after the current head.
427 mainProgram.startTransaction(clusters.state.headChangeSetId+1);
428 return makeTransaction(req);
// Ends the transaction registered under transactionId and returns clusters.state.transactionId.
// Parts of the read-transaction branch are not visible in this excerpt.
431 private synchronized long endTransaction(long transactionId) throws ProCoreException {
// NOTE(review): remove() yields null for an unknown id and req is dereferenced on the next line, which would throw NullPointerException.
433 TransactionRequest req = requestMap.remove(transactionId);
434 if(req.state == TransactionState.WRITE) {
435 currentTransactionState = TransactionState.IDLE;
// Second reset to IDLE; its enclosing condition is not visible here.
440 currentTransactionState = TransactionState.IDLE;
444 return clusters.state.transactionId;
// Requests a write transaction from the transaction manager.
// Refused with ProCoreException when the session is closing, closed or has failed unexpectedly.
// The transactionId argument is not used in the visible code; internal callers pass -1.
450 public Transaction askWriteTransaction(final long transactionId) throws ProCoreException {
452 if (isClosing || unexpectedClose || closed) {
453 throw new ProCoreException("GraphClientImpl2 is already closing so no more write transactions allowed!");
455 return transactionManager.askWriteTransaction();
456 } catch (IllegalAcornStateException e) {
457 throw new ProCoreException(e);
// Ends the given transaction by delegating to the transaction manager and returns its result.
462 public long endTransaction(long transactionId) throws ProCoreException {
463 return transactionManager.endTransaction(transactionId);
// Command hook that is intentionally ignored in Acorn (see the comments below); the returned value is not visible in this excerpt.
467 public String execute(String command) throws ProCoreException {
468 // This is called only by WriteGraphImpl.commitAccessorChanges
469 // We can ignore this in Acorn
// Returns the metadata stored for changeSetId by delegating to ClusterManager.getMetadata.
// Acorn access/state failures are rethrown wrapped in ProCoreException.
474 public byte[] getChangeSetMetadata(long changeSetId) throws ProCoreException {
476 return clusters.getMetadata(changeSetId);
477 } catch (AcornAccessVerificationException | IllegalAcornStateException e) {
478 throw new ProCoreException(e);
// Only traces the call; no change set data is computed in the visible code and the return statement is not shown.
483 public ChangeSetData getChangeSetData(long minChangeSetId,
484 long maxChangeSetId, OnChangeSetUpdate onChangeSetupate)
485 throws ProCoreException {
// Prints the stack trace of a fresh exception to show who called this method.
// NOTE(review): this writes to stderr rather than LOGGER, and the statement ends with a stray second semicolon.
487 new Exception("GetChangeSetDataFunction " + minChangeSetId + " " + maxChangeSetId).printStackTrace();;
// Not supported by this implementation; always throws UnsupportedOperationException.
493 public ChangeSetIds getChangeSetIds() throws ProCoreException {
494 throw new UnsupportedOperationException();
// Not supported by this implementation; always throws UnsupportedOperationException.
498 public Cluster getCluster(byte[] clusterId) throws ProCoreException {
499 throw new UnsupportedOperationException();
// Not supported by this implementation; always throws UnsupportedOperationException.
503 public ClusterChanges getClusterChanges(long changeSetId, byte[] clusterId)
504 throws ProCoreException {
505 throw new UnsupportedOperationException();
// Returns the cluster ids known to the ClusterManager.
// An IllegalAcornStateException is rethrown wrapped in ProCoreException.
509 public ClusterIds getClusterIds() throws ProCoreException {
511 return clusters.getClusterIds();
512 } catch (IllegalAcornStateException e) {
513 throw new ProCoreException(e);
// Returns an Information object describing this session.
// The bodies of its four accessors are not visible in this excerpt, so the returned values cannot be documented here.
518 public Information getInformation() throws ProCoreException {
519 return new Information() {
522 public String getServerId() {
527 public String getProtocolId() {
532 public String getDatabaseId() {
537 public long getFirstChangeSetId() {
// Builds a Refresh view from the cluster ids and the head change set id.
// The changeSetId argument is not used in the visible code.
545 public Refresh getRefresh(long changeSetId) throws ProCoreException {
// The cluster ids are fetched once, when getRefresh is called.
547 final ClusterIds ids = getClusterIds();
549 return new Refresh() {
// Unlike the ids, the head change set id is read from clusters.state on every call.
552 public long getHeadChangeSetId() {
553 return clusters.state.headChangeSetId;
557 public long[] getFirst() {
558 return ids.getFirst();
562 public long[] getSecond() {
563 return ids.getSecond();
570 // public byte[] getResourceFile(final byte[] clusterUID, final int resourceIndex) throws ProCoreException, AcornAccessVerificationException, IllegalAcornStateException {
571 // return clusters.getResourceFile(clusterUID, resourceIndex);
// Reads a segment of a resource's value by delegating to ClusterManager.getResourceSegment with the same arguments.
// Acorn access/state failures are rethrown wrapped in ProCoreException.
575 public ResourceSegment getResourceSegment(final byte[] clusterUID, final int resourceIndex, final long segmentOffset, short segmentSize) throws ProCoreException {
577 return clusters.getResourceSegment(clusterUID, resourceIndex, segmentOffset, segmentSize);
578 } catch (AcornAccessVerificationException | IllegalAcornStateException e) {
579 throw new ProCoreException(e);
// Returns the current value of clusters.state.reservedIds and then advances it by one (post-increment).
// NOTE(review): count is not used, so one id is reserved whatever the caller asks for — confirm this is intended.
584 public long reserveIds(int count) throws ProCoreException {
585 return clusters.state.reservedIds++;
// Decodes a cluster update from the raw operations bytes and schedules it on MainProgram.
// Acorn state/access failures are rethrown wrapped in ProCoreException.
589 public void updateCluster(byte[] operations) throws ProCoreException {
590 ClusterInfo info = null;
592 ClusterUpdateOperation operation = new ClusterUpdateOperation(clusters, operations);
// Looks up the ClusterInfo for the operation's cluster uid in the cluster LRU via getOrCreate.
593 info = clusters.clusterLRU.getOrCreate(operation.uid, true);
// Thrown when no ClusterInfo was obtained (per the message); the guarding condition is not visible in this excerpt.
595 throw new IllegalAcornStateException("info == null for operation " + operation);
// The cluster is marked as having a scheduled update before the operation is handed to MainProgram.
597 info.scheduleUpdate();
598 mainProgram.schedule(operation);
599 } catch (IllegalAcornStateException | AcornAccessVerificationException e) {
600 throw new ProCoreException(e);
// Resolves the undo processor for a cluster change set id of the form "<chunkKey>.<chunkOffset>".
// Throws IllegalAcornStateException when the stream chunk is not found.
607 private UndoClusterUpdateProcessor getUndoCSS(String ccsId) throws DatabaseException, AcornAccessVerificationException, IllegalAcornStateException {
// Splits on a literal dot: the first part is the chunk key, the second the integer offset inside the chunk.
// NOTE(review): an id without a dot or with a non-numeric offset fails here with an unchecked exception.
609 String[] ss = ccsId.split("\\.");
610 String chunkKey = ss[0];
611 int chunkOffset = Integer.parseInt(ss[1]);
612 ClusterStreamChunk chunk = clusters.streamLRU.getWithoutMutex(chunkKey);
613 if(chunk == null) throw new IllegalAcornStateException("Cluster Stream Chunk " + chunkKey + " was not found.");
// The chunk mutex is held while the undo processor is read and released afterwards.
614 chunk.acquireMutex();
616 return chunk.getUndoProcessor(clusters, chunkOffset, ccsId);
// DatabaseException has its own handler (body not visible); every other Throwable is wrapped in an unchecked IllegalStateException.
617 } catch (DatabaseException e) {
619 } catch (Throwable t) {
620 throw new IllegalStateException(t);
622 chunk.releaseMutex();
// Undoes one cluster change set: replays its entries in reverse order into a ClusterChange
// that collects its output in clusterChanges.
// The support argument is not used in the visible code.
626 private void performUndo(String ccsId, ArrayList<Pair<ClusterUID, byte[]>> clusterChanges, UndoClusterSupport support) throws ProCoreException, DatabaseException, IllegalAcornStateException, AcornAccessVerificationException {
627 UndoClusterUpdateProcessor proc = getUndoCSS(ccsId);
// The cluster key is resolved before the LRU mutex is taken.
629 int clusterKey = clusters.getClusterKeyByClusterUIDOrMakeWithoutMutex(proc.getClusterUID());
// The cluster LRU mutex is held for the whole replay and released afterwards.
631 clusters.clusterLRU.acquireMutex();
634 ClusterChange cs = new ClusterChange(clusterChanges, proc.getClusterUID());
635 for(int i=0;i<proc.entries.size();i++) {
// Index size-1-i walks the entries from last to first.
637 Entry e = proc.entries.get(proc.entries.size() - 1 - i);
638 e.process(clusters, cs, clusterKey);
643 clusters.clusterLRU.releaseMutex();
// Hands runnable to mainProgram.runIdle, waits on a semaphore and rethrows any exception reported by the run.
// The callback bodies and the wait itself are not visible in this excerpt.
647 private void synchronizeWithIdleMainProgram(MainProgramRunnable runnable) throws SDBException {
// One-element array so the anonymous callback can hand an exception back to this method.
649 Exception[] exception = { null };
// Starts with zero permits (presumably released by the success/error callbacks — confirm).
650 Semaphore s = new Semaphore(0);
652 mainProgram.runIdle(new MainProgramRunnable() {
655 public void success() {
664 public void error(Exception e) {
674 public void run() throws Exception {
// NOTE(review): the interrupt status is not restored before wrapping the exception.
682 } catch (InterruptedException e) {
683 throw new IllegalAcornStateException("Unhandled interruption.", e);
// An SDBException is rethrown as-is; any other exception is wrapped in IllegalAcornStateException.
686 Exception e = exception[0];
688 if(e instanceof SDBException) throw (SDBException)e;
689 else if(e != null) throw new IllegalAcornStateException(e);
// Undoes the given change sets while MainProgram is idle and reports every resulting cluster change
// to onChangeSetUpdate. The returned value is not visible in this excerpt.
695 public boolean undo(long[] changeSetIds, OnChangeSetUpdate onChangeSetUpdate) throws SDBException {
697 synchronizeWithIdleMainProgram(new MainProgramRunnable() {
700 public void run() throws Exception {
// Collects the (cluster uid, data) pairs produced by the undo.
704 final ArrayList<Pair<ClusterUID, byte[]>> clusterChanges = new ArrayList<Pair<ClusterUID, byte[]>>();
706 UndoClusterSupport support = new UndoClusterSupport(clusters);
// Head change set id captured before the undo runs.
708 final int changeSetId = clusters.state.headChangeSetId;
710 if(ClusterUpdateProcessorBase.DEBUG)
711 LOGGER.info(" === BEGIN UNDO ===");
// Change sets are visited from last to first, and within each one the cluster change sets also from last to first.
713 for(int i=0;i<changeSetIds.length;i++) {
714 final long id = changeSetIds[changeSetIds.length-1-i];
715 ArrayList<String> ccss = clusters.getChanges(id);
717 for(int j=0;j<ccss.size();j++) {
718 String ccsid = ccss.get(ccss.size()-j-1);
720 if(ClusterUpdateProcessorBase.DEBUG)
721 LOGGER.info("performUndo " + ccsid);
722 performUndo(ccsid, clusterChanges, support);
// A failed undo of one cluster change set is logged; whether it is rethrown is not visible in this excerpt.
723 } catch (DatabaseException e) {
724 LOGGER.error("failed to perform undo for cluster change set {}", ccsid, e);
729 if(ClusterUpdateProcessorBase.DEBUG)
730 LOGGER.info(" === END UNDO ===");
// One ChangeSetUpdate callback is issued per collected cluster change.
732 for(int i=0;i<clusterChanges.size();i++) {
// Final copy of the loop index for use inside the anonymous class.
734 final int changeSetIndex = i;
736 final Pair<ClusterUID, byte[]> pair = clusterChanges.get(i);
738 final ClusterUID cuid = pair.first;
739 final byte[] data = pair.second;
// Accessors whose bodies are not shown here cannot be documented from this excerpt.
741 onChangeSetUpdate.onChangeSetUpdate(new ChangeSetUpdate() {
744 public long getChangeSetId() {
749 public int getChangeSetIndex() {
754 public int getNumberOfClusterChangeSets() {
755 return clusterChanges.size();
759 public int getIndexOfClusterChangeSet() {
760 return changeSetIndex;
764 public byte[] getClusterId() {
765 return cuid.asBytes();
769 public boolean getNewCluster() {
774 public byte[] getData() {
// Acorn access/state failures are rethrown wrapped in ProCoreException.
780 } catch (AcornAccessVerificationException | IllegalAcornStateException e1) {
781 throw new ProCoreException(e1);
// Package-private accessor for the service locator; body not visible in this excerpt (presumably returns locator — confirm).
792 ServiceLocator getServiceLocator() {
// Body not visible in this excerpt, so the returned value cannot be documented here.
797 public boolean refreshEnabled() {
// Returns the result of ClusterManager.rolledback().
802 public boolean rolledback() {
803 return clusters.rolledback();
// Delegates to ClusterManager.purge with this session's service locator; called from the purgeDatabase() job.
806 private void purge() throws IllegalAcornStateException {
807 clusters.purge(locator);
// Submits an asynchronous purge job to the saver executor.
// The job takes a write transaction, runs purge() while MainProgram is idle, ends the transaction and fires the PURGE event.
810 public void purgeDatabase() {
// Guard for a closing or failed session; the guarded statement is not visible here (presumably an early return — confirm).
812 if (isClosing || unexpectedClose)
815 saver.execute(new Runnable() {
819 Transaction tr = null;
821 // First take a write transaction
822 tr = askWriteTransaction(-1);
823 // Then make sure that MainProgram is idling
824 synchronizeWithIdleMainProgram(() -> purge());
// Any failure is logged and flags the session as unexpectedly closed.
825 } catch (IllegalAcornStateException | ProCoreException e) {
826 LOGGER.error("Purge failed", e);
827 unexpectedClose = true;
828 } catch (SDBException e) {
829 LOGGER.error("Purge failed", e);
830 unexpectedClose = true;
// NOTE(review): tr is still null if askWriteTransaction threw; confirm a null check guards this call in the lines not shown here.
834 endTransaction(tr.getTransactionId());
835 eventSupport.fireEvent(PURGE, null);
// After a failure the LifecycleSupport service is looked up; the call made on it is not visible (the log message below suggests a database close).
837 if (unexpectedClose) {
838 LifecycleSupport support = getServiceLocator().getService(LifecycleSupport.class);
841 } catch (DatabaseException e1) {
842 LOGGER.error("Failed to close database as a safety measure due to failed purge", e1);
845 } catch (ProCoreException e) {
846 LOGGER.error("Failed to end purge write transaction", e);
// Returns the tail change set id reported by the ClusterManager.
854 public long getTailChangeSetId() {
855 return clusters.getTailChangeSetId();
858 public Future<BackupException> getBackupRunnable(Semaphore lock, Path targetPath, int revision) throws IllegalAcornStateException, IOException {
862 Path dbDir = getDbFolder();
863 int newestFolder = clusters.mainState.headDir - 1;
864 int latestFolder = -2;
865 Path AcornMetadataFile = AcornBackupProvider.getAcornMetadataFile(dbDir);
866 if (Files.exists(AcornMetadataFile)) {
867 try (BufferedReader br = Files.newBufferedReader(AcornMetadataFile)) {
868 latestFolder = Integer.parseInt( br.readLine() );
872 AcornBackupRunnable r = new AcornBackupRunnable(
873 lock, targetPath, revision, dbDir, latestFolder, newestFolder);
874 new Thread(r, "Acorn backup thread").start();