/******************************************************************************* * Copyright (c) 2007, 2010 Association for Decentralized Information Management * in Industry THTH ry. * All rights reserved. This program and the accompanying materials * are made available under the terms of the Eclipse Public License v1.0 * which accompanies this distribution, and is available at * http://www.eclipse.org/legal/epl-v10.html * * Contributors: * VTT Technical Research Centre of Finland - initial API and implementation *******************************************************************************/ package org.simantics.db.tests.api.request.misc; import java.util.concurrent.atomic.AtomicInteger; import org.junit.Test; import org.simantics.db.AsyncReadGraph; import org.simantics.db.ReadGraph; import org.simantics.db.Session; import org.simantics.db.common.request.AsyncReadRequest; import org.simantics.db.exception.DatabaseException; import org.simantics.db.procedure.AsyncListener; import org.simantics.db.procedure.AsyncProcedure; import org.simantics.db.request.AsyncRead; import org.simantics.db.request.Read; import org.simantics.db.service.QueryControl; import org.simantics.db.service.QueryControl.ControlProcedure; import org.simantics.db.testing.base.ExistingDatabaseTest; public class RequestQueuingTest extends ExistingDatabaseTest { final private static int LOOPS = 10; private Object RESULT = new Object(); private DatabaseException EXCEPTION = new DatabaseException(); private AtomicInteger results = new AtomicInteger(0); private AtomicInteger exceptions = new AtomicInteger(0); class A implements AsyncRead { private void perform(AsyncReadGraph graph, final AsyncProcedure procedure, final int todo) { if(todo == 0) { procedure.execute(graph, RESULT); } else { final QueryControl control = graph.getService(QueryControl.class); final int current = control.getGraphThread(graph); final int next = (current + 1) % control.getAmountOfQueryThreads(); control.schedule(graph, next, new ControlProcedure() { public void execute(AsyncReadGraph graph) { perform(graph, procedure, todo-1); } }); } } @Override public void perform(AsyncReadGraph graph, final AsyncProcedure procedure) { perform(graph, procedure, 5); } @Override public int threadHash() { return hashCode(); } @Override public int getFlags() { return 0; } } class B implements Read { @Override public Object perform(ReadGraph graph) throws DatabaseException { return graph.syncRequest(new A()); } } class A2 implements AsyncRead { private void perform(AsyncReadGraph graph, final AsyncProcedure procedure, final int todo) { if(todo == 0) { procedure.exception(graph, EXCEPTION); } else { final QueryControl control = graph.getService(QueryControl.class); final int current = control.getGraphThread(graph); final int next = (current + 1) % control.getAmountOfQueryThreads(); control.schedule(graph, next, new ControlProcedure() { public void execute(AsyncReadGraph graph) { perform(graph, procedure, todo-1); } }); } } @Override public void perform(AsyncReadGraph graph, final AsyncProcedure procedure) { perform(graph, procedure, 5); } @Override public int threadHash() { return hashCode(); } @Override public int getFlags() { return 0; } } class B2 implements Read { @Override public Object perform(ReadGraph graph) throws DatabaseException { return graph.syncRequest(new A2()); } } class C implements AsyncListener { @Override public void execute(AsyncReadGraph graph, Object result) { if(RESULT == result) results.incrementAndGet(); } @Override public void exception(AsyncReadGraph graph, Throwable throwable) { if(EXCEPTION == throwable) exceptions.incrementAndGet(); } @Override public boolean isDisposed() { return false; } } @Test public void test() throws Exception { Session session = getSession(); session.syncRequest(new AsyncReadRequest() { @Override public void run(AsyncReadGraph graph) { A a = new A(); B b = new B(); A2 a2 = new A2(); B2 b2 = new B2(); for(int i=0;i