动态线程池oneThread系统 — 第二部分 项目启动与概念

eve2333 发布于 9 天前 14 次阅读


我选择在本地已有的ubuntu22版本中配置nacos,其实这台虚拟机中包含了mq,redis,pgsql,minio等等中间件,还有一点就是千万不要在ubuntu桌面操作,因为不知道什么程序占用大量资源卡的要命

将当前用户加入 docker 用户组 sudo usermod -aG docker $USER
刷新权限(无需重启)newgrp docker

docker run -d \
  --name nacos \
  -p 8848:8848 \
  -p 9848:9848 \
  -e TIME_ZONE='Asia/Shanghai' \
  -e MODE=standalone \
  -e JVM_XMS=128m \
  -e JVM_XMX=256m \
  nacos/nacos-server:v2.1.0

Docker 启动配置参数说明:

  • -d 后台运行模式 :以守护进程方式运行容器,不占用当前终端窗口。
  • --name nacos 容器名称标识 :指定容器名称为 nacos,便于后续管理操作。
  • -p 8848:8848 关键端口映射 :将宿主机的 8848端口 映射到 Nacos 容器的 HTTP 接口和控制台端口。
  • -p 9848:9848 gRPC通信端口 :Nacos 2.0+ 必需端口(用于客户端与服务器通信),不开放会导致服务不可用。
  • -e TIME_ZONE='Asia/Shanghai' 时区 :强制容器使用 中国时区(GMT+8)
  • -e MODE=standalone 运行模式配置 :设置环境变量启动 单机模式 (集群模式需改为 cluster)。
  • -e JVM_XMS=128m 初始堆内存分配 :Nacos 启动时立即占用的内存。
  • -e JVM_XMX=128m 最大堆内存上限 :允许 Nacos JVM 使用的最大内存。
  • nacos/nacos-server:2.1.0 镜像定义 :指定使用的官方 Nacos 镜像及版本号。

将配置适配到本地 Ubuntu 虚拟机中的 Nacos,核心在于将**服务器地址(Server Addr)**从公网域名改为你虚拟机的 IP 地址

以下是具体的修改步骤和 IDEA 中的配置写法:

第一步:获取虚拟机 IP 地址

在你的 Ubuntu 虚拟机终端中输入:

    ip addr
# 或者
ifconfig
  

找到类似 192.168.x.x 或者 10.x.x.x 的 IPv4 地址(假设你的 IP 是 192.168.50.10)。


第二步:修改 IDEA 中的 VM Options

你需要修改两个应用的启动参数。建议将 unique-name 改为一个本地特定的后缀(例如 -local),以示区分。

1. NacosCloudExampleApplication

在 IDEA 的 Run/Debug Configurations 中,找到该应用,在 VM options 中填入: code Textdownloadcontent_copyexpand_less

    -Dunique-name=-local
-Dspring.cloud.nacos.server-addr=192.168.50.10:8848
--add-opens=java.base/java.util.concurrent=ALL-UNNAMED
  
  • 注意:请将 192.168.50.10 替换为你第一步获取的真实虚拟机 IP。
  • unique-name 我改为了 -local,你也可以用其他的,只要和下面 Nacos 建配置时保持一致即可。

2. DashboardDevApplication

同样在 VM options 中填入:

    -Donethread.nacos.server-addr=192.168.50.10:8848
  
  • 同样替换 IP 地址。

