1 /*******************************************************************************
\r
2 * Copyright (c) 2007, 2010 Association for Decentralized Information Management
\r
3 * in Industry THTH ry.
\r
4 * All rights reserved. This program and the accompanying materials
\r
5 * are made available under the terms of the Eclipse Public License v1.0
\r
6 * which accompanies this distribution, and is available at
\r
7 * http://www.eclipse.org/legal/epl-v10.html
\r
10 * VTT Technical Research Centre of Finland - initial API and implementation
\r
11 *******************************************************************************/
\r
14 * @author Toni Kalajainen
\r
16 package org.simantics.utils.threads;
\r
18 import java.lang.reflect.InvocationTargetException;
\r
19 import java.lang.reflect.Method;
\r
20 import java.util.ArrayList;
\r
21 import java.util.Collection;
\r
22 import java.util.HashMap;
\r
23 import java.util.List;
\r
24 import java.util.Map;
\r
25 import java.util.Map.Entry;
\r
27 import org.simantics.utils.threads.internal.ListenerList;
\r
31 * @author Toni Kalajainen
\r
33 * @see ListenerList Simple listener list
\r
37 public class SyncListenerList<T> {
\r
40 Map<IThreadWorkQueue, ListenerList<T>> lists =
\r
41 new HashMap<IThreadWorkQueue, ListenerList<T>>(2);
\r
43 /** Snapshot version */
\r
44 Map<IThreadWorkQueue, T[]> snapshot;
\r
46 Boolean requiresThreadSwitching = Boolean.FALSE;
\r
51 private final Class<T> componentType;
\r
53 @SuppressWarnings("unchecked")
\r
54 public SyncListenerList(Class<?> componentType)
\r
56 this.componentType = (Class<T>) componentType;
\r
60 * Add listener to the list
\r
61 * @param thread thread to use to handle the event
\r
64 public synchronized void add(IThreadWorkQueue thread, T listener)
\r
67 throw new IllegalArgumentException("null");
\r
70 ListenerList<T> list = lists.get(thread);
\r
72 list = new ListenerList<T>(componentType);
\r
73 lists.put(thread, list);
\r
76 if (thread!=CurrentThread.getThreadAccess())
\r
77 requiresThreadSwitching = Boolean.TRUE;
\r
81 * Contains elements that require thread switching.
\r
82 * @return true if contains elements with thread access other than current thread
\r
84 public synchronized Boolean containsContextSwitchingListeners() {
\r
85 if (requiresThreadSwitching==null)
\r
86 requiresThreadSwitching = _HasContextSwitching();
\r
87 return requiresThreadSwitching;
\r
92 * @return true if contains listeners that require context switching
\r
94 private boolean _HasContextSwitching()
\r
96 if (lists.size()!=0) return false;
\r
97 if (lists.size()>1) return false;
\r
98 Entry<IThreadWorkQueue, ListenerList<T>> e = lists.entrySet().iterator().next();
\r
99 if (e.getKey()!=CurrentThread.getThreadAccess()) return false;
\r
100 return !e.getValue().isEmpty();
\r
104 * Add listener to the list. Listener will be invoked in the thread that
\r
105 * happens to be running at the time of events.
\r
109 public void add(T listener)
\r
111 if (listener==null)
\r
112 throw new IllegalArgumentException("null");
\r
113 add(CurrentThread.getThreadAccess(), listener);
\r
117 * Remove listener from the list
\r
118 * @param thread thread to use to handle the event
\r
121 public synchronized void remove(IThreadWorkQueue thread, T listener)
\r
124 ListenerList<T> list = lists.get(thread);
\r
125 if (list==null) return;
\r
126 list.remove(listener);
\r
127 if (list.isEmpty())
\r
128 lists.remove(thread);
\r
130 requiresThreadSwitching = Boolean.FALSE;
\r
133 if (requiresThreadSwitching==null) return;
\r
134 if (!requiresThreadSwitching) return;
\r
135 if (thread==CurrentThread.getThreadAccess()) return;
\r
136 requiresThreadSwitching = null;
\r
139 public void remove(T listener)
\r
141 remove(CurrentThread.getThreadAccess(), listener);
\r
144 public synchronized boolean isEmpty()
\r
146 return lists.size()==0;
\r
149 public synchronized void clear()
\r
151 requiresThreadSwitching = Boolean.FALSE;
\r
157 public synchronized Map<IThreadWorkQueue, T[]> getSnapshot()
\r
159 if (snapshot==null) {
\r
160 snapshot = new HashMap<IThreadWorkQueue, T[]>(lists.size());
\r
161 for (Entry<IThreadWorkQueue, ListenerList<T>> e : lists.entrySet())
\r
163 T[] list = e.getValue().getListeners();
\r
164 snapshot.put(e.getKey(), list);
\r
170 public void fireEventAsync(final Method m, final Object ... args)
\r
173 throw new IllegalArgumentException("null");
\r
174 if (isEmpty()) return;
\r
175 Map<IThreadWorkQueue, T[]> snapshot = getSnapshot();
\r
176 for (Entry<IThreadWorkQueue, T[]> e : snapshot.entrySet())
\r
178 final IThreadWorkQueue thread = e.getKey();
\r
179 final T[] list = e.getValue();
\r
180 Runnable r = new Runnable() {
\r
182 public void run() {
\r
186 } catch (RuntimeException e) {
\r
187 e.printStackTrace();
\r
188 } catch (IllegalAccessException e) {
\r
189 e.printStackTrace();
\r
190 } catch (InvocationTargetException e) {
\r
191 e.getCause().printStackTrace();
\r
195 ThreadUtils.asyncExec(thread, r);
\r
199 * Version that does not use ThreadUtils
\r
200 public void fireEventSync(final Method m, final Object ... args)
\r
202 Map<IThreadAccess, T[]> snapshot = getSnapshot();
\r
203 final Semaphore s = new Semaphore(0);
\r
205 int countAsyncThreads = 0;
\r
207 // todo use snapshot version of lists
\r
208 for (Entry<IThreadAccess, T[]> e : snapshot.entrySet())
\r
210 final IThreadAccess thread = e.getKey();
\r
211 if (!thread.currentThreadAccess()) {
\r
212 countAsyncThreads++;
\r
216 // Start async prosessing
\r
217 for (Entry<IThreadAccess, T[]> e : snapshot.entrySet())
\r
219 final IThreadAccess thread = e.getKey();
\r
220 if (thread.currentThreadAccess()) {
\r
221 countAsyncThreads++;
\r
224 final T[] list = e.getValue();
\r
225 Runnable r = new Runnable() {
\r
227 public void run() {
\r
233 } catch (RuntimeException e) {
\r
234 e.printStackTrace();
\r
235 } catch (IllegalAccessException e) {
\r
236 e.printStackTrace();
\r
237 } catch (InvocationTargetException e) {
\r
238 e.printStackTrace();
\r
247 ThreadUtils.asyncExec(thread, r);
\r
249 // Start local thread processing
\r
250 for (Entry<IThreadAccess, T[]> e : snapshot.entrySet())
\r
252 final T[] list = e.getValue();
\r
257 } catch (RuntimeException e1) {
\r
258 e1.printStackTrace();
\r
259 } catch (IllegalAccessException e2) {
\r
260 e2.printStackTrace();
\r
261 } catch (InvocationTargetException e3) {
\r
262 e3.printStackTrace();
\r
267 // wait until all threads are ready
\r
269 s.acquire(countAsyncThreads);
\r
270 } catch (InterruptedException e) {
\r
275 public void addExecutables(Collection<Executable> container, final Method m, final Object ... args)
\r
277 Map<IThreadWorkQueue, T[]> snapshot = getSnapshot();
\r
279 for (Entry<IThreadWorkQueue, T[]> e : snapshot.entrySet())
\r
281 IThreadWorkQueue thread = e.getKey();
\r
282 final T[] list = e.getValue();
\r
283 Runnable r = new Runnable() {
\r
285 public void run() {
\r
290 } catch (RuntimeException e) {
\r
291 e.printStackTrace();
\r
292 } catch (IllegalAccessException e) {
\r
293 e.printStackTrace();
\r
294 } catch (InvocationTargetException e) {
\r
295 e.getCause().printStackTrace();
\r
301 container.add( new Executable(thread, r) );
\r
305 public Executable[] getExecutables(final Method m, final Object ... args)
\r
307 Map<IThreadWorkQueue, T[]> snapshot = getSnapshot();
\r
308 List<Executable> container =
\r
309 new ArrayList<Executable>(snapshot.size());
\r
310 addExecutables(container, m, args);
\r
311 return container.toArray(new Executable[container.size()]);
\r
314 // Version that uses thread utils
\r
315 public void fireEventSync(final Method m, final Object ... args)
\r
318 throw new IllegalArgumentException("null");
\r
319 if (isEmpty()) return;
\r
320 ThreadUtils.multiSyncExec(getExecutables(m, args));
\r
323 public static Method getMethod(Class<?> clazz, String name)
\r
326 Method result = null;
\r
327 for (Method m : clazz.getMethods())
\r
329 if (!m.getName().equals(name)) continue;
\r
333 if (count!=1) throw new Error("Unexpected method \""+name+"\" count in class "+clazz.getName());
\r
337 public synchronized T[] getListenersByThread(IThreadWorkQueue ta)
\r
339 ListenerList<T> l = lists.get(ta);
\r
340 if (l==null) return null;
\r
341 return l.getListeners();
\r
345 * Is the list executable in current thread.
\r
347 * @return <code>true</code> if executable
\r
349 public synchronized boolean executableInCurrentThread() {
\r
350 for (IThreadWorkQueue ta : lists.keySet())
\r
351 if (!ta.currentThreadAccess())
\r