动态线程池oneThread系统 — 第一部分

eve2333 发布于 20 天前 48 次阅读


首先看美团技术团队在2020年发布的一篇文章Java线程池实现原理及其在美团业务中的实践 - 美团技术团队,详细讲解了线程池的来历与原理,这个一句话带过;接下来是jdk1.8的线程池原理,java中的线程池核心实现类是ThreadPoolExecutor,ThreadPoolExecutor实现的顶层接口是Executor,把任务解耦分成两部分;用户只需提供Runnable对象,运行逻辑提交到执行器Executor中自动运行;ExecutorService接口可以自己补充任务异步future方法,扩充执行任务;还可以管控线程池;

AbstractExecutorService则是上层将整个过程穿起来,最下层的实现类ThreadPoolExecutor实现最主要的功能,这也是下面详细讲述的内容

线程池构建了一个生产者消费者模型,将线程和任务两者解耦,生产者接收到消息后就1直接执行2放到队列里面等待执行3拒绝执行;消费者则等待生产者的任务,然后进行线程的分配使用,执行完任务然后重新任务或者回收

线程池运行的状态是在线程池的运行中内部设定。线程池内部使用一个变量维护两个值:运行状态(runState)和线程数量 (workerCount)。 在具体实现中,线程池将运行状态(runState)、线程数量 (workerCount)两个关键参数的维护放在了一起,如下代码所示:

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

很牛,一个变量维护两个数字,ctl这个AtomicInteger,里面高3位保存runState,低29位保存workerCount,两个变量之间互不干扰,并且经常出现要同时判断线程池运行状态和线程数量的情况。线程池也提供了若干方法去供用户获得线程池当前的运行状态、线程个数。这里都使用的是位运算的方式

ThreadPoolExecutor的运行状态有5种:

RUNNING能接受新提交的任务,并且也能处理阻塞队列中的任务
SHUTDOWN关闭状态,不再接受新提交的任务,但却可以继续处理阻塞队列中已保存的任务。
STOP不能接受新任务,也不处理队列中的任务,会中断正在处理任务的线程。
TIDYING所有的任务都已终止了,workerCount(有效线程数)为0。
TERMINATED在terminated()方法执行完后进入该状态。

一般来说是running,然后你代码选择调用shutdown()就shutdown停止并继续完成队列的任务了,shutdownNow()会立刻停止到stop,然后有效线程数为0时就tidying这种休眠状态,最后terminated调用则是彻底休眠了

任务的调度都是由execute方法完成的,这部分完成的工作是:检查现在线程池的运行状态、运行线程数、运行策略,决定接下来执行的流程,是直接申请线程执行,或是缓冲到队列中执行,亦或是直接拒绝该任务。其执行过程如下:

  1. 首先检测线程池运行状态,如果不是RUNNING,则直接拒绝,线程池要保证在RUNNING的状态下执行任务。
  2. 如果workerCount < corePoolSize,则创建并启动一个线程来执行新提交的任务。
  3. 如果workerCount >= corePoolSize,且线程池内的阻塞队列未满,则将任务添加到该阻塞队列中。
  4. 如果workerCount >= corePoolSize && workerCount < maximumPoolSize,且线程池内的阻塞队列已满,则创建并启动一个线程来执行新提交的任务。
  5. 如果workerCount >= maximumPoolSize,并且线程池内的阻塞队列已满, 则根据拒绝策略来处理该任务, 默认的处理方式是直接抛异常

任务和线程两者解耦,对于后续的分配工作,线程池中是以生产者消费者模式,通过一个阻塞队列来实现的。阻塞队列缓存任务,工作线程从阻塞队列中获取任务。阻塞队列支持两个附加操作的队列。这两个附加的操作是:1.在队列为空时,获取元素的线程会等待队列变为非空。2.当队列满时,存储元素的线程会等待队列可用。生产者往队列里添加元素的线程,消费者从队列里拿元素的线程,如此循环往复实现了线程的高效复用,大大减少了销毁创建线程是时间