第三步:在本地 Nacos 新建配置

  1. 访问 Nacos 控制台
    在宿主机(你运行 IDEA 的电脑)浏览器访问:http://192.168.50.10:8848/nacos
    • 如果访问不通,请检查 Ubuntu 的防火墙是否放行了 8848 端口(见文末提示)。
  2. 新建配置
    点击“配置管理” -> “配置列表” -> “+”号新建。
    • Data ID: onethread-nacos-cloud-example-local.yaml
      • 注意:这里的后缀 -local 必须与你在 IDEA VM Options 里写的 -Dunique-name 的值保持一致。
    • Group: DEFAULT_GROUP
    • 配置格式: 选择 YAML
  3. 配置内容
    复制教程中的 YAML 内容,并根据本地情况做如下微调(粘贴进去):
    onethread:
  nacos:
    # 这里的 data-id 要和你当前创建的文件名一致
    data-id: onethread-nacos-cloud-example-local.yaml
    group: DEFAULT_GROUP
  config-file-type: yaml
  web:
    core-pool-size: 10
    maximum-pool-size: 200
    keep-alive-time: 60
    notify:
      # 这里改成你的手机号或随意填,本地测试不重要
      receives: 13800000000
  notify-platforms:
    platform: DING
    # 如果本地测试不需要发钉钉消息,这里可以不管,或者填个假的
    url: https://oapi.dingtalk.com/robot/send?access_token=xxx
  executors:
    - thread-pool-id: onethread-producer
      core-pool-size: 12
      maximum-pool-size: 24
      keep-alive-time: 19999
      work-queue: ResizableCapacityLinkedBlockingQueue
      queue-capacity: 10000
      rejected-handler: CallerRunsPolicy
      allow-core-thread-time-out: false
      notify:
        receives: 13800000000
        interval: 5
      alarm:
        enable: true
        queue-threshold: 80
        active-threshold: 80
    - thread-pool-id: onethread-consumer
      core-pool-size: 10
      maximum-pool-size: 20
      keep-alive-time: 9999
      work-queue: LinkedBlockingQueue
      queue-capacity: 1024
      rejected-handler: AbortPolicy
      allow-core-thread-time-out: true
      notify:
        receives: 13800000000
        interval: 5
      alarm:
        enable: true
        queue-threshold: 80
        active-threshold: 80
  

重要提示:网络连通性

Ubuntu 默认可能开启了防火墙。你需要放行 8848 和 9848 端口,确保 Ubuntu 里的 Nacos 是以单机模式启动 sh startup.sh -m standalone

你的虚拟机网络模式是 Bridged (桥接) 或者 NAT 模式且做了端口转发,保证宿主机能 ping 通虚拟机的 IP。或者你足够有钱可以买个4C8G的云服务器(或者用校企合作的hwtx送的代金券,不要白不要避免时间过期了)

依次启动 NacosCloudExampleApplicationDashboardDevApplication 服务 VM options 

-Dunique-name=-local -Dspring.cloud.nacos.server-addr=192.168.50.10:8848 --add-opens=java.base/java.util.concurrent=ALL-UNNAMED

Q:unique-name 参数是什么?

A:我们通过 Nacos 配置中心读取各自对应的配置,如果不使用 unique-name 区分,大家的 配置名称是一致的,那么就会出现配置读取错乱的情况。各自改个不会和别人冲突的,比如自己名称英文加数字之类的。

Q:--add-opens 参数是什么?

A:在 Java 9+ 中,模块需要显式声明它们导出了哪些包(exports)以及开放哪些包允许深度反射(opens)。java.base 默认只 exports 了它的公共API(如 java.util.concurrent 包的公共类和方法),但不会自动opens任何包 。这里放开 JUC 包下的权限,方便后续业务进行

前端

如果你是 Windows 电脑,可以通过 Nginx 快速启动的方式部署 oneThread 前端工程。下载这个 Nginx 压缩包,并解压。http://localhost:5176/ 跳转 oneThread 登录页面,既为前端启动成功,可以正常使用系统。

或者下载 onethread-dashboard 前端项目,依次执行下述命令。

1. 通过 npm 安装依赖

进入 onethread-dashboard 项目的根目录。

# 使用项目指定的pnpm版本进行依赖安装
npm i -g corepack
​
# 安装依赖
pnpm install

2. 启动项目

# 启动项目
pnpm dev

oneThread 是基于 配置中心 构建的动态可观测 Java 线程池框架,这种设计是没有前端控制台的,所有操作围绕配置中心展开。为了帮助大家更好理解动态线程池,oneThread 在基于配置中心的基础上,抽象了一层控制台。简单一句话说明就是,基于 Nacos 配置中心和注册中心实现的控制台。

大家看咱们项目结构时候,如果 module 命名后面跟着 -dev 就是基于控制台的个性化开发。常规公司使用基于配置中心的动态线程池是没有这块设计的。

因为 Nginx 启动和前端源码启动访问方式不同,这里再补充下:

用户名和密码配置在 dashboard-dev 模块的 application.yaml 中修改:

oneThread 项目中使用了 SaToken 框架来实现登录功能,一款非常优秀的开源框架。由于我们并未使用 Redis 或 JWT 等持久化存储方案

控制台界面

实例数量 :表示该项目在 Nacos 注册中心注册的实例数量。例如,本地仅启动一个实例时,显示为 1;若启动两个实例,则显示为 2。

线程池数量 :配置文件中定义的线程池实例数量。

Web线程池 :标识配置文件中是否包含 Web 线程池配置;若包含,则支持 Web 线程池的动态调整。

