动态线程池oneThread系统 — 第七部分 阻塞队列更新

eve2333 发布于 8 天前 16 次阅读


阻塞队列是对队列的衍生,支持阻塞的插入方法:队列容量满时,插入元素线程会被阻塞,直到队列有多余容量为止;支持阻塞的移除方法:当队列中无元素时,移除元素的线程会被阻塞,直到队列有元素可被移除

文章以 LinkedBlockingQueue 为例。是一个以 单向链表组成的队列

如图所示是继承了上面的东西,以自上而下的方式,先分析一波 Queue 接口里都定义了哪些方法

// 如果队列容量允许,立即将元素插入队列,成功后返回
// 🌟如果队列容量已满,则抛出异常
boolean add(E e);

//  如果队列容量允许,立即将元素插入队列,成功后返回
// 🌟如果队列容量已满,则返回 false
// 当使用有界队列时,offer 比 add 方法更何时
boolean offer(E e);

// 检索并删除队列的头节点,返回值为删除的队列头节点
// 🌟如果队列为空则抛出异常
E remove();

// 检索并删除队列的头节点,返回值为删除的队列头节点
// 🌟如果队列为空则返回 null
E poll();

// 检查但不删除队列头节点
// 🌟如果队列为空则抛出异常
E element();

// 检查但不删除队列头节点
// 🌟如果队列为空则返回 null
E peek();

Queue 接口的方法,分为三个大类:

  1. 新增元素到队列容器中:add、offer
  2. 从队列容器中移除元素:remove、poll
  3. 查询队列头节点是否为空:element、peek

BlockingQueue 接口继承自 Queue 接口,所以有些语义相同的 API 接口就没有放上来解读

// 将指定元素插入队列,如果队列已满,等待直到有空间可用;通过 throws 异常得知,可在等待时打断
// 🌟相对于 Queue 接口而言,是一个全新的方法
void put(E e) throws InterruptedException;

// 将指定元素插入队列,如果队列已满,在等待指定的时间内等待腾出空间;通过 throws 异常得知,可在等待时打断
// 🌟相当于是 offer(E e) 的扩展方法
boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException;

// 检索并除去此队列的头节点,如有必要,等待直到元素可用;通过 throws 异常得知,可在等待时打断
take() throws InterruptedException;

// 检索并删除此队列的头,如果有必要使元素可用,则等待指定的等待时间;通过 throws 异常得知,可在等待时打断
// 🌟相当于是 poll() 的扩展方法
poll(long timeout, TimeUnit unit) throws InterruptedException;

// 返回队列剩余容量,如果为无界队列,返回 Integer.MAX_VALUE
int remainingCapacity();

// 如果此队列包含指定的元素,则返回 true
public boolean contains(Object o);

// 从此队列中删除所有可用元素,并将它们添加到给定的集合中
int drainTo(Collection<? super E> c);

// 从此队列中最多移除给定数量的可用元素,并将它们添加到给定的集合中
int drainTo(Collection<? super E> c, int maxElements);

可以看到 BlockingQueue 接口中个性化的方法还是挺多的。

再看 LinkedBlockingQueue 的源码;为了保证并发添加、移除等操作,使用了 JUC 包下的 ReentrantLock、Condition 控制

// take, poll 等移除操作需要持有的锁
private final ReentrantLock takeLock = new ReentrantLock();
// 当队列没有数据时,删除元素线程被挂起
private final Condition notEmpty = takeLock.newCondition();
// put, offer 等新增操作需要持有的锁
private final ReentrantLock putLock = new ReentrantLock();
// 当队列为空时,添加元素线程被挂起
private final Condition notFull = putLock.newCondition();

ArrayBlockingQueue(ABQ)内部元素个数字段为什么使用的是 int 类型的 count 变量?不担心并发么

  1. 因为 ABQ 内部使用的一把锁控制入队、出队操作,同一时刻只会有单线程执行 count 变量修改
  2. LBQ 使用的两把锁,所以会出现两个线程同时修改 count 数值,如果像 ABQ 使用 int 类型,两个流程同时执行修改 count 个数,会造成数据不准确,所以需要使用并发原子类修饰

为什么要用原子类统计数量?

原子类,老生常谈的,官方说发是解决并发场景下无锁的方式保证单一变量的数据一致性多个线程同时读写同一个共享数据时存在多线程并发问题;解决并发安全问题的方式有很多种方式, 著名的就是 JDK 并发包 concurrent, 为了并发而存在的

