]> gerrit.simantics Code Review - simantics/platform.git/blob - bundles/org.simantics.utils.thread/src/org/simantics/utils/threads/SyncListenerList.java
Fixed multiple issues causing dangling references to discarded queries
[simantics/platform.git] / bundles / org.simantics.utils.thread / src / org / simantics / utils / threads / SyncListenerList.java
1 /*******************************************************************************
2  * Copyright (c) 2007, 2010 Association for Decentralized Information Management
3  * in Industry THTH ry.
4  * All rights reserved. This program and the accompanying materials
5  * are made available under the terms of the Eclipse Public License v1.0
6  * which accompanies this distribution, and is available at
7  * http://www.eclipse.org/legal/epl-v10.html
8  *
9  * Contributors:
10  *     VTT Technical Research Centre of Finland - initial API and implementation
11  *******************************************************************************/
12 /*
13  *
14  * @author Toni Kalajainen
15  */
16 package org.simantics.utils.threads;
17
18 import java.lang.reflect.InvocationTargetException;
19 import java.lang.reflect.Method;
20 import java.util.ArrayList;
21 import java.util.Collection;
22 import java.util.HashMap;
23 import java.util.List;
24 import java.util.Map;
25 import java.util.Map.Entry;
26
27 import org.simantics.utils.threads.internal.ListenerList;
28
29 /**
30  * 
31  * @author Toni Kalajainen
32  *
33  * @see ListenerList Simple listener list
34  *
35  * @param <T>
36  */
37 public class SyncListenerList<T> {
38
39         /** internal use */
40         Map<IThreadWorkQueue, ListenerList<T>> lists =
41                 new HashMap<IThreadWorkQueue, ListenerList<T>>(2);
42                 
43         /** Snapshot version */
44         Map<IThreadWorkQueue, T[]> snapshot;
45         
46         Boolean requiresThreadSwitching = Boolean.FALSE;
47                 
48     /** 
49      * The class of T
50      */
51     private final Class<T> componentType;       
52         
53         @SuppressWarnings("unchecked")
54         public SyncListenerList(Class<?> componentType)
55         {
56         this.componentType = (Class<T>) componentType;          
57         }
58         
59         /**
60          * Add listener to the list
61          * @param thread thread to use to handle the event
62          * @param listener
63          */
64     public synchronized void add(IThreadWorkQueue thread, T listener)
65     {
66         if (listener==null)
67                 throw new IllegalArgumentException("null");
68         
69         snapshot = null;
70         ListenerList<T> list = lists.get(thread);
71         if (list==null) {
72                 list = new ListenerList<T>(componentType);
73                 lists.put(thread, list);
74         }
75         list.add(listener);
76         if (thread!=CurrentThread.getThreadAccess())
77                 requiresThreadSwitching = Boolean.TRUE;
78     }
79     
80     /**
81      * Contains elements that require thread switching.
82      * @return true if contains elements with thread access other than current thread
83      */
84     public synchronized Boolean containsContextSwitchingListeners() {
85         if (requiresThreadSwitching==null)
86                 requiresThreadSwitching = _HasContextSwitching();
87         return requiresThreadSwitching;
88     }
89         
90     /**
91      * 
92      * @return true if contains listeners that require context switching
93      */
94     private boolean _HasContextSwitching()
95     {
96         if (lists.size()!=0) return false;
97         if (lists.size()>1) return false;
98         Entry<IThreadWorkQueue, ListenerList<T>> e = lists.entrySet().iterator().next();
99         if (e.getKey()!=CurrentThread.getThreadAccess()) return false;
100         return !e.getValue().isEmpty();
101     }
102
103     /**
104      * Add listener to the list. Listener will be invoked in the thread that
105      * happens to be running at the time of events.
106      * 
107      * @param listener
108      */
109     public void add(T listener)
110     {
111         if (listener==null)
112                 throw new IllegalArgumentException("null");
113         add(CurrentThread.getThreadAccess(), listener);
114     }
115
116         /**
117          * Remove listener from the list
118          * @param thread thread to use to handle the event
119          * @param listener
120          */
121     public synchronized void remove(IThreadWorkQueue thread, T listener)
122     {
123         snapshot = null;
124         ListenerList<T> list = lists.get(thread);
125         if (list==null) return;
126         list.remove(listener);
127         if (list.isEmpty())
128                 lists.remove(thread);
129         if (isEmpty()) {
130                 requiresThreadSwitching = Boolean.FALSE;
131                 return;
132         }
133         if (requiresThreadSwitching==null) return;      
134         if (!requiresThreadSwitching) return;
135         if (thread==CurrentThread.getThreadAccess()) return;
136         requiresThreadSwitching = null;
137     }
138
139     public void remove(T listener)
140     {
141         remove(CurrentThread.getThreadAccess(), listener);
142     }
143     
144     public synchronized boolean isEmpty()
145     {
146         return lists.size()==0;
147     }
148     
149     public synchronized void clear()
150     {
151         requiresThreadSwitching = Boolean.FALSE;
152         lists.clear();
153         snapshot = null;
154     }
155         
156     
157     public synchronized Map<IThreadWorkQueue, T[]> getSnapshot()
158     {
159         if (snapshot==null) {
160                 snapshot = new HashMap<IThreadWorkQueue, T[]>(lists.size());
161                 for (Entry<IThreadWorkQueue, ListenerList<T>> e : lists.entrySet())
162                 {
163                         T[] list = e.getValue().getListeners();
164                         snapshot.put(e.getKey(), list);
165                 }
166         }
167         return snapshot;
168     }
169
170     public void fireEventAsync(final Method m, final Object ... args)
171     {
172         if (m==null)
173                 throw new IllegalArgumentException("null");
174         if (isEmpty()) return;          
175         Map<IThreadWorkQueue, T[]> snapshot = getSnapshot();            
176         for (Entry<IThreadWorkQueue, T[]> e : snapshot.entrySet())
177         {
178                 final IThreadWorkQueue thread = e.getKey();
179                 final T[] list = e.getValue();                  
180                         Runnable r = new Runnable() {
181                                 @Override
182                                 public void run() {
183                                         for (T t : list)
184                                                 try {
185                                                         m.invoke(t, args);
186                                                 } catch (RuntimeException e) {
187                                                         e.printStackTrace();
188                                                 } catch (IllegalAccessException e) {
189                                                         e.printStackTrace();
190                                                 } catch (InvocationTargetException e) {
191                                                         e.getCause().printStackTrace();
192                                                 }
193                                 }
194                         };
195                         ThreadUtils.asyncExec(thread, r);                       
196         }
197     }
198 /*        
199  * Version that does not use ThreadUtils
200     public void fireEventSync(final Method m, final Object ... args)
201     {
202         Map<IThreadAccess, T[]> snapshot = getSnapshot();       
203         final Semaphore s = new Semaphore(0);
204         
205         int countAsyncThreads = 0;
206         
207         // todo use snapshot version of lists
208         for (Entry<IThreadAccess, T[]> e : snapshot.entrySet())
209         {
210                 final IThreadAccess thread = e.getKey();
211                 if (!thread.currentThreadAccess()) {
212                         countAsyncThreads++;
213                         continue;
214                 }
215         }
216         // Start async prosessing
217         for (Entry<IThreadAccess, T[]> e : snapshot.entrySet())
218         {
219                 final IThreadAccess thread = e.getKey();
220                 if (thread.currentThreadAccess()) {
221                         countAsyncThreads++;
222                         continue;
223                 }
224                 final T[] list = e.getValue();                  
225                         Runnable r = new Runnable() {
226                                 @Override
227                                 public void run() {
228                                         try {
229                                                 for (T t : list)
230                                                 {
231                                                         try {
232                                                                 m.invoke(t, args);
233                                                         } catch (RuntimeException e) {
234                                                                 e.printStackTrace();
235                                                         } catch (IllegalAccessException e) {
236                                                                 e.printStackTrace();
237                                                         } catch (InvocationTargetException e) {
238                                                                 e.printStackTrace();
239                                                         }
240                                                 }
241                                         } finally {
242                                                 // Signal ready
243                                                 s.release();
244                                         }
245                                 }
246                         };
247                         ThreadUtils.asyncExec(thread, r);
248         }
249         // Start local thread processing
250         for (Entry<IThreadAccess, T[]> e : snapshot.entrySet())
251         {
252                 final T[] list = e.getValue();                  
253                         for (T t : list)
254                         {
255                                 try {
256                                         m.invoke(t, args);
257                                 } catch (RuntimeException e1) {
258                                         e1.printStackTrace();
259                                 } catch (IllegalAccessException e2) {
260                                         e2.printStackTrace();
261                                 } catch (InvocationTargetException e3) {
262                                         e3.printStackTrace();
263                                 }
264                         }
265         }       
266         
267         // wait until all threads are ready
268         try {
269                         s.acquire(countAsyncThreads);
270                 } catch (InterruptedException e) {
271                 }
272     }
273         */
274     
275     public void addExecutables(Collection<Executable> container, final Method m, final Object ... args)
276     {
277         Map<IThreadWorkQueue, T[]> snapshot = getSnapshot();            
278         
279         for (Entry<IThreadWorkQueue, T[]> e : snapshot.entrySet())
280         {
281                 IThreadWorkQueue thread = e.getKey();
282                 final T[] list = e.getValue();
283                         Runnable r = new Runnable() {
284                                 @Override
285                                 public void run() {
286                                         for (T t : list)
287                                         {
288                                                 try {
289                                                         m.invoke(t, args);
290                                                 } catch (RuntimeException e) {
291                                                         e.printStackTrace();
292                                                 } catch (IllegalAccessException e) {
293                                                         e.printStackTrace();
294                                                 } catch (InvocationTargetException e) {
295                                                         e.getCause().printStackTrace();
296                                                 }
297                                         }
298                                 }
299                         };
300                         
301                 container.add( new Executable(thread, r) );
302         }
303     }
304     
305     public Executable[] getExecutables(final Method m, final Object ... args)
306     {
307         Map<IThreadWorkQueue, T[]> snapshot = getSnapshot();
308         List<Executable> container =
309                 new ArrayList<Executable>(snapshot.size());
310         addExecutables(container, m, args);
311         return container.toArray(new Executable[container.size()]);
312     }
313     
314     // Version that uses thread utils
315     public void fireEventSync(final Method m, final Object ... args)
316     {
317         if (m==null)
318                 throw new IllegalArgumentException("null");
319         if (isEmpty()) return;
320         ThreadUtils.multiSyncExec(getExecutables(m, args));
321     }
322     
323     public static Method getMethod(Class<?> clazz, String name)
324     {
325         int count = 0;
326         Method result = null;
327         for (Method m : clazz.getMethods())
328         {
329                 if (!m.getName().equals(name)) continue;
330                 count++;
331                 result = m;
332         }
333         if (count!=1) throw new Error("Unexpected method \""+name+"\" count in class "+clazz.getName());
334         return result;
335     }
336     
337     public synchronized T[] getListenersByThread(IThreadWorkQueue ta) 
338     {
339         ListenerList<T> l = lists.get(ta);
340         if (l==null) return null;
341         return l.getListeners();
342     }
343     
344     /**
345      * Is the list executable in current thread. 
346      *  
347      * @return <code>true</code> if executable
348      */
349     public synchronized boolean executableInCurrentThread() {
350         for (IThreadWorkQueue ta : lists.keySet())
351                 if (!ta.currentThreadAccess())
352                         return false;
353         return true;
354     }
355     
356 }