/*******************************************************************************
 * Copyright (c) 2007, 2010 Association for Decentralized Information Management
 * in Industry THTH ry.
 * All rights reserved. This program and the accompanying materials
 * are made available under the terms of the Eclipse Public License v1.0
 * which accompanies this distribution, and is available at
 * http://www.eclipse.org/legal/epl-v10.html
 *
 * Contributors:
 *     VTT Technical Research Centre of Finland - initial API and implementation
 *******************************************************************************/
package org.simantics.db.tests.client;

import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import org.junit.Test;
import org.simantics.db.AsyncReadGraph;
import org.simantics.db.ReadGraph;
import org.simantics.db.Session;
import org.simantics.db.WriteGraph;
import org.simantics.db.common.TransactionPolicyKeep;
import org.simantics.db.common.TransactionPolicyRelease;
import org.simantics.db.common.request.ReadRequest;
import org.simantics.db.common.request.WriteRequest;
import org.simantics.db.exception.DatabaseException;
import org.simantics.db.procedure.AsyncProcedure;
import org.simantics.db.request.Read;
import org.simantics.db.service.LifecycleSupport;
import org.simantics.db.service.TransactionPolicySupport;
import org.simantics.db.testing.base.ExistingDatabaseTest;
import org.simantics.db.testing.common.Tests;
import org.simantics.db.testing.impl.Configuration;

/**
 * Runs loops of empty synchronous read requests under the "release" and
 * "keep" transaction policies, alone and while a second client tries to hold
 * on to a transaction of its own.
 */
public class TransactionTest1 extends ExistingDatabaseTest {

    /**
     * Entry point of the test: runs the single-client scenario and the
     * scenario where a second client holds a read transaction.
     *
     * @throws DatabaseException if any of the scenarios fails
     */
    @Test
    public void testTransaction1() throws DatabaseException {
        empty1();
        empty2();
        // The write-holding scenario is currently disabled.
        // empty3();
    }

    /**
     * Single client: empty read loops under release policy, then keep policy.
     *
     * @throws DatabaseException if a request or closing the client fails
     */
    void empty1() throws DatabaseException {
        TransactionClient1 client = new TransactionClient1(this);
        try {
            runEmptyReadLoops(client);
        } finally {
            client.close();
        }
    }

    /**
     * Two clients: the second one tries to keep a read transaction open while
     * the first one runs the empty read loops.
     *
     * @throws DatabaseException if a request or closing a client fails
     */
    void empty2() throws DatabaseException {
        TransactionClient1 client = new TransactionClient1(this);
        TransactionClient2 client2 = new TransactionClient2(this);
        try {
            client2.tryToKeepReadTransaction();
            runEmptyReadLoops(client);
        } finally {
            closeBoth(client, client2);
        }
    }

    /**
     * Two clients: the second one tries to keep a write transaction open
     * while the first one runs the empty read loops.
     *
     * @throws DatabaseException if a request or closing a client fails
     */
    public void empty3() throws DatabaseException {
        TransactionClient1 client = new TransactionClient1(this);
        TransactionClient2 client2 = new TransactionClient2(this);
        try {
            client2.tryToKeepWriteTransaction();
            runEmptyReadLoops(client);
        } finally {
            closeBoth(client, client2);
        }
    }

    /** Runs the empty read loop once with release policy and once with keep policy. */
    private static void runEmptyReadLoops(TransactionClient1 client) throws DatabaseException {
        client.setPolicyRelease();
        client.emptySyncReadLoop();
        client.setPolicyKeep();
        client.emptySyncReadLoop();
    }

    /**
     * Closes both clients in the original order (first client, then second).
     * The second client is closed even when closing the first one throws, so
     * that its held transaction and waiting threads are always released. If
     * both closes throw, the exception from the second one propagates.
     */
    private static void closeBoth(TransactionClient1 client, TransactionClient2 client2)
            throws DatabaseException {
        try {
            client.close();
        } finally {
            client2.close();
        }
    }

}