这个就比如循环叠加一个数字,在多线程情况下可能会出现少加的情况

可以使用 JDK 自带的 synchronized, 通过 互斥锁的方式 同步执行 NUM++ 这个代码

不使用锁来解决上面的非原子自增问题, 可以这么来

AtomicInteger 是 JDK 并发包下提供的 操作 Integer 类型原子类, 通过调用底层 Unsafe 的 CAS 相关方法实现原子操作;基于乐观锁的思想实现的一种无锁化原子操作, 保障了多线程情况下单一变量的线程安全问题

硬件级别的指令避免加锁,还有互斥锁在synchronized 在并发比较严重情况下, 会将锁 升级到重量级锁,避免用户态到内核态转变消耗太多时间;

AtomicInteger 有两个构造方法, 分别是一个无参构造及有参构造

  • 无参构造的 value 就是 int 的默认值 0
  • publicAtomicInteger() { }
  • 有参构造会将 value 赋值
  • publicAtomicInteger(int initialValue) {
    value = initialValue;
    }

AtomicInteger 有三个重要的变量, 分别是:

  • Unsafe: 可以理解它对于 Java 而言, 是一个 "BUG" 的存在, 在 AtomicInteger 里的最大作用就是直接操作内存进行值替换
  • value: 使用 int 类型存储 AtomicInteger 计算的值, 通过 volatile 进行修饰, 提供了内存可见性及防止指令重排序
  • valueOffset: value 的内存偏移量

常用API, 核心实现思路都是一致的, 会着重讲getAndIncrement() // 获取当前值, 并进行自增1

public final int getAndIncrement() {
   return unsafe.getAndAddInt(this, valueOffset, 1);
}

/**
 * unsafe.getAndAddInt
 *
 * @param var1 AtomicInteger 对象
 * @param var2 value 内存偏移量
 * @param var4 增加的值, 比如在原有值上 + 1
 * @return
 */
public final int getAndAddInt(Object var1, long var2, int var4) {
    int var5;
    do {
        // 内存中 value 最新值
        var5 = this.getIntVolatile(var1, var2);
    } while (!this.compareAndSwapInt(var1, var2, var5, var5 + var4));

    return var5;
}

精髓的地方就在于 compareAndSwapInt(...)

/**
 * 比较 var1 的 var2 内存偏移量处的值是否和 var4 相等, 相等则更新为 var5
 *
 * @param var1 AtomicInteger 对象
 * @param var2 value 内存偏移量
 * @param var4 value 原本的值
 * @param var5 期望将 value 设置的值
 * @return
 */
public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);

由于是 native 关键字修饰, 我们无法查看其源码, 说明一下方法思路

1、通过 var1(AtomicInteger) 获取到 var2 (内存偏移量) 的 value 值

2、将 value(内存中值) 与 var4(线程内获取的value值) 进行比较

3、如果相等将 var5(期望值) 设置为内存中新的 value 并返回 True

4、不想等返回 False 继续尝试执行循环

要理解这段代码,首先要明白它背后的指导思想。

  • synchronized (悲观锁): 假设一定会发生冲突。我去拿数据的时候,先把门锁上,你们谁都别进,等我改完了,我再开门。
    • 代价: 线程挂起、上下文切换,性能开销大。
  • CAS (乐观锁): 假设不会发生冲突。我直接去拿数据,改的时候我再看一眼:“现在的内存值是不是还是我刚才拿到的那个值?”
    • 如果是:说明没人捣乱,我直接修改。
    • 如果不是:说明有人插队改过了,那我这次修改失败,我重新获取最新值,再试一次(这就是自旋)。

getAndAddInt 就是典型的乐观锁实现。


二、 源码深度拆解

我们将你提供的代码拆解为三个关键动作:

1. 获取内存地址 (Setup)

// 在 AtomicInteger 初始化时就已经计算好了
long valueOffset = unsafe.objectFieldOffset
    (AtomicInteger.class.getDeclaredField("value"));
  • 通透理解: CAS 操作的是内存。Java 无法直接操作内存,但 Unsafe 类可以通过“偏移量”直接找到对象在堆内存中的具体位置。valueOffset 就是 value 这个字段在 AtomicInteger 对象内存中的“门牌号”。

2. 自旋循环 (The Spin)

