/******************************************************************************* * Copyright (c) 2007, 2010 Association for Decentralized Information Management * in Industry THTH ry. * All rights reserved. This program and the accompanying materials * are made available under the terms of the Eclipse Public License v1.0 * which accompanies this distribution, and is available at * http://www.eclipse.org/legal/epl-v10.html * * Contributors: * VTT Technical Research Centre of Finland - initial API and implementation *******************************************************************************/ package org.simantics.diagram.synchronization.graph; import java.util.ArrayDeque; import java.util.Deque; import org.simantics.db.RequestProcessor; import org.simantics.diagram.internal.DebugPolicy; import org.simantics.diagram.synchronization.ErrorHandler; import org.simantics.diagram.synchronization.IModification; import org.simantics.diagram.synchronization.IModificationQueue; import org.simantics.diagram.synchronization.ModificationAdapter; import org.simantics.diagram.synchronization.ThreadingModificationProxy; import org.simantics.utils.logging.TimeLogger; import org.simantics.utils.threads.IThreadWorkQueue; /** * @author Tuukka Lehtonen */ public class ModificationQueue implements IModificationQueue { private final RequestProcessor writebackProcessor; private final ErrorHandler errorHandler; private final Deque writebackQueue = new ArrayDeque(); public ModificationQueue(RequestProcessor writebackProcessor, ErrorHandler errorHandler) { this.writebackProcessor = writebackProcessor; this.errorHandler = errorHandler; } public void dispose() { } ModificationProcessRequest createModificationRequest() { IModification[] mods = IModification.NONE; synchronized (writebackQueue) { mods = writebackQueue.toArray(IModification.NONE); writebackQueue.clear(); } return new ModificationProcessRequest(errorHandler, mods); } void dispatchModifications() { TimeLogger.log("ModificationQueue.dispatchModifications"); writebackProcessor.asyncRequest(createModificationRequest()); } public void discardModifications() { IModification[] discarded = IModification.NONE; synchronized (writebackQueue) { discarded = writebackQueue.toArray(discarded); writebackQueue.clear(); } if (DebugPolicy.DEBUG_GRAPH_WRITEBACK) { System.out.println("--------------------------------------------------------------------------------"); System.out.println(Thread.currentThread() + " Discarded " + discarded.length + " graph modifications:"); for (IModification m : discarded) { System.out.println(" " + m); } System.out.println("--------------------------------------------------------------------------------"); } } @Override public boolean offer(IModification m) { return offer(m, null); } @Override public boolean offer(IModification m, IThreadWorkQueue completionThread) { if (DebugPolicy.DEBUG_GRAPH_WRITEBACK) System.out.println(Thread.currentThread() + " Offering graph modification: " + m); synchronized (writebackQueue) { if (completionThread != null) return writebackQueue.add(new ThreadingModificationProxy(m, completionThread, false)); else return writebackQueue.add(m); } } /** * Asynchronously perform the specified modification, using the * specified thread for the modification completion event. This method * will eventually cause an asynchronous write request invocation to the * backend. * * @param m the modification to execute * @param completionThread the thread to be used for running the * modification completion event or null to not care * about the thread. */ public void async(IModification m, IThreadWorkQueue completionThread) { if (DebugPolicy.DEBUG_GRAPH_WRITEBACK) System.out.println(Thread.currentThread() + " Posting asynchronous graph modification: " + m); synchronized (writebackQueue) { offer(m, completionThread); dispatchModifications(); } } /** * Synchronously perform the specified modification. This method will * cause a write request invocation to the backend. * * @param m the modification to execute * @throws InterruptedException if the waiting thread is interrupted */ public void sync(IModification m) throws InterruptedException { // if (!canvas.getThreadAccess().currentThreadAccess()) { // throw new IllegalStateException( // "Unsafe thread access in GraphToDiagramSynchronizer.syncGraphModification from thread " // + Thread.currentThread() + ", expected thread " + canvas.getThreadAccess().getThread()); // } if (DebugPolicy.DEBUG_GRAPH_WRITEBACK) System.out.println(Thread.currentThread() + " Posting synchronous graph modification: " + m); synchronized (m) { synchronized (writebackQueue) { writebackQueue.add(m); dispatchModifications(); } while (!m.isComplete()) { m.wait(); } m.completed(); } if (DebugPolicy.DEBUG_GRAPH_WRITEBACK) System.out.println(Thread.currentThread() + " Synchronous graph modification completed: " + m); } @Override public void flush() { synchronized (writebackQueue) { // Check if the queue is already empty in which case // no flushing is needed. if (writebackQueue.isEmpty()) return; } if (DebugPolicy.DEBUG_GRAPH_WRITEBACK) System.out.println(Thread.currentThread() + " Flushing graph modification queue of size " + writebackQueue.size()); dispatchModifications(); } @Override public void finish() throws InterruptedException { doFinish(); } public void safeFinish(boolean force) { try { doFinish(); } catch (InterruptedException e) { errorHandler.error(e.getMessage(), e); } } public void doFinish() throws InterruptedException { synchronized (writebackQueue) { // Check if the queue is already empty in which case // no flushing is needed. if (writebackQueue.isEmpty()) return; } if (DebugPolicy.DEBUG_GRAPH_WRITEBACK) System.out.println(Thread.currentThread() + " Waiting for graph modification queue to be finished"); IModification flush = new ModificationAdapter(ModificationAdapter.LOW_PRIORITY) { @Override public String toString() { return "FLUSH_MODIFICATION_QUEUE"; } }; sync(flush); if (DebugPolicy.DEBUG_GRAPH_WRITEBACK) System.out.println(Thread.currentThread() + " Graph modification queue finished"); } }