X-Git-Url: https://gerrit.simantics.org/r/gitweb?a=blobdiff_plain;f=bundles%2Forg.simantics.utils.thread%2Fsrc%2Forg%2Fsimantics%2Futils%2Fthreads%2Fua%2FThreadPool.java;h=6738771d9eca009289e0a29149bb33254fd54572;hb=refs%2Fchanges%2F38%2F238%2F2;hp=cff196467f5871adbcdff42e86047e870e8d9368;hpb=24e2b34260f219f0d1644ca7a138894980e25b14;p=simantics%2Fplatform.git diff --git a/bundles/org.simantics.utils.thread/src/org/simantics/utils/threads/ua/ThreadPool.java b/bundles/org.simantics.utils.thread/src/org/simantics/utils/threads/ua/ThreadPool.java index cff196467..6738771d9 100644 --- a/bundles/org.simantics.utils.thread/src/org/simantics/utils/threads/ua/ThreadPool.java +++ b/bundles/org.simantics.utils.thread/src/org/simantics/utils/threads/ua/ThreadPool.java @@ -1,452 +1,452 @@ -/******************************************************************************* - * Copyright (c) 2007, 2010 Association for Decentralized Information Management - * in Industry THTH ry. - * All rights reserved. This program and the accompanying materials - * are made available under the terms of the Eclipse Public License v1.0 - * which accompanies this distribution, and is available at - * http://www.eclipse.org/legal/epl-v10.html - * - * Contributors: - * VTT Technical Research Centre of Finland - initial API and implementation - *******************************************************************************/ -/* - * - * @author Toni Kalajainen - */ -package org.simantics.utils.threads.ua; - -import java.util.Collection; -import java.util.EnumSet; -import java.util.HashMap; -import java.util.LinkedList; -import java.util.Map; -import java.util.concurrent.Semaphore; -import java.util.concurrent.ThreadFactory; - - -/** - * ThreadPool contains a queue of work and a pool of threads processing them. - *

- * A thread can join the pool either as resident or as temporary. - * Temporary thread works until work queue is empty and returns. - * Thread work mode can be set with setMode(). - *

- * Current thread can yield to the service of ThreadPool with work()-invoke. - *

- * A new permanent worker thread can be spawned with newThread(). - * This thread will be released when the ThreadPool is closed or explicitely with - * setMode(thread, ThreadMode.Interrupted); - *

- * Temporaray threads can be spawned on-demand with setSpawnOnDemandLimit(). - *

- * ThreadPool can be closed (or paused) with setState(). - *

