▪第10小节:开发优惠券模板结束、增加发行量等功能
业务背景
优惠券创建后,针对商家用户对优惠券的查看和管理相关需求,我们开发四个相关联接口:
- 分页查询优惠券模板。
- 查询优惠券模板详情。
- 增加优惠券模板发行量。
- 结束优惠券模板。
活动图如下所示:
编辑
Git 分支
20240818_dev_other-coupon-template_feature_ding.ma
数据横向越权
数据横向越权是一种安全漏洞类型,指的是在系统中,用户能够访问或操作本不属于他们的数据或资源的行为。这种漏洞通常发生在权限检查不严谨或未能正确隔离用户数据的情况下。具体来说,横向越权允许用户访问其他用户的数据。
举个例子:张三创建了一张优惠券,ID 是 Z1,李四知道了这个优惠券 ID,调用停止优惠券接口或者增加发行量接口等使坏。
编辑
以结束优惠券接口举例,我们在代码中验证当前优惠券是否登录用户的店铺,如果不是的话返回异常。
@LogRecord( success = "结束优惠券", type = "CouponTemplate", bizNo = "{{#couponTemplateId}}" ) @Override public void terminateCouponTemplate(String couponTemplateId) { // 验证是否存在数据横向越权 LambdaQueryWrapper<CouponTemplateDO> queryWrapper = Wrappers.lambdaQuery(CouponTemplateDO.class) .eq(CouponTemplateDO::getShopNumber, UserContext.getShopNumber()) .eq(CouponTemplateDO::getId, couponTemplateId); CouponTemplateDO couponTemplateDO = couponTemplateMapper.selectOne(queryWrapper); if (couponTemplateDO == null) { // 一旦查询优惠券不存在,基本可判定横向越权,可上报该异常行为,次数多了后执行封号等处理 throw new ClientException("优惠券模板异常,请检查操作是否正确..."); } // ...... }
数据权限验证活动图如下:
编辑
判断极端情况
还是以结束优惠券举例,很多同学验证完数据横向越权后,就正常操作流程了。但是,还是会有一些问题。比如说,我是商家,然后挺无聊的想刷你数据,我针对单个优惠券重复调用接口进行结束。
因为数据关系是正常的,我对本店铺的优惠券操作,即使操作一次和操作 N 次的结果都是一样,但是在咱们场景中,操作日志会记录多条。
无聊商家时序图如下所示:
编辑
根据这种情况,我们在判断了数据归属关系后,还需要判断数据状态是否正确。
无聊商家受制裁时序图如下所示:
编辑
其实代码很好写,加一个 if 判断的事。代码虽少,但是大家在写代码过程中要考虑方方面面,避免留下细微漏洞被有心人利用。
@LogRecord( success = "结束优惠券", type = "CouponTemplate", bizNo = "{{#couponTemplateId}}" ) @Override public void terminateCouponTemplate(String couponTemplateId) { //...... // 验证优惠券模板是否正常 if (ObjectUtil.notEqual(couponTemplateDO.getStatus(), CouponTemplateStatusEnum.ACTIVE.getStatus())) { throw new ClientException("优惠券模板已结束"); } // ...... }
增加发行量是否存在数据错乱?
增加优惠券模板发行量接口,是在原有库存基础上新增,代码如下所示:
<!-- 增加优惠券模板发行量 --> <update id="increaseNumberCouponTemplate"> UPDATE t_coupon_template SET stock = stock + #{number} WHERE shop_number = #{shopNumber} AND id = #{couponTemplateId} </update>
是否会出现以下问题?流程如下:
- 1. 张三和李四同时操作 ID 为 X1 的模板,X1 之前的库存有 10 个,他们同一时刻分别添加库存 10;
- 2. 张三读出库存是 10,添加 10 后等于 20 回写到数据库;
- 3. 李四读出库存是 10,添加 10 后等于 20 回写到数据库;
- 4. 导致最终结果是 20 库存。正常情况下两个人分别加了 10,最终结果应该是 30 才对。
编辑
其实不会出现这种情况,因为在修改的时候会使用 MySQL 排他锁,执行的时候会读取最新记录,所以最终结果一定是 30 库存。
正确时序图如下所示:
编辑
为了避免认知错误,我还特地写了并发增加优惠券库存测试类。代码如下所示:
@SpringBootTest public class CouponTemplateConcurrentInCreaseNumberTests { @Autowired private CouponTemplateMapper couponTemplateMapper; private CouponTemplateTest couponTemplateTest; private CouponTemplateDO couponTemplateDO; @BeforeEach public void setUp() { couponTemplateTest = new CouponTemplateTest(); couponTemplateDO = couponTemplateTest.buildCouponTemplateDO(); couponTemplateMapper.insert(couponTemplateDO); } @Test public void testConcurrentIncreaseNumber() throws InterruptedException { int threadCount = 12; int increaseAmount = 10; long shopNumber = couponTemplateDO.getShopNumber(); long couponTemplateId = couponTemplateDO.getId(); ExecutorService executorService = Executors.newFixedThreadPool(threadCount); for (int i = 0; i < threadCount; i++) { executorService.submit(() -> { couponTemplateMapper.increaseNumberCouponTemplate(shopNumber, String.valueOf(couponTemplateId), increaseAmount); }); } executorService.shutdown(); executorService.awaitTermination(1, TimeUnit.MINUTES); LambdaQueryWrapper<CouponTemplateDO> queryWrapper = Wrappers.lambdaQuery(CouponTemplateDO.class) .eq(CouponTemplateDO::getShopNumber, shopNumber) .eq(CouponTemplateDO::getId, couponTemplateDO.getId()); CouponTemplateDO updatedCouponTemplateDO = couponTemplateMapper.selectOne(queryWrapper); int expectedNumber = couponTemplateDO.getStock() + (threadCount * increaseAmount); assertEquals(expectedNumber, updatedCouponTemplateDO.getStock(), "The stock count should match the expected value."); } }
步骤如下:
- 1.
@BeforeEach
注解的方法setUp()
初始化测试数据并插入到数据库中; - 2. 创建一个大小为 200 的线程池;
- 3. 提交 200 个任务,每个任务调用
increaseNumberCouponTemplate
方法增加库存; - 4. 等待所有任务完成;
- 5. 查询数据库中的最新库存数量;
- 6. 计算期望的库存数量,并断言实际库存是否符合预期。
这个测试方法我尝试过很多次,并且线程池的大小设置的 200 也调小到我的机器核心数,最终计算的库存余量都没有问题。
实验 MySQL 行排他锁
纸上得来终觉浅,绝知此事要躬行。即使已经写了并发测试类,但是还不够,需要亲自实验下 MySQL 事务中的排他锁。
我们通过 MySQL 命令行来操作两个 A、B 事务,流程如下:
- 1. A 手动开启事务,A 事务修改优惠券缓存加 10;
- 2. 然后在 A 事务未提交的情况下,开启 B 事务,并操作优惠券缓存加 10;如果阻塞就证明加锁了,反之则没有。
A 事务操作如下所示:
编辑
B 事务操作如下所示:
编辑
提交 A 和 B 事务,查看最终结果也是我们期望的。
编辑
完结撒花 🎉。
▪第11小节:RocketMQ5.x延时消息修改优惠券结束状态
业务背景
在之前的章节里,我们完成了优惠券模板的创建、结束等功能,但是还遗留了一个小功能,那就是如果说优惠券模板有效期结束了之后,我们的模板状态依然是生效中。基于这个功能点考虑,我们需要在定时任务和定时消息中进行选择,以此来满足精准关闭优惠券模板功能。
编辑
使用 RocketMQ 定时消息有如下优势:
- 定时精度高、开发门槛低:消息定时时间不存在阶梯间隔,可以轻松实现任意精度事件触发,无需业务去重。
- 高性能、可扩展:传统的定时实现方案较为复杂,需要进行数据库扫描,容易遇到性能瓶颈的问题,RocketMQ 可以基于定时消息特性完成事件驱动,实现百万级消息 TPS 能力。
RocketMQ5.x 之前仅支持固定延迟级别,5.x 之后支持了任意延时消息。使用定时任务也能够完成该功能,在这个小节里给大家演示消息队列延时,后面还有个类似的需求,使用 XXL-Job 完成。
使用消息队列时序图如下所示:
编辑
Git 分支
20240821_dev_coupon-template-close_rocketmq5_ding.ma
消息队列介绍
1. 什么是消息队列?
消息队列是一种用于异步通信的机制,允许不同的系统组件或服务之间交换信息。它的主要作用是将消息从发送者传递到接收者,同时解耦这两个组件的直接依赖。
编辑
2. 什么是 RocketMQ?
RocketMQ 是一个开源的分布式消息中间件,由阿里巴巴开发并贡献给 Apache 软件基金会。它主要用于高吞吐量、低延迟的消息传递需求。
编辑
RocketMQ 的优点和功能是比较多的,以下是 一些主要特点和功能:
- 高吞吐量和低延迟:RocketMQ 设计用于处理大量的消息,并提供低延迟的消息传递服务,适合需要高性能的场景。
- 分布式架构:RocketMQ 使用分布式架构来支持大规模的消息传递。它可以水平扩展,以处理更大的数据量和更高的并发需求。
- 消息可靠性:RocketMQ 支持消息持久化和多副本机制,确保在系统故障时不会丢失消息。这使得消息的可靠性和一致性得到了保障。
- 高可用性和容错:RocketMQ 提供了高可用性的解决方案,包括多主多从等架构方案,确保系统的稳定性和连续性。
官网写的很详细,架构、基本概念(主题、队列、生产者、消费者、NameServer、Beroker 等)、工作原理等。
推荐大家学习一波:为什么选择RocketMQ | RocketMQ
3. 消息队列都有哪些作用?
3.1 异步解耦
最常见的一个场景是用户注册后,需要发送注册邮件和短信通知,以告知用户注册成功。传统的做法有以下两种:
串行方式:
编辑
数据流动如下所述:
- 1. 注册页面填写账号和密码并提交注册信息,这些注册信息首先会被写入注册系统。
- 2. 注册信息写入注册系统成功后,再发送请求至邮件通知系统。邮件通知系统收到请求后向用户发送邮件通知。
- 3. 邮件通知系统接收注册系统请求后再向下游的短信通知系统发送请求。短信通知系统收到请求后向用户发送短信通知。
以上三个任务全部完成后,才返回注册结果到客户端,用户才能使用账号登录。假设每个任务耗时分别为 50ms,则用户需要在注册页面等待总共 150ms 才能登录。
并行形式:
对于用户来说,注册功能实际只需要注册系统存储用户的账户信息后,该用户便可以登录,后续的注册短信和邮件不是即时需要关注的步骤。
对于注册系统而言,发送注册成功的短信和邮件通知并不一定要绑定在一起同步完成,所以实际当数据写入注册系统后,注册系统就可以把其他的操作放入对应的 RocketMQ 中然后马上返回用户结果,由 RocketMQ 异步地进行这些操作。
编辑
数据流动如下所述:
- 1. 用户在注册页面填写账号和密码并提交注册信息,这些注册信息首先会被写入注册系统。
- 2. 注册信息写入注册系统成功后,再发送消息至 RocketMQ。 RocketMQ 会马上返回响应给注册系统,注册完成。用户可立即登录。
- 3. 下游的邮件和短信通知系统订阅 RocketMQ 的此类注册请求消息,即可向用户发送邮件和短信通知,完成所有的注册流程。
用户只需在注册页面等待注册数据写入注册系统和 RocketMQ 的时间,即等待 55ms 即可登录。
3.2 削峰填谷
流量削峰也是 RocketMQ 的常用场景,一般在秒杀或团队抢购活动中使用广泛。
在秒杀或团队抢购活动中,由于用户请求量较大,导致流量暴增,秒杀的应用在处理如此大量的访问流量后,下游的通知系统无法承载海量的调用量,甚至会导致系统崩溃等问题而发生漏通知的情况。为解决这些问题,可在应用和下游通知系统之间加入 RocketMQ。
编辑
秒杀处理流程如下所述:
- 1. 用户发起海量秒杀请求到秒杀业务处理系统。
- 2. 秒杀处理系统按照秒杀处理逻辑将满足秒杀条件的请求发送 RocketMQ。
- 3. 下游的通知系统订阅 RocketMQ 的秒杀相关消息,再将秒杀成功的消息发送到相应用户。
- 4. 用户收到秒杀成功的通知。
3.3 分布式定时/延时调度
RocketMQ 提供精确度到秒级的分布式定时消息能力(5.0架构后),可广泛应用于订单超时中心处理、分布式延时调度系统等场景。
使用 RocketMQ 定时消息有如下优势:
- 定时精度高、开发门槛低:消息定时时间不存在阶梯间隔,可以轻松实现任意精度事件触发,无需业务去重。
- 高性能、可扩展:传统的定时实现方案较为复杂,需要进行数据库扫描,容易遇到性能瓶颈的问题,RocketMQ 可以基于定时消息特性完成事件驱动,实现百万级消息 TPS 能力。
编辑
RocketMQ 使用安装
RocketMQ 安装涉及 Broker、NameServer、Console,比较占用内存和难以安装,为此,建议大家统一使用星球云中间件版本学习。
云中间件版本 RocketMQ 使用方式参考:https://t.zsxq.com/CgLP7,把 VM 参数配置到应用中即可。
编辑
RocketMQ 代码实战
1. 添加 Maven 依赖
<!-- 消息队列相关依赖 --> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>2.3.0</version> </dependency>
2. application.yaml 添加配置
application.yaml
中添加 RocketMQ 相关配置。
rocketmq: name-server: 127.0.0.1:9876 # NameServer 地址,如果 VM 参数里设置了星球云服务器 RocketMQ 地址,运行时会替换 producer: # 通用生产者组,其中的 ${unique-name:} 是为了避免大家公用一个 Topic,造成你发的消息被其他同学消费,其他同学发的消息被你消费等问题 group: oneCoupon_merchant-admin${unique-name:}-service_common-message-execute_pg send-message-timeout: 2000 # 发送超时时间 retry-times-when-send-failed: 1 # 同步发送重试次数 retry-times-when-send-async-failed: 1 # 异步发送重试次数
3. 发送 RocketMQ 任意延迟消息
3.1 生产者代码
这一块相当于是生产者,业务代码如下:
private final RocketMQTemplate rocketMQTemplate; private final ConfigurableEnvironment configurableEnvironment; @Override public void createCouponTemplate(CouponTemplateSaveReqDTO requestParam) { // ...... // 使用 RocketMQ5.x 发送任意时间延时消息 // 定义 Topic String couponTemplateDelayCloseTopic = "one-coupon_merchant-admin-service_coupon-template-delay_topic${unique-name:}"; // 通过 Spring 上下文解析占位符,也就是把咱们 VM 参数里的 unique-name 替换到字符串中 couponTemplateDelayCloseTopic = configurableEnvironment.resolvePlaceholders(couponTemplateDelayCloseTopic); // 定义消息体 JSONObject messageBody = new JSONObject(); messageBody.put("couponTemplateId", couponTemplateDO.getId()); messageBody.put("shopNumber", UserContext.getShopNumber()); // 设置消息的送达时间,毫秒级 Unix 时间戳 Long deliverTimeStamp = couponTemplateDO.getValidEndTime().getTime(); // 构建消息体 String messageKeys = UUID.randomUUID().toString(); Message<JSONObject> message = MessageBuilder .withPayload(messageBody) .setHeader(MessageConst.PROPERTY_KEYS, messageKeys) .build(); // 执行 RocketMQ5.x 消息队列发送&异常处理逻辑 SendResult sendResult; try { sendResult = rocketMQTemplate.syncSendDeliverTimeMills(couponTemplateDelayCloseTopic, message, deliverTimeStamp); log.info("[生产者] 优惠券模板延时关闭 - 发送结果:{},消息ID:{},消息Keys:{}", sendResult.getSendStatus(), sendResult.getMsgId(), messageKeys); } catch (Exception ex) { log.error("[生产者] 优惠券模板延时关闭 - 消息发送失败,消息体:{}", couponTemplateDO.getId(), ex); } }
3.2 占位符解析
我们在定义 Topic 时,使用了 Spring 的 ConfigurableEnvironment 类解析了自定义占位符。逻辑就是如果咱们 VM 参数里有设置 -Dunique-name=xxx,那么就会把 xxx 替换到这个字符串里。
比如 -Dunique-name=mading,那执行完解析方法字符串等于 one-coupon_merchant-admin-service_coupon-template-delay_topicmading。这么做就是为了避免大家用云中间件时用一个 Topic,消息混发和消费问题。
// 定义 Topic String couponTemplateDelayCloseTopic = "one-coupon_merchant-admin-service_coupon-template-delay_topic${unique-name:}"; // 通过 Spring 上下文解析占位符,也就是把咱们 VM 参数里的 unique-name 替换到字符串中 couponTemplateDelayCloseTopic = configurableEnvironment.resolvePlaceholders(couponTemplateDelayCloseTopic);
3.3 日志打印
可以看到这里日志参数打印的非常详细,尤其是打印了消息 ID 和 Keys,这两个参数大有用途,可以帮助我们排查生产问题。
// 执行 RocketMQ5.x 消息队列发送&异常处理逻辑 SendResult sendResult; try { sendResult = rocketMQTemplate.syncSendDeliverTimeMills(couponTemplateDelayCloseTopic, message, deliverTimeStamp); log.info("[生产者] 优惠券模板延时关闭 - 发送结果:{},消息ID:{},消息Keys:{}", sendResult.getSendStatus(), sendResult.getMsgId(), messageKeys); } catch (Exception ex) { log.error("[生产者] 优惠券模板延时关闭 - 消息发送失败,消息体:{}", couponTemplateDO.getId(), ex); }
举个例子,你发送个消息,下游服务说没收到,你说这是谁的问题?如果我们打印了这个消息 ID 和 Keys 就可以去控制台查询消息的详细信息。以下面这个发送举例:
2024-08-21T22:45:26.280+08:00 INFO 78983 --- [io-10010-exec-1] c.n.o.m.a.s.i.CouponTemplateServiceImpl : [生产者] 优惠券模板延时关闭 - 发送结果:SEND_OK,消息ID:2408820760D4CCC06CC04DD27B33332C3487251A69D76BE1483A0000,消息Keys:a88bb1e1-e932-429e-bca6-fbe6fa52cc23
大家记得自己发送,别用这个,RocketMQ Broker 到期自动清理消息。根据 Keys 查询消息流程截图如下:
编辑
点击 Message Detail 按钮,会展示这个消息的详细信息,如下所示。
编辑
通过以上这些信息就能很好定位消息是否发成功、消息是否被消费等问题,可谓是生产自救、甩锅神器。
3.4 缺少自定义 timeput API
这里有个小插曲,因为 RocketMQ5.x 才有的任意延时消息,所以 SpringBoot Starter API 是后适配的,感觉有一个地方没做好,那就是在发送延迟消息接口中不支持携带 timout
参数。而唯一支持的 RocketMQTemplate#syncSend
方法又是私有的,感觉像是把这个参数漏掉了一样。
/** * Same to {@link #syncSend(String, Message)} with send timeout and delay time specified in addition. * This function is only valid when the broker version is 5.0 or above * * @param destination formats: `topicName:tags` * @param message {@link org.springframework.messaging.Message} * @param timeout send timeout with millis * @param delayTime delay time for message * @return {@link SendResult} */ private SendResult syncSend(String destination, Message<?> message, long timeout, long delayTime, DelayMode mode) { // ...... }
要是发送普通消息参数不支持也就不说啥了,关键是它支持,找谁说理去。和上面的区别,也就一个是固定延迟级别,一个是任意时间延时。
/** * Same to {@link #syncSend(String, Message)} with send timeout specified in addition. * * @param destination formats: `topicName:tags` * @param message {@link org.springframework.messaging.Message} * @param timeout send timeout with millis * @param delayLevel level for the delay message * @return {@link SendResult} */ public SendResult syncSend(String destination, Message<?> message, long timeout, int delayLevel) { // ...... }
关于这个问题我提交了 Issue,看看官方怎么说:https://github.com/apache/rocketmq-spring/issues/676
4. 定义消息消费者
4.1 消费者代码
优惠券模板到期结束消费者代码定义如下所示:
package com.nageoffer.onecoupon.merchant.admin.mq.consumer; import com.alibaba.fastjson2.JSONObject; import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper; import com.baomidou.mybatisplus.core.toolkit.Wrappers; import com.nageoffer.onecoupon.merchant.admin.common.enums.CouponTemplateStatusEnum; import com.nageoffer.onecoupon.merchant.admin.dao.entity.CouponTemplateDO; import com.nageoffer.onecoupon.merchant.admin.service.CouponTemplateService; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.springframework.stereotype.Component; /** * 优惠券推送延迟执行-变更记录发送状态消费者 * <p> * 作者:马丁 * 加项目群:早加入就是优势!500人内部项目群,分享的知识总有你需要的 <a href="https://t.zsxq.com/cw7b9" /> * 开发时间:2024-08-21 */ @Component @RequiredArgsConstructor @RocketMQMessageListener( topic = "one-coupon_merchant-admin-service_coupon-template-delay_topic${unique-name:}", consumerGroup = "one-coupon_merchant-admin-service_coupon-template-delay-status_cg${unique-name:}" ) @Slf4j(topic = "CouponTemplateDelayExecuteStatusConsumer") public class CouponTemplateDelayExecuteStatusConsumer implements RocketMQListener<JSONObject> { private final CouponTemplateService couponTemplateService; @Override public void onMessage(JSONObject message) { // 开头打印日志,平常可 Debug 看任务参数,线上可报平安(比如消息是否消费,重新投递时获取参数等) log.info("[消费者] 优惠券模板定时执行@变更模板表状态 - 执行消费逻辑,消息体:{}", message.toString()); // 修改指定优惠券模板状态为已结束 LambdaUpdateWrapper<CouponTemplateDO> updateWrapper = Wrappers.lambdaUpdate(CouponTemplateDO.class) .eq(CouponTemplateDO::getShopNumber, message.getLong("shopNumber")) .eq(CouponTemplateDO::getId, message.getLong("couponTemplateId")) .set(CouponTemplateDO::getStatus, CouponTemplateStatusEnum.ENDED.getStatus()); couponTemplateService.update(updateWrapper); } }
代码比较简答,两步就可以:
- 添加
@RocketMQMessageListener
注解,其中加上 Topic 和消费者组定义。 - 实现
RocketMQListener
消息监听接口,泛型的类型是我们生产者发送消息的类定义。
4.2 @RocketMQMessageListener 不需要解析占位符么?
因为这是在 Spring 环境里提供的注解,底层会自动解析其中的占位符。
4.3 是否需要幂等?
其实不需要,幂等的逻辑是多次执行结果不一致,不过我们是修改为结束状态,变更多少次都是一样的。
4.4 消费时为什么不删除模板缓存?
因为在预热缓存时我们已经设置了过期时间,不需要重复删除。
这里可以引申出一个扩展点,那就是缓存不设置过期时间,可以在这个消费者里去将缓存删除。通过这种方式,还可以减少缓存预热的代码复杂度。
4.5 @Slf4j(topic=xxx) 什么意思?
如果没有 topic 这个属性,那么你的日志打印是这样的:
2024-08-22T19:26:15.172+08:00 INFO 90884 --- [io-10010-exec-1] c.n.o.m.a.s.i.CouponTemplateServiceImpl : [生产者] 优惠券模板延时关闭 - 发送结果:SEND_OK,消息ID:2408820760D4CCC0901EE0E538FD681A6304251A69D7705148480000,消息Keys:d904fbe7-f8c6-4e77-997c-6b08f83868a3
添加了 Topic 后,就会将日志打印引用类规范化。
2024-08-22T19:23:17.456+08:00 INFO 78983 --- [cg-mading0924_2] CouponTemplateDelayExecuteStatusConsumer : [消费者] 优惠券模板定时执行@变更模板表状态 - 执行消费逻辑,消息体:{"couponTemplateId":1826580899668439042,"shopNumber":1810714735922956666}
文末总结
通过该章节带着大家走了一遍消息队列的发送、消费流程,代码直接写到业务代码里,接下来如果再用到消息队列,会带着大家基于模板方法模式抽象出来。
生产使用消息队列还是需要谨慎的,简单的发送、消费看不出来,遇到问题时就考验代码的周全了。建议大家看看这里的消息队列规范:https://t.zsxq.com/sHIqY
完结,撒花 🎉
▪第12小节:EasyExcel解析百万Excel创建批量分发任务
业务背景
我们设想一个场景,你是一个保险公司的运营人员,如果你们出了一个新的保险,怎么让用户更好知道?漫无目的宣传肯定不行,是不是可以找之前购买过你们保险或者有意向购买保险的用户,给他们发个短信通知或者电话营销好一些。
我们优惠券的分发和上面保险推广是相同的原理,获取到用户信息的 Excel 后,将优惠券写入到用户领券列表中,同时根据配置选择是否通知用户,通知的话有短信、微信公众号、邮件等。
编辑
为什么优惠券分发是通过 Excel 进行的?
这背后其实有一些背景需要先了解,首先是 Excel 中的用户数据是怎么来的。
我们日常在淘宝、京东等大型电商平台上浏览或购买商品时,都会被埋点采集数据。这些埋点可以精确到你点击了哪个商品、浏览了多长时间、访问了几次店铺等。这些行为数据会被汇总到用户画像系统中(不同公司可能叫法不同),用于识别潜在意向客户。基于这些画像,平台运营人员可以有针对性地进行优惠券投放或发送营销短信。
针对如何将优惠券分发给这些潜在客户,通常有两种模式:
- 1. Excel 导出模式:运营人员根据需求向画像平台提报,画像平台返回一个符合条件的用户列表(Excel 文件)。然后,平台通过内部运营系统,选择优惠券并将其分发给 Excel 中的用户。
- 2. 标签查询模式:运营人员向画像平台提需求,画像平台将用户打上统一标识。平台根据这个标识,在分发时结合优惠券执行用户发放。
在实际企业中,这两种方案都存在,且各有适用场景。
我们当前的方案采用的是第一种 Excel 导入模式,原因也很简单:我们并没有真正接入用户画像平台,因此通过模拟生成 Excel 数据的方式,更容易实现并演示整个分发流程。
如果在面试中有面试官对此提出质疑,也可以补充说明第二种标签模式的实现逻辑,大部分人其实都能理解和认可。
例如,如果我们要上线一家高端服装店,为了提升其生意,我们可以从意向客户中提取长期浏览高端服装或已经购买过类似品牌或价位的用户信息,然后将优惠券和通知发送到这些用户的账户。这样可以精准地锁定潜在客户,提高营销效果。
数据库表设计
进入 one_coupon_rebuild_0
数据库中执行下述 SQL 语句。
CREATE TABLE `t_coupon_task` ( `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT 'ID', `shop_number` bigint(20) DEFAULT NULL COMMENT '店铺编号', `batch_id` bigint(20) DEFAULT NULL COMMENT '批次ID', `task_name` varchar(128) DEFAULT NULL COMMENT '优惠券批次任务名称', `file_address` varchar(512) DEFAULT NULL COMMENT '文件地址', `fail_file_address` varchar(512) DEFAULT NULL COMMENT '发放失败用户文件地址', `send_num` int(11) DEFAULT NULL COMMENT '发放优惠券数量', `notify_type` varchar(32) DEFAULT NULL COMMENT '通知方式,可组合使用 0:站内信 1:弹框推送 2:邮箱 3:短信', `coupon_template_id` bigint(20) DEFAULT NULL COMMENT '优惠券模板ID', `send_type` tinyint(1) DEFAULT NULL COMMENT '发送类型 0:立即发送 1:定时发送', `send_time` datetime DEFAULT NULL COMMENT '发送时间', `status` tinyint(1) DEFAULT NULL COMMENT '状态 0:待执行 1:执行中 2:执行失败 3:执行成功 4:取消', `completion_time` datetime DEFAULT NULL COMMENT '完成时间', `create_time` datetime DEFAULT NULL COMMENT '创建时间', `operator_id` bigint(20) DEFAULT NULL COMMENT '操作人', `update_time` datetime DEFAULT NULL COMMENT '修改时间', `del_flag` tinyint(1) DEFAULT NULL COMMENT '删除标识 0:未删除 1:已删除', PRIMARY KEY (`id`), KEY `idx_batch_id` (`batch_id`) USING BTREE, KEY `idx_coupon_template_id` (`coupon_template_id`) USING BTREE ) ENGINE=InnoDB AUTO_INCREMENT=1816672964423188483 DEFAULT CHARSET=utf8mb4 COMMENT='优惠券模板发送任务表';
因为这种优惠券分发任务不会太多,所以我们默认不进行分库分表,ShardingSphere 5.3.2 对于没有配置分库分表逻辑的表,默认从第一个数据源读取。
我们针对一些核心字段做个讲解:
file_address
:文件地址,保存分发目标用户的 Excel 文件地址。fail_file_address
:发放失败用户文件地址,如果发放执行过程中失败,需要保存错误信息生成一个新的 Excel。send_num
:发放优惠券数量,file_address 中共有多少条记录,方便后续记录是否发放完成。
Git 分支
20240822_dev_create-coupon-task_easyexcel_ding.ma
生成百万测试 Excel 文件
1. Excel 中有哪些字段?
上面的数据库表中有个字段是通知方式,一共有四个值:
- 站内信:需要用户 ID。
- 弹框推送:需要用户 ID。
- 邮箱:需要用户邮箱,这个属于是考虑到了,实际中基本不存在。
- 短信:需要用户手机号,有些公司考虑到用户隐私泄露问题,可能也是记录用户 ID,发送时查询用户接口获取。
那基于上面的描述,我们需要搞三个字段,用户 ID、邮箱、手机号,接下来开始模拟记录。
2. 什么是 Faker?
此 Faker 非彼 Faker。咱们这个章节聊的 Faker 是一个开源库,提供了生成伪随机数据的功能。该库可以用来生成各种各样的测试数据,例如姓名、地址、电话号码、电子邮件、公司名、日期等。
那我们先引入,试试效果怎么样。
2.1 引入 Faker Maven 依赖
<!-- Mock 数据相关依赖 --> <dependency> <groupId>com.github.javafaker</groupId> <artifactId>javafaker</artifactId> <scope>test</scope> <version>1.0.2</version> </dependency>
2.2 写个单元测试
通过一个简单的单元测试让大家熟悉下 Faker 怎么使用。
package com.nageoffer.onecoupon.merchant.admin.task; import com.github.javafaker.Address; import com.github.javafaker.Faker; import com.github.javafaker.PhoneNumber; import org.junit.jupiter.api.Test; import java.util.Locale; /** * Faker 单元测试类 */ public class FakerTests { @Test public void testFaker() { // 创建一个 Faker 实例 Faker faker = new Faker(Locale.CHINA); // 生成中文名 String chineseName = faker.name().fullName(); System.out.println("中文名: " + chineseName); // 生成手机号 PhoneNumber phoneNumber = faker.phoneNumber(); String mobileNumber = phoneNumber.cellPhone(); System.out.println("手机号: " + mobileNumber); // 生成电子邮箱 String email = faker.internet().emailAddress(); System.out.println("电子邮箱: " + email); } }
打印日志如下:
中文名: 沈烨霖 手机号: 15109362990 电子邮箱: 明哲.孙@gmail.com
3. 什么是 EasyExcel?
EasyExcel 是一个基于 Java 的、快速、简洁、解决大文件内存溢出的 Excel 处理工具。他能让你在不用考虑性能、内存的等因素的情况下,快速完成 Excel 的读、写等功能。
我们在生成 Excel 文件时,刚好使用 EasyExcel 操作,可以看出非常的便捷。
3.1 引入 EasyExcel Maven 依赖
<dependency> <groupId>com.alibaba</groupId> <artifactId>easyexcel</artifactId> <version>4.0.1</version> </dependency>
3.2 生成百万用户 Excel
基于 Faker 生成示例数据,将示例数据执行 EasyExcel 数据写入流程,最终保存到项目的 /tmp 文件中。
package com.nageoffer.onecoupon.merchant.admin.task; import cn.hutool.core.io.FileUtil; import cn.hutool.core.util.IdUtil; import com.alibaba.excel.EasyExcel; import com.alibaba.excel.annotation.ExcelProperty; import com.alibaba.excel.annotation.write.style.ColumnWidth; import com.alibaba.excel.util.ListUtils; import com.github.javafaker.Faker; import lombok.AllArgsConstructor; import lombok.Builder; import lombok.Data; import lombok.NoArgsConstructor; import org.junit.jupiter.api.Test; import java.nio.file.Paths; import java.util.List; import java.util.Locale; /** * 百万 Excel 文件生成单元测试 * <p> * 作者:马丁 * 加项目群:早加入就是优势!500人内部项目群,分享的知识总有你需要的 <a href="https://t.zsxq.com/cw7b9" /> * 开发时间:2024-07-12 */ public final class ExcelGenerateTests { /** * 写入优惠券推送示例 Excel 的数据,自行控制即可 */ private final int writeNum = 5000; private final Faker faker = new Faker(Locale.CHINA); private final String excelPath = Paths.get("").toAbsolutePath().getParent() + "/tmp"; @Test public void testExcelGenerate() { if (!FileUtil.exist(excelPath)) { FileUtil.mkdir(excelPath); } String fileName = excelPath + "/oneCoupon任务推送Excel.xlsx"; EasyExcel.write(fileName, ExcelGenerateDemoData.class).sheet("优惠券推送列表").doWrite(data()); } private List<ExcelGenerateDemoData> data() { List<ExcelGenerateDemoData> list = ListUtils.newArrayList(); for (int i = 0; i < writeNum; i++) { ExcelGenerateDemoData data = ExcelGenerateDemoData.builder() .mail(faker.number().digits(10) + "@163.com") .phone(faker.phoneNumber().cellPhone()) .userId(IdUtil.getSnowflakeNextIdStr()) .build(); list.add(data); } return list; } /** * 百万 Excel 生成器示例数据模型 * <p> * 作者:马丁 * 加项目群:早加入就是优势!500人内部项目群,分享的知识总有你需要的 <a href="https://t.zsxq.com/cw7b9" /> * 开发时间:2024-07-12 */ @Data @NoArgsConstructor @AllArgsConstructor @Builder static class ExcelGenerateDemoData { @ColumnWidth(30) @ExcelProperty("用户ID") private String userId; @ColumnWidth(20) @ExcelProperty("手机号") private String phone; @ColumnWidth(30) @ExcelProperty("邮箱") private String mail; } }
执行这个单元测试后会在项目根目录下创建 /tmp 文件夹,文件夹下就是咱们的 Excel 数据文件。
为了避免这种测试数据文件上传到 Git 项目,我们需要在 .gitignore
忽略文件中添加 tmp 目录,如下图所示:
编辑
3.3 EasyExcel 注解讲解
- @ColumnWidth(30):表示当前列占单元格多大宽度。
- @ExcelProperty("用户ID"):写入的表头标题。
开发创建优惠券分发任务
因为代码较多,大家查看分支提交记录即可,我们这里只讲解核心代码。
1. 生成后的 Excel 文件
我们调用上面的生成 Excel 单元测试后,会生成一个 Excel 文件。可以看到,一个 100 万记录的 Excel 在 30M 左右。
编辑
2. Hutool 获取 Excel 文件行数
为了对比 EasyExcel 提到的内存安全,我们先尝试使用 Hutool 中的 Excel 工具获取下 Excel 行数,看看效果怎么样。
package com.nageoffer.onecoupon.merchant.admin.service.impl; import cn.hutool.core.bean.BeanUtil; import cn.hutool.core.util.IdUtil; import cn.hutool.poi.excel.ExcelReader; import cn.hutool.poi.excel.ExcelUtil; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; import com.nageoffer.onecoupon.framework.exception.ClientException; import com.nageoffer.onecoupon.merchant.admin.common.context.UserContext; import com.nageoffer.onecoupon.merchant.admin.common.enums.CouponTaskSendTypeEnum; import com.nageoffer.onecoupon.merchant.admin.common.enums.CouponTaskStatusEnum; import com.nageoffer.onecoupon.merchant.admin.dao.entity.CouponTaskDO; import com.nageoffer.onecoupon.merchant.admin.dao.mapper.CouponTaskMapper; import com.nageoffer.onecoupon.merchant.admin.dto.req.CouponTaskCreateReqDTO; import com.nageoffer.onecoupon.merchant.admin.dto.resp.CouponTemplateQueryRespDTO; import com.nageoffer.onecoupon.merchant.admin.service.CouponTaskService; import com.nageoffer.onecoupon.merchant.admin.service.CouponTemplateService; import lombok.RequiredArgsConstructor; import org.springframework.stereotype.Service; import java.util.Objects; /** * 优惠券推送业务逻辑实现层 * <p> * 作者:马丁 * 加项目群:早加入就是优势!500人内部项目群,分享的知识总有你需要的 <a href="https://t.zsxq.com/cw7b9" /> * 开发时间:2024-07-12 */ @Service @RequiredArgsConstructor public class CouponTaskServiceImpl extends ServiceImpl<CouponTaskMapper, CouponTaskDO> implements CouponTaskService { private final CouponTemplateService couponTemplateService; private final CouponTaskMapper couponTaskMapper; @Override public void createCouponTask(CouponTaskCreateReqDTO requestParam) { // 验证非空参数 // 验证参数是否正确,比如文件地址是否为我们期望的格式等 // 验证参数依赖关系,比如选择定时发送,发送时间是否不为空等 CouponTemplateQueryRespDTO couponTemplate = couponTemplateService.findCouponTemplateById(requestParam.getCouponTemplateId()); if (couponTemplate == null) { throw new ClientException("优惠券模板不存在,请检查提交信息是否正确"); } // ...... // 构建优惠券推送任务数据库持久层实体 CouponTaskDO couponTaskDO = BeanUtil.copyProperties(requestParam, CouponTaskDO.class); couponTaskDO.setBatchId(IdUtil.getSnowflakeNextId()); couponTaskDO.setOperatorId(Long.parseLong(UserContext.getUserId())); couponTaskDO.setShopNumber(UserContext.getShopNumber()); couponTaskDO.setStatus( Objects.equals(requestParam.getSendType(), CouponTaskSendTypeEnum.IMMEDIATE.getType()) ? CouponTaskStatusEnum.IN_PROGRESS.getStatus() : CouponTaskStatusEnum.PENDING.getStatus() ); // 读取 Excel 文件 ExcelReader reader = ExcelUtil.getReader(requestParam.getFileAddress()); // 获取总行数(包括标题行) int rowCount = reader.getRowCount(); couponTaskDO.setSendNum(rowCount); // 保存优惠券推送任务记录到数据库 couponTaskMapper.insert(couponTaskDO); } }
通过 API 管理工具开始发起调用,一些参数说明:
- fileAddress:写上面 Excel 文件的绝对路径即可。
- couponTemplateId:写个之前创建并且存在的优惠券模板 ID。
编辑
我们通过 JDK 自带的 visualvm 监控工具查看下内存变化,可以看到有个非常明显的内存上升。这里有点纳闷,为什么一个不到 30M 的 Excel 能引发这么大的内存占用。
编辑
3. EasyExcel 获取 Excel 文件行数
创建 EasyExcel 读取监听类,代码很简单,只是用于类似于 i++ 的逻辑。
package com.nageoffer.onecoupon.merchant.admin.service.handler.excel; import com.alibaba.excel.context.AnalysisContext; import com.alibaba.excel.event.AnalysisEventListener; import lombok.Getter; /** * Excel 行数统计监听器 * <p> * 作者:马丁 * 加项目群:早加入就是优势!500人内部项目群,分享的知识总有你需要的 <a href="https://t.zsxq.com/cw7b9" /> * 开发时间:2024-07-12 */ public class RowCountListener extends AnalysisEventListener<Object> { @Getter private int rowCount = 0; @Override public void invoke(Object data, AnalysisContext context) { rowCount++; } @Override public void doAfterAllAnalysed(AnalysisContext context) { // No additional actions needed after all data is analyzed } }
调整业务代码,切换 Hutool 的统计为 EasyExcel 行数统计。
package com.nageoffer.onecoupon.merchant.admin.service.impl; import cn.hutool.core.bean.BeanUtil; import cn.hutool.core.util.IdUtil; import cn.hutool.poi.excel.ExcelReader; import cn.hutool.poi.excel.ExcelUtil; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; import com.nageoffer.onecoupon.framework.exception.ClientException; import com.nageoffer.onecoupon.merchant.admin.common.context.UserContext; import com.nageoffer.onecoupon.merchant.admin.common.enums.CouponTaskSendTypeEnum; import com.nageoffer.onecoupon.merchant.admin.common.enums.CouponTaskStatusEnum; import com.nageoffer.onecoupon.merchant.admin.dao.entity.CouponTaskDO; import com.nageoffer.onecoupon.merchant.admin.dao.mapper.CouponTaskMapper; import com.nageoffer.onecoupon.merchant.admin.dto.req.CouponTaskCreateReqDTO; import com.nageoffer.onecoupon.merchant.admin.dto.resp.CouponTemplateQueryRespDTO; import com.nageoffer.onecoupon.merchant.admin.service.CouponTaskService; import com.nageoffer.onecoupon.merchant.admin.service.CouponTemplateService; import lombok.RequiredArgsConstructor; import org.springframework.stereotype.Service; import java.util.Objects; /** * 优惠券推送业务逻辑实现层 * <p> * 作者:马丁 * 加项目群:早加入就是优势!500人内部项目群,分享的知识总有你需要的 <a href="https://t.zsxq.com/cw7b9" /> * 开发时间:2024-07-12 */ @Service @RequiredArgsConstructor public class CouponTaskServiceImpl extends ServiceImpl<CouponTaskMapper, CouponTaskDO> implements CouponTaskService { private final CouponTemplateService couponTemplateService; private final CouponTaskMapper couponTaskMapper; @Override public void createCouponTask(CouponTaskCreateReqDTO requestParam) { // 验证非空参数 // 验证参数是否正确,比如文件地址是否为我们期望的格式等 // 验证参数依赖关系,比如选择定时发送,发送时间是否不为空等 CouponTemplateQueryRespDTO couponTemplate = couponTemplateService.findCouponTemplateById(requestParam.getCouponTemplateId()); if (couponTemplate == null) { throw new ClientException("优惠券模板不存在,请检查提交信息是否正确"); } // ...... // 构建优惠券推送任务数据库持久层实体 CouponTaskDO couponTaskDO = BeanUtil.copyProperties(requestParam, CouponTaskDO.class); couponTaskDO.setBatchId(IdUtil.getSnowflakeNextId()); couponTaskDO.setOperatorId(Long.parseLong(UserContext.getUserId())); couponTaskDO.setShopNumber(UserContext.getShopNumber()); couponTaskDO.setStatus( Objects.equals(requestParam.getSendType(), CouponTaskSendTypeEnum.IMMEDIATE.getType()) ? CouponTaskStatusEnum.IN_PROGRESS.getStatus() : CouponTaskStatusEnum.PENDING.getStatus() ); // 通过 EasyExcel 监听器获取 Excel 中所有行数 RowCountListener listener = new RowCountListener(); EasyExcel.read(requestParam.getFileAddress(), listener).sheet().doRead(); // 为什么需要统计行数?因为发送后需要比对所有优惠券是否都已发放到用户账号 int totalRows = listener.getRowCount(); couponTaskDO.setSendNum(totalRows); // 保存优惠券推送任务记录到数据库 couponTaskMapper.insert(couponTaskDO); } }
重启项目,再看看内存占用怎么样。
查看 visualvm 堆内存监控得知,虽然还是有内存上升,但是相对来说好很多了。Hutool 的内存占用在 3G 还要多点,EasyExcel 的内存在 250M 多点。
编辑
文末总结
在本章节中,我们探讨了使用 EasyExcel 处理大文件 Excel 的方法,特别是在开发批量优惠券分发任务时如何解决内存溢出的问题。传统的 Excel 解析工具(如 Hutool)在处理大规模数据时容易导致高内存消耗,甚至出现内存溢出问题。EasyExcel 通过流式处理数据,有效地降低了内存占用。
可能有同学会疑惑,解析 Excel 文件的行数有什么用?这里我们总结两个用处:
- 1. 任务规模预览 :管理或操作人员直观看到本次分发任务大概涉及多少用户;
- 2. 数据完整性校验 :任务完成后,可通过比对总行数与实际分发数量,验证是否有用户遗漏。
实现说明:
- 1. 用途1:在行数统计完成后会实时存入数据库并返回前端展示;
- 2. 用途2:考虑到核心分发逻辑的简洁性,当前版本暂未内置该校验机制。如有需要,建议以扩展的形式实现,可通过扩展方式完成校验触发。
完结,撒花 🎉
▪第13小节:通过线程池和延时队列优化接口响应时间
业务背景
在上一节中,我们通过 EasyExcel 解析百万数据量的 Excel 行数,避免了 JVM 内存占用过多问题。但是末了还有一个小问题没有说,那就是接口响应太慢,百万数据量需要解析 5 秒,这种在后管系统里不是不能接受,但是能优化还是要优化。在这节课我们通过线程池和 Redis 延迟队列的形式优化接口响应时间。
编辑
有同学可能会疑问,我直接将这个请求放到 RocketMQ 来做可以么?先给大家说个前提,这个需要完全可以用消息队列来做,但是实际开发过程中,并不是每个项目都有消息队列的。为此我们通过这种扩展机制扩展大家的解决方案思路。
Git 分支
20240823_optimize_create-coupon-task_threadpool-delayqueue_ding.ma
线程池异步解析 Excel 行数
1. 创建线程池
创建一个公共线程池,因为咱们这个逻辑比较简单,所以直接定义即可。
@Service @RequiredArgsConstructor public class CouponTaskServiceImpl extends ServiceImpl<CouponTaskMapper, CouponTaskDO> implements CouponTaskService { private final ExecutorService executorService = new ThreadPoolExecutor( Runtime.getRuntime().availableProcessors(), Runtime.getRuntime().availableProcessors() << 1, 60, TimeUnit.SECONDS, new SynchronousQueue<>(), new ThreadPoolExecutor.DiscardPolicy() ); // ...... }
有些同学可能习惯使用 Executors
工具类直接创建线程池,这种是不推荐的。虽然 Executors
提供了创建线程池的便捷方法,然而,Executors
基于默认配置创建的线程池可能并不适合所有场景,这里我们说下每个方法创建的线程池都有哪些弊端:
newFixedThreadPool
和newSingleThreadExecutor
:这两种固定大小的线程池使用无界的LinkedBlockingQueue
作为工作队列。当任务提交速度超过处理速度时,工作队列会不断增长,可能导致内存溢出。newScheduledThreadPool
:虽然最大线程数是Integer
最大值,但是因为阻塞队列是无界的,所以核心问题同上。newCachedThreadPool
:核心线程数为 0,使用同步的SynchronousQueue
,并且允许创建无限数量的线程。在高并发情况下,可能会创建大量线程,导致系统资源耗尽,甚至使系统崩溃。
扩展知识,线程池处理逻辑如下:
编辑
2. 线程池参数解析
解析下我们线程池中的参数为什么这么设置:
- corePoolSize:因为属于后管任务,大概率不会很频繁,所以直接取服务器 CPU 核数。
- maximumPoolSize:运行任务属于 IO 密集型,最大线程数直接服务器 CPU 核数 2 倍。
- workQueue:理论上说我们不会有阻塞的情况,因为设置的线程数不少,所以如果使用不存储任务的同步队列。
- handler:如果线程数都在运行,直接将任务丢弃即可,因为我们还有延时队列兜底。
3. 使用线程池异步处理
因为线程池和延时队列都可能会用到 Excel 解析的代码,所以我们把这一块逻辑抽象出来一个方法。因为用到了两个参数,为了避免复杂,直接使用 JSONObject 即可。
private void refreshCouponTaskSendNum(JSONObject delayJsonObject) { // 通过 EasyExcel 监听器获取 Excel 中所有行数 RowCountListener listener = new RowCountListener(); EasyExcel.read(delayJsonObject.getString("fileAddress"), listener).sheet().doRead(); int totalRows = listener.getRowCount(); // 刷新优惠券推送记录中发送行数 CouponTaskDO updateCouponTaskDO = CouponTaskDO.builder() .id(delayJsonObject.getLong("couponTaskId")) .sendNum(totalRows) .build(); couponTaskMapper.updateById(updateCouponTaskDO); }
使用线程池异步解析用户上传的 Excel 文件,代码如下:
@Service @RequiredArgsConstructor public class CouponTaskServiceImpl extends ServiceImpl<CouponTaskMapper, CouponTaskDO> implements CouponTaskService { private final ExecutorService executorService = new ThreadPoolExecutor( Runtime.getRuntime().availableProcessors(), Runtime.getRuntime().availableProcessors() << 1, 60, TimeUnit.SECONDS, new SynchronousQueue<>(), new ThreadPoolExecutor.DiscardPolicy() ); @Transactional(rollbackFor = Exception.class) @Override public void createCouponTask(CouponTaskCreateReqDTO requestParam) { // 验证非空参数 // 验证参数是否正确,比如文件地址是否为我们期望的格式等 // 验证参数依赖关系,比如选择定时发送,发送时间是否不为空等 CouponTemplateQueryRespDTO couponTemplate = couponTemplateService.findCouponTemplateById(requestParam.getCouponTemplateId()); if (couponTemplate == null) { throw new ClientException("优惠券模板不存在,请检查提交信息是否正确"); } // ...... // 构建优惠券推送任务数据库持久层实体 CouponTaskDO couponTaskDO = BeanUtil.copyProperties(requestParam, CouponTaskDO.class); couponTaskDO.setBatchId(IdUtil.getSnowflakeNextId()); couponTaskDO.setOperatorId(Long.parseLong(UserContext.getUserId())); couponTaskDO.setShopNumber(UserContext.getShopNumber()); couponTaskDO.setStatus( Objects.equals(requestParam.getSendType(), CouponTaskSendTypeEnum.IMMEDIATE.getType()) ? CouponTaskStatusEnum.IN_PROGRESS.getStatus() : CouponTaskStatusEnum.PENDING.getStatus() ); // 保存优惠券推送任务记录到数据库 couponTaskMapper.insert(couponTaskDO); // 为什么需要统计行数?因为发送后需要比对所有优惠券是否都已发放到用户账号 // 100 万数据大概需要 4 秒才能返回前端,如果加上验证将会时间更长,所以这里将最耗时的统计操作异步化 JSONObject delayJsonObject = JSONObject .of("fileAddress", requestParam.getFileAddress(), "couponTaskId", couponTaskDO.getId()); executorService.execute(() -> refreshCouponTaskSendNum(delayJsonObject)); } }
不关使用线程池执行什么类型的任务,都会有一个通用的致命问题,那就是刚投递到线程池,还没有运行完,应用宕机了怎么整?
所以就需要我们接下来讲到的延时队列兜底,避免这种宕机行为。
Redis 延时队列兜底任务
1. 使用延时队列兜底
任务投递到线程池后,紧接着我们向延时队列也投递个任务,延迟时间设置为 20 秒。为什么延迟时间设置 20 秒,原因是我们笃定上面线程池 20 秒之内就能结束任务。
@Service @RequiredArgsConstructor public class CouponTaskServiceImpl extends ServiceImpl<CouponTaskMapper, CouponTaskDO> implements CouponTaskService { private final RedissonClient redissonClient; private final ExecutorService executorService = new ThreadPoolExecutor( Runtime.getRuntime().availableProcessors(), Runtime.getRuntime().availableProcessors() << 1, 60, TimeUnit.SECONDS, new SynchronousQueue<>(), new ThreadPoolExecutor.DiscardPolicy() ); @Transactional(rollbackFor = Exception.class) @Override public void createCouponTask(CouponTaskCreateReqDTO requestParam) { // 验证非空参数 // 验证参数是否正确,比如文件地址是否为我们期望的格式等 // 验证参数依赖关系,比如选择定时发送,发送时间是否不为空等 CouponTemplateQueryRespDTO couponTemplate = couponTemplateService.findCouponTemplateById(requestParam.getCouponTemplateId()); if (couponTemplate == null) { throw new ClientException("优惠券模板不存在,请检查提交信息是否正确"); } // ...... // 构建优惠券推送任务数据库持久层实体 CouponTaskDO couponTaskDO = BeanUtil.copyProperties(requestParam, CouponTaskDO.class); couponTaskDO.setBatchId(IdUtil.getSnowflakeNextId()); couponTaskDO.setOperatorId(Long.parseLong(UserContext.getUserId())); couponTaskDO.setShopNumber(UserContext.getShopNumber()); couponTaskDO.setStatus( Objects.equals(requestParam.getSendType(), CouponTaskSendTypeEnum.IMMEDIATE.getType()) ? CouponTaskStatusEnum.IN_PROGRESS.getStatus() : CouponTaskStatusEnum.PENDING.getStatus() ); // 保存优惠券推送任务记录到数据库 couponTaskMapper.insert(couponTaskDO); // 为什么需要统计行数?因为发送后需要比对所有优惠券是否都已发放到用户账号 // 100 万数据大概需要 4 秒才能返回前端,如果加上验证将会时间更长,所以这里将最耗时的统计操作异步化 JSONObject delayJsonObject = JSONObject .of("fileAddress", requestParam.getFileAddress(), "couponTaskId", couponTaskDO.getId()); executorService.execute(() -> refreshCouponTaskSendNum(delayJsonObject)); // 假设刚把消息提交到线程池,突然应用宕机了,我们通过延迟队列进行兜底 Refresh RBlockingDeque<Object> blockingDeque = redissonClient.getBlockingDeque("COUPON_TASK_SEND_NUM_DELAY_QUEUE"); RDelayedQueue<Object> delayedQueue = redissonClient.getDelayedQueue(blockingDeque); // 这里延迟时间设置 20 秒,原因是我们笃定上面线程池 20 秒之内就能结束任务 delayedQueue.offer(delayJsonObject, 20, TimeUnit.SECONDS); } }
2. 定义延时队列消费者
代码逻辑比较简单, 为了避免复杂直接定义一个内部类,实现 CommandLineRunner
接口在项目启动后运行后置任务。流程如下:
- 1. 启动一个线程,在线程无限循环获取 Redis 阻塞队列已经到达时间的元素;
- 2. 然后判断数据库中的分发任务 Excel 总数是否为空,为空读取 Excel 记录,然后设置;如果不为空证明线程池已经运行完了。
@Service @RequiredArgsConstructor public class CouponTaskServiceImpl extends ServiceImpl<CouponTaskMapper, CouponTaskDO> implements CouponTaskService { // ...... /** * 优惠券延迟刷新发送条数兜底消费者|这是兜底策略,一般来说不会执行这段逻辑 * 如果延迟消息没有持久化成功,或者 Redis 挂了怎么办?后续可以人工处理 * <p> * 作者:马丁 * 加项目群:早加入就是优势!500人内部项目群,分享的知识总有你需要的 <a href="https://t.zsxq.com/cw7b9" /> * 开发时间:2024-07-12 */ @Service @RequiredArgsConstructor class RefreshCouponTaskDelayQueueRunner implements CommandLineRunner { private final CouponTaskMapper couponTaskMapper; private final RedissonClient redissonClient; @Override public void run(String... args) throws Exception { Executors.newSingleThreadExecutor( runnable -> { Thread thread = new Thread(runnable); thread.setName("delay_coupon-task_send-num_consumer"); thread.setDaemon(Boolean.TRUE); return thread; }) .execute(() -> { RBlockingDeque<JSONObject> blockingDeque = redissonClient.getBlockingDeque("COUPON_TASK_SEND_NUM_DELAY_QUEUE"); for (; ; ) { try { // 获取延迟队列已到达时间元素 JSONObject delayJsonObject = blockingDeque.take(); if (delayJsonObject != null) { // 获取优惠券推送记录,查看发送条数是否已经有值,有的话代表上面线程池已经处理完成,无需再处理 CouponTaskDO couponTaskDO = couponTaskMapper.selectById(delayJsonObject.getLong("couponTaskId")); if (couponTaskDO.getSendNum() == null) { refreshCouponTaskSendNum(delayJsonObject); } } } catch (Throwable ignored) { } } }); } } }
后续有同学反馈,refreshCouponTaskSendNum
方法在静态内部类执行时出现了空指针异常,原因在于静态内部类的 Bean 注入存在问题。为了解决这个问题,我们直接通过 new
对象来运行该方法。由于问题提出时已经有了许多后续分支的迭代,因此代码并未更新到当前分支,而是提交到了 main
分支。详情请查看:修复优惠券分发任务兜底策略空指针异常。
3. 能不能直接用 Redis 消息队列?
大家一定要记得,Redis 是个缓存,由于它的持久化机制和主从同步机制,意味着可能会丢数据。为此,我们只是把它作为一个兜底方案,而不是全部方案。
- 持久化丢数据是因为即使 AOF 持久化也是异步的,最好的情况也会丢一条数据。
- 主从同步机制,如果主节点在某些写操作尚未同步到从节点时发生故障,这些未同步的写操作将会丢失。
添加 Spring 事务
细心的同学可能看到我们在方法上加了个 @Transactional(rollbackFor = Exception.class)
注解,这是因为如果不加注解的话,我们执行数据库插入操作成功了,但是线程池和延时队列都没有执行。这种情况下,发送条数数据就永远不会被刷新。
我们就将数据库的添加和这些行为绑定一起,也就不会出现这种问题了。
@Service @RequiredArgsConstructor public class CouponTaskServiceImpl extends ServiceImpl<CouponTaskMapper, CouponTaskDO> implements CouponTaskService { @Transactional(rollbackFor = Exception.class) @Override public void createCouponTask(CouponTaskCreateReqDTO requestParam) { // ...... } }
文末总结
通过线程池异步处理和 Redis 延时队列的双重保障机制,我们成功将百万级 Excel 解析的接口响应时间从 5 秒优化到毫秒级,在确保系统响应速度的同时,采用"线程池快速处理+延迟队列兜底校验"的创新模式,既解决了 JVM 内存问题,又实现了任务处理的可靠性,这种设计思想在没有专业消息中间件的情况下尤其具有实用价值。
简单一句话总结核心思想:先执行再延迟确认。
完结,撒花 🎉
▪第14小节:基于模板方法模式重构消息队列发送功能
业务背景
优惠券分发任务有两种执行类型,分别是立即发送和定时发送。如果用户创建的是立即发送类型的分发任务,需要通过消息队列执行分发请求发送逻辑,distribution
分发服务监听到这个消息后开始正式执行用户优惠券分发流程。我们本章节先开发立即发送类型的消息队列发送,以及使用模板方法模式重构消息队列发送流程。
编辑
Git 分支
20240824_dev_coupon-task-execute_template-method_ding.ma
优惠券分发调用消息队列
1. 使用 RocketMQ 发送普通消息
之前我们发送过消息队列的延时消息,这次发送普通消息即可。代码如下所示:
@Transactional(rollbackFor = Exception.class) @Override public void createCouponTask(CouponTaskCreateReqDTO requestParam) { // ...... // 如果是立即发送任务,直接调用消息队列进行发送流程 if (Objects.equals(requestParam.getSendType(), CouponTaskSendTypeEnum.IMMEDIATE.getType())) { // 使用 RocketMQ5.x 发送任意时间延时消息 // 定义 Topic String couponTemplateDelayCloseTopic = "one-coupon_distribution-service_coupon-task-execute_topic${unique-name:}"; // 通过 Spring 上下文解析占位符,也就是把咱们 VM 参数里的 unique-name 替换到字符串中 couponTemplateDelayCloseTopic = configurableEnvironment.resolvePlaceholders(couponTemplateDelayCloseTopic); // 构建消息体 String messageKeys = UUID.randomUUID().toString(); Message<Long> message = MessageBuilder .withPayload(couponTaskDO.getId()) .setHeader(MessageConst.PROPERTY_KEYS, messageKeys) .build(); // 执行 RocketMQ5.x 消息队列发送&异常处理逻辑 SendResult sendResult; try { sendResult = rocketMQTemplate.syncSend(couponTemplateDelayCloseTopic, message, 2000L); log.info("[生产者] 执行优惠券分发任务 - 发送结果:{},消息ID:{},消息Keys:{}", sendResult.getSendStatus(), sendResult.getMsgId(), messageKeys); } catch (Exception ex) { log.error("[生产者] 执行优惠券分发任务 - 消息发送失败,消息体:{}", couponTaskDO.getId(), ex); } } }
这段代码逻辑如下:
- 1. 通过判断入参发送是否为立即发送任务,如果是的话直接调用消息队列进行发送流程;
- 2. 基于
configurableEnvironment
解析出目标 Topic 字符串; - 3. 构建消息体,主要设置分发任务 ID 和 Keys,前者用于到分发服务里执行,后者利用问题排查;
- 4. 调用同步发送消息,并记录详细的成功和错误日志,方便后续问题排查。
2. 使用立即发送创建分发任务
在前端 API 接口处通过创建优惠券推送任务接口发起请求,sendType
字段使用值 0 立即发送。
{ "taskName": "发送百万优惠券推送任务", "fileAddress": "/Users/machen/workspace/opensource/onecoupon-rebuild/tmp/oneCoupon任务推送Excel.xlsx", "notifyType": "0,3", "couponTemplateId": 1826268813595824129, "sendType": 0, "sendTime": "2024-07-12 12:00:00" }
消息队列发送日志如下所示:
2024-08-24T20:04:48.025+08:00 INFO 18768 --- [io-10010-exec-3] c.n.o.m.a.s.impl.CouponTaskServiceImpl : [生产者] 执行优惠券分发任务 - 发送结果:SEND_OK,消息ID:2408820760D4CCC0F015B3A56F47B9554950251A69D77AC14AFA0001,消息Keys:d14090a8-ee9b-4ae6-b4db-43c9d9f72968
3. RocketMQ 控制台查看发送结果
因为我们目前还没有开发分发服务,所以没办法查看消费日志,只能通过控制台查看。通过下图得知消息是正常发送成功了。
编辑
什么是模板方法设计模式?
1. 需求背景
大家可能会注意到,每次发送消息时,总是充斥着大量相同的冗余代码,这些逻辑散落在业务代码中,不利于对核心业务的理解和维护。那我们有没有方法进行抽象出来?
在开始写代码前,我们一般会做方案设计,其中主要的就是我们的预期是什么?
- 将业务代码和消息发送的代码解耦,最好情况下业务代码里只有一行消息发送逻辑。
- 抽象消息发送逻辑,比如调用发送接口和发送日志打印,不同的 Topic 配置等由业务方自定义。
2. 模板方法设计模式
模板方法设计模式是一种行为设计模式,它在一个方法中定义了一个操作的框架,而将一些步骤的实现延迟到子类中。通过这种方式,模板方法允许子类在不改变算法结构的情况下重新定义算法中的某些步骤。
编辑
通俗来讲 : 定义一个抽象类 AbstractTemplate
,并定义一个或若干抽象方法 abstractMethod
。
由子类去继承抽象类的同时实现抽象方法, 在抽象类的 operation
方法中调用抽象方法,最终调用的就是不同子类实现的方法逻辑。
模板方法模式有什么优点?
- 复用性:核心思想就是父级定义公共实现,比如说定义消息队列消息发送接口和日志打印。
- 扩展性:不同的实现被下沉到继承抽象类的子类中,这样在调用时会根据子类的差异自动调用对应的逻辑。
如果不好理解,可以看下面消息队列发送者重构实战代码。
模板方法设计模式重构消息发送
1. 定义消息发送事件基础实体
为了让代码更具备规范和扩展性,我们定义一个消息发送实体。
package com.nageoffer.onecoupon.merchant.admin.mq.base; import lombok.AllArgsConstructor; import lombok.Builder; import lombok.Data; import lombok.NoArgsConstructor; /** * 消息发送事件基础扩充属性实体 * <p> * 作者:马丁 * 加项目群:早加入就是优势!500人内部项目群,分享的知识总有你需要的 <a href="https://t.zsxq.com/cw7b9" /> * 开发时间:2024-07-13 */ @Data @NoArgsConstructor @AllArgsConstructor @Builder public final class BaseSendExtendDTO { /** * 事件名称 */ private String eventName; /** * 主题 */ private String topic; /** * 标签 */ private String tag; /** * 业务标识 */ private String keys; /** * 发送消息超时时间 */ private Long sentTimeout; /** * 具体延迟时间 */ private Long delayTime; }
另外,有些和业务无关的属性,我们再抽象一层 Wrapper 类,用于定义消息发送基础内容。
package com.nageoffer.onecoupon.merchant.admin.mq.base; import lombok.AllArgsConstructor; import lombok.Builder; import lombok.Data; import lombok.NoArgsConstructor; import lombok.NonNull; import lombok.RequiredArgsConstructor; import java.io.Serializable; import java.util.UUID; /** * 消息体包装器 * <p> * 作者:马丁 * 加项目群:早加入就是优势!500人内部项目群,分享的知识总有你需要的 <a href="https://t.zsxq.com/cw7b9" /> * 开发时间:2024-07-13 */ @Data @Builder @NoArgsConstructor(force = true) @AllArgsConstructor @RequiredArgsConstructor public final class MessageWrapper<T> implements Serializable { private static final long serialVersionUID = 1L; /** * 消息发送 Keys */ @NonNull private String keys; /** * 消息体 */ @NonNull private T message; /** * 消息发送时间 */ private Long timestamp = System.currentTimeMillis(); }
2. 定义抽象消息发送类
我们将消息发送的逻辑和结果日志的打印进行了抽象,也就是抽象方法模式中的复用性。并且,我们将消息发送事件的基本参数(如 Topic、Tag、是否延迟消息等)以及 Keys 的个性化属性独立为两个抽象方法。
package com.nageoffer.onecoupon.merchant.admin.mq.producer; import cn.hutool.core.util.StrUtil; import com.alibaba.fastjson2.JSON; import com.nageoffer.onecoupon.merchant.admin.mq.base.BaseSendExtendDTO; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.spring.core.RocketMQTemplate; import org.springframework.messaging.Message; /** * RocketMQ 抽象公共发送消息组件 * <p> * 作者:马丁 * 加项目群:早加入就是优势!500人内部项目群,分享的知识总有你需要的 <a href="https://t.zsxq.com/cw7b9" /> * 开发时间:2024-07-13 */ @RequiredArgsConstructor @Slf4j(topic = "CommonSendProduceTemplate") public abstract class AbstractCommonSendProduceTemplate<T> { private final RocketMQTemplate rocketMQTemplate; /** * 构建消息发送事件基础扩充属性实体 * * @param messageSendEvent 消息发送事件 * @return 扩充属性实体 */ protected abstract BaseSendExtendDTO buildBaseSendExtendParam(T messageSendEvent); /** * 构建消息基本参数,请求头、Keys... * * @param messageSendEvent 消息发送事件 * @param requestParam 扩充属性实体 * @return 消息基本参数 */ protected abstract Message<?> buildMessage(T messageSendEvent, BaseSendExtendDTO requestParam); /** * 消息事件通用发送 * * @param messageSendEvent 消息发送事件 * @return 消息发送返回结果 */ public SendResult sendMessage(T messageSendEvent) { BaseSendExtendDTO baseSendExtendDTO = buildBaseSendExtendParam(messageSendEvent); SendResult sendResult; try { // 构建 Topic 目标落点 formats: `topicName:tags` StringBuilder destinationBuilder = StrUtil.builder().append(baseSendExtendDTO.getTopic()); if (StrUtil.isNotBlank(baseSendExtendDTO.getTag())) { destinationBuilder.append(":").append(baseSendExtendDTO.getTag()); } // 延迟时间不为空,发送任意延迟消息,否则发送普通消息 if (baseSendExtendDTO.getDelayTime() != null) { sendResult = rocketMQTemplate.syncSendDeliverTimeMills( destinationBuilder.toString(), buildMessage(messageSendEvent, baseSendExtendDTO), baseSendExtendDTO.getDelayTime() ); } else { sendResult = rocketMQTemplate.syncSend( destinationBuilder.toString(), buildMessage(messageSendEvent, baseSendExtendDTO), baseSendExtendDTO.getSentTimeout() ); } log.info("[生产者] {} - 发送结果:{},消息ID:{},消息Keys:{}", baseSendExtendDTO.getEventName(), sendResult.getSendStatus(), sendResult.getMsgId(), baseSendExtendDTO.getKeys()); } catch (Throwable ex) { log.error("[生产者] {} - 消息发送失败,消息体:{}", baseSendExtendDTO.getEventName(), JSON.toJSONString(messageSendEvent), ex); throw ex; } return sendResult; } }
3. 定义消息发送事件
为了规范化流程,我们将消息队列中的数据定义为事件。虽然优惠券推送任务看似是单一任务,没有必要单独定义为事件,但在大多数情况下,消息队列中会传递多个参数,因此将其作为事件处理更具合理性和一致性。
package com.nageoffer.onecoupon.merchant.admin.mq.event; import lombok.AllArgsConstructor; import lombok.Builder; import lombok.Data; import lombok.NoArgsConstructor; /** * 优惠券推送任务执行事件 * <p> * 作者:马丁 * 加项目群:早加入就是优势!500人内部项目群,分享的知识总有你需要的 <a href="https://t.zsxq.com/cw7b9" /> * 开发时间:2024-07-23 */ @Data @Builder @NoArgsConstructor @AllArgsConstructor public class CouponTaskExecuteEvent { /** * 推送任务id */ private Long couponTaskId; }
4. 定义消息队列生产者
消息队列生产者继承了我们的消息发送抽象类,并实现了两个抽象方法,从而体现了模板方法设计模式的扩展性。在业务代码中,我们只需引入消息发送生产者,即可通过简洁的一行代码完成消息发送流程。
package com.nageoffer.onecoupon.merchant.admin.mq.producer; import cn.hutool.core.util.StrUtil; import com.nageoffer.onecoupon.merchant.admin.mq.base.BaseSendExtendDTO; import com.nageoffer.onecoupon.merchant.admin.mq.base.MessageWrapper; import com.nageoffer.onecoupon.merchant.admin.mq.event.CouponTaskExecuteEvent; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.common.message.MessageConst; import org.apache.rocketmq.spring.core.RocketMQTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.core.env.ConfigurableEnvironment; import org.springframework.messaging.Message; import org.springframework.messaging.support.MessageBuilder; import org.springframework.stereotype.Component; import java.util.UUID; /** * 优惠券推送任务执行生产者 * <p> * 作者:马丁 * 加项目群:早加入就是优势!500人内部项目群,分享的知识总有你需要的 <a href="https://t.zsxq.com/cw7b9" /> * 开发时间:2024-07-23 */ @Slf4j @Component public class CouponTaskActualExecuteProducer extends AbstractCommonSendProduceTemplate<CouponTaskExecuteEvent> { private final ConfigurableEnvironment environment; public CouponTaskActualExecuteProducer(@Autowired RocketMQTemplate rocketMQTemplate, @Autowired ConfigurableEnvironment environment) { super(rocketMQTemplate); this.environment = environment; } @Override protected BaseSendExtendDTO buildBaseSendExtendParam(CouponTaskExecuteEvent messageSendEvent) { return BaseSendExtendDTO.builder() .eventName("优惠券推送执行") .keys(String.valueOf(messageSendEvent.getCouponTaskId())) .topic(environment.resolvePlaceholders("one-coupon_distribution-service_coupon-task-execute_topic${unique-name:}")) .sentTimeout(2000L) .build(); } @Override protected Message<?> buildMessage(CouponTaskExecuteEvent couponTaskExecuteEvent, BaseSendExtendDTO requestParam) { String keys = StrUtil.isEmpty(requestParam.getKeys()) ? UUID.randomUUID().toString() : requestParam.getKeys(); return MessageBuilder .withPayload(new MessageWrapper(keys, couponTaskExecuteEvent)) .setHeader(MessageConst.PROPERTY_KEYS, keys) .setHeader(MessageConst.PROPERTY_TAGS, requestParam.getTag()) .build(); } }
5. 业务 UML 类图
模板方法模式其实很简单。我们定义一个抽象类,其中包含一个通用的执行方法,同时声明一些抽象方法。在通用执行方法中,我们调用这些抽象方法,并由子类实现它们。通过切换不同的子类,我们可以实现不同的抽象方法逻辑。这样,模板方法模式就能灵活地处理各种具体需求,同时保持代码的结构和可维护性。
编辑
6. 业务中调用消息发送
接下来,我们将用基于模板方法模式实现的消息发送流程替换现有代码,看看其带来的改进和提升。
现在的消息发送代码:
// 使用 RocketMQ5.x 发送任意时间延时消息 // 定义 Topic String couponTemplateDelayCloseTopic = "one-coupon_distribution-service_coupon-task-execute_topic${unique-name:}"; // 通过 Spring 上下文解析占位符,也就是把咱们 VM 参数里的 unique-name 替换到字符串中 couponTemplateDelayCloseTopic = configurableEnvironment.resolvePlaceholders(couponTemplateDelayCloseTopic); // 构建消息体 String messageKeys = UUID.randomUUID().toString(); Message<Long> message = MessageBuilder .withPayload(couponTaskDO.getId()) .setHeader(MessageConst.PROPERTY_KEYS, messageKeys) .build(); // 执行 RocketMQ5.x 消息队列发送&异常处理逻辑 SendResult sendResult; try { sendResult = rocketMQTemplate.syncSend(couponTemplateDelayCloseTopic, message, 2000L); log.info("[生产者] 执行优惠券分发任务 - 发送结果:{},消息ID:{},消息Keys:{}", sendResult.getSendStatus(), sendResult.getMsgId(), messageKeys); } catch (Exception ex) { log.error("[生产者] 执行优惠券分发任务 - 消息发送失败,消息体:{}", couponTaskDO.getId(), ex); }
重构后的消息发送代码:
// 执行优惠券推送业务,正式向用户发放优惠券 CouponTaskExecuteEvent couponTaskExecuteEvent = CouponTaskExecuteEvent.builder() .couponTaskId(couponTaskDO.getId()) .build(); couponTaskActualExecuteProducer.sendMessage(couponTaskExecuteEvent);
可以看到,新的消息发送代码非常简单,简单两行代码即可完成消息发送。
7. 重构消息消费者
因为我们将消息发送的内容重构成了一个个 Event,所以我们消息消费者也需要进行对应的改动。实现 RocketMQListener
接口中的泛型变更为 MessageWrapper<CouponTemplateDelayEvent>
,修改后即可完成所有消息队列重构改造。
package com.nageoffer.onecoupon.merchant.admin.mq.consumer; import com.alibaba.fastjson.JSON; import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper; import com.baomidou.mybatisplus.core.toolkit.Wrappers; import com.nageoffer.onecoupon.merchant.admin.common.enums.CouponTemplateStatusEnum; import com.nageoffer.onecoupon.merchant.admin.dao.entity.CouponTemplateDO; import com.nageoffer.onecoupon.merchant.admin.mq.base.MessageWrapper; import com.nageoffer.onecoupon.merchant.admin.mq.event.CouponTemplateDelayEvent; import com.nageoffer.onecoupon.merchant.admin.service.CouponTemplateService; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.springframework.stereotype.Component; /** * 优惠券推送延迟执行-变更记录发送状态消费者 * <p> * 作者:马丁 * 加项目群:早加入就是优势!500人内部项目群,分享的知识总有你需要的 <a href="https://t.zsxq.com/cw7b9" /> * 开发时间:2024-08-21 */ @Component @RequiredArgsConstructor @RocketMQMessageListener( topic = "one-coupon_merchant-admin-service_coupon-template-delay_topic${unique-name:}", consumerGroup = "one-coupon_merchant-admin-service_coupon-template-delay-status_cg${unique-name:}" ) @Slf4j(topic = "CouponTemplateDelayExecuteStatusConsumer") public class CouponTemplateDelayExecuteStatusConsumer implements RocketMQListener<MessageWrapper<CouponTemplateDelayEvent>> { private final CouponTemplateService couponTemplateService; @Override public void onMessage(MessageWrapper<CouponTemplateDelayEvent> messageWrapper) { // 开头打印日志,平常可 Debug 看任务参数,线上可报平安(比如消息是否消费,重新投递时获取参数等) log.info("[消费者] 优惠券模板定时执行@变更模板表状态 - 执行消费逻辑,消息体:{}", JSON.toJSONString(messageWrapper)); // 修改指定优惠券模板状态为已结束 CouponTemplateDelayEvent message = messageWrapper.getMessage(); LambdaUpdateWrapper<CouponTemplateDO> updateWrapper = Wrappers.lambdaUpdate(CouponTemplateDO.class) .eq(CouponTemplateDO::getShopNumber, message.getShopNumber()) .eq(CouponTemplateDO::getId, message.getCouponTemplateId()) .set(CouponTemplateDO::getStatus, CouponTemplateStatusEnum.ENDED.getStatus()); couponTemplateService.update(updateWrapper); } }
文末总结
模板方法模式允许我们将消息发送的通用逻辑抽象到基类中,只需要在子类中实现具体的消息构建和配置方法。这样,在业务逻辑中,我们只需引入相应的消息生产者,并调用简单的接口,就能完成消息发送。最终,我们成功将冗余代码减少到最小,实现了高效、灵活的消息队列发送机制。
完结,撒花 🎉
▪第15小节:开发XXL-Job定时任务执行分发数据
业务背景
优惠券分发任务分为两种类型:立即执行和定时执行。对于立即执行的任务,我们直接通过消息队列触发发送流程;而定时执行的任务则由定时任务监控系统扫描,找到到达执行时间的任务,然后通过 XXL-Job 分布式定时框架进行处理。
编辑
Git 分支
20240825_dev_coupon-task-timing_xxl-job_ding.ma
什么是 XXL-Job
XXL-JOB是一个分布式任务调度平台,其核心设计目标是开发迅速、学习简单、轻量级、易扩展。现已开放源代码并接入多家公司线上产品线,开箱即用。
- XXL-Job GitHub 地址:https://github.com/xuxueli/xxl-job
- 官方网站:https://www.xuxueli.com/xxl-job
XXL-Job 2.4.0 架构图如下所示:
编辑
安装 XXL-Job
考虑到使用 Docker 安装可能会涉及 Mac 英特尔和 ARM 芯片的区别,为了避免复杂性扩散,所以我们这里仅使用最为原始且高效的方式,拉取源代码构建的方式启动。
1. IDEA 下载 XXL-Job 源代码
通过 Git SSH 的方式拉取 XXL-Job 源代码仓库,SSH 地址:git@github.com:xuxueli/xxl-job.git
我们从 master 分支切换到截止目前 2024.08.25 日最新的发版分支 2.4.1 版本。
编辑
2. 初始化 XXL-Job 数据库
MySQL 中创建名称为 xxl_job
的数据库,并执行下述 SQL 语句:
# # XXL-JOB v2.4.1 # Copyright (c) 2015-present, xuxueli. CREATE database if NOT EXISTS `xxl_job` default character set utf8mb4 collate utf8mb4_unicode_ci; use `xxl_job`; SET NAMES utf8mb4; CREATE TABLE `xxl_job_info` ( `id` int(11) NOT NULL AUTO_INCREMENT, `job_group` int(11) NOT NULL COMMENT '执行器主键ID', `job_desc` varchar(255) NOT NULL, `add_time` datetime DEFAULT NULL, `update_time` datetime DEFAULT NULL, `author` varchar(64) DEFAULT NULL COMMENT '作者', `alarm_email` varchar(255) DEFAULT NULL COMMENT '报警邮件', `schedule_type` varchar(50) NOT NULL DEFAULT 'NONE' COMMENT '调度类型', `schedule_conf` varchar(128) DEFAULT NULL COMMENT '调度配置,值含义取决于调度类型', `misfire_strategy` varchar(50) NOT NULL DEFAULT 'DO_NOTHING' COMMENT '调度过期策略', `executor_route_strategy` varchar(50) DEFAULT NULL COMMENT '执行器路由策略', `executor_handler` varchar(255) DEFAULT NULL COMMENT '执行器任务handler', `executor_param` varchar(512) DEFAULT NULL COMMENT '执行器任务参数', `executor_block_strategy` varchar(50) DEFAULT NULL COMMENT '阻塞处理策略', `executor_timeout` int(11) NOT NULL DEFAULT '0' COMMENT '任务执行超时时间,单位秒', `executor_fail_retry_count` int(11) NOT NULL DEFAULT '0' COMMENT '失败重试次数', `glue_type` varchar(50) NOT NULL COMMENT 'GLUE类型', `glue_source` mediumtext COMMENT 'GLUE源代码', `glue_remark` varchar(128) DEFAULT NULL COMMENT 'GLUE备注', `glue_updatetime` datetime DEFAULT NULL COMMENT 'GLUE更新时间', `child_jobid` varchar(255) DEFAULT NULL COMMENT '子任务ID,多个逗号分隔', `trigger_status` tinyint(4) NOT NULL DEFAULT '0' COMMENT '调度状态:0-停止,1-运行', `trigger_last_time` bigint(13) NOT NULL DEFAULT '0' COMMENT '上次调度时间', `trigger_next_time` bigint(13) NOT NULL DEFAULT '0' COMMENT '下次调度时间', PRIMARY KEY (`id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; CREATE TABLE `xxl_job_log` ( `id` bigint(20) NOT NULL AUTO_INCREMENT, `job_group` int(11) NOT NULL COMMENT '执行器主键ID', `job_id` int(11) NOT NULL COMMENT '任务,主键ID', `executor_address` varchar(255) DEFAULT NULL COMMENT '执行器地址,本次执行的地址', `executor_handler` varchar(255) DEFAULT NULL COMMENT '执行器任务handler', `executor_param` varchar(512) DEFAULT NULL COMMENT '执行器任务参数', `executor_sharding_param` varchar(20) DEFAULT NULL COMMENT '执行器任务分片参数,格式如 1/2', `executor_fail_retry_count` int(11) NOT NULL DEFAULT '0' COMMENT '失败重试次数', `trigger_time` datetime DEFAULT NULL COMMENT '调度-时间', `trigger_code` int(11) NOT NULL COMMENT '调度-结果', `trigger_msg` text COMMENT '调度-日志', `handle_time` datetime DEFAULT NULL COMMENT '执行-时间', `handle_code` int(11) NOT NULL COMMENT '执行-状态', `handle_msg` text COMMENT '执行-日志', `alarm_status` tinyint(4) NOT NULL DEFAULT '0' COMMENT '告警状态:0-默认、1-无需告警、2-告警成功、3-告警失败', PRIMARY KEY (`id`), KEY `I_trigger_time` (`trigger_time`), KEY `I_handle_code` (`handle_code`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; CREATE TABLE `xxl_job_log_report` ( `id` int(11) NOT NULL AUTO_INCREMENT, `trigger_day` datetime DEFAULT NULL COMMENT '调度-时间', `running_count` int(11) NOT NULL DEFAULT '0' COMMENT '运行中-日志数量', `suc_count` int(11) NOT NULL DEFAULT '0' COMMENT '执行成功-日志数量', `fail_count` int(11) NOT NULL DEFAULT '0' COMMENT '执行失败-日志数量', `update_time` datetime DEFAULT NULL, PRIMARY KEY (`id`), UNIQUE KEY `i_trigger_day` (`trigger_day`) USING BTREE ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; CREATE TABLE `xxl_job_logglue` ( `id` int(11) NOT NULL AUTO_INCREMENT, `job_id` int(11) NOT NULL COMMENT '任务,主键ID', `glue_type` varchar(50) DEFAULT NULL COMMENT 'GLUE类型', `glue_source` mediumtext COMMENT 'GLUE源代码', `glue_remark` varchar(128) NOT NULL COMMENT 'GLUE备注', `add_time` datetime DEFAULT NULL, `update_time` datetime DEFAULT NULL, PRIMARY KEY (`id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; CREATE TABLE `xxl_job_registry` ( `id` int(11) NOT NULL AUTO_INCREMENT, `registry_group` varchar(50) NOT NULL, `registry_key` varchar(255) NOT NULL, `registry_value` varchar(255) NOT NULL, `update_time` datetime DEFAULT NULL, PRIMARY KEY (`id`), KEY `i_g_k_v` (`registry_group`,`registry_key`,`registry_value`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; CREATE TABLE `xxl_job_group` ( `id` int(11) NOT NULL AUTO_INCREMENT, `app_name` varchar(64) NOT NULL COMMENT '执行器AppName', `title` varchar(12) NOT NULL COMMENT '执行器名称', `address_type` tinyint(4) NOT NULL DEFAULT '0' COMMENT '执行器地址类型:0=自动注册、1=手动录入', `address_list` text COMMENT '执行器地址列表,多地址逗号分隔', `update_time` datetime DEFAULT NULL, PRIMARY KEY (`id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; CREATE TABLE `xxl_job_user` ( `id` int(11) NOT NULL AUTO_INCREMENT, `username` varchar(50) NOT NULL COMMENT '账号', `password` varchar(50) NOT NULL COMMENT '密码', `role` tinyint(4) NOT NULL COMMENT '角色:0-普通用户、1-管理员', `permission` varchar(255) DEFAULT NULL COMMENT '权限:执行器ID列表,多个逗号分割', PRIMARY KEY (`id`), UNIQUE KEY `i_username` (`username`) USING BTREE ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; CREATE TABLE `xxl_job_lock` ( `lock_name` varchar(50) NOT NULL COMMENT '锁名称', PRIMARY KEY (`lock_name`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; INSERT INTO `xxl_job_group`(`id`, `app_name`, `title`, `address_type`, `address_list`, `update_time`) VALUES (1, 'xxl-job-executor-sample', '示例执行器', 0, NULL, '2018-11-03 22:21:31' ); INSERT INTO `xxl_job_info`(`id`, `job_group`, `job_desc`, `add_time`, `update_time`, `author`, `alarm_email`, `schedule_type`, `schedule_conf`, `misfire_strategy`, `executor_route_strategy`, `executor_handler`, `executor_param`, `executor_block_strategy`, `executor_timeout`, `executor_fail_retry_count`, `glue_type`, `glue_source`, `glue_remark`, `glue_updatetime`, `child_jobid`) VALUES (1, 1, '测试任务1', '2018-11-03 22:21:31', '2018-11-03 22:21:31', 'XXL', '', 'CRON', '0 0 0 * * ? *', 'DO_NOTHING', 'FIRST', 'demoJobHandler', '', 'SERIAL_EXECUTION', 0, 0, 'BEAN', '', 'GLUE代码初始化', '2018-11-03 22:21:31', ''); INSERT INTO `xxl_job_user`(`id`, `username`, `password`, `role`, `permission`) VALUES (1, 'admin', 'e10adc3949ba59abbe56e057f20f883e', 1, NULL); INSERT INTO `xxl_job_lock` ( `lock_name`) VALUES ( 'schedule_lock'); commit;
我们点击 application.properties
修改其中的 MySQL 连接配置,比如我本地默认密码是 root,就需要改动下。
因为本地 8080 端口容易被占用,那我们最好提前改下 server.port
,比如改为 8088 或其他没有被使用的端口。在这里我们修改为 8088 端口。
编辑
3. 启动 XXL- Job 服务
有一点不得不说,XXL-Job 的 JDK 适配做的真的好,我一开始以为只有 JDK8 能启动,刚拉下来的时候默认 JDK21 了,发现启动没问题,点赞 👍。
编辑
访问 XXL-Job 控制台地址 http://localhost:8088/xxl-job-admin,出现控制台页面即可。
默认用户名密码:admin/123456
编辑
配置 XXL-Job 执行器
1. 创建执行器
XXL-Job 执行器是一个运行在目标服务器上的应用程序模块,用于实际执行由调度中心下发的任务。执行器可以看作是任务的“工作节点”,负责接收调度中心发送的任务调度请求并执行具体的任务逻辑。
创建执行器,配置如下:
- AppName:one-coupon-merchant-admin
- 名称:牛券后管平台
编辑
可以看到创建成功。有同学会问,为什么机器地址为空,因为我们的项目还没有引入 XXL-Job,等引入后这里就有机器地址了。
编辑
2. 创建执行器任务
执行器可以看作是和我们应用系统一一对应,那执行器任务就是应用系统里里定时任务。
编辑
点击保存按钮,页面刷新得知任务创建成功。
编辑
3. 执行器任务参数
有些同学可能会提出疑问,关于运行模式、路由策略、调度过期策略、阻塞处理策略分别都是什么意思?我们详细来说一下。
3.1 运行模式
- BEAN 模式:通过 Spring 管理的 Bean 来执行任务。
- GLUE 模式:通过在 XXL-Job 控制台上直接编写的脚本(如 Groovy、Java 等)来执行任务。
3.2 路由策略
因为我们服务可能会启动多个应用实例,所以选择哪个执行器服务调用就需要我们选择,也就是路由策略。
- 第一个:选择执行器列表中的第一个执行器来执行任务。适用于简单场景或对负载均衡要求不高的场景。
- 最后一个:选择执行器列表中的最后一个执行器来执行任务。通常用于测试或特定需要时使用。
- 轮询:采用轮询方式依次选择执行器来执行任务,每次调度都会选择下一个执行器,依次循环。适用于需要均衡任务在所有执行器上的负载的场景,简单且有效的负载均衡策略。
- 随机:从可用的执行器列表中随机选择一个执行器来执行任务。适用于负载均衡且不在意特定执行器的场景。
- 一致性HASH:基于任务的特征(如任务参数或任务 ID)计算哈希值,然后选择对应的执行器。适用于需要确保相同特征的任务始终由同一个执行器执行的场景,如缓存命中或分布式锁的场景。
- 最不经常使用:选择执行任务次数最少的执行器来执行任务。适用于需要避免热点执行器、希望均衡执行器使用频率的场景。
- 最近最少使用:选择最近最少使用的执行器来执行任务。适用于希望在所有执行器之间均衡调度,同时避免某些执行器长时间闲置的场景。
- 故障转移:如果一个执行器执行失败,则尝试在其他执行器上执行任务,直到成功为止。适用于对任务执行可靠性要求较高的场景,需要确保任务至少执行一次。
- 忙碌转移:将任务交给当前任务队列最少的执行器。如果所有执行器都忙碌,则会选择任务队列最少的执行器执行。适用于需要尽可能均衡执行器任务负载的场景。
- 分片广播:任务被广播到所有的执行器节点上,每个执行器执行一遍。此策略主要用于分片任务的调度,配合任务分片参数可以实现分片执行。适用于大数据量任务需要在多台执行器上同时执行的场景。
3.3 调度过期策略
调度过期策略用于处理调度系统中的任务错过预定触发时间的情况。由于网络延迟、执行器性能问题或系统繁忙等原因,任务可能无法按时执行。
- 忽略:当任务的实际触发时间已经晚于预定的触发时间(即任务过期)时,直接忽略掉此次任务调度,不执行该任务。适用于不要求所有调度任务都必须执行的场景。例如,某些统计任务如果错过了时间就没有执行的必要。
- 立即执行一次:当任务的实际触发时间已经晚于预定触发时间时,即使过期也会立即执行该任务。该策略保证任务在调度成功后一定会被执行。适用于对任务执行有严格要求,必须确保任务被执行的场景。例如,重要的数据处理任务,即使稍有延迟也需要保证执行。
3.4 阻塞处理策略
阻塞处理策略用于应对调度任务的并发控制问题。当调度系统触发了一个任务,但该任务的上一次执行尚未结束或当前执行器正在执行其他任务时,就可能出现任务阻塞的情况。
- 单机串行:在同一台执行器中,同一个任务只能串行执行,即任务执行按顺序排队,等待上一次执行完成后再执行下一个。如果上一次执行未结束,新触发的任务将被阻塞,直到前一个任务完成。适合对任务有严格的顺序要求,或者任务本身是互斥的情况。例如,某些需要对同一个资源进行更新的任务,避免数据冲突或并发写问题。
- 丢弃后续调度:如果上一次任务尚未执行完成,则丢弃当前触发的任务,不再等待或排队执行。也就是说,只会保留最后一次任务的执行请求。适合于对执行频率要求较高但不在意丢失部分执行的任务场景。例如,某些定期的检查任务或快照任务,可以接受丢失部分调度。
- 覆盖之前调度:如果上一次任务尚未执行完成,则终止当前正在执行的任务,立即执行新触发的任务。这种策略会将之前的任务中断,保证最新的任务总能及时得到执行。适合于最新任务必须获得优先处理的情况,例如某些实时监控任务或报警任务,优先处理最新数据或事件。
4. 如何测试定时表达式正确?
我们设置完定时表达式后,不知道触发时间是不是正确,可以通过 XXL-Job 的下次执行时间功能验证。
编辑
开发定时执行优惠券分发任务
1. 引入 XXL-Job Maven 依赖
<dependency> <groupId>com.xuxueli</groupId> <artifactId>xxl-job-core</artifactId> <version>2.4.1</version> </dependency>
2. 设置 XXL-Job 配置类
2.1 配置 application.yaml
xxl-job: access-token: default_token admin: addresses: http://localhost:8088/xxl-job-admin executor: application-name: one-coupon-merchant-admin ip: 127.0.0.1 log-retention-days: 30 port: 19999
2.2 配置 XXLJobConfiguration
之前我们说过通过 SpringBoot Starter 可以帮助我们快速引入组件库,不需要繁琐的 Spring Bean 配置。很明显,XXL-Job 就没有适配 Starter,需要我们配置相关的 SpringBean 配置。
package com.nageoffer.onecoupon.merchant.admin.config; import cn.hutool.core.util.StrUtil; import com.xxl.job.core.executor.impl.XxlJobSpringExecutor; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.nio.file.Paths; /** * XXL-Job 配置类 * <p> * 作者:马丁 * 加项目群:早加入就是优势!500人内部项目群,分享的知识总有你需要的 <a href="https://t.zsxq.com/cw7b9" /> * 开发时间:2024-07-12 */ @Configuration public class XXLJobConfiguration { @Value("${xxl-job.admin.addresses:}") private String adminAddresses; @Value("${xxl-job.access-token:}") private String accessToken; @Value("${xxl-job.executor.application-name}") private String applicationName; @Value("${xxl-job.executor.ip}") private String ip; @Value("${xxl-job.executor.port}") private int port; @Value("${xxl-job.executor.log-path:}") private String logPath; @Value("${xxl-job.executor.log-retention-days}") private int logRetentionDays; @Bean public XxlJobSpringExecutor xxlJobExecutor() { XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor(); xxlJobSpringExecutor.setAdminAddresses(adminAddresses); xxlJobSpringExecutor.setAppname(applicationName); xxlJobSpringExecutor.setIp(ip); xxlJobSpringExecutor.setPort(port); xxlJobSpringExecutor.setAccessToken(StrUtil.isNotEmpty(accessToken) ? accessToken : null); xxlJobSpringExecutor.setLogPath(StrUtil.isNotEmpty(logPath) ? logPath : Paths.get("").toAbsolutePath().getParent() + "/tmp"); xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays); return xxlJobSpringExecutor; } }
2.3 查看 XXL-Job 执行器地址
完成上述配置后,我们尝试启动项目,然后访问 http://localhost:8088/xxl-job-admin/jobgroup 查看执行器地址是否有值。如果正确有数据即为创建成功。
编辑
3. 编写 XXL-Job 处理器
开发 XXL-Job 调用处理类 CouponTaskJobHandler
完成定时调用。
package com.nageoffer.onecoupon.merchant.admin.job; import cn.hutool.core.collection.CollUtil; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.core.toolkit.Wrappers; import com.nageoffer.onecoupon.merchant.admin.common.enums.CouponTaskStatusEnum; import com.nageoffer.onecoupon.merchant.admin.dao.entity.CouponTaskDO; import com.nageoffer.onecoupon.merchant.admin.dao.mapper.CouponTaskMapper; import com.nageoffer.onecoupon.merchant.admin.mq.event.CouponTaskExecuteEvent; import com.nageoffer.onecoupon.merchant.admin.mq.producer.CouponTaskActualExecuteProducer; import com.xxl.job.core.handler.IJobHandler; import com.xxl.job.core.handler.annotation.XxlJob; import lombok.RequiredArgsConstructor; import org.springframework.stereotype.Component; import java.util.Date; import java.util.List; /** * 优惠券推送任务扫描定时发送记录 XXL-Job 处理器 * <p> * 作者:马丁 * 加项目群:早加入就是优势!500人内部项目群,分享的知识总有你需要的 <a href="https://t.zsxq.com/cw7b9" /> * 开发时间:2024-07-12 */ @Component @RequiredArgsConstructor public class CouponTaskJobHandler extends IJobHandler { private final CouponTaskMapper couponTaskMapper; private final CouponTaskActualExecuteProducer couponTaskActualExecuteProducer; private static final int MAX_LIMIT = 100; @XxlJob(value = "couponTemplateTask") public void execute() throws Exception { long initId = 0; Date now = new Date(); while (true) { // 获取已到执行时间待执行的优惠券定时分发任务 List<CouponTaskDO> couponTaskDOList = fetchPendingTasks(initId, now); if (CollUtil.isEmpty(couponTaskDOList)) { break; } // 调用分发服务对用户发送优惠券 for (CouponTaskDO each : couponTaskDOList) { distributeCoupon(each); } // 查询出来的数据如果小于 MAX_LIMIT 意味着后面将不再有数据,返回即可 if (couponTaskDOList.size() < MAX_LIMIT) { break; } // 更新 initId 为当前列表中最大 ID initId = couponTaskDOList.stream() .mapToLong(CouponTaskDO::getId) .max() .orElse(initId); } } private void distributeCoupon(CouponTaskDO couponTask) { // 修改延时执行推送任务任务状态为执行中 CouponTaskDO couponTaskDO = CouponTaskDO.builder() .id(couponTask.getId()) .status(CouponTaskStatusEnum.IN_PROGRESS.getStatus()) .build(); couponTaskMapper.updateById(couponTaskDO); // 通过消息队列发送消息,由分发服务消费者消费该消息 CouponTaskExecuteEvent couponTaskExecuteEvent = CouponTaskExecuteEvent.builder() .couponTaskId(couponTask.getId()) .build(); couponTaskActualExecuteProducer.sendMessage(couponTaskExecuteEvent); } private List<CouponTaskDO> fetchPendingTasks(long initId, Date now) { LambdaQueryWrapper<CouponTaskDO> queryWrapper = Wrappers.lambdaQuery(CouponTaskDO.class) .eq(CouponTaskDO::getStatus, CouponTaskStatusEnum.PENDING.getStatus()) .le(CouponTaskDO::getSendTime, now) .gt(CouponTaskDO::getId, initId) .last("LIMIT " + MAX_LIMIT); return couponTaskMapper.selectList(queryWrapper); } }
execute()
这是 XXL-Job 调度任务的入口方法。当定时任务触发时,XXL-Job 框架会调用此方法。
首先方法会初始化变量,initId
用于标识已经处理过的任务的最大 ID,now
用于记录当前时间。接下来会执行 while 循环,逻辑如下所示:
- 调用
fetchPendingTasks(initId, now)
方法获取符合条件的待执行任务列表。 - 如果
couponTaskDOList
为空,意味着没有更多的任务需要处理,循环终止。 - 遍历
couponTaskDOList
,对每个任务调用distributeCoupon(each)
方法,将任务修改状态变更为执行中,并发送到消息队列进行异步处理。 - 检查当前获取的任务列表大小,如果小于
MAX_LIMIT
,表示已经是最后一批数据,循环终止。 - 更新
initId
为当前批次中最大的任务 ID,以确保下一次循环获取到新的任务。
调用逻辑时序图如下所示:
编辑
4. 创建定时执行的分发任务
首先创建一条定时发送类型的优惠券分发任务。
编辑
5. 通过 XXL-Job 执行定时任务
通过 http://localhost:8088/xxl-job-admin/jobinfo 执行一次任务查看是否可以调用到我们的服务。
编辑
通过我们的日志得知,调用成功了。
2024-08-25T17:58:58.685+08:00 INFO 28911 --- [dPool-836404352] c.xxl.job.core.executor.XxlJobExecutor : >>>>>>>>>>> xxl-job regist JobThread success, jobId:1, handler:com.xxl.job.core.handler.impl.MethodJobHandler@bedebe9[class com.nageoffer.onecoupon.merchant.admin.job.CouponTaskJobHandler#execute] Creating a new SqlSession SqlSession [org.apache.ibatis.session.defaults.DefaultSqlSession@b15c2b3] was not registered for synchronization because synchronization is not active JDBC Connection [HikariProxyConnection@849828015 wrapping org.apache.shardingsphere.driver.jdbc.core.connection.ShardingSphereConnection@2b15cde] will not be managed by Spring ==> Preparing: SELECT id,shop_number,batch_id,task_name,file_address,fail_file_address,send_num,notify_type,coupon_template_id,send_type,send_time,status,completion_time,operator_id,create_time,update_time,del_flag FROM t_coupon_task WHERE (status = ? AND send_time <= ? AND id > ?) LIMIT 100 ==> Parameters: 0(Integer), 2024-08-25 17:58:58.687(Timestamp), 0(Long) 2024-08-25T17:58:58.709+08:00 INFO 28911 --- [1-1724579938685] ShardingSphere-SQL : Logic SQL: SELECT id,shop_number,batch_id,task_name,file_address,fail_file_address,send_num,notify_type,coupon_template_id,send_type,send_time,status,completion_time,operator_id,create_time,update_time,del_flag FROM t_coupon_task WHERE (status = ? AND send_time <= ? AND id > ?) LIMIT 100 2024-08-25T17:58:58.709+08:00 INFO 28911 --- [1-1724579938685] ShardingSphere-SQL : Actual SQL: ds_0 ::: SELECT id,shop_number,batch_id,task_name,file_address,fail_file_address,send_num,notify_type,coupon_template_id,send_type,send_time,status,completion_time,operator_id,create_time,update_time,del_flag FROM t_coupon_task WHERE (status = ? AND send_time <= ? AND id > ?) LIMIT 100 ::: [0, 2024-08-25 17:58:58.687, 0] <== Columns: id, shop_number, batch_id, task_name, file_address, fail_file_address, send_num, notify_type, coupon_template_id, send_type, send_time, status, completion_time, operator_id, create_time, update_time, del_flag <== Row: 1827643671885918209, 1810714735922956666, 1827643671873335296, 发送百万优惠券推送任务, /Users/machen/workspace/opensource/onecoupon-rebuild/tmp/oneCoupon任务推送Excel.xlsx, null, 1000000, 0,3, 1826268813595824129, 1, 2024-07-12 12:00:00, 0, null, 1810518709471555585, 2024-08-25 17:46:22, 2024-08-25 17:46:27, 0 <== Total: 1 Closing non transactional SqlSession [org.apache.ibatis.session.defaults.DefaultSqlSession@b15c2b3] Creating a new SqlSession SqlSession [org.apache.ibatis.session.defaults.DefaultSqlSession@6f3a67a0] was not registered for synchronization because synchronization is not active JDBC Connection [HikariProxyConnection@1489831976 wrapping org.apache.shardingsphere.driver.jdbc.core.connection.ShardingSphereConnection@2b15cde] will not be managed by Spring ==> Preparing: UPDATE t_coupon_task SET status=?, update_time=? WHERE id=? ==> Parameters: 1(Integer), 2024-08-25 17:58:58.712(Timestamp), 1827643671885918209(Long) 2024-08-25T17:58:58.713+08:00 INFO 28911 --- [1-1724579938685] ShardingSphere-SQL : Logic SQL: UPDATE t_coupon_task SET status=?, update_time=? WHERE id=? 2024-08-25T17:58:58.713+08:00 INFO 28911 --- [1-1724579938685] ShardingSphere-SQL : Actual SQL: ds_0 ::: UPDATE t_coupon_task SET status=?, update_time=? WHERE id=? ::: [1, 2024-08-25 17:58:58.712, 1827643671885918209] <== Updates: 1 Closing non transactional SqlSession [org.apache.ibatis.session.defaults.DefaultSqlSession@6f3a67a0] 2024-08-25T17:58:58.826+08:00 INFO 28911 --- [1-1724579938685] CommonSendProduceTemplate : [生产者] 优惠券推送执行 - 发送结果:SEND_OK,消息ID:2408820760D4CCC0F015B3A56F47B95570EF251A69D77F7475C90000,消息Keys:1827643671885918209
我们现在的 XXL-Job 定时任务是没有启动的,一般来说,测试环境验证没有问题,就会将任务开启进行周期运行。
编辑
SpringBoot 条件注解解耦 XXL-Job
为了保障大家启动牛券项目最小依赖原则,我们选择将 XXL-Job 的依赖权交给用户:即通过一个配置开关决定是否需要使用。
1. 添加启动开关
在 application.yaml
中添加一个 enabled
属性,默认为 true。我们这里设置为 false,意味着不开启 XXL-Job。
xxl-job: enabled: false
2. XXL-Job 配置添加条件注解
XXLJobConfiguration
配置类上添加 SpringBoot 条件注解,在特定情况下加载。
@Configuration @ConditionalOnProperty(prefix = "xxl-job", name = "enabled", havingValue = "true", matchIfMissing = true) public class XXLJobConfiguration { // ...... }
@ConditionalOnProperty
是 Spring 框架中的一个条件注解,主要用于在特定条件下启用或禁用 Spring Bean 或配置。通过该注解,可以实现基于配置属性的条件化 Bean 装配。这在需要灵活地控制应用程序组件的加载和启用时非常有用。
具体来说,@ConditionalOnProperty
通过检查 Spring 环境中的某些属性的值来决定一个 Bean 或配置类是否应该被实例化和加载。属性如下:
- prefix:表示要检查的属性的前缀为
xxl-job
。 - name:指定要检查的属性名称是
enabled
,结合前缀,完整的属性名称为xxl-job.enabled
。 - havingValue:指定属性
xxl-job.enabled
的值应该为"true"
,当且仅当属性值匹配"true"
时,相关的 Bean 或配置才会被加载。 - matchIfMissing:如果未找到属性
xxl-job.enabled
,将默认视为匹配。
通过该方式,大家启动 main 分支下的 MerchantAdminApplication
启动类,是不需要强依赖 XXL-Job 的。
文末总结
在本章节我们使用 XXL-Job 来实现定时任务的执行和分发,重点涉及到 XXL-Job 的安装、配置以及在实际业务场景中的应用。然后在 Spring Boot 项目中引入 XXL-Job 的依赖并配置相应的 Spring Bean,从而实现了定时执行优惠券分发任务的功能。
完结,撒花 🎉
Comments NOTHING