--- /dev/null
+/*******************************************************************************\r
+ * Copyright (c) 2007, 2010 Association for Decentralized Information Management\r
+ * in Industry THTH ry.\r
+ * All rights reserved. This program and the accompanying materials\r
+ * are made available under the terms of the Eclipse Public License v1.0\r
+ * which accompanies this distribution, and is available at\r
+ * http://www.eclipse.org/legal/epl-v10.html\r
+ *\r
+ * Contributors:\r
+ * VTT Technical Research Centre of Finland - initial API and implementation\r
+ *******************************************************************************/\r
+package org.simantics.diagram.synchronization.graph;\r
+\r
+import java.util.ArrayDeque;\r
+import java.util.Deque;\r
+\r
+import org.simantics.db.RequestProcessor;\r
+import org.simantics.diagram.internal.DebugPolicy;\r
+import org.simantics.diagram.synchronization.ErrorHandler;\r
+import org.simantics.diagram.synchronization.IModification;\r
+import org.simantics.diagram.synchronization.IModificationQueue;\r
+import org.simantics.diagram.synchronization.ModificationAdapter;\r
+import org.simantics.diagram.synchronization.ThreadingModificationProxy;\r
+import org.simantics.utils.logging.TimeLogger;\r
+import org.simantics.utils.threads.IThreadWorkQueue;\r
+\r
+/**\r
+ * @author Tuukka Lehtonen\r
+ */\r
+public class ModificationQueue implements IModificationQueue {\r
+ private final RequestProcessor writebackProcessor;\r
+ private final ErrorHandler errorHandler;\r
+ private final Deque<IModification> writebackQueue = new ArrayDeque<IModification>();\r
+\r
+ public ModificationQueue(RequestProcessor writebackProcessor, ErrorHandler errorHandler) {\r
+ this.writebackProcessor = writebackProcessor;\r
+ this.errorHandler = errorHandler;\r
+ }\r
+\r
+ public void dispose() {\r
+ }\r
+\r
+ ModificationProcessRequest createModificationRequest() {\r
+ IModification[] mods = IModification.NONE;\r
+ synchronized (writebackQueue) {\r
+ mods = writebackQueue.toArray(IModification.NONE);\r
+ writebackQueue.clear();\r
+ }\r
+ return new ModificationProcessRequest(errorHandler, mods);\r
+ }\r
+\r
+ void dispatchModifications() {\r
+ TimeLogger.log("ModificationQueue.dispatchModifications");\r
+ writebackProcessor.asyncRequest(createModificationRequest());\r
+ }\r
+\r
+ public void discardModifications() {\r
+ IModification[] discarded = IModification.NONE;\r
+ synchronized (writebackQueue) {\r
+ discarded = writebackQueue.toArray(discarded);\r
+ writebackQueue.clear();\r
+ }\r
+ if (DebugPolicy.DEBUG_GRAPH_WRITEBACK) {\r
+ System.out.println("--------------------------------------------------------------------------------");\r
+ System.out.println(Thread.currentThread() + " Discarded " + discarded.length + " graph modifications:");\r
+ for (IModification m : discarded) {\r
+ System.out.println(" " + m);\r
+ }\r
+ System.out.println("--------------------------------------------------------------------------------");\r
+ }\r
+ }\r
+\r
+ @Override\r
+ public boolean offer(IModification m) {\r
+ return offer(m, null);\r
+ }\r
+\r
+ @Override\r
+ public boolean offer(IModification m, IThreadWorkQueue completionThread) {\r
+ if (DebugPolicy.DEBUG_GRAPH_WRITEBACK)\r
+ System.out.println(Thread.currentThread() + " Offering graph modification: " + m);\r
+ synchronized (writebackQueue) {\r
+ if (completionThread != null)\r
+ return writebackQueue.add(new ThreadingModificationProxy(m, completionThread, false));\r
+ else\r
+ return writebackQueue.add(m);\r
+ }\r
+ }\r
+\r
+ /**\r
+ * Asynchronously perform the specified modification, using the\r
+ * specified thread for the modification completion event. This method\r
+ * will eventually cause an asynchronous write request invocation to the\r
+ * backend.\r
+ *\r
+ * @param m the modification to execute\r
+ * @param completionThread the thread to be used for running the\r
+ * modification completion event or <code>null</code> to not care\r
+ * about the thread.\r
+ */\r
+ public void async(IModification m, IThreadWorkQueue completionThread) {\r
+ if (DebugPolicy.DEBUG_GRAPH_WRITEBACK)\r
+ System.out.println(Thread.currentThread() + " Posting asynchronous graph modification: " + m);\r
+ synchronized (writebackQueue) {\r
+ offer(m, completionThread);\r
+ dispatchModifications();\r
+ }\r
+ }\r
+\r
+ /**\r
+ * Synchronously perform the specified modification. This method will\r
+ * cause a write request invocation to the backend.\r
+ *\r
+ * @param m the modification to execute\r
+ * @throws InterruptedException if the waiting thread is interrupted\r
+ */\r
+ public void sync(IModification m) throws InterruptedException {\r
+// if (!canvas.getThreadAccess().currentThreadAccess()) {\r
+// throw new IllegalStateException(\r
+// "Unsafe thread access in GraphToDiagramSynchronizer.syncGraphModification from thread "\r
+// + Thread.currentThread() + ", expected thread " + canvas.getThreadAccess().getThread());\r
+// }\r
+\r
+ if (DebugPolicy.DEBUG_GRAPH_WRITEBACK)\r
+ System.out.println(Thread.currentThread() + " Posting synchronous graph modification: " + m);\r
+ synchronized (m) {\r
+ synchronized (writebackQueue) {\r
+ writebackQueue.add(m);\r
+ dispatchModifications();\r
+ }\r
+ while (!m.isComplete()) {\r
+ m.wait();\r
+ }\r
+ m.completed();\r
+ }\r
+ if (DebugPolicy.DEBUG_GRAPH_WRITEBACK)\r
+ System.out.println(Thread.currentThread() + " Synchronous graph modification completed: " + m);\r
+ }\r
+\r
+ @Override\r
+ public void flush() {\r
+ synchronized (writebackQueue) {\r
+ // Check if the queue is already empty in which case\r
+ // no flushing is needed.\r
+ if (writebackQueue.isEmpty())\r
+ return;\r
+ }\r
+ if (DebugPolicy.DEBUG_GRAPH_WRITEBACK)\r
+ System.out.println(Thread.currentThread() + " Flushing graph modification queue of size " + writebackQueue.size());\r
+ dispatchModifications();\r
+ }\r
+\r
+ @Override\r
+ public void finish() throws InterruptedException {\r
+ doFinish();\r
+ }\r
+\r
+ public void safeFinish(boolean force) {\r
+ try {\r
+ doFinish();\r
+ } catch (InterruptedException e) {\r
+ errorHandler.error(e.getMessage(), e);\r
+ }\r
+ }\r
+\r
+ public void doFinish() throws InterruptedException {\r
+ synchronized (writebackQueue) {\r
+ // Check if the queue is already empty in which case\r
+ // no flushing is needed.\r
+ if (writebackQueue.isEmpty())\r
+ return;\r
+ }\r
+\r
+ if (DebugPolicy.DEBUG_GRAPH_WRITEBACK)\r
+ System.out.println(Thread.currentThread() + " Waiting for graph modification queue to be finished");\r
+\r
+ IModification flush = new ModificationAdapter(ModificationAdapter.LOW_PRIORITY) {\r
+ @Override\r
+ public String toString() {\r
+ return "FLUSH_MODIFICATION_QUEUE";\r
+ }\r
+ };\r
+ sync(flush);\r
+ if (DebugPolicy.DEBUG_GRAPH_WRITEBACK)\r
+ System.out.println(Thread.currentThread() + " Graph modification queue finished");\r
+ }\r
+}
\ No newline at end of file