名称描述
ArrayBlockingQueue一个用数组实现的有界阻塞队列,此队列按照先进先出(FIFO)的原则对元素进行排序。支持公平锁和非公平锁。
LinkedBlockingQueue一个由链表结构组成的有界队列,此队列按照先进先出(FIFO)的原则对元素进行排序。此队列的默认长度为Integer.MAX_VALUE,所以默认创建的该队列有容量危险。
PriorityBlockingQueue一个支持线程优先级排序的无界队列,默认自然序进行排序,也可以自定义实现compareTo()方法来指定元素排序规则,不能保证同优先级元素的顺序。
DelayQueue一个实现PriorityBlockingQueue实现延迟获取的无界队列,在创建元素时,可以指定多久才能从队列中获取当前元素。只有延时期满后才能从队列中获取元素。
SynchronousQueue一个不存储元素的阻塞队列,每一个put操作必须等待take操作,否则不能添加元素。支持公平锁和非公平锁。SynchronousQueue的一个使用场景是在线程池里。Executors.newCachedThreadPool()就使用了SynchronousQueue,这个线程池根据需要(新任务到来时)创建新的线程,如果有空闲线程则会重复使用,线程空闲了60秒后会被回收。
LinkedTransferQueue一个由链表结构组成的无界阻塞队列,相当于其它队列,LinkedTransferQueue队列多了transfer和tryTransfer方法。
LinkedBlockingDeque一个由链表结构组成的双向阻塞队列。队列头部和尾部都可以添加和移除元素,多线程并发时,可以将锁的竞争最多降到一半。

任务的执行有两种可能:一种是任务直接由新创建的线程执行。另一种是线程从任务队列中获取任务然后执行,执行完任务的空闲线程会再次去从队列中申请任务再去执行。第一种情况仅出现在线程初始创建的时候,第二种是线程获取任务绝大多数的情况。

线程需要从任务缓存模块中不断地取任务执行,帮助线程从阻塞队列中获取任务,实现线程管理模块和任务管理模块之间的通信。这部分策略由getTask方法实现,他获取当前的线程状态数量等情况,判断线程池是否已停止运行,线程数现阶段是否过多,该线程是否为可回收线程等等

任务拒绝模块是线程池的保护部分,线程池有一个最大的容量,当线程池的任务缓存队列已满,并且线程池中的线程数目达到maximumPoolSize时,就需要拒绝掉该任务,采取任务拒绝策略,保护线程池。

public interface RejectedExecutionHandler {
    void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}

可以说自己定制,或选择已有的四个策略


worker线程工作线程

private final class Worker extends AbstractQueuedSynchronizer implements Runnable{
    final Thread thread;//Worker持有的线程
    Runnable firstTask;//初始化的任务,可以为null
}

Worker这个工作线程,实现了Runnable接口,并持有一个线程thread,一个初始化的任务firstTask。thread是在调用构造方法时通过ThreadFactory来创建的线程,可以用来执行任务;firstTask用它来保存传入的第一个任务,这个任务可以有也可以为null。如果这个值是非空的,那么线程就会在启动初期立即执行这个任务,也就对应核心线程创建时的情况;如果这个值是null,那么就需要创建一个线程去执行任务列表(workQueue)中的任务,也就是非核心线程的创建

线程池使用一张Hash表去持有线程的引用,这样可以通过添加引用、移除引用这样的操作来控制线程的生命周期;​Worker是通过继承AQS,使用AQS来实现独占锁这个功能。没有使用可重入锁ReentrantLock,而是使用AQS,为的就是实现不可重入的特性去反应线程现在的执行状态,反应线程的生命周期

  • 1.lock方法一旦获取了独占锁,表示当前线程正在执行任务中。
  • 2.如果正在执行任务,则不应该中断线程。
  • 3.如果该线程现在不是独占锁的状态,也就是空闲的状态,说明它没有在处理任务,这时可以对该线程进行中断。
  • 4.线程池在执行shutdown方法或tryTerminate方法时会调用interruptIdleWorkers方法来中断空闲的线程,interruptIdleWorkers方法会使用tryLock方法来判断线程池中的线程是否是空闲状态;如果线程是空闲状态则可以安全回收。

回收线程就是忙碌的线程Running: 正在干活,把门反锁了。线程回收请求获取锁失败,就知道有人忙,不打扰,跳过

空闲的线程Idle: 没干活,没锁。获取锁成功,于是进去把关闭回收线程