public final int getAndAddInt(Object var1, long var2, int var4) {
    int var5;
    do {
        // 动作 A: 获取快照
        var5 = this.getIntVolatile(var1, var2); 
    } while (!this.compareAndSwapInt(var1, var2, var5, var5 + var4)); // 动作 B: 比较并交换
    return var5;
}
  • 动作 A (getIntVolatile): 去内存那个“门牌号”里看一眼,现在的 value 是多少?记下来,记作 var5(预期值)。
    • 注意: 这里用了 Volatile,保证了可见性,看到的一定是主内存中最新的值。
  • 动作 B (compareAndSwapInt): 这是原子操作。
    • 它拿着 var5(我刚看到的旧值)和内存里现在真正的值比对。
    • CAS 的核心台词: “内存兄,我刚才看你是 0 (var5),如果你现在还是 0,请帮我改成 1 (var5+var4);如果你已经不是 0 了(被别人改了),那就别动,告诉我失败。”

3. 循环的意义

  • while(!...):如果 CAS 返回 false(失败了),取反变成 true,循环继续。
  • 再次循环时: 重新执行 getIntVolatile,拿到最新的值(比如被别人改成 1 了),这次我就基于 1 再尝试改成 2。
  • 直到成功: 直到没有人跟我抢,我成功修改,退出循环。

三、 图文场景的“上帝视角”

让我们用更生活化的例子来重现你提到的图文分析:

场景: 此时内存里 value = 0

  1. 线程 A 进场:
    • 执行 getIntVolatile,拿到 var5 = 0
    • 此时,线程 A 被 CPU 挂起了(这种情况很常见)。
  2. 线程 B 进场:
    • 执行 getIntVolatile,拿到 var5 = 0
    • 执行 compareAndSwap(0, 1)
    • 检查: 内存确实是 0,匹配成功!
    • 修改: 内存变为 1。
    • 返回: True,线程 B 欢天喜地地走了。
  3. 线程 A 醒了:
    • 继续执行 compareAndSwap(0, 1)
    • 检查: 傻眼了,内存里现在是 1,不是我预期的 0。
    • 结果: 比较失败,不准修改,返回 False。
  4. 线程 A 的“重试”(自旋):
    • while 循环生效。
    • 重新 getIntVolatile,这次拿到 var5 = 1
    • 执行 compareAndSwap(1, 2)
    • 检查: 内存是 1,匹配成功。
    • 修改: 内存变为 2。
    • 结束。

四、 底层到底发生了什么?(Native)

你提到了 native 方法无法看源码,但我们要通透,就得知道操作系统层面干了啥。

在 HotSpot 虚拟机中,compareAndSwapInt 最终会对应到 CPU 的汇编指令。
在 x86 架构下,这条指令通常是:

lock cmpxchg ...
  1. cmpxchg (Compare and Exchange): 这是一个 CPU 指令,硬件层面保证了“比较”和“交换”这两个动作是不可拆分的。要么全做,要么全不做。
  2. lock 前缀: 在多核 CPU 下,为了防止多个 CPU 同时执行 cmpxchg 导致乱序,会加上 lock 前缀。它会锁定总线(或利用缓存一致性协议 MESI),确保同一时刻只有一个 CPU 能修改这块内存。

结论:CAS 的原子性是由 CPU 硬件指令保证的。


五、 缺陷的深层次解读

你提到的不足之处非常精准,这里做进一步的扩展:

1. CPU 开销 (The Spin Lock Problem)

  • 现象: 如果有 1000 个线程同时竞争一个变量,同一时刻只有一个能成功,剩下 999 个都在 do-while 里狂转。
  • 后果: CPU 使用率飙升,但没干正事(都在重试)。
  • 解决:LongAdder
    • 原理: 分散热点。既然大家抢一个 value 此时太挤,那我就搞一个数组 Cell[]。线程 A 抢 Cell[0],线程 B 抢 Cell[1]… 最后取值的时候把数组里的值加起来。这就是空间换时间

2. ABA 问题 (The "Ex-Girlfriend" Problem)

  • 通俗解释:
    • 你有一个前女友(value = A)。
    • 你离开了一会儿。
    • 她找了个新男朋友(value 变成了 B)。
    • 她又和新男朋友分手了,恢复了单身(value 又变回了 A)。
    • 你回来了,看她还是单身(value = A),你以为她没变过,于是复合了。
    • 问题: 虽然结果看起来没变,但过程中状态已经发生过变化,这在某些业务逻辑中是致命的(比如栈操作、内存回收等)。
  • 解决:AtomicStampedReference
    • 原理: 加版本号。
    • 不光比对值,还要比对版本号。
    • A(v1) -> B(v2) -> A(v3)。
    • 你回来时拿着 A(v1) 去比对 A(v3),发现版本不对,CAS 失败。

