1 /*******************************************************************************
2 * Copyright (c) 2007, 2010 Association for Decentralized Information Management
4 * All rights reserved. This program and the accompanying materials
5 * are made available under the terms of the Eclipse Public License v1.0
6 * which accompanies this distribution, and is available at
7 * http://www.eclipse.org/legal/epl-v10.html
10 * VTT Technical Research Centre of Finland - initial API and implementation
11 *******************************************************************************/
12 package org.simantics.diagram.synchronization.graph;
14 import java.util.ArrayDeque;
15 import java.util.Deque;
17 import org.simantics.db.RequestProcessor;
18 import org.simantics.diagram.internal.DebugPolicy;
19 import org.simantics.diagram.synchronization.ErrorHandler;
20 import org.simantics.diagram.synchronization.IModification;
21 import org.simantics.diagram.synchronization.IModificationQueue;
22 import org.simantics.diagram.synchronization.ModificationAdapter;
23 import org.simantics.diagram.synchronization.ThreadingModificationProxy;
24 import org.simantics.utils.logging.TimeLogger;
25 import org.simantics.utils.threads.IThreadWorkQueue;
28 * @author Tuukka Lehtonen
30 public class ModificationQueue implements IModificationQueue {
31 private final RequestProcessor writebackProcessor;
32 private final ErrorHandler errorHandler;
33 private final Deque<IModification> writebackQueue = new ArrayDeque<IModification>();
35 public ModificationQueue(RequestProcessor writebackProcessor, ErrorHandler errorHandler) {
36 this.writebackProcessor = writebackProcessor;
37 this.errorHandler = errorHandler;
40 public void dispose() {
43 ModificationProcessRequest createModificationRequest() {
44 IModification[] mods = IModification.NONE;
45 synchronized (writebackQueue) {
46 mods = writebackQueue.toArray(IModification.NONE);
47 writebackQueue.clear();
49 return new ModificationProcessRequest(errorHandler, mods);
52 void dispatchModifications() {
53 TimeLogger.log("ModificationQueue.dispatchModifications");
54 writebackProcessor.asyncRequest(createModificationRequest());
57 public void discardModifications() {
58 IModification[] discarded = IModification.NONE;
59 synchronized (writebackQueue) {
60 discarded = writebackQueue.toArray(discarded);
61 writebackQueue.clear();
63 if (DebugPolicy.DEBUG_GRAPH_WRITEBACK) {
64 System.out.println("--------------------------------------------------------------------------------");
65 System.out.println(Thread.currentThread() + " Discarded " + discarded.length + " graph modifications:");
66 for (IModification m : discarded) {
67 System.out.println(" " + m);
69 System.out.println("--------------------------------------------------------------------------------");
74 public boolean offer(IModification m) {
75 return offer(m, null);
79 public boolean offer(IModification m, IThreadWorkQueue completionThread) {
80 if (DebugPolicy.DEBUG_GRAPH_WRITEBACK)
81 System.out.println(Thread.currentThread() + " Offering graph modification: " + m);
82 synchronized (writebackQueue) {
83 if (completionThread != null)
84 return writebackQueue.add(new ThreadingModificationProxy(m, completionThread, false));
86 return writebackQueue.add(m);
91 * Asynchronously perform the specified modification, using the
92 * specified thread for the modification completion event. This method
93 * will eventually cause an asynchronous write request invocation to the
96 * @param m the modification to execute
97 * @param completionThread the thread to be used for running the
98 * modification completion event or <code>null</code> to not care
101 public void async(IModification m, IThreadWorkQueue completionThread) {
102 if (DebugPolicy.DEBUG_GRAPH_WRITEBACK)
103 System.out.println(Thread.currentThread() + " Posting asynchronous graph modification: " + m);
104 synchronized (writebackQueue) {
105 offer(m, completionThread);
106 dispatchModifications();
111 * Synchronously perform the specified modification. This method will
112 * cause a write request invocation to the backend.
114 * @param m the modification to execute
115 * @throws InterruptedException if the waiting thread is interrupted
117 public void sync(IModification m) throws InterruptedException {
118 // if (!canvas.getThreadAccess().currentThreadAccess()) {
119 // throw new IllegalStateException(
120 // "Unsafe thread access in GraphToDiagramSynchronizer.syncGraphModification from thread "
121 // + Thread.currentThread() + ", expected thread " + canvas.getThreadAccess().getThread());
124 if (DebugPolicy.DEBUG_GRAPH_WRITEBACK)
125 System.out.println(Thread.currentThread() + " Posting synchronous graph modification: " + m);
127 synchronized (writebackQueue) {
128 writebackQueue.add(m);
129 dispatchModifications();
131 while (!m.isComplete()) {
136 if (DebugPolicy.DEBUG_GRAPH_WRITEBACK)
137 System.out.println(Thread.currentThread() + " Synchronous graph modification completed: " + m);
141 public void flush() {
142 synchronized (writebackQueue) {
143 // Check if the queue is already empty in which case
144 // no flushing is needed.
145 if (writebackQueue.isEmpty())
148 if (DebugPolicy.DEBUG_GRAPH_WRITEBACK)
149 System.out.println(Thread.currentThread() + " Flushing graph modification queue of size " + writebackQueue.size());
150 dispatchModifications();
154 public void finish() throws InterruptedException {
158 public void safeFinish(boolean force) {
161 } catch (InterruptedException e) {
162 errorHandler.error(e.getMessage(), e);
166 public void doFinish() throws InterruptedException {
167 synchronized (writebackQueue) {
168 // Check if the queue is already empty in which case
169 // no flushing is needed.
170 if (writebackQueue.isEmpty())
174 if (DebugPolicy.DEBUG_GRAPH_WRITEBACK)
175 System.out.println(Thread.currentThread() + " Waiting for graph modification queue to be finished");
177 IModification flush = new ModificationAdapter(ModificationAdapter.LOW_PRIORITY) {
179 public String toString() {
180 return "FLUSH_MODIFICATION_QUEUE";
184 if (DebugPolicy.DEBUG_GRAPH_WRITEBACK)
185 System.out.println(Thread.currentThread() + " Graph modification queue finished");