1 /*******************************************************************************
2 * Copyright (c) 2007, 2010 Association for Decentralized Information Management
4 * All rights reserved. This program and the accompanying materials
5 * are made available under the terms of the Eclipse Public License v1.0
6 * which accompanies this distribution, and is available at
7 * http://www.eclipse.org/legal/epl-v10.html
10 * VTT Technical Research Centre of Finland - initial API and implementation
11 *******************************************************************************/
12 package org.simantics.acorn;
14 import java.io.BufferedReader;
15 import java.io.IOException;
16 import java.nio.file.Files;
17 import java.nio.file.Path;
18 import java.util.ArrayList;
19 import java.util.LinkedList;
20 import java.util.List;
21 import java.util.concurrent.CopyOnWriteArrayList;
22 import java.util.concurrent.ExecutorService;
23 import java.util.concurrent.Executors;
24 import java.util.concurrent.Future;
25 import java.util.concurrent.Semaphore;
26 import java.util.concurrent.ThreadFactory;
27 import java.util.concurrent.TimeUnit;
29 import org.simantics.acorn.MainProgram.MainProgramRunnable;
30 import org.simantics.acorn.backup.AcornBackupProvider;
31 import org.simantics.acorn.backup.AcornBackupProvider.AcornBackupRunnable;
32 import org.simantics.acorn.exception.AcornAccessVerificationException;
33 import org.simantics.acorn.exception.IllegalAcornStateException;
34 import org.simantics.acorn.internal.ClusterChange;
35 import org.simantics.acorn.internal.ClusterUpdateProcessorBase;
36 import org.simantics.acorn.internal.UndoClusterUpdateProcessor;
37 import org.simantics.acorn.lru.ClusterChangeSet.Entry;
38 import org.simantics.acorn.lru.ClusterInfo;
39 import org.simantics.acorn.lru.ClusterStreamChunk;
40 import org.simantics.acorn.lru.ClusterUpdateOperation;
41 import org.simantics.backup.BackupException;
42 import org.simantics.db.ClusterCreator;
43 import org.simantics.db.Database;
44 import org.simantics.db.ServiceLocator;
45 import org.simantics.db.exception.DatabaseException;
46 import org.simantics.db.exception.SDBException;
47 import org.simantics.db.server.ProCoreException;
48 import org.simantics.db.service.ClusterSetsSupport;
49 import org.simantics.db.service.ClusterUID;
50 import org.simantics.db.service.EventSupport;
51 import org.simantics.db.service.LifecycleSupport;
52 import org.simantics.utils.datastructures.Pair;
53 import org.simantics.utils.logging.TimeLogger;
54 import org.slf4j.Logger;
55 import org.slf4j.LoggerFactory;
57 import fi.vtt.simantics.procore.internal.EventSupportImpl;
58 import gnu.trove.map.hash.TLongObjectHashMap;
// Acorn implementation of Database.Session: coordinates cluster storage,
// transaction admission, snapshotting, purging and undo for one database session.
60 public class GraphClientImpl2 implements Database.Session {
62 private static final Logger LOGGER = LoggerFactory.getLogger(GraphClientImpl2.class);
63 public static final boolean DEBUG = false;
// Event identifiers fired through eventSupport on close() and purgeDatabase().
65 public static final String CLOSE = "close";
66 public static final String PURGE = "purge";
68 final ClusterManager clusters;
70 private TransactionManager transactionManager = new TransactionManager();
// Single-threaded executor that hosts the long-running MainProgram loop (started in the constructor).
71 private ExecutorService executor = Executors.newSingleThreadExecutor(new ClientThreadFactory("Core Main Program", false));
// Single-threaded daemon executor used by tryMakeSnapshot() and purgeDatabase().
72 private ExecutorService saver = Executors.newSingleThreadExecutor(new ClientThreadFactory("Core Snapshot Saver", true));
74 private Path dbFolder;
75 private final Database database;
76 private ServiceLocator locator;
77 private FileCache fileCache;
78 private MainProgram mainProgram;
79 private EventSupportImpl eventSupport;
// Thread factory that names its threads and optionally marks them as daemons.
// NOTE(review): field declarations and return statement are elided in this view.
81 private static class ClientThreadFactory implements ThreadFactory {
86 public ClientThreadFactory(String name, boolean daemon) {
92 public Thread newThread(Runnable r) {
93 Thread thread = new Thread(r, name);
// Daemon threads do not prevent JVM shutdown; flag comes from the factory.
94 thread.setDaemon(daemon);
// Creates a session bound to the given database folder, wires the file cache and
// cluster-set directories into the service locator, and starts the main program loop.
99 public GraphClientImpl2(Database database, Path dbFolder, ServiceLocator locator) throws IOException {
100 this.database = database;
101 this.dbFolder = dbFolder;
102 this.locator = locator;
103 this.fileCache = new FileCache();
104 // This disposes the cache when the session is shut down
105 locator.registerService(FileCache.class, fileCache);
106 this.clusters = new ClusterManager(dbFolder, fileCache);
// Point cluster-set reads at the previous session and writes at the current working directory.
108 ClusterSetsSupport cssi = locator.getService(ClusterSetsSupport.class);
109 cssi.setReadDirectory(clusters.lastSessionDirectory);
110 cssi.updateWriteDirectory(clusters.workingDirectory);
// Run the main program on the dedicated single-threaded executor for this session's lifetime.
111 mainProgram = new MainProgram(this, clusters);
112 executor.execute(mainProgram);
113 eventSupport = (EventSupportImpl)locator.getService(EventSupport.class);
// Returns the database folder this session operates on.
117 public Path getDbFolder() {
122 * This method schedules snapshotting.
123 * No lock and thread restrictions.
125 void tryMakeSnapshot() throws IOException {
// Do not schedule new snapshots once shutdown (normal or emergency) has begun.
127 if (isClosing || unexpectedClose)
130 saver.execute(new Runnable() {
134 Transaction tr = null;
136 // First take a write transaction
137 tr = askWriteTransaction(-1);
138 // Then make sure that MainProgram is idling
139 synchronizeWithIdleMainProgram(() -> makeSnapshot(false));
140 } catch (IllegalAcornStateException | ProCoreException e) {
141 LOGGER.error("Snapshotting failed", e);
// A failed snapshot flags the session for the emergency close handled below.
142 unexpectedClose = true;
143 } catch (SDBException e) {
144 LOGGER.error("Snapshotting failed", e);
145 unexpectedClose = true;
149 endTransaction(tr.getTransactionId());
150 if (unexpectedClose) {
// Safety measure: shut the whole database down via LifecycleSupport after a failed snapshot.
151 LifecycleSupport support = getServiceLocator().getService(LifecycleSupport.class);
154 } catch (DatabaseException e1) {
155 LOGGER.error("Failed to close database as a safety measure due to failed snapshotting", e1);
158 } catch (ProCoreException e) {
159 LOGGER.error("Failed to end snapshotting write transaction", e);
// Delegates snapshot creation to the cluster manager; fullSave requests a complete save.
// Caller must ensure the main program is idle (see synchronizeWithIdleMainProgram).
166 private void makeSnapshot(boolean fullSave) throws IllegalAcornStateException {
167 clusters.makeSnapshot(locator, fullSave);
// Clones the cluster identified by uid using the supplied creator,
// wrapping Acorn-level failures in DatabaseException for callers.
171 public <T> T clone(ClusterUID uid, ClusterCreator creator) throws DatabaseException {
173 return clusters.clone(uid, creator);
174 } catch (AcornAccessVerificationException | IllegalAcornStateException | IOException e) {
// A failed clone leaves cluster state suspect; flag the session for emergency close.
175 unexpectedClose = true;
176 throw new DatabaseException(e);
// NOTE(review): body elided in this view.
180 private void load() throws IOException {
// Accessor for the owning Database instance.
185 public Database getDatabase() {
// Lifecycle flags: closed is set once close() has completed, isClosing while it is
// in progress, and unexpectedClose when a failure forces an emergency shutdown.
189 private boolean closed = false;
190 private boolean isClosing = false;
191 private boolean unexpectedClose = false;
// Closes the session: takes a final full snapshot (unless shutting down due to a
// failure), stops both executors, persists main state, and fires the CLOSE event.
194 public void close() throws ProCoreException {
195 LOGGER.info("Closing " + this + " and mainProgram " + mainProgram);
196 if(!closed && !isClosing) {
// Only snapshot on a normal close; an unexpected close skips it.
200 if (!unexpectedClose)
201 synchronizeWithIdleMainProgram(() -> makeSnapshot(true));
// Bounded waits so close() cannot hang indefinitely on executor shutdown.
208 boolean executorTerminated = executor.awaitTermination(500, TimeUnit.MILLISECONDS);
209 boolean saverTerminated = saver.awaitTermination(500, TimeUnit.MILLISECONDS);
211 LOGGER.info("executorTerminated=" + executorTerminated + ", saverTerminated=" + saverTerminated);
214 clusters.mainState.save(dbFolder);
215 } catch (IOException e) {
216 LOGGER.error("Failed to save " + MainState.MAIN_STATE + " file in database folder " + dbFolder);
223 } catch (IllegalAcornStateException | InterruptedException e) {
224 throw new ProCoreException(e);
225 } catch (SDBException e1) {
226 throw new ProCoreException(e1);
// Notify listeners that the session has been closed.
229 eventSupport.fireEvent(CLOSE, null);
// Re-opening a session is not supported by this implementation.
235 public void open() throws ProCoreException {
236 throw new UnsupportedOperationException();
240 public boolean isClosed() throws ProCoreException {
// Finalizes the change set of a committing transaction: advances the head change
// set id, persists change-set metadata, and records the transaction id.
245 public void acceptCommit(long transactionId, long changeSetId, byte[] metadata) throws ProCoreException {
246 clusters.state.headChangeSetId++;
// The committed change set is one past the id handed in by the caller.
247 long committedChangeSetId = changeSetId + 1;
249 clusters.commitChangeSet(committedChangeSetId, metadata);
250 clusters.state.transactionId = transactionId;
// Inform the main program loop that the commit completed.
251 mainProgram.committed();
252 TimeLogger.log("Accepted commit");
253 } catch (IllegalAcornStateException e) {
254 throw new ProCoreException(e);
// Cancels a commit by first accepting/finalizing it and then undoing the resulting
// change set. Returns the new head change set id after the undo.
259 public long cancelCommit(long transactionId, long changeSetId, byte[] metadata, OnChangeSetUpdate onChangeSetUpdate) throws ProCoreException {
260 // Accept and finalize current transaction and then undo it
261 acceptCommit(transactionId, changeSetId, metadata);
264 undo(new long[] {changeSetId+1}, onChangeSetUpdate);
// The undo produces another change set, so the head advances once more.
265 clusters.state.headChangeSetId++;
266 return clusters.state.headChangeSetId;
267 } catch (SDBException e) {
268 LOGGER.error("Failed to undo cancelled transaction", e);
269 throw new ProCoreException(e);
// Delegates read-transaction admission to the internal transaction manager.
274 public Transaction askReadTransaction() throws ProCoreException {
275 return transactionManager.askReadTransaction();
// Transaction admission states; values elided in this view (IDLE/READ/WRITE are referenced below).
278 private enum TransactionState {
// A queued admission request: the desired transaction state plus the semaphore
// the requesting thread blocks on until the request is dispatched.
282 private class TransactionRequest {
283 public TransactionState state;
284 public Semaphore semaphore;
285 public TransactionRequest(TransactionState state, Semaphore semaphore) {
287 this.semaphore = semaphore;
// Admission controller for read/write transactions. Requests queue up and are
// dispatched according to the current state: anything is admitted while IDLE,
// concurrent reads may share the READ state, and WRITE is exclusive.
291 private class TransactionManager {
293 private TransactionState currentTransactionState = TransactionState.IDLE;
// Number of read transactions currently admitted.
295 private int reads = 0;
// FIFO queue of pending requests plus a map from transaction id to its request.
297 private LinkedList<TransactionRequest> requests = new LinkedList<>();
299 private TLongObjectHashMap<TransactionRequest> requestMap = new TLongObjectHashMap<>();
// Builds the Transaction handle for an admitted request, capturing the head
// change set id and the next transaction id at admission time.
301 private synchronized Transaction makeTransaction(TransactionRequest req) {
303 final int csId = clusters.state.headChangeSetId;
304 final long trId = clusters.state.transactionId+1;
305 requestMap.put(trId, req);
306 return new Transaction() {
309 public long getTransactionId() {
314 public long getHeadChangeSetId() {
321 * This method cannot be synchronized since it waits and must support multiple entries
322 * by query thread(s) and internal transactions such as snapshot saver
324 private Transaction askReadTransaction() throws ProCoreException {
326 Semaphore semaphore = new Semaphore(0);
328 TransactionRequest req = queue(TransactionState.READ, semaphore);
332 } catch (InterruptedException e) {
333 throw new ProCoreException(e);
336 return makeTransaction(req);
// Releases the oldest queued request, counting it if it is a read.
340 private synchronized void dispatch() {
341 TransactionRequest r = requests.removeFirst();
342 if(r.state == TransactionState.READ) reads++;
343 r.semaphore.release();
// Examines the queue head and admits it if compatible with the current state.
346 private synchronized void processRequests() {
350 if(requests.isEmpty()) return;
351 TransactionRequest req = requests.peek();
353 if(currentTransactionState == TransactionState.IDLE) {
355 // Accept anything while IDLE
356 currentTransactionState = req.state;
// While reading, additional requests of the same (READ) state may be admitted.
359 } else if (currentTransactionState == TransactionState.READ) {
361 if(req.state == currentTransactionState) {
// WRITE is exclusive; branch body elided in this view.
373 } else if (currentTransactionState == TransactionState.WRITE) {
// Appends a request to the queue and returns it for the caller to wait on.
384 private synchronized TransactionRequest queue(TransactionState state, Semaphore semaphore) {
385 TransactionRequest req = new TransactionRequest(state, semaphore);
386 requests.addLast(req);
392 * This method cannot be synchronized since it waits and must support multiple entries
393 * by query thread(s) and internal transactions such as snapshot saver
395 private Transaction askWriteTransaction() throws IllegalAcornStateException {
397 Semaphore semaphore = new Semaphore(0);
398 TransactionRequest req = queue(TransactionState.WRITE, semaphore);
402 } catch (InterruptedException e) {
403 throw new IllegalAcornStateException(e);
// Tell the main program a write transaction targeting the next change set began.
405 mainProgram.startTransaction(clusters.state.headChangeSetId+1);
406 return makeTransaction(req);
// Removes the transaction from the request map and returns the manager to IDLE
// (write ends immediately; read handling is partially elided in this view).
409 private synchronized long endTransaction(long transactionId) throws ProCoreException {
411 TransactionRequest req = requestMap.remove(transactionId);
412 if(req.state == TransactionState.WRITE) {
413 currentTransactionState = TransactionState.IDLE;
418 currentTransactionState = TransactionState.IDLE;
422 return clusters.state.transactionId;
// Admits a write transaction, refusing once the session has begun closing.
// The transactionId parameter is not consulted here; admission is delegated.
428 public Transaction askWriteTransaction(final long transactionId) throws ProCoreException {
430 if (isClosing || unexpectedClose || closed) {
431 throw new ProCoreException("GraphClientImpl2 is already closing so no more write transactions allowed!");
433 return transactionManager.askWriteTransaction();
434 } catch (IllegalAcornStateException e) {
435 throw new ProCoreException(e);
// Ends the given transaction via the transaction manager.
440 public long endTransaction(long transactionId) throws ProCoreException {
441 return transactionManager.endTransaction(transactionId);
// Intentionally a no-op for Acorn (see inline note); not an unsupported operation.
445 public String execute(String command) throws ProCoreException {
446 // This is called only by WriteGraphImpl.commitAccessorChanges
447 // We can ignore this in Acorn
// Fetches the persisted metadata of a change set, wrapping Acorn failures.
452 public byte[] getChangeSetMetadata(long changeSetId) throws ProCoreException {
454 return clusters.getMetadata(changeSetId);
455 } catch (AcornAccessVerificationException | IllegalAcornStateException e) {
456 throw new ProCoreException(e);
// NOTE(review): prints a diagnostic stack trace instead of returning data —
// looks like an unimplemented/debug stub; also has a stray double semicolon.
461 public ChangeSetData getChangeSetData(long minChangeSetId,
462 long maxChangeSetId, OnChangeSetUpdate onChangeSetupate)
463 throws ProCoreException {
465 new Exception("GetChangeSetDataFunction " + minChangeSetId + " " + maxChangeSetId).printStackTrace();;
// The following session operations are not supported by this implementation.
471 public ChangeSetIds getChangeSetIds() throws ProCoreException {
472 throw new UnsupportedOperationException();
476 public Cluster getCluster(byte[] clusterId) throws ProCoreException {
477 throw new UnsupportedOperationException();
481 public ClusterChanges getClusterChanges(long changeSetId, byte[] clusterId)
482 throws ProCoreException {
483 throw new UnsupportedOperationException();
// Returns the ids of all clusters known to the cluster manager.
487 public ClusterIds getClusterIds() throws ProCoreException {
489 return clusters.getClusterIds();
490 } catch (IllegalAcornStateException e) {
491 throw new ProCoreException(e);
// Returns database identity information; accessor bodies are elided in this view.
496 public Information getInformation() throws ProCoreException {
497 return new Information() {
500 public String getServerId() {
505 public String getProtocolId() {
510 public String getDatabaseId() {
515 public long getFirstChangeSetId() {
// Builds a Refresh snapshot from the current cluster ids and head change set.
523 public Refresh getRefresh(long changeSetId) throws ProCoreException {
525 final ClusterIds ids = getClusterIds();
527 return new Refresh() {
530 public long getHeadChangeSetId() {
531 return clusters.state.headChangeSetId;
535 public long[] getFirst() {
536 return ids.getFirst();
540 public long[] getSecond() {
541 return ids.getSecond();
548 // public byte[] getResourceFile(final byte[] clusterUID, final int resourceIndex) throws ProCoreException, AcornAccessVerificationException, IllegalAcornStateException {
549 // return clusters.getResourceFile(clusterUID, resourceIndex);
// Reads a segment of a resource's data from the cluster manager.
553 public ResourceSegment getResourceSegment(final byte[] clusterUID, final int resourceIndex, final long segmentOffset, short segmentSize) throws ProCoreException {
555 return clusters.getResourceSegment(clusterUID, resourceIndex, segmentOffset, segmentSize);
556 } catch (AcornAccessVerificationException | IllegalAcornStateException e) {
557 throw new ProCoreException(e);
// NOTE(review): the count parameter is not used on the visible line — the
// counter advances by one per call regardless; verify against callers.
562 public long reserveIds(int count) throws ProCoreException {
563 return clusters.state.reservedIds++;
// Applies a serialized cluster update: resolves (or creates) the target cluster,
// marks it for update, and hands the operation to the main program for execution.
567 public void updateCluster(byte[] operations) throws ProCoreException {
568 ClusterInfo info = null;
570 ClusterUpdateOperation operation = new ClusterUpdateOperation(clusters, operations);
571 info = clusters.clusterLRU.getOrCreate(operation.uid, true);
573 throw new IllegalAcornStateException("info == null for operation " + operation);
575 info.scheduleUpdate();
// The actual update is executed asynchronously by the main program.
576 mainProgram.schedule(operation);
577 } catch (IllegalAcornStateException | AcornAccessVerificationException e) {
578 throw new ProCoreException(e);
// Resolves a cluster change set id of the form "<chunkKey>.<offset>" into an undo
// processor, holding the chunk's mutex for the duration of the lookup.
585 private UndoClusterUpdateProcessor getUndoCSS(String ccsId) throws DatabaseException, AcornAccessVerificationException, IllegalAcornStateException {
587 String[] ss = ccsId.split("\\.");
588 String chunkKey = ss[0];
589 int chunkOffset = Integer.parseInt(ss[1]);
590 ClusterStreamChunk chunk = clusters.streamLRU.getWithoutMutex(chunkKey);
591 if(chunk == null) throw new IllegalAcornStateException("Cluster Stream Chunk " + chunkKey + " was not found.");
592 chunk.acquireMutex();
594 return chunk.getUndoProcessor(clusters, chunkOffset, ccsId);
595 } catch (DatabaseException e) {
// Unexpected failures are escalated as IllegalStateException, preserving the cause.
597 } catch (Throwable t) {
598 throw new IllegalStateException(t);
// Mutex is released on all paths (finally block partially elided in this view).
600 chunk.releaseMutex();
// Undoes one cluster change set: replays its entries in reverse order into a
// ClusterChange that accumulates into clusterChanges, under the cluster LRU mutex.
604 private void performUndo(String ccsId, ArrayList<Pair<ClusterUID, byte[]>> clusterChanges, UndoClusterSupport support) throws ProCoreException, DatabaseException, IllegalAcornStateException, AcornAccessVerificationException {
605 UndoClusterUpdateProcessor proc = getUndoCSS(ccsId);
607 int clusterKey = clusters.getClusterKeyByClusterUIDOrMakeWithoutMutex(proc.getClusterUID());
609 clusters.clusterLRU.acquireMutex();
612 ClusterChange cs = new ClusterChange(clusterChanges, proc.getClusterUID());
// Entries are processed newest-first to reverse the original application order.
613 for(int i=0;i<proc.entries.size();i++) {
615 Entry e = proc.entries.get(proc.entries.size() - 1 - i);
616 e.process(clusters, cs, clusterKey);
// Mutex released on all paths (finally block partially elided in this view).
621 clusters.clusterLRU.releaseMutex();
// Runs the given action while the MainProgram is idle, blocking the caller until
// it completes. Exceptions raised by the action are captured and rethrown here:
// SDBExceptions as-is, anything else wrapped in IllegalAcornStateException.
625 private void synchronizeWithIdleMainProgram(MainProgramRunnable runnable) throws SDBException {
// Single-slot holder for an exception produced on the main program thread.
627 Exception[] exception = { null };
628 Semaphore s = new Semaphore(0);
630 mainProgram.runIdle(new MainProgramRunnable() {
633 public void success() {
642 public void error(Exception e) {
652 public void run() throws Exception {
660 } catch (InterruptedException e) {
661 throw new IllegalAcornStateException("Unhandled interruption.", e);
// Re-raise whatever the action produced, preserving SDBException subtypes.
664 Exception e = exception[0];
666 if(e instanceof SDBException) throw (SDBException)e;
667 else if(e != null) throw new IllegalAcornStateException(e);
// Undoes the given change sets (processed in reverse order) while the main program
// is idle, then reports each resulting cluster change to onChangeSetUpdate.
673 public boolean undo(long[] changeSetIds, OnChangeSetUpdate onChangeSetUpdate) throws SDBException {
675 synchronizeWithIdleMainProgram(new MainProgramRunnable() {
678 public void run() throws Exception {
// Accumulates (cluster UID, serialized change) pairs produced by the undo.
682 final ArrayList<Pair<ClusterUID, byte[]>> clusterChanges = new ArrayList<Pair<ClusterUID, byte[]>>();
684 UndoClusterSupport support = new UndoClusterSupport(clusters);
686 final int changeSetId = clusters.state.headChangeSetId;
688 if(ClusterUpdateProcessorBase.DEBUG)
689 LOGGER.info(" === BEGIN UNDO ===");
// Change sets are undone newest-first.
691 for(int i=0;i<changeSetIds.length;i++) {
692 final long id = changeSetIds[changeSetIds.length-1-i];
693 ArrayList<String> ccss = clusters.getChanges(id);
// Within a change set, cluster change sets are also undone in reverse order.
695 for(int j=0;j<ccss.size();j++) {
696 String ccsid = ccss.get(ccss.size()-j-1);
698 if(ClusterUpdateProcessorBase.DEBUG)
699 LOGGER.info("performUndo " + ccsid);
700 performUndo(ccsid, clusterChanges, support);
701 } catch (DatabaseException e) {
// Best-effort: a failing cluster change set is logged and the rest proceed.
702 LOGGER.error("failed to perform undo for cluster change set {}", ccsid, e);
707 if(ClusterUpdateProcessorBase.DEBUG)
708 LOGGER.info(" === END UNDO ===");
// Report every accumulated cluster change to the caller's callback.
710 for(int i=0;i<clusterChanges.size();i++) {
712 final int changeSetIndex = i;
714 final Pair<ClusterUID, byte[]> pair = clusterChanges.get(i);
716 final ClusterUID cuid = pair.first;
717 final byte[] data = pair.second;
719 onChangeSetUpdate.onChangeSetUpdate(new ChangeSetUpdate() {
722 public long getChangeSetId() {
727 public int getChangeSetIndex() {
732 public int getNumberOfClusterChangeSets() {
733 return clusterChanges.size();
737 public int getIndexOfClusterChangeSet() {
738 return changeSetIndex;
742 public byte[] getClusterId() {
743 return cuid.asBytes();
747 public boolean getNewCluster() {
752 public byte[] getData() {
758 } catch (AcornAccessVerificationException | IllegalAcornStateException e1) {
759 throw new ProCoreException(e1);
// Package-private accessor for the session's service locator.
770 ServiceLocator getServiceLocator() {
775 public boolean refreshEnabled() {
// Reports whether the cluster manager has rolled back.
780 public boolean rolledback() {
781 return clusters.rolledback();
// Delegates the purge operation to the cluster manager.
784 private void purge() throws IllegalAcornStateException {
785 clusters.purge(locator);
// Schedules an asynchronous purge on the saver executor: takes a write
// transaction, purges while the main program idles, fires the PURGE event, and
// closes the database as a safety measure if the purge failed.
788 public void purgeDatabase() {
// Do not schedule a purge once shutdown (normal or emergency) has begun.
790 if (isClosing || unexpectedClose)
793 saver.execute(new Runnable() {
797 Transaction tr = null;
799 // First take a write transaction
800 tr = askWriteTransaction(-1);
801 // Then make sure that MainProgram is idling
802 synchronizeWithIdleMainProgram(() -> purge());
803 } catch (IllegalAcornStateException | ProCoreException e) {
804 LOGGER.error("Purge failed", e);
// A failed purge flags the session for the emergency close handled below.
805 unexpectedClose = true;
806 } catch (SDBException e) {
807 LOGGER.error("Purge failed", e);
808 unexpectedClose = true;
812 endTransaction(tr.getTransactionId());
813 eventSupport.fireEvent(PURGE, null);
815 if (unexpectedClose) {
816 LifecycleSupport support = getServiceLocator().getService(LifecycleSupport.class);
819 } catch (DatabaseException e1) {
820 LOGGER.error("Failed to close database as a safety measure due to failed purge", e1);
823 } catch (ProCoreException e) {
824 LOGGER.error("Failed to end purge write transaction", e);
// Returns the oldest retained change set id, as tracked by the cluster manager.
832 public long getTailChangeSetId() {
833 return clusters.getTailChangeSetId();
836 public Future<BackupException> getBackupRunnable(Semaphore lock, Path targetPath, int revision) throws IllegalAcornStateException, IOException {
840 Path dbDir = getDbFolder();
841 int newestFolder = clusters.mainState.headDir - 1;
842 int latestFolder = -2;
843 Path AcornMetadataFile = AcornBackupProvider.getAcornMetadataFile(dbDir);
844 if (Files.exists(AcornMetadataFile)) {
845 try (BufferedReader br = Files.newBufferedReader(AcornMetadataFile)) {
846 latestFolder = Integer.parseInt( br.readLine() );
850 AcornBackupRunnable r = new AcornBackupRunnable(
851 lock, targetPath, revision, dbDir, latestFolder, newestFolder);
852 new Thread(r, "Acorn backup thread").start();