线程池管理

1. 线程池列表

当前页面展示所有命名空间和服务中包含的线程池配置,也是登录后的默认页面。页面可以向右滑动,因为配置较多,所以才用了滑动方式展示。刚登录没什么项目

列表字段说明:

  • 线程池标识 :对应线程池配置项 onethread.executors[x].thread-pool-id 的值。
  • 实例数量 :当前线程池关联的已启动服务实例数量。

编辑功能:

image-20250629213354688.png

若修改上述参数,变更请求会通过 dashboard-dev 服务组装参数并调用 Nacos 接口,更新对应的配置文件。各客户端应用通过监听 Nacos 配置中心,可实现线程池配置的实时刷新。

实例列表功能:点击可跳转至线程池实例详情页面;若实例数量为 0,则无法跳转。

2. 线程池实例

该列表用于展示当前线程池在各服务实例中的实时运行参数。这个刚开始没什么东西

image-20250629213844775.png

查看详情功能:

image-20250629220942760.png

线程池所在服务中最新的全量参数展示,点击“刷新”按钮,可向线程池实例所在的应用发起请求,获取并展示最新的运行时参数。

线程池监控

该页面依托 Prometheus 存储和采集线程池监控数据,并通过 Grafana 进行可视化展示。

image-20250629214716579.png

目前默认调用我封装的 Grafana 服务。如果需要使用自定义的 Grafana 实例,可通过修改 dashboard-dev 服务 application.yaml 文件中的 grafana.url 参数进行配置。

onethread:users:# 避免再使用数据库,用户名直接固定写到配置文件- admin,admin
    - test,test
  nacos:
    server-addr: http://127.0.0.1:8848user-login:
    exclude:
      interfaces:|-
        /api/onethread-dashboard/auth/login
  namespaces:# 需要从 Nacos 中读取以下命名空间(自定义)配置文件检索动态线程池- public
    - framework
    - common
    - test
    - prod
  grafana:# 如果本地有安装 Grafana 展示,可以替换为本地路径
    url: http://grafana.nageoffer.com/d/gxBvKxYNz/7adffa3?orgId=1&from=now-6h&to=now&timezone=browser&var-application_name=nacos-cloud-example&var-dynamic_thread_pool_id=onethread-consumer&refresh=5s&theme=light&kiosk=true

Web 线程池管理

1. 线程池列表

当前页面展示所有命名空间和服务中包含的 Web 线程池配置。

image-20250629215733509.png

列表字段说明:

  • 命名空间/服务名称 :含义同上,已在前文说明。
  • 数据ID :对应 Nacos 中的 data-id 字段。
  • 分组标识 :对应 Nacos 中的 group 字段。
  • Web容器名称 :见名知意。
  • 实例数量 :当前 Web 线程池关联的已启动服务实例数量。

编辑功能:

image-20250629220151374.png

若修改上述参数,变更请求会通过 dashboard-dev 服务组装参数并调用 Nacos 接口,更新对应的配置文件。各客户端应用通过监听 Nacos 配置中心,可实现 Web 线程池配置的实时刷新。

实例列表功能:点击可跳转至 Web 线程池实例详情页面;若实例数量为 0,则无法跳转。

2. 线程池实例

该列表用于展示当前 Web 线程池在各服务实例中的实时运行参数。

image-20250629220237339.png

查看详情功能:

image-20250629220907385.png

Web 线程池所在服务中最新的全量参数展示,点击“刷新”按钮,可向 Web 线程池实例所在的应用发起请求,获取并展示最新的运行时参数。

相关概念

其实这玩意章在第一片文章中已经参数过了,大体意思就是线程池线程池可以减少创建销毁线程的开销,方便短时间内的大量请求处理。有两个常见的应用场景,分别是:快速响应用户请求和一个快速处理批量任务。

快速响应用户请求

通过线程池的方式并行查询,那查询全部商品信息的时间就取决于多个流程中最慢的那一条。然后是 两个代码,我们仔细看一下

这是一个非常经典的高并发低延迟(Low Latency)场景优化案例,通常被称为“Scatter-Gather”(分散-聚合)模式

在电商详情页(PDP)、首页聚合接口等场景中,后端往往需要调用多个下游服务(库存、价格、评论、推荐、优惠券等)并将结果汇总返回给前端。

下面我将从核心原理、线程池配置玄机、代码执行流程、以及生产环境注意事项四个方面,为你详细、通透地讲解这段代码背后的逻辑。


一、 核心原理:串行 vs 并行

