]> gerrit.simantics Code Review - simantics/platform.git/blob - tests/org.simantics.db.tests/src/org/simantics/db/tests/api/request/misc/RequestQueuingTest.java
Multiple reader thread support for db client
[simantics/platform.git] / tests / org.simantics.db.tests / src / org / simantics / db / tests / api / request / misc / RequestQueuingTest.java
1 /*******************************************************************************
2  * Copyright (c) 2007, 2010 Association for Decentralized Information Management
3  * in Industry THTH ry.
4  * All rights reserved. This program and the accompanying materials
5  * are made available under the terms of the Eclipse Public License v1.0
6  * which accompanies this distribution, and is available at
7  * http://www.eclipse.org/legal/epl-v10.html
8  *
9  * Contributors:
10  *     VTT Technical Research Centre of Finland - initial API and implementation
11  *******************************************************************************/
12 package org.simantics.db.tests.api.request.misc;
13
14 import java.util.concurrent.atomic.AtomicInteger;
15
16 import org.junit.Test;
17 import org.simantics.db.AsyncReadGraph;
18 import org.simantics.db.ReadGraph;
19 import org.simantics.db.Session;
20 import org.simantics.db.common.request.AsyncReadRequest;
21 import org.simantics.db.exception.DatabaseException;
22 import org.simantics.db.procedure.AsyncListener;
23 import org.simantics.db.procedure.AsyncProcedure;
24 import org.simantics.db.request.AsyncRead;
25 import org.simantics.db.request.Read;
26 import org.simantics.db.service.QueryControl;
27 import org.simantics.db.service.QueryControl.ControlProcedure;
28 import org.simantics.db.testing.base.ExistingDatabaseTest;
29
30 public class RequestQueuingTest extends ExistingDatabaseTest {
31         
32         final private static int LOOPS = 10;
33         
34     private Object RESULT = new Object();
35     private DatabaseException EXCEPTION = new DatabaseException();
36     
37     private AtomicInteger results = new AtomicInteger(0);
38     private AtomicInteger exceptions = new AtomicInteger(0);
39         
40     class A implements AsyncRead<Object> {
41         
42         private void perform(AsyncReadGraph graph, final AsyncProcedure<Object> procedure, final int todo) {
43
44                 if(todo == 0) {
45                         
46                                 procedure.execute(graph, RESULT);
47                                 
48                 } else {
49
50                         final QueryControl control = graph.getService(QueryControl.class);
51                         final int current = control.getGraphThread(graph);
52                         final int next = (current + 1) % control.getAmountOfQueryThreads(); 
53                         control.schedule(graph, next, new ControlProcedure() {
54
55                                 public void execute(AsyncReadGraph graph) {
56                                         perform(graph, procedure, todo-1);
57                                 }
58
59                         });
60                         
61                 }
62                 
63                 
64         }
65
66                 @Override
67                 public void perform(AsyncReadGraph graph, final AsyncProcedure<Object> procedure) {
68                         perform(graph, procedure, 5);
69                 }
70
71         @Override
72             public int threadHash() {
73                 return hashCode();
74             }
75
76                 @Override
77                 public int getFlags() {
78                         return 0;
79                 }
80         
81     }
82
83     class B implements Read<Object> {
84
85                 @Override
86                 public Object perform(ReadGraph graph) throws DatabaseException {
87                         
88                         return graph.syncRequest(new A());
89                         
90                 }
91         
92     }
93     
94     class A2 implements AsyncRead<Object> {
95
96         private void perform(AsyncReadGraph graph, final AsyncProcedure<Object> procedure, final int todo) {
97
98                 if(todo == 0) {
99                         
100                                 procedure.exception(graph, EXCEPTION);
101                                 
102                 } else {
103
104                         final QueryControl control = graph.getService(QueryControl.class);
105                         final int current = control.getGraphThread(graph);
106                         final int next = (current + 1) % control.getAmountOfQueryThreads(); 
107                         control.schedule(graph, next, new ControlProcedure() {
108
109                                 public void execute(AsyncReadGraph graph) {
110                                         perform(graph, procedure, todo-1);
111                                 }
112
113                         });
114                         
115                 }
116                 
117                 
118         }
119
120                 @Override
121                 public void perform(AsyncReadGraph graph, final AsyncProcedure<Object> procedure) {
122                         perform(graph, procedure, 5);
123                 }
124
125         @Override
126             public int threadHash() {
127                 return hashCode();
128             }
129
130                 @Override
131                 public int getFlags() {
132                         return 0;
133                 }
134         
135     }
136
137     class B2 implements Read<Object> {
138
139                 @Override
140                 public Object perform(ReadGraph graph) throws DatabaseException {
141                         
142                         return graph.syncRequest(new A2());
143                         
144                 }
145         
146     }
147
148     class C implements AsyncListener<Object> {
149
150                 @Override
151                 public void execute(AsyncReadGraph graph, Object result) {
152                         if(RESULT == result) results.incrementAndGet();
153                 }
154
155                 @Override
156                 public void exception(AsyncReadGraph graph, Throwable throwable) {
157                         if(EXCEPTION == throwable) exceptions.incrementAndGet();
158                 }
159
160                 @Override
161                 public boolean isDisposed() {
162                         return false;
163                 }
164         
165     }
166     
167         @Test
168         public void test() throws Exception {
169                 
170                 Session session = getSession();
171                 
172                 session.syncRequest(new AsyncReadRequest() {
173
174                         @Override
175                         public void run(AsyncReadGraph graph) {
176
177                                 A a = new A();
178                                 B b = new B();
179                                 A2 a2 = new A2();
180                                 B2 b2 = new B2();
181                                 for(int i=0;i<LOOPS;i++) graph.asyncRequest(a, new C());
182                                 for(int i=0;i<LOOPS;i++) graph.asyncRequest(a2, new C());
183                                 for(int i=0;i<LOOPS;i++) graph.asyncRequest(b, new C());
184                                 for(int i=0;i<LOOPS;i++) graph.asyncRequest(b2, new C());
185
186                         }
187
188                 });
189                 
190                 if(results.get() != 2*LOOPS) fail("Incorrect amount of reported results (expected " + 2*LOOPS + ", got " + results.get() + ")");
191                 if(exceptions.get() != 2*LOOPS) fail("Incorrect amount of reported exceptions (expected " + 2*LOOPS + ", got " + exceptions.get() + ")");
192                 
193         }
194
195 }