1 /*******************************************************************************
2 * Copyright (c) 2007, 2010 Association for Decentralized Information Management
4 * All rights reserved. This program and the accompanying materials
5 * are made available under the terms of the Eclipse Public License v1.0
6 * which accompanies this distribution, and is available at
7 * http://www.eclipse.org/legal/epl-v10.html
10 * VTT Technical Research Centre of Finland - initial API and implementation
11 *******************************************************************************/
12 package org.simantics.acorn;
14 import java.io.IOException;
15 import java.nio.file.Path;
16 import java.util.ArrayList;
17 import java.util.LinkedList;
18 import java.util.concurrent.ExecutorService;
19 import java.util.concurrent.Executors;
20 import java.util.concurrent.Semaphore;
21 import java.util.concurrent.ThreadFactory;
22 import java.util.concurrent.TimeUnit;
24 import org.simantics.acorn.internal.ClusterChange;
25 import org.simantics.acorn.internal.ClusterUpdateProcessorBase;
26 import org.simantics.acorn.internal.UndoClusterUpdateProcessor;
27 import org.simantics.acorn.lru.ClusterInfo;
28 import org.simantics.acorn.lru.ClusterStreamChunk;
29 import org.simantics.acorn.lru.ClusterUpdateOperation;
30 import org.simantics.acorn.lru.ClusterChangeSet.Entry;
31 import org.simantics.db.ClusterCreator;
32 import org.simantics.db.Database;
33 import org.simantics.db.ServiceLocator;
34 import org.simantics.db.common.utils.Logger;
35 import org.simantics.db.exception.DatabaseException;
36 import org.simantics.db.exception.SDBException;
37 import org.simantics.db.server.ProCoreException;
38 import org.simantics.db.service.ClusterSetsSupport;
39 import org.simantics.db.service.ClusterUID;
40 import org.simantics.utils.datastructures.Pair;
41 import org.simantics.utils.logging.TimeLogger;
43 import gnu.trove.map.hash.TLongObjectHashMap;
45 public class GraphClientImpl2 implements Database.Session {
47 public static final boolean DEBUG = false;
49 public final ClusterManager clusters;
51 private TransactionManager transactionManager = new TransactionManager();
52 private ExecutorService executor = Executors.newSingleThreadExecutor(new ClientThreadFactory("Core Main Program", false));
53 private ExecutorService saver = Executors.newSingleThreadExecutor(new ClientThreadFactory("Core Snapshot Saver", true));
55 private static GraphClientImpl2 INSTANCE;
56 private Path dbFolder;
57 private final Database database;
58 private ServiceLocator locator;
59 private MainProgram mainProgram;
// Thread factory that produces threads with a fixed, human-readable name and a
// configurable daemon flag; used for the "Core Main Program" and "Core Snapshot
// Saver" single-thread executors declared above.
// NOTE(review): this view of the source is incomplete — the name/daemon field
// declarations, constructor body and newThread() return are not visible here.
61 static class ClientThreadFactory implements ThreadFactory {
66 public ClientThreadFactory(String name, boolean daemon) {
72 public Thread newThread(Runnable r) {
// All threads from one factory share the same name (presumably a singleton pool).
73 Thread thread = new Thread(r, name);
74 thread.setDaemon(daemon);
/**
 * Creates the Acorn database session.
 *
 * @param database the owning {@link Database} instance
 * @param dbFolder root folder for cluster storage on disk
 * @param locator  service locator used to obtain {@link ClusterSetsSupport}
 * @throws IOException if cluster storage cannot be initialized
 */
79 public GraphClientImpl2(Database database, Path dbFolder, ServiceLocator locator) throws IOException {
80 this.database = database;
81 this.dbFolder = dbFolder;
82 this.locator = locator;
83 this.clusters = new ClusterManager(dbFolder);
// Point the cluster-sets service at the directories managed by ClusterManager:
// reads come from the last session's directory, writes go to the working directory.
85 ClusterSetsSupport cssi = locator.getService(ClusterSetsSupport.class);
86 cssi.updateReadAndWriteDirectories(clusters.lastSessionDirectory, clusters.workingDirectory);
// Start the main program loop on the dedicated single-thread executor.
87 mainProgram = new MainProgram(this, clusters);
88 executor.execute(mainProgram);
// Accessor for the database root folder passed to the constructor.
92 public Path getDbFolder() {
/**
 * Asynchronously attempts to take a snapshot on the saver thread.
 * The snapshot is only taken when the MainProgram is idle: a write
 * transaction is acquired first (blocking out other writers), then the
 * MainProgram mutex is taken and its operation queue checked.
 */
96 public void tryMakeSnapshot() throws IOException {
101 saver.execute(new Runnable() {
105 Transaction tr = null;
107 // First take a write transaction
108 tr = askWriteTransaction(-1);
109 // Then make sure that MainProgram is idling
110 mainProgram.mutex.acquire();
112 synchronized(mainProgram) {
113 if(mainProgram.operations.isEmpty()) {
116 // MainProgram is becoming busy again - delay snapshotting
121 mainProgram.mutex.release();
// Errors are logged and swallowed: snapshotting is best-effort.
123 } catch (IOException e) {
124 Logger.defaultLogError(e);
125 } catch (ProCoreException e) {
126 Logger.defaultLogError(e);
// NOTE(review): InterruptedException is logged without restoring the thread's
// interrupt status (Thread.currentThread().interrupt()) — worth confirming.
127 } catch (InterruptedException e) {
128 Logger.defaultLogError(e);
// Always release the write transaction taken above (runs in a finally, per
// the brace structure not visible in this view).
132 endTransaction(tr.getTransactionId());
133 } catch (ProCoreException e) {
134 Logger.defaultLogError(e);
/**
 * Takes a snapshot of the cluster state, unless snapshots have been disabled
 * by a (unsupported) cancelCommit call — see {@code safeToMakeSnapshot}.
 *
 * @param force when true the snapshot is taken regardless of thresholds
 *              (delegated to ClusterManager; exact semantics live there)
 */
142 public void makeSnapshot(boolean force) throws IOException {
143 if (safeToMakeSnapshot)
144 clusters.makeSnapshot(locator, force);
// Delegates cluster cloning to the ClusterManager.
147 public <T> T clone(ClusterUID uid, ClusterCreator creator) throws DatabaseException {
148 return clusters.clone(uid, creator);
// Dead code retained from an earlier revision; candidates for deletion.
151 // private void save() throws IOException {
// Loads persisted cluster state from disk (body not visible in this view).
155 public void load() throws IOException {
159 // public void modiFileEx(ClusterUID uid, int resourceKey, long offset, long size, byte[] bytes, long pos, ClusterSupport support) {
160 // clusters.modiFileEx(uid, resourceKey, offset, size, bytes, pos, support);
// Accessor for the owning Database instance.
164 public Database getDatabase() {
// Lifecycle flags consulted by close() to make shutdown idempotent.
168 private boolean closed = false;
169 private boolean isClosing = false;
170 // Guards snapshot creation: cleared by cancelCommit (which is not yet supported and may cause corrupted head.state writing).
171 private boolean safeToMakeSnapshot = true;
/**
 * Shuts the session down: waits (briefly) for the main-program and saver
 * executors to terminate. Guarded by closed/isClosing so repeated calls
 * are no-ops.
 *
 * @throws ProCoreException wrapping any IOException/InterruptedException
 *         raised during shutdown
 */
174 public void close() throws ProCoreException {
175 System.err.println("Closing " + this + " and mainProgram " + mainProgram);
176 if(!closed && !isClosing) {
// 500 ms is a best-effort bound; the booleans below record whether each pool
// actually terminated in time.
185 boolean executorTerminated = executor.awaitTermination(500, TimeUnit.MILLISECONDS);
186 boolean saverTerminated = saver.awaitTermination(500, TimeUnit.MILLISECONDS);
188 System.err.println("executorTerminated=" + executorTerminated + ", saverTerminated=" + saverTerminated);
195 } catch (IOException | InterruptedException e) {
196 throw new ProCoreException(e);
// Re-opening a session is not supported by this implementation.
204 public void open() throws ProCoreException {
205 throw new UnsupportedOperationException();
// Reports whether close() has completed (body not visible in this view).
209 public boolean isClosed() throws ProCoreException {
/**
 * Finalizes a commit: advances the head change-set id, persists the change
 * set with its metadata, records the transaction id and notifies the
 * MainProgram that the commit completed.
 */
214 public void acceptCommit(long transactionId, long changeSetId, byte[] metadata) throws ProCoreException {
216 clusters.state.headChangeSetId++;
// The committed change set is the one *after* the id handed in by the caller.
218 long committedChangeSetId = changeSetId + 1;
220 clusters.commitChangeSet(committedChangeSetId, metadata);
222 clusters.state.transactionId = transactionId;
224 mainProgram.committed();
226 TimeLogger.log("Accepted commit");
/**
 * Commit cancellation is deliberately unsupported: the partial undo path it
 * would require can corrupt the persisted head.state, so snapshotting is
 * disabled first and an exception is thrown to force a shutdown. The
 * commented-out body below is the abandoned experimental implementation.
 */
231 public long cancelCommit(long transactionId, long changeSetId,
232 byte[] metadata, OnChangeSetUpdate onChangeSetUpdate)
233 throws ProCoreException {
// Disable snapshots permanently for this session before failing.
234 safeToMakeSnapshot = false;
235 throw new UnsupportedOperationException("org.simantics.acorn.GraphClientImpl2.cancelCommit() is not supported operation! Closing down to prevent further havoc");
236 // System.err.println("GraphClientImpl2.cancelCommit() called!! this is experimental and might cause havoc!");
238 // undo(new long[] {changeSetId}, onChangeSetUpdate);
239 // } catch (SDBException e) {
240 // e.printStackTrace();
241 // throw new ProCoreException(e);
243 // clusters.state.headChangeSetId++;
244 // return clusters.state.headChangeSetId;
// Read transactions are brokered by the TransactionManager below.
248 public Transaction askReadTransaction() throws ProCoreException {
249 return transactionManager.askReadTransaction();
// Transaction lifecycle states (values not visible in this view; presumably
// IDLE/READ/WRITE given the usage in TransactionManager).
252 enum TransactionState {
// A queued request for a transaction: the desired state (READ/WRITE) plus the
// semaphore the requester blocks on until the manager dispatches it.
256 class TransactionRequest {
257 public TransactionState state;
258 public Semaphore semaphore;
259 public TransactionRequest(TransactionState state, Semaphore semaphore) {
261 this.semaphore = semaphore;
// Serializes access to the database: many concurrent readers XOR one writer.
// Requests queue up in FIFO order and block on a per-request semaphore until
// dispatched. NOTE(review): this view of the source is incomplete — several
// statements and braces inside this class are not visible.
265 class TransactionManager {
267 private TransactionState currentTransactionState = TransactionState.IDLE;
// Number of currently-active read transactions.
269 private int reads = 0;
271 LinkedList<TransactionRequest> requests = new LinkedList<TransactionRequest>();
// Maps a transaction id to its originating request so endTransaction can
// tell reads from writes.
273 TLongObjectHashMap<TransactionRequest> requestMap = new TLongObjectHashMap<TransactionRequest>();
// Builds the Transaction handle once a request has been granted; the new
// transaction id is the successor of the last committed one.
275 private synchronized Transaction makeTransaction(TransactionRequest req) {
277 final int csId = clusters.state.headChangeSetId;
278 final long trId = clusters.state.transactionId+1;
279 requestMap.put(trId, req);
280 return new Transaction() {
283 public long getTransactionId() {
288 public long getHeadChangeSetId() {
295 * This method cannot be synchronized since it waits and must support multiple entries
296 * by query thread(s) and internal transactions such as snapshot saver
298 public Transaction askReadTransaction() throws ProCoreException {
// Block on a fresh semaphore until dispatch() releases it.
300 Semaphore semaphore = new Semaphore(0);
302 TransactionRequest req = queue(TransactionState.READ, semaphore);
306 } catch (InterruptedException e) {
307 throw new ProCoreException(e);
310 return makeTransaction(req);
// Grants the request at the head of the queue and wakes its requester.
314 private synchronized void dispatch() {
315 TransactionRequest r = requests.removeFirst();
316 if(r.state == TransactionState.READ) reads++;
317 r.semaphore.release();
// Decides whether the head-of-queue request can run given the current state:
// anything runs while IDLE; further reads may join while READ; nothing joins
// while WRITE (writer is exclusive).
320 private synchronized void processRequests() {
324 if(requests.isEmpty()) return;
325 TransactionRequest req = requests.peek();
327 if(currentTransactionState == TransactionState.IDLE) {
329 // Accept anything while IDLE
330 currentTransactionState = req.state;
333 } else if (currentTransactionState == TransactionState.READ) {
335 if(req.state == currentTransactionState) {
347 } else if (currentTransactionState == TransactionState.WRITE) {
// Appends a request to the FIFO queue; the caller blocks on its semaphore.
358 private synchronized TransactionRequest queue(TransactionState state, Semaphore semaphore) {
359 TransactionRequest req = new TransactionRequest(state, semaphore);
360 requests.addLast(req);
366 * This method cannot be synchronized since it waits and must support multiple entries
367 * by query thread(s) and internal transactions such as snapshot saver
369 public Transaction askWriteTransaction()
370 throws ProCoreException {
372 Semaphore semaphore = new Semaphore(0);
374 TransactionRequest req = queue(TransactionState.WRITE, semaphore);
378 } catch (InterruptedException e) {
379 throw new ProCoreException(e);
// Tell the MainProgram a write transaction begins at the next change set.
382 mainProgram.startTransaction(clusters.state.headChangeSetId+1);
384 return makeTransaction(req);
// Releases the transaction: a completed WRITE always returns to IDLE; for
// reads the state transition depends on the remaining read count (lines not
// visible in this view).
388 public synchronized long endTransaction(long transactionId) throws ProCoreException {
390 TransactionRequest req = requestMap.remove(transactionId);
391 if(req.state == TransactionState.WRITE) {
392 currentTransactionState = TransactionState.IDLE;
397 currentTransactionState = TransactionState.IDLE;
401 return clusters.state.transactionId;
// Session-level delegates into the TransactionManager. Note the transactionId
// parameter of askWriteTransaction is ignored by this implementation.
407 public Transaction askWriteTransaction(final long transactionId)
408 throws ProCoreException {
409 return transactionManager.askWriteTransaction();
413 public long endTransaction(long transactionId) throws ProCoreException {
414 return transactionManager.endTransaction(transactionId);
418 public String execute(String command) throws ProCoreException {
419 // This is called only by WriteGraphImpl.commitAccessorChanges
420 // We can ignore this in Acorn
// Returns the metadata stored for the given change set by acceptCommit.
425 public byte[] getChangeSetMetadata(long changeSetId)
426 throws ProCoreException {
427 return clusters.getMetadata(changeSetId);
// Debug trace only — prints the call site; the real result is not computed
// here (remaining body not visible in this view). NOTE(review): stray double
// semicolon at the end of the printStackTrace line.
431 public ChangeSetData getChangeSetData(long minChangeSetId,
432 long maxChangeSetId, OnChangeSetUpdate onChangeSetupate)
433 throws ProCoreException {
435 new Exception("GetChangeSetDataFunction " + minChangeSetId + " " + maxChangeSetId).printStackTrace();;
// The following legacy query operations are not supported by Acorn.
441 public ChangeSetIds getChangeSetIds() throws ProCoreException {
442 throw new UnsupportedOperationException();
446 public Cluster getCluster(byte[] clusterId) throws ProCoreException {
447 throw new UnsupportedOperationException();
451 public ClusterChanges getClusterChanges(long changeSetId, byte[] clusterId)
452 throws ProCoreException {
453 throw new UnsupportedOperationException();
// Cluster id enumeration IS supported, via the ClusterManager.
457 public ClusterIds getClusterIds() throws ProCoreException {
458 return clusters.getClusterIds();
// Returns a static Information view (the individual getter bodies are not
// visible in this view).
462 public Information getInformation() throws ProCoreException {
463 return new Information() {
466 public String getServerId() {
471 public String getProtocolId() {
476 public String getDatabaseId() {
481 public long getFirstChangeSetId() {
// Builds a Refresh snapshot: current head change set plus the two cluster-id
// arrays from getClusterIds(). The changeSetId argument is not used here.
489 public Refresh getRefresh(long changeSetId) throws ProCoreException {
491 final ClusterIds ids = getClusterIds();
493 return new Refresh() {
496 public long getHeadChangeSetId() {
497 return clusters.state.headChangeSetId;
501 public long[] getFirst() {
502 return ids.getFirst();
506 public long[] getSecond() {
507 return ids.getSecond();
// Resource access is delegated straight to the ClusterManager.
514 public byte[] getResourceFile(final byte[] clusterUID, final int resourceIndex) throws ProCoreException {
515 return clusters.getResourceFile(clusterUID, resourceIndex);
519 public ResourceSegment getResourceSegment(final byte[] clusterUID,
520 final int resourceIndex, final long segmentOffset, short segmentSize) throws ProCoreException {
522 return clusters.getResourceSegment(clusterUID, resourceIndex, segmentOffset, segmentSize);
// NOTE(review): only increments reservedIds by one and ignores the requested
// count — looks suspicious for count > 1; confirm against callers before
// changing (possibly ids are ranges and this hands out a range base).
527 public long reserveIds(int count) throws ProCoreException {
528 return clusters.state.reservedIds++;
/**
 * Applies a serialized cluster update: parses the operation, resolves (or
 * creates) the target cluster in the LRU, then hands the operation to the
 * MainProgram for asynchronous execution.
 */
532 public void updateCluster(byte[] operations) throws ProCoreException {
534 ClusterUpdateOperation operation = new ClusterUpdateOperation(clusters, operations);
// getOrCreate(…, true) must yield a cluster; a null here means the LRU is broken.
535 ClusterInfo info = clusters.clusterLRU.getOrCreate(operation.uid, true);
536 if(info == null) throw new IllegalStateException();
539 info.scheduleUpdate();
540 mainProgram.schedule(operation);
// Wrap anything unexpected so the caller sees a consistent failure mode.
541 } catch (Throwable t) {
542 throw new IllegalStateException(t);
/**
 * Resolves an undo processor for a cluster change set id of the form
 * "&lt;chunkKey&gt;.&lt;offset&gt;": locates the stream chunk and extracts the
 * processor under the chunk's mutex.
 *
 * @throws IllegalStateException if the chunk is missing or extraction fails
 */
549 private UndoClusterUpdateProcessor getUndoCSS(String ccsId) throws DatabaseException {
551 String[] ss = ccsId.split("\\.");
552 String chunkKey = ss[0];
553 int chunkOffset = Integer.parseInt(ss[1]);
554 ClusterStreamChunk chunk = clusters.streamLRU.getWithoutMutex(chunkKey);
555 if(chunk == null) throw new IllegalStateException("Cluster Stream Chunk " + chunkKey + " was not found.");
// Mutex released below (in a finally, per the brace structure not visible here).
556 chunk.acquireMutex();
558 return chunk.getUndoProcessor(clusters, chunkOffset, ccsId);
559 } catch (Throwable t) {
560 throw new IllegalStateException(t);
562 chunk.releaseMutex();
/**
 * Undoes a single cluster change set: replays its entries in REVERSE order
 * into a fresh ClusterChange, accumulating the resulting per-cluster byte
 * changes into {@code clusterChanges}. Runs under the cluster-LRU mutex.
 */
567 private void performUndo(String ccsId, ArrayList<Pair<ClusterUID, byte[]>> clusterChanges, UndoClusterSupport support) throws ProCoreException, DatabaseException {
569 UndoClusterUpdateProcessor proc = getUndoCSS(ccsId);
571 int clusterKey = clusters.getClusterKeyByClusterUIDOrMakeWithoutMutex(proc.getClusterUID());
573 clusters.clusterLRU.acquireMutex();
576 ClusterChange cs = new ClusterChange(clusterChanges, proc.getClusterUID());
577 for(int i=0;i<proc.entries.size();i++) {
// Reverse iteration: the last applied entry is undone first.
579 Entry e = proc.entries.get(proc.entries.size() - 1 - i);
580 e.process(clusters, cs, clusterKey);
// Mutex released below (in a finally, per the brace structure not visible here).
587 clusters.clusterLRU.releaseMutex();
/**
 * Undoes the given change sets (processed last-to-first, and within each
 * change set the cluster change sets are also processed in reverse), then
 * reports every resulting cluster change to the caller via
 * {@code onChangeSetUpdate}.
 */
593 public boolean undo(long[] changeSetIds, OnChangeSetUpdate onChangeSetUpdate) throws SDBException {
595 final ArrayList<Pair<ClusterUID, byte[]>> clusterChanges = new ArrayList<Pair<ClusterUID, byte[]>>();
597 UndoClusterSupport support = new UndoClusterSupport(clusters);
599 final int changeSetId = clusters.state.headChangeSetId;
601 if(ClusterUpdateProcessorBase.DEBUG)
602 System.err.println(" === BEGIN UNDO ===");
// Newest change set first.
604 for(int i=0;i<changeSetIds.length;i++) {
605 final long id = changeSetIds[changeSetIds.length-1-i];
606 ArrayList<String> ccss = clusters.getChanges(id);
// Within a change set, newest cluster change set first.
607 for(int j=0;j<ccss.size();j++) {
609 if(ClusterUpdateProcessorBase.DEBUG)
610 System.err.println("performUndo " + ccss.get(ccss.size()-j-1));
611 performUndo(ccss.get(ccss.size()-j-1), clusterChanges, support);
612 } catch (DatabaseException e) {
618 if(ClusterUpdateProcessorBase.DEBUG)
619 System.err.println(" === END UNDO ===");
// Notify the listener of every cluster change produced by the undo.
621 for(int i=0;i<clusterChanges.size();i++) {
623 final int changeSetIndex = i;
625 final Pair<ClusterUID, byte[]> pair = clusterChanges.get(i);
627 final ClusterUID cuid = pair.first;
628 final byte[] data = pair.second;
630 onChangeSetUpdate.onChangeSetUpdate(new ChangeSetUpdate() {
633 public long getChangeSetId() {
638 public int getChangeSetIndex() {
643 public int getNumberOfClusterChangeSets() {
644 return clusterChanges.size();
648 public int getIndexOfClusterChangeSet() {
649 return changeSetIndex;
653 public byte[] getClusterId() {
654 return cuid.asBytes();
658 public boolean getNewCluster() {
663 public byte[] getData() {
// Singleton/service accessors (bodies not visible in this view).
676 public static GraphClientImpl2 getInstance() {
680 public ServiceLocator getServiceLocator() {
685 public boolean refreshEnabled() {
699 ////////////////////////