+package org.simantics.acorn;
+
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.TreeMap;
+
+import org.simantics.acorn.MainProgram.MainProgramRunnable;
+import org.simantics.acorn.exception.AcornAccessVerificationException;
+import org.simantics.acorn.exception.IllegalAcornStateException;
+import org.simantics.acorn.lru.ClusterStreamChunk;
+import org.simantics.acorn.lru.ClusterUpdateOperation;
+import org.simantics.db.service.ClusterUID;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/*
+ * Provides work for MainProgram thread
+ * -cluster stream updates from client
+ * -tasks from other threads
+ *
+ * Synchronized via monitor.
+ *
+ */
+class OperationQueue {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(OperationQueue.class);
+
+ private final MainProgram mainProgram;
+
+ private final LinkedList<ClusterStreamChunk> operations = new LinkedList<>();
+
+ private final LinkedList<MainProgramRunnable> tasks = new LinkedList<>();
+
+ private long currentChangeSetId = -1;
+ private int nextChunkId = 0;
+
+ OperationQueue(MainProgram mainProgram) {
+ this.mainProgram = mainProgram;
+ }
+
+ /*
+ * Called by other Acorn threads than MainProgram thread
+ */
+ synchronized void scheduleTask(MainProgramRunnable task) {
+
+ mainProgram.assertNoMainProgramThread();
+
+ tasks.add(task);
+
+ // The only thread waiting for this is MainProgram thread
+ notify();
+
+ }
+
+ /*
+ * Called by DB client write threads
+ */
+ synchronized void startTransaction(long id) {
+ currentChangeSetId = id;
+ nextChunkId = 0;
+ }
+
+ /*
+ * Called by DB client write threads
+ */
+ synchronized ClusterStreamChunk commitLast() {
+ ClusterStreamChunk last = operations.isEmpty() ? null : operations.getLast();
+ if(last != null) {
+ last.commit();
+ notifyAll();
+ }
+ return last;
+ }
+
+ /*
+ * Called by DB client write threads
+ */
+ synchronized void scheduleUpdate(ClusterUpdateOperation operation) throws AcornAccessVerificationException, IllegalAcornStateException {
+
+ ClusterStreamChunk last = operations.isEmpty() ? null : operations.getLast();
+ if(last == null || last.isCommitted()) {
+ String id = "" + currentChangeSetId + "-" + nextChunkId++;
+ last = new ClusterStreamChunk(mainProgram.clusters, mainProgram.clusters.streamLRU, id);
+ operations.add(last);
+ }
+
+ String chunkId = last.getKey();
+ int chunkOffset = last.operations.size();
+ operation.scheduled(chunkId + "." + chunkOffset);
+
+ last.addOperation(operation);
+
+ // The only thread waiting for this is MainProgram thread
+ notify();
+
+ }
+
+ /*
+ * Called by MainProgram thread
+ */
+ synchronized void pumpTasks(List<MainProgramRunnable> todo) {
+
+ mainProgram.assertMainProgramThread();
+
+ todo.addAll(tasks);
+ tasks.clear();
+
+ }
+
+ /*
+ * Called by MainProgram thread
+ */
+ synchronized void pumpUpdates(TreeMap<ClusterUID, List<ClusterUpdateOperation>> updates) {
+
+ mainProgram.assertMainProgramThread();
+
+ while(!operations.isEmpty() && updates.size() < 100) {
+
+ ClusterStreamChunk chunk = operations.pollFirst();
+
+ for(int i=chunk.nextToProcess;i<chunk.operations.size();i++) {
+ ClusterUpdateOperation o = chunk.operations.get(i);
+ ClusterUID uid = o.uid;
+ List<ClusterUpdateOperation> ops = updates.get(uid);
+ if(ops == null) {
+ ops = new ArrayList<ClusterUpdateOperation>();
+ updates.put(uid, ops);
+ }
+ ops.add(o);
+ }
+
+ chunk.nextToProcess = chunk.operations.size();
+
+ if(!chunk.isCommitted()) {
+ assert(operations.isEmpty());
+ operations.add(chunk);
+ break;
+ }
+
+ }
+
+ }
+
+ /*
+ * Called by MainProgram thread
+ */
+ synchronized boolean isEmpty() {
+
+ mainProgram.assertMainProgramThread();
+
+ return operations.isEmpty();
+
+ }
+
+ /*
+ * Wake up when new operations are scheduled or the last operation is committed
+ * Called by MainProgram thread
+ */
+ synchronized void waitFor() {
+
+ mainProgram.assertMainProgramThread();
+
+ // One last check within the monitor
+ if(!operations.isEmpty() || !tasks.isEmpty()) return;
+
+ try {
+ wait(5000);
+ } catch (InterruptedException e) {
+ LOGGER.error("Unexpected interruption", e);
+ }
+
+ }
+
+}