/**
 * Holds the database session obtained from the test handler and knows how to
 * close it.
 */
class TranasctionSession {

    protected static final boolean DEBUG = Configuration.get().debug;

    protected Session session;

    TranasctionSession() throws DatabaseException {
        session = Tests.getTestHandler().getSession();
    }

    /**
     * Closes the session through its {@link LifecycleSupport} service and
     * clears the session reference, even if closing fails. Does nothing when
     * the session reference is already {@code null}.
     *
     * @throws DatabaseException if the lifecycle service fails to close
     */
    public void close() throws DatabaseException {
        try {
            if (null != session) {
                LifecycleSupport ls = session.getService(LifecycleSupport.class);
                ls.close();
            }
        } finally {
            session = null;
        }
    }

}

/**
 * Behaviour shared by the test clients: the empty synchronous read loop and
 * switching of the session's transaction policy.
 */
class TransctionCommon extends TranasctionSession {

    protected final ExistingDatabaseTest testCommon;

    /** Number of empty read requests issued by {@link #emptySyncReadLoop()}. */
    protected final int REQUEST_COUNT = 50;

    /** Counts how many read requests were actually executed in the current loop. */
    private final AtomicInteger counter = new AtomicInteger(0);

    TransctionCommon(ExistingDatabaseTest testCommon) throws DatabaseException {
        this.testCommon = testCommon;
    }

    /**
     * Issues {@link #REQUEST_COUNT} synchronous read requests that only
     * increment a counter, then verifies that every request ran. The measured
     * throughput is printed when debugging is enabled.
     *
     * @throws DatabaseException if a request fails or the number of executed
     *         requests differs from {@link #REQUEST_COUNT}
     */
    public void emptySyncReadLoop() throws DatabaseException {
        counter.set(0);
        long start = System.nanoTime();
        for (int i = 0; i < REQUEST_COUNT; ++i) {
            session.syncRequest(new ReadRequest() {
                @Override
                public void run(ReadGraph graph) throws DatabaseException {
                    int value = counter.incrementAndGet();
                    if (DEBUG)
                        System.out.println("DEBUG: Sync read count=" + value);
                }
            });
        }
        long end = System.nanoTime();
        // Elapsed time converted from nanoseconds to seconds.
        double time = (double) (end - start) * (double) (1e-9);
        if (REQUEST_COUNT != counter.get())
            throw new DatabaseException("Transaction count does not match. Transaction counter was " + counter);
        double speed = counter.get() / time;
        String t = "Transaction speed was " + speed + " empty synchronous read transactions per second.";
        if (DEBUG)
            System.out.println(t);
    }

    /** Registers a {@link TransactionPolicyKeep} as the session's transaction policy. */
    void setPolicyKeep() {
        if (DEBUG)
            System.out.println("Transaction policy is keep.");
        session.registerService(TransactionPolicySupport.class, new TransactionPolicyKeep());
    }

    /** Registers a {@link TransactionPolicyRelease} as the session's transaction policy. */
    void setPolicyRelease() {
        if (DEBUG)
            System.out.println("Transaction policy is release.");
        session.registerService(TransactionPolicySupport.class, new TransactionPolicyRelease());
    }

}

/** Plain client that only uses the behaviour inherited from {@link TransctionCommon}. */
class TransactionClient1 extends TransctionCommon {

    TransactionClient1(ExistingDatabaseTest testCommon) throws DatabaseException {
        super(testCommon);
    }

}

/**
 * Client that tries to hold on to a read or write transaction. It registers
 * itself as the session's {@link TransactionPolicySupport} and answers every
 * "hold on" query with {@code true}.
 *
 * <p>The counters {@code in} and {@code out} are guarded by this object's
 * monitor; a request is considered active while {@code in > out}.
 */
class TransactionClient2 extends TransctionCommon implements TransactionPolicySupport {

    Thread tryToKeepWrite;

    /** Number of requests that have entered their body. */
    int in;

    /** Number of requests that have finished. */
    int out;

