X-Git-Url: https://gerrit.simantics.org/r/gitweb?p=simantics%2Fplatform.git;a=blobdiff_plain;f=bundles%2Forg.simantics.utils.thread%2Fsrc%2Forg%2Fsimantics%2Futils%2Fthreads%2Fua%2FThreadPool.java;fp=bundles%2Forg.simantics.utils.thread%2Fsrc%2Forg%2Fsimantics%2Futils%2Fthreads%2Fua%2FThreadPool.java;h=cff196467f5871adbcdff42e86047e870e8d9368;hp=0000000000000000000000000000000000000000;hb=969bd23cab98a79ca9101af33334000879fb60c5;hpb=866dba5cd5a3929bbeae85991796acb212338a08 diff --git a/bundles/org.simantics.utils.thread/src/org/simantics/utils/threads/ua/ThreadPool.java b/bundles/org.simantics.utils.thread/src/org/simantics/utils/threads/ua/ThreadPool.java new file mode 100644 index 000000000..cff196467 --- /dev/null +++ b/bundles/org.simantics.utils.thread/src/org/simantics/utils/threads/ua/ThreadPool.java @@ -0,0 +1,452 @@ +/******************************************************************************* + * Copyright (c) 2007, 2010 Association for Decentralized Information Management + * in Industry THTH ry. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * VTT Technical Research Centre of Finland - initial API and implementation + *******************************************************************************/ +/* + * + * @author Toni Kalajainen + */ +package org.simantics.utils.threads.ua; + +import java.util.Collection; +import java.util.EnumSet; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.Map; +import java.util.concurrent.Semaphore; +import java.util.concurrent.ThreadFactory; + + +/** + * ThreadPool contains a queue of work and a pool of threads processing them. + *

+ * A thread can join the pool either as resident or as temporary. + * Temporary thread works until work queue is empty and returns. + * Thread work mode can be set with setMode(). + *

+ * Current thread can yield to the service of ThreadPool with work()-invoke. + *

+ * A new permanent worker thread can be spawned with newThread(). + * This thread will be released when the ThreadPool is closed or explicitely with + * setMode(thread, ThreadMode.Interrupted); + *

+ * Temporaray threads can be spawned on-demand with setSpawnOnDemandLimit(). + *

+ * ThreadPool can be closed (or paused) with setState(). + *

