图源:
在之前的文章中,我多次介绍过在 Spring 中如何使用@Async
注解让方法调用变成“异步执行”:
-
在中,介绍了如何让定时任务使用
@Async
变成异步执行。 -
在中,介绍了如何让事件监听使用
@Async
变成异步执行。
下面,本篇文章将详细探讨@Async
在 Spring 中的用途。
简单示例
老规矩,我们从一个简单示例开始说明:
public class Fibonacci {
/**
* 返回斐波那契数列的第n位的值
*
* @param n 从1开始(包括)
* @return
*/
public int fibonacci(int n) throws InterruptedException {
Thread.sleep(100);
if (n <= 2) {
return 1;
}
return fibonacci(n - 1) + fibonacci(n - 2);
}
/**
* 打印斐波那契数列第n位的结果到控制台
* @param n 从1开始(包括)
* @throws InterruptedException
*/
public void print(int n) throws InterruptedException {
System.out.printf("fibonacci %d=%d%n", n, fibonacci(n));
}
}
这里定义一个 bean Fibonacci
,负责返回或打印斐波那契数列。
为了让产生斐波那契数列元素的过程“更明显”,这里让每一步递归调用都延迟0.1秒(
Thread.sleep(100)
)。
使用ApplicationRunner
测试:
public class WebConfig {
private Fibonacci fibonacci;
public ApplicationRunner applicationRunner() {
return args -> {
fibonacci.print(5);
fibonacci.print(6);
fibonacci.print(7);
};
}
}
输出:
fibonacci 5=5 fibonacci 6=8 fibonacci 7=13
整个测试用例都是顺序执行的,且存在明显的延迟。
可以利用@Async
将相应方法的执行改为异步来改善性能:
public class Fibonacci {
// ...
public void print(int n) throws InterruptedException {
System.out.printf("fibonacci %d=%d%n", n, fibonacci(n));
}
}
public class WebConfig {
// ...
}
不要忘了在配置类上添加
@EnableAsync
以启用 Spring 的异步执行功能。
实现原理
实际上 Spring 的异步执行是通过使用代理(JDK 代理或 CGLIB)或者 AspectJ 织入来实现的。
AspectJ 是一个主流的 AOP 框架。
这点可以通过@EnableAsync
注解的定义看出:
public @interface EnableAsync {
Class<? extends Annotation> annotation() default Annotation.class;
boolean proxyTargetClass() default false;
AdviceMode mode() default AdviceMode.PROXY;
int order() default 2147483647;
}
这些属性有如下用途:
-
annotation
,指定用于标记异步执行方法的注解,默认情况下 Spring 使用@Async
或javax.ejb.Asynchronous
。 -
mode
,实现机制,有两个可选项:-
AdviceMode.PROXY
,用代理实现。 -
AdviceMode.ASPECTJ
,用 AspectJ 实现。
-
-
proxyTargetClass
,是否使用 CGLIB 代理,这个属性只有mode
为AdviceMode.PROXY
时才生效。 -
order
,设置AsyncAnnotationBeanPostProcessor
在BeanPostProcessor
中的执行顺序,默认为最后运行,以便不影响之前可能存在的代理。
我们可以看出,默认情况下 Spring 使用 JDK 代理来实现异步调用,因此它也具备 Spring AOP 相同的限制。
AOP 实现
为了更好的说明问题,我们可以用 AOP 来自己实现一个类似的异步执行机制:
RetentionPolicy.RUNTIME)
(ElementType.METHOD)
(public @interface MyAsync {
}
public class MyAsyncAspect {
value = "execution(void *(..)) && @annotation(annotation)")
( public Object asyncCall(ProceedingJoinPoint pjp, MyAsync annotation) {
new Thread(() -> {
try {
pjp.proceed();
} catch (Throwable e) {
e.printStackTrace();
throw new RuntimeException(e);
}
}).start();
return null;
}
}
public class Fibonacci {
// ...
public void print(int n) throws InterruptedException {
System.out.printf("fibonacci %d=%d%n", n, fibonacci(n));
}
}
更多关于 AOP 的内容,可以阅读我的。
限制
在学习 AOP 的时候,我们知道因为 AOP 的实现机制的关系,存在着一些限制。而 Spring 异步执行采用和 Spring AOP 类似的实现原理,所以也存在同样的问题。
借鉴前边学到的内容,我们很容易就能总结出以下限制:
在默认情况下,异步执行使用 JDK 动态代理实现,因此:
-
只能让
public
的方法异步执行(JDK 动态代理使用接口实现)。 -
“自调用”时可能无法异步执行(绕过代理)。
如果使用 CGLIB 代理实现,限制会相对少一些(可以代理protected
方法),但依然存在自调用时的问题。
关于此类限制的讨论和相应的解决方案,可以阅读 ,里边有详细描述,这里不再赘述。
返回结果
通常情况下异步执行方法返回的都是void
,但如果我们需要返回异步执行的结果,要怎么做?
看一个示例:
public class WebConfig {
private Fibonacci fibonacci;
private static final int MAX_FIBONACCI_INDEX = 40;
ApplicationRunner applicationRunner2() throws InterruptedException {
return new ApplicationRunner() {
public void run(ApplicationArguments args) throws Exception {
List<Integer> numbers = new ArrayList<>();
for (int n = 1; n <= MAX_FIBONACCI_INDEX; n++) {
numbers.add(fibonacci.fibonacci(n));
}
System.out.println(numbers);
}
};
}
}
这里获取40个斐波那契元素,然后一起输出。因为其中每次获取斐波那契数都是顺序执行(单线程),所以相当耗时。
最终输出:
[1, 1, 2, ... , 63245986, 102334155] com.example.async.WebConfig$2.run() is called, use 876 mills.
下面我们用异步执行来改善效率。
要让方法异步执行并返回一个值,需要让方法返回一个Future
类型:
public class Fibonacci {
// ...
public Future<Integer> asyncFibonacci(int n) throws InterruptedException {
int result = fibonacci(n);
return CompletableFuture.completedFuture(result);
}
}
这里的CompletableFuture
是 Spring 的一个Future
实现,可以利用CompletableFuture.completedFuture
返回一个包含异步调用结果的Future
对象。
最终,我们需要收集所有异步执行返回的Future
对象,并通过Future.get
方法获取其中的异步执行结果:
public class WebConfig {
// ...
public ApplicationRunner applicationRunner() {
return new ApplicationRunner() {
public void run(ApplicationArguments args) throws Exception {
List<Integer> numbers = new ArrayList<>();
List<Future<Integer>> futures = new ArrayList<>();
for (int n = 1; n <= MAX_FIBONACCI_INDEX; n++) {
futures.add(fibonacci.asyncFibonacci(n));
}
for (Future<Integer> future : futures) {
numbers.add(future.get());
}
System.out.println(numbers);
}
};
}
// ...
}
输出:
[1, 1, 2, ... , 63245986, 102334155] com.example.async.WebConfig$1.run() is called, use 380 mills.
效率提升了一倍多。
并发相关的经验告诉我们,将并发用于密集计算,计算规模(并行任务数目)越大,性能提升越明显。
ThreadPoolTaskExecutor
默认情况下,Spring 使用ThreadPoolTaskExecutor
执行异步方法:
public class WebConfig {
// ...
private TaskExecutor taskExecutor;
// ...
public ApplicationRunner applicationRunner3(){
return args -> {
System.out.println(taskExecutor);
if (taskExecutor instanceof ThreadPoolTaskExecutor){
var executor = (ThreadPoolTaskExecutor) taskExecutor;
System.out.println("getThreadNamePrefix:%s".formatted(executor.getThreadNamePrefix()));
System.out.println("getActiveCount:%s".formatted(executor.getActiveCount()));
System.out.println("getCorePoolSize:%s".formatted(executor.getCorePoolSize()));
System.out.println("getKeepAliveSeconds:%s".formatted(executor.getKeepAliveSeconds()));
System.out.println("getMaxPoolSize:%s".formatted(executor.getMaxPoolSize()));
System.out.println("getQueueCapacity:%s".formatted(executor.getQueueCapacity()));
System.out.println("getPoolSize:%s".formatted(executor.getPoolSize()));
}
};
}
}
输出:
getThreadNamePrefix:task- getActiveCount:0 getCorePoolSize:8 getKeepAliveSeconds:60 getMaxPoolSize:2147483647 getQueueCapacity:2147483647 getPoolSize:8
ThreadPoolTaskExecutor
的这些 Getter 返回的信息包括:
-
getThreadNamePrefix
,线程名称前缀。 -
getActiveCount
,当前存活的线程数量。 -
getCorePoolSize
,核心线程池大小(超过该值后会扩充线程池,直到最大线程池大小)。 -
getMaxPoolSize
,最大线程池大小(超过该值后会将线程放入等待队列)。 -
getQueueCapacity
,等待队列的容量(被塞满后新的线程将被丢弃)。 -
getKeepAliveSeconds
,线程存活数目。 -
getPoolSize
,当前线程池大小。
总的来说,`ThreadPoolTaskExecutor
可以合理地复用线程:如果所需线程数目超过核心线程池大小,会将线程放入等待队列,以等待核心线程空闲后执行。如果等待队列被塞满,会添加新的线程以期望能够加快线程执行。最后,如果添加的线程数目超过最大线程池大小,才会按照规则丢弃线程。
这个过程可以用下图表示:
图源:
在早期的 Spring 版本,默认使用
simpleAsyncTaskExecutor
执行异步调用,该TaskExecutor
不会进行线程复用,只是简单的增加新的线程。这里比较重要的是核心线程池大小,一般来说设置为执行代码所在机器的CPU核心数即可,我的笔记本是8核的,所以这里 Spring 将该值设置为8。
一般来说,使用默认设置的ThreadPoolTaskExecutor
就可以了,如果需要进行修改,可以:
public class AsyncConfig implements AsyncConfigurer {
public Executor getAsyncExecutor() {
ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
threadPoolTaskExecutor.setThreadNamePrefix("ThreadPoolTaskExecutor-");
threadPoolTaskExecutor.setCorePoolSize(8);
threadPoolTaskExecutor.initialize();
return threadPoolTaskExecutor;
}
}
此时在异步方法中打印线程名称:
public class Fibonacci {
public Future<Integer> asyncFibonacci(int n) throws InterruptedException {
System.out.println(Thread.currentThread().getName());
// ...
}
// ...
}
就能看到控制台输出的线程名称是ThreadPoolTaskExecutor-x
,而不是之前默认的task-x
。
单独指定 Executor
我们也可以为某些异步方法单独指定一个Executor
,而不是使用全局的Executor
:
public class WebConfig {
// ...
public Executor threadPoolTaskExecutor(){
ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
threadPoolTaskExecutor.setThreadNamePrefix("another-ThreadPoolTaskExecutor-");
return threadPoolTaskExecutor;
}
}
public class Fibonacci {
// ...
"threadPoolTaskExecutor")
( public Future<Integer> asyncFibonacci(int n) throws InterruptedException {
System.out.println(Thread.currentThread().getName());
// ...
}
// ...
}
就像上面的示例,可以在@Async
中指定一个Executor
类型的 bean,Spring 将用这个 bean 执行这个方法的异步调用。
异常处理
如果异常方法返回的是Future
,且异步调用会产生异常,将通过Future.get
抛出:
public class Fibonacci {
// ...
public Future<Integer> asyncFibonacci(int n) throws InterruptedException {
if (n < 1) {
throw new IllegalArgumentException("n 不能小于1");
}
// ...
}
// ...
}
public class WebConfig {
// ...
public ApplicationRunner applicationRunner3() {
return args -> {
Future<Integer> future = fibonacci.asyncFibonacci(0);
System.out.println(future.get());
};
}
}
这里会抛出一个IllegalStateException
异常。
如果返回类型是void
,Spring 会使用一个默认的“异常处理器”SimpleAsyncUncaughtExceptionHandler
来处理异常:
public class Fibonacci {
// ...
public void print(int n) throws InterruptedException {
if (n < 1) {
throw new IllegalArgumentException("n不能小于1");
}
System.out.printf("fibonacci %d=%d%n", n, fibonacci(n));
}
}
public class WebConfig {
public ApplicationRunner applicationRunner3() {
return args -> {
fibonacci.print(0);
};
}
}
错误信息:
2023-06-16T16:52:17.509+08:00 ERROR 27872 --- [lTaskExecutor-1] .a.i.SimpleAsyncUncaughtExceptionHandler : Unexpected exception occurred invoking async method: public void com.example.async.Fibonacci.print(int) throws java.lang.InterruptedException ...
可以用一个自定义异常处理器作为 Spring 异步调用时的全局异常处理器:
public class AsyncConfig implements AsyncConfigurer {
// ...
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
return new AsyncUncaughtExceptionHandler() {
public void handleUncaughtException(Throwable ex, Method method, Object... params) {
System.out.println("Exception message - " + ex.getMessage());
System.out.println("Method name - " + method.getName());
for (Object param : params) {
System.out.println("Parameter value - " + param);
}
}
};
}
}
The End,谢谢阅读。
本文的完整示例可以通过
参考资料
文章评论