1 package fi.vtt.simantics.procore.internal;
\r
3 import java.io.IOException;
\r
4 import java.util.Collection;
\r
5 import java.util.LinkedList;
\r
7 import org.simantics.db.Resource;
\r
8 import org.simantics.db.common.utils.Logger;
\r
9 import org.simantics.db.exception.CancelTransactionException;
\r
10 import org.simantics.db.exception.DatabaseException;
\r
11 import org.simantics.db.impl.graph.WriteGraphImpl;
\r
12 import org.simantics.db.impl.internal.RandomAccessValueSupport;
\r
13 import org.simantics.db.impl.internal.ResourceData;
\r
14 import org.simantics.db.impl.query.QueryProcessor.SessionRead;
\r
15 import org.simantics.db.impl.query.QueryProcessor.SessionTask;
\r
16 import org.simantics.db.request.WriteTraits;
\r
17 import org.simantics.utils.datastructures.Pair;
\r
19 public class SessionRequestManager {
\r
21 private static boolean DEBUG = false;
\r
25 INIT, IDLE, READ, WRITE, WRITE_UPDATE, READ_UPDATE, ERROR
\r
29 final SessionImplSocket session;
\r
30 final fi.vtt.simantics.procore.internal.State transactionState;
\r
31 final LinkedList<SessionTask> writes = new LinkedList<SessionTask>();
\r
32 final LinkedList<SessionRead> reads = new LinkedList<SessionRead>();
\r
34 State state = State.INIT;
\r
36 private boolean legal(State current, State target) {
\r
38 if(State.IDLE == current) {
\r
40 return State.INIT != target && State.WRITE_UPDATE != target && State.READ_UPDATE != target;
\r
42 } else if (State.WRITE == current) {
\r
44 return State.WRITE_UPDATE == target;
\r
46 } else if (State.READ == current) {
\r
48 return State.READ_UPDATE == target;
\r
50 } else if (State.ERROR == current) {
\r
52 return State.IDLE == target;
\r
56 return State.IDLE == target;
\r
62 private synchronized void setState(int thread, State state, SessionTask task) {
\r
64 if(DEBUG) System.err.println("Session state: " + this.state + " => " + state + " - " + task);
\r
65 if(this.state == state) return;
\r
67 assert(legal(this.state, state));
\r
69 if(State.READ == state) {
\r
71 assert(State.IDLE == this.state);
\r
75 State oldState = this.state;
\r
78 if(State.IDLE == oldState && State.IDLE != state) {
\r
83 * We are now executing in new state.
\r
84 * The methods below SHALL NOT start any task
\r
85 * immediately. All tasks shall be scheduled.
\r
88 if(State.READ == state) startRead(thread, (SessionRead)task);
\r
89 else if(State.WRITE == state) startWrite(thread, task);
\r
90 else if(State.READ == oldState) startReadUpdate(thread);
\r
91 else if(State.WRITE == oldState) startWriteUpdate(thread);
\r
93 // Check the need for new transaction
\r
94 if(state == State.IDLE) {
\r
96 closeRandomAccessValues();
\r
98 if(!writes.isEmpty()) {
\r
99 setState(thread, State.WRITE, writes.poll());
\r
100 } else if (!reads.isEmpty()) {
\r
101 setState(thread, State.READ, reads.poll());
\r
104 session.decAsync();
\r
110 public SessionRequestManager(SessionImplSocket session, fi.vtt.simantics.procore.internal.State transactionState) {
\r
111 this.session = session;
\r
112 this.transactionState = transactionState;
\r
115 public synchronized void startRead(int thread, final SessionRead task) {
\r
117 session.queryProvider2.scheduleAlways(thread, new SessionTask(task.object, task.thread, task.syncCaller) {
\r
120 public void run(int thread) {
\r
122 transactionState.startReadTransaction(thread);
\r
124 } catch (Throwable t) {
\r
125 Logger.defaultLogError(new DatabaseException("Read transaction could not be started", t));
\r
126 if(task.throwable != null)
\r
127 task.throwable.set(t);
\r
128 state = State.ERROR;
\r
130 if(task.notify != null)
\r
131 task.notify.release();
\r
139 public synchronized void startReadUpdate(int thread) {
\r
141 session.queryProvider2.scheduleAlways(thread, new SessionTask(null, thread) {
\r
144 public void run(int thread) {
\r
146 session.fireFinishReadTransaction();
\r
149 transactionState.stopReadTransaction();
\r
150 } catch (DatabaseException e) {
\r
151 e.printStackTrace();
\r
160 // public synchronized void stopRead(int thread) {
\r
163 // transactionState.stopReadTransaction();
\r
164 // } catch (DatabaseException e) {
\r
165 // e.printStackTrace();
\r
170 public synchronized void startWrite(int thread, final SessionTask task) {
\r
172 session.queryProvider2.scheduleAlways(thread, new SessionTask((WriteTraits)task.object, task.thread) {
\r
175 public void run(int thread) {
\r
178 transactionState.startWriteTransaction(thread);
\r
179 } catch (Throwable t) {
\r
180 DatabaseException e = new DatabaseException("Write transaction could not be started", t);
\r
181 Logger.defaultLogError(e);
\r
192 public synchronized void startWriteUpdate(int thread) {
\r
194 session.queryProvider2.scheduleAlways(thread, new SessionTask(null, thread) {
\r
197 public void run(int thread) {
\r
199 // Support for DelayedWriteRequest cancels during the
\r
200 // read-only part of the request.
\r
201 WriteStateBase<?> delayedState = session.delayedWriteState;
\r
202 session.delayedWriteState = null;
\r
203 if (delayedState != null) {
\r
204 if (session.writeState != null)
\r
205 throw new AssertionError("Both session delayedWriteState and writeState are set, only one can be set.");
\r
207 if (delayedState.isExcepted()) {
\r
208 // There can never be any changes in the cluster stream.
\r
209 // In this case, don't bother rolling back.
\r
210 boolean empty = session.clusterStream.reallyFlush();
\r
212 transactionState.stopWriteTransaction(session.clusterStream);
\r
214 throw new UnsupportedOperationException("delayedWriteState may only exist when request fails.");
\r
216 session.clientChanges = new ClientChangesImpl(session);
\r
217 delayedState.finish();
\r
221 // The session could have been terminated
\r
222 if(!session.state.isAlive()) return;
\r
224 WriteState<?> writeState = session.writeState;
\r
225 WriteGraphImpl graph = writeState.getGraph();
\r
227 if(writeState.isExcepted()) {
\r
229 if(!(writeState.exception instanceof CancelTransactionException))
\r
230 writeState.exception.printStackTrace();
\r
232 transactionState.cancelWriteTransaction(graph);
\r
236 session.handleUpdatesAndMetadata(graph);
\r
238 // long start = System.nanoTime();
\r
239 transactionState.commitWriteTransaction(writeState.getGraph(), session.clusterStream, session.clientChanges, writeState.getRequest(), null);
\r
240 // long duration = System.nanoTime() - start;
\r
241 // System.out.println("commitWriteTransaction " + 1e-9*duration + "s. ");
\r
245 session.clientChanges = new ClientChangesImpl(session);
\r
247 WriteState<?> state = session.writeState;
\r
249 session.writeState = null;
\r
257 // public synchronized void stopWrite(int thread) {
\r
259 // session.clientChanges = new ClientChangesImpl(session);
\r
261 // WriteState<?> state = session.writeState;
\r
263 // System.err.println("D");
\r
265 // System.err.println("E");
\r
267 // session.writeState = null;
\r
271 public synchronized void ceased(int thread) {
\r
273 if(State.WRITE == state) {
\r
275 setState(thread, State.WRITE_UPDATE, null);
\r
277 } else if (State.WRITE_UPDATE == state) {
\r
279 setState(thread, State.IDLE, null);
\r
281 } else if (State.READ_UPDATE == state) {
\r
283 setState(thread, State.IDLE, null);
\r
285 } else if (State.READ == state) {
\r
287 if (!reads.isEmpty()) {
\r
289 final SessionRead read = reads.poll();
\r
290 session.queryProvider2.scheduleAlways(thread, new SessionTask(read.object, read.thread, read.syncCaller) {
\r
293 public void run(int thread) {
\r
295 if(read.notify != null) read.notify.release();
\r
302 setState(thread, State.READ_UPDATE, null);
\r
306 } else if (State.INIT == state) {
\r
308 assert(reads.isEmpty());
\r
309 assert(writes.isEmpty());
\r
310 setState(thread, State.IDLE, null);
\r
312 } else if (State.ERROR == state) {
\r
314 setState(thread, State.IDLE, null);
\r
318 throw new IllegalStateException("State in ceased should be WRITE or READ or INIT (was " + state + ")");
\r
324 public synchronized void scheduleRead(final SessionRead task) {
\r
326 assert(State.INIT != state);
\r
328 if(State.READ == state) {
\r
329 session.queryProvider2.schedule(Integer.MIN_VALUE, new SessionTask(task.object, task.thread, task.syncCaller) {
\r
332 public void run(int thread) {
\r
336 if (task.notify != null) task.notify.release();
\r
341 } else if (State.IDLE == state) {
\r
342 setState(Integer.MIN_VALUE, State.READ, task);
\r
349 public int lastUpdateIndex() {
\r
350 return writes.size();
\r
353 public synchronized void scheduleWrite(SessionTask task) {
\r
355 scheduleWrite(task, null);
\r
360 * @param task the task to schedule
\r
361 * @param combine <code>true</code> or <code>false</code> to explicitly
\r
362 * specify whether to combine the write with the previous operation
\r
363 * or <code>null</code> to use the default logic, i.e. combine when
\r
364 * this schedule is performed from within update processing (state ==
\r
365 * State.WRITE_UPDATE).
\r
367 public synchronized void scheduleWrite(SessionTask task, Boolean combine) {
\r
369 boolean inUpdate = state == State.WRITE_UPDATE;
\r
371 assert(State.INIT != state);
\r
372 //task.combine = combine != null ? combine : inUpdate;
\r
373 if(State.IDLE == state) {
\r
374 setState(Integer.MIN_VALUE, State.WRITE, task);
\r
377 int index = lastUpdateIndex();
\r
378 if(index == writes.size()) writes.offer(task);
\r
379 else writes.add(index, task);
\r
381 writes.offer(task);
\r
388 * Unregisters and closes all currently registered ResourceData random
\r
389 * access value file handles. Any I/O exceptions occurring during closing
\r
390 * are logged into {@link Logger}.
\r
392 private void closeRandomAccessValues() {
\r
393 RandomAccessValueSupport ravs = session.peekService(RandomAccessValueSupport.class);
\r
396 Collection<Pair<Resource, ResourceData>> values = ravs.removeAll();
\r
397 for (Pair<Resource, ResourceData> value : values) {
\r
399 value.second.binaryFile.close();
\r
400 } catch (IOException e) {
\r
401 Logger.defaultLogError("I/O exception while closing random access value file " + value.second.binaryFile.file() + " for resource " + value.first , e);
\r