]> gerrit.simantics Code Review - simantics/platform.git/blob - bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/SessionRequestManager.java
Merge "Better emptying of trash bin"
[simantics/platform.git] / bundles / org.simantics.db.procore / src / fi / vtt / simantics / procore / internal / SessionRequestManager.java
1 package fi.vtt.simantics.procore.internal;
2
3 import java.io.IOException;
4 import java.util.Collection;
5 import java.util.LinkedList;
6
7 import org.simantics.db.Disposable;
8 import org.simantics.db.Resource;
9 import org.simantics.db.exception.CancelTransactionException;
10 import org.simantics.db.exception.DatabaseException;
11 import org.simantics.db.impl.graph.WriteGraphImpl;
12 import org.simantics.db.impl.internal.RandomAccessValueSupport;
13 import org.simantics.db.impl.internal.ResourceData;
14 import org.simantics.db.impl.query.QueryProcessor.SessionRead;
15 import org.simantics.db.impl.query.QueryProcessor.SessionTask;
16 import org.simantics.db.request.WriteTraits;
17 import org.simantics.utils.datastructures.Pair;
18 import org.slf4j.Logger;
19 import org.slf4j.LoggerFactory;
20
21 public class SessionRequestManager {
22
23         private static final Logger LOGGER = LoggerFactory.getLogger(SessionRequestManager.class);
24
25         private static boolean DEBUG = false;
26         
27         enum State {
28                 
29                 INIT, IDLE, READ, WRITE, WRITE_UPDATE, READ_UPDATE, ERROR
30                 
31         }
32         
33         final SessionImplSocket session;
34         final fi.vtt.simantics.procore.internal.State transactionState;
35         final LinkedList<SessionTask> writes = new LinkedList<SessionTask>();
36         final LinkedList<SessionRead> reads = new LinkedList<SessionRead>();
37
38         State state = State.INIT;
39
40         private boolean legal(State current, State target) {
41                 
42                 if(State.IDLE == current) {
43                         
44                         return State.INIT != target && State.WRITE_UPDATE != target && State.READ_UPDATE != target;
45                         
46                 } else if (State.WRITE == current) {
47                         
48                         return State.WRITE_UPDATE == target;
49
50                 } else if (State.READ == current) {
51                         
52                         return State.READ_UPDATE == target;
53
54                 } else if (State.ERROR == current) {
55                         
56                         return State.IDLE == target;
57                         
58                 } else {
59                         
60                         return State.IDLE == target;
61                         
62                 }
63                 
64         }
65         
66         private synchronized void setState(int thread, State state, SessionTask task) {
67
68                 if(DEBUG) System.err.println("Session state: " + this.state + " => " + state + " - " + task);
69                 if(this.state == state) return;
70                 
71                 assert(legal(this.state, state));
72                 
73                 if(State.READ == state) {
74                         
75                         assert(State.IDLE == this.state);
76                         
77                 }
78                 
79                 State oldState = this.state;
80                 this.state = state;
81                 
82                 if(State.IDLE == oldState && State.IDLE != state) {
83                     session.incAsync();
84                 }
85                 
86                 /*
87                  * We are now executing in new state.
88                  * The methods below SHALL NOT start any task
89                  * immediately. All tasks shall be scheduled.
90                  * 
91                  */
92                 if(State.READ == state) startRead(thread, (SessionRead)task);
93                 else if(State.WRITE == state) startWrite(thread, task);
94                 else if(State.READ == oldState) startReadUpdate(thread);
95                 else if(State.WRITE == oldState) startWriteUpdate(thread);
96
97                 // Check the need for new transaction
98                 if(state == State.IDLE) {
99
100                         closeRandomAccessValues();
101
102                         if(!writes.isEmpty()) {
103                                 setState(thread, State.WRITE, writes.poll());
104                         } else if (!reads.isEmpty()) {
105                                 setState(thread, State.READ, reads.poll());
106                         }
107
108             session.decAsync();
109
110                 }
111                 
112         }
113         
114         public SessionRequestManager(SessionImplSocket session, fi.vtt.simantics.procore.internal.State transactionState) {
115                 this.session = session;
116                 this.transactionState = transactionState;
117         }
118         
119         public synchronized void startRead(int thread, final SessionRead task) {
120                 
121                 session.queryProvider2.scheduleAlways(thread, new SessionTask(task.object, task.thread, task.syncCaller) {
122
123                         @Override
124             public void run(int thread) {
125                 try {
126                     transactionState.startReadTransaction(thread);
127                     task.run(thread);
128                 } catch (Throwable t) {
129                     LOGGER.error("Read transaction could not be started", t);
130                     if(task.throwable != null)
131                         task.throwable.set(t);
132                     state = State.ERROR;
133                 } finally {
134                     if(task.notify != null)
135                         task.notify.release();
136                 }
137             }
138                         
139                 });
140                 
141         }
142         
143         public synchronized void startReadUpdate(int thread) {
144                 
145                 session.queryProvider2.scheduleAlways(thread, new SessionTask(null, thread) {
146
147                         @Override
148                         public void run(int thread) {
149
150                                 session.fireFinishReadTransaction();
151
152                                 try {
153                                         transactionState.stopReadTransaction();
154                                 } catch (DatabaseException e) {
155                                         LOGGER.error("Read transaction could not be stopped", e);
156                                 }
157                                 
158                         }
159                         
160                 });
161                 
162         }
163
164         public synchronized void startWrite(int thread, final SessionTask task) {
165                 
166                 session.queryProvider2.scheduleAlways(thread, new SessionTask((WriteTraits)task.object, task.thread) {
167
168                         @Override
169                         public void run(int thread) {
170
171                         try {
172                                 transactionState.startWriteTransaction(thread);
173                         } catch (Throwable t) {
174                             LOGGER.error("Write transaction could not be started", t);
175                             return;
176                         }
177                                 task.run(thread);
178                                 
179                         }
180                         
181                 });
182                 
183         }
184
185         public synchronized void startWriteUpdate(int thread) {
186                 
187                 session.queryProvider2.scheduleAlways(thread, new SessionTask(null, thread) {
188
189                         @Override
190                         public void run(int thread) {
191
192                                 // Support for DelayedWriteRequest cancels during the
193                                 // read-only part of the request.
194                                 WriteStateBase<?> delayedState = session.delayedWriteState;
195                                 session.delayedWriteState = null;
196                                 if (delayedState != null) {
197                                         if (session.writeState != null)
198                                                 throw new AssertionError("Both session delayedWriteState and writeState are set, only one can be set.");
199
200                         if (delayedState.isExcepted()) {
201                         // There can never be any changes in the cluster stream.
202                         // In this case, don't bother rolling back.
203                             boolean empty = session.clusterStream.reallyFlush(); 
204                         assert(empty);
205                             transactionState.stopWriteTransaction(session.clusterStream);
206                         } else {
207                             throw new UnsupportedOperationException("delayedWriteState may only exist when request fails.");
208                         }
209                         Disposable.safeDispose(session.clientChanges);
210                         session.clientChanges = new ClientChangesImpl(session);
211                                         delayedState.finish();
212                                         return;
213                                 }
214
215                                 // The session could have been terminated
216                                 if(!session.state.isAlive()) return;
217
218                                 WriteState<?> writeState = session.writeState;
219                                 WriteGraphImpl graph = writeState.getGraph();
220
221                                 if(writeState.isExcepted()) {
222
223                                     if(!(writeState.exception instanceof CancelTransactionException))
224                                         LOGGER.error("Write request failed", writeState.exception);
225
226                                     transactionState.cancelWriteTransaction(graph);
227
228                                 } else {
229
230                                         session.handleUpdatesAndMetadata(graph);
231
232 //                    long start = System.nanoTime();
233                                         transactionState.commitWriteTransaction(writeState.getGraph(), session.clusterStream, session.clientChanges, writeState.getRequest(), null);
234 //                                      long duration = System.nanoTime() - start;
235 //                    System.out.println("commitWriteTransaction " + 1e-9*duration + "s. ");
236
237                                 }
238
239                                 Disposable.safeDispose(session.clientChanges);
240                                 session.clientChanges = new ClientChangesImpl(session);
241
242                                 WriteState<?> state = session.writeState;
243                                 state.finish();
244                                 session.writeState = null;
245
246                         }
247
248                 });
249
250         }
251
252         public synchronized void ceased(int thread) {
253
254                 if(State.WRITE == state) {
255
256                         setState(thread, State.WRITE_UPDATE, null);
257                         
258                 } else if (State.WRITE_UPDATE == state) {
259
260                         setState(thread, State.IDLE, null);
261
262                 } else if (State.READ_UPDATE == state) {
263
264                         setState(thread, State.IDLE, null);
265                         
266                 } else if (State.READ == state) {
267
268                         if (!reads.isEmpty()) {
269
270                                 final SessionRead read = reads.poll();
271                                 session.queryProvider2.scheduleAlways(thread, new SessionTask(read.object, read.thread, read.syncCaller) {
272
273                                         @Override
274                                         public void run(int thread) {
275                                                 read.run(thread);
276                                                 if(read.notify != null) read.notify.release();
277                                         }
278                                         
279                                 });     
280                                 
281                         } else {
282                                 
283                                 setState(thread, State.READ_UPDATE, null);
284                                 
285                         }
286
287                 } else if (State.INIT == state) {
288                         
289                         assert(reads.isEmpty());
290                         assert(writes.isEmpty());
291                         setState(thread, State.IDLE, null);
292                         
293                 } else if (State.ERROR == state) {
294                         
295                         setState(thread, State.IDLE, null);
296                         
297                 } else {
298
299                     throw new IllegalStateException("State in ceased should be WRITE or READ or INIT (was " + state + ")"); 
300                         
301                 }
302                 
303         }
304         
305     public synchronized void scheduleRead(final SessionRead task) {
306
307                 assert(State.INIT != state);
308                 
309                 if(State.READ == state) {
310                         session.queryProvider2.schedule(Integer.MIN_VALUE, new SessionTask(task.object, task.thread, task.syncCaller) {
311
312                                 @Override
313                                 public void run(int thread) {
314                                         try {
315                                                 task.run(thread);
316                                         } finally {
317                                                 if (task.notify != null) task.notify.release();
318                                         }
319                                 }
320                                 
321                         });
322                 } else if (State.IDLE == state) {
323                         setState(Integer.MIN_VALUE, State.READ, task);
324                 } else {
325                         reads.offer(task);
326                 }
327                 
328         }
329
330     public int lastUpdateIndex() {
331         return writes.size();
332     }
333     
334         public synchronized void scheduleWrite(SessionTask task) {
335
336                 scheduleWrite(task, null);
337
338         }
339
340         /**
341          * @param task the task to schedule 
342          * @param combine <code>true</code> or <code>false</code> to explicitly
343          *        specify whether to combine the write with the previous operation
344          *        or <code>null</code> to use the default logic, i.e. combine when
345          *        this schedule is performed from within update processing (state ==
346          *        State.WRITE_UPDATE).
347          */
348         public synchronized void scheduleWrite(SessionTask task, Boolean combine) {
349
350                 boolean inUpdate = state == State.WRITE_UPDATE;
351
352                 assert(State.INIT != state);
353                 //task.combine = combine != null ? combine : inUpdate;
354                 if(State.IDLE == state) {
355                         setState(Integer.MIN_VALUE, State.WRITE, task);
356                 } else {
357                         if(inUpdate) {
358                                 int index = lastUpdateIndex();
359                                 if(index == writes.size()) writes.offer(task);
360                                 else writes.add(index, task);
361                         } else {
362                                 writes.offer(task);
363                         }
364                 }
365
366         }
367
368         /**
369          * Unregisters and closes all currently registered ResourceData random
370          * access value file handles. Any I/O exceptions occurring during closing
371          * are logged into {@link Logger}.
372          */
373         private void closeRandomAccessValues() {
374                 RandomAccessValueSupport ravs = session.peekService(RandomAccessValueSupport.class);
375                 if (ravs == null)
376                         return;
377                 Collection<Pair<Resource, ResourceData>> values = ravs.removeAll();
378                 for (Pair<Resource, ResourceData> value : values) {
379                         try {
380                                 value.second.binaryFile.close();
381                         } catch (IOException e) {
382                                 LOGGER.error("I/O exception while closing random access value file " + value.second.binaryFile.file() + " for resource " + value.first , e);
383                         }
384                 }
385         }
386
387 }