X-Git-Url: https://gerrit.simantics.org/r/gitweb?a=blobdiff_plain;f=tests%2Forg.simantics.db.tests%2Fsrc%2Forg%2Fsimantics%2Fdb%2Ftests%2Fclient%2FTransactionTest1.java;fp=tests%2Forg.simantics.db.tests%2Fsrc%2Forg%2Fsimantics%2Fdb%2Ftests%2Fclient%2FTransactionTest1.java;h=68c8ffe6a4844e4e025baaf62f49b9ee5f589528;hb=67fd62f9c742337ec80eef658192db198a0efaac;hp=0000000000000000000000000000000000000000;hpb=cde82ba81327d5515fdca362f7f4c70f5103ae80;p=simantics%2Fplatform.git diff --git a/tests/org.simantics.db.tests/src/org/simantics/db/tests/client/TransactionTest1.java b/tests/org.simantics.db.tests/src/org/simantics/db/tests/client/TransactionTest1.java new file mode 100644 index 000000000..68c8ffe6a --- /dev/null +++ b/tests/org.simantics.db.tests/src/org/simantics/db/tests/client/TransactionTest1.java @@ -0,0 +1,355 @@ +/******************************************************************************* + * Copyright (c) 2007, 2010 Association for Decentralized Information Management + * in Industry THTH ry. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * VTT Technical Research Centre of Finland - initial API and implementation + *******************************************************************************/ +package org.simantics.db.tests.client; + +import java.util.concurrent.Semaphore; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import org.junit.Test; +import org.simantics.db.AsyncReadGraph; +import org.simantics.db.ReadGraph; +import org.simantics.db.Session; +import org.simantics.db.WriteGraph; +import org.simantics.db.common.TransactionPolicyKeep; +import org.simantics.db.common.TransactionPolicyRelease; +import org.simantics.db.common.request.ReadRequest; +import org.simantics.db.common.request.WriteRequest; +import org.simantics.db.exception.DatabaseException; +import org.simantics.db.procedure.AsyncProcedure; +import org.simantics.db.request.Read; +import org.simantics.db.service.LifecycleSupport; +import org.simantics.db.service.TransactionPolicySupport; +import org.simantics.db.testing.base.ExistingDatabaseTest; +import org.simantics.db.testing.common.Tests; +import org.simantics.db.testing.impl.Configuration; + +public class TransactionTest1 extends ExistingDatabaseTest { + @Test + public void testTransaction1() throws DatabaseException { + empty1(); + empty2(); +// empty3(); + } + void empty1() throws DatabaseException { + TransactionClient1 client = new TransactionClient1(this); + try { + client.setPolicyRelease(); + client.emptySyncReadLoop(); + client.setPolicyKeep(); + client.emptySyncReadLoop(); + } finally { + if (null != client) + client.close(); + } + } + void empty2() throws DatabaseException { + TransactionClient1 client = new TransactionClient1(this); + TransactionClient2 client2 = new TransactionClient2(this); + try { + client2.tryToKeepReadTransaction(); + client.setPolicyRelease(); + client.emptySyncReadLoop(); + client.setPolicyKeep(); + client.emptySyncReadLoop(); + } finally { + client.close(); + client2.close(); + } + } + public void empty3() throws DatabaseException { + TransactionClient1 client = new TransactionClient1(this); + TransactionClient2 client2 = new TransactionClient2(this); + try { + client2.tryToKeepWriteTransaction(); + client.setPolicyRelease(); + client.emptySyncReadLoop(); + client.setPolicyKeep(); + client.emptySyncReadLoop(); + } finally { + client.close(); + client2.close(); + } + } +} +class TranasctionSession { + protected static final boolean DEBUG = Configuration.get().debug; + protected Session session; + TranasctionSession() throws DatabaseException { + session = Tests.getTestHandler().getSession(); + } + public void close() throws DatabaseException { + try { + if (null != session) { + LifecycleSupport ls = session.getService(LifecycleSupport.class); + ls.close(); + } + } finally { + session = null; + } + } +} +class TransctionCommon extends TranasctionSession { + protected final ExistingDatabaseTest testCommon; + protected final int REQUEST_COUNT = 50; + private AtomicInteger counter = new AtomicInteger(0); + TransctionCommon(ExistingDatabaseTest testCommon) + throws DatabaseException { + this.testCommon = testCommon; + } + public void emptySyncReadLoop() + throws DatabaseException { + counter.set(0); + long start = System.nanoTime(); + for(int i = 0; i < REQUEST_COUNT; ++i) { + session.syncRequest(new ReadRequest() { + @Override + public void run(ReadGraph graph) throws DatabaseException { + int value = counter.incrementAndGet(); + if (DEBUG) + System.out.println("DEBUG: Sync read count=" + value); + } + }); + } + long end = System.nanoTime(); + double time = (double)(end-start) * (double)(1e-9); + if (REQUEST_COUNT != counter.get()) + throw new DatabaseException("Transaction count does not match. Transaction counter was " + counter); + double speed = counter.get() / time; + String t = "Transaction speed was " + speed + " empty synchronous read transactions per second."; + if (DEBUG) + System.out.println(t); + } + void setPolicyKeep() { + if (DEBUG) + System.out.println("Transaction policy is keep."); + session.registerService(TransactionPolicySupport.class, new TransactionPolicyKeep()); + } + void setPolicyRelease() { + if (DEBUG) + System.out.println("Transaction policy is release."); + session.registerService(TransactionPolicySupport.class, new TransactionPolicyRelease()); + } +} + +class TransactionClient1 extends TransctionCommon { + TransactionClient1(ExistingDatabaseTest testCommon) + throws DatabaseException { + super(testCommon); + } +} + +class TransactionClient2 extends TransctionCommon +implements TransactionPolicySupport { + Thread tryToKeepWrite; + int in; + int out; + AtomicBoolean breakTransaction = new AtomicBoolean(false); + Semaphore relinquish = new Semaphore(0); + boolean goOn; + TransactionClient2(ExistingDatabaseTest testCommon) + throws DatabaseException { + super(testCommon); + init(); + } + private void init() { + synchronized (this) { + in = 0; + out = 0; + breakTransaction.set(false); + goOn = true; + } + } + public void tryToKeepReadTransaction() + throws DatabaseException { + init(); + setPolicyKeep(); + session.registerService(TransactionPolicySupport.class, this); + startReadTransaction(); + } + public void tryToKeepWriteTransaction() + throws DatabaseException { + init(); + setPolicyKeep(); + session.registerService(TransactionPolicySupport.class, this); +// startWriteTransaction(); + this.tryToKeepWrite = new Thread(new TryToKeepWrite(), "TryToKeepWrite"); + this.tryToKeepWrite.start(); + } + @Override + public boolean holdOnToTransactionAfterRead() { + return true; + } + @Override + public boolean holdOnToTransactionAfterCancel() { + return true; + } + @Override + public boolean holdOnToTransactionAfterCommit() { + return true; + } + @Override + public void onRelinquish() { + freeTransactionSync(); + } + + @Override + public void onRelinquishDone() { + relinquish.release(); + } + + @Override + public void onRelinquishError() { + relinquish.release(); + init(); + try { + super.close(); + } catch (DatabaseException e) { + System.out.println("OnRelinquishError: " + e.getMessage() + "."); + } + System.exit(-1); + } + + @Override + public void close() + throws DatabaseException { + goOn = false; + freeTransactionSync(); + // This HACK gives time for implementation to remove + // the asynchronous call from bookkeeping. At this point + // one asynchronous call might still be active. + try { + session.syncRequest(new WriteRequest() { + @Override + public void perform(WriteGraph graph) throws DatabaseException { + } + }); + } catch (DatabaseException e) { + System.out.println("Empty sync write failed: " + e.getMessage()); + } + super.close(); + } + void freeTransactionSync() { + breakTransaction.set(true); + synchronized (this) { + while (in > out) { + this.notifyAll(); + try { + TransactionClient2.this.wait(); + } catch (InterruptedException e) { + System.out.println("Wait was interruped: " + e); + } + } + } + } + private void startReadTransaction() + throws DatabaseException { + session.asyncRequest(new Read() { + @Override + public Integer perform(ReadGraph graph) throws DatabaseException { + synchronized (TransactionClient2.this) { + TransactionClient2.this.transactionIn(); + if (DEBUG) + System.out.println("DEBUG: Read request begin ok, count= " + in); + while (TransactionClient2.this.transactionKeep()) { + try { + TransactionClient2.this.wait(); + } catch (InterruptedException e) { + if (DEBUG) + System.out.println("DEBUG: read request wait interrupted: " + e.getMessage()); + } + } + } + return in; + } + }, new AsyncProcedure() { + @Override + public void execute(AsyncReadGraph graph, Integer result) { + TransactionClient2.this.transactionOut(); + if (DEBUG) + System.out.println("DEBUG: Read request end ok, count= " + result); + } + @Override + public void exception(AsyncReadGraph graph, Throwable throwable) { + TransactionClient2.this.transactionOut(); + if (DEBUG) + System.out.println("DEBUG: read request end failed: " + throwable.getMessage()); + } + }); + synchronized (this) { + while (in <= out) { + try { + TransactionClient2.this.wait(); + } catch (InterruptedException e) { + if (DEBUG) + System.out.println("DEBUG: read request sleep was interruped: " + e); + } + } + } + } + synchronized boolean transactionKeep() { + return !breakTransaction.get(); + } + synchronized int transactionIn() { + ++in; + this.notify(); + return in; + } + synchronized int transactionOut() { + ++out; + this.notify(); + return out; + } + class TryToKeepWrite implements Runnable { + TryToKeepWrite() { + } + @Override + public void run() { + while (goOn) { + try { + session.syncRequest(new WriteRequest() { + @Override + public void perform(WriteGraph graph) throws DatabaseException { + synchronized (TransactionClient2.this) { + TransactionClient2.this.transactionIn(); + if (DEBUG) + System.out.println("DEBUG: Write request begin ok, count= " + in); + while (TransactionClient2.this.transactionKeep()) { + try { + TransactionClient2.this.wait(); + } catch (InterruptedException e) { + if (DEBUG) + System.out.println("DEBUG: Write request wait interrupted: " + e.getMessage()); + } + } + } + } + }); + } catch (DatabaseException e) { + System.out.println("Empty sync write failed: " + e.getMessage()); + } finally { + TransactionClient2.this.transactionOut(); + } + while (goOn) + try { + relinquish.acquire(); + break; + } catch (InterruptedException e) { + if (DEBUG) + System.out.println("DEBUG: Relinquish wait interrupted: " + e.getMessage()); + } + if (DEBUG) + System.out.println("DEBUG: Write request end, count= " + out); + } + } + } +}