动态线程池oneThread系统 — 第十六部分 观察者模式重构配置动态刷新

eve2333 发布于 19 小时前 7 次阅读


这篇文章深入剖析了 oneThread 动态线程池框架中如何利用 观察者模式(Observer Pattern) 来解决架构中的模块解耦问题。这是一个非常经典的架构设计案例,特别是结合 Spring 的事件机制来实现,既优雅又实用。

为了让你透彻理解,我将内容拆解为四个部分进行详细讲解:

  1. 架构挑战与痛点:为什么要用观察者模式?
  2. 核心原理:观察者模式与 Spring 事件机制。
  3. 源码实现解析:oneThread 中是如何落地实现的?
  4. 扩展性优势:如何利用该模式轻松扩展新功能?

第一部分:架构挑战与痛点 —— 为什么要解耦?

在微服务架构或组件化开发中,跨模块通信一直是个难题。在 oneThread 框架中,这个问题尤为突出。

1. 模块结构现状

oneThread 的模块分为两类角色:

  • 发布者(配置中心):负责监听 Nacos、Apollo 等配置中心的变化(如 nacos-cloud-spring-boot-starter)。
  • 订阅者(具体组件):负责实际执行线程池的修改(如 web-spring-boot-starter 修改 Tomcat 线程池,或业务代码中的自定义线程池)。

2. 如果不使用观察者模式(直接依赖)

如果采用传统的直接调用方式,代码会变成这样:

// 配置中心模块的代码
public class AbstractDynamicThreadPoolRefresher {

    // 1. 必须直接依赖所有可能的下游组件
    @Autowired(required = false)
    private WebThreadPoolService webService; // Tomcat/Jetty线程池

    @Autowired(required = false) 
    private RocketMQThreadPoolService rocketMQService; // MQ线程池

    public void refreshConfig(String config) {
        // 2. 每次配置变更,都要显式调用每个组件的方法
        if (webService != null) {
            webService.update(config);
        }
        if (rocketMQService != null) {
            rocketMQService.update(config);
        }
        // 如果未来加了 Dubbo 线程池,这里还得改代码!
    }
}

3. 这种方式的致命缺陷

  1. 强耦合:配置中心模块必须“认识”所有下游组件。这就好比报社必须认识每一个读者的脸才能发报纸。
  2. 违反“开闭原则”:每当增加一种新的线程池类型(比如新增 Dubbo 支持),就必须修改配置中心的核心代码(refreshConfig 方法)。
  3. 包体积膨胀:为了编译通过,配置中心可能需要引入所有下游组件的依赖。

解决方案:我们需要一种机制,让配置中心只管“喊一声”配置变了,至于谁关心这个变化,由关心者自己去处理。这就是观察者模式


第二部分:核心原理 —— Spring 事件驱动模型

oneThread 并没有从零手写观察者模式(维护一个 List),而是巧妙利用了 Spring 容器原生的 事件机制(ApplicationEvent)

1. 角色映射

Spring 的事件机制天然就是观察者模式的实现:

观察者模式角色Spring 组件oneThread 中的实现职责
Subject (主题/被观察者)ApplicationEventPublisherAbstractDynamicThreadPoolRefresher负责解析配置,并“广播”变更事件。
Event (事件/消息)ApplicationEventThreadPoolConfigUpdateEvent承载数据的载体(包含了最新的线程池配置参数)。
Observer (观察者/监听者)ApplicationListenerWebThreadPoolRefreshListener监听特定事件,收到通知后执行具体的更新逻辑。
Event Bus (事件总线)ApplicationContextSpring 容器负责将事件路由给所有匹配的监听器。

2. 流程图解

  1. 配置变更:Nacos/Apollo 收到新配置。
  2. 解析与发布:Refresher 解析配置,调用 publishEvent() 发布事件。
  3. 自动路由:Spring 容器查找所有监听了 ThreadPoolConfigUpdateEvent 的 Bean。
  4. 各司其职:WebListener 更新 Tomcat,CommonListener 更新业务线程池。

第三部分:源码落地解析 —— 代码层面的精妙之处

让我们看看 oneThread 具体是如何写代码的。

1. 定义事件(信封)

首先,定义一个事件类,把配置信息包在里面。

// 继承 ApplicationEvent,使其成为 Spring 体系内的事件
public class ThreadPoolConfigUpdateEvent extends ApplicationEvent {

