X-Git-Url: https://gerrit.simantics.org/r/gitweb?p=simantics%2Fplatform.git;a=blobdiff_plain;f=bundles%2Forg.simantics.acorn%2Fsrc%2Forg%2Fsimantics%2Facorn%2FOperationQueue.java;fp=bundles%2Forg.simantics.acorn%2Fsrc%2Forg%2Fsimantics%2Facorn%2FOperationQueue.java;h=7a9d8aba23f1b714d190a92552f879066775a94e;hp=0000000000000000000000000000000000000000;hb=751ee12501d220832b672dd433655a4d65806fd9;hpb=53ed96e828ad57679b6933ebde323b7869191c94 diff --git a/bundles/org.simantics.acorn/src/org/simantics/acorn/OperationQueue.java b/bundles/org.simantics.acorn/src/org/simantics/acorn/OperationQueue.java new file mode 100644 index 000000000..7a9d8aba2 --- /dev/null +++ b/bundles/org.simantics.acorn/src/org/simantics/acorn/OperationQueue.java @@ -0,0 +1,175 @@ +package org.simantics.acorn; + +import java.util.ArrayList; +import java.util.LinkedList; +import java.util.List; +import java.util.TreeMap; + +import org.simantics.acorn.MainProgram.MainProgramRunnable; +import org.simantics.acorn.exception.AcornAccessVerificationException; +import org.simantics.acorn.exception.IllegalAcornStateException; +import org.simantics.acorn.lru.ClusterStreamChunk; +import org.simantics.acorn.lru.ClusterUpdateOperation; +import org.simantics.db.service.ClusterUID; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/* + * Provides work for MainProgram thread + * -cluster stream updates from client + * -tasks from other threads + * + * Synchronized via monitor. + * + */ +class OperationQueue { + + private static final Logger LOGGER = LoggerFactory.getLogger(OperationQueue.class); + + private final MainProgram mainProgram; + + private final LinkedList operations = new LinkedList<>(); + + private final LinkedList tasks = new LinkedList<>(); + + private long currentChangeSetId = -1; + private int nextChunkId = 0; + + OperationQueue(MainProgram mainProgram) { + this.mainProgram = mainProgram; + } + + /* + * Called by other Acorn threads than MainProgram thread + */ + synchronized void scheduleTask(MainProgramRunnable task) { + + mainProgram.assertNoMainProgramThread(); + + tasks.add(task); + + // The only thread waiting for this is MainProgram thread + notify(); + + } + + /* + * Called by DB client write threads + */ + synchronized void startTransaction(long id) { + currentChangeSetId = id; + nextChunkId = 0; + } + + /* + * Called by DB client write threads + */ + synchronized ClusterStreamChunk commitLast() { + ClusterStreamChunk last = operations.isEmpty() ? null : operations.getLast(); + if(last != null) { + last.commit(); + notifyAll(); + } + return last; + } + + /* + * Called by DB client write threads + */ + synchronized void scheduleUpdate(ClusterUpdateOperation operation) throws AcornAccessVerificationException, IllegalAcornStateException { + + ClusterStreamChunk last = operations.isEmpty() ? null : operations.getLast(); + if(last == null || last.isCommitted()) { + String id = "" + currentChangeSetId + "-" + nextChunkId++; + last = new ClusterStreamChunk(mainProgram.clusters, mainProgram.clusters.streamLRU, id); + operations.add(last); + } + + String chunkId = last.getKey(); + int chunkOffset = last.operations.size(); + operation.scheduled(chunkId + "." + chunkOffset); + + last.addOperation(operation); + + // The only thread waiting for this is MainProgram thread + notify(); + + } + + /* + * Called by MainProgram thread + */ + synchronized void pumpTasks(List todo) { + + mainProgram.assertMainProgramThread(); + + todo.addAll(tasks); + tasks.clear(); + + } + + /* + * Called by MainProgram thread + */ + synchronized void pumpUpdates(TreeMap> updates) { + + mainProgram.assertMainProgramThread(); + + while(!operations.isEmpty() && updates.size() < 100) { + + ClusterStreamChunk chunk = operations.pollFirst(); + + for(int i=chunk.nextToProcess;i ops = updates.get(uid); + if(ops == null) { + ops = new ArrayList(); + updates.put(uid, ops); + } + ops.add(o); + } + + chunk.nextToProcess = chunk.operations.size(); + + if(!chunk.isCommitted()) { + assert(operations.isEmpty()); + operations.add(chunk); + break; + } + + } + + } + + /* + * Called by MainProgram thread + */ + synchronized boolean isEmpty() { + + mainProgram.assertMainProgramThread(); + + return operations.isEmpty(); + + } + + /* + * Wake up when new operations are scheduled or the last operation is committed + * Called by MainProgram thread + */ + synchronized void waitFor() { + + mainProgram.assertMainProgramThread(); + + // One last check within the monitor + if(!operations.isEmpty() || !tasks.isEmpty()) return; + + try { + wait(5000); + } catch (InterruptedException e) { + LOGGER.error("Unexpected interruption", e); + } + + } + +}