-/*******************************************************************************\r
- * Copyright (c) 2007, 2010 Association for Decentralized Information Management\r
- * in Industry THTH ry.\r
- * All rights reserved. This program and the accompanying materials\r
- * are made available under the terms of the Eclipse Public License v1.0\r
- * which accompanies this distribution, and is available at\r
- * http://www.eclipse.org/legal/epl-v10.html\r
- *\r
- * Contributors:\r
- * VTT Technical Research Centre of Finland - initial API and implementation\r
- *******************************************************************************/\r
-package org.simantics.diagram.synchronization.graph;\r
-\r
-import java.util.ArrayDeque;\r
-import java.util.Deque;\r
-\r
-import org.simantics.db.RequestProcessor;\r
-import org.simantics.diagram.internal.DebugPolicy;\r
-import org.simantics.diagram.synchronization.ErrorHandler;\r
-import org.simantics.diagram.synchronization.IModification;\r
-import org.simantics.diagram.synchronization.IModificationQueue;\r
-import org.simantics.diagram.synchronization.ModificationAdapter;\r
-import org.simantics.diagram.synchronization.ThreadingModificationProxy;\r
-import org.simantics.utils.logging.TimeLogger;\r
-import org.simantics.utils.threads.IThreadWorkQueue;\r
-\r
-/**\r
- * @author Tuukka Lehtonen\r
- */\r
-public class ModificationQueue implements IModificationQueue {\r
- private final RequestProcessor writebackProcessor;\r
- private final ErrorHandler errorHandler;\r
- private final Deque<IModification> writebackQueue = new ArrayDeque<IModification>();\r
-\r
- public ModificationQueue(RequestProcessor writebackProcessor, ErrorHandler errorHandler) {\r
- this.writebackProcessor = writebackProcessor;\r
- this.errorHandler = errorHandler;\r
- }\r
-\r
- public void dispose() {\r
- }\r
-\r
- ModificationProcessRequest createModificationRequest() {\r
- IModification[] mods = IModification.NONE;\r
- synchronized (writebackQueue) {\r
- mods = writebackQueue.toArray(IModification.NONE);\r
- writebackQueue.clear();\r
- }\r
- return new ModificationProcessRequest(errorHandler, mods);\r
- }\r
-\r
- void dispatchModifications() {\r
- TimeLogger.log("ModificationQueue.dispatchModifications");\r
- writebackProcessor.asyncRequest(createModificationRequest());\r
- }\r
-\r
- public void discardModifications() {\r
- IModification[] discarded = IModification.NONE;\r
- synchronized (writebackQueue) {\r
- discarded = writebackQueue.toArray(discarded);\r
- writebackQueue.clear();\r
- }\r
- if (DebugPolicy.DEBUG_GRAPH_WRITEBACK) {\r
- System.out.println("--------------------------------------------------------------------------------");\r
- System.out.println(Thread.currentThread() + " Discarded " + discarded.length + " graph modifications:");\r
- for (IModification m : discarded) {\r
- System.out.println(" " + m);\r
- }\r
- System.out.println("--------------------------------------------------------------------------------");\r
- }\r
- }\r
-\r
- @Override\r
- public boolean offer(IModification m) {\r
- return offer(m, null);\r
- }\r
-\r
- @Override\r
- public boolean offer(IModification m, IThreadWorkQueue completionThread) {\r
- if (DebugPolicy.DEBUG_GRAPH_WRITEBACK)\r
- System.out.println(Thread.currentThread() + " Offering graph modification: " + m);\r
- synchronized (writebackQueue) {\r
- if (completionThread != null)\r
- return writebackQueue.add(new ThreadingModificationProxy(m, completionThread, false));\r
- else\r
- return writebackQueue.add(m);\r
- }\r
- }\r
-\r
- /**\r
- * Asynchronously perform the specified modification, using the\r
- * specified thread for the modification completion event. This method\r
- * will eventually cause an asynchronous write request invocation to the\r
- * backend.\r
- *\r
- * @param m the modification to execute\r
- * @param completionThread the thread to be used for running the\r
- * modification completion event or <code>null</code> to not care\r
- * about the thread.\r
- */\r
- public void async(IModification m, IThreadWorkQueue completionThread) {\r
- if (DebugPolicy.DEBUG_GRAPH_WRITEBACK)\r
- System.out.println(Thread.currentThread() + " Posting asynchronous graph modification: " + m);\r
- synchronized (writebackQueue) {\r
- offer(m, completionThread);\r
- dispatchModifications();\r
- }\r
- }\r
-\r
- /**\r
- * Synchronously perform the specified modification. This method will\r
- * cause a write request invocation to the backend.\r
- *\r
- * @param m the modification to execute\r
- * @throws InterruptedException if the waiting thread is interrupted\r
- */\r
- public void sync(IModification m) throws InterruptedException {\r
-// if (!canvas.getThreadAccess().currentThreadAccess()) {\r
-// throw new IllegalStateException(\r
-// "Unsafe thread access in GraphToDiagramSynchronizer.syncGraphModification from thread "\r
-// + Thread.currentThread() + ", expected thread " + canvas.getThreadAccess().getThread());\r
-// }\r
-\r
- if (DebugPolicy.DEBUG_GRAPH_WRITEBACK)\r
- System.out.println(Thread.currentThread() + " Posting synchronous graph modification: " + m);\r
- synchronized (m) {\r
- synchronized (writebackQueue) {\r
- writebackQueue.add(m);\r
- dispatchModifications();\r
- }\r
- while (!m.isComplete()) {\r
- m.wait();\r
- }\r
- m.completed();\r
- }\r
- if (DebugPolicy.DEBUG_GRAPH_WRITEBACK)\r
- System.out.println(Thread.currentThread() + " Synchronous graph modification completed: " + m);\r
- }\r
-\r
- @Override\r
- public void flush() {\r
- synchronized (writebackQueue) {\r
- // Check if the queue is already empty in which case\r
- // no flushing is needed.\r
- if (writebackQueue.isEmpty())\r
- return;\r
- }\r
- if (DebugPolicy.DEBUG_GRAPH_WRITEBACK)\r
- System.out.println(Thread.currentThread() + " Flushing graph modification queue of size " + writebackQueue.size());\r
- dispatchModifications();\r
- }\r
-\r
- @Override\r
- public void finish() throws InterruptedException {\r
- doFinish();\r
- }\r
-\r
- public void safeFinish(boolean force) {\r
- try {\r
- doFinish();\r
- } catch (InterruptedException e) {\r
- errorHandler.error(e.getMessage(), e);\r
- }\r
- }\r
-\r
- public void doFinish() throws InterruptedException {\r
- synchronized (writebackQueue) {\r
- // Check if the queue is already empty in which case\r
- // no flushing is needed.\r
- if (writebackQueue.isEmpty())\r
- return;\r
- }\r
-\r
- if (DebugPolicy.DEBUG_GRAPH_WRITEBACK)\r
- System.out.println(Thread.currentThread() + " Waiting for graph modification queue to be finished");\r
-\r
- IModification flush = new ModificationAdapter(ModificationAdapter.LOW_PRIORITY) {\r
- @Override\r
- public String toString() {\r
- return "FLUSH_MODIFICATION_QUEUE";\r
- }\r
- };\r
- sync(flush);\r
- if (DebugPolicy.DEBUG_GRAPH_WRITEBACK)\r
- System.out.println(Thread.currentThread() + " Graph modification queue finished");\r
- }\r
+/*******************************************************************************
+ * Copyright (c) 2007, 2010 Association for Decentralized Information Management
+ * in Industry THTH ry.
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * which accompanies this distribution, and is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Contributors:
+ * VTT Technical Research Centre of Finland - initial API and implementation
+ *******************************************************************************/
+package org.simantics.diagram.synchronization.graph;
+
+import java.util.ArrayDeque;
+import java.util.Deque;
+
+import org.simantics.db.RequestProcessor;
+import org.simantics.diagram.internal.DebugPolicy;
+import org.simantics.diagram.synchronization.ErrorHandler;
+import org.simantics.diagram.synchronization.IModification;
+import org.simantics.diagram.synchronization.IModificationQueue;
+import org.simantics.diagram.synchronization.ModificationAdapter;
+import org.simantics.diagram.synchronization.ThreadingModificationProxy;
+import org.simantics.utils.logging.TimeLogger;
+import org.simantics.utils.threads.IThreadWorkQueue;
+
+/**
+ * @author Tuukka Lehtonen
+ */
+public class ModificationQueue implements IModificationQueue {
+ private final RequestProcessor writebackProcessor;
+ private final ErrorHandler errorHandler;
+ private final Deque<IModification> writebackQueue = new ArrayDeque<IModification>();
+
+ public ModificationQueue(RequestProcessor writebackProcessor, ErrorHandler errorHandler) {
+ this.writebackProcessor = writebackProcessor;
+ this.errorHandler = errorHandler;
+ }
+
+ public void dispose() {
+ }
+
+ ModificationProcessRequest createModificationRequest() {
+ IModification[] mods = IModification.NONE;
+ synchronized (writebackQueue) {
+ mods = writebackQueue.toArray(IModification.NONE);
+ writebackQueue.clear();
+ }
+ return new ModificationProcessRequest(errorHandler, mods);
+ }
+
+ void dispatchModifications() {
+ TimeLogger.log("ModificationQueue.dispatchModifications");
+ writebackProcessor.asyncRequest(createModificationRequest());
+ }
+
+ public void discardModifications() {
+ IModification[] discarded = IModification.NONE;
+ synchronized (writebackQueue) {
+ discarded = writebackQueue.toArray(discarded);
+ writebackQueue.clear();
+ }
+ if (DebugPolicy.DEBUG_GRAPH_WRITEBACK) {
+ System.out.println("--------------------------------------------------------------------------------");
+ System.out.println(Thread.currentThread() + " Discarded " + discarded.length + " graph modifications:");
+ for (IModification m : discarded) {
+ System.out.println(" " + m);
+ }
+ System.out.println("--------------------------------------------------------------------------------");
+ }
+ }
+
+ @Override
+ public boolean offer(IModification m) {
+ return offer(m, null);
+ }
+
+ @Override
+ public boolean offer(IModification m, IThreadWorkQueue completionThread) {
+ if (DebugPolicy.DEBUG_GRAPH_WRITEBACK)
+ System.out.println(Thread.currentThread() + " Offering graph modification: " + m);
+ synchronized (writebackQueue) {
+ if (completionThread != null)
+ return writebackQueue.add(new ThreadingModificationProxy(m, completionThread, false));
+ else
+ return writebackQueue.add(m);
+ }
+ }
+
+ /**
+ * Asynchronously perform the specified modification, using the
+ * specified thread for the modification completion event. This method
+ * will eventually cause an asynchronous write request invocation to the
+ * backend.
+ *
+ * @param m the modification to execute
+ * @param completionThread the thread to be used for running the
+ * modification completion event or <code>null</code> to not care
+ * about the thread.
+ */
+ public void async(IModification m, IThreadWorkQueue completionThread) {
+ if (DebugPolicy.DEBUG_GRAPH_WRITEBACK)
+ System.out.println(Thread.currentThread() + " Posting asynchronous graph modification: " + m);
+ synchronized (writebackQueue) {
+ offer(m, completionThread);
+ dispatchModifications();
+ }
+ }
+
+ /**
+ * Synchronously perform the specified modification. This method will
+ * cause a write request invocation to the backend.
+ *
+ * @param m the modification to execute
+ * @throws InterruptedException if the waiting thread is interrupted
+ */
+ public void sync(IModification m) throws InterruptedException {
+// if (!canvas.getThreadAccess().currentThreadAccess()) {
+// throw new IllegalStateException(
+// "Unsafe thread access in GraphToDiagramSynchronizer.syncGraphModification from thread "
+// + Thread.currentThread() + ", expected thread " + canvas.getThreadAccess().getThread());
+// }
+
+ if (DebugPolicy.DEBUG_GRAPH_WRITEBACK)
+ System.out.println(Thread.currentThread() + " Posting synchronous graph modification: " + m);
+ synchronized (m) {
+ synchronized (writebackQueue) {
+ writebackQueue.add(m);
+ dispatchModifications();
+ }
+ while (!m.isComplete()) {
+ m.wait();
+ }
+ m.completed();
+ }
+ if (DebugPolicy.DEBUG_GRAPH_WRITEBACK)
+ System.out.println(Thread.currentThread() + " Synchronous graph modification completed: " + m);
+ }
+
+ @Override
+ public void flush() {
+ synchronized (writebackQueue) {
+ // Check if the queue is already empty in which case
+ // no flushing is needed.
+ if (writebackQueue.isEmpty())
+ return;
+ }
+ if (DebugPolicy.DEBUG_GRAPH_WRITEBACK)
+ System.out.println(Thread.currentThread() + " Flushing graph modification queue of size " + writebackQueue.size());
+ dispatchModifications();
+ }
+
+ @Override
+ public void finish() throws InterruptedException {
+ doFinish();
+ }
+
+ public void safeFinish(boolean force) {
+ try {
+ doFinish();
+ } catch (InterruptedException e) {
+ errorHandler.error(e.getMessage(), e);
+ }
+ }
+
+ public void doFinish() throws InterruptedException {
+ synchronized (writebackQueue) {
+ // Check if the queue is already empty in which case
+ // no flushing is needed.
+ if (writebackQueue.isEmpty())
+ return;
+ }
+
+ if (DebugPolicy.DEBUG_GRAPH_WRITEBACK)
+ System.out.println(Thread.currentThread() + " Waiting for graph modification queue to be finished");
+
+ IModification flush = new ModificationAdapter(ModificationAdapter.LOW_PRIORITY) {
+ @Override
+ public String toString() {
+ return "FLUSH_MODIFICATION_QUEUE";
+ }
+ };
+ sync(flush);
+ if (DebugPolicy.DEBUG_GRAPH_WRITEBACK)
+ System.out.println(Thread.currentThread() + " Graph modification queue finished");
+ }
}
\ No newline at end of file