    /** When set, a request blocked inside its body stops waiting and returns. */
    AtomicBoolean breakTransaction = new AtomicBoolean(false);

    /** Released by the relinquish callbacks; the write thread waits on it between requests. */
    Semaphore relinquish = new Semaphore(0);

    /** Written by {@link #close()} and polled by the write thread, hence volatile. */
    volatile boolean goOn;

    TransactionClient2(ExistingDatabaseTest testCommon) throws DatabaseException {
        super(testCommon);
        init();
    }

    /** Resets the counters and flags to their initial state. */
    private void init() {
        synchronized (this) {
            in = 0;
            out = 0;
            breakTransaction.set(false);
            goOn = true;
        }
    }

    /**
     * Registers this client as the transaction policy and starts an
     * asynchronous read request that blocks inside its body. Returns once
     * that request has been entered.
     *
     * @throws DatabaseException if the request cannot be started
     */
    public void tryToKeepReadTransaction() throws DatabaseException {
        init();
        setPolicyKeep();
        // Replaces the policy registered just above with this client.
        session.registerService(TransactionPolicySupport.class, this);
        startReadTransaction();
    }

    /**
     * Registers this client as the transaction policy and starts a thread
     * that repeatedly issues blocking write requests.
     *
     * @throws DatabaseException declared for callers; not thrown by the
     *         visible code
     */
    public void tryToKeepWriteTransaction() throws DatabaseException {
        init();
        setPolicyKeep();
        // Replaces the policy registered just above with this client.
        session.registerService(TransactionPolicySupport.class, this);
        // startWriteTransaction();
        this.tryToKeepWrite = new Thread(new TryToKeepWrite(), "TryToKeepWrite");
        this.tryToKeepWrite.start();
    }

    @Override
    public boolean holdOnToTransactionAfterRead() {
        return true;
    }

    @Override
    public boolean holdOnToTransactionAfterCancel() {
        return true;
    }

    @Override
    public boolean holdOnToTransactionAfterCommit() {
        return true;
    }

    /** Lets go of the held transaction by unblocking the active request. */
    @Override
    public void onRelinquish() {
        freeTransactionSync();
    }

    /** Allows the write thread to continue with its next request. */
    @Override
    public void onRelinquishDone() {
        relinquish.release();
    }

    /**
     * Releases the write thread, resets the state, closes the session and
     * terminates the JVM with exit code -1.
     */
    @Override
    public void onRelinquishError() {
        relinquish.release();
        init();
        try {
            super.close();
        } catch (DatabaseException e) {
            System.out.println("OnRelinquishError: " + e.getMessage() + ".");
        }
        System.exit(-1);
    }

    /**
     * Stops the write thread loop, unblocks any active request and closes the
     * session.
     *
     * @throws DatabaseException if closing the session fails
     */
    @Override
    public void close() throws DatabaseException {
        goOn = false;
        freeTransactionSync();
        // This HACK gives time for implementation to remove
        // the asynchronous call from bookkeeping. At this point
        // one asynchronous call might still be active.
        try {
            session.syncRequest(new WriteRequest() {
                @Override
                public void perform(WriteGraph graph) throws DatabaseException {
                }
            });
        } catch (DatabaseException e) {
            System.out.println("Empty sync write failed: " + e.getMessage());
        }
        super.close();
    }

    /**
     * Asks the active request to finish and blocks until no request is active
     * any more ({@code in <= out}).
     */
    void freeTransactionSync() {
        breakTransaction.set(true);
        synchronized (this) {
            while (in > out) {
                // Wake the request blocked in its body so that it sees the break flag.
                this.notifyAll();
                try {
                    TransactionClient2.this.wait();
                } catch (InterruptedException e) {
                    // The interrupt is deliberately not restored: the loop must keep
                    // waiting and wait() would fail immediately on every iteration.
                    System.out.println("Wait was interruped: " + e);
                }
            }
        }
    }

