1 package org.simantics.acorn;
3 import java.util.ArrayList;
4 import java.util.LinkedList;
6 import java.util.TreeMap;
8 import org.simantics.acorn.MainProgram.MainProgramRunnable;
9 import org.simantics.acorn.exception.AcornAccessVerificationException;
10 import org.simantics.acorn.exception.IllegalAcornStateException;
11 import org.simantics.acorn.lru.ClusterStreamChunk;
12 import org.simantics.acorn.lru.ClusterUpdateOperation;
13 import org.simantics.db.service.ClusterUID;
14 import org.slf4j.Logger;
15 import org.slf4j.LoggerFactory;
18 * Provides work for MainProgram thread
19 * -cluster stream updates from client
20 * -tasks from other threads
22 * Synchronized via monitor.
25 class OperationQueue {
27 private static final Logger LOGGER = LoggerFactory.getLogger(OperationQueue.class);
29 private final MainProgram mainProgram;
31 private final LinkedList<ClusterStreamChunk> operations = new LinkedList<>();
33 private final LinkedList<MainProgramRunnable> tasks = new LinkedList<>();
35 private long currentChangeSetId = -1;
36 private int nextChunkId = 0;
38 OperationQueue(MainProgram mainProgram) {
39 this.mainProgram = mainProgram;
43 * Called by other Acorn threads than MainProgram thread
45 synchronized void scheduleTask(MainProgramRunnable task) {
47 mainProgram.assertNoMainProgramThread();
51 // The only thread waiting for this is MainProgram thread
57 * Called by DB client write threads
59 synchronized void startTransaction(long id) {
60 currentChangeSetId = id;
65 * Called by DB client write threads
67 synchronized ClusterStreamChunk commitLast() {
68 ClusterStreamChunk last = operations.isEmpty() ? null : operations.getLast();
77 * Called by DB client write threads
79 synchronized void scheduleUpdate(ClusterUpdateOperation operation) throws AcornAccessVerificationException, IllegalAcornStateException {
81 ClusterStreamChunk last = operations.isEmpty() ? null : operations.getLast();
82 if(last == null || last.isCommitted()) {
83 String id = "" + currentChangeSetId + "-" + nextChunkId++;
84 last = new ClusterStreamChunk(mainProgram.clusters, mainProgram.clusters.streamLRU, id);
88 String chunkId = last.getKey();
89 int chunkOffset = last.operations.size();
90 operation.scheduled(chunkId + "." + chunkOffset);
92 last.addOperation(operation);
94 // The only thread waiting for this is MainProgram thread
100 * Called by MainProgram thread
102 synchronized void pumpTasks(List<MainProgramRunnable> todo) {
104 mainProgram.assertMainProgramThread();
112 * Called by MainProgram thread
114 synchronized void pumpUpdates(TreeMap<ClusterUID, List<ClusterUpdateOperation>> updates) {
116 mainProgram.assertMainProgramThread();
118 while(!operations.isEmpty() && updates.size() < 100) {
120 ClusterStreamChunk chunk = operations.pollFirst();
122 for(int i=chunk.nextToProcess;i<chunk.operations.size();i++) {
123 ClusterUpdateOperation o = chunk.operations.get(i);
124 ClusterUID uid = o.uid;
125 List<ClusterUpdateOperation> ops = updates.get(uid);
127 ops = new ArrayList<ClusterUpdateOperation>();
128 updates.put(uid, ops);
133 chunk.nextToProcess = chunk.operations.size();
135 if(!chunk.isCommitted()) {
136 assert(operations.isEmpty());
137 operations.add(chunk);
146 * Called by MainProgram thread
148 synchronized boolean isEmpty() {
150 mainProgram.assertMainProgramThread();
152 return operations.isEmpty();
157 * Wake up when new operations are scheduled or the last operation is committed
158 * Called by MainProgram thread
160 synchronized long waitFor() {
162 mainProgram.assertMainProgramThread();
164 // One last check within the monitor
165 if(!operations.isEmpty() || !tasks.isEmpty()) return 0;
167 long start = System.nanoTime();
170 } catch (InterruptedException e) {
171 LOGGER.error("Unexpected interruption", e);
173 return System.nanoTime() - start;