1. 串行模式(Serial)

也就是你第一段代码的逻辑。

  • 生活类比:你去快餐店点餐。你先去排队买汉堡(50ms),拿到后再去排队买可乐(80ms),拿到后再去排队买薯条(50ms)。
  • 总耗时:50ms + 80ms + 50ms = 180ms(理论最小值)。
  • 缺点:CPU 在等待 I/O(如网络请求、数据库查询)时是空闲的,没有利用好资源,用户等待时间长。

2. 并行模式(Parallel)

也就是你第二段代码的逻辑。

  • 生活类比:你带了两个朋友一起去快餐店。你排队买汉堡,朋友A排队买可乐,朋友B排队买薯条。大家同时开始排队。
  • 总耗时:取决于最慢的那个人。虽然汉堡和薯条50ms就拿到了,但必须等买可乐的朋友(80ms)回来了,大家才能一起吃饭。
  • 公式:Max(TaskA, TaskB, TaskC)。
  • 优点:大幅压缩了响应时间(RT),提升用户体验。

二、 线程池配置的玄机(核心考点)

你提到的配置如下:

    new ThreadPoolExecutor(
    6,                  // corePoolSize: 核心线程数
    9,                  // maximumPoolSize: 最大线程数
    1024, TimeUnit.SECONDS,
    new SynchronousQueue<>() // workQueue: 任务队列
);
  

这段配置是为了响应优先”而设计的,有几个关键点需要深度解读:

1. 为什么使用 SynchronousQueue?

这是这段配置的灵魂所在。

  • 普通队列(如 LinkedBlockingQueue):当任务数量超过核心线程数时,新任务会被放入队列等待。这在追求吞吐量的后台批处理任务中没问题,但在追求低延迟的 Web 接口中是致命的。任务进队列意味着它在“排队”,这会增加接口的响应时间。
  • SynchronousQueue:这是一个容量为 0 的队列。它不存储任务。
    • 当一个任务提交时,如果有空闲线程,直接由空闲线程接手。
    • 如果没有空闲的核心线程,它不会缓冲任务,而是尝试创建新的线程(直到达到 maximumPoolSize)。
    • 效果:任务要么立即被执行,要么因为线程池满而被拒绝(抛出异常),绝对不会在队列里浪费时间排队。这正是“最快时间将结果响应给用户”的体现。

2. 核心线程数(6)与最大线程数(9)

  • corePoolSize = 6:你的业务场景有3个任务(库存、优惠、评论)。设置6意味着系统在负载较低时,完全有能力利用核心线程直接处理并发请求,无需创建新线程的开销。
  • maximumPoolSize = 9:结合 SynchronousQueue,这意味着系统同一时刻最多只能处理 9 个并发任务。
    • 注意:如果第 10 个任务同时到来,因为队列不缓冲,且线程数已达上限,线程池会执行拒绝策略(默认抛出 RejectedExecutionException)。在生产环境中,这里需要根据压测结果谨慎设置,通常会调大这个数值,或者配合降级策略。

三、 代码执行流程深度解析