+ * TODO Use ThreadFactory + * + * @see SynchronizedWorker Use queue to ensure processing order of work. + * @see Worker + * @author Toni Kalajainen (toni.kalajainen@vtt.fi) + */ +public class ThreadPool extends AbstractState implements Worker, StatefulExecutor { + + + private final static ThreadGroup THREADGROUP = new ThreadGroup("ThreadPool"); + /** Constructs daemon threads of "ThreadPool" group */ + private final static ThreadFactory FACTORY = new ThreadFactory() + { + @Override + public Thread newThread(Runnable r) { + Thread t = new Thread(THREADGROUP, r, "WorkerThread"); + t.setDaemon(true); + return t; + } + }; + + LinkedList queue = new LinkedList(); + Semaphore s = new Semaphore(0); + + Map threads = new HashMap(); + // How many threads may be spawned on demand when new work is queued + int spawnOnDemandLimit = 0; + static ThreadPool INSTANCE; + ThreadFactory threadFactory = FACTORY; + + /** + * Get singleton instance. This pool has 0 resident threads and spawns + * temporary threads up to the number of CPUs in the system. + * NOTE! Do not use this shared resource for blocking actions. + * + * @return singleton instance which has one worker for each core + */ + public synchronized static ThreadPool getInstance() { + if (INSTANCE == null) + INSTANCE = new ThreadPool(Runtime.getRuntime().availableProcessors()); + return INSTANCE; + } + + public enum ThreadMode { + Resident, // Works and stays resident (sleeps if no work available) + Temporary, // Works until queue is empty + Interrupted // Stops working right away or after current task + } + + public ThreadPool() { + super(ExecutorState.Active); + } + + /** + * Create thread pool with 0 initial worker threads. + * + * @param maxNumberOfTemporaryThreads The number of ad-hoc temporary worker threads. + */ + public ThreadPool(int maxNumberOfTemporaryThreads) { + super(ExecutorState.Active); + setSpawnOnDemandLimit(maxNumberOfTemporaryThreads); + } + + public synchronized int getWorkingThreadCount() + { + int result = 0; + for (ThreadMode m : threads.values()) + if (m != ThreadMode.Interrupted) { + result++; + } + return result; + } + + /** + * Sets the limit for how many threads may be spawned on demand when new + * work is queued. + *

+ * WorkPool spawns more threads when new work is queued and + * the number of working threads goes below this value. + *

+ * New thread is spawned if number of threads is less than queued word and + * the number of threads is less than limit value. + * + * @param limit on demand thread limit + */ + public void setSpawnOnDemandLimit(int limit) + { + this.spawnOnDemandLimit = limit; + } + + /** + * Makes current thread yield to the service of the thread pool. + * Service can be ended by invoking setMode(ThreadMode.Interrupted). + * + * @param stayResident if true works until interrupted, if false works until queue is empty + */ + public void work(boolean stayResident) + { + Thread t = Thread.currentThread(); + synchronized(this) { + threads.put(t, stayResident ? ThreadMode.Resident : ThreadMode.Temporary); + } + for (;;) { + synchronized(this) { + ThreadMode mode = getThreadMode(t); + ExecutorState state = getState(); + + // Temporary threads can return if not working right now + if (mode == ThreadMode.Temporary && (isEmpty() || !ExecutorState.WORKING_STATES.contains(state))) + mode = ThreadMode.Interrupted; + + if (mode == null || mode == ThreadMode.Interrupted) break; + + // Do not do anything if the worker is shut down + if (state==ExecutorState.Terminated) break; + + + if (state==ExecutorState.Paused) { + // Remove temporary threads if paused + if (mode==ThreadMode.Temporary) break; + if (mode==ThreadMode.Resident) { + try { + waitForState(ExecutorState.NON_PAUSED_STATES); + } catch (InterruptedException e) { + } + continue; + } + } + + } + try { + //s.tryAcquire(1000, TimeUnit.MILLISECONDS)) continue; + s.acquire(); + WorkImpl work; + synchronized(this) { + work = queue.removeFirst(); + } + try { + work.setState(WorkState.Working); + work.getRunnable().run(); + work.setState(WorkState.Complete); + } catch (RuntimeException e) { + work.setError(e); + work.setState(WorkState.Error); + } + } catch (InterruptedException e) { + } + } + // Go to shutdown state? + boolean goToShutdown = false; + synchronized(this) { + threads.remove(t); + // If this is last thread and queue is empty, then go to shutdown state + goToShutdown = getState() == ExecutorState.Shutdown && threads.isEmpty() && queue.isEmpty(); + } + // Go to shutdown state. + if (goToShutdown) + super.setState(ExecutorState.Terminated); + } + + Runnable residentRun = new Runnable() + { + @Override + public void run() { + work(true); + } + }; + + Runnable tempRun = new Runnable() + { + @Override + public void run() { + work(false); + } + }; + + /** + * Spawn new thread to the service of the thread pool + * @param stayResident + * @return new running thread + */ + public Thread newThread(boolean stayResident) + { + Thread t = new Thread(THREADGROUP, stayResident ? residentRun : tempRun, "WorkerThread"); + + synchronized(this) { + threads.put(t, stayResident ? ThreadMode.Resident : ThreadMode.Temporary); + } + t.start(); + return t; + } + + public synchronized ThreadMode getThreadMode(Thread t) + { + return threads.get(t); + } + + public synchronized void setMode(Thread t, ThreadMode tm) + { + if (t==null || tm==null) throw new IllegalArgumentException(); + ThreadMode oldMode = threads.get(t); + if (oldMode == tm) return; + threads.put(t, tm); + if (tm == ThreadMode.Interrupted /*&& t.getState() == Thread.State.WAITING*/) + { + t.interrupt(); + } + if (tm == ThreadMode.Temporary && isEmpty() /*&& t.getState() == Thread.State.WAITING*/) + { + t.interrupt(); + } + } + + /** + * Is work queue empty + * @return true if the work queue is empty + */ + public synchronized boolean isEmpty() + { + return queue.isEmpty(); + } + + void interrupt(Thread t) + { + t.interrupt(); + } + + public synchronized boolean remove(WorkImpl r) + { + return queue.remove(r); + } + + @Override + public WorkMonitor asyncExec(Runnable runnable, StateListener listener) + throws WorkerClosedException + { + WorkImpl w = null; + synchronized(this) { + if (!ExecutorState.ACCEPTS_WORK_STATES.contains(getState())) + throw new WorkerClosedException(); + w = new WorkImpl(runnable); + if (listener!=null) + w.addStateListener(listener); + queue.add(w); + spawnTempThreads(); + } + s.release(); + return w; + } + + + // Raise visibility + @Override + public boolean setState(ExecutorState state) { + // If worker is already shutdown then it must not be reactivated + if (getState() == ExecutorState.Terminated) + return false; + synchronized(this) { + // Go directly to shutdown if queue is empty and there are no threads + if (state==ExecutorState.Terminated || state==ExecutorState.Shutdown) + { + if (threads.isEmpty() && queue.isEmpty()) { + super.setState( ExecutorState.Shutdown ); + super.setState( ExecutorState.Terminated ); + } else { + super.setState( ExecutorState.Shutdown ); + } + return true; + } + } + return super.setState(state); + } + + /** + * Cancel all work. + * @param mayInterrupt + */ + public synchronized void cancelAll(boolean mayInterrupt) + { + for (WorkMonitor w : queue) + w.cancel(mayInterrupt); + queue.clear(); + } + + @Override + protected void onStateTransition(ExecutorState oldState, + ExecutorState newState) { + if (!ExecutorState.WORKING_STATES.contains(oldState) && ExecutorState.WORKING_STATES.contains(newState)) + spawnTempThreads(); + } + + private synchronized void spawnTempThreads() + { + if (!ExecutorState.WORKING_STATES.contains(getState())) return; + if (spawnOnDemandLimit>0) { + int workers = getWorkingThreadCount(); + int n = spawnOnDemandLimit - workers; + for (int i=0; i result) { + result.addAll(queue); + } + + + public static void main(String[] args) throws InterruptedException { + ThreadPool pool = new ThreadPool(2); + + pool.addStateListener(new StateListener() { + @Override + public void onStateTransition(IStatefulObject monitor, ExecutorState oldState, ExecutorState newState) { + System.out.println("Worker State: "+newState); + }}); + + StateListener l = new StateListener() { + @Override + public void onStateTransition(IStatefulObject monitor, WorkState oldState, WorkState newState) { + System.out.println("Work ("+monitor+"): "+newState); + } + }; + + Runnable[] r = new Runnable[10]; + WorkMonitor[] m = new WorkMonitor[10]; + for (int i=0; i getQueue() { +// return queue; +// } + + +}