Java线程池进阶

木小丰 2022年02月26日 8,971次浏览

线程池是日常开发中常用的技术,使用也非常简单,不过想使用好线程池也不是件容易的事,开发者需要不断探索底层的实现原理,才能在不同的场景中选择合适的策略,最大程度发挥线程池的作用以及避免踩坑。

一、线程池工作流程

以下是Java线程池的工作流程,涉及创建线程的参数及拒绝策略,如果读者对这部分内容不太了解,可参考其他的文档,本文不在赘述。

image-20220226153333763

二、线程池进阶

1、线程池的创建

需要手动通过ThreadPoolExecutor创建,使用者要非常明确业务场景并定制线程池,避免误用可能导致的问题。

以下是阿里巴巴Java开发手册中的描述:

image-20220226153449939

ThreadFactory:推荐使用guava中的ThreadFactoryBuilder创建:

new ThreadFactoryBuilder().setNameFormat("name-%d").build();

2、阻塞队列在线程池中的使用

很多同学一看到阻塞队列就自然的认为出入队列都是阻塞的,使用的阻塞队列也就没必要关心拒绝策略了,其实不然,阻塞队列在任务提交和任务获取阶段使用了不同的策略。

任务提交阶段:调用的阻塞队列的offer方法,这个方法是非阻塞的,如果插入队列失败会直接返回false,并触发拒绝策略;

获取任务阶段:使用的是take方法,此方法是阻塞的;

3、保证提交阶段任务不丢失

有三种方法:使用CallerRunsPolicy拒绝策略、自定义拒绝策略、使用MQ系统保证任务不丢失。

(1)CallerRunsPolicy拒绝策略

ThreadPoolExecutor.CallerRunsPolicy:由提交任务的线程处理

这种是最简单的策略,但需要注意的是如果任务耗时较长,会阻塞提交任务的线程,可能会成为系统瓶颈。

(2)自定义拒绝策略

既然Java线程默认使用的是offer提交任务,那我们可以自定义拒绝策略在任务提交失败时改为put阻塞提交。

缺点也是会阻塞提交线程,不过相比CallerRunsPolicy策略更能发挥多线程的优势。

 RejectedExecutionHandler executionHandler = (r, executor) -> {
   try {
​     executor.getQueue().put(r);
   } catch (InterruptedException e) {
​     Thread.currentThread().interrupt();
​     throw new RejectedExecutionException("Producer thread interrupted", e);
   }
 };

(3)配合MQ保证任务不丢失

使用默认的ThreadPoolExecutor.AbortPolicy策略,如果抛出RejectedExecutionException异常则返回给MQ消费失败,MQ会保证自动重试。

4、保证队列、未执行完成的任务不丢失

当服务停止的时候,线程池中队列和活跃线程中未执行完成的任务可能会造成数据丢失,首先说下结论:无论采取任何策略,在Java层都不能100%保证不丢,比如机器突然断电的情况。我们还是可以采取一定的措施尽量避免任务丢失。

(1)线程池关闭

线程池关闭有两个方法:

shutdownNow方法:线程池拒绝接收新提交的任务,同时立马关闭线程池,线程池里的任务不再执行,并抛出InterruptedException异常。

shutdown方法:线程池拒绝接收新提交的任务,同时等待线程池里的任务执行完毕后关闭线程池。

(2)注册关闭钩子

使用以下方法注册JVM进程关闭钩子,在钩子方法中执行线程池关闭、未处理完成的任务持久化保存等。

Runtime.getRuntime().addShutdownHook()

需要注意的是:钩子方法在使用kill -9杀死进程时不会执行,一般的杀进程的方式是先执行kill,等待一段时间,如果进程还没杀死,再执行kill -9。

要保证队列中的任务不丢失,需要消费队列中的数据,发送到外部MQ中;

保证未执行完成的任务不丢失,需要在抛出InterruptedException异常后,将任务参数保证到MQ中;

需要注意的是:1)尽量不要把未完成的任务保存到本地磁盘,尤其是在经常扩缩容的弹性集群里;2)捕获InterruptedException异常后,不要做重试等耗时操作;3)需要监控任务都发送到MQ中的时间,以便调整kill -9强制执行前的等待时间。

(3)使用MQ保证任务必须执行完成

通过上面介绍的两种方式,可以处理大部分正常停止服务丢数据的任务。不过对于极端情况下,比如断电、断网等,需要严格保证任务不丢失的场景还是不能满足业务需要,这种情况下就需要依赖MQ。

方案是使用线程池的submit方法提交任务,通过future获取到任务执行完成再返回给MQ消费完成。在MQ中如何保证数据不丢失是另外一个复杂的话题了,这里不再深入探讨。

需要注意的是,如果采用这种方案,需要保证处理任务的幂等性,在操作步骤比较多的时候,复杂性也会很高。

5、ThreadLocal变量

ThreadLocal中变量的作用域是当前线程,使用线程池后会因跨线程导致数据不能传递,如果业务中使用了ThreadLocal,需要额外处理这种场景。

(1)InheritableThreadLocal

InheritableThreadLocal是在父子线程中自动传递参数,在线程池场景中不适用。

(2)手动处理

在提交任务前把ThreadLocal中的值取出来,在线程池执行时再set到线程池中线程的ThreadLocal中,并且在finally中清理数据。

缺点是每个线程池都要处理一遍,如果对上下文不熟悉,有漏传的风险。

(3)TransmittableThreadLocal

阿里开源地址:TransmittableThreadLocal

原理是通过javaagent自动处理ThreadLocal跨线程池传参,对业务开发者无感知,也是推荐的方案。

6、异常处理

(1)异常感知

execute方法:抛异常会被提交任务线程感知;

submit方法:抛异常不会被提交任务线程感知,在Future.get()执行时会被感知;

(2)统一处理方案1:异步任务里统一catch

在线程池的执行逻辑最外层,包装try、catch,处理所有异常。

缺点是: 1)所有的不同任务都要trycatch,增加了代码量。2)不存在checkedexception的地方也需要都trycatch起来,代码丑陋。

(3)统一处理方案2:覆写统一异常处理方法

此方案有两种常用实现:1)自定义线程池,继承ThreadPoolExecutor并覆写其afterExecute方法;2)创建线程池时自定义ThreadFactory,在实现里手动创建线程池,并调用Thread.setUncaughtExceptionHandler注册统一异常处理器。

(4)统一处理方案3:Future

任务提交都使用submit,并在Future.get()时捕获所有异常。

三、总结

本文从创建线程池、队列注意事项、如何保证任务不丢失、ThreadLocal、异常等方面总结了笔者的一些思考,各位读者可以对照下自己的使用场景,看本文提到的问题是否都考虑到了呢,或者你还有什么线程池方面的使用经验,欢迎交流分享。

本文链接:Java线程池进阶

作者简介:木小丰,美团Java技术专家,专注分享软件研发实践、架构思考。欢迎关注公共号:Java研发

更多精彩文章:
从MVC到DDD的架构演进
平台化建设思路浅谈
构建可回滚的应用及上线checklist实践
Maven依赖冲突问题排查经验