增加线程是通过线程池中的addWorker方法,该方法的功能就是增加一个线程,该方法不考虑线程池是在哪个阶段增加的该线程,这个分配线程的策略是在上个步骤完成的,该步骤仅仅完成增加线程,运行返回是否成功这个结果。addWorker方法有两个参数:firstTask、core。firstTask参数用于指定新增的线程执行的第一个任务,该参数可以为空;core参数为true表示在新增线程时会判断当前活动线程数是否少于corePoolSize,false表示新增线程前需要判断当前活动线程数是否少于maximumPoolSize

有一个比较复杂的流程图片就不放了


Worker线程回收依赖jvm自动回收,线程池做的工作是根据当前线程池的状态维护一定数量的线程引用,防止这部分线程被JVM回收,当线程池决定哪些线程需要回收时,只需要将其引用消除;worker创建后不断轮询要来任务执行,没有获得任务后就worker就自动化消除线程池引用

核心线程可以无限等待获取任务,非核心线程要限时获取任务,这句话什么意思?

在 Java 中,一个线程(Thread对象)只要它的 run() 方法还没有执行结束,它就是“活着”的,JVM 甚至 GC都不会去回收它。线程池内部有一个集合(通常是 HashSet<Worker> workers),用来存放所有的工作线程对象(Worker)。只要 Worker 里的 run() 方法一直在循环运行(死循环),这个线程就不会结束,JVM 就无法回收它。这就是线程“复用”的基础。核心线程调用队列的 take() 方法。如果队列为空,它会一直阻塞挂在这里,不会返回 null,也不会退出循环。线程因此一直存活。非核心线程调用队列的 poll(keepAliveTime) 方法。如果队列为空,它只等一段时间。如果超时还没拿到任务,方法返回 null。循环结束后,代码会执行 processWorkerExit() 方法。这个方法会将当前这个 Worker 对象从线程池的 HashSet 集合中移掉;下一次 GC时,JVM 就会把这个对象占用的内存回收掉。

我问了问ai大致回答如下

1. 误区一:“程序启动...就会默认有核心线程一直在?”

  • 你的理解:程序一启动,核心线程(假设配置了5个)就立刻创建好坐在那里等任务。
  • 实际情况默认为“懒加载” (Lazy Loading)
    • 在标准的 JDK ThreadPoolExecutor 中,线程池刚创建时,里面是空的(0个线程)。
    • 只有当你调用 execute() 或 submit() 提交第一个任务时,线程池才会创建一个核心线程。
    • 提交第二个任务,创建第二个...直到达到核心线程数(CorePoolSize)。
    • 注:除非你专门调用了 prestartAllCoreThreads() 方法,才会一启动就预热好所有核心线程。

2. 误区二:“服务量增加就自动加非核心线程?” (最容易踩的坑)

  • 你的理解:核心线程忙不过来了 -> 马上创建非核心线程帮忙。
  • 实际情况先排队,队满才加人
    • Java 线程池的默认逻辑是:
      1. 核心线程满了? -> 把新任务扔到阻塞队列(WorkQueue)里排队
      2. 队列也满了(装不下了)? -> 这时候才会创建非核心线程来救急。
    • 后果:如果你配置了一个无界队列(比如 LinkedBlockingQueue),非核心线程永远不会被创建,因为队列永远装不满。哪怕有100万个任务在排队,也只有核心线程在慢慢干活。

3. 误区三:“完全没人,就只保留核心线程空转死循环?”

  • 你的理解:核心线程没事干时,在搞“死循环”(Spinning),CPU 狂转。
  • 实际情况是“阻塞” (Blocking),不是“空转”
    • 代码层面:确实是一个 while 循环。
    • 操作系统层面:当调用 workQueue.take() 发现队列没任务时,线程会进入 WAITING 状态。
    • 形象解释:线程此时被操作系统“挂起”了(就像睡着了一样)。它不占用任何 CPU 时间片,CPU 使用率是 0%。
    • 直到有新任务进队,操作系统才会把它“叫醒”(Notify)。
    • 所以,核心线程没事干时,占用的只是内存资源,不占用 CPU 算力

线程回收的工作是在processWorkerExit方法完成的。

图10 线程销毁流程

事实上,在这个方法中,将线程引用移出线程池就已经结束了线程销毁的部分。但由于引起线程销毁的可能性有很多,线程池还要判断是什么引发了这次销毁,是否要改变线程池的现阶段状态,是否要根据新状态,重新分配线程。


在Worker类中的run方法调用了runWorker方法来执行任务,runWorker方法的执行过程如下:

