图源:
Spring 实现了一个简单、实用的事件框架,利用它我们可以在多个组件之间进行松耦合式的通信。
简单示例
让我们从一个简单的示例开始:
public record Email(String address, String content) {
}
public class EmailsReceivedEvent extends ApplicationEvent {
private List<Email> emails = Collections.emptyList();
public EmailsReceivedEvent(Object source) {
super(source);
}
}
这里的Email
表示一个电子邮件,EmailsReceivedEvent
表示我们的系统收到电子邮件后在内部会发布的事件,它可以包含多封邮件。
这里的事件
EmailsReceivedEvent
继承自ApplicationEvent
,这在早期 Spring (4.2之前)是必须的。
Publisher
我们需要定义一个 bean 负责事件的发送:
public class EmailsReceivedEventPublisher {
private ApplicationEventPublisher applicationEventPublisher;
/**
* 发布多个邮件收到的事件
* @param emails
*/
public void publish( List<Email> emails){
this.doPublishEvent(emails);
}
/**
* 发布邮件收到的事件(单个邮件)
* @param email
*/
public void publish( Email email){
List<Email> emails = Collections.singletonList(email);
this.doPublishEvent(emails);
}
private void doPublishEvent(List<Email> emails){
EmailsReceivedEvent event = new EmailsReceivedEvent(this);
event.setEmails(emails);
applicationEventPublisher.publishEvent(event);
}
}
具体的事件发送我们需要使用ApplicationEventPublisher.publishEvent
,这里通过依赖注入获取一个ApplicationEventPublisher
的引用。
如果你因为某些原因不方便那么做(比如因为 bean 在 Spring 启动的早期阶段实例化,无法使用依赖注入),可以使用ApplicationEventPublisherAware
获取依赖:
public class EmailsReceivedEventPublisher implements ApplicationEventPublisherAware {
private ApplicationEventPublisher applicationEventPublisher;
// ...
public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
this.applicationEventPublisher = applicationEventPublisher;
}
}
当然,像上边这样为某个时间封装一个事件发布用的 bean 并非必须,你完全可以用自己喜欢的方式调用ApplicationEventPublisher.publish
来发送事件,但封装事件发布可以让代码更清晰易用。
Listener
下面需要添加我们自定义事件的监听器,具体方式是向 ApplicationContext 中添加一个 bean,并实现ApplicationListener
接口:
public class EmailsReceivedEventListener implements ApplicationListener<EmailsReceivedEvent> {
public void onApplicationEvent(EmailsReceivedEvent event) {
List<String> addresses = event.getEmails().stream()
.map(Email::address)
.collect(Collectors.toList());
System.out.printf("收到多个电子邮件:%s%n", addresses);
}
}
接口ApplicationListener
是泛型的,所以这里实现的onApplicationEvent
方法的参数是类型安全的,换言之该方法只会处理EmailsReceivedEvent
类型的已发布事件。
最后,我们使用ApplicationRunner
进行测试:
public class WebConfig {
private EmailsReceivedEventPublisher emailsReceivedEventPublisher;
ApplicationRunner applicationRunner(){
return args -> {
List<Email> emails = List.of(
new Email("tom@qq.com","123"),
new Email("lilei@qq.com","hello"),
new Email("hanmeimei@qq.com","good day"));
emailsReceivedEventPublisher.publish(emails);
};
}
}
关于
ApplicationRunner
,可以阅读我的。
注解驱动
从 Spring 4.2 开始,Spring 提供以注解的方式来定义事件监听器:
public class EmailsReceivedEventListener {
public void handleEvent(EmailsReceivedEvent event) {
List<String> addresses = event.getEmails().stream()
.map(Email::address)
.collect(Collectors.toList());
System.out.printf("收到多个电子邮件:%s%n", addresses);
}
}
@EventListener
注解标记的方法将作为事件处理方法,且 Spring 会根据参数(事件)的类型来确定该方法处理何种事件。此时无需再让监听器实现ApplicationListener
,事件处理方法的命名也可以更灵活。
也可以在@EventListener
注解中直接指定要处理的事件类型,无论方法有没有具体事件作为参数,都可以处理该事件:
public class EmailsReceivedEventListener {
// ...
EmailsReceivedEvent.class)
( public void receivedTip(){
System.out.println("some emails get.");
}
}
可以在@EventListener
中指定多个要处理的事件类型,可以通过这种方式让同一个方法处理多种类型的事件:
public class EmailsReceivedEventListener {
// ...
EmailsReceivedEvent.class, ContextRefreshedEvent.class})
({ public void receivedTip(Object event){
if (event instanceof EmailsReceivedEvent){
System.out.println("some emails get.");
}
else if (event instanceof ContextRefreshedEvent){
System.out.println("ApplicationContext is refreshed.");
}
else{
;
}
}
}
过滤
如果在测试代码中添加上下面的语句:
emailsReceivedEventPublisher.publish(Collections.emptyList());
我们就会看到下面的输出:
收到多个电子邮件:[tom@qq.com, lilei@qq.com, hanmeimei@qq.com] 收到多个电子邮件:[]
显然,这是因为没有在事件处理器中判断邮件队列是否为空导致的。当然,加上相应的判断很容易,但相比之下,我们有个更简单的实现方式——使用@EventListener
注解对事件进行过滤:
@Component
public class EmailsReceivedEventListener {
@EventListener(condition = "#erEvent.getEmails().isEmpty() == false")
public void handleEvent(EmailsReceivedEvent erEvent) {
List<String> addresses = erEvent.getEmails().stream()
.map(Email::address)
.collect(Collectors.toList());
System.out.printf("收到多个电子邮件:%s%n", addresses);
}
// ...
}
@EventListener
的condition
属性是一个SpEL,这个 SpEL 的评估结果将决定是否执行事件处理方法。在这个示例中,#erEvent
表示事件处理方法的erEvent
参数(事件对象),利用这个参数可以构建“邮件列表不为空”的SpEL表达式,即#erEvent.getEmails().isEmpty() == false
。
注意,这里的事件处理方法形参命名改为
erEvent
,而不是之前的event
,这是因为在这里的SpEL中,event
是一个预定义的变量,指代当前事件。可以通过多种方式定义SpEL表达式,比如上边的示例,也可以是
!#erEvent.getEmails().isEmpty()
关于 SpEL 的更多介绍,可以阅读我的。
新的事件
使用注解声明事件处理方法的另一个额外好处是,事件处理方法可以返回一个值,作为“新的事件”。
看下面这个示例:
public class WasteEmailsReceivedEvent extends ApplicationEvent {
@Getter
private final List<Email> emails;
public WasteEmailsReceivedEvent(Object source, @NonNull List<Email> emails) {
super(source);
this.emails = emails;
}
}
@Component
public class WasteEmailsReceivedEventListener {
@EventListener(condition = "!#werEvent.getEmails().isEmpty()")
public void handleEvent(WasteEmailsReceivedEvent werEvent){
werEvent.getEmails().forEach(email -> System.out.printf("将邮件%s移入垃圾邮件%n", email));
}
}
WasteEmailsReceivedEvent
是一个表示收到了垃圾邮件的事件,WasteEmailsReceivedEventListener
监听器负责处理该事件。
用一个 bean 来判断某个邮件是否为黑名单中的邮件:
@Component
public class EmailBlacklist {
private final Set<String> addresses = Set.of("lilei@qq.com");
public boolean inBlackList(String address){
return addresses.contains(address);
}
}
修改邮件接收事件的监听器,检查收到的邮件中是否有在黑名单中的,如果有,就返回相应的垃圾邮件事件:
@Component
public class EmailsReceivedEventListener {
@Autowired
private EmailBlacklist emailBlacklist;
@EventListener(condition = "!#erEvent.getEmails().isEmpty()")
public WasteEmailsReceivedEvent handleEvent(EmailsReceivedEvent erEvent) {
List<String> addresses = erEvent.getEmails().stream()
.map(Email::address)
.collect(Collectors.toList());
System.out.printf("收到多个电子邮件:%s%n", addresses);
List<Email> wasteEmails = erEvent.getEmails().stream()
.filter(email -> emailBlacklist.inBlackList(email.address()))
.toList();
if (wasteEmails.isEmpty()) {
return null;
} else {
return new WasteEmailsReceivedEvent(this, wasteEmails);
}
}
// ...
}
因为垃圾邮件事件监听器中加了空邮件过滤,所以这里其实可以不用判断
wasteEmails
是否为空,直接返回new WasteEmailsReceivedEvent(this, wasteEmails)
,示例中这样做是为了展示在不需要产生新消息的情况下可以返回一个null
。
现在运行示例,因为有一个垃圾邮件lilei@qq.com
,所以处理EmailsReceivedEvent
的监听器会产生一个新的WasteEmailsReceivedEvent
事件,后者也会继续触发监听器WasteEmailsReceivedEventListener
。
如果需要在事件处理方法中生成多个后续事件,可以返回一个包含多个事件的容器(Collection)或者数组:
@Component
public class EmailsReceivedEventListener {
// ...
@EventListener(condition = "!#erEvent.getEmails().isEmpty()")
public List<WasteEmailsReceivedEvent> handleEvent(EmailsReceivedEvent erEvent) {
// ...
return wasteEmails.stream()
.map(email -> new WasteEmailsReceivedEvent(EmailsReceivedEventListener.this, List.of(email)))
.collect(Collectors.toList());
}
// ...
}
这个示例中,为每个垃圾邮件单独生成一个WasteEmailsReceivedEvent
事件,并返回一个WasteEmailsReceivedEvent
事件组成的List
。
Event
从 Spring 4.2 开始,不再强制要求自定义事件必须继承自ApplicationEvent
,因此我们的示例可以改写为:
public class EmailsReceivedEvent {
@Setter
@Getter
private List<Email> emails = Collections.emptyList();
private final Object source;
public EmailsReceivedEvent(Object source) {
this.source = source;
}
}
相应的,ApplicationEventPublisher
接口也添加了一个发布Object
类型事件的重载方法:
@FunctionalInterface
public interface ApplicationEventPublisher {
default void publishEvent(ApplicationEvent event) {
this.publishEvent((Object)event);
}
void publishEvent(Object event);
}
事务绑定事件
从 Spring 4.2 开始,我们可以使用一个特殊的@TransactionalEventListener
,利用它可以监听事务特定阶段产生的事件。
@TransactionalEventListener
是@EventListener
的扩展。
看一个示例:
@Data
@TableName("email")
public class EmailEntity {
@TableId(type = IdType.AUTO)
private long id;
@TableField
@NonNull
private String address;
@TableField
@NonNull
private String content;
public Email toEmail() {
return new Email(this.getAddress(), this.getContent());
}
}
public interface EmailMapper extends BaseMapper<EmailEntity> {
}
public class EmailAddEvent extends ApplicationEvent {
@Getter
private final EmailEntity email;
public EmailAddEvent(Object source, EmailEntity email) {
super(source);
this.email = email;
}
}
@Service
public class EmailService {
@Autowired
private EmailMapper emailMapper;
@Autowired
private ApplicationEventPublisher eventPublisher;
/**
* 将电子邮件批量添加到持久层
*/
@Transactional
public void addEmails(List<Email> emails){
this.addEmailEntities(emails.stream()
.map(Email::toEmailEntity)
.collect(Collectors.toList()));
}
@Transactional
public void addEmailEntities(List<EmailEntity> emailEntities){
emailEntities.forEach(emailEntity -> {
eventPublisher.publishEvent(new EmailAddEvent(EmailService.this, emailEntity));
});
emailEntities.forEach(emailEntity -> emailMapper.insert(emailEntity));
}
}
注意,这里事件发布动作必须在数据库插入相关API调用之前,否则事务回滚后产生的异常会阻止事件发布相关代码的运行,也就无法监听和处理事务回滚后的相关事件。
示例中使用 MyBatisPlus 向持久层批量加入电子邮件记录,每条记录添加时都会发送一个EmailAddEvent
事件,表示添加电邮记录这个行为的发生。
因为是批量插入,为了确保数据库的数据一致性,我们需要使用事务,在这里就是用@Transactional
标记批量插入方法。
因为 Spring 事务是通过 AOP 实现,所以关联的"自调用"方法(这里是
addEmails
)同样要用@Transactional
标记。要运行这个示例,需要添加数据库相关依赖,并且连接上一个数据库。
关于如何在 Spring Boot 中使用 MyBatisPlus 和数据库,可以看我的相关文章:
现在,如果所有数据成功插入,就会正常提交事务,否则就会触发事务回滚,数据库会恢复到插入前。
创建一个监听器用于监听在这个事务中所发布的事件:
@Component
public class EmailAddEventListener {
@TransactionalEventListener
public void addSuccess(EmailAddEvent eaEvent){
System.out.printf("Email %s is already add to db.%n", eaEvent.getEmail());
}
@TransactionalEventListener(phase = TransactionPhase.AFTER_ROLLBACK)
public void addFail(EmailAddEvent eaEvent){
System.out.printf("Email %s add to db failed.%n", eaEvent.getEmail());
}
}
addSuccess
用于监听事务成功提交后事务产生的EmailAddEvent
事件,addFail
用于监听事务回滚后事务产生的EmailAddEvent
事件。
实际上是通过@TransactionalEventListener
的phase
属性决定在事务的哪个阶段触发事件监听:
-
AFTER_COMMIT
,事务成功提交,默认值。 -
AFTER_ROLLBACK
,事务失败,回滚。 -
AFTER_COMPLETION
,事务完成(无论失败还是成功)。 -
BEFORE_COMMIT
,事务提交之前。
可以用下边的表结构进行测试:
CREATE TABLE `email` (
`id` bigint unsigned NOT NULL AUTO_INCREMENT,
`address` varchar(255) NOT NULL,
`content` text NOT NULL,
PRIMARY KEY (`id`),
UNIQUE KEY `address_idx` (`address`)
) ENGINE=InnoDB AUTO_INCREMENT=8 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci
这个表的address
字段设置了唯一索引,如果重复添加相同的数据,就会触发事务回滚以及相应事件监听。
要说明的是,TransactionalEventListener
仅用于监听事务中发布的事件,如果没有事务也就不起作用。
排序
可以用@Order
对监听器进行排序,以确保某个监听器在另一个之前被调用:
@Component
public class EmailsReceivedEventListener {
@Autowired
private EmailBlacklist emailBlacklist;
@Order(2)
@EventListener(condition = "!#erEvent.getEmails().isEmpty()")
public List<WasteEmailsReceivedEvent> handleEvent(EmailsReceivedEvent erEvent) {
// ...
}
@Order(1)
@EventListener({EmailsReceivedEvent.class, ContextRefreshedEvent.class})
public void receivedTip(Object event) {
// ...
}
}
数字越小越先被执行。
异步
默认情况下,Spring 的事件模型是同步的(单线程),这样的好处是事件发布和监听都是顺序执行的,并且可以很容易地在事件中返回新的事件来触发新的后续处理。此外,前边介绍的事务绑定事件也只能在这种情况下生效。
我们看下边的示例:
@Component
public class WasteEmailsReceivedEventListener {
@EventListener(condition = "!#werEvent.getEmails().isEmpty()")
public void handleEvent(WasteEmailsReceivedEvent werEvent) throws InterruptedException {
Thread.sleep(1000);
werEvent.getEmails().forEach(email -> System.out.printf("将邮件%s移入垃圾邮件%n", email));
}
}
现在,处理垃圾邮件的监听器每次执行都要等待1秒,运行测试很容易能看到这种“迟滞”。
@Async
如果每封垃圾邮件移入回收站这个动作都可以并行执行,那我们就可以用异步执行(@Async
)来改善性能:
@EnableAsync
public class WebConfig {
// ...
}
@Component
public class WasteEmailsReceivedEventListener {
@Async
@EventListener(condition = "!#werEvent.getEmails().isEmpty()")
public void handleEvent(WasteEmailsReceivedEvent werEvent) throws InterruptedException {
// ...
}
}
再次运行就能看到垃圾处理过程是有多么的迅速。
需要注意的是,因为是异步执行,所以如果事件处理方法中产生异常,调用方是无法捕获这个异常的。此外,异步执行的时候也不能通过返回事件的方式发布新的事件,而是需要手动发布事件:
public class WasteEmailRemovedEvent extends ApplicationEvent {
@Getter
private final Email email;
public WasteEmailRemovedEvent(Object source, Email email) {
super(source);
this.email = email;
}
}
@Component
public class WasteEmailsReceivedEventListener {
@Autowired
private ApplicationEventPublisher eventPublisher;
@Async
@EventListener(condition = "!#werEvent.getEmails().isEmpty()")
public void handleEvent(WasteEmailsReceivedEvent werEvent) throws InterruptedException {
// ...
werEvent.getEmails().forEach(email -> {
this.eventPublisher.publishEvent(new WasteEmailRemovedEvent(WasteEmailsReceivedEventListener.this, email));
});
}
}
@Component
public class WasteEmailRemovedEventListener {
@EventListener
public void eventHandler(WasteEmailRemovedEvent werEvent){
System.out.printf("Email %s is already removed.%n", werEvent.getEmail());
}
}
ApplicationEventMulticaster
除了使用@Async
让事件处理方法异步执行外,我们还可以修改事件模型的默认策略,让所有的事件监听都异步进行,比如:
@Configuration
public class WebConfig {
// ...
@Bean(name = "applicationEventMulticaster")
public ApplicationEventMulticaster simpleApplicationEventMulticaster() {
SimpleApplicationEventMulticaster eventMulticaster =
new SimpleApplicationEventMulticaster();
eventMulticaster.setTaskExecutor(new SimpleAsyncTaskExecutor());
return eventMulticaster;
}
}
此时并没有用@EnableAsync
开启异步相关注解,也没有用@Async
标记相应方法,但运行示例就能发现所有事件监听都很快发生,几乎没有延迟。
这样做会导致事务绑定事件(
@TransactionalEventListener
)无法正常使用。
Application Event
Sprring 本身就定义了很多事件,用于内部的处理,比如 Spring 在启动阶段产生的ContextRefreshedEvent
等,我们可以用同样的方式监听这些事件,以在特定阶段执行某些任务。
相关的内容我在中有过介绍,这里不过多赘述。
泛型事件
可以在定义一个有泛型参数的事件,并利用泛型来区分事件并监听:
public class GenericMsgEvent<T> extends ApplicationEvent {
@Getter
private final T msg;
public GenericMsgEvent(Object source, T msg) {
super(source);
this.msg = msg;
}
}
@Component
public class GenericMsgEventListener {
@EventListener
public void strEventHandler(GenericMsgEvent<String> gmEvent){
System.out.printf("String msg event is get, msg:%s.%n", gmEvent.getMsg());
}
@EventListener
public void intEventHandler(GenericMsgEvent<Integer> gmEvent){
System.out.printf("Int msg event is get, msg:%s.%n", gmEvent.getMsg());
}
}
这里两个方法strEventHandler
和intEventHandler
分别用于泛型参数是String
和泛型参数是Integer
时的事件监听。
看起来很不错,但实际上有一个“陷阱”,假如你像下边这样发布事件:
eventPublisher.publishEvent(new GenericMsgEvent<String>(this, "hello"));
实际上并不会触发任何事件监听,这是因为new GenericMsgEvent<String>(this, "hello")
中的泛型String
仅存在于编译阶段,运行时会对泛型进行类型擦除,实际上这里相当于GenericMsgEvent<Object>
,所以不会触发针对泛型参数是String
或Integer
定义的监听器。
关于 Java 泛型中的类型擦除,可以阅读我的。
所以,使用这类事件的正确方式是派生出一个特定类型后发布该类型的事件,比如:
public class IntMsgEvent extends GenericMsgEvent<Integer>{
public IntMsgEvent(Object source, Integer msg) {
super(source, msg);
}
}
public class StringMsgEvent extends GenericMsgEvent<String> {
public StringMsgEvent(Object source, String msg) {
super(source, msg);
}
}
现在可以用这两个类型发布事件:
eventPublisher.publishEvent(new StringMsgEvent(this, "hello"));
eventPublisher.publishEvent(new IntMsgEvent(this, 11));
这两个事件都会被对应的监听器方法监听到。
当然,这种方式多少有些“无趣”,并且不得不定义大量派生类型,为此,Spring 给我们提供了一种额外方式:
public class EntityCreatedEvent<T> extends ApplicationEvent implements ResolvableTypeProvider {
private final T entity;
public EntityCreatedEvent(Object source, T entity) {
super(source);
this.entity = entity;
}
public ResolvableType getResolvableType() {
ResolvableType resolvableType = ResolvableType.forClassWithGenerics(getClass(), ResolvableType.forInstance(entity));
return resolvableType;
}
}
在这里,泛型事件EntityCreatedEvent<T>
实现了ResolvableTypeProvider
接口,并且在方法getResolvableType
中返回一个“确切的”泛型类型。
在中我提到过
ResolvableType
这个类型,Spring 可以通过它来确认泛型的具体类型。
监听器的写法与之前的示例一致,这里不再赘述。
现在无需使用任何派生类,直接使用泛型事件进行发布:
eventPublisher.publishEvent(new EntityCreatedEvent<>(this, new Email("123@tomcom","sdfdsf")));
总结
Spring 利用经典的观察者模式实现了一个简单的事件机制,我们可以利用这个机制在多个模块之间进行灵活且松耦合式的通信。对于简单的工作来说这样就足够了,如果需要更高的性能和可靠性,就需要借助其他第三方的事件通信模块。
关于观察者模式的更多说明,可以阅读。
The End,谢谢阅读。
本文中的完整示例可以从
文章评论