    @Getter
    @Setter
    private BootstrapConfigProperties bootstrapConfigProperties;

    public ThreadPoolConfigUpdateEvent(Object source, BootstrapConfigProperties properties) {
        super(source);
        this.bootstrapConfigProperties = properties;
    }
}

2. 发布事件(发信人)

在配置中心模块的基类中,解析完配置后,只做一件事:发布事件。它完全不知道谁会消费这个事件。

public abstract class AbstractDynamicThreadPoolRefresher {

    public void refreshThreadPoolProperties(String configInfo) {
        // 1. 解析配置文件(将 YAML/JSON 转为 Java 对象)
        BootstrapConfigProperties properties = parseConfig(configInfo);

        // 2. 【关键】发布事件!
        // 这一行代码实现了完全解耦。我只管发,不管谁收。
        ApplicationContextHolder.publishEvent(
            new ThreadPoolConfigUpdateEvent(this, properties)
        );
    }
}

3. 监听事件(收信人)

不同的模块实现各自的监听器。

场景 A:更新 Web 容器(Tomcat)线程池
这个类位于 web-spring-boot-starter 包中。

@RequiredArgsConstructor
public class WebThreadPoolRefreshListener implements ApplicationListener<ThreadPoolConfigUpdateEvent> {

    private final WebThreadPoolService webThreadPoolService;

    @Override
    public void onApplicationEvent(ThreadPoolConfigUpdateEvent event) {
        // 1. 从事件中拿出配置
        var config = event.getBootstrapConfigProperties().getWeb();

        // 2. 校验配置是否有效
        if (config == null) return;

        // 3. 执行 Web 相关的更新逻辑
        webThreadPoolService.updateThreadPool(config);
    }
}

场景 B:更新普通业务线程池
这个类位于 common-spring-boot-starter 包中。

public class DynamicThreadPoolRefreshListener implements ApplicationListener<ThreadPoolConfigUpdateEvent> {
    @Override
    public void onApplicationEvent(ThreadPoolConfigUpdateEvent event) {
        // 逻辑类似,但处理的是通用线程池对象
        // ...
    }
}

第四部分:扩展性与总结 —— 为什么要这么做?

1. 极致的扩展性(Open/Closed 原则)

假设现在你需要增加对 RocketMQ 消费者线程池 的动态调整支持,你不需要修改 AbstractDynamicThreadPoolRefresher(发布者)的任何一行代码。

你只需要做两件事:

  1. 在配置类中增加 RocketMQ 的字段。
  2. 新建一个类 RocketMQListener 实现 ApplicationListener<ThreadPoolConfigUpdateEvent>
@Component
public class RocketMQListener implements ApplicationListener<ThreadPoolConfigUpdateEvent> {
    @Override
    public void onApplicationEvent(ThreadPoolConfigUpdateEvent event) {
        // 只要配置里有 rocketMQ 的配置,我就更新
        var mqConfig = event.getBootstrapConfigProperties().getRocketMQ();
        if (mqConfig != null) {
            rocketMQService.resize(mqConfig);
        }
    }
}

Spring 会自动扫描到这个新组件,并把它加入到观察者列表中。这种“插件式”的开发体验是非常爽的。

2. Spring 事件 vs Guava EventBus

文中提到了 Guava,两者的区别在于:

  • Spring Events:零依赖(只要是 Spring 项目就有),天然集成 IOC 容器(监听器自动注入),IDE 支持好(在发布事件的地方可以看到谁在监听)。
  • Guava:轻量级,但需要额外引入 Jar 包,且需要手动注册监听器。
  • 结论:在 Spring Boot 项目中,优先使用 Spring Events。

除了前面提到的架构解耦、扩展性优势之外,关于 Spring 事件驱动(观察者模式) 在实际生产落地时,还有几个“隐藏的坑”“高阶技巧”值得深入探讨。

这也是面试中或者做架构评审时,经常会被问到的细节:

1. 同步 vs 异步:性能与一致性的博弈

默认行为
Spring 的 ApplicationEvent 默认是 同步(Synchronous) 的。
这意味着:publishEvent 方法被调用时,发布者的线程(例如 Nacos 的监听线程)会阻塞,直到所有监听器(WebListener、RocketMQListener…)的 onApplicationEvent 方法都执行完毕,代码才会继续往下走。

