package org.simantics.acorn; import java.util.ArrayList; import java.util.LinkedList; import java.util.List; import java.util.TreeMap; import org.simantics.acorn.MainProgram.MainProgramRunnable; import org.simantics.acorn.exception.AcornAccessVerificationException; import org.simantics.acorn.exception.IllegalAcornStateException; import org.simantics.acorn.lru.ClusterStreamChunk; import org.simantics.acorn.lru.ClusterUpdateOperation; import org.simantics.db.service.ClusterUID; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /* * Provides work for MainProgram thread * -cluster stream updates from client * -tasks from other threads * * Synchronized via monitor. * */ class OperationQueue { private static final Logger LOGGER = LoggerFactory.getLogger(OperationQueue.class); private final MainProgram mainProgram; private final LinkedList operations = new LinkedList<>(); private final LinkedList tasks = new LinkedList<>(); private long currentChangeSetId = -1; private int nextChunkId = 0; OperationQueue(MainProgram mainProgram) { this.mainProgram = mainProgram; } /* * Called by other Acorn threads than MainProgram thread */ synchronized void scheduleTask(MainProgramRunnable task) { mainProgram.assertNoMainProgramThread(); tasks.add(task); // The only thread waiting for this is MainProgram thread notify(); } /* * Called by DB client write threads */ synchronized void startTransaction(long id) { currentChangeSetId = id; nextChunkId = 0; } /* * Called by DB client write threads */ synchronized ClusterStreamChunk commitLast() { ClusterStreamChunk last = operations.isEmpty() ? null : operations.getLast(); if(last != null) { last.commit(); notifyAll(); } return last; } /* * Called by DB client write threads */ synchronized void scheduleUpdate(ClusterUpdateOperation operation) throws AcornAccessVerificationException, IllegalAcornStateException { ClusterStreamChunk last = operations.isEmpty() ? null : operations.getLast(); if(last == null || last.isCommitted()) { String id = "" + currentChangeSetId + "-" + nextChunkId++; last = new ClusterStreamChunk(mainProgram.clusters, mainProgram.clusters.streamLRU, id); operations.add(last); } String chunkId = last.getKey(); int chunkOffset = last.operations.size(); operation.scheduled(chunkId + "." + chunkOffset); last.addOperation(operation); // The only thread waiting for this is MainProgram thread notify(); } /* * Called by MainProgram thread */ synchronized void pumpTasks(List todo) { mainProgram.assertMainProgramThread(); todo.addAll(tasks); tasks.clear(); } /* * Called by MainProgram thread */ synchronized void pumpUpdates(TreeMap> updates) { mainProgram.assertMainProgramThread(); while(!operations.isEmpty() && updates.size() < 100) { ClusterStreamChunk chunk = operations.pollFirst(); for(int i=chunk.nextToProcess;i ops = updates.get(uid); if(ops == null) { ops = new ArrayList(); updates.put(uid, ops); } ops.add(o); } chunk.nextToProcess = chunk.operations.size(); if(!chunk.isCommitted()) { assert(operations.isEmpty()); operations.add(chunk); break; } } } /* * Called by MainProgram thread */ synchronized boolean isEmpty() { mainProgram.assertMainProgramThread(); return operations.isEmpty(); } /* * Wake up when new operations are scheduled or the last operation is committed * Called by MainProgram thread */ synchronized long waitFor() { mainProgram.assertMainProgramThread(); // One last check within the monitor if(!operations.isEmpty() || !tasks.isEmpty()) return 0; long start = System.nanoTime(); try { wait(5000); } catch (InterruptedException e) { LOGGER.error("Unexpected interruption", e); } return System.nanoTime() - start; } }