/******************************************************************************* * Copyright (c) 2007, 2010 Association for Decentralized Information Management * in Industry THTH ry. * All rights reserved. This program and the accompanying materials * are made available under the terms of the Eclipse Public License v1.0 * which accompanies this distribution, and is available at * http://www.eclipse.org/legal/epl-v10.html * * Contributors: * VTT Technical Research Centre of Finland - initial API and implementation *******************************************************************************/ /* * * @author Toni Kalajainen */ package org.simantics.utils.threads; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Map.Entry; import org.simantics.utils.threads.internal.ListenerList; /** * * @author Toni Kalajainen * * @see ListenerList Simple listener list * * @param */ public class SyncListenerList { /** internal use */ Map> lists = new HashMap>(2); /** Snapshot version */ Map snapshot; Boolean requiresThreadSwitching = Boolean.FALSE; /** * The class of T */ private final Class componentType; @SuppressWarnings("unchecked") public SyncListenerList(Class componentType) { this.componentType = (Class) componentType; } /** * Add listener to the list * @param thread thread to use to handle the event * @param listener */ public synchronized void add(IThreadWorkQueue thread, T listener) { if (listener==null) throw new IllegalArgumentException("null"); snapshot = null; ListenerList list = lists.get(thread); if (list==null) { list = new ListenerList(componentType); lists.put(thread, list); } list.add(listener); if (thread!=CurrentThread.getThreadAccess()) requiresThreadSwitching = Boolean.TRUE; } /** * Contains elements that require thread switching. * @return true if contains elements with thread access other than current thread */ public synchronized Boolean containsContextSwitchingListeners() { if (requiresThreadSwitching==null) requiresThreadSwitching = _HasContextSwitching(); return requiresThreadSwitching; } /** * * @return true if contains listeners that require context switching */ private boolean _HasContextSwitching() { if (lists.size()!=0) return false; if (lists.size()>1) return false; Entry> e = lists.entrySet().iterator().next(); if (e.getKey()!=CurrentThread.getThreadAccess()) return false; return !e.getValue().isEmpty(); } /** * Add listener to the list. Listener will be invoked in the thread that * happens to be running at the time of events. * * @param listener */ public void add(T listener) { if (listener==null) throw new IllegalArgumentException("null"); add(CurrentThread.getThreadAccess(), listener); } /** * Remove listener from the list * @param thread thread to use to handle the event * @param listener */ public synchronized void remove(IThreadWorkQueue thread, T listener) { snapshot = null; ListenerList list = lists.get(thread); if (list==null) return; list.remove(listener); if (list.isEmpty()) lists.remove(thread); if (isEmpty()) { requiresThreadSwitching = Boolean.FALSE; return; } if (requiresThreadSwitching==null) return; if (!requiresThreadSwitching) return; if (thread==CurrentThread.getThreadAccess()) return; requiresThreadSwitching = null; } public void remove(T listener) { remove(CurrentThread.getThreadAccess(), listener); } public synchronized boolean isEmpty() { return lists.size()==0; } public synchronized void clear() { requiresThreadSwitching = Boolean.FALSE; lists.clear(); snapshot = null; } public synchronized Map getSnapshot() { if (snapshot==null) { snapshot = new HashMap(lists.size()); for (Entry> e : lists.entrySet()) { T[] list = e.getValue().getListeners(); snapshot.put(e.getKey(), list); } } return snapshot; } public void fireEventAsync(final Method m, final Object ... args) { if (m==null) throw new IllegalArgumentException("null"); if (isEmpty()) return; Map snapshot = getSnapshot(); for (Entry e : snapshot.entrySet()) { final IThreadWorkQueue thread = e.getKey(); final T[] list = e.getValue(); Runnable r = new Runnable() { @Override public void run() { for (T t : list) try { m.invoke(t, args); } catch (RuntimeException e) { e.printStackTrace(); } catch (IllegalAccessException e) { e.printStackTrace(); } catch (InvocationTargetException e) { e.getCause().printStackTrace(); } } }; ThreadUtils.asyncExec(thread, r); } } /* * Version that does not use ThreadUtils public void fireEventSync(final Method m, final Object ... args) { Map snapshot = getSnapshot(); final Semaphore s = new Semaphore(0); int countAsyncThreads = 0; // todo use snapshot version of lists for (Entry e : snapshot.entrySet()) { final IThreadAccess thread = e.getKey(); if (!thread.currentThreadAccess()) { countAsyncThreads++; continue; } } // Start async prosessing for (Entry e : snapshot.entrySet()) { final IThreadAccess thread = e.getKey(); if (thread.currentThreadAccess()) { countAsyncThreads++; continue; } final T[] list = e.getValue(); Runnable r = new Runnable() { @Override public void run() { try { for (T t : list) { try { m.invoke(t, args); } catch (RuntimeException e) { e.printStackTrace(); } catch (IllegalAccessException e) { e.printStackTrace(); } catch (InvocationTargetException e) { e.printStackTrace(); } } } finally { // Signal ready s.release(); } } }; ThreadUtils.asyncExec(thread, r); } // Start local thread processing for (Entry e : snapshot.entrySet()) { final T[] list = e.getValue(); for (T t : list) { try { m.invoke(t, args); } catch (RuntimeException e1) { e1.printStackTrace(); } catch (IllegalAccessException e2) { e2.printStackTrace(); } catch (InvocationTargetException e3) { e3.printStackTrace(); } } } // wait until all threads are ready try { s.acquire(countAsyncThreads); } catch (InterruptedException e) { } } */ public void addExecutables(Collection container, final Method m, final Object ... args) { Map snapshot = getSnapshot(); for (Entry e : snapshot.entrySet()) { IThreadWorkQueue thread = e.getKey(); final T[] list = e.getValue(); Runnable r = new Runnable() { @Override public void run() { for (T t : list) { try { m.invoke(t, args); } catch (RuntimeException e) { e.printStackTrace(); } catch (IllegalAccessException e) { e.printStackTrace(); } catch (InvocationTargetException e) { e.getCause().printStackTrace(); } } } }; container.add( new Executable(thread, r) ); } } public Executable[] getExecutables(final Method m, final Object ... args) { Map snapshot = getSnapshot(); List container = new ArrayList(snapshot.size()); addExecutables(container, m, args); return container.toArray(new Executable[container.size()]); } // Version that uses thread utils public void fireEventSync(final Method m, final Object ... args) { if (m==null) throw new IllegalArgumentException("null"); if (isEmpty()) return; ThreadUtils.multiSyncExec(getExecutables(m, args)); } public static Method getMethod(Class clazz, String name) { int count = 0; Method result = null; for (Method m : clazz.getMethods()) { if (!m.getName().equals(name)) continue; count++; result = m; } if (count!=1) throw new Error("Unexpected method \""+name+"\" count in class "+clazz.getName()); return result; } public synchronized T[] getListenersByThread(IThreadWorkQueue ta) { ListenerList l = lists.get(ta); if (l==null) return null; return l.getListeners(); } /** * Is the list executable in current thread. * * @return true if executable */ public synchronized boolean executableInCurrentThread() { for (IThreadWorkQueue ta : lists.keySet()) if (!ta.currentThreadAccess()) return false; return true; } }