]> gerrit.simantics Code Review - simantics/platform.git/blob
0e74431bd5d2d34ab676181d67eb6e7580b17d86
[simantics/platform.git] /
1 /*******************************************************************************
2  * Copyright (c) 2007, 2010 Association for Decentralized Information Management
3  * in Industry THTH ry.
4  * All rights reserved. This program and the accompanying materials
5  * are made available under the terms of the Eclipse Public License v1.0
6  * which accompanies this distribution, and is available at
7  * http://www.eclipse.org/legal/epl-v10.html
8  *
9  * Contributors:
10  *     VTT Technical Research Centre of Finland - initial API and implementation
11  *******************************************************************************/
12 package org.simantics.diagram.synchronization.graph;
13
14 import java.util.ArrayDeque;
15 import java.util.Deque;
16
17 import org.simantics.db.RequestProcessor;
18 import org.simantics.diagram.internal.DebugPolicy;
19 import org.simantics.diagram.synchronization.ErrorHandler;
20 import org.simantics.diagram.synchronization.IModification;
21 import org.simantics.diagram.synchronization.IModificationQueue;
22 import org.simantics.diagram.synchronization.ModificationAdapter;
23 import org.simantics.diagram.synchronization.ThreadingModificationProxy;
24 import org.simantics.utils.logging.TimeLogger;
25 import org.simantics.utils.threads.IThreadWorkQueue;
26
27 /**
28  * @author Tuukka Lehtonen
29  */
30 public class ModificationQueue implements IModificationQueue {
31     private final RequestProcessor     writebackProcessor;
32     private final ErrorHandler         errorHandler;
33     private final Deque<IModification> writebackQueue = new ArrayDeque<IModification>();
34
35     public ModificationQueue(RequestProcessor writebackProcessor, ErrorHandler errorHandler) {
36         this.writebackProcessor = writebackProcessor;
37         this.errorHandler = errorHandler;
38     }
39
40     public void dispose() {
41     }
42
43     ModificationProcessRequest createModificationRequest() {
44         IModification[] mods = IModification.NONE;
45         synchronized (writebackQueue) {
46             mods = writebackQueue.toArray(IModification.NONE);
47             writebackQueue.clear();
48         }
49         return new ModificationProcessRequest(errorHandler, mods);
50     }
51
52     void dispatchModifications() {
53         TimeLogger.log("ModificationQueue.dispatchModifications");
54         writebackProcessor.asyncRequest(createModificationRequest());
55     }
56
57     public void discardModifications() {
58         IModification[] discarded = IModification.NONE;
59         synchronized (writebackQueue) {
60             discarded = writebackQueue.toArray(discarded);
61             writebackQueue.clear();
62         }
63         if (DebugPolicy.DEBUG_GRAPH_WRITEBACK) {
64             System.out.println("--------------------------------------------------------------------------------");
65             System.out.println(Thread.currentThread() + " Discarded " + discarded.length + " graph modifications:");
66             for (IModification m : discarded) {
67                 System.out.println("    " + m);
68             }
69             System.out.println("--------------------------------------------------------------------------------");
70         }
71     }
72
73     @Override
74     public boolean offer(IModification m) {
75         return offer(m, null);
76     }
77
78     @Override
79     public boolean offer(IModification m, IThreadWorkQueue completionThread) {
80         if (DebugPolicy.DEBUG_GRAPH_WRITEBACK)
81             System.out.println(Thread.currentThread() + " Offering graph modification: " + m);
82         synchronized (writebackQueue) {
83             if (completionThread != null)
84                 return writebackQueue.add(new ThreadingModificationProxy(m, completionThread, false));
85             else
86                 return writebackQueue.add(m);
87         }
88     }
89
90     /**
91      * Asynchronously perform the specified modification, using the
92      * specified thread for the modification completion event. This method
93      * will eventually cause an asynchronous write request invocation to the
94      * backend.
95      *
96      * @param m the modification to execute
97      * @param completionThread the thread to be used for running the
98      *        modification completion event or <code>null</code> to not care
99      *        about the thread.
100      */
101     public void async(IModification m, IThreadWorkQueue completionThread) {
102         if (DebugPolicy.DEBUG_GRAPH_WRITEBACK)
103             System.out.println(Thread.currentThread() + " Posting asynchronous graph modification: " + m);
104         synchronized (writebackQueue) {
105             offer(m, completionThread);
106             dispatchModifications();
107         }
108     }
109
110     /**
111      * Synchronously perform the specified modification. This method will
112      * cause a write request invocation to the backend.
113      *
114      * @param m the modification to execute
115      * @throws InterruptedException if the waiting thread is interrupted
116      */
117     public void sync(IModification m) throws InterruptedException {
118 //        if (!canvas.getThreadAccess().currentThreadAccess()) {
119 //            throw new IllegalStateException(
120 //                    "Unsafe thread access in GraphToDiagramSynchronizer.syncGraphModification from thread "
121 //                    + Thread.currentThread() + ", expected thread " + canvas.getThreadAccess().getThread());
122 //        }
123
124         if (DebugPolicy.DEBUG_GRAPH_WRITEBACK)
125             System.out.println(Thread.currentThread() + " Posting synchronous graph modification: " + m);
126         synchronized (m) {
127             synchronized (writebackQueue) {
128                 writebackQueue.add(m);
129                 dispatchModifications();
130             }
131             while (!m.isComplete()) {
132                 m.wait();
133             }
134             m.completed();
135         }
136         if (DebugPolicy.DEBUG_GRAPH_WRITEBACK)
137             System.out.println(Thread.currentThread() + " Synchronous graph modification completed: " + m);
138     }
139
140     @Override
141     public void flush() {
142         synchronized (writebackQueue) {
143             // Check if the queue is already empty in which case
144             // no flushing is needed.
145             if (writebackQueue.isEmpty())
146                 return;
147         }
148         if (DebugPolicy.DEBUG_GRAPH_WRITEBACK)
149             System.out.println(Thread.currentThread() + " Flushing graph modification queue of size " + writebackQueue.size());
150         dispatchModifications();
151     }
152
153     @Override
154     public void finish() throws InterruptedException {
155         doFinish();
156     }
157
158     public void safeFinish(boolean force) {
159         try {
160             doFinish();
161         } catch (InterruptedException e) {
162             errorHandler.error(e.getMessage(), e);
163         }
164     }
165
166     public void doFinish() throws InterruptedException {
167         synchronized (writebackQueue) {
168             // Check if the queue is already empty in which case
169             // no flushing is needed.
170             if (writebackQueue.isEmpty())
171                 return;
172         }
173
174         if (DebugPolicy.DEBUG_GRAPH_WRITEBACK)
175             System.out.println(Thread.currentThread() + " Waiting for graph modification queue to be finished");
176
177         IModification flush = new ModificationAdapter(ModificationAdapter.LOW_PRIORITY) {
178             @Override
179             public String toString() {
180                 return "FLUSH_MODIFICATION_QUEUE";
181             }
182         };
183         sync(flush);
184         if (DebugPolicy.DEBUG_GRAPH_WRITEBACK)
185             System.out.println(Thread.currentThread() + " Graph modification queue finished");
186     }
187 }