getAndIncrement 的源码本质就是:

  1. 取值(Volatile 保证可见性)
  2. 比对并交换(CAS 硬件指令保证原子性)
  3. 失败则重试(自旋保证最终一致性)

CAS 虽然能够实现无锁编程, 在一般情况下对性能做出了提升, 但是并不是没有局限性或缺点

1、CPU 自旋开销较大

在高并发情况下, 自旋 CAS 如果长时间不成功, 会给 CPU 带来非常大的执行开销

如果是实现高并发下的计数, 可以使用 LongAdder, 设计的高并发思想真的强!

2、著名的 "ABA" 问题

CAS 需要在操作值的时候检查下值有没有发生变化, 如果没有发生变化则更新

但是如果一个值原来是A, 变成了B, 又变成了A, 那么使用 CAS 进行检查时会发现它的值没有发生变化, 但是实际上却变化了

如果感兴趣的小伙伴可以去看下 JUCA 原子包下的 AtomicStampedReference


马哥有介绍了相关源码的结构体

// 绑定的容量,如果无界,则为 Integer.MAX_VALUE
private final int capacity;
// 当前队列中元素个数
private final AtomicInteger count = new AtomicInteger();
// 当前队列的头节点
transient Node<E> head;
// 当前队列的尾节点
private transient Node<E> last;
看到 head 和 last 元素,是不是对 LBQ 就有个大致的雏形了,这个时候还差一个结构体 Node

static class Node<E> {
    // 节点存储的元素
    E item;
    // 当前节点的后继节点
    LinkedBlockingQueue.Node<E> next;

    Node(E x) { item = x; }
}

构造器分析

这里画一张图来理解下 LBQ 默认构造方法是如何初始化队列的

public LinkedBlockingQueue() {
    this(Integer.MAX_VALUE);
}

public LinkedBlockingQueue(int capacity) {
    if (capacity <= 0) throw new IllegalArgumentException();
    this.capacity = capacity;
    last = head = new Node<E>(null);
}

说实在的马哥写的确实透彻详细,包你能够看懂的;

可以看出,默认构造方法会将容量设置为 Integer.MAX_VALUE,也就是大家常说的无界队列

内部其实调用的是重载的有参构造,方法内部设置了容量大小,以及初始化了 item 为空的 Node 节点,把 head last 两节点进行一个关联;

我就不干拾人牙慧这种破事了,详尽信息回到原文查看吧,看完必定懂得

1.1w字,10图彻底掌握阻塞队列(并发必备)

jdk里面的阻塞队列如下

如何实现阻塞队列热更新?

1. 阻塞队列不支持更新容量

在日常线程池调优过程中,我们可能会遇到一个真实的问题:

队列被塞满了,线程池也跑满了,但我又不能轻易重启服务,只是想把队列容量调大点,临时抗一波压力,有没有办法?

如果使用的是 LinkedBlockingQueue,可能会发现它的容量是固定的,根本不支持动态调整 ——这就是我们今天要讲的问题。

LinkedBlockingQueue 阻塞队列的容量是 int 类型,如果不设置容量默认是 Integer.MAX_VALUE,设置容量后,因为字段类型是 final,是没有办法变更容量的。

public class LinkedBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {

    /** The capacity bound, or Integer.MAX_VALUE if none */
    private final int capacity;

}

2. 动态变更阻塞队列场景

讲具体原理前,还是要和大家聊一下,阻塞队列动态变更是否有用?

如果阻塞队列容量固定,遇到访问量超过预期,可能很快就会被打满,造成任务拒绝。而如果能在运行时动态调大阻塞队列容量 ,就能临时缓解系统压力,避免雪崩。

举例:默认线程池队列容量为 1000。在访问激增或黑产攻击爆发时,通过管理平台将队列扩容至 10000,有效缓冲流量高峰,避免拒绝关键任务。

image-20250720171346619.png

那可能有同学问了:为什么不直接调大线程池,而是调整阻塞队列容量?这就涉及到两者调整的预期目标:

  • 调大线程 :提高并发执行能力,前提是底层资源能够承受这么大的并发。要不然出问题就是雪崩了。
  • 调大阻塞队列容量 :增强任务缓冲能力(更多任务排队),可能会慢处理,但是会最终处理。属于是通过空间换高可用一种方案。
