--- /dev/null
+/*******************************************************************************
+ * Copyright (c) 2007, 2010 Association for Decentralized Information Management
+ * in Industry THTH ry.
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * which accompanies this distribution, and is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Contributors:
+ * VTT Technical Research Centre of Finland - initial API and implementation
+ *******************************************************************************/
+package org.simantics.db.tests.client;
+
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.junit.Test;
+import org.simantics.db.AsyncReadGraph;
+import org.simantics.db.ReadGraph;
+import org.simantics.db.Session;
+import org.simantics.db.WriteGraph;
+import org.simantics.db.common.TransactionPolicyKeep;
+import org.simantics.db.common.TransactionPolicyRelease;
+import org.simantics.db.common.request.ReadRequest;
+import org.simantics.db.common.request.WriteRequest;
+import org.simantics.db.exception.DatabaseException;
+import org.simantics.db.procedure.AsyncProcedure;
+import org.simantics.db.request.Read;
+import org.simantics.db.service.LifecycleSupport;
+import org.simantics.db.service.TransactionPolicySupport;
+import org.simantics.db.testing.base.ExistingDatabaseTest;
+import org.simantics.db.testing.common.Tests;
+import org.simantics.db.testing.impl.Configuration;
+
+public class TransactionTest1 extends ExistingDatabaseTest {
+ @Test
+ public void testTransaction1() throws DatabaseException {
+ empty1();
+ empty2();
+// empty3();
+ }
+ void empty1() throws DatabaseException {
+ TransactionClient1 client = new TransactionClient1(this);
+ try {
+ client.setPolicyRelease();
+ client.emptySyncReadLoop();
+ client.setPolicyKeep();
+ client.emptySyncReadLoop();
+ } finally {
+ if (null != client)
+ client.close();
+ }
+ }
+ void empty2() throws DatabaseException {
+ TransactionClient1 client = new TransactionClient1(this);
+ TransactionClient2 client2 = new TransactionClient2(this);
+ try {
+ client2.tryToKeepReadTransaction();
+ client.setPolicyRelease();
+ client.emptySyncReadLoop();
+ client.setPolicyKeep();
+ client.emptySyncReadLoop();
+ } finally {
+ client.close();
+ client2.close();
+ }
+ }
+ public void empty3() throws DatabaseException {
+ TransactionClient1 client = new TransactionClient1(this);
+ TransactionClient2 client2 = new TransactionClient2(this);
+ try {
+ client2.tryToKeepWriteTransaction();
+ client.setPolicyRelease();
+ client.emptySyncReadLoop();
+ client.setPolicyKeep();
+ client.emptySyncReadLoop();
+ } finally {
+ client.close();
+ client2.close();
+ }
+ }
+}
+class TranasctionSession {
+ protected static final boolean DEBUG = Configuration.get().debug;
+ protected Session session;
+ TranasctionSession() throws DatabaseException {
+ session = Tests.getTestHandler().getSession();
+ }
+ public void close() throws DatabaseException {
+ try {
+ if (null != session) {
+ LifecycleSupport ls = session.getService(LifecycleSupport.class);
+ ls.close();
+ }
+ } finally {
+ session = null;
+ }
+ }
+}
+class TransctionCommon extends TranasctionSession {
+ protected final ExistingDatabaseTest testCommon;
+ protected final int REQUEST_COUNT = 50;
+ private AtomicInteger counter = new AtomicInteger(0);
+ TransctionCommon(ExistingDatabaseTest testCommon)
+ throws DatabaseException {
+ this.testCommon = testCommon;
+ }
+ public void emptySyncReadLoop()
+ throws DatabaseException {
+ counter.set(0);
+ long start = System.nanoTime();
+ for(int i = 0; i < REQUEST_COUNT; ++i) {
+ session.syncRequest(new ReadRequest() {
+ @Override
+ public void run(ReadGraph graph) throws DatabaseException {
+ int value = counter.incrementAndGet();
+ if (DEBUG)
+ System.out.println("DEBUG: Sync read count=" + value);
+ }
+ });
+ }
+ long end = System.nanoTime();
+ double time = (double)(end-start) * (double)(1e-9);
+ if (REQUEST_COUNT != counter.get())
+ throw new DatabaseException("Transaction count does not match. Transaction counter was " + counter);
+ double speed = counter.get() / time;
+ String t = "Transaction speed was " + speed + " empty synchronous read transactions per second.";
+ if (DEBUG)
+ System.out.println(t);
+ }
+ void setPolicyKeep() {
+ if (DEBUG)
+ System.out.println("Transaction policy is keep.");
+ session.registerService(TransactionPolicySupport.class, new TransactionPolicyKeep());
+ }
+ void setPolicyRelease() {
+ if (DEBUG)
+ System.out.println("Transaction policy is release.");
+ session.registerService(TransactionPolicySupport.class, new TransactionPolicyRelease());
+ }
+}
+
+class TransactionClient1 extends TransctionCommon {
+ TransactionClient1(ExistingDatabaseTest testCommon)
+ throws DatabaseException {
+ super(testCommon);
+ }
+}
+
+class TransactionClient2 extends TransctionCommon
+implements TransactionPolicySupport {
+ Thread tryToKeepWrite;
+ int in;
+ int out;
+ AtomicBoolean breakTransaction = new AtomicBoolean(false);
+ Semaphore relinquish = new Semaphore(0);
+ boolean goOn;
+ TransactionClient2(ExistingDatabaseTest testCommon)
+ throws DatabaseException {
+ super(testCommon);
+ init();
+ }
+ private void init() {
+ synchronized (this) {
+ in = 0;
+ out = 0;
+ breakTransaction.set(false);
+ goOn = true;
+ }
+ }
+ public void tryToKeepReadTransaction()
+ throws DatabaseException {
+ init();
+ setPolicyKeep();
+ session.registerService(TransactionPolicySupport.class, this);
+ startReadTransaction();
+ }
+ public void tryToKeepWriteTransaction()
+ throws DatabaseException {
+ init();
+ setPolicyKeep();
+ session.registerService(TransactionPolicySupport.class, this);
+// startWriteTransaction();
+ this.tryToKeepWrite = new Thread(new TryToKeepWrite(), "TryToKeepWrite");
+ this.tryToKeepWrite.start();
+ }
+ @Override
+ public boolean holdOnToTransactionAfterRead() {
+ return true;
+ }
+ @Override
+ public boolean holdOnToTransactionAfterCancel() {
+ return true;
+ }
+ @Override
+ public boolean holdOnToTransactionAfterCommit() {
+ return true;
+ }
+ @Override
+ public void onRelinquish() {
+ freeTransactionSync();
+ }
+
+ @Override
+ public void onRelinquishDone() {
+ relinquish.release();
+ }
+
+ @Override
+ public void onRelinquishError() {
+ relinquish.release();
+ init();
+ try {
+ super.close();
+ } catch (DatabaseException e) {
+ System.out.println("OnRelinquishError: " + e.getMessage() + ".");
+ }
+ System.exit(-1);
+ }
+
+ @Override
+ public void close()
+ throws DatabaseException {
+ goOn = false;
+ freeTransactionSync();
+ // This HACK gives time for implementation to remove
+ // the asynchronous call from bookkeeping. At this point
+ // one asynchronous call might still be active.
+ try {
+ session.syncRequest(new WriteRequest() {
+ @Override
+ public void perform(WriteGraph graph) throws DatabaseException {
+ }
+ });
+ } catch (DatabaseException e) {
+ System.out.println("Empty sync write failed: " + e.getMessage());
+ }
+ super.close();
+ }
+ void freeTransactionSync() {
+ breakTransaction.set(true);
+ synchronized (this) {
+ while (in > out) {
+ this.notifyAll();
+ try {
+ TransactionClient2.this.wait();
+ } catch (InterruptedException e) {
+ System.out.println("Wait was interruped: " + e);
+ }
+ }
+ }
+ }
+ private void startReadTransaction()
+ throws DatabaseException {
+ session.asyncRequest(new Read<Integer>() {
+ @Override
+ public Integer perform(ReadGraph graph) throws DatabaseException {
+ synchronized (TransactionClient2.this) {
+ TransactionClient2.this.transactionIn();
+ if (DEBUG)
+ System.out.println("DEBUG: Read request begin ok, count= " + in);
+ while (TransactionClient2.this.transactionKeep()) {
+ try {
+ TransactionClient2.this.wait();
+ } catch (InterruptedException e) {
+ if (DEBUG)
+ System.out.println("DEBUG: read request wait interrupted: " + e.getMessage());
+ }
+ }
+ }
+ return in;
+ }
+ }, new AsyncProcedure<Integer>() {
+ @Override
+ public void execute(AsyncReadGraph graph, Integer result) {
+ TransactionClient2.this.transactionOut();
+ if (DEBUG)
+ System.out.println("DEBUG: Read request end ok, count= " + result);
+ }
+ @Override
+ public void exception(AsyncReadGraph graph, Throwable throwable) {
+ TransactionClient2.this.transactionOut();
+ if (DEBUG)
+ System.out.println("DEBUG: read request end failed: " + throwable.getMessage());
+ }
+ });
+ synchronized (this) {
+ while (in <= out) {
+ try {
+ TransactionClient2.this.wait();
+ } catch (InterruptedException e) {
+ if (DEBUG)
+ System.out.println("DEBUG: read request sleep was interruped: " + e);
+ }
+ }
+ }
+ }
+ synchronized boolean transactionKeep() {
+ return !breakTransaction.get();
+ }
+ synchronized int transactionIn() {
+ ++in;
+ this.notify();
+ return in;
+ }
+ synchronized int transactionOut() {
+ ++out;
+ this.notify();
+ return out;
+ }
+ class TryToKeepWrite implements Runnable {
+ TryToKeepWrite() {
+ }
+ @Override
+ public void run() {
+ while (goOn) {
+ try {
+ session.syncRequest(new WriteRequest() {
+ @Override
+ public void perform(WriteGraph graph) throws DatabaseException {
+ synchronized (TransactionClient2.this) {
+ TransactionClient2.this.transactionIn();
+ if (DEBUG)
+ System.out.println("DEBUG: Write request begin ok, count= " + in);
+ while (TransactionClient2.this.transactionKeep()) {
+ try {
+ TransactionClient2.this.wait();
+ } catch (InterruptedException e) {
+ if (DEBUG)
+ System.out.println("DEBUG: Write request wait interrupted: " + e.getMessage());
+ }
+ }
+ }
+ }
+ });
+ } catch (DatabaseException e) {
+ System.out.println("Empty sync write failed: " + e.getMessage());
+ } finally {
+ TransactionClient2.this.transactionOut();
+ }
+ while (goOn)
+ try {
+ relinquish.acquire();
+ break;
+ } catch (InterruptedException e) {
+ if (DEBUG)
+ System.out.println("DEBUG: Relinquish wait interrupted: " + e.getMessage());
+ }
+ if (DEBUG)
+ System.out.println("DEBUG: Write request end, count= " + out);
+ }
+ }
+ }
+}