/******************************************************************************* * Copyright (c) 2007, 2010 Association for Decentralized Information Management * in Industry THTH ry. * All rights reserved. This program and the accompanying materials * are made available under the terms of the Eclipse Public License v1.0 * which accompanies this distribution, and is available at * http://www.eclipse.org/legal/epl-v10.html * * Contributors: * VTT Technical Research Centre of Finland - initial API and implementation *******************************************************************************/ package org.simantics.utils.threads.ua; import java.util.concurrent.Callable; import java.util.concurrent.CancellationException; import java.util.concurrent.ExecutionException; import java.util.concurrent.RunnableFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; /** * Add stateful monitoring features to runnable and callable. *

* Executing two work instances in parallel causes RuntimeException. * * @author Toni Kalajainen (toni.kalajainen@vtt.fi) */ public class Work extends AbstractState implements WorkMonitor, RunnableFuture { public static Work createMonitor(Runnable r) { return new Work(r); } public static Work createMonitor(Runnable r, StateListener listener) { Work result = new Work(r); result.addStateListener(listener); return result; } public static Work createMonitor(Callable r) { return new Work(r); } public static Work createMonitor(Callable r, StateListener listener) { Work result = new Work(r); result.addStateListener(listener); return result; } Callable c; Runnable r; Thread thread; transient Exception error; transient T result; boolean canceled; public Work(Runnable runnable) { super(WorkState.Ready); if (runnable == null) throw new IllegalArgumentException("null arg"); this.r = runnable; } public Work(Runnable runnable, T result) { super(WorkState.Ready); if (runnable == null) throw new IllegalArgumentException("null arg"); this.r = runnable; this.result = result; } public Work(Callable c) { super(WorkState.Ready); if (c == null) throw new IllegalArgumentException("null arg"); this.c = c; } @Override public void run() { WorkState s = getState(); if (s!=WorkState.Ready) throw new RuntimeException("Work must be restarted before it can be reused"); thread = Thread.currentThread(); if (setState(WorkState.Working, null, WorkState.READY_STATE)==WorkState.Working) return; try { if (r!=null) r.run(); else result = c.call(); } catch (Exception e) { error = e; setState(WorkState.Error); } WorkState newState = WorkState.Complete; synchronized(this) { s = getState(); if (s==WorkState.Interrupting) { Thread.interrupted(); newState = WorkState.Interrupted; } } setState(newState); } @Override public boolean cancel(boolean mayInterruptIfRunning) { synchronized(this) { WorkState s = getState(); if (s != WorkState.Ready && s != WorkState.Working) return false; } // Attempt cancel if (attemptSetState(WorkState.READY_STATE, WorkState.Canceled)==WorkState.Ready) { canceled |= true; return true; } // Attempt interrupt if (mayInterruptIfRunning && attemptSetState(WorkState.WORKING_STATE, WorkState.Interrupting)==WorkState.Working) { canceled |= true; thread.interrupt(); return true; } return false; } @Override protected boolean isStateTransitionAllowed(WorkState oldState, WorkState newState) { return true; } // Raise visibility @Override public void setError(RuntimeException error) { super.setError(error); } /** * Reset work for reuse. */ public synchronized void restart() { WorkState s = getState(); if (s==WorkState.Ready) return; if (!s.isFinalState()) throw new RuntimeException("Work cannot be restarted until the previous run has completed."); setState(WorkState.Ready); } @Override public Runnable getRunnable() { return r; } @Override public T get() throws InterruptedException, ExecutionException { WorkState s = waitForState(WorkState.FINAL_STATES); if (s==WorkState.Canceled) throw new CancellationException(); if (s==WorkState.Interrupted) throw new InterruptedException(); if (s==WorkState.Error) throw new ExecutionException(error); return result; } @Override public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { WorkState s = waitForState(WorkState.FINAL_STATES, timeout, unit); if (s==WorkState.Canceled) throw new CancellationException(); if (s==WorkState.Interrupted) throw new InterruptedException(); if (s==WorkState.Error) throw new ExecutionException(error); return result; } @Override public boolean isCancelled() { return canceled; } @Override public boolean isDone() { return WorkState.FINAL_STATES.contains(getState()); } }