调参方式核心作用风险/代价适用场景
增加线程池大小增加并发处理能力高:CPU竞争、内存压力、线程切换开销CPU 不敏感、I/O 密集型或延迟敏感场景
增加队列容量增加任务缓冲能力低:只是任务排队变多,但延迟可能增加弹性应对流量突发、避免任务被拒绝或丢失

这里额外提一下,线程不是越多越好,线程切换、上下文切换、内存开销都是真金白银的成本。

  • 每个线程需要占用栈空间(默认 1MB~2MB),线程过多可能直接 OOM;
  • 大量线程争抢 CPU,会频繁发生上下文切换,吞吐反而下降
  • 对 I/O 密集型系统还好,但对 CPU 密集型任务,线程数一多性能就崩。

线程池中任务的调度遵循的是“先执行,后排队”的策略,但一旦队列排满,才会创建新线程。因此:

  • 想要低延迟 响应(少排队),可以增加线程数
  • 想要高吞吐 或防止拒绝任务 ,可以加大队列容量
  • 想要限流保护主系统 ,反而要限制队列大小+设置拒绝策略

所以调参的前提一定是基于系统业务模型:

目标建议调参方式
提高并发处理速度增加线程数(有限度)
增加任务缓冲能力增加队列容量
限制资源使用、防雪崩缩小线程数和队列容量,合理配置拒绝策略

3. 初步方案:反射修改字段实现扩容

实现阻塞队列热更新方式其实有很多,我们先通过一个想的比较简单版本抛砖引玉下:继承 JDK 的 LinkedBlockingQueue,然后通过反射修改其 capacity 字段。

iShot_2025-07-19_13.07.25.png

这个类创建在了 core 包的单元测试目录,代码如下所示:

import lombok.extern.slf4j.Slf4j;

/**
 * 支持动态修改容量的阻塞队列实现
 *
 * <p>说明:JDK 原生 LinkedBlockingQueue 的 capacity 字段为 final,
 * 无法直接变更,因此通过反射方式动态调整
 */
@Slf4j
public class ResizableCapacityLinkedBlockingQueue<E> extends LinkedBlockingQueue<E> {

    public ResizableCapacityLinkedBlockingQueue(int capacity) {
        super(capacity);
    }

    /**
     * 动态设置队列容量
     *
     * @param newCapacity 新的容量值
     * @return 设置是否成功
     */
    public boolean setCapacity(Integer newCapacity) {
        if (newCapacity == null || newCapacity <= 0) {
            log.warn("非法容量值: {}", newCapacity);
            return false;
        }

        try {
            // 通过反射修改 final 字段
            ReflectUtil.setFieldValue(this, "capacity", newCapacity);
            log.info("成功修改阻塞队列容量为: {}", newCapacity);
            return true;
        } catch (Exception ex) {
            log.error("动态修改阻塞队列容量失败,newCapacity={}", newCapacity, ex);
            return false;
        }
    }
}

这个版本能不能实现阻塞队列容量动态变更?能。有没有问题?有。朴实无华的总结了下这个版本。

4. 反射方案的风险与改进方向

虽然实现起来不难,但这里有几个重要的点需要说明:

  • 高版本开启模块安全检查后,无法修改 final 字段;使用反射需要添加安全机制。
  • 修改的是字段,不影响队列中已有元素,也就是队列中已有元素不会丢失。只影响后续 put()offer() 的入队行为限制。
  • 修改 capacity 只影响后续 put() 和 offer() 的容量检查,不影响已有元素。但若队列已满,阻塞的 put() 线程可能不会自动唤醒,因为 LinkedBlockingQueue 依赖 notFull 条件变量通知。
  • 依赖 JDK 内部实现:反射方案直接操作 LinkedBlockingQueue 的私有字段 capacity,依赖其内部实现。如果 JDK 版本升级(如从 Java 8 到 17),字段名、锁机制或其他内部实现可能变更,导致代码失效。

反射方案存在的问题还是比较复杂的,有多块需要重点注意的点,这个咱们放到下节内容说明。

第12小节:阻塞队列容量热更新策略下的“坑”