1.while循环不断地通过getTask()方法获取任务。

2.getTask()方法从阻塞队列中取任务。申请worker非重入锁

3.如果线程池正在停止,那么要保证当前线程是中断状态,否则要保证当前线程不是中断状态。

4.执行任务。释放worker非重入锁,任务结束

5.如果getTask结果为null则跳出循环,执行processWorkerExit()方法,销毁线程。

线程池在业务中的实践

1.快速响应业务请求,为了快速减少响应时间,依次调高corePoolSize和maxPoolSize去尽可能创造多的线程快速执行任务,不要缓存序列,因为当代人耐心是有限的

2.快速处理批量任务,统计某个报表,需要计算出全国各个门店中有哪些商品有某种属性,使用多线程策略,并行计算但是不需要很高的及时性,应该关注如何使用有限的资源,尽可能在单位时间内处理更多的任务,也就是吞吐量优先的问题。所以应该设置队列去缓冲并发任务,调整合适的corePoolSize去设置处理任务的线程数。在这里,设置的线程数过多可能还会引发线程上下文切换频繁的问题,也会降低处理任务的速度,降低吞吐量。

线程池使用面临的核心的问题在于:线程池的参数并不好配置,有点类似与凭感觉与经验,没有成熟规则;作者提出延展几个方向:

1.不用线程池?对于Actor框架,协程框架,Disruptor框架等来获得都不成熟,因此只能用线程池

2.参数设置规律总结?作者说:我们并没有得出通用的线程池计算方式。并发任务的执行情况和任务类型相关,IO密集型和CPU密集型的任务运行起来的情况差异非常大,但这种占比是较难合理预估的,这导致很难有一个简单有效的通用公式帮我们直接计算出结果。

3.线程池参数动态化?将线程池的参数从代码中迁移到分布式配置中心上,实现线程池参数可动态配置和即时生效,线程池参数动态化前后的参数修改流程对比如下:

这也是本项目动态线程池的来源,说明大部分大厂都结合自身实际对现有的东西都进行了一定程度的魔改来使用

动态化线程池

线程池主要是三个参数corePoolSize、maximumPoolSize,workQueue,它们最大程度地决定了线程池的任务分配和线程分配策略。依旧是上面两个运用场景,1并行执行要求快速反应2并行执行大批量任务

  1. 为了解决参数不好配,修改参数成本高等问题。在Java线程池留有高扩展性的基础上,封装线程池,允许线程池监听同步外部的消息,根据消息进行修改配置。将线程池的配置放置在平台侧,允许开发同学简单的查看、修改线程池配置。
  2. 增加线程池监控:对某事物缺乏状态的观测,就对其改进无从下手。在线程池执行任务的生命周期添加监控能力,帮助开发同学了解线程池状态。

动态化线程池提供如下功能:

动态调参:支持线程池参数动态调整、界面化操作;包括修改线程池核心大小、最大核心大小、队列长度等;参数修改后及时生效。 任务监控:支持应用粒度、线程池粒度、任务粒度的Transaction监控;可以看到线程池的任务执行情况、最大任务执行时间、平均任务执行时间、95/99线等。 负载告警:线程池队列任务积压到一定值的时候会通过大象(美团内部通讯工具)告知应用开发负责人;当线程池负载数达到一定阈值的时候会通过大象告知应用开发负责人。 操作监控:创建/修改和删除线程池都会通知到应用的开发负责人。 操作日志:可以查看线程池参数的修改记录,谁在什么时候修改了线程池参数、修改前的参数值是什么。 权限校验:只有应用开发负责人才能够修改应用的线程池参数。

图18 动态化线程池功能架构

参数动态化

JDK原生线程池ThreadPoolExecutor提供了如下几个public的setter方法,如下图所示:

图19 JDK 线程池参数设置接口

JDK允许线程池使用方通过ThreadPoolExecutor的实例来动态设置线程池的核心策略,以setCorePoolSize为方法例,在运行期线程池使用方调用此方法设置corePoolSize之后,线程池会直接覆盖原来的corePoolSize值,并且基于当前值和原始值的比较结果采取不同的处理策略。对于当前值小于当前工作线程数的情况,说明有多余的worker线程,此时会向当前idle的worker线程发起中断请求以实现回收,多余的worker在下次idel的时候也会被回收;对于当前值大于原始值且当前队列中有待执行任务,则线程池会创建新的worker线程来执行队列任务,setCorePoolSize具体流程如下:

