1 package fi.vtt.simantics.procore.internal;
3 import java.io.IOException;
4 import java.util.Collection;
5 import java.util.LinkedList;
7 import org.simantics.db.Disposable;
8 import org.simantics.db.Resource;
9 import org.simantics.db.common.utils.Logger;
10 import org.simantics.db.exception.CancelTransactionException;
11 import org.simantics.db.exception.DatabaseException;
12 import org.simantics.db.impl.graph.WriteGraphImpl;
13 import org.simantics.db.impl.internal.RandomAccessValueSupport;
14 import org.simantics.db.impl.internal.ResourceData;
15 import org.simantics.db.impl.query.QueryProcessor.SessionRead;
16 import org.simantics.db.impl.query.QueryProcessor.SessionTask;
17 import org.simantics.db.request.WriteTraits;
18 import org.simantics.utils.datastructures.Pair;
20 public class SessionRequestManager {
22 private static boolean DEBUG = false;
26 INIT, IDLE, READ, WRITE, WRITE_UPDATE, READ_UPDATE, ERROR
30 final SessionImplSocket session;
31 final fi.vtt.simantics.procore.internal.State transactionState;
32 final LinkedList<SessionTask> writes = new LinkedList<SessionTask>();
33 final LinkedList<SessionRead> reads = new LinkedList<SessionRead>();
35 State state = State.INIT;
37 private boolean legal(State current, State target) {
39 if(State.IDLE == current) {
41 return State.INIT != target && State.WRITE_UPDATE != target && State.READ_UPDATE != target;
43 } else if (State.WRITE == current) {
45 return State.WRITE_UPDATE == target;
47 } else if (State.READ == current) {
49 return State.READ_UPDATE == target;
51 } else if (State.ERROR == current) {
53 return State.IDLE == target;
57 return State.IDLE == target;
63 private synchronized void setState(int thread, State state, SessionTask task) {
65 if(DEBUG) System.err.println("Session state: " + this.state + " => " + state + " - " + task);
66 if(this.state == state) return;
68 assert(legal(this.state, state));
70 if(State.READ == state) {
72 assert(State.IDLE == this.state);
76 State oldState = this.state;
79 if(State.IDLE == oldState && State.IDLE != state) {
84 * We are now executing in new state.
85 * The methods below SHALL NOT start any task
86 * immediately. All tasks shall be scheduled.
89 if(State.READ == state) startRead(thread, (SessionRead)task);
90 else if(State.WRITE == state) startWrite(thread, task);
91 else if(State.READ == oldState) startReadUpdate(thread);
92 else if(State.WRITE == oldState) startWriteUpdate(thread);
94 // Check the need for new transaction
95 if(state == State.IDLE) {
97 closeRandomAccessValues();
99 if(!writes.isEmpty()) {
100 setState(thread, State.WRITE, writes.poll());
101 } else if (!reads.isEmpty()) {
102 setState(thread, State.READ, reads.poll());
111 public SessionRequestManager(SessionImplSocket session, fi.vtt.simantics.procore.internal.State transactionState) {
112 this.session = session;
113 this.transactionState = transactionState;
116 public synchronized void startRead(int thread, final SessionRead task) {
118 session.queryProvider2.scheduleAlways(thread, new SessionTask(task.object, task.thread, task.syncCaller) {
121 public void run(int thread) {
123 transactionState.startReadTransaction(thread);
125 } catch (Throwable t) {
126 Logger.defaultLogError(new DatabaseException("Read transaction could not be started", t));
127 if(task.throwable != null)
128 task.throwable.set(t);
131 if(task.notify != null)
132 task.notify.release();
140 public synchronized void startReadUpdate(int thread) {
142 session.queryProvider2.scheduleAlways(thread, new SessionTask(null, thread) {
145 public void run(int thread) {
147 session.fireFinishReadTransaction();
150 transactionState.stopReadTransaction();
151 } catch (DatabaseException e) {
161 // public synchronized void stopRead(int thread) {
164 // transactionState.stopReadTransaction();
165 // } catch (DatabaseException e) {
166 // e.printStackTrace();
171 public synchronized void startWrite(int thread, final SessionTask task) {
173 session.queryProvider2.scheduleAlways(thread, new SessionTask((WriteTraits)task.object, task.thread) {
176 public void run(int thread) {
179 transactionState.startWriteTransaction(thread);
180 } catch (Throwable t) {
181 DatabaseException e = new DatabaseException("Write transaction could not be started", t);
182 Logger.defaultLogError(e);
193 public synchronized void startWriteUpdate(int thread) {
195 session.queryProvider2.scheduleAlways(thread, new SessionTask(null, thread) {
198 public void run(int thread) {
200 // Support for DelayedWriteRequest cancels during the
201 // read-only part of the request.
202 WriteStateBase<?> delayedState = session.delayedWriteState;
203 session.delayedWriteState = null;
204 if (delayedState != null) {
205 if (session.writeState != null)
206 throw new AssertionError("Both session delayedWriteState and writeState are set, only one can be set.");
208 if (delayedState.isExcepted()) {
209 // There can never be any changes in the cluster stream.
210 // In this case, don't bother rolling back.
211 boolean empty = session.clusterStream.reallyFlush();
213 transactionState.stopWriteTransaction(session.clusterStream);
215 throw new UnsupportedOperationException("delayedWriteState may only exist when request fails.");
217 Disposable.safeDispose(session.clientChanges);
218 session.clientChanges = new ClientChangesImpl(session);
219 delayedState.finish();
223 // The session could have been terminated
224 if(!session.state.isAlive()) return;
226 WriteState<?> writeState = session.writeState;
228 assert(writeState != null);
230 WriteGraphImpl graph = writeState.getGraph();
232 if(writeState.isExcepted()) {
234 if(!(writeState.exception instanceof CancelTransactionException))
235 writeState.exception.printStackTrace();
237 transactionState.cancelWriteTransaction(graph);
241 session.handleUpdatesAndMetadata(graph);
243 // long start = System.nanoTime();
244 transactionState.commitWriteTransaction(writeState.getGraph(), session.clusterStream, session.clientChanges, writeState.getRequest(), null);
245 // long duration = System.nanoTime() - start;
246 // System.out.println("commitWriteTransaction " + 1e-9*duration + "s. ");
250 Disposable.safeDispose(session.clientChanges);
251 session.clientChanges = new ClientChangesImpl(session);
253 WriteState<?> state = session.writeState;
255 session.writeState = null;
263 // public synchronized void stopWrite(int thread) {
265 // session.clientChanges = new ClientChangesImpl(session);
267 // WriteState<?> state = session.writeState;
269 // System.err.println("D");
271 // System.err.println("E");
273 // session.writeState = null;
277 public synchronized void ceased(int thread) {
279 if(State.WRITE == state) {
281 setState(thread, State.WRITE_UPDATE, null);
283 } else if (State.WRITE_UPDATE == state) {
285 setState(thread, State.IDLE, null);
287 } else if (State.READ_UPDATE == state) {
289 setState(thread, State.IDLE, null);
291 } else if (State.READ == state) {
293 if (!reads.isEmpty()) {
295 final SessionRead read = reads.poll();
296 session.queryProvider2.scheduleAlways(thread, new SessionTask(read.object, read.thread, read.syncCaller) {
299 public void run(int thread) {
301 if(read.notify != null) read.notify.release();
308 setState(thread, State.READ_UPDATE, null);
312 } else if (State.INIT == state) {
314 assert(reads.isEmpty());
315 assert(writes.isEmpty());
316 setState(thread, State.IDLE, null);
318 } else if (State.ERROR == state) {
320 setState(thread, State.IDLE, null);
326 //throw new IllegalStateException("State in ceased should be WRITE or READ or INIT (was " + state + ")");
332 public synchronized void scheduleRead(final SessionRead task) {
334 assert(State.INIT != state);
336 if(State.READ == state) {
337 session.queryProvider2.schedule(Integer.MIN_VALUE, new SessionTask(task.object, task.thread, task.syncCaller) {
340 public void run(int thread) {
344 if (task.notify != null) task.notify.release();
349 } else if (State.IDLE == state) {
350 setState(Integer.MIN_VALUE, State.READ, task);
357 public int lastUpdateIndex() {
358 return writes.size();
361 public synchronized void scheduleWrite(SessionTask task) {
363 scheduleWrite(task, null);
368 * @param task the task to schedule
369 * @param combine <code>true</code> or <code>false</code> to explicitly
370 * specify whether to combine the write with the previous operation
371 * or <code>null</code> to use the default logic, i.e. combine when
372 * this schedule is performed from within update processing (state ==
373 * State.WRITE_UPDATE).
375 public synchronized void scheduleWrite(SessionTask task, Boolean combine) {
377 boolean inUpdate = state == State.WRITE_UPDATE;
379 //System.err.println("schedule write " + task);
381 assert(State.INIT != state);
382 //task.combine = combine != null ? combine : inUpdate;
383 if(State.IDLE == state) {
384 setState(Integer.MIN_VALUE, State.WRITE, task);
387 int index = lastUpdateIndex();
388 if(index == writes.size()) writes.offer(task);
389 else writes.add(index, task);
398 * Unregisters and closes all currently registered ResourceData random
399 * access value file handles. Any I/O exceptions occurring during closing
400 * are logged into {@link Logger}.
402 private void closeRandomAccessValues() {
403 RandomAccessValueSupport ravs = session.peekService(RandomAccessValueSupport.class);
406 Collection<Pair<Resource, ResourceData>> values = ravs.removeAll();
407 for (Pair<Resource, ResourceData> value : values) {
409 value.second.binaryFile.close();
410 } catch (IOException e) {
411 Logger.defaultLogError("I/O exception while closing random access value file " + value.second.binaryFile.file() + " for resource " + value.first , e);