gerrit.simantics Code Review - simantics/platform.git/blob - bundles/org.simantics.acorn/src/org/simantics/acorn/OperationQueue.java
Fixed Acorn deadlock during snapshotting
[simantics/platform.git] / bundles / org.simantics.acorn / src / org / simantics / acorn / OperationQueue.java
1 package org.simantics.acorn;
2
3 import java.util.ArrayList;
4 import java.util.LinkedList;
5 import java.util.List;
6 import java.util.TreeMap;
7
8 import org.simantics.acorn.MainProgram.MainProgramRunnable;
9 import org.simantics.acorn.exception.AcornAccessVerificationException;
10 import org.simantics.acorn.exception.IllegalAcornStateException;
11 import org.simantics.acorn.lru.ClusterStreamChunk;
12 import org.simantics.acorn.lru.ClusterUpdateOperation;
13 import org.simantics.db.service.ClusterUID;
14 import org.slf4j.Logger;
15 import org.slf4j.LoggerFactory;
16
17 /*
18  * Provides work for MainProgram thread
19  * -cluster stream updates from client
20  * -tasks from other threads
21  *
22  * Synchronized via monitor.
23  *
24  */
25 class OperationQueue {
26
27         private static final Logger LOGGER = LoggerFactory.getLogger(OperationQueue.class);
28
29         private final MainProgram mainProgram;
30
31         private final LinkedList<ClusterStreamChunk> operations = new LinkedList<>();
32
33         private final LinkedList<MainProgramRunnable> tasks = new LinkedList<>();
34
35         private long currentChangeSetId = -1;
36         private int nextChunkId = 0;
37
38         OperationQueue(MainProgram mainProgram) {
39                 this.mainProgram = mainProgram;
40         }
41
42         /*
43          * Called by other Acorn threads than MainProgram thread
44          */
45         synchronized void scheduleTask(MainProgramRunnable task) {
46
47                 mainProgram.assertNoMainProgramThread();
48
49                 tasks.add(task);
50
51                 // The only thread waiting for this is MainProgram thread
52                 notify();
53
54         }
55
56         /*
57          * Called by DB client write threads
58          */
59         synchronized void startTransaction(long id) {
60                 currentChangeSetId = id;
61                 nextChunkId = 0;
62         }
63
64         /*
65          * Called by DB client write threads
66          */
67         synchronized ClusterStreamChunk commitLast() {
68                 ClusterStreamChunk last = operations.isEmpty() ? null : operations.getLast();
69                 if(last != null) {
70                         last.commit();
71                         notifyAll();
72                 }
73                 return last;
74         }
75
76         /*
77          * Called by DB client write threads
78          */
79         synchronized void scheduleUpdate(ClusterUpdateOperation operation) throws AcornAccessVerificationException, IllegalAcornStateException {
80
81                 ClusterStreamChunk last = operations.isEmpty() ? null : operations.getLast();
82                 if(last == null || last.isCommitted()) {
83                         String id = "" + currentChangeSetId + "-" + nextChunkId++;
84                         last = new ClusterStreamChunk(mainProgram.clusters, mainProgram.clusters.streamLRU, id);
85                         operations.add(last);
86                 }
87
88                 String chunkId = last.getKey();
89                 int chunkOffset = last.operations.size();
90                 operation.scheduled(chunkId + "." + chunkOffset);
91
92                 last.addOperation(operation);
93
94                 // The only thread waiting for this is MainProgram thread
95                 notify();
96
97         }
98
99         /*
100          * Called by MainProgram thread
101          */
102         synchronized void pumpTasks(List<MainProgramRunnable> todo) {
103
104                 mainProgram.assertMainProgramThread();
105
106                 todo.addAll(tasks);
107                 tasks.clear();
108
109         }
110
111         /*
112          * Called by MainProgram thread
113          */
114         synchronized void pumpUpdates(TreeMap<ClusterUID, List<ClusterUpdateOperation>> updates) {
115
116                 mainProgram.assertMainProgramThread();
117
118                 while(!operations.isEmpty() && updates.size() < 100) {
119
120                         ClusterStreamChunk chunk = operations.pollFirst();
121
122                         for(int i=chunk.nextToProcess;i<chunk.operations.size();i++) {
123                                 ClusterUpdateOperation o = chunk.operations.get(i);
124                                 ClusterUID uid = o.uid;
125                                 List<ClusterUpdateOperation> ops = updates.get(uid);
126                                 if(ops == null) {
127                                         ops = new ArrayList<ClusterUpdateOperation>();
128                                         updates.put(uid, ops);
129                                 }
130                                 ops.add(o);
131                         }
132
133                         chunk.nextToProcess = chunk.operations.size();
134
135                         if(!chunk.isCommitted()) {
136                                 assert(operations.isEmpty());
137                                 operations.add(chunk);
138                                 break;
139                         }
140
141                 }
142
143         }
144
145         /*
146          * Called by MainProgram thread
147          */
148         synchronized boolean isEmpty() {
149
150                 mainProgram.assertMainProgramThread();
151
152                 return operations.isEmpty();
153
154         }
155
156         /*
157          * Wake up when new operations are scheduled or the last operation is committed
158          * Called by MainProgram thread
159          */
160         synchronized void waitFor() {
161
162                 mainProgram.assertMainProgramThread();
163
164                 // One last check within the monitor 
165                 if(!operations.isEmpty() || !tasks.isEmpty()) return;
166
167                 try {
168                         wait(5000);
169                 } catch (InterruptedException e) {
170                         LOGGER.error("Unexpected interruption", e);
171                 }
172
173         }
174
175 }