X-Git-Url: https://gerrit.simantics.org/r/gitweb?a=blobdiff_plain;f=bundles%2Forg.simantics.utils.thread%2Fsrc%2Forg%2Fsimantics%2Futils%2Fthreads%2FSyncListenerList.java;h=2b643ee32643b149fbb36a46982d0f3831d6e58d;hb=d33a5a61e2c056a3d48733819257890406ee9d52;hp=17a158e2bf803f96078291aa22dce5d3175ae1bb;hpb=8ada31c956ee02aef38627ba4deaaae3eecb623a;p=simantics%2Fplatform.git diff --git a/bundles/org.simantics.utils.thread/src/org/simantics/utils/threads/SyncListenerList.java b/bundles/org.simantics.utils.thread/src/org/simantics/utils/threads/SyncListenerList.java index 17a158e2b..2b643ee32 100644 --- a/bundles/org.simantics.utils.thread/src/org/simantics/utils/threads/SyncListenerList.java +++ b/bundles/org.simantics.utils.thread/src/org/simantics/utils/threads/SyncListenerList.java @@ -1,356 +1,356 @@ -/******************************************************************************* - * Copyright (c) 2007, 2010 Association for Decentralized Information Management - * in Industry THTH ry. - * All rights reserved. This program and the accompanying materials - * are made available under the terms of the Eclipse Public License v1.0 - * which accompanies this distribution, and is available at - * http://www.eclipse.org/legal/epl-v10.html - * - * Contributors: - * VTT Technical Research Centre of Finland - initial API and implementation - *******************************************************************************/ -/* - * - * @author Toni Kalajainen - */ -package org.simantics.utils.threads; - -import java.lang.reflect.InvocationTargetException; -import java.lang.reflect.Method; -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; - -import org.simantics.utils.threads.internal.ListenerList; - -/** - * - * @author Toni Kalajainen - * - * @see ListenerList Simple listener list - * - * @param - */ -public class SyncListenerList { - - /** internal use */ - Map> lists = - new HashMap>(2); - - /** Snapshot version */ - Map snapshot; - - Boolean requiresThreadSwitching = Boolean.FALSE; - - /** - * The class of T - */ - private final Class componentType; - - @SuppressWarnings("unchecked") - public SyncListenerList(Class componentType) - { - this.componentType = (Class) componentType; - } - - /** - * Add listener to the list - * @param thread thread to use to handle the event - * @param listener - */ - public synchronized void add(IThreadWorkQueue thread, T listener) - { - if (listener==null) - throw new IllegalArgumentException("null"); - - snapshot = null; - ListenerList list = lists.get(thread); - if (list==null) { - list = new ListenerList(componentType); - lists.put(thread, list); - } - list.add(listener); - if (thread!=CurrentThread.getThreadAccess()) - requiresThreadSwitching = Boolean.TRUE; - } - - /** - * Contains elements that require thread switching. - * @return true if contains elements with thread access other than current thread - */ - public synchronized Boolean containsContextSwitchingListeners() { - if (requiresThreadSwitching==null) - requiresThreadSwitching = _HasContextSwitching(); - return requiresThreadSwitching; - } - - /** - * - * @return true if contains listeners that require context switching - */ - private boolean _HasContextSwitching() - { - if (lists.size()!=0) return false; - if (lists.size()>1) return false; - Entry> e = lists.entrySet().iterator().next(); - if (e.getKey()!=CurrentThread.getThreadAccess()) return false; - return !e.getValue().isEmpty(); - } - - /** - * Add listener to the list. Listener will be invoked in the thread that - * happens to be running at the time of events. - * - * @param listener - */ - public void add(T listener) - { - if (listener==null) - throw new IllegalArgumentException("null"); - add(CurrentThread.getThreadAccess(), listener); - } - - /** - * Remove listener from the list - * @param thread thread to use to handle the event - * @param listener - */ - public synchronized void remove(IThreadWorkQueue thread, T listener) - { - snapshot = null; - ListenerList list = lists.get(thread); - if (list==null) return; - list.remove(listener); - if (list.isEmpty()) - lists.remove(thread); - if (isEmpty()) { - requiresThreadSwitching = Boolean.FALSE; - return; - } - if (requiresThreadSwitching==null) return; - if (!requiresThreadSwitching) return; - if (thread==CurrentThread.getThreadAccess()) return; - requiresThreadSwitching = null; - } - - public void remove(T listener) - { - remove(CurrentThread.getThreadAccess(), listener); - } - - public synchronized boolean isEmpty() - { - return lists.size()==0; - } - - public synchronized void clear() - { - requiresThreadSwitching = Boolean.FALSE; - lists.clear(); - snapshot = null; - } - - - public synchronized Map getSnapshot() - { - if (snapshot==null) { - snapshot = new HashMap(lists.size()); - for (Entry> e : lists.entrySet()) - { - T[] list = e.getValue().getListeners(); - snapshot.put(e.getKey(), list); - } - } - return snapshot; - } - - public void fireEventAsync(final Method m, final Object ... args) - { - if (m==null) - throw new IllegalArgumentException("null"); - if (isEmpty()) return; - Map snapshot = getSnapshot(); - for (Entry e : snapshot.entrySet()) - { - final IThreadWorkQueue thread = e.getKey(); - final T[] list = e.getValue(); - Runnable r = new Runnable() { - @Override - public void run() { - for (T t : list) - try { - m.invoke(t, args); - } catch (RuntimeException e) { - e.printStackTrace(); - } catch (IllegalAccessException e) { - e.printStackTrace(); - } catch (InvocationTargetException e) { - e.getCause().printStackTrace(); - } - } - }; - ThreadUtils.asyncExec(thread, r); - } - } -/* - * Version that does not use ThreadUtils - public void fireEventSync(final Method m, final Object ... args) - { - Map snapshot = getSnapshot(); - final Semaphore s = new Semaphore(0); - - int countAsyncThreads = 0; - - // todo use snapshot version of lists - for (Entry e : snapshot.entrySet()) - { - final IThreadAccess thread = e.getKey(); - if (!thread.currentThreadAccess()) { - countAsyncThreads++; - continue; - } - } - // Start async prosessing - for (Entry e : snapshot.entrySet()) - { - final IThreadAccess thread = e.getKey(); - if (thread.currentThreadAccess()) { - countAsyncThreads++; - continue; - } - final T[] list = e.getValue(); - Runnable r = new Runnable() { - @Override - public void run() { - try { - for (T t : list) - { - try { - m.invoke(t, args); - } catch (RuntimeException e) { - e.printStackTrace(); - } catch (IllegalAccessException e) { - e.printStackTrace(); - } catch (InvocationTargetException e) { - e.printStackTrace(); - } - } - } finally { - // Signal ready - s.release(); - } - } - }; - ThreadUtils.asyncExec(thread, r); - } - // Start local thread processing - for (Entry e : snapshot.entrySet()) - { - final T[] list = e.getValue(); - for (T t : list) - { - try { - m.invoke(t, args); - } catch (RuntimeException e1) { - e1.printStackTrace(); - } catch (IllegalAccessException e2) { - e2.printStackTrace(); - } catch (InvocationTargetException e3) { - e3.printStackTrace(); - } - } - } - - // wait until all threads are ready - try { - s.acquire(countAsyncThreads); - } catch (InterruptedException e) { - } - } - */ - - public void addExecutables(Collection container, final Method m, final Object ... args) - { - Map snapshot = getSnapshot(); - - for (Entry e : snapshot.entrySet()) - { - IThreadWorkQueue thread = e.getKey(); - final T[] list = e.getValue(); - Runnable r = new Runnable() { - @Override - public void run() { - for (T t : list) - { - try { - m.invoke(t, args); - } catch (RuntimeException e) { - e.printStackTrace(); - } catch (IllegalAccessException e) { - e.printStackTrace(); - } catch (InvocationTargetException e) { - e.getCause().printStackTrace(); - } - } - } - }; - - container.add( new Executable(thread, r) ); - } - } - - public Executable[] getExecutables(final Method m, final Object ... args) - { - Map snapshot = getSnapshot(); - List container = - new ArrayList(snapshot.size()); - addExecutables(container, m, args); - return container.toArray(new Executable[container.size()]); - } - - // Version that uses thread utils - public void fireEventSync(final Method m, final Object ... args) - { - if (m==null) - throw new IllegalArgumentException("null"); - if (isEmpty()) return; - ThreadUtils.multiSyncExec(getExecutables(m, args)); - } - - public static Method getMethod(Class clazz, String name) - { - int count = 0; - Method result = null; - for (Method m : clazz.getMethods()) - { - if (!m.getName().equals(name)) continue; - count++; - result = m; - } - if (count!=1) throw new Error("Unexpected method \""+name+"\" count in class "+clazz.getName()); - return result; - } - - public synchronized T[] getListenersByThread(IThreadWorkQueue ta) - { - ListenerList l = lists.get(ta); - if (l==null) return null; - return l.getListeners(); - } - - /** - * Is the list executable in current thread. - * - * @return true if executable - */ - public synchronized boolean executableInCurrentThread() { - for (IThreadWorkQueue ta : lists.keySet()) - if (!ta.currentThreadAccess()) - return false; - return true; - } - -} +/******************************************************************************* + * Copyright (c) 2007, 2010 Association for Decentralized Information Management + * in Industry THTH ry. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * VTT Technical Research Centre of Finland - initial API and implementation + *******************************************************************************/ +/* + * + * @author Toni Kalajainen + */ +package org.simantics.utils.threads; + +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; + +import org.simantics.utils.threads.internal.ListenerList; + +/** + * + * @author Toni Kalajainen + * + * @see ListenerList Simple listener list + * + * @param + */ +public class SyncListenerList { + + /** internal use */ + Map> lists = + new HashMap>(2); + + /** Snapshot version */ + Map snapshot; + + Boolean requiresThreadSwitching = Boolean.FALSE; + + /** + * The class of T + */ + private final Class componentType; + + @SuppressWarnings("unchecked") + public SyncListenerList(Class componentType) + { + this.componentType = (Class) componentType; + } + + /** + * Add listener to the list + * @param thread thread to use to handle the event + * @param listener + */ + public synchronized void add(IThreadWorkQueue thread, T listener) + { + if (listener==null) + throw new IllegalArgumentException("null"); + + snapshot = null; + ListenerList list = lists.get(thread); + if (list==null) { + list = new ListenerList(componentType); + lists.put(thread, list); + } + list.add(listener); + if (thread!=CurrentThread.getThreadAccess()) + requiresThreadSwitching = Boolean.TRUE; + } + + /** + * Contains elements that require thread switching. + * @return true if contains elements with thread access other than current thread + */ + public synchronized Boolean containsContextSwitchingListeners() { + if (requiresThreadSwitching==null) + requiresThreadSwitching = _HasContextSwitching(); + return requiresThreadSwitching; + } + + /** + * + * @return true if contains listeners that require context switching + */ + private boolean _HasContextSwitching() + { + if (lists.size()!=0) return false; + if (lists.size()>1) return false; + Entry> e = lists.entrySet().iterator().next(); + if (e.getKey()!=CurrentThread.getThreadAccess()) return false; + return !e.getValue().isEmpty(); + } + + /** + * Add listener to the list. Listener will be invoked in the thread that + * happens to be running at the time of events. + * + * @param listener + */ + public void add(T listener) + { + if (listener==null) + throw new IllegalArgumentException("null"); + add(CurrentThread.getThreadAccess(), listener); + } + + /** + * Remove listener from the list + * @param thread thread to use to handle the event + * @param listener + */ + public synchronized void remove(IThreadWorkQueue thread, T listener) + { + snapshot = null; + ListenerList list = lists.get(thread); + if (list==null) return; + list.remove(listener); + if (list.isEmpty()) + lists.remove(thread); + if (isEmpty()) { + requiresThreadSwitching = Boolean.FALSE; + return; + } + if (requiresThreadSwitching==null) return; + if (!requiresThreadSwitching) return; + if (thread==CurrentThread.getThreadAccess()) return; + requiresThreadSwitching = null; + } + + public void remove(T listener) + { + remove(CurrentThread.getThreadAccess(), listener); + } + + public synchronized boolean isEmpty() + { + return lists.size()==0; + } + + public synchronized void clear() + { + requiresThreadSwitching = Boolean.FALSE; + lists.clear(); + snapshot = null; + } + + + public synchronized Map getSnapshot() + { + if (snapshot==null) { + snapshot = new HashMap(lists.size()); + for (Entry> e : lists.entrySet()) + { + T[] list = e.getValue().getListeners(); + snapshot.put(e.getKey(), list); + } + } + return snapshot; + } + + public void fireEventAsync(final Method m, final Object ... args) + { + if (m==null) + throw new IllegalArgumentException("null"); + if (isEmpty()) return; + Map snapshot = getSnapshot(); + for (Entry e : snapshot.entrySet()) + { + final IThreadWorkQueue thread = e.getKey(); + final T[] list = e.getValue(); + Runnable r = new Runnable() { + @Override + public void run() { + for (T t : list) + try { + m.invoke(t, args); + } catch (RuntimeException e) { + e.printStackTrace(); + } catch (IllegalAccessException e) { + e.printStackTrace(); + } catch (InvocationTargetException e) { + e.getCause().printStackTrace(); + } + } + }; + ThreadUtils.asyncExec(thread, r); + } + } +/* + * Version that does not use ThreadUtils + public void fireEventSync(final Method m, final Object ... args) + { + Map snapshot = getSnapshot(); + final Semaphore s = new Semaphore(0); + + int countAsyncThreads = 0; + + // todo use snapshot version of lists + for (Entry e : snapshot.entrySet()) + { + final IThreadAccess thread = e.getKey(); + if (!thread.currentThreadAccess()) { + countAsyncThreads++; + continue; + } + } + // Start async prosessing + for (Entry e : snapshot.entrySet()) + { + final IThreadAccess thread = e.getKey(); + if (thread.currentThreadAccess()) { + countAsyncThreads++; + continue; + } + final T[] list = e.getValue(); + Runnable r = new Runnable() { + @Override + public void run() { + try { + for (T t : list) + { + try { + m.invoke(t, args); + } catch (RuntimeException e) { + e.printStackTrace(); + } catch (IllegalAccessException e) { + e.printStackTrace(); + } catch (InvocationTargetException e) { + e.printStackTrace(); + } + } + } finally { + // Signal ready + s.release(); + } + } + }; + ThreadUtils.asyncExec(thread, r); + } + // Start local thread processing + for (Entry e : snapshot.entrySet()) + { + final T[] list = e.getValue(); + for (T t : list) + { + try { + m.invoke(t, args); + } catch (RuntimeException e1) { + e1.printStackTrace(); + } catch (IllegalAccessException e2) { + e2.printStackTrace(); + } catch (InvocationTargetException e3) { + e3.printStackTrace(); + } + } + } + + // wait until all threads are ready + try { + s.acquire(countAsyncThreads); + } catch (InterruptedException e) { + } + } + */ + + public void addExecutables(Collection container, final Method m, final Object ... args) + { + Map snapshot = getSnapshot(); + + for (Entry e : snapshot.entrySet()) + { + IThreadWorkQueue thread = e.getKey(); + final T[] list = e.getValue(); + Runnable r = new Runnable() { + @Override + public void run() { + for (T t : list) + { + try { + m.invoke(t, args); + } catch (RuntimeException e) { + e.printStackTrace(); + } catch (IllegalAccessException e) { + e.printStackTrace(); + } catch (InvocationTargetException e) { + e.getCause().printStackTrace(); + } + } + } + }; + + container.add( new Executable(thread, r) ); + } + } + + public Executable[] getExecutables(final Method m, final Object ... args) + { + Map snapshot = getSnapshot(); + List container = + new ArrayList(snapshot.size()); + addExecutables(container, m, args); + return container.toArray(new Executable[container.size()]); + } + + // Version that uses thread utils + public void fireEventSync(final Method m, final Object ... args) + { + if (m==null) + throw new IllegalArgumentException("null"); + if (isEmpty()) return; + ThreadUtils.multiSyncExec(getExecutables(m, args)); + } + + public static Method getMethod(Class clazz, String name) + { + int count = 0; + Method result = null; + for (Method m : clazz.getMethods()) + { + if (!m.getName().equals(name)) continue; + count++; + result = m; + } + if (count!=1) throw new Error("Unexpected method \""+name+"\" count in class "+clazz.getName()); + return result; + } + + public synchronized T[] getListenersByThread(IThreadWorkQueue ta) + { + ListenerList l = lists.get(ta); + if (l==null) return null; + return l.getListeners(); + } + + /** + * Is the list executable in current thread. + * + * @return true if executable + */ + public synchronized boolean executableInCurrentThread() { + for (IThreadWorkQueue ta : lists.keySet()) + if (!ta.currentThreadAccess()) + return false; + return true; + } + +}