How Event Broadcasting Works in Spring

The Spring Framework event model

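  • ApplicationEventPublisher: the event publisher. It accepts events and hands them to the event multicaster for processing.
  • ApplicationEventMulticaster: the event multicaster. It takes the events handed over by the publisher and broadcasts them to the listeners.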

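The ApplicationContext interface extends ApplicationEventPublisher, so it can publish events. Its first abstract implementation, AbstractApplicationContext, holds an ApplicationEventMulticaster by composition, so it can also broadcast them. Taken together, a concrete ApplicationContext already covers the publishing and broadcasting side of the observer pattern; the registered listeners are the observers.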
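The relationship described above (publisher by inheritance, multicaster by composition) can be sketched in plain Java. This is a simplified model, not Spring's code: SketchPublisher, SketchMulticaster, SketchSimpleMulticaster and SketchContext are hypothetical names invented for illustration, and listeners are modelled as plain Consumer callbacks.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for ApplicationEventPublisher (not the real API).
interface SketchPublisher {
    void publishEvent(Object event);
}

// Hypothetical stand-in for ApplicationEventMulticaster (not the real API).
interface SketchMulticaster {
    void addListener(Consumer<Object> listener);

    void multicastEvent(Object event);
}

// Broadcasts each event to every registered listener, in registration order.
class SketchSimpleMulticaster implements SketchMulticaster {
    private final List<Consumer<Object>> listeners = new ArrayList<>();

    @Override
    public void addListener(Consumer<Object> listener) {
        listeners.add(listener);
    }

    @Override
    public void multicastEvent(Object event) {
        for (Consumer<Object> listener : listeners) {
            listener.accept(event);
        }
    }
}

// Plays the role of the context: a publisher by inheritance,
// with broadcasting gained by composing a multicaster.
class SketchContext implements SketchPublisher {
    private final SketchMulticaster multicaster = new SketchSimpleMulticaster();

    void addListener(Consumer<Object> listener) {
        multicaster.addListener(listener);
    }

    @Override
    public void publishEvent(Object event) {
        // The publisher does no dispatching itself; it delegates.
        multicaster.multicastEvent(event);
    }
}
```

Callers only see the publisher interface; which listeners exist and how they are invoked stays hidden behind the multicaster.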

publishEvent()

This method is implemented in AbstractApplicationContext:

protected void publishEvent(Object event, @Nullable ResolvableType eventType) {
    Assert.notNull(event, "Event must not be null");

    // Decorate event as an ApplicationEvent if necessary
    // A plain object is wrapped in a PayloadApplicationEvent here
    ApplicationEvent applicationEvent;
    if (event instanceof ApplicationEvent) {
        applicationEvent = (ApplicationEvent) event;
    }
    else {
        applicationEvent = new PayloadApplicationEvent<>(this, event);
        if (eventType == null) {
            eventType = ((PayloadApplicationEvent<?>) applicationEvent).getResolvableType();
        }
    }

    // Multicast right now if possible - or lazily once the multicaster is initialized
    // Broadcast the event (earlyApplicationEvents is fairly involved and may be covered in a follow-up)
    if (this.earlyApplicationEvents != null) {
        this.earlyApplicationEvents.add(applicationEvent);
    }
    else {
        getApplicationEventMulticaster().multicastEvent(applicationEvent, eventType);
    }

    // Publish event via parent context as well...
    // Ask the parent container to publish the event too
    if (this.parent != null) {
        if (this.parent instanceof AbstractApplicationContext) {
            ((AbstractApplicationContext) this.parent).publishEvent(event, eventType);
        }
        else {
            this.parent.publishEvent(event);
        }
    }
}

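Publishing an event breaks down into three parts:

  1. Adapt payload-style events: if the argument is not a subclass of ApplicationEvent, wrap it in a PayloadApplicationEvent.
  2. Broadcast the event within the current container.
  3. Ask the parent container to publish the event (which explains why an event published in a child container also reaches the parent container).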
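The three parts above can be traced in a small, dependency-free model. This is only a sketch under stated assumptions: DemoEvent, DemoPayloadEvent and DemoContext are hypothetical classes standing in for ApplicationEvent, PayloadApplicationEvent and AbstractApplicationContext, and "broadcasting" is reduced to appending a line to a log.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical base event type, standing in for ApplicationEvent.
class DemoEvent {
    final Object source;

    DemoEvent(Object source) {
        this.source = source;
    }
}

// Hypothetical wrapper, standing in for PayloadApplicationEvent.
class DemoPayloadEvent extends DemoEvent {
    final Object payload;

    DemoPayloadEvent(Object source, Object payload) {
        super(source);
        this.payload = payload;
    }
}

// Hypothetical context with an optional parent.
class DemoContext {
    final String name;
    final DemoContext parent; // null for a root context
    final List<String> log = new ArrayList<>();

    DemoContext(String name, DemoContext parent) {
        this.name = name;
        this.parent = parent;
    }

    void publishEvent(Object event) {
        // Part 1: wrap plain objects so that listeners always see an event type.
        DemoEvent wrapped = (event instanceof DemoEvent)
                ? (DemoEvent) event
                : new DemoPayloadEvent(this, event);

        // Part 2: broadcast inside this context (reduced to a log entry here).
        log.add(name + " got " + wrapped.getClass().getSimpleName());

        // Part 3: pass the ORIGINAL object to the parent, which wraps it again
        // with itself as the source, as the source code above does.
        if (parent != null) {
            parent.publishEvent(event);
        }
    }
}
```

Publishing a plain String on a child context in this model leaves one DemoPayloadEvent entry in the child's log and one in the parent's, while publishing on the parent never reaches the child.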

multicastEvent()

This method lives in SimpleApplicationEventMulticaster, the only concrete implementation of ApplicationEventMulticaster that ships with the framework:

public void multicastEvent(final ApplicationEvent event, @Nullable ResolvableType eventType) {
    ResolvableType type = (eventType != null ? eventType : resolveDefaultEventType(event));
    Executor executor = getTaskExecutor();
    for (ApplicationListener<?> listener : getApplicationListeners(event, type)) {
        if (executor != null) {
            executor.execute(() -> invokeListener(listener, event));
        }
        else {
            invokeListener(listener, event);
        }
    }
}
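  • ResolvableType type = (eventType != null ? eventType : resolveDefaultEventType(event)): resolves the concrete type of the ApplicationEvent as a ResolvableType, which makes it easier to inspect type information (superclasses, interfaces, generics, and so on).
  • getApplicationListeners(event, type): fetches the listeners that match this event.
  • invokeListener(listener, event): delegates to doInvokeListener(listener, event), which ultimately calls the listener's onApplicationEvent(E event) method. If a task executor is configured, the call is handed to that executor; otherwise it runs on the publishing thread.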
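The executor branch is the part of multicastEvent() that decides whether listeners run synchronously or not. The sketch below isolates just that decision using java.util.concurrent.Executor; DispatchSketch is a hypothetical class, and listeners are simplified to Consumer<String> callbacks rather than real ApplicationListener instances.

```java
import java.util.List;
import java.util.concurrent.Executor;
import java.util.function.Consumer;

// Hypothetical simplified multicaster; only the dispatch decision mirrors the source.
class DispatchSketch {
    private final List<Consumer<String>> listeners;
    private final Executor executor; // null means "invoke on the publishing thread"

    DispatchSketch(List<Consumer<String>> listeners, Executor executor) {
        this.listeners = listeners;
        this.executor = executor;
    }

    void multicast(String event) {
        for (Consumer<String> listener : listeners) {
            if (executor != null) {
                // Hand the invocation to the executor; when it runs is up to the executor.
                executor.execute(() -> listener.accept(event));
            }
            else {
                // No executor: the publisher blocks until the listener returns.
                listener.accept(event);
            }
        }
    }
}
```

With a null executor every listener has finished by the time multicast() returns. With an executor, multicast() only submits the work, so a slow listener no longer delays the publisher, at the cost of losing ordering and thread-bound context guarantees.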

getApplicationListeners(event, type)

protected Collection<ApplicationListener<?>> getApplicationListeners(
        ApplicationEvent event, ResolvableType eventType) {

    Object source = event.getSource();
    Class<?> sourceType = (source != null ? source.getClass() : null);
    ListenerCacheKey cacheKey = new ListenerCacheKey(eventType, sourceType);

    // Potential new retriever to populate
    CachedListenerRetriever newRetriever = null;

    // Quick check for existing entry on ConcurrentHashMap
    CachedListenerRetriever existingRetriever = this.retrieverCache.get(cacheKey);
    if (existingRetriever == null) {
        // Caching a new ListenerRetriever if possible
        if (this.beanClassLoader == null ||
                (ClassUtils.isCacheSafe(event.getClass(), this.beanClassLoader) &&
                        (sourceType == null || ClassUtils.isCacheSafe(sourceType, this.beanClassLoader)))) {
            newRetriever = new CachedListenerRetriever();
            existingRetriever = this.retrieverCache.putIfAbsent(cacheKey, newRetriever);
            if (existingRetriever != null) {
                newRetriever = null;  // no need to populate it in retrieveApplicationListeners
            }
        }
    }

    if (existingRetriever != null) {
        Collection<ApplicationListener<?>> result = existingRetriever.getApplicationListeners();
        if (result != null) {
            return result;
        }
        // If result is null, the existing retriever is not fully populated yet by another thread.
        // Proceed like caching wasn't possible for this current local attempt.
    }

    return retrieveApplicationListeners(eventType, sourceType, newRetriever);
}
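  • Look in retrieverCache first, using the event type and source type as the key; if a fully populated entry is found, return its listeners directly.
  • Otherwise call retrieveApplicationListeners(eventType, sourceType, newRetriever). When caching is safe, a new CachedListenerRetriever has already been registered in retrieverCache, and that call populates it with the result.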
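The cache lookup can be approximated with a ConcurrentHashMap keyed by the pair (event type, source type). The following is a deliberately simplified sketch: SketchCacheKey and ListenerCacheSketch are hypothetical names, listeners are represented by their names as strings, and, unlike the source above, the value is resolved before being stored rather than registering an empty retriever and populating it afterwards.

```java
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;

// Hypothetical cache key combining event type and source type.
final class SketchCacheKey {
    private final Class<?> eventType;
    private final Class<?> sourceType; // may be null, as in the source

    SketchCacheKey(Class<?> eventType, Class<?> sourceType) {
        this.eventType = eventType;
        this.sourceType = sourceType;
    }

    @Override
    public boolean equals(Object other) {
        if (!(other instanceof SketchCacheKey)) {
            return false;
        }
        SketchCacheKey that = (SketchCacheKey) other;
        return eventType.equals(that.eventType) && Objects.equals(sourceType, that.sourceType);
    }

    @Override
    public int hashCode() {
        return Objects.hash(eventType, sourceType);
    }
}

// Hypothetical cache in front of an expensive listener lookup.
class ListenerCacheSketch {
    private final Map<SketchCacheKey, List<String>> cache = new ConcurrentHashMap<>();
    private final BiFunction<Class<?>, Class<?>, List<String>> resolver;
    final AtomicInteger resolverCalls = new AtomicInteger(); // exposed for demonstration only

    ListenerCacheSketch(BiFunction<Class<?>, Class<?>, List<String>> resolver) {
        this.resolver = resolver;
    }

    List<String> listenersFor(Class<?> eventType, Class<?> sourceType) {
        SketchCacheKey key = new SketchCacheKey(eventType, sourceType);

        // Quick check for an existing entry.
        List<String> cached = cache.get(key);
        if (cached != null) {
            return cached;
        }

        // Cache miss: do the full lookup, then publish the result atomically.
        resolverCalls.incrementAndGet();
        List<String> resolved = resolver.apply(eventType, sourceType);
        List<String> raced = cache.putIfAbsent(key, resolved);
        return (raced != null ? raced : resolved); // another thread may have won the race
    }
}
```

Including the source type in the key matters because the same event type published from different sources can match different listeners.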

retrieveApplicationListeners(eventType, sourceType, newRetriever)

private Collection<ApplicationListener<?>> retrieveApplicationListeners(
        ResolvableType eventType, @Nullable Class<?> sourceType, @Nullable CachedListenerRetriever retriever) {

    List<ApplicationListener<?>> allListeners = new ArrayList<>();
    Set<ApplicationListener<?>> filteredListeners = (retriever != null ? new LinkedHashSet<>() : null);
    Set<String> filteredListenerBeans = (retriever != null ? new LinkedHashSet<>() : null);

    Set<ApplicationListener<?>> listeners;
    Set<String> listenerBeans;
    synchronized (this.defaultRetriever) {
        listeners = new LinkedHashSet<>(this.defaultRetriever.applicationListeners);
        listenerBeans = new LinkedHashSet<>(this.defaultRetriever.applicationListenerBeans);
    }

    // Add programmatically registered listeners, including ones coming
    // from ApplicationListenerDetector (singleton beans and inner beans).
    for (ApplicationListener<?> listener : listeners) {
        if (supportsEvent(listener, eventType, sourceType)) {
            if (retriever != null) {
                filteredListeners.add(listener);
            }
            allListeners.add(listener);
        }
    }

    // Add listeners by bean name, potentially overlapping with programmatically
    // registered listeners above - but here potentially with additional metadata.
    if (!listenerBeans.isEmpty()) {
        ConfigurableBeanFactory beanFactory = getBeanFactory();
        for (String listenerBeanName : listenerBeans) {
            try {
                if (supportsEvent(beanFactory, listenerBeanName, eventType)) {
                    ApplicationListener<?> listener =
                            beanFactory.getBean(listenerBeanName, ApplicationListener.class);
                    if (!allListeners.contains(listener) && supportsEvent(listener, eventType, sourceType)) {
                        if (retriever != null) {
                            if (beanFactory.isSingleton(listenerBeanName)) {
                                filteredListeners.add(listener);
                            }
                            else {
                                filteredListenerBeans.add(listenerBeanName);
                            }
                        }
                        allListeners.add(listener);
                    }
                }
                else {
                    // Remove non-matching listeners that originally came from
                    // ApplicationListenerDetector, possibly ruled out by additional
                    // BeanDefinition metadata (e.g. factory method generics) above.
                    Object listener = beanFactory.getSingleton(listenerBeanName);
                    if (retriever != null) {
                        filteredListeners.remove(listener);
                    }
                    allListeners.remove(listener);
                }
            }
            catch (NoSuchBeanDefinitionException ex) {
                // Singleton listener instance (without backing bean definition) disappeared -
                // probably in the middle of the destruction phase
            }
        }
    }

    AnnotationAwareOrderComparator.sort(allListeners);
    if (retriever != null) {
        if (filteredListenerBeans.isEmpty()) {
            retriever.applicationListeners = new LinkedHashSet<>(allListeners);
            retriever.applicationListenerBeans = filteredListenerBeans;
        }
        else {
            retriever.applicationListeners = filteredListeners;
            retriever.applicationListenerBeans = filteredListenerBeans;
        }
    }
    return allListeners;
}
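  1. From the listeners registered programmatically, select those that listen for the published event.
  2. From the listeners registered in the IoC container by bean name (declaratively or through configuration), select those that listen for the published event, skipping any already collected in step 1.
  3. Sort the selected listeners.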
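The filter-then-sort shape of these steps can be shown without any Spring types. In this sketch OrderedListenerSketch and ListenerSelectionSketch are hypothetical; a listener "supports" an event when its declared type is assignable from the event's class, and a plain integer order (lower runs first, the same convention as Spring's Ordered) stands in for what AnnotationAwareOrderComparator evaluates. The two registration sources from steps 1 and 2 are merged into a single input list for brevity.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical listener model: the event type it accepts plus an order value.
class OrderedListenerSketch {
    final String name;
    final Class<?> supportedType;
    final int order; // lower values run first

    OrderedListenerSketch(String name, Class<?> supportedType, int order) {
        this.name = name;
        this.supportedType = supportedType;
        this.order = order;
    }

    boolean supports(Class<?> eventType) {
        return supportedType.isAssignableFrom(eventType);
    }
}

class ListenerSelectionSketch {
    // Returns the names of the matching listeners in invocation order.
    static List<String> select(List<OrderedListenerSketch> registered, Class<?> eventType) {
        List<OrderedListenerSketch> matching = new ArrayList<>();
        for (OrderedListenerSketch listener : registered) {
            // Keep only listeners that support the event, without duplicates.
            if (listener.supports(eventType) && !matching.contains(listener)) {
                matching.add(listener);
            }
        }

        // List.sort is stable, so listeners with equal order keep registration order.
        matching.sort(Comparator.comparingInt((OrderedListenerSketch l) -> l.order));

        List<String> names = new ArrayList<>();
        for (OrderedListenerSketch listener : matching) {
            names.add(listener.name);
        }
        return names;
    }
}
```

Sorting after filtering keeps the comparison work proportional to the number of matching listeners rather than to everything registered.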