    /**
     * Starts an asynchronous read request whose body blocks until the break
     * flag is set, then waits until that request has actually been entered.
     */
    private void startReadTransaction() throws DatabaseException {
        session.asyncRequest(new Read<Integer>() {
            @Override
            public Integer perform(ReadGraph graph) throws DatabaseException {
                synchronized (TransactionClient2.this) {
                    TransactionClient2.this.transactionIn();
                    if (DEBUG)
                        System.out.println("DEBUG: Read request begin ok, count= " + in);
                    while (TransactionClient2.this.transactionKeep()) {
                        try {
                            TransactionClient2.this.wait();
                        } catch (InterruptedException e) {
                            if (DEBUG)
                                System.out.println("DEBUG: read request wait interrupted: " + e.getMessage());
                        }
                    }
                }
                return in;
            }
        }, new AsyncProcedure<Integer>() {
            @Override
            public void execute(AsyncReadGraph graph, Integer result) {
                TransactionClient2.this.transactionOut();
                if (DEBUG)
                    System.out.println("DEBUG: Read request end ok, count= " + result);
            }

            @Override
            public void exception(AsyncReadGraph graph, Throwable throwable) {
                TransactionClient2.this.transactionOut();
                if (DEBUG)
                    System.out.println("DEBUG: read request end failed: " + throwable.getMessage());
            }
        });
        // Block the caller until the request body has been entered.
        synchronized (this) {
            while (in <= out) {
                try {
                    TransactionClient2.this.wait();
                } catch (InterruptedException e) {
                    if (DEBUG)
                        System.out.println("DEBUG: read request sleep was interruped: " + e);
                }
            }
        }
    }

    /** @return {@code true} while the active request should keep blocking */
    synchronized boolean transactionKeep() {
        return !breakTransaction.get();
    }

    /**
     * Records that a request has entered its body and wakes every waiter.
     * All waiters are woken because threads wait on this monitor for
     * different conditions and each re-checks its own condition in a loop.
     *
     * @return the updated number of entered requests
     */
    synchronized int transactionIn() {
        ++in;
        this.notifyAll();
        return in;
    }

    /**
     * Records that a request has finished and wakes every waiter.
     *
     * @return the updated number of finished requests
     */
    synchronized int transactionOut() {
        ++out;
        this.notifyAll();
        return out;
    }

    /**
     * Repeatedly issues a synchronous write request that blocks inside its
     * body until the break flag is set, then waits for the relinquish
     * semaphore before issuing the next one. Runs until {@code goOn} is
     * cleared.
     */
    class TryToKeepWrite implements Runnable {

        TryToKeepWrite() {
        }

        @Override
        public void run() {
            while (goOn) {
                try {
                    session.syncRequest(new WriteRequest() {
                        @Override
                        public void perform(WriteGraph graph) throws DatabaseException {
                            synchronized (TransactionClient2.this) {
                                TransactionClient2.this.transactionIn();
                                if (DEBUG)
                                    System.out.println("DEBUG: Write request begin ok, count= " + in);
                                while (TransactionClient2.this.transactionKeep()) {
                                    try {
                                        TransactionClient2.this.wait();
                                    } catch (InterruptedException e) {
                                        if (DEBUG)
                                            System.out.println("DEBUG: Write request wait interrupted: " + e.getMessage());
                                    }
                                }
                            }
                        }
                    });
                } catch (DatabaseException e) {
                    System.out.println("Empty sync write failed: " + e.getMessage());
                } finally {
                    // Counted as finished even when the request failed.
                    TransactionClient2.this.transactionOut();
                }
                // Wait for a relinquish callback unless the client is being closed.
                while (goOn) {
                    try {
                        relinquish.acquire();
                        break;
                    } catch (InterruptedException e) {
                        if (DEBUG)
                            System.out.println("DEBUG: Relinquish wait interrupted: " + e.getMessage());
                    }
                }
                if (DEBUG)
                    System.out.println("DEBUG: Write request end, count= " + out);
            }
        }

    }

}