]> gerrit.simantics Code Review - simantics/platform.git/blobdiff - bundles/org.simantics.acorn/src/org/simantics/acorn/OperationQueue.java
Fixed Acorn deadlock during snapshotting
[simantics/platform.git] / bundles / org.simantics.acorn / src / org / simantics / acorn / OperationQueue.java
diff --git a/bundles/org.simantics.acorn/src/org/simantics/acorn/OperationQueue.java b/bundles/org.simantics.acorn/src/org/simantics/acorn/OperationQueue.java
new file mode 100644 (file)
index 0000000..7a9d8ab
--- /dev/null
@@ -0,0 +1,175 @@
+package org.simantics.acorn;
+
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.TreeMap;
+
+import org.simantics.acorn.MainProgram.MainProgramRunnable;
+import org.simantics.acorn.exception.AcornAccessVerificationException;
+import org.simantics.acorn.exception.IllegalAcornStateException;
+import org.simantics.acorn.lru.ClusterStreamChunk;
+import org.simantics.acorn.lru.ClusterUpdateOperation;
+import org.simantics.db.service.ClusterUID;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/*
+ * Provides work for MainProgram thread
+ * -cluster stream updates from client
+ * -tasks from other threads
+ *
+ * Synchronized via monitor.
+ *
+ */
+class OperationQueue {
+
+       private static final Logger LOGGER = LoggerFactory.getLogger(OperationQueue.class);
+
+       private final MainProgram mainProgram;
+
+       private final LinkedList<ClusterStreamChunk> operations = new LinkedList<>();
+
+       private final LinkedList<MainProgramRunnable> tasks = new LinkedList<>();
+
+       private long currentChangeSetId = -1;
+       private int nextChunkId = 0;
+
+       OperationQueue(MainProgram mainProgram) {
+               this.mainProgram = mainProgram;
+       }
+
+       /*
+        * Called by other Acorn threads than MainProgram thread
+        */
+       synchronized void scheduleTask(MainProgramRunnable task) {
+
+               mainProgram.assertNoMainProgramThread();
+
+               tasks.add(task);
+
+               // The only thread waiting for this is MainProgram thread
+               notify();
+
+       }
+
+       /*
+        * Called by DB client write threads
+        */
+       synchronized void startTransaction(long id) {
+               currentChangeSetId = id;
+               nextChunkId = 0;
+       }
+
+       /*
+        * Called by DB client write threads
+        */
+       synchronized ClusterStreamChunk commitLast() {
+               ClusterStreamChunk last = operations.isEmpty() ? null : operations.getLast();
+               if(last != null) {
+                       last.commit();
+                       notifyAll();
+               }
+               return last;
+       }
+
+       /*
+        * Called by DB client write threads
+        */
+       synchronized void scheduleUpdate(ClusterUpdateOperation operation) throws AcornAccessVerificationException, IllegalAcornStateException {
+
+               ClusterStreamChunk last = operations.isEmpty() ? null : operations.getLast();
+               if(last == null || last.isCommitted()) {
+                       String id = "" + currentChangeSetId + "-" + nextChunkId++;
+                       last = new ClusterStreamChunk(mainProgram.clusters, mainProgram.clusters.streamLRU, id);
+                       operations.add(last);
+               }
+
+               String chunkId = last.getKey();
+               int chunkOffset = last.operations.size();
+               operation.scheduled(chunkId + "." + chunkOffset);
+
+               last.addOperation(operation);
+
+               // The only thread waiting for this is MainProgram thread
+               notify();
+
+       }
+
+       /*
+        * Called by MainProgram thread
+        */
+       synchronized void pumpTasks(List<MainProgramRunnable> todo) {
+
+               mainProgram.assertMainProgramThread();
+
+               todo.addAll(tasks);
+               tasks.clear();
+
+       }
+
+       /*
+        * Called by MainProgram thread
+        */
+       synchronized void pumpUpdates(TreeMap<ClusterUID, List<ClusterUpdateOperation>> updates) {
+
+               mainProgram.assertMainProgramThread();
+
+               while(!operations.isEmpty() && updates.size() < 100) {
+
+                       ClusterStreamChunk chunk = operations.pollFirst();
+
+                       for(int i=chunk.nextToProcess;i<chunk.operations.size();i++) {
+                               ClusterUpdateOperation o = chunk.operations.get(i);
+                               ClusterUID uid = o.uid;
+                               List<ClusterUpdateOperation> ops = updates.get(uid);
+                               if(ops == null) {
+                                       ops = new ArrayList<ClusterUpdateOperation>();
+                                       updates.put(uid, ops);
+                               }
+                               ops.add(o);
+                       }
+
+                       chunk.nextToProcess = chunk.operations.size();
+
+                       if(!chunk.isCommitted()) {
+                               assert(operations.isEmpty());
+                               operations.add(chunk);
+                               break;
+                       }
+
+               }
+
+       }
+
+       /*
+        * Called by MainProgram thread
+        */
+       synchronized boolean isEmpty() {
+
+               mainProgram.assertMainProgramThread();
+
+               return operations.isEmpty();
+
+       }
+
+       /*
+        * Wake up when new operations are scheduled or the last operation is committed
+        * Called by MainProgram thread
+        */
+       synchronized void waitFor() {
+
+               mainProgram.assertMainProgramThread();
+
+               // One last check within the monitor 
+               if(!operations.isEmpty() || !tasks.isEmpty()) return;
+
+               try {
+                       wait(5000);
+               } catch (InterruptedException e) {
+                       LOGGER.error("Unexpected interruption", e);
+               }
+
+       }
+
+}