1 /*******************************************************************************
\r
2 * Copyright (c) 2007, 2010 Association for Decentralized Information Management
\r
3 * in Industry THTH ry.
\r
4 * All rights reserved. This program and the accompanying materials
\r
5 * are made available under the terms of the Eclipse Public License v1.0
\r
6 * which accompanies this distribution, and is available at
\r
7 * http://www.eclipse.org/legal/epl-v10.html
\r
10 * VTT Technical Research Centre of Finland - initial API and implementation
\r
11 *******************************************************************************/
\r
12 package org.simantics.diagram.synchronization.graph;
\r
14 import java.util.ArrayDeque;
\r
15 import java.util.Deque;
\r
17 import org.simantics.db.RequestProcessor;
\r
18 import org.simantics.diagram.internal.DebugPolicy;
\r
19 import org.simantics.diagram.synchronization.ErrorHandler;
\r
20 import org.simantics.diagram.synchronization.IModification;
\r
21 import org.simantics.diagram.synchronization.IModificationQueue;
\r
22 import org.simantics.diagram.synchronization.ModificationAdapter;
\r
23 import org.simantics.diagram.synchronization.ThreadingModificationProxy;
\r
24 import org.simantics.utils.logging.TimeLogger;
\r
25 import org.simantics.utils.threads.IThreadWorkQueue;
\r
28 * @author Tuukka Lehtonen
\r
30 public class ModificationQueue implements IModificationQueue {
\r
31 private final RequestProcessor writebackProcessor;
\r
32 private final ErrorHandler errorHandler;
\r
33 private final Deque<IModification> writebackQueue = new ArrayDeque<IModification>();
\r
35 public ModificationQueue(RequestProcessor writebackProcessor, ErrorHandler errorHandler) {
\r
36 this.writebackProcessor = writebackProcessor;
\r
37 this.errorHandler = errorHandler;
\r
40 public void dispose() {
\r
43 ModificationProcessRequest createModificationRequest() {
\r
44 IModification[] mods = IModification.NONE;
\r
45 synchronized (writebackQueue) {
\r
46 mods = writebackQueue.toArray(IModification.NONE);
\r
47 writebackQueue.clear();
\r
49 return new ModificationProcessRequest(errorHandler, mods);
\r
52 void dispatchModifications() {
\r
53 TimeLogger.log("ModificationQueue.dispatchModifications");
\r
54 writebackProcessor.asyncRequest(createModificationRequest());
\r
57 public void discardModifications() {
\r
58 IModification[] discarded = IModification.NONE;
\r
59 synchronized (writebackQueue) {
\r
60 discarded = writebackQueue.toArray(discarded);
\r
61 writebackQueue.clear();
\r
63 if (DebugPolicy.DEBUG_GRAPH_WRITEBACK) {
\r
64 System.out.println("--------------------------------------------------------------------------------");
\r
65 System.out.println(Thread.currentThread() + " Discarded " + discarded.length + " graph modifications:");
\r
66 for (IModification m : discarded) {
\r
67 System.out.println(" " + m);
\r
69 System.out.println("--------------------------------------------------------------------------------");
\r
74 public boolean offer(IModification m) {
\r
75 return offer(m, null);
\r
79 public boolean offer(IModification m, IThreadWorkQueue completionThread) {
\r
80 if (DebugPolicy.DEBUG_GRAPH_WRITEBACK)
\r
81 System.out.println(Thread.currentThread() + " Offering graph modification: " + m);
\r
82 synchronized (writebackQueue) {
\r
83 if (completionThread != null)
\r
84 return writebackQueue.add(new ThreadingModificationProxy(m, completionThread, false));
\r
86 return writebackQueue.add(m);
\r
91 * Asynchronously perform the specified modification, using the
\r
92 * specified thread for the modification completion event. This method
\r
93 * will eventually cause an asynchronous write request invocation to the
\r
96 * @param m the modification to execute
\r
97 * @param completionThread the thread to be used for running the
\r
98 * modification completion event or <code>null</code> to not care
\r
101 public void async(IModification m, IThreadWorkQueue completionThread) {
\r
102 if (DebugPolicy.DEBUG_GRAPH_WRITEBACK)
\r
103 System.out.println(Thread.currentThread() + " Posting asynchronous graph modification: " + m);
\r
104 synchronized (writebackQueue) {
\r
105 offer(m, completionThread);
\r
106 dispatchModifications();
\r
111 * Synchronously perform the specified modification. This method will
\r
112 * cause a write request invocation to the backend.
\r
114 * @param m the modification to execute
\r
115 * @throws InterruptedException if the waiting thread is interrupted
\r
117 public void sync(IModification m) throws InterruptedException {
\r
118 // if (!canvas.getThreadAccess().currentThreadAccess()) {
\r
119 // throw new IllegalStateException(
\r
120 // "Unsafe thread access in GraphToDiagramSynchronizer.syncGraphModification from thread "
\r
121 // + Thread.currentThread() + ", expected thread " + canvas.getThreadAccess().getThread());
\r
124 if (DebugPolicy.DEBUG_GRAPH_WRITEBACK)
\r
125 System.out.println(Thread.currentThread() + " Posting synchronous graph modification: " + m);
\r
127 synchronized (writebackQueue) {
\r
128 writebackQueue.add(m);
\r
129 dispatchModifications();
\r
131 while (!m.isComplete()) {
\r
136 if (DebugPolicy.DEBUG_GRAPH_WRITEBACK)
\r
137 System.out.println(Thread.currentThread() + " Synchronous graph modification completed: " + m);
\r
141 public void flush() {
\r
142 synchronized (writebackQueue) {
\r
143 // Check if the queue is already empty in which case
\r
144 // no flushing is needed.
\r
145 if (writebackQueue.isEmpty())
\r
148 if (DebugPolicy.DEBUG_GRAPH_WRITEBACK)
\r
149 System.out.println(Thread.currentThread() + " Flushing graph modification queue of size " + writebackQueue.size());
\r
150 dispatchModifications();
\r
154 public void finish() throws InterruptedException {
\r
158 public void safeFinish(boolean force) {
\r
161 } catch (InterruptedException e) {
\r
162 errorHandler.error(e.getMessage(), e);
\r
166 public void doFinish() throws InterruptedException {
\r
167 synchronized (writebackQueue) {
\r
168 // Check if the queue is already empty in which case
\r
169 // no flushing is needed.
\r
170 if (writebackQueue.isEmpty())
\r
174 if (DebugPolicy.DEBUG_GRAPH_WRITEBACK)
\r
175 System.out.println(Thread.currentThread() + " Waiting for graph modification queue to be finished");
\r
177 IModification flush = new ModificationAdapter(ModificationAdapter.LOW_PRIORITY) {
\r
179 public String toString() {
\r
180 return "FLUSH_MODIFICATION_QUEUE";
\r
184 if (DebugPolicy.DEBUG_GRAPH_WRITEBACK)
\r
185 System.out.println(Thread.currentThread() + " Graph modification queue finished");
\r