]> gerrit.simantics Code Review - simantics/platform.git/blob - bundles/org.simantics.diagram/src/org/simantics/diagram/synchronization/graph/ModificationQueue.java
Migrated source code from Simantics SVN
[simantics/platform.git] / bundles / org.simantics.diagram / src / org / simantics / diagram / synchronization / graph / ModificationQueue.java
1 /*******************************************************************************\r
2  * Copyright (c) 2007, 2010 Association for Decentralized Information Management\r
3  * in Industry THTH ry.\r
4  * All rights reserved. This program and the accompanying materials\r
5  * are made available under the terms of the Eclipse Public License v1.0\r
6  * which accompanies this distribution, and is available at\r
7  * http://www.eclipse.org/legal/epl-v10.html\r
8  *\r
9  * Contributors:\r
10  *     VTT Technical Research Centre of Finland - initial API and implementation\r
11  *******************************************************************************/\r
12 package org.simantics.diagram.synchronization.graph;\r
13 \r
14 import java.util.ArrayDeque;\r
15 import java.util.Deque;\r
16 \r
17 import org.simantics.db.RequestProcessor;\r
18 import org.simantics.diagram.internal.DebugPolicy;\r
19 import org.simantics.diagram.synchronization.ErrorHandler;\r
20 import org.simantics.diagram.synchronization.IModification;\r
21 import org.simantics.diagram.synchronization.IModificationQueue;\r
22 import org.simantics.diagram.synchronization.ModificationAdapter;\r
23 import org.simantics.diagram.synchronization.ThreadingModificationProxy;\r
24 import org.simantics.utils.logging.TimeLogger;\r
25 import org.simantics.utils.threads.IThreadWorkQueue;\r
26 \r
27 /**\r
28  * @author Tuukka Lehtonen\r
29  */\r
30 public class ModificationQueue implements IModificationQueue {\r
31     private final RequestProcessor     writebackProcessor;\r
32     private final ErrorHandler         errorHandler;\r
33     private final Deque<IModification> writebackQueue = new ArrayDeque<IModification>();\r
34 \r
35     public ModificationQueue(RequestProcessor writebackProcessor, ErrorHandler errorHandler) {\r
36         this.writebackProcessor = writebackProcessor;\r
37         this.errorHandler = errorHandler;\r
38     }\r
39 \r
40     public void dispose() {\r
41     }\r
42 \r
43     ModificationProcessRequest createModificationRequest() {\r
44         IModification[] mods = IModification.NONE;\r
45         synchronized (writebackQueue) {\r
46             mods = writebackQueue.toArray(IModification.NONE);\r
47             writebackQueue.clear();\r
48         }\r
49         return new ModificationProcessRequest(errorHandler, mods);\r
50     }\r
51 \r
52     void dispatchModifications() {\r
53         TimeLogger.log("ModificationQueue.dispatchModifications");\r
54         writebackProcessor.asyncRequest(createModificationRequest());\r
55     }\r
56 \r
57     public void discardModifications() {\r
58         IModification[] discarded = IModification.NONE;\r
59         synchronized (writebackQueue) {\r
60             discarded = writebackQueue.toArray(discarded);\r
61             writebackQueue.clear();\r
62         }\r
63         if (DebugPolicy.DEBUG_GRAPH_WRITEBACK) {\r
64             System.out.println("--------------------------------------------------------------------------------");\r
65             System.out.println(Thread.currentThread() + " Discarded " + discarded.length + " graph modifications:");\r
66             for (IModification m : discarded) {\r
67                 System.out.println("    " + m);\r
68             }\r
69             System.out.println("--------------------------------------------------------------------------------");\r
70         }\r
71     }\r
72 \r
73     @Override\r
74     public boolean offer(IModification m) {\r
75         return offer(m, null);\r
76     }\r
77 \r
78     @Override\r
79     public boolean offer(IModification m, IThreadWorkQueue completionThread) {\r
80         if (DebugPolicy.DEBUG_GRAPH_WRITEBACK)\r
81             System.out.println(Thread.currentThread() + " Offering graph modification: " + m);\r
82         synchronized (writebackQueue) {\r
83             if (completionThread != null)\r
84                 return writebackQueue.add(new ThreadingModificationProxy(m, completionThread, false));\r
85             else\r
86                 return writebackQueue.add(m);\r
87         }\r
88     }\r
89 \r
90     /**\r
91      * Asynchronously perform the specified modification, using the\r
92      * specified thread for the modification completion event. This method\r
93      * will eventually cause an asynchronous write request invocation to the\r
94      * backend.\r
95      *\r
96      * @param m the modification to execute\r
97      * @param completionThread the thread to be used for running the\r
98      *        modification completion event or <code>null</code> to not care\r
99      *        about the thread.\r
100      */\r
101     public void async(IModification m, IThreadWorkQueue completionThread) {\r
102         if (DebugPolicy.DEBUG_GRAPH_WRITEBACK)\r
103             System.out.println(Thread.currentThread() + " Posting asynchronous graph modification: " + m);\r
104         synchronized (writebackQueue) {\r
105             offer(m, completionThread);\r
106             dispatchModifications();\r
107         }\r
108     }\r
109 \r
110     /**\r
111      * Synchronously perform the specified modification. This method will\r
112      * cause a write request invocation to the backend.\r
113      *\r
114      * @param m the modification to execute\r
115      * @throws InterruptedException if the waiting thread is interrupted\r
116      */\r
117     public void sync(IModification m) throws InterruptedException {\r
118 //        if (!canvas.getThreadAccess().currentThreadAccess()) {\r
119 //            throw new IllegalStateException(\r
120 //                    "Unsafe thread access in GraphToDiagramSynchronizer.syncGraphModification from thread "\r
121 //                    + Thread.currentThread() + ", expected thread " + canvas.getThreadAccess().getThread());\r
122 //        }\r
123 \r
124         if (DebugPolicy.DEBUG_GRAPH_WRITEBACK)\r
125             System.out.println(Thread.currentThread() + " Posting synchronous graph modification: " + m);\r
126         synchronized (m) {\r
127             synchronized (writebackQueue) {\r
128                 writebackQueue.add(m);\r
129                 dispatchModifications();\r
130             }\r
131             while (!m.isComplete()) {\r
132                 m.wait();\r
133             }\r
134             m.completed();\r
135         }\r
136         if (DebugPolicy.DEBUG_GRAPH_WRITEBACK)\r
137             System.out.println(Thread.currentThread() + " Synchronous graph modification completed: " + m);\r
138     }\r
139 \r
140     @Override\r
141     public void flush() {\r
142         synchronized (writebackQueue) {\r
143             // Check if the queue is already empty in which case\r
144             // no flushing is needed.\r
145             if (writebackQueue.isEmpty())\r
146                 return;\r
147         }\r
148         if (DebugPolicy.DEBUG_GRAPH_WRITEBACK)\r
149             System.out.println(Thread.currentThread() + " Flushing graph modification queue of size " + writebackQueue.size());\r
150         dispatchModifications();\r
151     }\r
152 \r
153     @Override\r
154     public void finish() throws InterruptedException {\r
155         doFinish();\r
156     }\r
157 \r
158     public void safeFinish(boolean force) {\r
159         try {\r
160             doFinish();\r
161         } catch (InterruptedException e) {\r
162             errorHandler.error(e.getMessage(), e);\r
163         }\r
164     }\r
165 \r
166     public void doFinish() throws InterruptedException {\r
167         synchronized (writebackQueue) {\r
168             // Check if the queue is already empty in which case\r
169             // no flushing is needed.\r
170             if (writebackQueue.isEmpty())\r
171                 return;\r
172         }\r
173 \r
174         if (DebugPolicy.DEBUG_GRAPH_WRITEBACK)\r
175             System.out.println(Thread.currentThread() + " Waiting for graph modification queue to be finished");\r
176 \r
177         IModification flush = new ModificationAdapter(ModificationAdapter.LOW_PRIORITY) {\r
178             @Override\r
179             public String toString() {\r
180                 return "FLUSH_MODIFICATION_QUEUE";\r
181             }\r
182         };\r
183         sync(flush);\r
184         if (DebugPolicy.DEBUG_GRAPH_WRITEBACK)\r
185             System.out.println(Thread.currentThread() + " Graph modification queue finished");\r
186     }\r
187 }