]> gerrit.simantics Code Review - simantics/platform.git/blob - bundles/org.simantics.diagram/src/org/simantics/diagram/synchronization/graph/ModificationQueue.java
Support VirtualGraph in GraphToDiagramSynchronizer and DiagramSGProvider
[simantics/platform.git] / bundles / org.simantics.diagram / src / org / simantics / diagram / synchronization / graph / ModificationQueue.java
1 /*******************************************************************************
2  * Copyright (c) 2007, 2010 Association for Decentralized Information Management
3  * in Industry THTH ry.
4  * All rights reserved. This program and the accompanying materials
5  * are made available under the terms of the Eclipse Public License v1.0
6  * which accompanies this distribution, and is available at
7  * http://www.eclipse.org/legal/epl-v10.html
8  *
9  * Contributors:
10  *     VTT Technical Research Centre of Finland - initial API and implementation
11  *******************************************************************************/
12 package org.simantics.diagram.synchronization.graph;
13
14 import java.util.ArrayDeque;
15 import java.util.Deque;
16
17 import org.simantics.db.RequestProcessor;
18 import org.simantics.db.VirtualGraph;
19 import org.simantics.diagram.internal.DebugPolicy;
20 import org.simantics.diagram.synchronization.ErrorHandler;
21 import org.simantics.diagram.synchronization.IModification;
22 import org.simantics.diagram.synchronization.IModificationQueue;
23 import org.simantics.diagram.synchronization.ModificationAdapter;
24 import org.simantics.diagram.synchronization.ThreadingModificationProxy;
25 import org.simantics.utils.logging.TimeLogger;
26 import org.simantics.utils.threads.IThreadWorkQueue;
27
28 /**
29  * @author Tuukka Lehtonen
30  */
31 public class ModificationQueue implements IModificationQueue {
32     private final RequestProcessor     writebackProcessor;
33     private final ErrorHandler         errorHandler;
34     private final Deque<IModification> writebackQueue = new ArrayDeque<IModification>();
35     private final VirtualGraph virtualGraph;
36     
37     public ModificationQueue(RequestProcessor writebackProcessor, ErrorHandler errorHandler) {
38         this.writebackProcessor = writebackProcessor;
39         this.errorHandler = errorHandler;
40         this.virtualGraph = null;
41     }
42     
43     public ModificationQueue(RequestProcessor writebackProcessor, ErrorHandler errorHandler, VirtualGraph virtualGraph) {
44         this.writebackProcessor = writebackProcessor;
45         this.errorHandler = errorHandler;
46         this.virtualGraph = virtualGraph;
47     }
48
49     public void dispose() {
50     }
51     
52
53     ModificationProcessRequest createModificationRequest() {
54         IModification[] mods = IModification.NONE;
55         synchronized (writebackQueue) {
56             mods = writebackQueue.toArray(IModification.NONE);
57             writebackQueue.clear();
58         }
59         return new ModificationProcessRequest(errorHandler, mods, virtualGraph);
60     }
61
62     void dispatchModifications() {
63         TimeLogger.log("ModificationQueue.dispatchModifications");
64         writebackProcessor.asyncRequest(createModificationRequest());
65     }
66
67     public void discardModifications() {
68         IModification[] discarded = IModification.NONE;
69         synchronized (writebackQueue) {
70             discarded = writebackQueue.toArray(discarded);
71             writebackQueue.clear();
72         }
73         if (DebugPolicy.DEBUG_GRAPH_WRITEBACK) {
74             System.out.println("--------------------------------------------------------------------------------");
75             System.out.println(Thread.currentThread() + " Discarded " + discarded.length + " graph modifications:");
76             for (IModification m : discarded) {
77                 System.out.println("    " + m);
78             }
79             System.out.println("--------------------------------------------------------------------------------");
80         }
81     }
82
83     @Override
84     public boolean offer(IModification m) {
85         return offer(m, null);
86     }
87
88     @Override
89     public boolean offer(IModification m, IThreadWorkQueue completionThread) {
90         if (DebugPolicy.DEBUG_GRAPH_WRITEBACK)
91             System.out.println(Thread.currentThread() + " Offering graph modification: " + m);
92         synchronized (writebackQueue) {
93             if (completionThread != null)
94                 return writebackQueue.add(new ThreadingModificationProxy(m, completionThread, false));
95             else
96                 return writebackQueue.add(m);
97         }
98     }
99
100     /**
101      * Asynchronously perform the specified modification, using the
102      * specified thread for the modification completion event. This method
103      * will eventually cause an asynchronous write request invocation to the
104      * backend.
105      *
106      * @param m the modification to execute
107      * @param completionThread the thread to be used for running the
108      *        modification completion event or <code>null</code> to not care
109      *        about the thread.
110      */
111     public void async(IModification m, IThreadWorkQueue completionThread) {
112         if (DebugPolicy.DEBUG_GRAPH_WRITEBACK)
113             System.out.println(Thread.currentThread() + " Posting asynchronous graph modification: " + m);
114         synchronized (writebackQueue) {
115             offer(m, completionThread);
116             dispatchModifications();
117         }
118     }
119
120     /**
121      * Synchronously perform the specified modification. This method will
122      * cause a write request invocation to the backend.
123      *
124      * @param m the modification to execute
125      * @throws InterruptedException if the waiting thread is interrupted
126      */
127     public void sync(IModification m) throws InterruptedException {
128 //        if (!canvas.getThreadAccess().currentThreadAccess()) {
129 //            throw new IllegalStateException(
130 //                    "Unsafe thread access in GraphToDiagramSynchronizer.syncGraphModification from thread "
131 //                    + Thread.currentThread() + ", expected thread " + canvas.getThreadAccess().getThread());
132 //        }
133
134         if (DebugPolicy.DEBUG_GRAPH_WRITEBACK)
135             System.out.println(Thread.currentThread() + " Posting synchronous graph modification: " + m);
136         synchronized (m) {
137             synchronized (writebackQueue) {
138                 writebackQueue.add(m);
139                 dispatchModifications();
140             }
141             while (!m.isComplete()) {
142                 m.wait();
143             }
144             m.completed();
145         }
146         if (DebugPolicy.DEBUG_GRAPH_WRITEBACK)
147             System.out.println(Thread.currentThread() + " Synchronous graph modification completed: " + m);
148     }
149
150     @Override
151     public void flush() {
152         synchronized (writebackQueue) {
153             // Check if the queue is already empty in which case
154             // no flushing is needed.
155             if (writebackQueue.isEmpty())
156                 return;
157         }
158         if (DebugPolicy.DEBUG_GRAPH_WRITEBACK)
159             System.out.println(Thread.currentThread() + " Flushing graph modification queue of size " + writebackQueue.size());
160         dispatchModifications();
161     }
162
163     @Override
164     public void finish() throws InterruptedException {
165         doFinish();
166     }
167
168     public void safeFinish(boolean force) {
169         try {
170             doFinish();
171         } catch (InterruptedException e) {
172             errorHandler.error(e.getMessage(), e);
173         }
174     }
175
176     public void doFinish() throws InterruptedException {
177         synchronized (writebackQueue) {
178             // Check if the queue is already empty in which case
179             // no flushing is needed.
180             if (writebackQueue.isEmpty())
181                 return;
182         }
183
184         if (DebugPolicy.DEBUG_GRAPH_WRITEBACK)
185             System.out.println(Thread.currentThread() + " Waiting for graph modification queue to be finished");
186
187         IModification flush = new ModificationAdapter(ModificationAdapter.LOW_PRIORITY) {
188             @Override
189             public String toString() {
190                 return "FLUSH_MODIFICATION_QUEUE";
191             }
192         };
193         sync(flush);
194         if (DebugPolicy.DEBUG_GRAPH_WRITEBACK)
195             System.out.println(Thread.currentThread() + " Graph modification queue finished");
196     }
197 }