1 /*******************************************************************************
2 * Copyright (c) 2007, 2010 Association for Decentralized Information Management
4 * All rights reserved. This program and the accompanying materials
5 * are made available under the terms of the Eclipse Public License v1.0
6 * which accompanies this distribution, and is available at
7 * http://www.eclipse.org/legal/epl-v10.html
10 * VTT Technical Research Centre of Finland - initial API and implementation
11 *******************************************************************************/
12 package org.simantics.acorn;
14 import java.io.IOException;
15 import java.nio.file.Path;
16 import java.util.ArrayList;
17 import java.util.LinkedList;
18 import java.util.concurrent.ExecutorService;
19 import java.util.concurrent.Executors;
20 import java.util.concurrent.Semaphore;
21 import java.util.concurrent.ThreadFactory;
22 import java.util.concurrent.TimeUnit;
24 import org.simantics.acorn.MainProgram.MainProgramRunnable;
25 import org.simantics.acorn.exception.AcornAccessVerificationException;
26 import org.simantics.acorn.exception.IllegalAcornStateException;
27 import org.simantics.acorn.internal.ClusterChange;
28 import org.simantics.acorn.internal.ClusterUpdateProcessorBase;
29 import org.simantics.acorn.internal.UndoClusterUpdateProcessor;
30 import org.simantics.acorn.lru.ClusterChangeSet.Entry;
31 import org.simantics.acorn.lru.ClusterInfo;
32 import org.simantics.acorn.lru.ClusterStreamChunk;
33 import org.simantics.acorn.lru.ClusterUpdateOperation;
34 import org.simantics.db.ClusterCreator;
35 import org.simantics.db.Database;
36 import org.simantics.db.ServiceLocator;
37 import org.simantics.db.exception.DatabaseException;
38 import org.simantics.db.exception.SDBException;
39 import org.simantics.db.server.ProCoreException;
40 import org.simantics.db.service.ClusterSetsSupport;
41 import org.simantics.db.service.ClusterUID;
42 import org.simantics.db.service.LifecycleSupport;
43 import org.simantics.utils.datastructures.Pair;
44 import org.simantics.utils.logging.TimeLogger;
45 import org.slf4j.Logger;
46 import org.slf4j.LoggerFactory;
48 import gnu.trove.map.hash.TLongObjectHashMap;
50 public class GraphClientImpl2 implements Database.Session {
52 private static final Logger LOGGER = LoggerFactory.getLogger(GraphClientImpl2.class);
53 public static final boolean DEBUG = false;
55 public final ClusterManager clusters;
57 private TransactionManager transactionManager = new TransactionManager();
58 private ExecutorService executor = Executors.newSingleThreadExecutor(new ClientThreadFactory("Core Main Program", false));
59 private ExecutorService saver = Executors.newSingleThreadExecutor(new ClientThreadFactory("Core Snapshot Saver", true));
61 private Path dbFolder;
62 private final Database database;
63 private ServiceLocator locator;
64 private FileCache fileCache;
65 private MainProgram mainProgram;
67 static class ClientThreadFactory implements ThreadFactory {
72 public ClientThreadFactory(String name, boolean daemon) {
78 public Thread newThread(Runnable r) {
79 Thread thread = new Thread(r, name);
80 thread.setDaemon(daemon);
85 public GraphClientImpl2(Database database, Path dbFolder, ServiceLocator locator) throws IOException {
86 this.database = database;
87 this.dbFolder = dbFolder;
88 this.locator = locator;
89 this.fileCache = new FileCache();
90 // This disposes the cache when the session is shut down
91 locator.registerService(FileCache.class, fileCache);
92 this.clusters = new ClusterManager(dbFolder, fileCache);
94 ClusterSetsSupport cssi = locator.getService(ClusterSetsSupport.class);
95 cssi.setReadDirectory(clusters.lastSessionDirectory);
96 cssi.updateWriteDirectory(clusters.workingDirectory);
97 mainProgram = new MainProgram(this, clusters);
98 executor.execute(mainProgram);
101 public Path getDbFolder() {
105 public void tryMakeSnapshot() throws IOException {
107 if (isClosing || unexpectedClose)
110 saver.execute(new Runnable() {
114 Transaction tr = null;
116 // First take a write transaction
117 tr = askWriteTransaction(-1);
118 // Then make sure that MainProgram is idling
119 mainProgram.mutex.acquire();
121 synchronized(mainProgram) {
122 if(mainProgram.operations.isEmpty()) {
125 // MainProgram is becoming busy again - delay snapshotting
130 mainProgram.mutex.release();
132 } catch (IllegalAcornStateException | ProCoreException e) {
133 LOGGER.error("Snapshotting failed", e);
134 unexpectedClose = true;
135 } catch (InterruptedException e) {
136 LOGGER.error("Snapshotting interrupted", e);
140 endTransaction(tr.getTransactionId());
141 if (unexpectedClose) {
142 LifecycleSupport support = getServiceLocator().getService(LifecycleSupport.class);
145 } catch (DatabaseException e1) {
146 LOGGER.error("Failed to close database as a safety measure due to failed snapshotting", e1);
149 } catch (ProCoreException e) {
150 LOGGER.error("Failed to end snapshotting write transaction", e);
157 public void makeSnapshot(boolean fullSave) throws IllegalAcornStateException {
158 clusters.makeSnapshot(locator, fullSave);
161 public <T> T clone(ClusterUID uid, ClusterCreator creator) throws DatabaseException {
163 return clusters.clone(uid, creator);
164 } catch (AcornAccessVerificationException | IllegalAcornStateException | IOException e) {
165 unexpectedClose = true;
166 throw new DatabaseException(e);
170 // private void save() throws IOException {
174 public void load() throws IOException {
178 // public void modiFileEx(ClusterUID uid, int resourceKey, long offset, long size, byte[] bytes, long pos, ClusterSupport support) {
179 // clusters.modiFileEx(uid, resourceKey, offset, size, bytes, pos, support);
183 public Database getDatabase() {
187 private boolean closed = false;
188 private boolean isClosing = false;
189 private boolean unexpectedClose = false;
192 public void close() throws ProCoreException {
193 LOGGER.info("Closing " + this + " and mainProgram " + mainProgram);
194 if(!closed && !isClosing) {
197 if (!unexpectedClose)
204 boolean executorTerminated = executor.awaitTermination(500, TimeUnit.MILLISECONDS);
205 boolean saverTerminated = saver.awaitTermination(500, TimeUnit.MILLISECONDS);
207 LOGGER.info("executorTerminated=" + executorTerminated + ", saverTerminated=" + saverTerminated);
210 clusters.mainState.save(dbFolder);
211 } catch (IOException e) {
212 LOGGER.error("Failed to save " + MainState.MAIN_STATE + " file in database folder " + dbFolder);
219 } catch (IllegalAcornStateException | InterruptedException e) {
220 throw new ProCoreException(e);
228 public void open() throws ProCoreException {
229 throw new UnsupportedOperationException();
233 public boolean isClosed() throws ProCoreException {
238 public void acceptCommit(long transactionId, long changeSetId, byte[] metadata) throws ProCoreException {
239 clusters.state.headChangeSetId++;
240 long committedChangeSetId = changeSetId + 1;
242 clusters.commitChangeSet(committedChangeSetId, metadata);
244 clusters.state.transactionId = transactionId;
246 mainProgram.committed();
248 TimeLogger.log("Accepted commit");
249 } catch (IllegalAcornStateException e) {
250 throw new ProCoreException(e);
255 public long cancelCommit(long transactionId, long changeSetId, byte[] metadata, OnChangeSetUpdate onChangeSetUpdate) throws ProCoreException {
256 // Accept and finalize current transaction and then undo it
257 acceptCommit(transactionId, changeSetId, metadata);
260 undo(new long[] {changeSetId+1}, onChangeSetUpdate);
261 clusters.state.headChangeSetId++;
262 return clusters.state.headChangeSetId;
263 } catch (SDBException e) {
264 LOGGER.error("Failed to undo cancelled transaction", e);
265 throw new ProCoreException(e);
270 public Transaction askReadTransaction() throws ProCoreException {
271 return transactionManager.askReadTransaction();
274 enum TransactionState {
278 class TransactionRequest {
279 public TransactionState state;
280 public Semaphore semaphore;
281 public TransactionRequest(TransactionState state, Semaphore semaphore) {
283 this.semaphore = semaphore;
287 class TransactionManager {
289 private TransactionState currentTransactionState = TransactionState.IDLE;
291 private int reads = 0;
293 LinkedList<TransactionRequest> requests = new LinkedList<TransactionRequest>();
295 TLongObjectHashMap<TransactionRequest> requestMap = new TLongObjectHashMap<TransactionRequest>();
297 private synchronized Transaction makeTransaction(TransactionRequest req) {
299 final int csId = clusters.state.headChangeSetId;
300 final long trId = clusters.state.transactionId+1;
301 requestMap.put(trId, req);
302 return new Transaction() {
305 public long getTransactionId() {
310 public long getHeadChangeSetId() {
317 * This method cannot be synchronized since it waits and must support multiple entries
318 * by query thread(s) and internal transactions such as snapshot saver
320 public Transaction askReadTransaction() throws ProCoreException {
322 Semaphore semaphore = new Semaphore(0);
324 TransactionRequest req = queue(TransactionState.READ, semaphore);
328 } catch (InterruptedException e) {
329 throw new ProCoreException(e);
332 return makeTransaction(req);
336 private synchronized void dispatch() {
337 TransactionRequest r = requests.removeFirst();
338 if(r.state == TransactionState.READ) reads++;
339 r.semaphore.release();
342 private synchronized void processRequests() {
346 if(requests.isEmpty()) return;
347 TransactionRequest req = requests.peek();
349 if(currentTransactionState == TransactionState.IDLE) {
351 // Accept anything while IDLE
352 currentTransactionState = req.state;
355 } else if (currentTransactionState == TransactionState.READ) {
357 if(req.state == currentTransactionState) {
369 } else if (currentTransactionState == TransactionState.WRITE) {
380 private synchronized TransactionRequest queue(TransactionState state, Semaphore semaphore) {
381 TransactionRequest req = new TransactionRequest(state, semaphore);
382 requests.addLast(req);
388 * This method cannot be synchronized since it waits and must support multiple entries
389 * by query thread(s) and internal transactions such as snapshot saver
391 public Transaction askWriteTransaction() throws IllegalAcornStateException {
393 Semaphore semaphore = new Semaphore(0);
394 TransactionRequest req = queue(TransactionState.WRITE, semaphore);
398 } catch (InterruptedException e) {
399 throw new IllegalAcornStateException(e);
401 mainProgram.startTransaction(clusters.state.headChangeSetId+1);
402 return makeTransaction(req);
405 public synchronized long endTransaction(long transactionId) throws ProCoreException {
407 TransactionRequest req = requestMap.remove(transactionId);
408 if(req.state == TransactionState.WRITE) {
409 currentTransactionState = TransactionState.IDLE;
414 currentTransactionState = TransactionState.IDLE;
418 return clusters.state.transactionId;
424 public Transaction askWriteTransaction(final long transactionId) throws ProCoreException {
426 if (isClosing || unexpectedClose || closed) {
427 throw new ProCoreException("GraphClientImpl2 is already closing so no more write transactions allowed!");
429 return transactionManager.askWriteTransaction();
430 } catch (IllegalAcornStateException e) {
431 throw new ProCoreException(e);
436 public long endTransaction(long transactionId) throws ProCoreException {
437 return transactionManager.endTransaction(transactionId);
441 public String execute(String command) throws ProCoreException {
442 // This is called only by WriteGraphImpl.commitAccessorChanges
443 // We can ignore this in Acorn
448 public byte[] getChangeSetMetadata(long changeSetId) throws ProCoreException {
450 return clusters.getMetadata(changeSetId);
451 } catch (AcornAccessVerificationException | IllegalAcornStateException e) {
452 throw new ProCoreException(e);
457 public ChangeSetData getChangeSetData(long minChangeSetId,
458 long maxChangeSetId, OnChangeSetUpdate onChangeSetupate)
459 throws ProCoreException {
461 new Exception("GetChangeSetDataFunction " + minChangeSetId + " " + maxChangeSetId).printStackTrace();;
467 public ChangeSetIds getChangeSetIds() throws ProCoreException {
468 throw new UnsupportedOperationException();
472 public Cluster getCluster(byte[] clusterId) throws ProCoreException {
473 throw new UnsupportedOperationException();
477 public ClusterChanges getClusterChanges(long changeSetId, byte[] clusterId)
478 throws ProCoreException {
479 throw new UnsupportedOperationException();
483 public ClusterIds getClusterIds() throws ProCoreException {
485 return clusters.getClusterIds();
486 } catch (IllegalAcornStateException e) {
487 throw new ProCoreException(e);
492 public Information getInformation() throws ProCoreException {
493 return new Information() {
496 public String getServerId() {
501 public String getProtocolId() {
506 public String getDatabaseId() {
511 public long getFirstChangeSetId() {
519 public Refresh getRefresh(long changeSetId) throws ProCoreException {
521 final ClusterIds ids = getClusterIds();
523 return new Refresh() {
526 public long getHeadChangeSetId() {
527 return clusters.state.headChangeSetId;
531 public long[] getFirst() {
532 return ids.getFirst();
536 public long[] getSecond() {
537 return ids.getSecond();
544 public byte[] getResourceFile(final byte[] clusterUID, final int resourceIndex) throws ProCoreException, AcornAccessVerificationException, IllegalAcornStateException {
545 return clusters.getResourceFile(clusterUID, resourceIndex);
549 public ResourceSegment getResourceSegment(final byte[] clusterUID, final int resourceIndex, final long segmentOffset, short segmentSize) throws ProCoreException {
551 return clusters.getResourceSegment(clusterUID, resourceIndex, segmentOffset, segmentSize);
552 } catch (AcornAccessVerificationException | IllegalAcornStateException e) {
553 throw new ProCoreException(e);
558 public long reserveIds(int count) throws ProCoreException {
559 return clusters.state.reservedIds++;
563 public void updateCluster(byte[] operations) throws ProCoreException {
564 ClusterInfo info = null;
566 ClusterUpdateOperation operation = new ClusterUpdateOperation(clusters, operations);
567 info = clusters.clusterLRU.getOrCreate(operation.uid, true);
569 throw new IllegalAcornStateException("info == null for operation " + operation);
571 info.scheduleUpdate();
572 mainProgram.schedule(operation);
573 } catch (IllegalAcornStateException | AcornAccessVerificationException e) {
574 throw new ProCoreException(e);
581 private UndoClusterUpdateProcessor getUndoCSS(String ccsId) throws DatabaseException, AcornAccessVerificationException, IllegalAcornStateException {
583 String[] ss = ccsId.split("\\.");
584 String chunkKey = ss[0];
585 int chunkOffset = Integer.parseInt(ss[1]);
586 ClusterStreamChunk chunk = clusters.streamLRU.getWithoutMutex(chunkKey);
587 if(chunk == null) throw new IllegalAcornStateException("Cluster Stream Chunk " + chunkKey + " was not found.");
588 chunk.acquireMutex();
590 return chunk.getUndoProcessor(clusters, chunkOffset, ccsId);
591 } catch (DatabaseException e) {
593 } catch (Throwable t) {
594 throw new IllegalStateException(t);
596 chunk.releaseMutex();
600 private void performUndo(String ccsId, ArrayList<Pair<ClusterUID, byte[]>> clusterChanges, UndoClusterSupport support) throws ProCoreException, DatabaseException, IllegalAcornStateException, AcornAccessVerificationException {
601 UndoClusterUpdateProcessor proc = getUndoCSS(ccsId);
603 int clusterKey = clusters.getClusterKeyByClusterUIDOrMakeWithoutMutex(proc.getClusterUID());
605 clusters.clusterLRU.acquireMutex();
608 ClusterChange cs = new ClusterChange(clusterChanges, proc.getClusterUID());
609 for(int i=0;i<proc.entries.size();i++) {
611 Entry e = proc.entries.get(proc.entries.size() - 1 - i);
612 e.process(clusters, cs, clusterKey);
617 clusters.clusterLRU.releaseMutex();
622 public boolean undo(long[] changeSetIds, OnChangeSetUpdate onChangeSetUpdate) throws SDBException {
624 Exception exception = mainProgram.runIdle(new MainProgramRunnable() {
627 public void run() throws Exception {
631 final ArrayList<Pair<ClusterUID, byte[]>> clusterChanges = new ArrayList<Pair<ClusterUID, byte[]>>();
633 UndoClusterSupport support = new UndoClusterSupport(clusters);
635 final int changeSetId = clusters.state.headChangeSetId;
637 if(ClusterUpdateProcessorBase.DEBUG)
638 LOGGER.info(" === BEGIN UNDO ===");
640 for(int i=0;i<changeSetIds.length;i++) {
641 final long id = changeSetIds[changeSetIds.length-1-i];
642 ArrayList<String> ccss = clusters.getChanges(id);
644 for(int j=0;j<ccss.size();j++) {
645 String ccsid = ccss.get(ccss.size()-j-1);
647 if(ClusterUpdateProcessorBase.DEBUG)
648 LOGGER.info("performUndo " + ccsid);
649 performUndo(ccsid, clusterChanges, support);
650 } catch (DatabaseException e) {
656 if(ClusterUpdateProcessorBase.DEBUG)
657 LOGGER.info(" === END UNDO ===");
659 for(int i=0;i<clusterChanges.size();i++) {
661 final int changeSetIndex = i;
663 final Pair<ClusterUID, byte[]> pair = clusterChanges.get(i);
665 final ClusterUID cuid = pair.first;
666 final byte[] data = pair.second;
668 onChangeSetUpdate.onChangeSetUpdate(new ChangeSetUpdate() {
671 public long getChangeSetId() {
676 public int getChangeSetIndex() {
681 public int getNumberOfClusterChangeSets() {
682 return clusterChanges.size();
686 public int getIndexOfClusterChangeSet() {
687 return changeSetIndex;
691 public byte[] getClusterId() {
692 return cuid.asBytes();
696 public boolean getNewCluster() {
701 public byte[] getData() {
707 } catch (AcornAccessVerificationException | IllegalAcornStateException e1) {
708 throw new ProCoreException(e1);
720 if(exception instanceof SDBException) throw (SDBException)exception;
721 else if(exception != null) throw new IllegalAcornStateException(exception);
727 public ServiceLocator getServiceLocator() {
732 public boolean refreshEnabled() {
737 public boolean rolledback() {
738 return clusters.rolledback();
741 public void purge() throws IllegalAcornStateException {
742 clusters.purge(locator);
745 public void purgeDatabase() {
747 if (isClosing || unexpectedClose)
750 saver.execute(new Runnable() {
754 Transaction tr = null;
756 // First take a write transaction
757 tr = askWriteTransaction(-1);
758 // Then make sure that MainProgram is idling
759 mainProgram.mutex.acquire();
761 synchronized(mainProgram) {
762 if(mainProgram.operations.isEmpty()) {
765 // MainProgram is becoming busy again - delay snapshotting
770 mainProgram.mutex.release();
772 } catch (IllegalAcornStateException | ProCoreException e) {
773 LOGGER.error("Purge failed", e);
774 unexpectedClose = true;
775 } catch (InterruptedException e) {
776 LOGGER.error("Purge interrupted", e);
780 endTransaction(tr.getTransactionId());
781 if (unexpectedClose) {
782 LifecycleSupport support = getServiceLocator().getService(LifecycleSupport.class);
785 } catch (DatabaseException e1) {
786 LOGGER.error("Failed to close database as a safety measure due to failed purge", e1);
789 } catch (ProCoreException e) {
790 LOGGER.error("Failed to end purge write transaction", e);
798 public long getTailChangeSetId() {
799 return clusters.getTailChangeSetId();