这里有两个潜在风险

  1. 性能阻塞:如果某个监听器里执行了耗时操作(比如发邮件、请求外部 API),会卡住整个配置刷新流程,甚至导致 Nacos/Apollo 客户端的心跳超时。
  2. 事务问题:如果在事务内发布事件,默认也是在同一个事务上下文中(虽然动态线程池刷新通常不涉及数据库事务,但在其他业务场景需注意)。

oneThread 的选择
对于动态线程池配置刷新,保持同步通常是更好的选择。

  • 原因:我们需要确保“配置变更”立刻生效。如果异步执行,可能会导致配置中心显示“已发布”,但应用内部实际上还没切过去,造成状态不一致。
  • 优化:如果监听器中有发邮件/发短信这种非核心逻辑(如流程图中的 EmailSubscriber),建议在监听器内部使用 @Async 或者扔到独立的线程池去执行,不要阻塞核心参数的修改。

2. 异常隔离:防止“一颗老鼠屎坏了一锅粥”

由于默认是同步执行的,这就带来了一个异常传播的问题。

场景
假设我们有 3 个监听器:

  1. WebThreadPoolListener (Tomcat)
  2. RocketMQThreadPoolListener (MQ)
  3. CommonThreadPoolListener (业务)

如果 WebThreadPoolListener 代码有 Bug,抛出了一个 RuntimeException,会发生什么?

后果
Spring 的事件广播循环会被中断,剩下的 MQ 和 业务线程池监听器将永远不会收到通知,而且异常会冒泡到发布者(配置中心组件),导致整个刷新任务失败。

最佳实践
在实现监听器时,务必进行 Try-Catch 兜底

@Component
@Slf4j
public class WebThreadPoolRefreshListener implements ApplicationListener<ThreadPoolConfigUpdateEvent> {

    @Override
    public void onApplicationEvent(ThreadPoolConfigUpdateEvent event) {
        try {
            // 核心业务逻辑
            webThreadPoolService.update(event.getConfig());
        } catch (Exception e) {
            // 吞掉异常,打印错误日志,绝不影响其他监听器
            log.error("Failed to update Web ThreadPool", e);
        }
    }
}

3. 控制顺序:谁先谁后?

虽然观察者模式通常不依赖顺序,但在某些特定场景下,我们可能希望有序执行
比如:先刷新核心业务线程池,再刷新 Web 容器线程池,最后执行日志记录。

解决方案
Spring 的监听器支持 @Order 注解。

@Component
@Order(1) // 优先级最高,先执行
public class HighPriorityListener implements ApplicationListener<ThreadPoolConfigUpdateEvent> { ... }

@Component
@Order(99) // 优先级低,后执行
public class LowPriorityListener implements ApplicationListener<ThreadPoolConfigUpdateEvent> { ... }

4. 避免“事件风暴” (Event Storm)

在动态线程池场景下,配置变更通常频率较低。但在某些极端情况下(例如配置中心发生抖动,或者运维人员误操作连续点击保存),可能会在短时间内触发大量 ThreadPoolConfigUpdateEvent

后果:频繁创建和销毁线程池(虽然通常只是改参数),或者频繁打印日志、发送报警,会给系统造成压力。

高阶处理
可以在发布者(Refresher)或监听器层面做防抖(Debounce)处理。

  • 例如:收到变更后,等待 5秒,如果期间没有新变更,再发布事件。

5. 调试技巧:怎么知道谁监听了?

使用 Spring 事件的一个小缺点是代码跳转不直观。你点击 publishEvent,IDE 不会直接跳到 onApplicationEvent
IntelliJ IDEA 的 Ultimate 版(旗舰版)对 Spring 支持非常好。在 publishEvent 代码行号左侧,通常会有一个绿色的豆子图标耳机图标,点击它,IDEA 会列出所有订阅了这个事件的监听器类,方便快速跳转。

运行时监控
可以在日志中打印监听器列表,或者在启动时验证:

// 获取所有监听该事件的 Bean
String[] listeners = applicationContext.getBeanNamesForType(
    ResolvableType.forClassWithGenerics(ApplicationListener.class, ThreadPoolConfigUpdateEvent.class)
);
log.info("Current Listeners: {}", Arrays.toString(listeners));