图20 setCorePoolSize方法执行流程

线程池内部会处理好当前状态做到平滑修改,其他几个方法限于篇幅,这里不一一介绍。重点是基于这几个public方法,我们只需要维护ThreadPoolExecutor的实例,并且在需要修改的时候拿到实例修改其参数即可。基于以上的思路,我们实现了线程池参数的动态化、线程池参数在管理平台可配置可修改,其效果图如下图所示:

图21 可动态修改线程池参数

用户可以在管理平台上通过线程池的名字找到指定的线程池,然后对其参数进行修改,保存后会实时生效。目前支持的动态参数包括核心数、最大值、队列长度等。除此之外,在界面中,我们还能看到用户可以配置是否开启告警、队列等待任务告警阈值、活跃度告警等等。关于监控和告警,我们下面一节会对齐进行介绍。

线程池监控

除了参数动态化之外,为了更好地使用线程池,我们需要对线程池的运行状况有感知,比如当前线程池的负载是怎么样的?分配的资源够不够用?任务的执行情况是怎么样的?是长任务还是短任务?基于对这些问题的思考,动态化线程池提供了多个维度的监控和告警能力,包括:线程池活跃度、任务的执行Transaction(频率、耗时)、Reject异常、线程池内部统计信息等等,既能帮助用户从多个维度分析线程池的使用情况,又能在出现问题第一时间通知到用户,从而避免故障或加速故障恢复。

1. 负载监控和告警

线程池负载关注的核心问题是:基于当前线程池参数分配的资源够不够。对于这个问题,我们可以从事前和事中两个角度来看。事前,线程池定义了“活跃度”这个概念,来让用户在发生Reject异常之前能够感知线程池负载问题,线程池活跃度计算公式为:线程池活跃度 = activeCount/maximumPoolSize。这个公式代表当活跃线程数趋向于maximumPoolSize的时候,代表线程负载趋高。事中,也可以从两方面来看线程池的过载判定条件,一个是发生了Reject异常,一个是队列中有等待任务(支持定制阈值)。以上两种情况发生了都会触发告警,告警信息会通过大象推送给服务所关联的负责人。

图22 大象告警通知

2. 任务级精细化监控

在传统的线程池应用场景中,线程池中的任务执行情况对于用户来说是透明的。比如在一个具体的业务场景中,业务开发申请了一个线程池同时用于执行两种任务,一个是发消息任务、一个是发短信任务,这两类任务实际执行的频率和时长对于用户来说没有一个直观的感受,很可能这两类任务不适合共享一个线程池,但是由于用户无法感知,因此也无从优化。动态化线程池内部实现了任务级别的埋点,且允许为不同的业务任务指定具有业务含义的名称,线程池内部基于这个名称做Transaction打点,基于这个功能,用户可以看到线程池内部任务级别的执行情况,且区分业务,任务监控示意图如下图所示:

图23 线程池任务执行监控

3. 运行时状态实时查看

用户基于JDK原生线程池ThreadPoolExecutor提供的几个public的getter方法,可以读取到当前线程池的运行状态以及参数,如下图所示:

图24 线程池实时运行情况

动态化线程池基于这几个接口封装了运行时状态实时查看的功能,用户基于这个功能可以了解线程池的实时状态,比如当前有多少个工作线程,执行了多少个任务,队列中等待的任务数等等。效果如下图所示:

图25 线程池实时运行情况

面对业务中使用线程池遇到的实际问题,我们曾回到支持并发性问题本身来思考有没有取代线程池的方案,也曾尝试着去追求线程池参数设置的合理性,但面对业界方案具体落地的复杂性、可维护性以及真实运行环境的不确定性,我们在前两个方向上可谓“举步维艰”。最终,我们回到线程池参数动态化方向上探索,得出一个且可以解决业务问题的方案,虽然本质上还是没有逃离使用线程池的范畴,但是在成本和收益之间,算是取得了一个很好的平衡。成本在于实现动态化以及监控成本不高,收益在于:在不颠覆原有线程池使用方式的基础之上,从降低线程池参数修改的成本以及多维度监控这两个方面降低了故障发生的概率。希望本文提供的动态化线程池思路能对大家有帮助。