RabbitMQ 的工作线程池会处理来自大量客户端的请求,通过动态扩容任务队列 ,防止因短期流量冲击造成系统雪崩。RabbitMQ 运行过程中,会监控如线程池活跃线程数、队列使用情况、任务阻塞率等指标,达到设置阈值自动进行扩容。客户端通过自动扩容这里大家有兴趣可以具体看看。

rocketmq和kafka呢?RabbitMQ 团队在面临同样需求时,选择了一个更优雅的做法:直接复制并修改LinkedBlockingQueue源码,做成可变容量版本

本章节我们先通过讲解反射带来的问题,然后再看 RabbitMQ 怎么做的。

反射

简单说,反射(Reflection)是 Java 提供的一种机制,允许程序在运行时检查和修改类、方法、属性的行为,甚至可以突破 private 和 final 的限制

(LBQ) 是一个标准的阻塞队列。它的容量(capacity)在构造函数中传入,并且被赋值给一个 final 变量(或者虽然不是final,但没有提供 setter 方法)。// JDK 源码片段
public class LinkedBlockingQueue … {
private final int capacity; // 注意,通常是 final 的,或者不可变
private final AtomicInteger count = new AtomicInteger();

}

反射的做法是:

  1. 获取 capacity 字段对象。
  2. field.setAccessible(true) 暴力破防。
  3. field.set(queue, newCapacity) 修改值。

除了大家常说的“反射性能差”(在 JDK 8+ 以后其实优化得不错了,这不是主要原因),核心原因在于并发安全(Thread Safety)和内存模型(JMM)

  • 破坏可见性(Visibility):
    capacity 字段在 LBQ 源码中通常不仅是 final 的,而且没有被 volatile 修饰。在 Java 内存模型中,线程对非 volatile 变量的修改,其他线程是不一定立即可见的。
    你通过反射在主线程改了容量,但生产者线程(Producer)和消费者线程(Consumer)可能还在用 CPU 缓存里的旧容量值进行判断。这会导致扩容失效,或者出现逻辑错乱。
  • 破坏原子性逻辑(Logic Atomicity):
    LBQ 内部有极其精密的锁配合(takeLock 和 putLock)。
    假设当前队列容量 10,已满(count=10)。
    • 线程 A(反射)正在把容量改为 20。
    • 线程 B(生产者)正在判断 count < capacity。
    • 如果反射修改没有任何锁保护,线程 B 可能在极其微妙的时间点读取到了不一致的状态,导致 Signal(唤醒)逻辑丢失,或者计数器错乱。
  • 破坏 Final 语义:
    JVM 对 final 字段有特殊的优化(重排序规则等)。强行通过反射修改 final 字段,属于未定义行为(Undefined Behavior),虽然在很多 JVM 上能跑通,但这是在“走钢丝”,极不稳定。

我们来看两个非常典型的问题场景:

1. 队列已满修改容量无效

当队列已满,调用线程正阻塞在 put() 方法上等待空位:

while(count.get()== capacity){
    notFull.await();// 阻塞在这里}

此时,我们通过反射动态将 capacity 从 10 修改为 100,期望 unblock 等待线程。但线程仍然阻塞在 notFull.await(),无法及时感知容量变化!

下方截图取自 LinkedBlockingQueue#put 方法:

image-20250720184805923.png

capacity 虽然被改大了,但线程已经卡在了 await() 上,如果没有人手动调用 signalNotFull(),它永远不会被唤醒。

⚠️ 注意:反射只改字段,不会自动触发条件变量通知(Condition#signal)

以下是一个测试用例,我放在了 core 模块 test 目录下,展示通过反射修改 LinkedBlockingQueue 容量的尝试及其问题:

publicclassResizableCapacityLinkedBlockingQueueV1Test{
​
    publicstaticvoidmain(String[] args)throwsException{ResizableCapacityLinkedBlockingQueueV1<String> queue =newResizableCapacityLinkedBlockingQueueV1<>(2);
​
        // 填充队列至满
        queue.put("Element 1");System.out.println("入队列成功,当前大小:"+ queue.size());
        queue.put("Element 2");System.out.println("入队列成功,当前大小:"+ queue.size());
​
        // 尝试添加第三个元素,预期阻塞ExecutorService executor =Executors.newSingleThreadExecutor();
        executor.submit(()->{try{System.out.println("尝试添加 Element 3,队列已满,线程将被阻塞");
                queue.put("Element 3");System.out.println("成功添加 Element 3,队列大小:"+ queue.size());}catch(InterruptedException e){System.out.println("添加 Element 3 失败");}});
​
        // 等待 2 秒,确保线程阻塞TimeUnit.SECONDS.sleep(2);
​
        // 通过反射修改容量try{
            queue.setCapacity(3);System.out.println("通过反射修改容量为:3");}catch(Exception e){System.out.println("反射修改容量失败:"+ e.getMessage());}
​
        // 等待 2 秒,观察是否成功添加TimeUnit.SECONDS.sleep(2);
​
        executor.shutdownNow();}}

⚠️ 注意:如果想在 IDEA 里跑这个单元测试,需要在 IDEA 单元测试中设置 VM 参数:

--add-opens java.base/java.util.concurrent=ALL-UNNAMED

我们期望的日志输出如下:

队列已满,当前大小:1
队列已满,当前大小:2
尝试添加 Element 3,队列已满,线程将被阻塞
18:51:28.979 [main] INFO com.nageoffer.onethread.core.executor.support.ResizableCapacityLinkedBlockingQueueV1 -- 成功修改阻塞队列容量为: 3
通过反射修改容量为:3
成功添加 Element 3,队列大小:3

理想很丰满,但是现实就比较骨感了,实际输出如下:

入队列成功,当前大小:1
入队列成功,当前大小:2
尝试添加 Element 3,队列已满,线程将被阻塞
18:26:34.881 [main] INFO com.nageoffer.onethread.core.executor.support.ResizableCapacityLinkedBlockingQueueV1 -- 成功修改阻塞队列容量为: 3
通过反射修改容量为:3
添加 Element 3 失败

2. 容量变小后无法阻塞

假设当前队列已存入 8 个元素,容量为 10。我们通过反射将容量缩小为 5,期望此后入队操作会被阻塞。

但你会发现:

queue.put(newTask());// 居然成功入队!

仍然可以成功插入——根本没阻塞!

这是因为队列当前 count.get() = 8,我们虽然把 capacity 改成了 5,但 JDK 的 put() 判断仍然是:

while(count.get()== capacity){
    notFull.await();}

此时 count.get() == 8capacity == 5,不相等,所以不会阻塞,直接bypass

队列实际元素数量已经超过了新容量,却还允许插入,这是违背预期的行为。属于是违反了容量限制 的本意。

测试用例如下所示:

publicclassResizableCapacityLinkedBlockingQueueV1Test2{
​
    publicstaticvoidmain(String[] args)throwsException{ResizableCapacityLinkedBlockingQueueV1<String> queue =newResizableCapacityLinkedBlockingQueueV1<>(10);for(int i =0; i <8; i++){
            queue.put("Element "+ i);System.out.println("入队列成功,当前大小:"+ queue.size());}
​
        // 通过反射修改容量try{
            queue.setCapacity(5);System.out.println("通过反射修改容量为:5");}catch(Exception e){System.out.println("反射修改容量失败:"+ e.getMessage());}
​
        ExecutorService executor =Executors.newSingleThreadExecutor();
        executor.submit(()->{try{System.out.println("尝试添加 Element 9,队列已满,线程将被阻塞");
                queue.put("Element 9");System.out.println("成功添加 Element 9,队列大小:"+ queue.size());}catch(InterruptedException e){System.out.println("添加 Element 9 失败");}});
​
        // 等待 2 秒,确保线程阻塞TimeUnit.SECONDS.sleep(2);
​
        executor.shutdownNow();
​
        System.out.println("🔍 最终队列元素数量:"+ queue.size());}}

单元测试执行日志如下所示:

入队列成功,当前大小:1
入队列成功,当前大小:2
入队列成功,当前大小:3
入队列成功,当前大小:4
入队列成功,当前大小:5
入队列成功,当前大小:6
入队列成功,当前大小:7
入队列成功,当前大小:819:11:58.834[main]INFOcom.nageoffer.onethread.core.executor.support.ResizableCapacityLinkedBlockingQueueV1-- 成功修改阻塞队列容量为:5
通过反射修改容量为:5
尝试添加 Element9,队列已满,线程将被阻塞
成功添加 Element3,队列大小:9
🔍 最终队列元素数量:9

LinkedBlockingQueue 使用 == 是因为 capacity 固定,队列满时 count 等于 capacity

相信大家对于使用反射所存在的问题都已经清楚了,那接下来我们着手看 RabbitMQ 是如何解决这个问题的。

RabbitMQ 如何解决热更新问题?

