1 /*******************************************************************************
2 * Copyright (c) 2007, 2010 Association for Decentralized Information Management
4 * All rights reserved. This program and the accompanying materials
5 * are made available under the terms of the Eclipse Public License v1.0
6 * which accompanies this distribution, and is available at
7 * http://www.eclipse.org/legal/epl-v10.html
10 * VTT Technical Research Centre of Finland - initial API and implementation
11 *******************************************************************************/
12 package org.simantics.acorn;
14 import java.io.BufferedReader;
15 import java.io.IOException;
16 import java.nio.file.Files;
17 import java.nio.file.Path;
18 import java.util.ArrayList;
19 import java.util.LinkedList;
20 import java.util.List;
21 import java.util.concurrent.CopyOnWriteArrayList;
22 import java.util.concurrent.ExecutorService;
23 import java.util.concurrent.Executors;
24 import java.util.concurrent.Future;
25 import java.util.concurrent.Semaphore;
26 import java.util.concurrent.ThreadFactory;
27 import java.util.concurrent.TimeUnit;
29 import org.simantics.acorn.MainProgram.MainProgramRunnable;
30 import org.simantics.acorn.backup.AcornBackupProvider;
31 import org.simantics.acorn.backup.AcornBackupProvider.AcornBackupRunnable;
32 import org.simantics.acorn.exception.AcornAccessVerificationException;
33 import org.simantics.acorn.exception.IllegalAcornStateException;
34 import org.simantics.acorn.internal.ClusterChange;
35 import org.simantics.acorn.internal.ClusterUpdateProcessorBase;
36 import org.simantics.acorn.internal.UndoClusterUpdateProcessor;
37 import org.simantics.acorn.lru.ClusterChangeSet.Entry;
38 import org.simantics.acorn.lru.ClusterInfo;
39 import org.simantics.acorn.lru.ClusterStreamChunk;
40 import org.simantics.acorn.lru.ClusterUpdateOperation;
41 import org.simantics.backup.BackupException;
42 import org.simantics.db.ClusterCreator;
43 import org.simantics.db.Database;
44 import org.simantics.db.ServiceLocator;
45 import org.simantics.db.exception.DatabaseException;
46 import org.simantics.db.exception.SDBException;
47 import org.simantics.db.server.ProCoreException;
48 import org.simantics.db.service.ClusterSetsSupport;
49 import org.simantics.db.service.ClusterUID;
50 import org.simantics.db.service.EventSupport;
51 import org.simantics.db.service.LifecycleSupport;
52 import org.simantics.utils.DataContainer;
53 import org.simantics.utils.datastructures.Pair;
54 import org.simantics.utils.logging.TimeLogger;
55 import org.slf4j.Logger;
56 import org.slf4j.LoggerFactory;
58 import fi.vtt.simantics.procore.internal.EventSupportImpl;
59 import gnu.trove.map.hash.TLongObjectHashMap;
61 public class GraphClientImpl2 implements Database.Session {
63 private static final Logger LOGGER = LoggerFactory.getLogger(GraphClientImpl2.class);
64 public static final boolean DEBUG = false;
66 public static final String CLOSE = "close";
67 public static final String PURGE = "purge";
69 final ClusterManager clusters;
71 private TransactionManager transactionManager = new TransactionManager();
72 private ExecutorService executor = Executors.newSingleThreadExecutor(new ClientThreadFactory("Core Main Program", false));
73 private ExecutorService saver = Executors.newSingleThreadExecutor(new ClientThreadFactory("Core Snapshot Saver", true));
75 private Path dbFolder;
76 private final Database database;
77 private ServiceLocator locator;
78 private FileCache fileCache;
79 private MainProgram mainProgram;
80 private EventSupportImpl eventSupport;
// Thread factory that gives worker threads a descriptive name and a fixed
// daemon flag (daemon threads do not block JVM shutdown).
// NOTE(review): the field declarations and constructor assignments are elided
// in this view — presumably `name` and `daemon` are stored as final fields;
// confirm against the full source.
82 private static class ClientThreadFactory implements ThreadFactory {
87 public ClientThreadFactory(String name, boolean daemon) {
93 public Thread newThread(Runnable r) {
// Named threads make executor stack traces identifiable in logs/thread dumps.
94 Thread thread = new Thread(r, name);
95 thread.setDaemon(daemon);
// Constructs the Acorn database session: wires the file cache and cluster
// manager, points ClusterSetsSupport at the cluster directories, and starts
// the MainProgram loop on the single-threaded executor.
// @param database the owning Database instance
// @param dbFolder root folder of the on-disk database
// @param locator service locator used to register/lookup session services
// @throws IOException if ClusterManager initialization fails on disk access
100 public GraphClientImpl2(Database database, Path dbFolder, ServiceLocator locator) throws IOException {
101 this.database = database;
102 this.dbFolder = dbFolder;
103 this.locator = locator;
104 this.fileCache = new FileCache();
105 // This disposes the cache when the session is shut down
106 locator.registerService(FileCache.class, fileCache);
107 this.clusters = new ClusterManager(dbFolder, fileCache);
// Reads resolve against the last committed session dir; writes go to the working dir.
109 ClusterSetsSupport cssi = locator.getService(ClusterSetsSupport.class);
110 cssi.setReadDirectory(clusters.lastSessionDirectory);
111 cssi.updateWriteDirectory(clusters.workingDirectory);
// MainProgram runs for the lifetime of the session on the "Core Main Program" thread.
112 mainProgram = new MainProgram(this, clusters);
113 executor.execute(mainProgram);
114 eventSupport = (EventSupportImpl)locator.getService(EventSupport.class);
118 public Path getDbFolder() {
123 * This method schedules snapshotting.
124 * No lock and thread restrictions.
// Schedules an asynchronous (non-full) snapshot on the saver thread.
// No lock or thread restrictions: the saver task itself acquires a write
// transaction and synchronizes with an idle MainProgram before snapshotting.
// On any failure the session is marked for unexpected close and shut down
// via LifecycleSupport as a safety measure.
126 void tryMakeSnapshot() throws IOException {
// Skip scheduling if the session is already going down.
128 if (isClosing || unexpectedClose)
131 saver.execute(new Runnable() {
135 Transaction tr = null;
137 // First take a write transaction
138 tr = askWriteTransaction(-1);
139 // Then make sure that MainProgram is idling
140 synchronizeWithIdleMainProgram(() -> makeSnapshot(false));
141 } catch (IllegalAcornStateException | ProCoreException e) {
142 LOGGER.error("Snapshotting failed", e);
143 unexpectedClose = true;
144 } catch (SDBException e) {
145 LOGGER.error("Snapshotting failed", e);
146 unexpectedClose = true;
// Always release the write transaction, even after a snapshot failure.
150 endTransaction(tr.getTransactionId());
151 if (unexpectedClose) {
// Snapshot failure is treated as fatal: close the whole database.
152 LifecycleSupport support = getServiceLocator().getService(LifecycleSupport.class);
155 } catch (DatabaseException e1) {
156 LOGGER.error("Failed to close database as a safety measure due to failed snapshotting", e1);
159 } catch (ProCoreException e) {
160 LOGGER.error("Failed to end snapshotting write transaction", e);
// Delegates snapshot creation to the ClusterManager.
// @param fullSave true for a complete save (used on close), false for an
//        incremental snapshot (used by tryMakeSnapshot)
167 private void makeSnapshot(boolean fullSave) throws IllegalAcornStateException {
168 clusters.makeSnapshot(locator, fullSave);
// Clones the cluster identified by {@code uid} via the supplied creator.
// Any Acorn-level or I/O failure marks the session for unexpected close and
// is rethrown wrapped in DatabaseException so callers see the DB API type.
172 public <T> T clone(ClusterUID uid, ClusterCreator creator) throws DatabaseException {
174 return clusters.clone(uid, creator);
175 } catch (AcornAccessVerificationException | IllegalAcornStateException | IOException e) {
176 unexpectedClose = true;
177 throw new DatabaseException(e);
181 private void load() throws IOException {
186 public Database getDatabase() {
190 private boolean closed = false;
191 private boolean isClosing = false;
192 private boolean unexpectedClose = false;
// Closes the session: takes a final full snapshot (unless already failing),
// shuts down both executors with bounded waits, persists main state, and
// finally fires the CLOSE event. Guarded against re-entry via closed/isClosing.
// NOTE(review): flag assignments and executor shutdown calls are elided in
// this view (original lines 198-208, 218-223) — confirm ordering in full source.
195 public void close() throws ProCoreException {
196 LOGGER.info("Closing " + this + " and mainProgram " + mainProgram);
197 if(!closed && !isClosing) {
// Only snapshot on a clean close; a failed session skips the full save.
201 if (!unexpectedClose)
202 synchronizeWithIdleMainProgram(() -> makeSnapshot(true));
// Bounded waits so close() cannot hang indefinitely on a stuck worker.
209 boolean executorTerminated = executor.awaitTermination(500, TimeUnit.MILLISECONDS);
210 boolean saverTerminated = saver.awaitTermination(500, TimeUnit.MILLISECONDS);
212 LOGGER.info("executorTerminated=" + executorTerminated + ", saverTerminated=" + saverTerminated);
215 clusters.mainState.save(dbFolder);
216 } catch (IOException e) {
// Logged without cause object; state-file loss is non-fatal at this point.
217 LOGGER.error("Failed to save " + MainState.MAIN_STATE + " file in database folder " + dbFolder);
224 } catch (IllegalAcornStateException | InterruptedException e) {
225 throw new ProCoreException(e);
226 } catch (SDBException e1) {
227 throw new ProCoreException(e1);
// Notify listeners (fired even when close work above partially failed).
230 eventSupport.fireEvent(CLOSE, null);
236 public void open() throws ProCoreException {
237 throw new UnsupportedOperationException();
241 public boolean isClosed() throws ProCoreException {
// Finalizes a commit: advances the head change-set id, persists the change
// set with its metadata, records the transaction id, and notifies MainProgram.
// The committed change-set id is changeSetId + 1 because the head counter was
// already advanced for this commit.
246 public void acceptCommit(long transactionId, long changeSetId, byte[] metadata) throws ProCoreException {
247 clusters.state.headChangeSetId++;
248 long committedChangeSetId = changeSetId + 1;
250 clusters.commitChangeSet(committedChangeSetId, metadata);
251 clusters.state.transactionId = transactionId;
252 mainProgram.committed();
253 TimeLogger.log("Accepted commit");
254 } catch (IllegalAcornStateException e) {
255 throw new ProCoreException(e);
// Cancels a commit by first accepting/finalizing it and then undoing the
// just-committed change set; the undo itself advances the head change-set id
// once more, and that new head id is returned to the caller.
260 public long cancelCommit(long transactionId, long changeSetId, byte[] metadata, OnChangeSetUpdate onChangeSetUpdate) throws ProCoreException {
261 // Accept and finalize current transaction and then undo it
262 acceptCommit(transactionId, changeSetId, metadata);
// Undo targets changeSetId+1 — the id actually committed by acceptCommit above.
265 undo(new long[] {changeSetId+1}, onChangeSetUpdate);
266 clusters.state.headChangeSetId++;
267 return clusters.state.headChangeSetId;
268 } catch (SDBException e) {
269 LOGGER.error("Failed to undo cancelled transaction", e);
270 throw new ProCoreException(e);
// Database.Session entry point: delegates read-transaction acquisition to the
// internal TransactionManager (which blocks until the read can be granted).
275 public Transaction askReadTransaction() throws ProCoreException {
276 return transactionManager.askReadTransaction();
279 private enum TransactionState {
// A queued transaction request: the desired state (READ/WRITE) plus the
// semaphore the requesting thread blocks on until the request is dispatched.
283 private class TransactionRequest {
284 public TransactionState state;
285 public Semaphore semaphore;
286 public TransactionRequest(TransactionState state, Semaphore semaphore) {
288 this.semaphore = semaphore;
// Serializes access to the database: multiple concurrent READs are allowed,
// WRITE is exclusive. Requests queue in FIFO order; waiting threads block on
// per-request semaphores released by the synchronized dispatch machinery.
// NOTE(review): several lines are elided in this view (semaphore.acquire()
// calls, parts of processRequests, Transaction getters) — behavior notes
// below are inferred from the visible code; confirm against the full source.
292 private class TransactionManager {
294 private TransactionState currentTransactionState = TransactionState.IDLE;
// Count of currently active read transactions.
296 private int reads = 0;
// FIFO queue of pending requests; served from the head only.
298 private LinkedList<TransactionRequest> requests = new LinkedList<>();
// Maps granted transaction ids back to their requests for endTransaction.
300 private TLongObjectHashMap<TransactionRequest> requestMap = new TLongObjectHashMap<>();
// Builds the Transaction handle once the request has been granted; the new
// transaction id is the successor of the last recorded one.
302 private synchronized Transaction makeTransaction(TransactionRequest req) {
304 final int csId = clusters.state.headChangeSetId;
305 final long trId = clusters.state.transactionId+1;
306 requestMap.put(trId, req);
307 return new Transaction() {
310 public long getTransactionId() {
315 public long getHeadChangeSetId() {
322 * This method cannot be synchronized since it waits and must support multiple entries
323 * by query thread(s) and internal transactions such as snapshot saver
325 private Transaction askReadTransaction() throws ProCoreException {
327 Semaphore semaphore = new Semaphore(0);
329 TransactionRequest req = queue(TransactionState.READ, semaphore);
// (Elided) the caller presumably blocks on semaphore.acquire() here.
333 } catch (InterruptedException e) {
334 throw new ProCoreException(e);
337 return makeTransaction(req);
// Grants the request at the head of the queue and wakes its waiter.
341 private synchronized void dispatch() {
342 TransactionRequest r = requests.removeFirst();
343 if(r.state == TransactionState.READ) reads++;
344 r.semaphore.release();
// Decides whether the head request can run given the current state:
// IDLE accepts anything; READ admits further reads; WRITE admits nothing.
347 private synchronized void processRequests() {
351 if(requests.isEmpty()) return;
352 TransactionRequest req = requests.peek();
354 if(currentTransactionState == TransactionState.IDLE) {
356 // Accept anything while IDLE
357 currentTransactionState = req.state;
360 } else if (currentTransactionState == TransactionState.READ) {
// Concurrent reads may proceed together.
362 if(req.state == currentTransactionState) {
374 } else if (currentTransactionState == TransactionState.WRITE) {
// Enqueues a request; dispatch/processRequests decide when it runs.
385 private synchronized TransactionRequest queue(TransactionState state, Semaphore semaphore) {
386 TransactionRequest req = new TransactionRequest(state, semaphore);
387 requests.addLast(req);
393 * This method cannot be synchronized since it waits and must support multiple entries
394 * by query thread(s) and internal transactions such as snapshot saver
396 private Transaction askWriteTransaction() throws IllegalAcornStateException {
398 Semaphore semaphore = new Semaphore(0);
399 TransactionRequest req = queue(TransactionState.WRITE, semaphore);
403 } catch (InterruptedException e) {
404 throw new IllegalAcornStateException(e);
// Tell MainProgram a write is starting for the next change set.
406 mainProgram.startTransaction(clusters.state.headChangeSetId+1);
407 return makeTransaction(req);
// Releases a transaction: a finishing WRITE returns the manager to IDLE;
// the read path (partially elided) decrements reads before going IDLE.
410 private synchronized long endTransaction(long transactionId) throws ProCoreException {
412 TransactionRequest req = requestMap.remove(transactionId);
413 if(req.state == TransactionState.WRITE) {
414 currentTransactionState = TransactionState.IDLE;
419 currentTransactionState = TransactionState.IDLE;
423 return clusters.state.transactionId;
// Database.Session entry point for write transactions. Rejects new writes
// once the session is closing/closed; otherwise delegates to the
// TransactionManager, translating Acorn state errors to ProCoreException.
429 public Transaction askWriteTransaction(final long transactionId) throws ProCoreException {
431 if (isClosing || unexpectedClose || closed) {
432 throw new ProCoreException("GraphClientImpl2 is already closing so no more write transactions allowed!");
434 return transactionManager.askWriteTransaction();
435 } catch (IllegalAcornStateException e) {
436 throw new ProCoreException(e);
// Database.Session entry point: releases the transaction with the given id
// and returns the current persistent transaction id.
441 public long endTransaction(long transactionId) throws ProCoreException {
442 return transactionManager.endTransaction(transactionId);
446 public String execute(String command) throws ProCoreException {
447 // This is called only by WriteGraphImpl.commitAccessorChanges
448 // We can ignore this in Acorn
// Returns the stored metadata blob for the given change set, wrapping
// Acorn-level failures in the ProCoreException type callers expect.
453 public byte[] getChangeSetMetadata(long changeSetId) throws ProCoreException {
455 return clusters.getMetadata(changeSetId);
456 } catch (AcornAccessVerificationException | IllegalAcornStateException e) {
457 throw new ProCoreException(e);
462 public ChangeSetData getChangeSetData(long minChangeSetId,
463 long maxChangeSetId, OnChangeSetUpdate onChangeSetupate)
464 throws ProCoreException {
466 new Exception("GetChangeSetDataFunction " + minChangeSetId + " " + maxChangeSetId).printStackTrace();;
472 public ChangeSetIds getChangeSetIds() throws ProCoreException {
473 throw new UnsupportedOperationException();
477 public Cluster getCluster(byte[] clusterId) throws ProCoreException {
478 throw new UnsupportedOperationException();
482 public ClusterChanges getClusterChanges(long changeSetId, byte[] clusterId)
483 throws ProCoreException {
484 throw new UnsupportedOperationException();
// Returns the ids of all clusters known to the ClusterManager, translating
// Acorn state errors to ProCoreException.
488 public ClusterIds getClusterIds() throws ProCoreException {
490 return clusters.getClusterIds();
491 } catch (IllegalAcornStateException e) {
492 throw new ProCoreException(e);
497 public Information getInformation() throws ProCoreException {
498 return new Information() {
501 public String getServerId() {
506 public String getProtocolId() {
511 public String getDatabaseId() {
516 public long getFirstChangeSetId() {
524 public Refresh getRefresh(long changeSetId) throws ProCoreException {
526 final ClusterIds ids = getClusterIds();
528 return new Refresh() {
531 public long getHeadChangeSetId() {
532 return clusters.state.headChangeSetId;
536 public long[] getFirst() {
537 return ids.getFirst();
541 public long[] getSecond() {
542 return ids.getSecond();
549 // public byte[] getResourceFile(final byte[] clusterUID, final int resourceIndex) throws ProCoreException, AcornAccessVerificationException, IllegalAcornStateException {
550 // return clusters.getResourceFile(clusterUID, resourceIndex);
// Reads a segment of a resource file belonging to the given cluster,
// wrapping Acorn-level failures in ProCoreException.
554 public ResourceSegment getResourceSegment(final byte[] clusterUID, final int resourceIndex, final long segmentOffset, short segmentSize) throws ProCoreException {
556 return clusters.getResourceSegment(clusterUID, resourceIndex, segmentOffset, segmentSize);
557 } catch (AcornAccessVerificationException | IllegalAcornStateException e) {
558 throw new ProCoreException(e);
// Reserves ids and returns the previous reservation counter value.
// NOTE(review): `count` is ignored and the counter advances by exactly 1 per
// call — looks suspicious for count > 1, but surrounding callers are not
// visible here; verify the intended contract before changing.
563 public long reserveIds(int count) throws ProCoreException {
564 return clusters.state.reservedIds++;
// Applies a serialized cluster update: parses the operation, resolves (or
// creates) the target cluster's LRU entry, marks it for update, and hands the
// operation to MainProgram for asynchronous processing.
568 public void updateCluster(byte[] operations) throws ProCoreException {
569 ClusterInfo info = null;
571 ClusterUpdateOperation operation = new ClusterUpdateOperation(clusters, operations);
572 info = clusters.clusterLRU.getOrCreate(operation.uid, true);
// Defensive: getOrCreate should not return null; treat it as a fatal state error.
574 throw new IllegalAcornStateException("info == null for operation " + operation);
576 info.scheduleUpdate();
577 mainProgram.schedule(operation);
578 } catch (IllegalAcornStateException | AcornAccessVerificationException e) {
579 throw new ProCoreException(e);
// Resolves an undo processor for a cluster change set id of the form
// "<chunkKey>.<chunkOffset>". Locks the owning stream chunk for the duration
// of processor construction; DatabaseException passes through, anything else
// is treated as an unexpected internal failure.
586 private UndoClusterUpdateProcessor getUndoCSS(String ccsId) throws DatabaseException, AcornAccessVerificationException, IllegalAcornStateException {
588 String[] ss = ccsId.split("\\.");
589 String chunkKey = ss[0];
590 int chunkOffset = Integer.parseInt(ss[1]);
// Lookup without the LRU mutex; the chunk is locked explicitly below.
591 ClusterStreamChunk chunk = clusters.streamLRU.getWithoutMutex(chunkKey);
592 if(chunk == null) throw new IllegalAcornStateException("Cluster Stream Chunk " + chunkKey + " was not found.");
593 chunk.acquireMutex();
595 return chunk.getUndoProcessor(clusters, chunkOffset, ccsId);
596 } catch (DatabaseException e) {
598 } catch (Throwable t) {
// Unexpected failures are escalated as IllegalStateException with the cause.
599 throw new IllegalStateException(t);
// (Elided finally) the mutex is released on every exit path.
601 chunk.releaseMutex();
// Replays a single cluster change set in reverse: entries are processed from
// last to first so their effects are rolled back in the opposite order they
// were applied. Accumulates the resulting reverse changes into clusterChanges.
605 private void performUndo(String ccsId, ArrayList<Pair<ClusterUID, byte[]>> clusterChanges, UndoClusterSupport support) throws ProCoreException, DatabaseException, IllegalAcornStateException, AcornAccessVerificationException {
606 UndoClusterUpdateProcessor proc = getUndoCSS(ccsId);
608 int clusterKey = clusters.getClusterKeyByClusterUIDOrMakeWithoutMutex(proc.getClusterUID());
// Cluster LRU must be locked while entries mutate cluster state.
610 clusters.clusterLRU.acquireMutex();
613 ClusterChange cs = new ClusterChange(clusterChanges, proc.getClusterUID());
614 for(int i=0;i<proc.entries.size();i++) {
// Reverse iteration: undo newest entry first.
616 Entry e = proc.entries.get(proc.entries.size() - 1 - i);
617 e.process(clusters, cs, clusterKey);
// (Elided finally) mutex released on every exit path.
622 clusters.clusterLRU.releaseMutex();
// Runs the given task while MainProgram is idle, blocking the caller until
// the task completes. The single-element array captures any exception from
// the MainProgram thread; after the semaphore wait it is rethrown here —
// SDBException as-is, anything else wrapped in IllegalAcornStateException.
626 private void synchronizeWithIdleMainProgram(MainProgramRunnable runnable) throws SDBException {
628 Exception[] exception = { null };
629 Semaphore s = new Semaphore(0);
631 mainProgram.runIdle(new MainProgramRunnable() {
634 public void success() {
643 public void error(Exception e) {
653 public void run() throws Exception {
661 } catch (InterruptedException e) {
662 throw new IllegalAcornStateException("Unhandled interruption.", e);
// Re-raise whatever the idle task produced on the caller's thread.
665 Exception e = exception[0];
667 if(e instanceof SDBException) throw (SDBException)e;
668 else if(e != null) throw new IllegalAcornStateException(e);
// Undoes the given change sets while MainProgram is idle. Change sets are
// processed newest-first, and within each change set the cluster change sets
// are also processed in reverse, so effects unwind in strict LIFO order.
// The resulting per-cluster reverse changes are then streamed to the caller
// through onChangeSetUpdate.
// NOTE(review): several lines (return value, some getter bodies) are elided
// in this view — confirm details against the full source.
674 public boolean undo(long[] changeSetIds, OnChangeSetUpdate onChangeSetUpdate) throws SDBException {
676 synchronizeWithIdleMainProgram(new MainProgramRunnable() {
679 public void run() throws Exception {
683 final ArrayList<Pair<ClusterUID, byte[]>> clusterChanges = new ArrayList<Pair<ClusterUID, byte[]>>();
685 UndoClusterSupport support = new UndoClusterSupport(clusters);
687 final int changeSetId = clusters.state.headChangeSetId;
689 if(ClusterUpdateProcessorBase.DEBUG)
690 LOGGER.info(" === BEGIN UNDO ===");
// Newest change set first.
692 for(int i=0;i<changeSetIds.length;i++) {
693 final long id = changeSetIds[changeSetIds.length-1-i];
694 ArrayList<String> ccss = clusters.getChanges(id);
// Within a change set, undo its cluster change sets in reverse order too.
696 for(int j=0;j<ccss.size();j++) {
697 String ccsid = ccss.get(ccss.size()-j-1);
699 if(ClusterUpdateProcessorBase.DEBUG)
700 LOGGER.info("performUndo " + ccsid);
701 performUndo(ccsid, clusterChanges, support);
702 } catch (DatabaseException e) {
708 if(ClusterUpdateProcessorBase.DEBUG)
709 LOGGER.info(" === END UNDO ===");
// Notify the caller of each reverse cluster change produced above.
711 for(int i=0;i<clusterChanges.size();i++) {
713 final int changeSetIndex = i;
715 final Pair<ClusterUID, byte[]> pair = clusterChanges.get(i);
717 final ClusterUID cuid = pair.first;
718 final byte[] data = pair.second;
720 onChangeSetUpdate.onChangeSetUpdate(new ChangeSetUpdate() {
723 public long getChangeSetId() {
728 public int getChangeSetIndex() {
733 public int getNumberOfClusterChangeSets() {
734 return clusterChanges.size();
738 public int getIndexOfClusterChangeSet() {
739 return changeSetIndex;
743 public byte[] getClusterId() {
744 return cuid.asBytes();
748 public boolean getNewCluster() {
753 public byte[] getData() {
759 } catch (AcornAccessVerificationException | IllegalAcornStateException e1) {
760 throw new ProCoreException(e1);
771 ServiceLocator getServiceLocator() {
776 public boolean refreshEnabled() {
781 public boolean rolledback() {
782 return clusters.rolledback();
// Delegates database purge to the ClusterManager.
785 private void purge() throws IllegalAcornStateException {
786 clusters.purge(locator);
// Schedules an asynchronous purge on the saver thread, mirroring the
// structure of tryMakeSnapshot: take a write transaction, run the purge with
// MainProgram idle, fire the PURGE event, and on failure mark the session
// for unexpected close and shut the database down via LifecycleSupport.
789 public void purgeDatabase() {
// Skip scheduling if the session is already going down.
791 if (isClosing || unexpectedClose)
794 saver.execute(new Runnable() {
798 Transaction tr = null;
800 // First take a write transaction
801 tr = askWriteTransaction(-1);
802 // Then make sure that MainProgram is idling
803 synchronizeWithIdleMainProgram(() -> purge());
804 } catch (IllegalAcornStateException | ProCoreException e) {
805 LOGGER.error("Purge failed", e);
806 unexpectedClose = true;
807 } catch (SDBException e) {
808 LOGGER.error("Purge failed", e);
809 unexpectedClose = true;
// Always release the write transaction, then notify listeners.
813 endTransaction(tr.getTransactionId());
814 eventSupport.fireEvent(PURGE, null);
816 if (unexpectedClose) {
// Purge failure is treated as fatal: close the whole database.
817 LifecycleSupport support = getServiceLocator().getService(LifecycleSupport.class);
820 } catch (DatabaseException e1) {
821 LOGGER.error("Failed to close database as a safety measure due to failed purge", e1);
824 } catch (ProCoreException e) {
825 LOGGER.error("Failed to end purge write transaction", e);
// Returns the oldest (tail) change set id still retained by the ClusterManager.
833 public long getTailChangeSetId() {
834 return clusters.getTailChangeSetId();
837 public Future<BackupException> getBackupRunnable(Semaphore lock, Path targetPath, int revision) throws IllegalAcornStateException, IOException {
841 Path dbDir = getDbFolder();
842 int newestFolder = clusters.mainState.headDir - 1;
843 int latestFolder = -2;
844 Path AcornMetadataFile = AcornBackupProvider.getAcornMetadataFile(dbDir);
845 if (Files.exists(AcornMetadataFile)) {
846 try (BufferedReader br = Files.newBufferedReader(AcornMetadataFile)) {
847 latestFolder = Integer.parseInt( br.readLine() );
851 AcornBackupRunnable r = new AcornBackupRunnable(
852 lock, targetPath, revision, dbDir, latestFolder, newestFolder);
853 new Thread(r, "Acorn backup thread").start();