1 /*******************************************************************************
2 * Copyright (c) 2007, 2010 Association for Decentralized Information Management
4 * All rights reserved. This program and the accompanying materials
5 * are made available under the terms of the Eclipse Public License v1.0
6 * which accompanies this distribution, and is available at
7 * http://www.eclipse.org/legal/epl-v10.html
10 * VTT Technical Research Centre of Finland - initial API and implementation
11 *******************************************************************************/
12 package org.simantics.diagram.synchronization.graph;
14 import java.util.ArrayDeque;
15 import java.util.Deque;
17 import org.simantics.db.RequestProcessor;
18 import org.simantics.db.VirtualGraph;
19 import org.simantics.diagram.internal.DebugPolicy;
20 import org.simantics.diagram.synchronization.ErrorHandler;
21 import org.simantics.diagram.synchronization.IModification;
22 import org.simantics.diagram.synchronization.IModificationQueue;
23 import org.simantics.diagram.synchronization.ModificationAdapter;
24 import org.simantics.diagram.synchronization.ThreadingModificationProxy;
25 import org.simantics.utils.logging.TimeLogger;
26 import org.simantics.utils.threads.IThreadWorkQueue;
29 * @author Tuukka Lehtonen
31 public class ModificationQueue implements IModificationQueue {
32 private final RequestProcessor writebackProcessor;
33 private final ErrorHandler errorHandler;
34 private final Deque<IModification> writebackQueue = new ArrayDeque<IModification>();
35 private final VirtualGraph virtualGraph;
37 public ModificationQueue(RequestProcessor writebackProcessor, ErrorHandler errorHandler) {
38 this.writebackProcessor = writebackProcessor;
39 this.errorHandler = errorHandler;
40 this.virtualGraph = null;
43 public ModificationQueue(RequestProcessor writebackProcessor, ErrorHandler errorHandler, VirtualGraph virtualGraph) {
44 this.writebackProcessor = writebackProcessor;
45 this.errorHandler = errorHandler;
46 this.virtualGraph = virtualGraph;
49 public void dispose() {
53 ModificationProcessRequest createModificationRequest() {
54 IModification[] mods = IModification.NONE;
55 synchronized (writebackQueue) {
56 mods = writebackQueue.toArray(IModification.NONE);
57 writebackQueue.clear();
59 return new ModificationProcessRequest(errorHandler, mods, virtualGraph);
62 void dispatchModifications() {
63 TimeLogger.log("ModificationQueue.dispatchModifications");
64 writebackProcessor.asyncRequest(createModificationRequest());
67 public void discardModifications() {
68 IModification[] discarded = IModification.NONE;
69 synchronized (writebackQueue) {
70 discarded = writebackQueue.toArray(discarded);
71 writebackQueue.clear();
73 if (DebugPolicy.DEBUG_GRAPH_WRITEBACK) {
74 System.out.println("--------------------------------------------------------------------------------");
75 System.out.println(Thread.currentThread() + " Discarded " + discarded.length + " graph modifications:");
76 for (IModification m : discarded) {
77 System.out.println(" " + m);
79 System.out.println("--------------------------------------------------------------------------------");
84 public boolean offer(IModification m) {
85 return offer(m, null);
89 public boolean offer(IModification m, IThreadWorkQueue completionThread) {
90 if (DebugPolicy.DEBUG_GRAPH_WRITEBACK)
91 System.out.println(Thread.currentThread() + " Offering graph modification: " + m);
92 synchronized (writebackQueue) {
93 if (completionThread != null)
94 return writebackQueue.add(new ThreadingModificationProxy(m, completionThread, false));
96 return writebackQueue.add(m);
101 * Asynchronously perform the specified modification, using the
102 * specified thread for the modification completion event. This method
103 * will eventually cause an asynchronous write request invocation to the
106 * @param m the modification to execute
107 * @param completionThread the thread to be used for running the
108 * modification completion event or <code>null</code> to not care
111 public void async(IModification m, IThreadWorkQueue completionThread) {
112 if (DebugPolicy.DEBUG_GRAPH_WRITEBACK)
113 System.out.println(Thread.currentThread() + " Posting asynchronous graph modification: " + m);
114 synchronized (writebackQueue) {
115 offer(m, completionThread);
116 dispatchModifications();
121 * Synchronously perform the specified modification. This method will
122 * cause a write request invocation to the backend.
124 * @param m the modification to execute
125 * @throws InterruptedException if the waiting thread is interrupted
127 public void sync(IModification m) throws InterruptedException {
128 // if (!canvas.getThreadAccess().currentThreadAccess()) {
129 // throw new IllegalStateException(
130 // "Unsafe thread access in GraphToDiagramSynchronizer.syncGraphModification from thread "
131 // + Thread.currentThread() + ", expected thread " + canvas.getThreadAccess().getThread());
134 if (DebugPolicy.DEBUG_GRAPH_WRITEBACK)
135 System.out.println(Thread.currentThread() + " Posting synchronous graph modification: " + m);
137 synchronized (writebackQueue) {
138 writebackQueue.add(m);
139 dispatchModifications();
141 while (!m.isComplete()) {
146 if (DebugPolicy.DEBUG_GRAPH_WRITEBACK)
147 System.out.println(Thread.currentThread() + " Synchronous graph modification completed: " + m);
151 public void flush() {
152 synchronized (writebackQueue) {
153 // Check if the queue is already empty in which case
154 // no flushing is needed.
155 if (writebackQueue.isEmpty())
158 if (DebugPolicy.DEBUG_GRAPH_WRITEBACK)
159 System.out.println(Thread.currentThread() + " Flushing graph modification queue of size " + writebackQueue.size());
160 dispatchModifications();
164 public void finish() throws InterruptedException {
168 public void safeFinish(boolean force) {
171 } catch (InterruptedException e) {
172 errorHandler.error(e.getMessage(), e);
176 public void doFinish() throws InterruptedException {
177 synchronized (writebackQueue) {
178 // Check if the queue is already empty in which case
179 // no flushing is needed.
180 if (writebackQueue.isEmpty())
184 if (DebugPolicy.DEBUG_GRAPH_WRITEBACK)
185 System.out.println(Thread.currentThread() + " Waiting for graph modification queue to be finished");
187 IModification flush = new ModificationAdapter(ModificationAdapter.LOW_PRIORITY) {
189 public String toString() {
190 return "FLUSH_MODIFICATION_QUEUE";
194 if (DebugPolicy.DEBUG_GRAPH_WRITEBACK)
195 System.out.println(Thread.currentThread() + " Graph modification queue finished");