他们选择了一个最笨但最稳的办法:Copy & Paste & Modify

VariableLinkedBlockingQueuejava.util.concurrent.LinkedBlockingQueue 的一个克隆版本,扩展支持了运行时修改容量的能力 。与 JDK 原生实现不同,去掉了 capacity 的 final 修饰符,并添加了一个线程安全的 setCapacity 方法。它提供了一个公开的 setCapacity(int) 方法,允许我们在队列运行过程中动态调整其容量上限 ,而无需重建队列或重启线程池。关键在于修改容量时,必须同时持有 putLock 和 takeLock。这是 JDK 原生 LBQ 没做的事

public class VariableLinkedBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {

    // ......

}
  1. 完全的并发控制: 所有的锁逻辑都在掌控之中,不会出现内存可见性问题。
  2. 无黑魔法: 不依赖反射,不依赖 JVM 底层黑科技,代码可读性、可维护性极高。

同时,在保障容量动态更新的基础上,通过以下设计解决了动态容量调整的问题:

1. setCapacity() 中自动唤醒等待线程

VariableLinkedBlockingQueuesetCapacity 方法在修改容量后,主动触发 notFull.signalAll() 来唤醒阻塞线程:

publicvoidsetCapacity(int capacity){finalint oldCapacity =this.capacity;this.capacity = capacity;finalint size = count.get();if(capacity > size && size >= oldCapacity){signalNotFull();}}

当新容量大于当前队列大小(capacity > size)且队列在旧容量下已满或接近满(size >= oldCapacity),说明可能有线程因队列满而阻塞在 put 方法上。调用 signalNotFull() 唤醒这些线程,确保它们重新检查容量条件。

仅在 capacity > sizesize >= oldCapacity 时触发 signalNotFull(),避免无意义的信号开销。

2. put() 使用 >= 判断

LinkedBlockingQueueput 方法在队列满时阻塞:

while(count.get()== capacity){
    notFull.await();}

这里的条件是 count.get() == capacity,表示队列恰好满时才阻塞。VariableLinkedBlockingQueue 则使用:

while(count.get()>= capacity){
    notFull.await();}

使用 >= 是为了应对动态调整容量时的边界情况。如果在 put 操作期间,capacity 被动态缩小到小于当前 count(例如,队列有 1000 个元素,容量缩小到 500),count >= capacity 确保线程继续阻塞,直到队列元素数减少到新容量以下。

文末总结

VariableLinkedBlockingQueue 通过非 finalcapacity 字段、改进的 put 方法(count.get() >= capacity)和 setCapacity 的唤醒逻辑,解决了 LinkedBlockingQueue 无法动态调整容量的问题。其关键设计要点包括:

  • 动态容量支持 :使用非 final 字段,消除反射的兼容性和性能问题。
  • 健壮的阻塞逻辑put 方法使用 >= 条件,适应容量动态缩小场景。
  • 线程安全唤醒setCapacity 在扩容时触发 notFull.signalAll(),确保阻塞线程及时恢复。
  • 高兼容性 :不依赖 JDK 内部实现,适配多版本 JDK。

通过测试用例,我们验证了 VariableLinkedBlockingQueue 在队列满、动态扩容和线程恢复等场景下的正确性。对于需要动态调优的任务队列场景,VariableLinkedBlockingQueue 是一个更灵活、可靠的替代方案。

Q:马哥,为什么 RabbitMQ 叫 VariableLinkedBlockingQueue,咱们代码里是 ResizableCapacityLinkedBlockingQueue

实话实说,因为当时看美团的文章里叫的这个,严格说起来两个名字表达的意思是一样的,都是指支持运行时动态调整容量的 LinkedBlockingQueue,只是命名风格略有差异,后面也是延续叫下来了。

Q:明明动态线程池没有用到阻塞队列 put 方法,为什么要重写?

逻辑其实很简单:一旦你重写了某个类,就必须保证重写后的逻辑不会引入潜在错误。哪怕当前场景下没有用到相关方法,也不能放任问题存在。 原因在于,可变容量的阻塞队列是一个独立的组件,它并不仅限于动态线程池场景。

它可能会被单独使用,或者被他人在 oneThread 动态线程池的基础上进行二次开发。在这些情况下,很可能会用到 put 等方法。

所以我们能做、也必须做的,就是保证写出的代码本身没有 BUG,至于别人是否调用、在什么场景下使用,那是他们的事。