- * TODO Use ThreadFactory - * - * @see SynchronizedWorker Use queue to ensure processing order of work. - * @see Worker - * @author Toni Kalajainen (toni.kalajainen@vtt.fi) - */ -public class ThreadPool extends AbstractState implements Worker, StatefulExecutor { - - - private final static ThreadGroup THREADGROUP = new ThreadGroup("ThreadPool"); - /** Constructs daemon threads of "ThreadPool" group */ - private final static ThreadFactory FACTORY = new ThreadFactory() - { - @Override - public Thread newThread(Runnable r) { - Thread t = new Thread(THREADGROUP, r, "WorkerThread"); - t.setDaemon(true); - return t; - } - }; - - LinkedList queue = new LinkedList(); - Semaphore s = new Semaphore(0); - - Map threads = new HashMap(); - // How many threads may be spawned on demand when new work is queued - int spawnOnDemandLimit = 0; - static ThreadPool INSTANCE; - ThreadFactory threadFactory = FACTORY; - - /** - * Get singleton instance. This pool has 0 resident threads and spawns - * temporary threads up to the number of CPUs in the system. - * NOTE! Do not use this shared resource for blocking actions. - * - * @return singleton instance which has one worker for each core - */ - public synchronized static ThreadPool getInstance() { - if (INSTANCE == null) - INSTANCE = new ThreadPool(Runtime.getRuntime().availableProcessors()); - return INSTANCE; - } - - public enum ThreadMode { - Resident, // Works and stays resident (sleeps if no work available) - Temporary, // Works until queue is empty - Interrupted // Stops working right away or after current task - } - - public ThreadPool() { - super(ExecutorState.Active); - } - - /** - * Create thread pool with 0 initial worker threads. - * - * @param maxNumberOfTemporaryThreads The number of ad-hoc temporary worker threads. - */ - public ThreadPool(int maxNumberOfTemporaryThreads) { - super(ExecutorState.Active); - setSpawnOnDemandLimit(maxNumberOfTemporaryThreads); - } - - public synchronized int getWorkingThreadCount() - { - int result = 0; - for (ThreadMode m : threads.values()) - if (m != ThreadMode.Interrupted) { - result++; - } - return result; - } - - /** - * Sets the limit for how many threads may be spawned on demand when new - * work is queued. - *

- * WorkPool spawns more threads when new work is queued and - * the number of working threads goes below this value. - *

- * New thread is spawned if number of threads is less than queued word and - * the number of threads is less than limit value. - * - * @param limit on demand thread limit - */ - public void setSpawnOnDemandLimit(int limit) - { - this.spawnOnDemandLimit = limit; - } - - /** - * Makes current thread yield to the service of the thread pool. - * Service can be ended by invoking setMode(ThreadMode.Interrupted). - * - * @param stayResident if true works until interrupted, if false works until queue is empty - */ - public void work(boolean stayResident) - { - Thread t = Thread.currentThread(); - synchronized(this) { - threads.put(t, stayResident ? ThreadMode.Resident : ThreadMode.Temporary); - } - for (;;) { - synchronized(this) { - ThreadMode mode = getThreadMode(t); - ExecutorState state = getState(); - - // Temporary threads can return if not working right now - if (mode == ThreadMode.Temporary && (isEmpty() || !ExecutorState.WORKING_STATES.contains(state))) - mode = ThreadMode.Interrupted; - - if (mode == null || mode == ThreadMode.Interrupted) break; - - // Do not do anything if the worker is shut down - if (state==ExecutorState.Terminated) break; - - - if (state==ExecutorState.Paused) { - // Remove temporary threads if paused - if (mode==ThreadMode.Temporary) break; - if (mode==ThreadMode.Resident) { - try { - waitForState(ExecutorState.NON_PAUSED_STATES); - } catch (InterruptedException e) { - } - continue; - } - } - - } - try { - //s.tryAcquire(1000, TimeUnit.MILLISECONDS)) continue; - s.acquire(); - WorkImpl work; - synchronized(this) { - work = queue.removeFirst(); - } - try { - work.setState(WorkState.Working); - work.getRunnable().run(); - work.setState(WorkState.Complete); - } catch (RuntimeException e) { - work.setError(e); - work.setState(WorkState.Error); - } - } catch (InterruptedException e) { - } - } - // Go to shutdown state? - boolean goToShutdown = false; - synchronized(this) { - threads.remove(t); - // If this is last thread and queue is empty, then go to shutdown state - goToShutdown = getState() == ExecutorState.Shutdown && threads.isEmpty() && queue.isEmpty(); - } - // Go to shutdown state. - if (goToShutdown) - super.setState(ExecutorState.Terminated); - } - - Runnable residentRun = new Runnable() - { - @Override - public void run() { - work(true); - } - }; - - Runnable tempRun = new Runnable() - { - @Override - public void run() { - work(false); - } - }; - - /** - * Spawn new thread to the service of the thread pool - * @param stayResident - * @return new running thread - */ - public Thread newThread(boolean stayResident) - { - Thread t = new Thread(THREADGROUP, stayResident ? residentRun : tempRun, "WorkerThread"); - - synchronized(this) { - threads.put(t, stayResident ? ThreadMode.Resident : ThreadMode.Temporary); - } - t.start(); - return t; - } - - public synchronized ThreadMode getThreadMode(Thread t) - { - return threads.get(t); - } - - public synchronized void setMode(Thread t, ThreadMode tm) - { - if (t==null || tm==null) throw new IllegalArgumentException(); - ThreadMode oldMode = threads.get(t); - if (oldMode == tm) return; - threads.put(t, tm); - if (tm == ThreadMode.Interrupted /*&& t.getState() == Thread.State.WAITING*/) - { - t.interrupt(); - } - if (tm == ThreadMode.Temporary && isEmpty() /*&& t.getState() == Thread.State.WAITING*/) - { - t.interrupt(); - } - } - - /** - * Is work queue empty - * @return true if the work queue is empty - */ - public synchronized boolean isEmpty() - { - return queue.isEmpty(); - } - - void interrupt(Thread t) - { - t.interrupt(); - } - - public synchronized boolean remove(WorkImpl r) - { - return queue.remove(r); - } - - @Override - public WorkMonitor asyncExec(Runnable runnable, StateListener listener) - throws WorkerClosedException - { - WorkImpl w = null; - synchronized(this) { - if (!ExecutorState.ACCEPTS_WORK_STATES.contains(getState())) - throw new WorkerClosedException(); - w = new WorkImpl(runnable); - if (listener!=null) - w.addStateListener(listener); - queue.add(w); - spawnTempThreads(); - } - s.release(); - return w; - } - - - // Raise visibility - @Override - public boolean setState(ExecutorState state) { - // If worker is already shutdown then it must not be reactivated - if (getState() == ExecutorState.Terminated) - return false; - synchronized(this) { - // Go directly to shutdown if queue is empty and there are no threads - if (state==ExecutorState.Terminated || state==ExecutorState.Shutdown) - { - if (threads.isEmpty() && queue.isEmpty()) { - super.setState( ExecutorState.Shutdown ); - super.setState( ExecutorState.Terminated ); - } else { - super.setState( ExecutorState.Shutdown ); - } - return true; - } - } - return super.setState(state); - } - - /** - * Cancel all work. - * @param mayInterrupt - */ - public synchronized void cancelAll(boolean mayInterrupt) - { - for (WorkMonitor w : queue) - w.cancel(mayInterrupt); - queue.clear(); - } - - @Override - protected void onStateTransition(ExecutorState oldState, - ExecutorState newState) { - if (!ExecutorState.WORKING_STATES.contains(oldState) && ExecutorState.WORKING_STATES.contains(newState)) - spawnTempThreads(); - } - - private synchronized void spawnTempThreads() - { - if (!ExecutorState.WORKING_STATES.contains(getState())) return; - if (spawnOnDemandLimit>0) { - int workers = getWorkingThreadCount(); - int n = spawnOnDemandLimit - workers; - for (int i=0; i result) { - result.addAll(queue); - } - - - public static void main(String[] args) throws InterruptedException { - ThreadPool pool = new ThreadPool(2); - - pool.addStateListener(new StateListener() { - @Override - public void onStateTransition(IStatefulObject monitor, ExecutorState oldState, ExecutorState newState) { - System.out.println("Worker State: "+newState); - }}); - - StateListener l = new StateListener() { - @Override - public void onStateTransition(IStatefulObject monitor, WorkState oldState, WorkState newState) { - System.out.println("Work ("+monitor+"): "+newState); - } - }; - - Runnable[] r = new Runnable[10]; - WorkMonitor[] m = new WorkMonitor[10]; - for (int i=0; i getQueue() { -// return queue; -// } - - -} +/******************************************************************************* + * Copyright (c) 2007, 2010 Association for Decentralized Information Management + * in Industry THTH ry. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * VTT Technical Research Centre of Finland - initial API and implementation + *******************************************************************************/ +/* + * + * @author Toni Kalajainen + */ +package org.simantics.utils.threads.ua; + +import java.util.Collection; +import java.util.EnumSet; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.Map; +import java.util.concurrent.Semaphore; +import java.util.concurrent.ThreadFactory; + + +/** + * ThreadPool contains a queue of work and a pool of threads processing them. + *

+ * A thread can join the pool either as resident or as temporary. + * Temporary thread works until work queue is empty and returns. + * Thread work mode can be set with setMode(). + *

+ * Current thread can yield to the service of ThreadPool with work()-invoke. + *

+ * A new permanent worker thread can be spawned with newThread(). + * This thread will be released when the ThreadPool is closed or explicitely with + * setMode(thread, ThreadMode.Interrupted); + *

+ * Temporaray threads can be spawned on-demand with setSpawnOnDemandLimit(). + *

+ * ThreadPool can be closed (or paused) with setState(). + *

+ * TODO Use ThreadFactory + * + * @see SynchronizedWorker Use queue to ensure processing order of work. + * @see Worker + * @author Toni Kalajainen (toni.kalajainen@vtt.fi) + */ +public class ThreadPool extends AbstractState implements Worker, StatefulExecutor { + + + private final static ThreadGroup THREADGROUP = new ThreadGroup("ThreadPool"); + /** Constructs daemon threads of "ThreadPool" group */ + private final static ThreadFactory FACTORY = new ThreadFactory() + { + @Override + public Thread newThread(Runnable r) { + Thread t = new Thread(THREADGROUP, r, "WorkerThread"); + t.setDaemon(true); + return t; + } + }; + + LinkedList queue = new LinkedList(); + Semaphore s = new Semaphore(0); + + Map threads = new HashMap(); + // How many threads may be spawned on demand when new work is queued + int spawnOnDemandLimit = 0; + static ThreadPool INSTANCE; + ThreadFactory threadFactory = FACTORY; + + /** + * Get singleton instance. This pool has 0 resident threads and spawns + * temporary threads up to the number of CPUs in the system. + * NOTE! Do not use this shared resource for blocking actions. + * + * @return singleton instance which has one worker for each core + */ + public synchronized static ThreadPool getInstance() { + if (INSTANCE == null) + INSTANCE = new ThreadPool(Runtime.getRuntime().availableProcessors()); + return INSTANCE; + } + + public enum ThreadMode { + Resident, // Works and stays resident (sleeps if no work available) + Temporary, // Works until queue is empty + Interrupted // Stops working right away or after current task + } + + public ThreadPool() { + super(ExecutorState.Active); + } + + /** + * Create thread pool with 0 initial worker threads. + * + * @param maxNumberOfTemporaryThreads The number of ad-hoc temporary worker threads. + */ + public ThreadPool(int maxNumberOfTemporaryThreads) { + super(ExecutorState.Active); + setSpawnOnDemandLimit(maxNumberOfTemporaryThreads); + } + + public synchronized int getWorkingThreadCount() + { + int result = 0; + for (ThreadMode m : threads.values()) + if (m != ThreadMode.Interrupted) { + result++; + } + return result; + } + + /** + * Sets the limit for how many threads may be spawned on demand when new + * work is queued. + *

+ * WorkPool spawns more threads when new work is queued and + * the number of working threads goes below this value. + *

+ * New thread is spawned if number of threads is less than queued word and + * the number of threads is less than limit value. + * + * @param limit on demand thread limit + */ + public void setSpawnOnDemandLimit(int limit) + { + this.spawnOnDemandLimit = limit; + } + + /** + * Makes current thread yield to the service of the thread pool. + * Service can be ended by invoking setMode(ThreadMode.Interrupted). + * + * @param stayResident if true works until interrupted, if false works until queue is empty + */ + public void work(boolean stayResident) + { + Thread t = Thread.currentThread(); + synchronized(this) { + threads.put(t, stayResident ? ThreadMode.Resident : ThreadMode.Temporary); + } + for (;;) { + synchronized(this) { + ThreadMode mode = getThreadMode(t); + ExecutorState state = getState(); + + // Temporary threads can return if not working right now + if (mode == ThreadMode.Temporary && (isEmpty() || !ExecutorState.WORKING_STATES.contains(state))) + mode = ThreadMode.Interrupted; + + if (mode == null || mode == ThreadMode.Interrupted) break; + + // Do not do anything if the worker is shut down + if (state==ExecutorState.Terminated) break; + + + if (state==ExecutorState.Paused) { + // Remove temporary threads if paused + if (mode==ThreadMode.Temporary) break; + if (mode==ThreadMode.Resident) { + try { + waitForState(ExecutorState.NON_PAUSED_STATES); + } catch (InterruptedException e) { + } + continue; + } + } + + } + try { + //s.tryAcquire(1000, TimeUnit.MILLISECONDS)) continue; + s.acquire(); + WorkImpl work; + synchronized(this) { + work = queue.removeFirst(); + } + try { + work.setState(WorkState.Working); + work.getRunnable().run(); + work.setState(WorkState.Complete); + } catch (RuntimeException e) { + work.setError(e); + work.setState(WorkState.Error); + } + } catch (InterruptedException e) { + } + } + // Go to shutdown state? + boolean goToShutdown = false; + synchronized(this) { + threads.remove(t); + // If this is last thread and queue is empty, then go to shutdown state + goToShutdown = getState() == ExecutorState.Shutdown && threads.isEmpty() && queue.isEmpty(); + } + // Go to shutdown state. + if (goToShutdown) + super.setState(ExecutorState.Terminated); + } + + Runnable residentRun = new Runnable() + { + @Override + public void run() { + work(true); + } + }; + + Runnable tempRun = new Runnable() + { + @Override + public void run() { + work(false); + } + }; + + /** + * Spawn new thread to the service of the thread pool + * @param stayResident + * @return new running thread + */ + public Thread newThread(boolean stayResident) + { + Thread t = new Thread(THREADGROUP, stayResident ? residentRun : tempRun, "WorkerThread"); + + synchronized(this) { + threads.put(t, stayResident ? ThreadMode.Resident : ThreadMode.Temporary); + } + t.start(); + return t; + } + + public synchronized ThreadMode getThreadMode(Thread t) + { + return threads.get(t); + } + + public synchronized void setMode(Thread t, ThreadMode tm) + { + if (t==null || tm==null) throw new IllegalArgumentException(); + ThreadMode oldMode = threads.get(t); + if (oldMode == tm) return; + threads.put(t, tm); + if (tm == ThreadMode.Interrupted /*&& t.getState() == Thread.State.WAITING*/) + { + t.interrupt(); + } + if (tm == ThreadMode.Temporary && isEmpty() /*&& t.getState() == Thread.State.WAITING*/) + { + t.interrupt(); + } + } + + /** + * Is work queue empty + * @return true if the work queue is empty + */ + public synchronized boolean isEmpty() + { + return queue.isEmpty(); + } + + void interrupt(Thread t) + { + t.interrupt(); + } + + public synchronized boolean remove(WorkImpl r) + { + return queue.remove(r); + } + + @Override + public WorkMonitor asyncExec(Runnable runnable, StateListener listener) + throws WorkerClosedException + { + WorkImpl w = null; + synchronized(this) { + if (!ExecutorState.ACCEPTS_WORK_STATES.contains(getState())) + throw new WorkerClosedException(); + w = new WorkImpl(runnable); + if (listener!=null) + w.addStateListener(listener); + queue.add(w); + spawnTempThreads(); + } + s.release(); + return w; + } + + + // Raise visibility + @Override + public boolean setState(ExecutorState state) { + // If worker is already shutdown then it must not be reactivated + if (getState() == ExecutorState.Terminated) + return false; + synchronized(this) { + // Go directly to shutdown if queue is empty and there are no threads + if (state==ExecutorState.Terminated || state==ExecutorState.Shutdown) + { + if (threads.isEmpty() && queue.isEmpty()) { + super.setState( ExecutorState.Shutdown ); + super.setState( ExecutorState.Terminated ); + } else { + super.setState( ExecutorState.Shutdown ); + } + return true; + } + } + return super.setState(state); + } + + /** + * Cancel all work. + * @param mayInterrupt + */ + public synchronized void cancelAll(boolean mayInterrupt) + { + for (WorkMonitor w : queue) + w.cancel(mayInterrupt); + queue.clear(); + } + + @Override + protected void onStateTransition(ExecutorState oldState, + ExecutorState newState) { + if (!ExecutorState.WORKING_STATES.contains(oldState) && ExecutorState.WORKING_STATES.contains(newState)) + spawnTempThreads(); + } + + private synchronized void spawnTempThreads() + { + if (!ExecutorState.WORKING_STATES.contains(getState())) return; + if (spawnOnDemandLimit>0) { + int workers = getWorkingThreadCount(); + int n = spawnOnDemandLimit - workers; + for (int i=0; i result) { + result.addAll(queue); + } + + + public static void main(String[] args) throws InterruptedException { + ThreadPool pool = new ThreadPool(2); + + pool.addStateListener(new StateListener() { + @Override + public void onStateTransition(IStatefulObject monitor, ExecutorState oldState, ExecutorState newState) { + System.out.println("Worker State: "+newState); + }}); + + StateListener l = new StateListener() { + @Override + public void onStateTransition(IStatefulObject monitor, WorkState oldState, WorkState newState) { + System.out.println("Work ("+monitor+"): "+newState); + } + }; + + Runnable[] r = new Runnable[10]; + WorkMonitor[] m = new WorkMonitor[10]; + for (int i=0; i getQueue() { +// return queue; +// } + + +}