--- /dev/null
+/*******************************************************************************\r
+ * Copyright (c) 2007, 2010 Association for Decentralized Information Management\r
+ * in Industry THTH ry.\r
+ * All rights reserved. This program and the accompanying materials\r
+ * are made available under the terms of the Eclipse Public License v1.0\r
+ * which accompanies this distribution, and is available at\r
+ * http://www.eclipse.org/legal/epl-v10.html\r
+ *\r
+ * Contributors:\r
+ * VTT Technical Research Centre of Finland - initial API and implementation\r
+ *******************************************************************************/\r
+/*\r
+ *\r
+ * @author Toni Kalajainen\r
+ */\r
+package org.simantics.utils.threads;\r
+\r
+import java.lang.reflect.InvocationTargetException;\r
+import java.lang.reflect.Method;\r
+import java.util.ArrayList;\r
+import java.util.Collection;\r
+import java.util.HashMap;\r
+import java.util.List;\r
+import java.util.Map;\r
+import java.util.Map.Entry;\r
+\r
+import org.simantics.utils.threads.internal.ListenerList;\r
+\r
+/**\r
+ * \r
+ * @author Toni Kalajainen\r
+ *\r
+ * @see ListenerList Simple listener list\r
+ *\r
+ * @param <T>\r
+ */\r
+public class SyncListenerList<T> {\r
+\r
+ /** internal use */\r
+ Map<IThreadWorkQueue, ListenerList<T>> lists =\r
+ new HashMap<IThreadWorkQueue, ListenerList<T>>(2);\r
+ \r
+ /** Snapshot version */\r
+ Map<IThreadWorkQueue, T[]> snapshot;\r
+ \r
+ Boolean requiresThreadSwitching = Boolean.FALSE;\r
+ \r
+ /** \r
+ * The class of T\r
+ */\r
+ private final Class<T> componentType; \r
+ \r
+ @SuppressWarnings("unchecked")\r
+ public SyncListenerList(Class<?> componentType)\r
+ {\r
+ this.componentType = (Class<T>) componentType; \r
+ }\r
+ \r
+ /**\r
+ * Add listener to the list\r
+ * @param thread thread to use to handle the event\r
+ * @param listener\r
+ */\r
+ public synchronized void add(IThreadWorkQueue thread, T listener)\r
+ {\r
+ if (listener==null)\r
+ throw new IllegalArgumentException("null");\r
+ \r
+ snapshot = null;\r
+ ListenerList<T> list = lists.get(thread);\r
+ if (list==null) {\r
+ list = new ListenerList<T>(componentType);\r
+ lists.put(thread, list);\r
+ }\r
+ list.add(listener);\r
+ if (thread!=CurrentThread.getThreadAccess())\r
+ requiresThreadSwitching = Boolean.TRUE;\r
+ }\r
+ \r
+ /**\r
+ * Contains elements that require thread switching.\r
+ * @return true if contains elements with thread access other than current thread\r
+ */\r
+ public synchronized Boolean containsContextSwitchingListeners() {\r
+ if (requiresThreadSwitching==null)\r
+ requiresThreadSwitching = _HasContextSwitching();\r
+ return requiresThreadSwitching;\r
+ }\r
+ \r
+ /**\r
+ * \r
+ * @return true if contains listeners that require context switching\r
+ */\r
+ private boolean _HasContextSwitching()\r
+ {\r
+ if (lists.size()!=0) return false;\r
+ if (lists.size()>1) return false;\r
+ Entry<IThreadWorkQueue, ListenerList<T>> e = lists.entrySet().iterator().next();\r
+ if (e.getKey()!=CurrentThread.getThreadAccess()) return false;\r
+ return !e.getValue().isEmpty();\r
+ }\r
+\r
+ /**\r
+ * Add listener to the list. Listener will be invoked in the thread that\r
+ * happens to be running at the time of events.\r
+ * \r
+ * @param listener\r
+ */\r
+ public void add(T listener)\r
+ {\r
+ if (listener==null)\r
+ throw new IllegalArgumentException("null");\r
+ add(CurrentThread.getThreadAccess(), listener);\r
+ }\r
+\r
+ /**\r
+ * Remove listener from the list\r
+ * @param thread thread to use to handle the event\r
+ * @param listener\r
+ */\r
+ public synchronized void remove(IThreadWorkQueue thread, T listener)\r
+ {\r
+ snapshot = null;\r
+ ListenerList<T> list = lists.get(thread);\r
+ if (list==null) return;\r
+ list.remove(listener);\r
+ if (list.isEmpty())\r
+ lists.remove(thread);\r
+ if (isEmpty()) {\r
+ requiresThreadSwitching = Boolean.FALSE;\r
+ return;\r
+ }\r
+ if (requiresThreadSwitching==null) return; \r
+ if (!requiresThreadSwitching) return;\r
+ if (thread==CurrentThread.getThreadAccess()) return;\r
+ requiresThreadSwitching = null;\r
+ }\r
+\r
+ public void remove(T listener)\r
+ {\r
+ remove(CurrentThread.getThreadAccess(), listener);\r
+ }\r
+ \r
+ public synchronized boolean isEmpty()\r
+ {\r
+ return lists.size()==0;\r
+ }\r
+ \r
+ public synchronized void clear()\r
+ {\r
+ requiresThreadSwitching = Boolean.FALSE;\r
+ lists.clear();\r
+ snapshot = null;\r
+ }\r
+ \r
+ \r
+ public synchronized Map<IThreadWorkQueue, T[]> getSnapshot()\r
+ {\r
+ if (snapshot==null) {\r
+ snapshot = new HashMap<IThreadWorkQueue, T[]>(lists.size());\r
+ for (Entry<IThreadWorkQueue, ListenerList<T>> e : lists.entrySet())\r
+ {\r
+ T[] list = e.getValue().getListeners();\r
+ snapshot.put(e.getKey(), list);\r
+ }\r
+ }\r
+ return snapshot;\r
+ }\r
+\r
+ public void fireEventAsync(final Method m, final Object ... args)\r
+ {\r
+ if (m==null)\r
+ throw new IllegalArgumentException("null");\r
+ if (isEmpty()) return; \r
+ Map<IThreadWorkQueue, T[]> snapshot = getSnapshot(); \r
+ for (Entry<IThreadWorkQueue, T[]> e : snapshot.entrySet())\r
+ {\r
+ final IThreadWorkQueue thread = e.getKey();\r
+ final T[] list = e.getValue(); \r
+ Runnable r = new Runnable() {\r
+ @Override\r
+ public void run() {\r
+ for (T t : list)\r
+ try {\r
+ m.invoke(t, args);\r
+ } catch (RuntimeException e) {\r
+ e.printStackTrace();\r
+ } catch (IllegalAccessException e) {\r
+ e.printStackTrace();\r
+ } catch (InvocationTargetException e) {\r
+ e.printStackTrace();\r
+ }\r
+ }\r
+ };\r
+ ThreadUtils.asyncExec(thread, r); \r
+ }\r
+ }\r
+/* \r
+ * Version that does not use ThreadUtils\r
+ public void fireEventSync(final Method m, final Object ... args)\r
+ {\r
+ Map<IThreadAccess, T[]> snapshot = getSnapshot(); \r
+ final Semaphore s = new Semaphore(0);\r
+ \r
+ int countAsyncThreads = 0;\r
+ \r
+ // todo use snapshot version of lists\r
+ for (Entry<IThreadAccess, T[]> e : snapshot.entrySet())\r
+ {\r
+ final IThreadAccess thread = e.getKey();\r
+ if (!thread.currentThreadAccess()) {\r
+ countAsyncThreads++;\r
+ continue;\r
+ }\r
+ }\r
+ // Start async prosessing\r
+ for (Entry<IThreadAccess, T[]> e : snapshot.entrySet())\r
+ {\r
+ final IThreadAccess thread = e.getKey();\r
+ if (thread.currentThreadAccess()) {\r
+ countAsyncThreads++;\r
+ continue;\r
+ }\r
+ final T[] list = e.getValue(); \r
+ Runnable r = new Runnable() {\r
+ @Override\r
+ public void run() {\r
+ try {\r
+ for (T t : list)\r
+ {\r
+ try {\r
+ m.invoke(t, args);\r
+ } catch (RuntimeException e) {\r
+ e.printStackTrace();\r
+ } catch (IllegalAccessException e) {\r
+ e.printStackTrace();\r
+ } catch (InvocationTargetException e) {\r
+ e.printStackTrace();\r
+ }\r
+ }\r
+ } finally {\r
+ // Signal ready\r
+ s.release();\r
+ }\r
+ }\r
+ };\r
+ ThreadUtils.asyncExec(thread, r);\r
+ }\r
+ // Start local thread processing\r
+ for (Entry<IThreadAccess, T[]> e : snapshot.entrySet())\r
+ {\r
+ final T[] list = e.getValue(); \r
+ for (T t : list)\r
+ {\r
+ try {\r
+ m.invoke(t, args);\r
+ } catch (RuntimeException e1) {\r
+ e1.printStackTrace();\r
+ } catch (IllegalAccessException e2) {\r
+ e2.printStackTrace();\r
+ } catch (InvocationTargetException e3) {\r
+ e3.printStackTrace();\r
+ }\r
+ }\r
+ } \r
+ \r
+ // wait until all threads are ready\r
+ try {\r
+ s.acquire(countAsyncThreads);\r
+ } catch (InterruptedException e) {\r
+ }\r
+ }\r
+ */\r
+ \r
+ public void addExecutables(Collection<Executable> container, final Method m, final Object ... args)\r
+ {\r
+ Map<IThreadWorkQueue, T[]> snapshot = getSnapshot(); \r
+ \r
+ for (Entry<IThreadWorkQueue, T[]> e : snapshot.entrySet())\r
+ {\r
+ IThreadWorkQueue thread = e.getKey();\r
+ final T[] list = e.getValue();\r
+ Runnable r = new Runnable() {\r
+ @Override\r
+ public void run() {\r
+ for (T t : list)\r
+ {\r
+ try {\r
+ m.invoke(t, args);\r
+ } catch (RuntimeException e) {\r
+ e.printStackTrace();\r
+ } catch (IllegalAccessException e) {\r
+ e.printStackTrace();\r
+ } catch (InvocationTargetException e) {\r
+ e.printStackTrace();\r
+ }\r
+ }\r
+ }\r
+ };\r
+ \r
+ container.add( new Executable(thread, r) );\r
+ }\r
+ }\r
+ \r
+ public Executable[] getExecutables(final Method m, final Object ... args)\r
+ {\r
+ Map<IThreadWorkQueue, T[]> snapshot = getSnapshot();\r
+ List<Executable> container =\r
+ new ArrayList<Executable>(snapshot.size());\r
+ addExecutables(container, m, args);\r
+ return container.toArray(new Executable[container.size()]);\r
+ }\r
+ \r
+ // Version that uses thread utils\r
+ public void fireEventSync(final Method m, final Object ... args)\r
+ {\r
+ if (m==null)\r
+ throw new IllegalArgumentException("null");\r
+ if (isEmpty()) return;\r
+ ThreadUtils.multiSyncExec(getExecutables(m, args));\r
+ }\r
+ \r
+ public static Method getMethod(Class<?> clazz, String name)\r
+ {\r
+ int count = 0;\r
+ Method result = null;\r
+ for (Method m : clazz.getMethods())\r
+ {\r
+ if (!m.getName().equals(name)) continue;\r
+ count++;\r
+ result = m;\r
+ }\r
+ if (count!=1) throw new Error("Unexpected method \""+name+"\" count in class "+clazz.getName());\r
+ return result;\r
+ }\r
+ \r
+ public synchronized T[] getListenersByThread(IThreadWorkQueue ta) \r
+ {\r
+ ListenerList<T> l = lists.get(ta);\r
+ if (l==null) return null;\r
+ return l.getListeners();\r
+ }\r
+ \r
+ /**\r
+ * Is the list executable in current thread. \r
+ * \r
+ * @return <code>true</code> if executable\r
+ */\r
+ public synchronized boolean executableInCurrentThread() {\r
+ for (IThreadWorkQueue ta : lists.keySet())\r
+ if (!ta.currentThreadAccess())\r
+ return false;\r
+ return true;\r
+ }\r
+ \r
+}\r