我们来看看代码具体是怎么跑的:

  1. 提交任务 (submit): code Javadownloadcontent_copyexpand_less Future<Object> future = threadPoolExecutor.submit(Main::getProductInventory);
    • 主线程将任务丢给线程池,线程池分配一个线程去执行 getProductInventory。
    • 关键点:submit 方法是非阻塞的,它会立刻返回一个 Future 对象。主线程并没有等待,而是立刻往下走,去提交第二个、第三个任务。
    • 此时,3个子线程正在服务端并行“跑”着。
  2. 获取结果 (result.get()): code Javadownloadcontent_copyexpand_less for (Future<Object> result : results) { result.get(); // 阻塞点 }
    • 这是一个同步屏障(Barrier)
    • 主线程遍历 Future 列表。当调用 future.get() 时,如果该任务还没运行完,主线程会阻塞在这里等待。
    • 时间分析
      • 假设遍历顺序是:库存(50ms) -> 优惠(80ms) -> 评论(50ms)。
      • 主线程等待库存:等待50ms。此时优惠任务已经跑了50ms,还剩30ms。
      • 主线程等待优惠:再等待30ms(因为前50ms是重叠的)。此时总耗时80ms。
      • 主线程等待评论:评论任务耗时50ms,早在主线程等优惠的时候就已经结束了,所以这里立马返回结果,不耗时。
    • 最终耗时:就是最慢的那个任务(80ms)加上极少量的上下文切换开销。

四、 生产环境的进阶建议(通透版)

虽然上面的代码演示了原理,但在真实的生产级代码(如阿里的双11场景)中,还有几个大坑需要填:

1. 必须设置超时时间 (Timeout)

你现在的代码是 result.get(),这是无限等待

  • 风险:如果“获取优惠信息”的服务挂了,或者网络卡顿导致一直不返回,你的主线程就会一直卡死在这里,最终导致整个 Web 容器线程耗尽,服务雪崩。
  • 修正
// 最多等 100ms,超时就抛异常,执行降级逻辑(比如返回无优惠) 
result.get(100, TimeUnit.MILLISECONDS); } catch (TimeoutException e) { // 记录日志,返回默认值 return new DefaultPromotion(); }

2. 使用 CompletableFuture (Java 8+)

Future 和 ThreadPoolExecutor 是 Java 5 的产物。现代 Java 开发通常使用 CompletableFuture,它功能更强大,代码更优雅。

优化后的代码示例:

    public static void main(String[] args) {
    long startTime = System.currentTimeMillis();

    // 1. 定义任务
    CompletableFuture<Object> task1 = CompletableFuture.supplyAsync(Main::getProductInventory, threadPoolExecutor);
    CompletableFuture<Object> task2 = CompletableFuture.supplyAsync(Main::getProductPromotions, threadPoolExecutor);
    CompletableFuture<Object> task3 = CompletableFuture.supplyAsync(Main::getProductReviews, threadPoolExecutor);

    // 2. 等待所有任务完成 (allOf)
    CompletableFuture<Void> allTasks = CompletableFuture.allOf(task1, task2, task3);

    // 3. 阻塞直到所有完成(或者设置超时)
    try {
        allTasks.get(200, TimeUnit.MILLISECONDS); 
        
        // 4. 获取结果(此时 join 不会阻塞,因为已经确认完成了)
        Object inventory = task1.join();
        Object promotion = task2.join();
        Object review = task3.join();
        
    } catch (Exception e) {
        // 异常处理逻辑
    }

    System.out.println("总耗时:" + (System.currentTimeMillis() - startTime));
}
  

3. 线程池参数的计算 (IO密集型)

  • 你的任务全是 Thread.sleep,模拟的是网络 I/O。这是典型的 IO密集型 任务。
  • 公式:线程数 = CPU核数 * (1 + 线程等待时间 / 线程CPU时间)。
  • 因为等待时间(80ms)远大于CPU计算时间(可能不到1ms),所以你需要配置远大于 CPU 核数的线程池。如果你的机器是 4核,配置 6 或 9 个线程可能偏少(取决于并发量),生产环境可能需要几十甚至上百个线程来处理高并发的 IO 等待。

并行化(Parallelism)利用了多核 CPU 的优势,通过SynchronousQueue 避免了任务排队带来的额外延迟,通过调大线程数应对 IO 密集型的阻塞。这套组合拳是提升 Web 接口响应速度的标准范式。只要加上超时控制和异常降级,这就是一个生产级的解决方案。

快速处理批量任务

快速处理批量任务场景比较多,包括不限于以下举得例子:

  • 公司举办周年庆,需要给每个员工发送邮件说明。
  • 短信平台后台通过上传 Excel 给一批用户发送短信

主线程需要阻塞的等待所有任务执行完成,因此必须要尽可能减少等待时间,而前者的使用场景中则完全不需要考虑这个问题,因此我们可以设置一个合适的阻塞队列用来缓冲任务。

大量任务堆积线程池阻塞队列场景下,可能会遇到项目发布重启或者意外宕机等情况,进而导致任务丢失风险。

这是一个典型的高吞吐量、异步解耦”场景。

与上一个“商品详情页”场景(追求低延迟、用户在线等待结果)完全不同,批量发送短信属于后台任务或“Fire-and-Forget”(发送即不管)模式。我们不需要立即告诉用户“短信已发送成功”,只需要保证系统能吞下所有请求并最终处理完即可。

下面我将从核心设计理念、阻塞队列的作用、执行流程、以及与上一场景的对比四个维度,为你通过透彻讲解。


一、 核心设计理念:削峰填谷与解耦

1. 串行模式的瓶颈

在第一段代码中:

  • 逻辑:你拿着一份名单,给第一个人发短信(等50ms),发完后再给第二个人发。
  • 公式:总耗时 = 任务数 N * 50ms。
  • 问题:如果名单有 10,000 人,主线程将被阻塞 10000 * 0.05s = 500秒。在这 500 秒内,主线程无法做任何其他事情,且 CPU 利用率极低(大部分时间在等 I/O)。

2. 线程池+缓冲队列模式

在第二段代码中:

  • 逻辑:你是一个老板(主线程),手里有 10,000 个发短信的任务。你不需要自己去发,而是把这些任务全部扔到一个**巨大的收件箱(阻塞队列)**里。
  • 解耦:你扔任务的速度非常快(纳秒级),扔完你就可以去干别的了(比如响应前端“发送请求已受理”)。
  • 消费者:线程池里雇佣了 10 个工人(线程),他们盯着收件箱,一旦里面有信,就拿出来去发。

二、 为什么选择 LinkedBlockingQueue?

这是本场景与上一场景最大的区别。

1. 缓冲(Buffering)

  • 上一个场景(商品详情):我们用 SynchronousQueue(容量为0),因为用户在等,任务必须立即处理,处理不过来就报错,不能让用户干等。
  • 本场景(批量短信):我们用 LinkedBlockingQueue(有界队列)。
    • 流量洪峰:假设瞬间来了 10,000 个请求,但线程池只有 10 个线程,一秒钟只能处理 10 * (1000/50) = 200 个。
    • 蓄水池作用:剩下的 9,800 个任务不会被丢弃,也不会报错,而是乖乖在队列里排队。
    • 削峰填谷:队列像一个水库,把瞬间的高并发请求存起来,让 10 个线程以稳定的速度慢慢消化。

2. 生产者-消费者模式

  • 生产者(Main线程):生产速度极快(循环塞入队列)。
  • 消费者(线程池):消费速度受限于网络 I/O(50ms 一个)。
  • 阻塞队列:是两者之间的缓冲区,保证了生产者不会因为消费者慢而被拖死。

三、 线程池配置深度解析

    new ThreadPoolExecutor(
    10,  // corePoolSize
    10,  // maximumPoolSize
    1024, TimeUnit.SECONDS,
    new LinkedBlockingQueue<>(100000) // 巨大的缓冲队列
);
  

1. 固定线程数(Fixed Size)

这里核心线程数和最大线程数都是 10。

  • 原因:这是一种资源管控策略。发送短信是 IO 密集型任务,但由于不需要实时响应,我们没必要为了瞬间的流量去创建几百个线程(线程切换和资源占用也是成本)。我们希望机器以一个稳定的负载长期运行,慢慢处理队列里的积压任务。

2. 队列容量 (100,000)

  • 有界队列的重要性:虽然用无界队列(不传参数默认是 Integer.MAX_VALUE)看似省事,但在生产环境极其危险。如果任务堆积过多,会撑爆内存导致 OOM(Out Of Memory)
  • 设置 10万:意味着系统能容忍 10万条短信的积压。如果超过 10万,才会触发拒绝策略(抛出异常或由调用者运行)。

四、 代码执行流程透视图

  1. 提交阶段
    • 主线程执行 for 循环。
    • threadPoolExecutor.execute(...):这个方法是非阻塞的(除非队列满了)。
    • 任务被封装成 Runnable 对象,瞬间被塞入 LinkedBlockingQueue 的尾部。
    • 循环结束后,主线程任务完成(此时短信可能一条都没发出去,但任务都在队列里了)。
  2. 执行阶段(异步)
    • 线程池中的 10 个核心线程处于活跃状态。
    • 它们不断轮询 LinkedBlockingQueue 头部。
    • 线程 A 拿到一个任务 -> 调用 sendPhoneSms -> 睡眠 50ms -> 任务结束 -> 回到队列取下一个。
    • 10 个线程并行工作,整体处理能力是单线程的 10 倍。

五、 生产环境的隐患与进阶(通透版)

虽然这种方式比串行强得多,但在企业级开发中,直接用内存队列(LinkedBlockingQueue)做批量任务有致命弱点:数据丢失风险

1. 宕机丢数据

  • 场景:主线程把 10,000 个任务塞进了队列。此时系统才处理了 100 个,突然服务器断电或进程崩溃(Kill -9)。
  • 后果:内存中的队列瞬间消失。剩下的 9,900 个用户永远收不到短信,而且你还没法恢复(因为没有持久化)。

2. 优雅关闭(Graceful Shutdown)

如果你要重启服务,不能直接杀进程。必须保证队列里的任务发完。

    // 停止接收新任务
threadPoolExecutor.shutdown(); 
try {
    // 等待现有任务执行完(包括队列里的)
    if (!threadPoolExecutor.awaitTermination(60, TimeUnit.SECONDS)) {
        threadPoolExecutor.shutdownNow(); // 超时强制关闭
    }
} catch (InterruptedException e) {
    threadPoolExecutor.shutdownNow();
}
  

3. 架构演进:使用消息队列 (MQ)

对于重要的通知类业务,生产环境通常不会只用线程池队列,而是引入中间件(如 RabbitMQ, RocketMQ, Kafka)。

  • 流程:Main线程 -> 发送消息到 MQ (持久化到磁盘) -> 消费者服务监听 MQ -> 提交给线程池执行。
  • 优势:即使服务挂了,消息还在 MQ 里。服务重启后,继续从 MQ 拉取消费,保证消息不丢失

总结对比

特性上一场景(商品详情)本场景(批量短信)
目标低延迟 (Latency) <br> 用户要立刻看到结果高吞吐 (Throughput) <br> 系统要在后台慢慢处理完
队列选择SynchronousQueue <br> (不缓冲,立即执行)LinkedBlockingQueue <br> (大容量缓冲,慢慢消化)
线程策略Core 小,Max 大 <br> (弹性伸缩,应对突发流量)Core = Max <br> (固定工人数,稳定输出)
关键点Future.get() 阻塞等待结果execute() 提交即走,不关心结果

你所展示的代码,正是利用了阻塞队列的缓冲能力,实现了任务提交与执行的解耦,完美适用于对实时性要求不高、但数据量较大的批处理场景。

线程池底层原理

核心在于ThreadPoolExecutor的三大组件:

  • 工作线程(Worker)实现任务执行与复用
  • 阻塞队列缓冲任务洪峰(它扮演者生产者消费者模型中缓冲区的角色。工作线程将会不断的从队列中获取并执行任务。)
  • 位运算变量(ctl)统一管控状态与线程数(线程池本体负责维护运行状态、管理工作线程以及调度任务。)

狭义上指的是 ThreadPoolExectutor 及其子类,而广义上则指整个 Executor 大家族,下面是从上到下所有内容;JDK 基于 ThreadPoolExecutor 实现了 ScheduledThreadPoolExecutor 用于支持任务调度。Tomcat 基于 ThreadPoolExecutor 实现了一个同名的线程池,用于处理 Web 请求。Spring 基于 ExecutorService 接口提供了一个 ThreadPoolTaskExecutor 实现,它仍然基于内置的 ThreadPoolExecutor 运行,在这个基础上提供了不少便捷的方法。

  • Executor:整个体系的最上级接口,定义了 execute 方法。
  • ExecutorService:它在 Executor 接口的基础上,定义了 submit、shutdown 与 shutdownNow 等方法,完善了对 Future 接口的支持。
  • AbstractExecutorService:实现了 ExecutorService 中关于任务提交的方法,将这部分逻辑统一为基于 execute 方法完成,使得实现类只需要关系 execute 方法的实现逻辑即可。
  • ThreadPoolExecutor:线程池实现类,完善了线程状态管理与任务调度等具体的逻辑,实现了上述所有的接口。

  • 如果当前工作线程数小于 核心线程数,则启动一个工作线程 执行任务。
  • 如果当前工作线程数大于等于 核心线程数,且阻塞队列未满 ,则将任务添加到阻塞队列
  • 如果当前工作线程数大于等于 核心线程数,且阻塞队列已满 ,则启动一个工作线程 执行任务。
  • 如果当前工作线程数已达最大值 ,且阻塞队列已满,则触发拒绝策略。

工作线程启动后重复下面逻辑

  1. 通过 getTask 方法从工作队列中获取任务,如果拿不到任务就阻塞一段时间,直到超时或者获取到任务。如果成功获取到任务就进入下一步,否则就直接进入线程退出流程;
  2. 调用 Workerlock 方法加锁,保证一个线程只被一个任务占用;
  3. 调用 beforeExecute 回调方法,随后开始执行任务,如果在执行任务的过程中发生异常则会被捕获;
  4. 任务执行完毕或者因为异常中断,此后调用一次 afterExecute 回调方法,然后调用 unlock 方法解锁;
  5. 如果线程是因为异常中断,那么进入线程退出流程,否则回到步骤 1 进入下一次循环。

参数

public ThreadPoolExecutor 类一共提供了四个构造方法,告诉我们有如下参数

int corePoolSize, // 核心线程数,长期存在并处理工作
int maximumPoolSize, // 最大线程数,核心线程已满,工作队列已满,同时线程池中线程总数未超过最大线程数,会创建非核心线程
long keepAliveTime, // 非核心线程闲置存活时间
TimeUnit unit, // 时间单位
BlockingQueue<Runnable> workQueue, // 工作队列
ThreadFactory threadFactory, // 创建线程使用的线程工厂
RejectedExecutionHandler handler // 拒绝策略工作队列已满,且线程池中线程数已经达到最大线程数时,执行的兜底策略

工作线程 Worker

ThreadPoolExecutor 中,每个工作线程都对应的一个内部类 Worker,它们都存放在一个 HashSet

private final HashSet<Worker> workers = new HashSet<Worker>();
private final class Worker
    extends AbstractQueuedSynchronizer
    implements Runnable {
​
    // 线程对象
    final Thread thread;
    // 首个执行的任务,一般执行完任务后就保持为空
    Runnable firstTask;
    // 该工作线程已经完成的任务数
    volatile long completedTasks;
    
    Worker(Runnable firstTask) {
        // 默认状态为 -1,禁止中断直到线程启动为止
        setState(-1);
        this.firstTask = firstTask;
        this.thread = getThreadFactory().newThread(this);
    }
​
    public void run() {
        runWorker(this);
    }
}

Worker 本身实现了 Runnable 接口,创建一个 Worker 实例时,构造函数会通过我们在创建线程池时指定的线程工厂创建一个Thread对象,并把当前的Worker对象作为一个Runnable绑定到线程里面 。当调用它的 run 方法时,它会通过调用线程池的 runWorker反过来启动线程,此时 Worker 就开始运行了。

Worker 类继承了 AbstractQueuedSynchronizer通过AQS的同步机制来保证对工作线程的访问是线程安全

主锁 mainLock

线程池直接使用一个 HashSet 来存储 Worker 示例,而 HashSet 本身却并非线程安全的,那在并发场景下要如何保证线程安全呢?实际上,除了 workers 以外,线程池中还有大量非线程安全的变量,这里再举几个例子:

  • ctl:记录线程池状态与工作线程数。
  • largestPoolSize/corePoolSize:最大/核心工作线程数。
  • completedTaskCount:已完成任务数。
  • keepAliveTime:核心线程超时时间。

这些变量实际上环环相扣,因此很难通过分别将它们改为原子变量/并发容器来保证线程安全 ,因此 ThreadPoolExecutor 选择为整个线程池提供一把主锁 mainLock,每次操作或读取这种全局性变量的时候,都需要获取主锁才能进行:

privatefinalReentrantLock mainLock =newReentrantLock();

比如获取当前工作线程数的时候:

public int getPoolSize() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        // 如果线程已经开始停机,则返回 0,否则返回工作线程数量
        return runStateAtLeast(ctl.get(), TIDYING) ? 0
            : workers.size();
    } finally {
        mainLock.unlock();
    }
}

