如何优雅地异步编程
引言
还得到刚毕业呆的第一家公司,是做医疗软件的。大家可能有所了解,因为年代比较久远,也为了稳定长久运行,医疗软件一般都是使用以前的技术框架来做,比如整体框架SSH,部署是远程windows手动的等。我在码代码的时候,有的代码就会看到new Thread。现在想想,得亏并发不高线上没啥问题。
可能有的兄弟就有疑问了,我业务照样走,new Thread会有什么不妥?我们又应该如何正确的进行异步编程呢?以下我就用一个个demo来分析和理解(大佬有更好的意见欢迎留言建议指正)
以下使用api + postman的方式进行demo演示
1.错误之源 new Thread
@PostMapping("/oldAsync")
public void oldAsync(@RequestParam("sleepMillTime") Integer sleepMillTime){
new Thread(() -> {
log.info("古老版本的线程使用方法,浪费线程资源,线程缺乏管理,性能差,坚决不用");
ThreadUtil.sleep(sleepMillTime);
log.info("thread sleep end");
}).start();
}
看到这么多弊端的我们应该知道这货不能用
2.线程池
平常我们在公众号文章看到的池化,大部分指的就是使用线程池来合理使用线程,完成异步编程任务。当然,线程池可不止这些好处,如下图:
其中说的定时/定期执行涉及 ScheduledThreadPoolExecutor,大家也可以查询了解一下
2.1 Executors类快速创建线程池
@PostMapping("/executors")
public void executors(@RequestParam("sleepMillTime") Integer sleepMillTime){
// ...
// 业务代码
ExecutorService executorService = Executors.newSingleThreadExecutor();
// Executors.newCachedThreadPool();
// Executors.newFixedThreadPool(2);
executorService.submit( () -> {
log.info("线程池异步【不关心返回值】,不推荐使用Executors的方式创建线程池");
ThreadUtil.sleep(sleepMillTime);
log.info("thread sleep end");
});
}
很多时候,为了方便,会使用Executors类来快速创建线程池,但是在重点业务或者高并发的场景下,这样创建出来的线程池就会有很大的隐患,这里并不推荐这样的方式创建线程池。
不推荐Executors创建线程池的理由 :
- 1:FixedThreadPool 和 SingleThreadExecutor : 允许请求的队列长度为 Integer.MAX_VALUE,可能堆积大量的请求,从而导致 OOM。
- 2:CachedThreadPool 和 ScheduledThreadPool : 允许创建的线程数量为 Integer.MAX_VALUE ,可能会创建大量线程,从而导致 OOM。
- 3:无法根据业务需求来自定义线程池参数,无法为线程池命名,定位问题困难
2.2 自定义线程池
建议在不同的业务线上使用不同的自定义线程池
2.2.1 自定义线程池配置类 ThreadPoolConfig.class
@Configuration
public class ThreadPoolConfig {
private static final String THREAD_POOL_NAME_PREFIX = "demo-thread-pool";
private static final Integer THREAD_POOL_CORE_THREAD_SIZE = 8;
private static final Integer THREAD_POOL_MAX_THREAD_SIZE = 8;
private static final Integer THREAD_POOL_KEEP_ALIVE_TIME = 8;
private static final Integer THREAD_POOL_QUEUE_CAPACITY = 100;
@Bean
public ExecutorService createThreadPool() {
ThreadFactory factory = new ThreadFactoryBuilder()
.setNameFormat(THREAD_POOL_NAME_PREFIX + "-%d")
.setDaemon(true).build();
return new ThreadPoolExecutor(
THREAD_POOL_CORE_THREAD_SIZE,
THREAD_POOL_MAX_THREAD_SIZE,
THREAD_POOL_KEEP_ALIVE_TIME,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(THREAD_POOL_QUEUE_CAPACITY),
factory,
new ThreadPoolExecutor.AbortPolicy());
}
private static class MyAbortPolicy implements RejectedExecutionHandler {
MyAbortPolicy() {
}
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
throw new RejectedExecutionException(
"【线程默认饱和策略】:" + r.toString() + "线程被" + executor.toString() + "线程执行器拒绝");
}
}
}
我们一般是通过 ThreadPoolExecutor 的构造函数来创建线程池,然后提交任务给线程池执行就可以了。
ThreadPoolExecutor 构造函数如下:
/**
* 用给定的初始参数创建一个新的ThreadPoolExecutor。
*/
public ThreadPoolExecutor(int corePoolSize,//线程池的核心线程数量
int maximumPoolSize,//线程池的最大线程数
long keepAliveTime,//当线程数大于核心线程数时,多余的空闲线程存活的最长时间
TimeUnit unit,//时间单位
BlockingQueue<Runnable> workQueue,//任务队列,用来储存等待执行任务的队列
ThreadFactory threadFactory,//线程工厂,用来创建线程,一般默认即可
RejectedExecutionHandler handler//拒绝策略,当提交的任务过多而不能及时处理时,我们可以定制策略来处理任务
) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
2.2.2 线程池使用
@Autowired
private ExecutorService executorService;
@PostMapping("/threadPoolAsync")
public void threadPoolAsync(@RequestParam("sleepMillTime") Integer sleepMillTime){
// ...
// 业务代码
executorService.submit( () -> {
log.info("线程池异步【不关心返回值】,日常使用注意合理配置线程池参数");
ThreadUtil.sleep(sleepMillTime);
log.info("thread sleep end");
});
}
线程池在配置时需要结合业务点进行参数配置,但使用起来还是很方便的。
线程池时常可以应用于单应用或者业务解耦的场景,例如,支付成功在第三方支付回调流程中异步发送短信通知服务告知消费者等,减少主流程花费时间
2.3 线程池实践知识点
实际使用中需要根据自己机器的性能、业务场景来手动配置线程池的参数比如核心线程数、使用的任务队列、饱和策略等等。
线程数设置
有一个简单并且适用面比较广的公式:
- CPU 密集型任务(N+1): 这种任务消耗的主要是 CPU 资源,可以将线程数设置为 N(CPU 核心数)+1,比 CPU 核心数多出来的一个线程是为了防止线程偶发的缺页中断,或者其它原因导致的任务暂停而带来的影响。一旦任务暂停,CPU 就会处于空闲状态,而在这种情况下多出来的一个线程就可以充分利用 CPU 的空闲时间。
- I/O 密集型任务(2N): 这种任务应用起来,系统会用大部分的时间来处理 I/O 交互,而线程在处理 I/O 的时间段内不会占用 CPU 来处理,这时就可以将 CPU 交出给其它线程使用。因此在 I/O 密集型任务的应用中,我们可以多配置一些线程,具体的计算方法是 2N。
如何判断是 CPU 密集任务还是 IO 密集任务?
CPU 密集型简单理解就是利用 CPU 计算能力的任务比如你在内存中对大量数据进行排序。但凡涉及到网络读取,文件读取这类都是 IO 密集型,这类任务的特点是 CPU 计算耗费时间相比于等待 IO 操作完成的时间来说很少,大部分时间都花在了等待 IO 操作完成上。
线程池命名
每个业务线需要线程池时最好都各自使用对应的线程池,防止互相影响,线程池命名可根据业务进行命名,方便调试定位问题
2.4 线程池获取结果
线程池在使用submit()
方法时,会返回Future
类型(实际为FutureTask
类型)的结果
获取线程执行方法的返回结果
当我们需要获取线程执行结束返回的结果时,就可以通过Future来获取,实际使用如下:
@PostMapping("/threadPoolFutureAsync")
public void threadPoolFutureAsync(@RequestParam("sleepMillTime") Integer sleepMillTime){
// ...
// 业务代码
Future<String> future = executorService.submit(() -> {
log.info("线程池异步【关心返回值】,日常使用注意合理配置线程池参数");
Thread.sleep(sleepMillTime);
return "returnVal";
});
try {
String result = future.get();
log.info("Future同步阻塞当前线程来获取返回值");
log.info("缺点:需要catch异常处理,代码不够优雅,当前线程");
log.info("Future result:{}",result);
} catch (InterruptedException | ExecutionException e) {
log.error("线程异常:{}",ExceptionUtil.stacktraceToString(e));
}
}
上述代码,当我们使用postman调用此api时,主线程会通过同步阻塞的方式来等待获取异步线程返回结果(这种方式被经常成为假异步),当笔者第一次使用这种方式获取结果时,着实很困惑,既然还会阻塞我还不如直接同步进行,还省个线程..无法忍受之后就去找解决办法了(就是等会之后介绍的思路用法)
当我们需要获取异步线程的返回值时,不建议使用Future来获取
3.Spring @Async
异步编程,大名鼎鼎的Spring当然也来掺一脚,我们先来看下示例:
AnnotationAsyncService.java
@Slf4j
@Component
@EnableAsync // @Async注解需要先使用该注解来开启功能
public class AnnotationAsyncService {
@Async("testThreadPool") // 指定线程池,若无指定则使用默认创建的线程池执行
public void testAnnotationMethod(String param,Integer sleepMillTime){
log.info("线程池异步【关心返回值】,日常使用注意合理配置线程池参数");
// 为了灵活测试使用的参数param
if("error".equals(param)){
throw new RuntimeException("测试异常");
}
ThreadUtil.sleep(sleepMillTime);
}
}
AsyncController.java
@Autowired
private AnnotationAsyncService asyncService;
@PostMapping("/annotationAsync")
public void annotationAsync(@RequestParam("param") String param,
@RequestParam("sleepMillTime") Integer sleepMillTime){
asyncService.testAnnotationMethod(param, sleepMillTime);
log.info("annotationAsync Test");
}
使用@Async
要点
@Async
也是通过AOP(切面)实现的,与@Transactional
相同- 添加
@Async
注释的方法必须是public。因为AOP的本质是动态代理,动态代理要求方法必须是public @Async
必须是跨类调用,原因也是同类直接调用无法被动态代理(导致会调用对象本身的方法而不是代理类的方法)- 需要添加@EnableAsync注解
- 使用
@Async
的类需要被Spring所管理,即需要标注@Component
@Async
标注的方法必须返回void或者Future- 建议设置BlockingQueue的大小,默认设置容量为Integer.MAX_VALUE,也就是无界队列,可能会任务堆积导致内存溢出(容量大于0时
@Async
绑定的线程池队列为LinkedBlockingQueue,否则为SynchrousQueue),
其实
@Async
也有获取返回值的功能,但是因为同样是同步阻塞式获取,这里就不展示了。
代码上看,注解用起来还是很爽的。当异步方法出现异常时,我们可以配置来捕获异常友好输出到日志,这里需要进行额外配置,配置如下
@Slf4j
@Configuration
public class AsyncExceptionConfig implements AsyncConfigurer {
@Override
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
return new SpringAsyncExceptionHandler();
}
class SpringAsyncExceptionHandler implements AsyncUncaughtExceptionHandler {
@Override
public void handleUncaughtException(Throwable throwable, Method method, Object... objects) {
log.info("------我是Async无返回方法的异常处理方法---------");
}
}
}
4.Guava ListenableFuture
他来了她来了,他迈着大步走来了。前面我们提到,jdk为我们留下的疑难杂症:Future同步阻塞来获取异步线程返回值。
谷歌的开发人员给出了他们的答案 – Guava ListenableFuture,看到这个名称,大伙应该心里有了个想法,没错,就是常用的监听、回调的思路,如下:
@PostMapping("/threadPoolGuavaAsync")
public void threadPoolGuavaFutureAsync(@RequestParam("param") String param,
@RequestParam("sleepMillTime") Integer sleepMillTime){
// ...
// 业务代码
// guava在线程池上包装一层监听器
ListeningExecutorService listenServicePool = MoreExecutors.listeningDecorator(executorService);
ListenableFuture<String> listenableFuture = listenServicePool.submit(() -> {
log.info("线程池异步【关心返回值】,日常使用注意合理配置线程池参数");
ThreadUtil.sleep(sleepMillTime);
// 为了灵活测试使用的参数param
if("error".equals(param)){
throw new RuntimeException("测试异常");
}
return "returnVal" ;
});
// 为指定线程池线程 新增回调函数
Futures.addCallback(listenableFuture, new FutureCallback<String>() {
@Override
public void onSuccess(@Nullable String result) {
// 成功
log.info("Guava result:{}",result);
}
@Override
public void onFailure(Throwable t) {
// 异常
log.error("系统异常:{}",ExceptionUtil.stacktraceToString(t));
}
},listenServicePool);
}
这一做法完美地解决了同步阻塞获取结果的问题,我们可以使用
不过单单从代码量角度,下面jdk1.8的completableFuture
则更胜一筹
5.JDK1.8 CompletableFuture
@PostMapping("/completableFutureAsync")
public String completableFuture(@RequestParam("param") String param,
@RequestParam("sleepMillTime") Integer sleepMillTime) throws ExecutionException, InterruptedException {
// ...
// 业务代码
CompletableFuture<String> future = CompletableFuture
// 异步发起话费充值
.supplyAsync(() -> completableTest(param, sleepMillTime),executorService)
// 记录异常
.exceptionally(throwable -> {
log.error("【系统异常】:{}",ExceptionUtil.stacktraceToString(throwable));
return "error";
});
return future.get();
}
private String completableTest(String param,Integer sleepMillTime){
log.info("线程池异步【关心返回值】,日常使用注意合理配置线程池参数,线程池名称:{}",Thread.currentThread().getName());
// 为了灵活测试使用的参数param
if("error".equals(param)){
throw new RuntimeException("测试异常");
}
ThreadUtil.sleep(sleepMillTime);
return "CompletableFuture result";
}
代码优雅,可读性高,还不知道选啥嘛!
两个字,舒服!
6.demo代码
【参考链接】:
1:浅谈Java Future - 知乎
2:CompletableFuture:让你的代码免受阻塞之苦
3:你还在写同步程序?Java异步编程了解一下
4:浅谈Java异步编程 - 知乎
5:笑了,面试官问我知不知道异步编程的Future
6:guava并发工具
7:拿来即用的线程池最佳实践
8:Java线程池实现原理及其在美团业务中的实践
9:异步线程CompletableFuture让你的代码速度快到飞起