1 /*******************************************************************************
2 * Copyright (c) 2007, 2010 Association for Decentralized Information Management
4 * All rights reserved. This program and the accompanying materials
5 * are made available under the terms of the Eclipse Public License v1.0
6 * which accompanies this distribution, and is available at
7 * http://www.eclipse.org/legal/epl-v10.html
10 * VTT Technical Research Centre of Finland - initial API and implementation
11 *******************************************************************************/
12 package org.simantics.acorn;
14 import java.io.IOException;
15 import java.nio.file.Path;
16 import java.util.ArrayList;
17 import java.util.LinkedList;
18 import java.util.concurrent.ExecutorService;
19 import java.util.concurrent.Executors;
20 import java.util.concurrent.Semaphore;
21 import java.util.concurrent.ThreadFactory;
22 import java.util.concurrent.TimeUnit;
24 import org.simantics.acorn.exception.AcornAccessVerificationException;
25 import org.simantics.acorn.exception.IllegalAcornStateException;
26 import org.simantics.acorn.internal.ClusterChange;
27 import org.simantics.acorn.internal.ClusterUpdateProcessorBase;
28 import org.simantics.acorn.internal.UndoClusterUpdateProcessor;
29 import org.simantics.acorn.lru.ClusterInfo;
30 import org.simantics.acorn.lru.ClusterStreamChunk;
31 import org.simantics.acorn.lru.ClusterUpdateOperation;
32 import org.simantics.acorn.lru.ClusterChangeSet.Entry;
33 import org.simantics.db.ClusterCreator;
34 import org.simantics.db.Database;
35 import org.simantics.db.ServiceLocator;
36 import org.simantics.db.common.utils.Logger;
37 import org.simantics.db.exception.DatabaseException;
38 import org.simantics.db.exception.SDBException;
39 import org.simantics.db.server.ProCoreException;
40 import org.simantics.db.service.ClusterSetsSupport;
41 import org.simantics.db.service.ClusterUID;
42 import org.simantics.db.service.LifecycleSupport;
43 import org.simantics.utils.datastructures.Pair;
44 import org.simantics.utils.logging.TimeLogger;
46 import gnu.trove.map.hash.TLongObjectHashMap;
48 public class GraphClientImpl2 implements Database.Session {
50 public static final boolean DEBUG = false;
52 public final ClusterManager clusters;
54 private TransactionManager transactionManager = new TransactionManager();
55 private ExecutorService executor = Executors.newSingleThreadExecutor(new ClientThreadFactory("Core Main Program", false));
56 private ExecutorService saver = Executors.newSingleThreadExecutor(new ClientThreadFactory("Core Snapshot Saver", true));
58 private Path dbFolder;
59 private final Database database;
60 private ServiceLocator locator;
61 private MainProgram mainProgram;
63 static class ClientThreadFactory implements ThreadFactory {
68 public ClientThreadFactory(String name, boolean daemon) {
74 public Thread newThread(Runnable r) {
75 Thread thread = new Thread(r, name);
76 thread.setDaemon(daemon);
81 public GraphClientImpl2(Database database, Path dbFolder, ServiceLocator locator) throws IOException {
82 this.database = database;
83 this.dbFolder = dbFolder;
84 this.locator = locator;
85 this.clusters = new ClusterManager(dbFolder);
87 ClusterSetsSupport cssi = locator.getService(ClusterSetsSupport.class);
88 cssi.setReadDirectory(clusters.lastSessionDirectory);
89 mainProgram = new MainProgram(this, clusters);
90 executor.execute(mainProgram);
93 public Path getDbFolder() {
97 public void tryMakeSnapshot() throws IOException {
99 if (isClosing || unexpectedClose)
102 saver.execute(new Runnable() {
106 Transaction tr = null;
108 // First take a write transaction
109 tr = askWriteTransaction(-1);
110 // Then make sure that MainProgram is idling
111 mainProgram.mutex.acquire();
113 synchronized(mainProgram) {
114 if(mainProgram.operations.isEmpty()) {
117 // MainProgram is becoming busy again - delay snapshotting
122 mainProgram.mutex.release();
124 } catch (IllegalAcornStateException | ProCoreException e) {
125 Logger.defaultLogError(e);
126 unexpectedClose = true;
127 } catch (InterruptedException e) {
128 Logger.defaultLogError(e);
132 endTransaction(tr.getTransactionId());
133 if (unexpectedClose) {
134 LifecycleSupport support = getServiceLocator().getService(LifecycleSupport.class);
137 } catch (DatabaseException e1) {
138 Logger.defaultLogError(e1);
141 } catch (ProCoreException e) {
142 Logger.defaultLogError(e);
149 public void makeSnapshot(boolean fullSave) throws IllegalAcornStateException {
150 clusters.makeSnapshot(locator, fullSave);
153 public <T> T clone(ClusterUID uid, ClusterCreator creator) throws DatabaseException {
155 return clusters.clone(uid, creator);
156 } catch (AcornAccessVerificationException | IllegalAcornStateException | IOException e) {
157 unexpectedClose = true;
158 throw new DatabaseException(e);
162 // private void save() throws IOException {
166 public void load() throws IOException {
170 // public void modiFileEx(ClusterUID uid, int resourceKey, long offset, long size, byte[] bytes, long pos, ClusterSupport support) {
171 // clusters.modiFileEx(uid, resourceKey, offset, size, bytes, pos, support);
175 public Database getDatabase() {
179 private boolean closed = false;
180 private boolean isClosing = false;
181 private boolean unexpectedClose = false;
184 public void close() throws ProCoreException {
185 System.err.println("Closing " + this + " and mainProgram " + mainProgram);
186 if(!closed && !isClosing) {
189 if (!unexpectedClose)
196 boolean executorTerminated = executor.awaitTermination(500, TimeUnit.MILLISECONDS);
197 boolean saverTerminated = saver.awaitTermination(500, TimeUnit.MILLISECONDS);
199 System.err.println("executorTerminated=" + executorTerminated + ", saverTerminated=" + saverTerminated);
205 } catch (IllegalAcornStateException | InterruptedException e) {
206 throw new ProCoreException(e);
214 public void open() throws ProCoreException {
215 throw new UnsupportedOperationException();
219 public boolean isClosed() throws ProCoreException {
224 public void acceptCommit(long transactionId, long changeSetId, byte[] metadata) throws ProCoreException {
225 clusters.state.headChangeSetId++;
226 long committedChangeSetId = changeSetId + 1;
228 clusters.commitChangeSet(committedChangeSetId, metadata);
230 clusters.state.transactionId = transactionId;
232 mainProgram.committed();
234 TimeLogger.log("Accepted commit");
235 } catch (IllegalAcornStateException e) {
236 throw new ProCoreException(e);
241 public long cancelCommit(long transactionId, long changeSetId, byte[] metadata, OnChangeSetUpdate onChangeSetUpdate) throws ProCoreException {
242 UnsupportedOperationException e = new UnsupportedOperationException("org.simantics.acorn.GraphClientImpl2.cancelCommit() is not supported operation! Closing down to prevent further havoc");
243 clusters.notSafeToMakeSnapshot(new IllegalAcornStateException(e));
245 // System.err.println("GraphClientImpl2.cancelCommit() called!! this is experimental and might cause havoc!");
247 // undo(new long[] {changeSetId}, onChangeSetUpdate);
248 // } catch (SDBException e) {
249 // e.printStackTrace();
250 // throw new ProCoreException(e);
252 // clusters.state.headChangeSetId++;
253 // return clusters.state.headChangeSetId;
257 public Transaction askReadTransaction() throws ProCoreException {
258 return transactionManager.askReadTransaction();
261 enum TransactionState {
265 class TransactionRequest {
266 public TransactionState state;
267 public Semaphore semaphore;
268 public TransactionRequest(TransactionState state, Semaphore semaphore) {
270 this.semaphore = semaphore;
274 class TransactionManager {
276 private TransactionState currentTransactionState = TransactionState.IDLE;
278 private int reads = 0;
280 LinkedList<TransactionRequest> requests = new LinkedList<TransactionRequest>();
282 TLongObjectHashMap<TransactionRequest> requestMap = new TLongObjectHashMap<TransactionRequest>();
284 private synchronized Transaction makeTransaction(TransactionRequest req) {
286 final int csId = clusters.state.headChangeSetId;
287 final long trId = clusters.state.transactionId+1;
288 requestMap.put(trId, req);
289 return new Transaction() {
292 public long getTransactionId() {
297 public long getHeadChangeSetId() {
304 * This method cannot be synchronized since it waits and must support multiple entries
305 * by query thread(s) and internal transactions such as snapshot saver
307 public Transaction askReadTransaction() throws ProCoreException {
309 Semaphore semaphore = new Semaphore(0);
311 TransactionRequest req = queue(TransactionState.READ, semaphore);
315 } catch (InterruptedException e) {
316 throw new ProCoreException(e);
319 return makeTransaction(req);
323 private synchronized void dispatch() {
324 TransactionRequest r = requests.removeFirst();
325 if(r.state == TransactionState.READ) reads++;
326 r.semaphore.release();
329 private synchronized void processRequests() {
333 if(requests.isEmpty()) return;
334 TransactionRequest req = requests.peek();
336 if(currentTransactionState == TransactionState.IDLE) {
338 // Accept anything while IDLE
339 currentTransactionState = req.state;
342 } else if (currentTransactionState == TransactionState.READ) {
344 if(req.state == currentTransactionState) {
356 } else if (currentTransactionState == TransactionState.WRITE) {
367 private synchronized TransactionRequest queue(TransactionState state, Semaphore semaphore) {
368 TransactionRequest req = new TransactionRequest(state, semaphore);
369 requests.addLast(req);
375 * This method cannot be synchronized since it waits and must support multiple entries
376 * by query thread(s) and internal transactions such as snapshot saver
378 public Transaction askWriteTransaction() throws IllegalAcornStateException {
380 Semaphore semaphore = new Semaphore(0);
381 TransactionRequest req = queue(TransactionState.WRITE, semaphore);
385 } catch (InterruptedException e) {
386 throw new IllegalAcornStateException(e);
388 mainProgram.startTransaction(clusters.state.headChangeSetId+1);
389 return makeTransaction(req);
392 public synchronized long endTransaction(long transactionId) throws ProCoreException {
394 TransactionRequest req = requestMap.remove(transactionId);
395 if(req.state == TransactionState.WRITE) {
396 currentTransactionState = TransactionState.IDLE;
401 currentTransactionState = TransactionState.IDLE;
405 return clusters.state.transactionId;
411 public Transaction askWriteTransaction(final long transactionId) throws ProCoreException {
413 if (isClosing || unexpectedClose || closed) {
414 throw new ProCoreException("GraphClientImpl2 is already closing so no more write transactions allowed!");
416 return transactionManager.askWriteTransaction();
417 } catch (IllegalAcornStateException e) {
418 throw new ProCoreException(e);
423 public long endTransaction(long transactionId) throws ProCoreException {
424 return transactionManager.endTransaction(transactionId);
428 public String execute(String command) throws ProCoreException {
429 // This is called only by WriteGraphImpl.commitAccessorChanges
430 // We can ignore this in Acorn
435 public byte[] getChangeSetMetadata(long changeSetId) throws ProCoreException {
437 return clusters.getMetadata(changeSetId);
438 } catch (AcornAccessVerificationException | IllegalAcornStateException e) {
439 throw new ProCoreException(e);
444 public ChangeSetData getChangeSetData(long minChangeSetId,
445 long maxChangeSetId, OnChangeSetUpdate onChangeSetupate)
446 throws ProCoreException {
448 new Exception("GetChangeSetDataFunction " + minChangeSetId + " " + maxChangeSetId).printStackTrace();;
454 public ChangeSetIds getChangeSetIds() throws ProCoreException {
455 throw new UnsupportedOperationException();
459 public Cluster getCluster(byte[] clusterId) throws ProCoreException {
460 throw new UnsupportedOperationException();
464 public ClusterChanges getClusterChanges(long changeSetId, byte[] clusterId)
465 throws ProCoreException {
466 throw new UnsupportedOperationException();
470 public ClusterIds getClusterIds() throws ProCoreException {
472 return clusters.getClusterIds();
473 } catch (IllegalAcornStateException e) {
474 throw new ProCoreException(e);
479 public Information getInformation() throws ProCoreException {
480 return new Information() {
483 public String getServerId() {
488 public String getProtocolId() {
493 public String getDatabaseId() {
498 public long getFirstChangeSetId() {
506 public Refresh getRefresh(long changeSetId) throws ProCoreException {
508 final ClusterIds ids = getClusterIds();
510 return new Refresh() {
513 public long getHeadChangeSetId() {
514 return clusters.state.headChangeSetId;
518 public long[] getFirst() {
519 return ids.getFirst();
523 public long[] getSecond() {
524 return ids.getSecond();
531 public byte[] getResourceFile(final byte[] clusterUID, final int resourceIndex) throws ProCoreException, AcornAccessVerificationException, IllegalAcornStateException {
532 return clusters.getResourceFile(clusterUID, resourceIndex);
536 public ResourceSegment getResourceSegment(final byte[] clusterUID, final int resourceIndex, final long segmentOffset, short segmentSize) throws ProCoreException {
538 return clusters.getResourceSegment(clusterUID, resourceIndex, segmentOffset, segmentSize);
539 } catch (AcornAccessVerificationException | IllegalAcornStateException e) {
540 throw new ProCoreException(e);
545 public long reserveIds(int count) throws ProCoreException {
546 return clusters.state.reservedIds++;
550 public void updateCluster(byte[] operations) throws ProCoreException {
551 ClusterInfo info = null;
553 ClusterUpdateOperation operation = new ClusterUpdateOperation(clusters, operations);
554 info = clusters.clusterLRU.getOrCreate(operation.uid, true);
556 throw new IllegalAcornStateException("info == null for operation " + operation);
558 info.scheduleUpdate();
559 mainProgram.schedule(operation);
560 } catch (IllegalAcornStateException | AcornAccessVerificationException e) {
561 throw new ProCoreException(e);
568 private UndoClusterUpdateProcessor getUndoCSS(String ccsId) throws DatabaseException, AcornAccessVerificationException, IllegalAcornStateException {
570 String[] ss = ccsId.split("\\.");
571 String chunkKey = ss[0];
572 int chunkOffset = Integer.parseInt(ss[1]);
573 ClusterStreamChunk chunk = clusters.streamLRU.getWithoutMutex(chunkKey);
574 if(chunk == null) throw new IllegalAcornStateException("Cluster Stream Chunk " + chunkKey + " was not found.");
575 chunk.acquireMutex();
577 return chunk.getUndoProcessor(clusters, chunkOffset, ccsId);
578 } catch (DatabaseException e) {
580 } catch (Throwable t) {
581 throw new IllegalStateException(t);
583 chunk.releaseMutex();
587 private void performUndo(String ccsId, ArrayList<Pair<ClusterUID, byte[]>> clusterChanges, UndoClusterSupport support) throws ProCoreException, DatabaseException, IllegalAcornStateException, AcornAccessVerificationException {
588 UndoClusterUpdateProcessor proc = getUndoCSS(ccsId);
590 int clusterKey = clusters.getClusterKeyByClusterUIDOrMakeWithoutMutex(proc.getClusterUID());
592 clusters.clusterLRU.acquireMutex();
595 ClusterChange cs = new ClusterChange(clusterChanges, proc.getClusterUID());
596 for(int i=0;i<proc.entries.size();i++) {
598 Entry e = proc.entries.get(proc.entries.size() - 1 - i);
599 e.process(clusters, cs, clusterKey);
604 clusters.clusterLRU.releaseMutex();
609 public boolean undo(long[] changeSetIds, OnChangeSetUpdate onChangeSetUpdate) throws SDBException {
611 final ArrayList<Pair<ClusterUID, byte[]>> clusterChanges = new ArrayList<Pair<ClusterUID, byte[]>>();
613 UndoClusterSupport support = new UndoClusterSupport(clusters);
615 final int changeSetId = clusters.state.headChangeSetId;
617 if(ClusterUpdateProcessorBase.DEBUG)
618 System.err.println(" === BEGIN UNDO ===");
620 for(int i=0;i<changeSetIds.length;i++) {
621 final long id = changeSetIds[changeSetIds.length-1-i];
622 ArrayList<String> ccss = clusters.getChanges(id);
624 for(int j=0;j<ccss.size();j++) {
626 if(ClusterUpdateProcessorBase.DEBUG)
627 System.err.println("performUndo " + ccss.get(ccss.size()-j-1));
628 performUndo(ccss.get(ccss.size()-j-1), clusterChanges, support);
629 } catch (DatabaseException e) {
635 if(ClusterUpdateProcessorBase.DEBUG)
636 System.err.println(" === END UNDO ===");
638 for(int i=0;i<clusterChanges.size();i++) {
640 final int changeSetIndex = i;
642 final Pair<ClusterUID, byte[]> pair = clusterChanges.get(i);
644 final ClusterUID cuid = pair.first;
645 final byte[] data = pair.second;
647 onChangeSetUpdate.onChangeSetUpdate(new ChangeSetUpdate() {
650 public long getChangeSetId() {
655 public int getChangeSetIndex() {
660 public int getNumberOfClusterChangeSets() {
661 return clusterChanges.size();
665 public int getIndexOfClusterChangeSet() {
666 return changeSetIndex;
670 public byte[] getClusterId() {
671 return cuid.asBytes();
675 public boolean getNewCluster() {
680 public byte[] getData() {
686 } catch (AcornAccessVerificationException | IllegalAcornStateException e1) {
687 throw new ProCoreException(e1);
692 public ServiceLocator getServiceLocator() {
697 public boolean refreshEnabled() {
702 public boolean rolledback() {
703 return clusters.rolledback();
716 ////////////////////////