1 package fi.vtt.simantics.procore.internal;
3 import java.io.IOException;
4 import java.util.Collection;
5 import java.util.LinkedList;
7 import org.simantics.db.Disposable;
8 import org.simantics.db.Resource;
9 import org.simantics.db.exception.CancelTransactionException;
10 import org.simantics.db.exception.DatabaseException;
11 import org.simantics.db.impl.graph.WriteGraphImpl;
12 import org.simantics.db.impl.internal.RandomAccessValueSupport;
13 import org.simantics.db.impl.internal.ResourceData;
14 import org.simantics.db.impl.query.QueryProcessor.SessionRead;
15 import org.simantics.db.impl.query.QueryProcessor.SessionTask;
16 import org.simantics.db.request.WriteTraits;
17 import org.simantics.utils.datastructures.Pair;
18 import org.slf4j.Logger;
19 import org.slf4j.LoggerFactory;
21 public class SessionRequestManager {
23 private static final Logger LOGGER = LoggerFactory.getLogger(SessionRequestManager.class);
25 private static boolean DEBUG = false;
29 INIT, IDLE, READ, WRITE, WRITE_UPDATE, READ_UPDATE, ERROR
33 final SessionImplSocket session;
34 final fi.vtt.simantics.procore.internal.State transactionState;
35 final LinkedList<SessionTask> writes = new LinkedList<SessionTask>();
36 final LinkedList<SessionRead> reads = new LinkedList<SessionRead>();
38 State state = State.INIT;
40 private boolean legal(State current, State target) {
42 if(State.IDLE == current) {
44 return State.INIT != target && State.WRITE_UPDATE != target && State.READ_UPDATE != target;
46 } else if (State.WRITE == current) {
48 return State.WRITE_UPDATE == target;
50 } else if (State.READ == current) {
52 return State.READ_UPDATE == target;
54 } else if (State.ERROR == current) {
56 return State.IDLE == target;
60 return State.IDLE == target;
66 private synchronized void setState(int thread, State state, SessionTask task) {
68 if(DEBUG) System.err.println("Session state: " + this.state + " => " + state + " - " + task);
69 if(this.state == state) return;
71 assert(legal(this.state, state));
73 if(State.READ == state) {
75 assert(State.IDLE == this.state);
79 State oldState = this.state;
82 if(State.IDLE == oldState && State.IDLE != state) {
87 * We are now executing in new state.
88 * The methods below SHALL NOT start any task
89 * immediately. All tasks shall be scheduled.
92 if(State.READ == state) startRead(thread, (SessionRead)task);
93 else if(State.WRITE == state) startWrite(thread, task);
94 else if(State.READ == oldState) startReadUpdate(thread);
95 else if(State.WRITE == oldState) startWriteUpdate(thread);
97 // Check the need for new transaction
98 if(state == State.IDLE) {
100 closeRandomAccessValues();
102 if(!writes.isEmpty()) {
103 setState(thread, State.WRITE, writes.poll());
104 } else if (!reads.isEmpty()) {
105 setState(thread, State.READ, reads.poll());
114 public SessionRequestManager(SessionImplSocket session, fi.vtt.simantics.procore.internal.State transactionState) {
115 this.session = session;
116 this.transactionState = transactionState;
119 public synchronized void startRead(int thread, final SessionRead task) {
121 session.queryProvider2.scheduleNow(new SessionTask(null) {
124 public void run0(int thread) {
126 transactionState.startReadTransaction(thread);
128 } catch (Throwable t) {
129 LOGGER.error("Read transaction could not be started", t);
130 if(task.throwable != null)
131 task.throwable.set(t);
134 if(task.notify != null)
135 task.notify.release();
143 public synchronized void startReadUpdate(int thread) {
145 session.queryProvider2.scheduleNow(new SessionTask(null) {
148 public void run0(int thread) {
150 session.fireFinishReadTransaction();
153 transactionState.stopReadTransaction();
154 } catch (DatabaseException e) {
155 LOGGER.error("Read transaction could not be stopped", e);
164 public synchronized void startWrite(int thread, final SessionTask task) {
166 session.queryProvider2.scheduleNow(new SessionTask(null) {
169 public void run0(int thread) {
172 transactionState.startWriteTransaction(thread);
173 } catch (Throwable t) {
174 LOGGER.error("Write transaction could not be started", t);
185 public synchronized void startWriteUpdate(int thread) {
187 session.queryProvider2.scheduleNow(new SessionTask(null) {
190 public void run0(int thread) {
192 // Support for DelayedWriteRequest cancels during the
193 // read-only part of the request.
194 WriteStateBase<?> delayedState = session.delayedWriteState;
195 session.delayedWriteState = null;
196 if (delayedState != null) {
197 if (session.writeState != null)
198 throw new AssertionError("Both session delayedWriteState and writeState are set, only one can be set.");
200 if (delayedState.isExcepted()) {
201 // There can never be any changes in the cluster stream.
202 // In this case, don't bother rolling back.
203 boolean empty = session.clusterStream.reallyFlush();
205 transactionState.stopWriteTransaction(session.clusterStream);
207 throw new UnsupportedOperationException("delayedWriteState may only exist when request fails.");
209 Disposable.safeDispose(session.clientChanges);
210 session.clientChanges = new ClientChangesImpl(session);
211 delayedState.finish();
215 // The session could have been terminated
216 if(!session.state.isAlive()) return;
218 WriteState<?> writeState = session.writeState;
220 assert(writeState != null);
222 WriteGraphImpl graph = writeState.getGraph();
224 if(writeState.isExcepted()) {
226 if(!(writeState.exception instanceof CancelTransactionException))
227 LOGGER.error("Write request failed", writeState.exception);
229 transactionState.cancelWriteTransaction(graph);
233 session.handleUpdatesAndMetadata(graph);
235 // long start = System.nanoTime();
236 transactionState.commitWriteTransaction(writeState.getGraph(), session.clusterStream, session.clientChanges, writeState.getRequest(), null);
237 // long duration = System.nanoTime() - start;
238 // System.out.println("commitWriteTransaction " + 1e-9*duration + "s. ");
242 Disposable.safeDispose(session.clientChanges);
243 session.clientChanges = new ClientChangesImpl(session);
245 WriteState<?> state = session.writeState;
247 session.writeState = null;
255 public synchronized void ceased(int thread) {
257 if(State.WRITE == state) {
259 setState(thread, State.WRITE_UPDATE, null);
261 } else if (State.WRITE_UPDATE == state) {
263 setState(thread, State.IDLE, null);
265 } else if (State.READ_UPDATE == state) {
267 setState(thread, State.IDLE, null);
269 } else if (State.READ == state) {
271 if (!reads.isEmpty()) {
273 final SessionRead read = reads.poll();
274 session.queryProvider2.scheduleNow(new SessionTask(null) {
277 public void run0(int thread) {
279 if(read.notify != null) read.notify.release();
286 setState(thread, State.READ_UPDATE, null);
290 } else if (State.INIT == state) {
292 assert(reads.isEmpty());
293 assert(writes.isEmpty());
294 setState(thread, State.IDLE, null);
296 } else if (State.ERROR == state) {
298 setState(thread, State.IDLE, null);
304 //throw new IllegalStateException("State in ceased should be WRITE or READ or INIT (was " + state + ")");
310 public synchronized void scheduleRead(final SessionRead task) {
312 assert(State.INIT != state);
314 if(State.READ == state) {
315 session.queryProvider2.scheduleNow(new SessionTask(null) {
318 public void run0(int thread) {
322 if (task.notify != null) task.notify.release();
327 } else if (State.IDLE == state) {
328 setState(Integer.MIN_VALUE, State.READ, task);
335 public int lastUpdateIndex() {
336 return writes.size();
339 public synchronized void scheduleWrite(SessionTask task) {
341 scheduleWrite(task, null);
346 * @param task the task to schedule
347 * @param combine <code>true</code> or <code>false</code> to explicitly
348 * specify whether to combine the write with the previous operation
349 * or <code>null</code> to use the default logic, i.e. combine when
350 * this schedule is performed from within update processing (state ==
351 * State.WRITE_UPDATE).
353 public synchronized void scheduleWrite(SessionTask task, Boolean combine) {
355 boolean inUpdate = state == State.WRITE_UPDATE;
357 //System.err.println("schedule write " + task);
359 assert(State.INIT != state);
360 //task.combine = combine != null ? combine : inUpdate;
361 if(State.IDLE == state) {
362 setState(Integer.MIN_VALUE, State.WRITE, task);
365 int index = lastUpdateIndex();
366 if(index == writes.size()) writes.offer(task);
367 else writes.add(index, task);
376 * Unregisters and closes all currently registered ResourceData random
377 * access value file handles. Any I/O exceptions occurring during closing
378 * are logged into {@link Logger}.
380 private void closeRandomAccessValues() {
381 RandomAccessValueSupport ravs = session.peekService(RandomAccessValueSupport.class);
384 Collection<Pair<Resource, ResourceData>> values = ravs.removeAll();
385 for (Pair<Resource, ResourceData> value : values) {
387 value.second.binaryFile.close();
388 } catch (IOException e) {
389 LOGGER.error("I/O exception while closing random access value file " + value.second.binaryFile.file() + " for resource " + value.first , e);