线程池通过mainLock来保证全局配置的线程安全,而每个工作线程再通过AQS来保证工作线程自己的线程安全。

状态控制

1. ctl

AtomicInteger 类型的成员变量 ctl ,它是 control 的缩写,线程池分别通过 ctl 的高位低位来管理两部分状态信息:

  • 第一部分为高 3 位,用来记录线程池当前的运行状态。
  • 第二部分为低 29 位,用来记录线程池中的工作线程数。
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
​
​
// 29(32-3)
private static final int COUNT_BITS = Integer.SIZE - 3;
​
// ======== 线程数相关常量 ========
// 允许的最大工作线程(2^29-1 约5亿)
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;
​
// ======== 线程状态相关常量 ========
// 运行状态。线程池接受并处理新任务
private static final int RUNNING    = -1 << COUNT_BITS;
// 关闭状态。线程池不能接受新任务,处理完剩余任务后关闭。调用shutdown()方法会进入该状态。
private static final int SHUTDOWN   =  0 << COUNT_BITS;
// 停止状态。线程池不能接受新任务,并且尝试中断旧任务。调用shutdownNow()方法会进入该状态。
private static final int STOP       =  1 << COUNT_BITS;
// 整理状态。由关闭状态转变,线程池任务队列为空时进入该状态,会调用terminated()方法。
private static final int TIDYING    =  2 << COUNT_BITS;
// 终止状态。terminated()方法执行完毕后进入该状态,线程池彻底停止。
private static final int TERMINATED =  3 << COUNT_BITS;