X-Git-Url: https://gerrit.simantics.org/r/gitweb?p=simantics%2Fplatform.git;a=blobdiff_plain;f=bundles%2Forg.simantics.utils.thread%2Fsrc%2Forg%2Fsimantics%2Futils%2Fthreads%2Fua%2FThreadPool.java;fp=bundles%2Forg.simantics.utils.thread%2Fsrc%2Forg%2Fsimantics%2Futils%2Fthreads%2Fua%2FThreadPool.java;h=cff196467f5871adbcdff42e86047e870e8d9368;hp=0000000000000000000000000000000000000000;hb=969bd23cab98a79ca9101af33334000879fb60c5;hpb=866dba5cd5a3929bbeae85991796acb212338a08 diff --git a/bundles/org.simantics.utils.thread/src/org/simantics/utils/threads/ua/ThreadPool.java b/bundles/org.simantics.utils.thread/src/org/simantics/utils/threads/ua/ThreadPool.java new file mode 100644 index 000000000..cff196467 --- /dev/null +++ b/bundles/org.simantics.utils.thread/src/org/simantics/utils/threads/ua/ThreadPool.java @@ -0,0 +1,452 @@ +/******************************************************************************* + * Copyright (c) 2007, 2010 Association for Decentralized Information Management + * in Industry THTH ry. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * VTT Technical Research Centre of Finland - initial API and implementation + *******************************************************************************/ +/* + * + * @author Toni Kalajainen + */ +package org.simantics.utils.threads.ua; + +import java.util.Collection; +import java.util.EnumSet; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.Map; +import java.util.concurrent.Semaphore; +import java.util.concurrent.ThreadFactory; + + +/** + * ThreadPool contains a queue of work and a pool of threads processing them. + *
+ * A thread can join the pool either as resident or as temporary. + * Temporary thread works until work queue is empty and returns. + * Thread work mode can be set with setMode(). + *
+ * Current thread can yield to the service of ThreadPool with work()-invoke. + *
+ * A new permanent worker thread can be spawned with newThread(). + * This thread will be released when the ThreadPool is closed or explicitely with + * setMode(thread, ThreadMode.Interrupted); + *
+ * Temporaray threads can be spawned on-demand with setSpawnOnDemandLimit(). + *
+ * ThreadPool can be closed (or paused) with setState(). + *
+ * TODO Use ThreadFactory
+ *
+ * @see SynchronizedWorker Use queue to ensure processing order of work.
+ * @see Worker
+ * @author Toni Kalajainen (toni.kalajainen@vtt.fi)
+ */
+public class ThreadPool extends AbstractState
+ * WorkPool spawns more threads when new work is queued and
+ * the number of working threads goes below this value.
+ *
+ * New thread is spawned if number of threads is less than queued word and
+ * the number of threads is less than limit
value.
+ *
+ * @param limit on demand thread limit
+ */
+ public void setSpawnOnDemandLimit(int limit)
+ {
+ this.spawnOnDemandLimit = limit;
+ }
+
+ /**
+ * Makes current thread yield to the service of the thread pool.
+ * Service can be ended by invoking setMode(ThreadMode.Interrupted).
+ *
+ * @param stayResident if true works until interrupted, if false works until queue is empty
+ */
+ public void work(boolean stayResident)
+ {
+ Thread t = Thread.currentThread();
+ synchronized(this) {
+ threads.put(t, stayResident ? ThreadMode.Resident : ThreadMode.Temporary);
+ }
+ for (;;) {
+ synchronized(this) {
+ ThreadMode mode = getThreadMode(t);
+ ExecutorState state = getState();
+
+ // Temporary threads can return if not working right now
+ if (mode == ThreadMode.Temporary && (isEmpty() || !ExecutorState.WORKING_STATES.contains(state)))
+ mode = ThreadMode.Interrupted;
+
+ if (mode == null || mode == ThreadMode.Interrupted) break;
+
+ // Do not do anything if the worker is shut down
+ if (state==ExecutorState.Terminated) break;
+
+
+ if (state==ExecutorState.Paused) {
+ // Remove temporary threads if paused
+ if (mode==ThreadMode.Temporary) break;
+ if (mode==ThreadMode.Resident) {
+ try {
+ waitForState(ExecutorState.NON_PAUSED_STATES);
+ } catch (InterruptedException e) {
+ }
+ continue;
+ }
+ }
+
+ }
+ try {
+ //s.tryAcquire(1000, TimeUnit.MILLISECONDS)) continue;
+ s.acquire();
+ WorkImpl work;
+ synchronized(this) {
+ work = queue.removeFirst();
+ }
+ try {
+ work.setState(WorkState.Working);
+ work.getRunnable().run();
+ work.setState(WorkState.Complete);
+ } catch (RuntimeException e) {
+ work.setError(e);
+ work.setState(WorkState.Error);
+ }
+ } catch (InterruptedException e) {
+ }
+ }
+ // Go to shutdown state?
+ boolean goToShutdown = false;
+ synchronized(this) {
+ threads.remove(t);
+ // If this is last thread and queue is empty, then go to shutdown state
+ goToShutdown = getState() == ExecutorState.Shutdown && threads.isEmpty() && queue.isEmpty();
+ }
+ // Go to shutdown state.
+ if (goToShutdown)
+ super.setState(ExecutorState.Terminated);
+ }
+
+ Runnable residentRun = new Runnable()
+ {
+ @Override
+ public void run() {
+ work(true);
+ }
+ };
+
+ Runnable tempRun = new Runnable()
+ {
+ @Override
+ public void run() {
+ work(false);
+ }
+ };
+
+ /**
+ * Spawn new thread to the service of the thread pool
+ * @param stayResident
+ * @return new running thread
+ */
+ public Thread newThread(boolean stayResident)
+ {
+ Thread t = new Thread(THREADGROUP, stayResident ? residentRun : tempRun, "WorkerThread");
+
+ synchronized(this) {
+ threads.put(t, stayResident ? ThreadMode.Resident : ThreadMode.Temporary);
+ }
+ t.start();
+ return t;
+ }
+
+ public synchronized ThreadMode getThreadMode(Thread t)
+ {
+ return threads.get(t);
+ }
+
+ public synchronized void setMode(Thread t, ThreadMode tm)
+ {
+ if (t==null || tm==null) throw new IllegalArgumentException();
+ ThreadMode oldMode = threads.get(t);
+ if (oldMode == tm) return;
+ threads.put(t, tm);
+ if (tm == ThreadMode.Interrupted /*&& t.getState() == Thread.State.WAITING*/)
+ {
+ t.interrupt();
+ }
+ if (tm == ThreadMode.Temporary && isEmpty() /*&& t.getState() == Thread.State.WAITING*/)
+ {
+ t.interrupt();
+ }
+ }
+
+ /**
+ * Is work queue empty
+ * @return true if the work queue is empty
+ */
+ public synchronized boolean isEmpty()
+ {
+ return queue.isEmpty();
+ }
+
+ void interrupt(Thread t)
+ {
+ t.interrupt();
+ }
+
+ public synchronized boolean remove(WorkImpl r)
+ {
+ return queue.remove(r);
+ }
+
+ @Override
+ public WorkMonitor asyncExec(Runnable runnable, StateListener