-/*******************************************************************************\r
- * Copyright (c) 2007, 2010 Association for Decentralized Information Management\r
- * in Industry THTH ry.\r
- * All rights reserved. This program and the accompanying materials\r
- * are made available under the terms of the Eclipse Public License v1.0\r
- * which accompanies this distribution, and is available at\r
- * http://www.eclipse.org/legal/epl-v10.html\r
- *\r
- * Contributors:\r
- * VTT Technical Research Centre of Finland - initial API and implementation\r
- *******************************************************************************/\r
-\r
-package org.simantics.utils.threads.ua;\r
-\r
-import java.util.concurrent.Callable;\r
-import java.util.concurrent.CancellationException;\r
-import java.util.concurrent.ExecutionException;\r
-import java.util.concurrent.RunnableFuture;\r
-import java.util.concurrent.TimeUnit;\r
-import java.util.concurrent.TimeoutException;\r
-\r
-/**\r
- * Add stateful monitoring features to runnable and callable.\r
- * <p>\r
- * Executing two work instances in parallel causes RuntimeException. \r
- * \r
- * @author Toni Kalajainen (toni.kalajainen@vtt.fi)\r
- */\r
-public class Work<T> extends AbstractState<WorkState, RuntimeException> implements WorkMonitor, RunnableFuture<T> {\r
- \r
- public static Work<Object> createMonitor(Runnable r)\r
- {\r
- return new Work<Object>(r);\r
- }\r
-\r
- public static Work<Object> createMonitor(Runnable r, StateListener<WorkState> listener)\r
- {\r
- Work<Object> result = new Work<Object>(r);\r
- result.addStateListener(listener);\r
- return result;\r
- }\r
- \r
- public static <T> Work<T> createMonitor(Callable<T> r)\r
- {\r
- return new Work<T>(r);\r
- }\r
-\r
- public static <T> Work<T> createMonitor(Callable<T> r, StateListener<WorkState> listener)\r
- {\r
- Work<T> result = new Work<T>(r);\r
- result.addStateListener(listener);\r
- return result;\r
- }\r
- \r
- \r
- Callable<T> c; \r
- Runnable r;\r
- Thread thread;\r
- transient Exception error;\r
- transient T result;\r
- boolean canceled;\r
- \r
- public Work(Runnable runnable) {\r
- super(WorkState.Ready);\r
- if (runnable == null)\r
- throw new IllegalArgumentException("null arg");\r
- \r
- this.r = runnable;\r
- }\r
- \r
- public Work(Runnable runnable, T result) {\r
- super(WorkState.Ready);\r
- if (runnable == null)\r
- throw new IllegalArgumentException("null arg");\r
- \r
- this.r = runnable;\r
- this.result = result;\r
- }\r
- \r
- public Work(Callable<T> c) {\r
- super(WorkState.Ready);\r
- if (c == null)\r
- throw new IllegalArgumentException("null arg");\r
- \r
- this.c = c;\r
- }\r
-\r
- @Override\r
- public void run() {\r
- WorkState s = getState();\r
- if (s!=WorkState.Ready) \r
- throw new RuntimeException("Work must be restarted before it can be reused");\r
- thread = Thread.currentThread();\r
- if (setState(WorkState.Working, null, WorkState.READY_STATE)==WorkState.Working) return;\r
- try { \r
- if (r!=null)\r
- r.run();\r
- else \r
- result = c.call();\r
- } catch (Exception e) {\r
- error = e;\r
- setState(WorkState.Error);\r
- }\r
- WorkState newState = WorkState.Complete;\r
- synchronized(this) {\r
- s = getState();\r
- if (s==WorkState.Interrupting) {\r
- Thread.interrupted();\r
- newState = WorkState.Interrupted;\r
- }\r
- }\r
- setState(newState);\r
- }\r
-\r
- @Override\r
- public boolean cancel(boolean mayInterruptIfRunning) {\r
- synchronized(this) {\r
- WorkState s = getState();\r
- if (s != WorkState.Ready && s != WorkState.Working) return false;\r
- }\r
- // Attempt cancel\r
- if (attemptSetState(WorkState.READY_STATE, WorkState.Canceled)==WorkState.Ready) {\r
- canceled |= true;\r
- return true;\r
- }\r
- // Attempt interrupt\r
- if (mayInterruptIfRunning && \r
- attemptSetState(WorkState.WORKING_STATE, WorkState.Interrupting)==WorkState.Working)\r
- {\r
- canceled |= true;\r
- thread.interrupt();\r
- return true;\r
- }\r
- return false;\r
- }\r
- \r
- @Override\r
- protected boolean isStateTransitionAllowed(WorkState oldState, WorkState newState) {\r
- return true;\r
- }\r
-\r
- // Raise visibility\r
- @Override\r
- public void setError(RuntimeException error)\r
- {\r
- super.setError(error);\r
- }\r
-\r
- /**\r
- * Reset work for reuse. \r
- */\r
- public synchronized void restart() \r
- {\r
- WorkState s = getState();\r
- if (s==WorkState.Ready) return;\r
- if (!s.isFinalState())\r
- throw new RuntimeException("Work cannot be restarted until the previous run has completed.");\r
- setState(WorkState.Ready);\r
- }\r
-\r
- @Override\r
- public Runnable getRunnable() {\r
- return r;\r
- }\r
-\r
- @Override\r
- public T get() throws InterruptedException, ExecutionException {\r
- WorkState s = waitForState(WorkState.FINAL_STATES);\r
- if (s==WorkState.Canceled)\r
- throw new CancellationException();\r
- if (s==WorkState.Interrupted)\r
- throw new InterruptedException();\r
- if (s==WorkState.Error)\r
- throw new ExecutionException(error); \r
- return result;\r
- }\r
-\r
- @Override\r
- public T get(long timeout, TimeUnit unit) throws InterruptedException,\r
- ExecutionException, TimeoutException {\r
- WorkState s = waitForState(WorkState.FINAL_STATES, timeout, unit);\r
- if (s==WorkState.Canceled)\r
- throw new CancellationException();\r
- if (s==WorkState.Interrupted)\r
- throw new InterruptedException();\r
- if (s==WorkState.Error)\r
- throw new ExecutionException(error); \r
- return result;\r
- }\r
-\r
- @Override\r
- public boolean isCancelled() {\r
- return canceled;\r
- }\r
-\r
- @Override\r
- public boolean isDone() {\r
- return WorkState.FINAL_STATES.contains(getState());\r
- }\r
- \r
-}\r
+/*******************************************************************************
+ * Copyright (c) 2007, 2010 Association for Decentralized Information Management
+ * in Industry THTH ry.
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * which accompanies this distribution, and is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Contributors:
+ * VTT Technical Research Centre of Finland - initial API and implementation
+ *******************************************************************************/
+
+package org.simantics.utils.threads.ua;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * Add stateful monitoring features to runnable and callable.
+ * <p>
+ * Executing two work instances in parallel causes RuntimeException.
+ *
+ * @author Toni Kalajainen (toni.kalajainen@vtt.fi)
+ */
+public class Work<T> extends AbstractState<WorkState, RuntimeException> implements WorkMonitor, RunnableFuture<T> {
+
+ public static Work<Object> createMonitor(Runnable r)
+ {
+ return new Work<Object>(r);
+ }
+
+ public static Work<Object> createMonitor(Runnable r, StateListener<WorkState> listener)
+ {
+ Work<Object> result = new Work<Object>(r);
+ result.addStateListener(listener);
+ return result;
+ }
+
+ public static <T> Work<T> createMonitor(Callable<T> r)
+ {
+ return new Work<T>(r);
+ }
+
+ public static <T> Work<T> createMonitor(Callable<T> r, StateListener<WorkState> listener)
+ {
+ Work<T> result = new Work<T>(r);
+ result.addStateListener(listener);
+ return result;
+ }
+
+
+ Callable<T> c;
+ Runnable r;
+ Thread thread;
+ transient Exception error;
+ transient T result;
+ boolean canceled;
+
+ public Work(Runnable runnable) {
+ super(WorkState.Ready);
+ if (runnable == null)
+ throw new IllegalArgumentException("null arg");
+
+ this.r = runnable;
+ }
+
+ public Work(Runnable runnable, T result) {
+ super(WorkState.Ready);
+ if (runnable == null)
+ throw new IllegalArgumentException("null arg");
+
+ this.r = runnable;
+ this.result = result;
+ }
+
+ public Work(Callable<T> c) {
+ super(WorkState.Ready);
+ if (c == null)
+ throw new IllegalArgumentException("null arg");
+
+ this.c = c;
+ }
+
+ @Override
+ public void run() {
+ WorkState s = getState();
+ if (s!=WorkState.Ready)
+ throw new RuntimeException("Work must be restarted before it can be reused");
+ thread = Thread.currentThread();
+ if (setState(WorkState.Working, null, WorkState.READY_STATE)==WorkState.Working) return;
+ try {
+ if (r!=null)
+ r.run();
+ else
+ result = c.call();
+ } catch (Exception e) {
+ error = e;
+ setState(WorkState.Error);
+ }
+ WorkState newState = WorkState.Complete;
+ synchronized(this) {
+ s = getState();
+ if (s==WorkState.Interrupting) {
+ Thread.interrupted();
+ newState = WorkState.Interrupted;
+ }
+ }
+ setState(newState);
+ }
+
+ @Override
+ public boolean cancel(boolean mayInterruptIfRunning) {
+ synchronized(this) {
+ WorkState s = getState();
+ if (s != WorkState.Ready && s != WorkState.Working) return false;
+ }
+ // Attempt cancel
+ if (attemptSetState(WorkState.READY_STATE, WorkState.Canceled)==WorkState.Ready) {
+ canceled |= true;
+ return true;
+ }
+ // Attempt interrupt
+ if (mayInterruptIfRunning &&
+ attemptSetState(WorkState.WORKING_STATE, WorkState.Interrupting)==WorkState.Working)
+ {
+ canceled |= true;
+ thread.interrupt();
+ return true;
+ }
+ return false;
+ }
+
+ @Override
+ protected boolean isStateTransitionAllowed(WorkState oldState, WorkState newState) {
+ return true;
+ }
+
+ // Raise visibility
+ @Override
+ public void setError(RuntimeException error)
+ {
+ super.setError(error);
+ }
+
+ /**
+ * Reset work for reuse.
+ */
+ public synchronized void restart()
+ {
+ WorkState s = getState();
+ if (s==WorkState.Ready) return;
+ if (!s.isFinalState())
+ throw new RuntimeException("Work cannot be restarted until the previous run has completed.");
+ setState(WorkState.Ready);
+ }
+
+ @Override
+ public Runnable getRunnable() {
+ return r;
+ }
+
+ @Override
+ public T get() throws InterruptedException, ExecutionException {
+ WorkState s = waitForState(WorkState.FINAL_STATES);
+ if (s==WorkState.Canceled)
+ throw new CancellationException();
+ if (s==WorkState.Interrupted)
+ throw new InterruptedException();
+ if (s==WorkState.Error)
+ throw new ExecutionException(error);
+ return result;
+ }
+
+ @Override
+ public T get(long timeout, TimeUnit unit) throws InterruptedException,
+ ExecutionException, TimeoutException {
+ WorkState s = waitForState(WorkState.FINAL_STATES, timeout, unit);
+ if (s==WorkState.Canceled)
+ throw new CancellationException();
+ if (s==WorkState.Interrupted)
+ throw new InterruptedException();
+ if (s==WorkState.Error)
+ throw new ExecutionException(error);
+ return result;
+ }
+
+ @Override
+ public boolean isCancelled() {
+ return canceled;
+ }
+
+ @Override
+ public boolean isDone() {
+ return WorkState.FINAL_STATES.contains(getState());
+ }
+
+}