第21小节:优惠券分发失败记录深分页优化
业务背景
在执行优惠券分发时,对于因库存不足或用户已领取过优惠券而导致的分发失败情况,我们会记录这些失败的分发记录。从企业的实际业务需求来看,这些记录可以通过后台分页查询,或者在分发完成后将失败记录写入 Excel 供运营人员查看。显然,后者更为优越,因为这些失败记录存储在数据库中价值不大,将其保存到 Excel 文件中不仅方便查看,还能节省数据库的存储空间。
我们还需要考虑一个问题:错误分发的情况会很多吗?实际上,这种情况并不常见。但只要是人为操作,总可能出现漏洞,例如未检查优惠券模板的库存,或者提取的用户数据有误(用户已领取过优惠券)。在这种极端情况下,我们需要通过分页的情况 limit offset, count 的形式进行按批次读取,例如下述 SQL:
select*from t_coupon_task_fail where batch_id ='xxxxxx'limit0,1000;
然而,许多人常常忽视一个问题:当 MySQL 在处理分页很深的数据时,尤其是读取大数据量中的靠后部分时,会遇到深度分页问题。
什么是深分页?
MySQL 的深分页问题指的是在使用分页查询时,随着页数的增加,查询的效率会显著降低,尤其是对于大量数据的表。这通常发生在使用 LIMIT
子句进行分页时,例如 LIMIT offset, count
。当 offset
值很大时,MySQL 需要扫描和跳过大量的行,这会导致查询变慢。
对于 SELECT * FROM table LIMIT 1000000, 10
这样的查询,MySQL 会扫描前 1000000 条记录,然后丢弃它们,只返回接下来的 10 条。这意味着即使最终结果只包含 10 条记录,MySQL 仍然需要处理并丢弃大量的行,导致查询时间随着 offset
增大而增加。
Git 分支
20240904_dev_coupon-distribute-v3_limit-page_ding.ma
生成测试数据复现深分页
我们通过存储过程模拟 100 万的单个批次错误记录,确保数据有个偏真实的状态。主要是数据量小了模拟不出来深分页的效果。
DELIMITER $$
CREATE PROCEDURE insert_coupon_task_fail_records()
BEGIN
DECLARE i INT DEFAULT 1;
DECLARE batch_size INT DEFAULT 1000; -- 每批插入1000条记录
DECLARE json_data TEXT;
WHILE i <= 1000000 DO
-- 开启一个事务
START TRANSACTION;
-- 执行批量插入
SET @end = i + batch_size - 1;
WHILE i <= @end DO
SET json_data = CONCAT('{"key": "value_', i, '"}');
INSERT INTO `t_coupon_task_fail` (`batch_id`, `json_object`)
VALUES (1830889785603571980, json_data);
SET i = i + 1;
END WHILE;
-- 提交事务
COMMIT;
END WHILE;
END$$
DELIMITER ;
通过执行这个存储过程,模拟出 100 万记录,大概执行在 10 秒以上。
CALL insert_coupon_task_fail_records();
我们先来看看执行查询前 10 条数据 SQL 执行时间是多久,根据执行返回结果来看,返回大概在 500 µs 左右。
select*from t_coupon_task_fail where batch_id ='1830889785603571980'limit0,10;
如果查询最后几条数据,那么这个 SQL 的平均效应时间是 600 ms,相对于前面的数据性能相差了近 1200 倍!这个数据是很恐怖的。
select*from t_coupon_task_fail where batch_id ='1830889785603571980'limit999990,10;
1 毫秒(ms) = 1000 微秒(µs)
如果我们多执行几次存储过程,让数据来到 500 万,查询性能来到了惊人的近 5 秒。所以,大家如果在项目中遇到类似于这种大数据量深分页的情况,一定要特别注意,稍微不小心就是性能深渊。

深分页主要问题不是读取某一次性能差,而是我们读取都后面的所有记录都很慢,越到后面越慢。举个例子,假设我们共需要去数据库读取 100 万数据,每次读取 1000 条,可能到 50 万就开始明显变慢了,最后就会演变成会慢 50万 / 1000 次。
如何解决深分页?
为了方便大家很直观能理解不同深分页的性能差距,我们接下来的示例会按照 500 万数据的基数进行测试。
1. 子查询优化
我们通过子查询的方式进行优化,优化语句如下:
mysql> select * from t_coupon_task_fail where id >=
->
-> (SELECT id
-> FROM t_coupon_task_fail
-> WHERE batch_id = '1830889785603571980'
-> ORDER BY id
-> LIMIT 4999990, 1
-> )
->
-> limit 10;
+---------+---------------------+--------------------------+
| id | batch_id | json_object |
+---------+---------------------+--------------------------+
| 4999991 | 1830889785603571980 | {"key": "value_999991"} |
| 4999992 | 1830889785603571980 | {"key": "value_999992"} |
| 4999993 | 1830889785603571980 | {"key": "value_999993"} |
| 4999994 | 1830889785603571980 | {"key": "value_999994"} |
| 4999995 | 1830889785603571980 | {"key": "value_999995"} |
| 4999996 | 1830889785603571980 | {"key": "value_999996"} |
| 4999997 | 1830889785603571980 | {"key": "value_999997"} |
| 4999998 | 1830889785603571980 | {"key": "value_999998"} |
| 4999999 | 1830889785603571980 | {"key": "value_999999"} |
| 5000000 | 1830889785603571980 | {"key": "value_1000000"} |
+---------+---------------------+--------------------------+
10 rows in set (0.61 sec)
可以看到,同样的 SQL 语句,性能从近 5 秒下降到了 610 毫秒。
那我们再通过 explain 语句,分析下为什么能够优化深分页问题:

根据执行计划得知,子查询查询是用到了索引。首先在索引上拿到了聚集索引的主键 ID 省去了回表操作,然后第二查询直接根据第一个查询的 ID 往后再去查就可以了。
但是,有个不好的消息,这种方式性能并不是恒定的,如果我们查询 100 万数据后的 10 条记录,性能要把这个更快。
mysql> select * from t_coupon_task_fail where id >=
->
-> (SELECT id
-> FROM t_coupon_task_fail
-> WHERE batch_id = '1830889785603571980'
-> ORDER BY id
-> LIMIT 999990, 1
-> )
->
-> limit 10;
+---------+---------------------+--------------------------+
| id | batch_id | json_object |
+---------+---------------------+--------------------------+
| 999991 | 1830889785603571980 | {"key": "value_999991"} |
| 999992 | 1830889785603571980 | {"key": "value_999992"} |
| 999993 | 1830889785603571980 | {"key": "value_999993"} |
| 999994 | 1830889785603571980 | {"key": "value_999994"} |
| 999995 | 1830889785603571980 | {"key": "value_999995"} |
| 999996 | 1830889785603571980 | {"key": "value_999996"} |
| 999997 | 1830889785603571980 | {"key": "value_999997"} |
| 999998 | 1830889785603571980 | {"key": "value_999998"} |
| 999999 | 1830889785603571980 | {"key": "value_999999"} |
| 1000000 | 1830889785603571980 | {"key": "value_1000000"} |
+---------+---------------------+--------------------------+
10 rows in set (0.15 sec)
可以看到,查询 100 万数据仅需要 150 毫秒。这也就意味着如果数据量超过 500 万,性能会下降的更多,所以,这种深分页解决方案并不可取。
2. 延迟关联
新的解决方案叫法是延迟关联,其实本质上是通过 JOIN 关联的形式进行绑定,SQL 语句如下所示:
mysql> SELECT
-> t1.id,
-> t1.batch_id,
-> t1.json_object
-> FROM
-> t_coupon_task_fail t1
-> INNER JOIN (
-> SELECT
-> id
-> FROM
-> t_coupon_task_fail
-> WHERE
-> batch_id = '1830889785603571980'
-> ORDER BY
-> id
-> LIMIT 4999990,
-> 10) AS t2 on t1.id = t2.id
-> LIMIT 10;
+---------+---------------------+--------------------------+
| id | batch_id | json_object |
+---------+---------------------+--------------------------+
| 4999991 | 1830889785603571980 | {"key": "value_999991"} |
| 4999992 | 1830889785603571980 | {"key": "value_999992"} |
| 4999993 | 1830889785603571980 | {"key": "value_999993"} |
| 4999994 | 1830889785603571980 | {"key": "value_999994"} |
| 4999995 | 1830889785603571980 | {"key": "value_999995"} |
| 4999996 | 1830889785603571980 | {"key": "value_999996"} |
| 4999997 | 1830889785603571980 | {"key": "value_999997"} |
| 4999998 | 1830889785603571980 | {"key": "value_999998"} |
| 4999999 | 1830889785603571980 | {"key": "value_999999"} |
| 5000000 | 1830889785603571980 | {"key": "value_1000000"} |
+---------+---------------------+--------------------------+
10 rows in set (0.52 sec)
性能差不多也是 500 毫秒左右,和子查询优化一样,会随着数据量的增多,性能也会下降。
性能优化思路与子查询基本一致,只不过采用了 JOIN 的形式执行 SQL。

3. 书签记录
上面的两种方式其实解决深分页的思路是一致的,接下来要说的书签记录的方式可以说是性能最强的。
书签记录叫起来有点绕,很多同学不能第一时间知道是什么,从原理上说,属于是一种滚动查询。也就是说我们必须从第一页开始查询,然后获取本页最大的记录 ID,然后再根据大于最大记录 ID 的数据向后持续滚动。也就是说,我们如果想查询大于 4999990 后记录的 10 条,那我们就得知道 4999990 条的 ID。因为我们 ID 是递增的,所以直接查询即可。
mysql> SELECT
-> *
-> FROM
-> t_coupon_task_fail
-> WHERE
-> batch_id = '1830889785603571980'
-> AND id > 4999990
-> LIMIT 10;
+---------+---------------------+--------------------------+
| id | batch_id | json_object |
+---------+---------------------+--------------------------+
| 4999991 | 1830889785603571980 | {"key": "value_999991"} |
| 4999992 | 1830889785603571980 | {"key": "value_999992"} |
| 4999993 | 1830889785603571980 | {"key": "value_999993"} |
| 4999994 | 1830889785603571980 | {"key": "value_999994"} |
| 4999995 | 1830889785603571980 | {"key": "value_999995"} |
| 4999996 | 1830889785603571980 | {"key": "value_999996"} |
| 4999997 | 1830889785603571980 | {"key": "value_999997"} |
| 4999998 | 1830889785603571980 | {"key": "value_999998"} |
| 4999999 | 1830889785603571980 | {"key": "value_999999"} |
| 5000000 | 1830889785603571980 | {"key": "value_1000000"} |
+---------+---------------------+--------------------------+
10 rows in set (0.00 sec)
可以看到,我们的记录统计最小时间是秒,但是执行时间一但是微秒(µs)就没办法显示了,我通过数据库可视化工具看了下,大概是 500 微秒左右。
这个时候再给大家带来一个问题,如果数据量在几千万级别,它还能保证微妙级别的查询么?我们修改下存储过程,造大概 4000 万数据试试。
mysql> select count(*) from t_coupon_task_fail;
+----------+
| count(*) |
+----------+
| 40000000 |
+----------+
1 row in set (3.12 sec)
mysql> SELECT
-> *
-> FROM
-> t_coupon_task_fail
-> WHERE
-> batch_id = '1830889785603571980'
-> AND id > 39999990
-> LIMIT 10;
+----------+---------------------+---------------------------+
| id | batch_id | json_object |
+----------+---------------------+---------------------------+
| 39999991 | 1830889785603571980 | {"key": "value_19999991"} |
| 39999992 | 1830889785603571980 | {"key": "value_19999992"} |
| 39999993 | 1830889785603571980 | {"key": "value_19999993"} |
| 39999994 | 1830889785603571980 | {"key": "value_19999994"} |
| 39999995 | 1830889785603571980 | {"key": "value_19999995"} |
| 39999996 | 1830889785603571980 | {"key": "value_19999996"} |
| 39999997 | 1830889785603571980 | {"key": "value_19999997"} |
| 39999998 | 1830889785603571980 | {"key": "value_19999998"} |
| 39999999 | 1830889785603571980 | {"key": "value_19999999"} |
| 40000000 | 1830889785603571980 | {"key": "value_20000000"} |
+----------+---------------------+---------------------------+
10 rows in set (0.00 sec)
可以看到我们查询 4000 万记录,性能依旧是微妙级,基本上可以认定这个性能是恒定的。
但是这个有点不好的地方在于,不支持跳页,只能上一页和下一页。不过对我们系统来说无所谓,反正是读取所有失败分发的记录生成 Excel 就好。
分发失败记录生成 Excel
我们在 Excel 解析完成后发送的消息队列消费者里,将分发失败数据以书签记录的形式读取到,然后封装为新的失败 Excel,并将地址保存到分发任务记录中。
@Transactional(rollbackFor = Exception.class)
@Override
public void onMessage(MessageWrapper<CouponTemplateDistributionEvent> messageWrapper) {
// ......
// 这里是分发任务结束标识为 TRUE,代表已经没有 Excel 记录
long initId = 0;
boolean isFirstIteration = true; // 用于标识是否为第一次迭代
String failFileAddress = excelPath + "/用户分发记录失败Excel-" + event.getCouponTaskBatchId() + ".xlsx";
// 这里应该上传云 OSS 或者 MinIO 等存储平台,但是增加部署成功并且不太好往简历写就仅写入本地
try (ExcelWriter excelWriter = EasyExcel.write(failFileAddress, UserCouponTaskFailExcelObject.class).build()) {
WriteSheet writeSheet = EasyExcel.writerSheet("用户分发失败Sheet").build();
while (true) {
List<CouponTaskFailDO> couponTaskFailDOList = listUserCouponTaskFail(event.getCouponTaskBatchId(), initId);
if (CollUtil.isEmpty(couponTaskFailDOList)) {
// 如果是第一次迭代且集合为空,则设置 failFileAddress 为 null
if (isFirstIteration) {
failFileAddress = null;
}
break;
}
// 标记第一次迭代已经完成
isFirstIteration = false;
// 将失败行数和失败原因写入 Excel 文件
List<UserCouponTaskFailExcelObject> excelDataList = couponTaskFailDOList.stream()
.map(each -> JSONObject.parseObject(each.getJsonObject(), UserCouponTaskFailExcelObject.class))
.toList();
excelWriter.write(excelDataList, writeSheet);
// 查询出来的数据如果小于 BATCH_USER_COUPON_SIZE 意味着后面将不再有数据,返回即可
if (couponTaskFailDOList.size() < BATCH_USER_COUPON_SIZE) {
break;
}
// 更新 initId 为当前列表中最大 ID
initId = couponTaskFailDOList.stream()
.mapToLong(CouponTaskFailDO::getId)
.max()
.orElse(initId);
}
}
// 确保所有用户都已经接到优惠券后,设置优惠券推送任务完成时间
CouponTaskDO couponTaskDO = CouponTaskDO.builder()
.id(event.getCouponTaskId())
.status(CouponTaskStatusEnum.SUCCESS.getStatus())
.completionTime(new Date())
.failFileAddress(failFileAddress)
.build();
couponTaskMapper.updateById(couponTaskDO);
}
/**
* 查询用户分发任务失败记录
*
* @param batchId 分发任务批次 ID
* @param maxId 上次读取最大 ID
* @return 用户分发任务失败记录集合
*/
private List<CouponTaskFailDO> listUserCouponTaskFail(Long batchId, Long maxId) {
LambdaQueryWrapper<CouponTaskFailDO> queryWrapper = Wrappers.lambdaQuery(CouponTaskFailDO.class)
.eq(CouponTaskFailDO::getBatchId, batchId)
.gt(CouponTaskFailDO::getId, maxId)
.last("LIMIT " + BATCH_USER_COUPON_SIZE);
return couponTaskFailMapper.selectList(queryWrapper);
}
可能有同学有疑问,在不查询是否有分发失败记录的时候就创建 EasyExcel 对象,是否有问题?
其实不会的,只有在第一次调用 excelWriter.write(xx, xx)
方法才会创建 Excel 文件,所以不用担心创建空记录 Excel 文件。如果写入 Excel 文件成功后,就能在牛券根目录 /tmp 文件夹下看到我们的记录。
└── tmp
├── oneCoupon任务推送Excel.xlsx
├── 用户分发记录失败Excel-1831284632528740352.xlsx
我们可以看到生成的 Excel 文件内容如下所示:

文末总结
文章介绍了深分页的业务场景,同时呢介绍三种深分页的解决方案,其中书签记录的方式在应对深分页场景下性能最高,在模拟的 4000 万行记录中依然能够保持微秒查询性能。最后将分发失败的用户领券记录保存到了 Excel 中,并赋值到优惠券模板分发记录任务中。
完结,撒花 🎉
不支持跳页:不能获得指定页的数据,因为查询第n页需要第n+1或者第n-1页的数据
个人认为这里还是模拟了一个场景来讲解深分页问题,实际上这种极端场景很难见到。在马丁哥的示例中,是先指定batchId(分发任务批次ID),再查询失败数据。从业务上看,每一次分发任务的批次ID都是不同,不可能上一次分发任务的批次ID是49001,过了两个月又有一个49001的批次ID。而要在一次分发任务中,达到4000w的失败数量,那合理反推有可能是当次分发全部失败了,这种业务场景不好见到的吧
本节想表达的意思没太看懂,看下来是本项目是选用保存Excel的方案,那就没有深分页的问题吧?如果是选用数据库持久化的方案,文中也说了,失败的场景并不常见,深分页在这有点大材小用。感觉这里更多的是让同学们了解一下深分页的一些常见的解决方案,如果想跟面试官聊项目中是否遇到深分页问题,最好不要举这个例子。
第22小节:基于注解实现去重表消息防止重复消费
业务背景
当使用消息队列时,客户端重复消费可能会成为一个严重的问题。
这是因为消息队列具有持久性和可靠性的特性,确保消息能够被成功传递给消费者。然而,这也会导致客户端在某些情况下重复消费消息,例如网络故障(或延迟)、客户端崩溃、消息处理失败等情况。
为了避免这种情况发生,需要在客户端实现一些机制来确保消息不会被重复消费,例如记录消费者已经处理的消息 ID、使用分布式锁来控制消费进程的唯一性等。这些机制能够保证消息被成功处理,同时也能够提高系统的可靠性和稳定性。本小节将探讨如何确保消息队列中的消息不会被重复消费,下文将以 RocketMQ 为例说明。
消息幂等性
在使用消息队列 RocketMQ 实现异步化、解耦、削峰等功能的情况下,我们认为消息中间件是一个可靠的组件,这里的可靠性指的是,只要消息被成功投递到了消息中间件,它就不会丢失,至少能够被消费者成功消费一次。这是消息中间件最基本的特性之一,也就是我们通常所说的 “AT LEAST ONCE”,即消息至少会被成功消费一遍。
举个例子,假设一个消息 M 被发送到消息中间件并被消费程序 A 接收到,A 开始消费这个消息,但是在消费过程中程序重启了。由于这个消息没有被标记为已经被消费成功,消息中间件会持续地将这个消息投递给消费者,直到消息被成功消费为止。
然而,这种可靠性特性也会导致消息被多次投递的情况。举个例子,仍然以之前的例子为例,如果消费程序 A 接收并完成消息 M 的消费逻辑后,正准备通知消息中间件“我已经消费成功了”,但在此之前程序A又重启了,那么对于消息中间件来说,这个消息 M 并没有被成功消费过,因此消息中间件会继续投递这个消息。而对于消费程序A来说,尽管它已经成功消费了这个消息,但由于程序重启导致消息中间件继续投递,看起来就好像这个消息还没有被成功消费过一样。
在 RockectMQ 的场景中,这意味着同一个 messageId 的消息会被重复投递。由于消息的可靠投递是更重要的,所以避免消息重复投递的任务转移给了应用程序自身来实现。这也是 RocketMQ 文档强调消费逻辑需要自行实现幂等性的原因。实际上,这背后的逻辑是:在分布式场景下,保证消息不丢和避免消息重复投递是矛盾的,但是消息重复投递是可以解决的,而消息丢失则非常麻烦。
Git 分支
20240905_dev_coupon-distribute-v4_idempotent_ding.ma
幂等设计
下述方案的优点在于,使用 Redis 消息去重表,不依赖事务,针对消息表本身做了状态的区分:消费中、消费完成。
如果消息已经在消费中,抛出异常,消息会触发延迟消费,在 RocketMQ 的场景下如果消息消费失败,会间隔时间后再次发起消费流程。

通过该方案可以解决什么问题?
- 1.消息已经消费成功了,第二条消息将被直接幂等处理掉(消费成功)。
- 2.并发场景下的消息,依旧能满足不会出现消息重复,即穿透幂等挡板的问题。
- 3.支持上游业务生产者重发的业务重复的消息幂等问题。
为什么要给初始化的幂等标识新增 10 分钟过期时间?
在并发场景下,我们使用消息状态来实现并发控制,以使第二条消息被不断延迟消费(即重试)。但如果在此期间第一条消息也因某些异常原因(例如机器重启或外部异常)未成功消费,该怎么办呢?因为每次查询时都会显示消费中的状态,所以延迟消费会一直进行下去,直到最终被视为消费失败并被投递到死信 Topic 中(RocketMQ 默认最多可以重复消费 16 次)。
针对这个问题,我们采取了一种解决方案:在插入消息表时,必须为每条消息设置一个最长消费过期时间,例如 10 分钟。这意味着,如果某个消息在消费过程中超过了 10 分钟,就会被视为消费失败并从消息表中删除。
抽象通用幂等组件
消息防重复消费幂等组件是通用的,接下来的代码开发还是会放到 framework
基础架构组件里。

1. 自定义幂等注解
我们提供了一种通用的幂等注解,并通过 SpEL 的形式生成去重表全局唯一 Key。如果对 SpEL 不熟悉的同学,移步查看 历史章节 进行学习。
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface NoMQDuplicateConsume {
/**
* 设置防重令牌 Key 前缀
*/
String keyPrefix() default "";
/**
* 通过 SpEL 表达式生成的唯一 Key
*/
String key();
/**
* 设置防重令牌 Key 过期时间,单位秒,默认 1 小时
*/
long keyTimeout() default 3600L;
}
2. 定义 AOP 逻辑增强
上面我们有说到,幂等需要设置两个状态,消费中和已消费,创建对应的枚举:
@RequiredArgsConstructor
public enum IdempotentMQConsumeStatusEnum {
/**
* 消费中
*/
CONSUMING("0"),
/**
* 已消费
*/
CONSUMED("1");
@Getter
private final String code;
/**
* 如果消费状态等于消费中,返回失败
*
* @param consumeStatus 消费状态
* @return 是否消费失败
*/
public static boolean isError(String consumeStatus) {
return Objects.equals(CONSUMING.code, consumeStatus);
}
}
接下来通过 AOP 的方式进行增强注解,如果说方法上加了注解,会被这段 AOP 代码以环绕增强方式执行。
@Slf4j
@Aspect
@RequiredArgsConstructor
public final class NoMQDuplicateConsumeAspect {
private final StringRedisTemplate stringRedisTemplate;
private static final String LUA_SCRIPT = """
local key = KEYS[1]
local value = ARGV[1]
local expire_time_ms = ARGV[2]
return redis.call('SET', key, value, 'NX', 'GET', 'PX', expire_time_ms)
""";
/**
* 增强方法标记 {@link NoMQDuplicateConsume} 注解逻辑
*/
@Around("@annotation(com.nageoffer.onecoupon.framework.idempotent.NoMQDuplicateConsume)")
public Object noMQRepeatConsume(ProceedingJoinPoint joinPoint) throws Throwable {
NoMQDuplicateConsume noMQDuplicateConsume = getNoMQDuplicateConsumeAnnotation(joinPoint);
String uniqueKey = noMQDuplicateConsume.keyPrefix() + SpELUtil.parseKey(noMQDuplicateConsume.key(), ((MethodSignature) joinPoint.getSignature()).getMethod(), joinPoint.getArgs());
String absentAndGet = stringRedisTemplate.execute(
RedisScript.of(LUA_SCRIPT, String.class),
List.of(uniqueKey),
IdempotentMQConsumeStatusEnum.CONSUMING.getCode(),
String.valueOf(TimeUnit.SECONDS.toMillis(noMQDuplicateConsume.keyTimeout()))
);
// 如果不为空证明已经有
if (Objects.nonNull(absentAndGet)) {
boolean errorFlag = IdempotentMQConsumeStatusEnum.isError(absentAndGet);
log.warn("[{}] MQ repeated consumption, {}.", uniqueKey, errorFlag ? "Wait for the client to delay consumption" : "Status is completed");
if (errorFlag) {
throw new ServiceException(String.format("消息消费者幂等异常,幂等标识:%s", uniqueKey));
}
return null;
}
Object result;
try {
// 执行标记了消息队列防重复消费注解的方法原逻辑
result = joinPoint.proceed();
// 设置防重令牌 Key 过期时间,单位秒
stringRedisTemplate.opsForValue().set(uniqueKey, IdempotentMQConsumeStatusEnum.CONSUMED.getCode(), noMQDuplicateConsume.keyTimeout(), TimeUnit.SECONDS);
} catch (Throwable ex) {
// 删除幂等 Key,让消息队列消费者重试逻辑进行重新消费
stringRedisTemplate.delete(uniqueKey);
throw ex;
}
return result;
}
/**
* @return 返回自定义防重复消费注解
*/
public static NoMQDuplicateConsume getNoMQDuplicateConsumeAnnotation(ProceedingJoinPoint joinPoint) throws NoSuchMethodException {
MethodSignature methodSignature = (MethodSignature) joinPoint.getSignature();
Method targetMethod = joinPoint.getTarget().getClass().getDeclaredMethod(methodSignature.getName(), methodSignature.getMethod().getParameterTypes());
return targetMethod.getAnnotation(NoMQDuplicateConsume.class);
}
}
可能有些同学不太熟悉 Lua 脚本这个命令代表着什么,我们解释下这个概念,语句如下:
local key =KEYS[1] # 第一个 Key,即幂等唯一标识 uniqueKey
local value =ARGV[1] # 第一个参数,即初始化幂等消费状态,为消费中
local expire_time_ms =ARGV[2] # 第二个参数,即幂等 Key 过期时间
return redis.call('SET', key, value,'NX','GET','PX', expire_time_ms)
最重要的是返回值那一行:
SET key value
: 在 Redis 中设置key
对应的值为value
。'NX'
: 表示只有在key
不存在时才执行设置操作,防止覆盖已有值。'GET'
: 表示在设置新值之前,获取并返回设置前的旧值(Redis 6.2 开始支持SET
命令的GET
选项)。'PX expire_time_ms'
: 设置key
的过期时间,单位是毫秒。
该脚本的主要作用是:在 Redis 中尝试以 NX
方式设置一个键,即如果键不存在,则设置新值,并返回设置之前的旧值,同时为该键设置过期时间(以毫秒为单位)。
获取到 Redis 里面的 Key 值后,可能会有三个流程执行:
absentAndGet
为空:代表消息是第一次到达,执行完 LUA 脚本后,会在 Redis 设置 Key 的 Value 值为 0,消费中状态。absentAndGet
为 0:代表已经有相同消息到达并且还没有处理完,会通过抛异常的形式让 RocketMQ 重试。absentAndGet
为 1:代表已经有相同消息消费完成,返回空表示不执行任何处理。
3. 注册为 Spring Bean
因为之前 Web 接口防重复提交我们已经设置了幂等配置类,所以追加一个 @Bean 就可以了。
public class IdempotentConfiguration {
/**
* 防止用户重复提交表单信息切面控制器
*/
@Bean
public NoDuplicateSubmitAspect noDuplicateSubmitAspect(RedissonClient redissonClient) {
return new NoDuplicateSubmitAspect(redissonClient);
}
/**
* 防止消息队列消费者重复消费消息切面控制器
*/
@Bean
public NoMQDuplicateConsumeAspect noMQDuplicateConsumeAspect(StringRedisTemplate stringRedisTemplate) {
return new NoMQDuplicateConsumeAspect(stringRedisTemplate);
}
}
org.springframework.boot.autoconfigure.AutoConfiguration.imports
文件保持不变就好了。
4. 实际使用场景
这个消息防重复注解需要加到我们的 CouponTaskExecuteConsumer
消费者上,因为如果触发了重复消费,意味着一个优惠券分发任务会执行两次。
@Component
@RequiredArgsConstructor
@RocketMQMessageListener(
topic = DistributionRocketMQConstant.TEMPLATE_TASK_EXECUTE_TOPIC_KEY,
consumerGroup = DistributionRocketMQConstant.TEMPLATE_TASK_EXECUTE_CG_KEY
)
@Slf4j(topic = "CouponTaskExecuteConsumer")
public class CouponTaskExecuteConsumer implements RocketMQListener<MessageWrapper<CouponTaskExecuteEvent>> {
@NoMQDuplicateConsume(
keyPrefix = "coupon_task_execute:idempotent:",
key = "#messageWrapper.message.couponTaskId",
keyTimeout = 120
)
@Override
public void onMessage(MessageWrapper<CouponTaskExecuteEvent> messageWrapper) {
// ......
}
}
keyPrefix
和 key
组装成全局唯一的幂等标识,keyTimeout 意味着我们可以保持 2 分钟的幂等。
常见问题答疑
1. 为什么要设置 2 分钟幂等?10 分钟不行么?
这个 2 分钟是个经验值,也就是说你这个消息消费的时间是否能够在 2 分钟内执行完成,如果不行需要设置更长的时间。
2. 如果 2 分钟幂等结束后有新的一模一样的请求呢?
这是个伪命题,一般的幂等都是因为网络抖动同时到达,不太可能一个消息都执行完了挺长时间,然后又有一模一样的消息再来消费。
如果面试官非要揪着这个点不放的话,可以把这个幂等标识存放到 MySQL 数据库,进行分表存储。这样存个 10 天半个月也不怕。但是要注意,MySQL 是没有到期删除机制的,还得配合定时任务删除之前的无效数据。
3. 怎么测试幂等?
我们把 Debug 断点打到 NoMQDuplicateConsumeAspect#noMQRepeatConsume
方法里,然后在真正执行前,去 Redis 可视化工具中新增幂等值。

因为这个工具不支持创建 Key 的时候设置 Value,所以我们创建好后,还需要去设置下对应的 Value 为 0 或者 1,这样就能测试对应的流程了。

更复杂的幂等场景
1. 幂等不是万能的
到这里,方案看起来非常完美,所有的消息都可以快速接入去重,而且与具体业务实现完全解耦。但是,是否这样就可以完美地完成去重的所有任务呢?
很遗憾,实际上并非如此。因为需要确保消息至少成功消费一次,因此消息在消费过程中有可能失败并触发重试。
还是以上面的例子,假设消息消费的流程包含:
- 1.检查库存(RPC)
- 2.锁库存(RPC)
- 3.开启事务,插入订单表(MySQL)
- 4.调用某些其他下游服务(RPC)
- 5.更新订单状态
- 6.commit 事务(MySQL)
当消息消费到第三步的时候假设 MySQL 异常导致失败了,触发消息重试。在重试前我们会删除幂等表的记录,所以消息重试的时候就会重新进入消费代码,那么步骤 1 和步骤 2 就会重新再执行一遍。
如果步骤 2 本身不是幂等的,那么这个业务消息消费依旧没有做好完整的幂等处理。
2. 通用方法实现价值
尽管这种方式并不能完全解决消息幂等问题(事实上,软件工程领域里很少有银弹可以完全解决问题),但它仍然具有很大的价值。通过这种简便的方式,我们能够解决以下问题:
- 1.各种由于 Broker、负载均衡等原因导致的消息重投递的重复问题。
- 2.各种上游生产者导致的业务级别消息重复问题。
- 3.重复消息并发消费的控制窗口问题,就算重复,重复也不可能同一时间进入消费逻辑。
使用这种方法可以确保在正常的消费逻辑场景下(无异常,无异常退出),消息的幂等性全部得到解决,无论是业务重复还是 RocketMQ 特性带来的重复。虽然它不是解决消息幂等性的银弹,但它以一种简单和便捷的方式提供了解决方案。
实际上,这种方法已经可以解决 99% 的消息重复问题了,因为异常、宕机等情况通常是少数情况。
完结,撒花 🎉
6.2.6的redis.call不支持那么多参数,SET key val PX time NX GET会报语法错误,只追加NX或GET中的一个是没有问题的,星球提供的7.2.5是可以同时追加NX和GET的

我嘞个豆,常见问题的第二个问题,就是当时面试官问我的问题(短链接的项目),我当时确实往这方面想了,觉得不太可能会出现这种情况,但是奈何当时面试的时候紧张+嘴笨,没说出来= =
과목th허jlS* 回复 地信哥:地信哥,我看到rocketmq的超时时间是15min,如果消费者成功消费,但是没有返回给broker ack,15min后重新投递,那不是幂等失效了(幂等组件才2min)
如果发现消息处于consuming状态,不就说明mq中有两个或以上的同种消息吗,让一条消息去消费不就可以了,另外的消息直接放弃,如果不放弃反而重新投递的话不会占用资源吗,如果第一条消息消费失败了,mq也能重新投递
丁花哥 回复 rebcaaaaaa*:你这个有逻辑漏洞,光看你说的这个好像没有问题。前提条件:就是一遇到consuming状态就返回成功(应该理解没有错吧)。假设消息M,刚被一次消费A消费,redis状态改为consuming,其他操作还没有进行,如果这个时候这个机器宕机了,那么下次重新投递给消费B,消费时redis状态已经是consuming,直接返回成功,这条消息就没有消费过。
捋一下思路。直接从NoMQDuplicateConsumeAspect切面类中的开始 在noMQRepeatConsume方法中首先执行getNoMQDuplicateConsumeAnnotation方法获取切面方法上面的注解 然后将注解上面的keyPrefix和执行SpELUtil.parseKey方法获取到的messageWrapper.message.couponTaskId值拼接起来作为uniqueKey 然后执行lua脚本获取到redis该key的值,该lua脚本的流程就是获取原始的值,如果没有就设置新的值和过期时间 获取lua脚本执行后的结果赋值给absentAndGet absentAndGet的值有三种情况: 如果为空:代表消息是第一次到达,执行完 LUA 脚本后,会在 Redis 设置 Key 的 Value 值为 0,消费中状态。 如果为 0:代表已经有相同消息到达并且还没有处理完,会通过抛异常的形式让 RocketMQ 重试 如果为 1:代表已经有相同消息消费完成,返回空表示不执行任何处理 如果不为空,那就证明该消息正在消费或者消费完成。 然后判断是否是消费中,如果消费中就手动抛出ServiceException 如果是消费成功,那就直接return 如果为空,那此时消息就是第一次到达,那就开始执行标记了消息队列防重复消费注解的方法原逻辑 由于在lua脚本中就已经让key的值设置为消费中了,所以如果在执行的过程中有消息过来就保证不会重复消费 执行完了就让缓存中的值设置为消费完成,执行完返回结果 这里注意一点,如果absentAndGet不为空,并且是在消费中的情况,不能直接return。(评论区的一位同学的例子)假设消息M第一次被消费者A消费,redis状态改为consuming,在消息消费成功之前,如果这个时候服务器宕机了,下次重新投递给消费者A,由于此时redis状态已经是consuming了,那直接return的话,这条消息之后就不会被消费成功,这就是一个bug。但是如果在消费中的情况下我们抛出异常,可以成功避免上面的问题,并且,如果消费者A消费成功了,根据我们的代码可以直接return,也不会出现重复消费。
第23小节:开发兑换/秒杀优惠券功能(一)
业务背景
通过之前的章节,我们已经把优惠券相关的业务铺垫的差不多了,接下来我们正式开启引擎服务的优惠券兑换/秒杀业务。
普通兑换自然没有那么多弯弯绕绕,但是涉及到一些比较大额或者有吸引力的优惠券来说,就不一样了,会面临大量用户抢购少量的库存,也就是我们常说的秒杀架构。和常规的秒杀系统一样,我们也是采用了缓存抗并发、然后扣减缓存成功的请求可以进行扣减数据库,并将优惠券添加到用户的领券记录中。
有一点需要和大家先说明下,因为优惠券的领取不像 12306 一样需要保障强一致性,所以,理论上可以从技术方案上做减法,会有一些容忍故障。
再和大家多扯一句,那就是面试可以造火箭,但是你得知道复杂技术方案带来的落地难度问题。有时候针对没那么重要的场景,方案简单粗暴一点并没有问题。很多同学会说缓存宕机了怎么办?缓存主从异步同步丢失数据怎么办?缓存扣减成功数据库没扣减时应用宕机了怎么办?诸如此类,只是一张优惠券而已,其实没有啥问题。
虽然我们是这么说,但还是会以较严谨的方式给大家讲解牛券的秒杀架构。
Git 分支
20240908_dev_acquire-coupon_seckill_ding.ma
秒杀业务难点
在我们兑换/秒杀优惠券模板的接口中,可能会存在以下三个难点:
- 高并发流量压力:秒杀活动往往会瞬间吸引大量用户访问系统,导致流量骤增,如果直接访问数据库,可能会让数据库负载过重,甚至导致宕机。
- 库存超卖问题:由于并发请求,多个用户同时抢购可能会导致系统超卖,即多个用户同时购买到同一库存。
- 用户超领问题:优惠券中会有一个限制,每个用户限流几张,应该如何避免用户领取超过这个限制。
在接下来的讲解中,我们会逐一完成这些难点说明和解决方案讲解。
优惠券秒杀前置拦截
1. 验证优惠券
首先呢,我们应该对前端传来的数据秉承着完全不可信原则,首先验证是否存在,其次呢验证优惠券是否有效活动期间。
代码如下所示:
@OverridepublicvoidredeemUserCoupon(CouponTemplateRedeemReqDTO requestParam){// 验证缓存是否存在,保障数据存在并且缓存中存在CouponTemplateQueryRespDTO couponTemplate = couponTemplateService.findCouponTemplate(BeanUtil.toBean(requestParam,CouponTemplateQueryReqDTO.class));
// 验证领取的优惠券是否在活动有效时间boolean isInTime =DateUtil.isIn(newDate(), couponTemplate.getValidStartTime(), couponTemplate.getValidEndTime());if(!isInTime){// 一般来说优惠券领取时间不到的时候,前端不会放开调用请求,可以理解这是用户调用接口在“攻击”thrownewClientException("不满足优惠券领取时间");}}
2. 扣减缓存
如果验证优惠券模板没有问题,那我们开始进行库存扣减和验证用户是否领取优惠券超额。
代码如下所示:
privatefinalstaticStringSTOCK_DECREMENT_AND_SAVE_USER_RECEIVE_LUA_PATH="lua/stock_decrement_and_save_user_receive.lua";@OverridepublicvoidredeemUserCoupon(CouponTemplateRedeemReqDTO requestParam){// ......// 获取 LUA 脚本,并保存到 Hutool 的单例管理容器,下次直接获取不需要加载DefaultRedisScript<Long> buildLuaScript =Singleton.get(STOCK_DECREMENT_AND_SAVE_USER_RECEIVE_LUA_PATH,()->{DefaultRedisScript<Long> redisScript =newDefaultRedisScript<>();
redisScript.setScriptSource(newResourceScriptSource(newClassPathResource(STOCK_DECREMENT_AND_SAVE_USER_RECEIVE_LUA_PATH)));
redisScript.setResultType(Long.class);return redisScript;});// 验证用户是否符合优惠券领取条件JSONObject receiveRule =JSON.parseObject(couponTemplate.getReceiveRule());String limitPerPerson = receiveRule.getString("limitPerPerson");// 执行 LUA 脚本进行扣减库存以及增加 Redis 用户领券记录次数String couponTemplateCacheKey =String.format(EngineRedisConstant.COUPON_TEMPLATE_KEY, requestParam.getCouponTemplateId());String userCouponTemplateLimitCacheKey =String.format(EngineRedisConstant.USER_COUPON_TEMPLATE_LIMIT_KEY,UserContext.getUserId(), requestParam.getCouponTemplateId());Long stockDecrementLuaResult = stringRedisTemplate.execute(
buildLuaScript,ListUtil.of(couponTemplateCacheKey, userCouponTemplateLimitCacheKey),String.valueOf(couponTemplate.getValidEndTime().getTime()), limitPerPerson
);// 判断 LUA 脚本执行返回类,如果失败根据类型返回报错提示long firstField =StockDecrementReturnCombinedUtil.extractFirstField(stockDecrementLuaResult);if(RedisStockDecrementErrorEnum.isFail(firstField)){thrownewServiceException(RedisStockDecrementErrorEnum.fromType(firstField));}}
为了避免访问库存扣减和判断用户是否已超额领取优惠券多次 Redis 请求,所以我们还是依然采用 Redis Lua 脚本执行。
Lua 脚本如下所示:
--Lua 脚本: 检查用户是否达到优惠券领取上限并记录领取次数
-- 参数列表:
--KEYS[1]: 优惠券库存键 (coupon_stock_key)--KEYS[2]: 用户领取记录键 (user_coupon_key)--ARGV[1]: 优惠券有效期结束时间 (timestamp)--ARGV[2]: 用户领取上限 (limit)
local function combineFields(firstField, secondField)-- 确定 SECOND_FIELD_BITS 为 14,因为 secondField 最大为 9999
local SECOND_FIELD_BITS=14-- 根据 firstField 的实际值,计算其对应的二进制表示
-- 由于 firstField 的范围是0-2,我们可以直接使用它的值
local firstFieldValue = firstField
-- 模拟位移操作,将 firstField 的值左移 SECOND_FIELD_BITS 位
local shiftedFirstField = firstFieldValue *(2^SECOND_FIELD_BITS)-- 将 secondField 的值与位移后的 firstField 值相加
return shiftedFirstField + secondField
end
-- 获取当前库存
local stock =tonumber(redis.call('HGET',KEYS[1],'stock'))-- 判断库存是否大于 0if stock <=0 then
returncombineFields(1,0)-- 库存不足
end
-- 获取用户领取的优惠券次数
local userCouponCount =tonumber(redis.call('GET',KEYS[2]))-- 如果用户领取次数不存在,则初始化为 0if userCouponCount == nil then
userCouponCount =0
end
-- 判断用户是否已经达到领取上限
if userCouponCount >=tonumber(ARGV[2]) then
returncombineFields(2, userCouponCount)-- 用户已经达到领取上限
end
-- 增加用户领取的优惠券次数
if userCouponCount ==0 then
-- 如果用户第一次领取,则需要添加过期时间
redis.call('SET',KEYS[2],1)
redis.call('EXPIRE',KEYS[2],ARGV[1])else-- 因为第一次领取已经设置了过期时间,第二次领取沿用之前即可
redis.call('INCR',KEYS[2])
end
-- 减少优惠券库存
redis.call('HINCRBY',KEYS[1],'stock',-1)returncombineFields(0, userCouponCount)
这里我们还是采用了之前的策略,将返回的两个参数包装为一个 long 类型的数据,并进行拆分。两个参数分别如下:
- 请求是否成功:有 3 个参数,0 代表请求成功,1 代表优惠券已被领取完啦,2 代表用户已经达到领取上限。
- 用户领取次数:初始化为 0,每次领取成功后自增加 1。
如果返回 0,x 代表请求成功,x 就是目前用户已领取优惠券的次数,会把这个 x 保存到数据库表 t_user_coupon
的领取次数 receive_count 字段中。
优惠券保存数据库
1. 扣减 MySQL 优惠券库存
因为我们要加事务,中间遇到问题可以回滚数据库优惠券模板库存,但是如果加到整个方法感觉又不是很合适,因为前面的验证是不需要事务的。所以,我们采用编程式事务,自己开启、提交和回滚事务。
代码如下所示:
privatefinalTransactionTemplate transactionTemplate;@OverridepublicvoidredeemUserCoupon(CouponTemplateRedeemReqDTO requestParam){// ......// 通过编程式事务执行优惠券库存自减以及增加用户优惠券领取记录long extractSecondField =StockDecrementReturnCombinedUtil.extractSecondField(stockDecrementLuaResult);
transactionTemplate.executeWithoutResult(status ->{try{int decremented = couponTemplateMapper.decrementCouponTemplateStock(Long.parseLong(requestParam.getShopNumber()),Long.parseLong(requestParam.getCouponTemplateId()),1L);if(!SqlHelper.retBool(decremented)){thrownewServiceException("优惠券已被领取完啦");}// 添加 Redis 用户领取的优惠券记录列表Date now =newDate();DateTime validEndTime =DateUtil.offsetHour(now,JSON.parseObject(couponTemplate.getConsumeRule()).getInteger("validityPeriod"));UserCouponDO userCouponDO =UserCouponDO.builder().couponTemplateId(Long.parseLong(requestParam.getCouponTemplateId())).userId(Long.parseLong(UserContext.getUserId())).source(requestParam.getSource()).receiveCount(Long.valueOf(extractSecondField).intValue()).status(UserCouponStatusEnum.UNUSED.getCode()).receiveTime(now).validStartTime(now).validEndTime(validEndTime).build();
userCouponMapper.insert(userCouponDO);}catch(Exception ex){
status.setRollbackOnly();// 优惠券已被领取完业务异常if(ex instanceofServiceException){throw(ServiceException) ex;}if(ex instanceofDuplicateKeyException){
log.error("用户重复领取优惠券,用户ID:{},优惠券模板ID:{}",UserContext.getUserId(), requestParam.getCouponTemplateId());thrownewServiceException("用户重复领取优惠券");}thrownewServiceException("优惠券领取异常,请稍候再试");}});}
我们在进行库存扣减时,采用 MySQL 的行记录锁机制进行扣减。并且在扣减的基础上,为了避免被多扣,在判断条件里,我们加上了必须大于等于当前库存才可以扣减成功。
从零到一分支里注释写错了,以本章节注释为主。
SQL 如下所示:
<!-- 通过 MySQL 的行记录锁机制原子扣减优惠券模板库存 --><update id="decrementCouponTemplateStock">UPDATE t_coupon_template
SET stock = stock - #{decrementStock}WHERE shop_number = #{shopNumber}AND id = #{couponTemplateId}AND stock >= #{decrementStock}</update>
通过之前的章节证明,这个 SQL 记录本质上底层还是 MySQL 行锁,避免扣减冲突。
详情查看牛券之前的 历史文章 文末内容,有详细的图文讲解。
2. 添加用户领券记录
如果扣减数据库成功,那我们则将优惠券领取记录保存到 t_user_coupon
表中。
温馨提示,因为我们是在配置中固定写的用户 ID,所以数据都会写到 0 库的
t_user_coupon_13
表。
在这里,我们加了一些优化,那就是添加优惠券时,可能会有用户重复领券冲突、再比如数据库宕机之类的问题,所以我们针对不同的异常进行了前端报错信息提示。
代码如下所示:
if(ex instanceofServiceException){throw(ServiceException) ex;}if(ex instanceofDuplicateKeyException){
log.error("用户重复领取优惠券,用户ID:{},优惠券模板ID:{}",UserContext.getUserId(), requestParam.getCouponTemplateId());thrownewServiceException("用户重复领取优惠券");}thrownewServiceException("优惠券领取异常,请稍候再试");
3. 保存用户领券缓存
添加数据库如果没有异常的话,那我们应该将用户已领取的优惠券添加到 Redis 缓存中。
代码如下所示:
privatefinalTransactionTemplate transactionTemplate;@OverridepublicvoidredeemUserCoupon(CouponTemplateRedeemReqDTO requestParam){// ......// 通过编程式事务执行优惠券库存自减以及增加用户优惠券领取记录long extractSecondField =StockDecrementReturnCombinedUtil.extractSecondField(stockDecrementLuaResult);
transactionTemplate.executeWithoutResult(status ->{try{// ......// 添加用户领取优惠券模板缓存记录String userCouponListCacheKey =String.format(EngineRedisConstant.USER_COUPON_TEMPLATE_LIST_KEY,UserContext.getUserId());String userCouponItemCacheKey =StrUtil.builder().append(requestParam.getCouponTemplateId()).append("_").append(userCouponDO.getId()).toString();
stringRedisTemplate.opsForZSet().add(userCouponListCacheKey, userCouponItemCacheKey, now.getTime());}catch(Exception ex){
status.setRollbackOnly();// 优惠券已被领取完业务异常if(ex instanceofServiceException){throw(ServiceException) ex;}if(ex instanceofDuplicateKeyException){
log.error("用户重复领取优惠券,用户ID:{},优惠券模板ID:{}",UserContext.getUserId(), requestParam.getCouponTemplateId());thrownewServiceException("用户重复领取优惠券");}thrownewServiceException("优惠券领取异常,请稍候再试");}});}
大家都知道,如果保存 Redis 后,Redis 即使返回给你成功,但是数据不一定是能持久化成功的,因为在极端宕机情况下,持久化配置中 AOF 最优结果下,也是会丢一条数据的。
详情查看:大话面试:Redis宕机数据会丢失么?
除了持久化配置,Redis 如果是主从配置,主从数据同步是异步的,如果主节点写入成功,返回客户端成功,还没来得及同步从节点宕机了,从节点成为主节点,那么这个数据就成糊涂账了。
为此,我们写了一个很八股的写法,那就是通过先写再查。
代码如下所示:
privatefinalTransactionTemplate transactionTemplate;@OverridepublicvoidredeemUserCoupon(CouponTemplateRedeemReqDTO requestParam){// ......// 通过编程式事务执行优惠券库存自减以及增加用户优惠券领取记录long extractSecondField =StockDecrementReturnCombinedUtil.extractSecondField(stockDecrementLuaResult);
transactionTemplate.executeWithoutResult(status ->{try{// ......// 添加用户领取优惠券模板缓存记录String userCouponListCacheKey =String.format(EngineRedisConstant.USER_COUPON_TEMPLATE_LIST_KEY,UserContext.getUserId());String userCouponItemCacheKey =StrUtil.builder().append(requestParam.getCouponTemplateId()).append("_").append(userCouponDO.getId()).toString();
stringRedisTemplate.opsForZSet().add(userCouponListCacheKey, userCouponItemCacheKey, now.getTime());// 由于 Redis 在持久化或主从复制的极端情况下可能会出现数据丢失,而我们对指令丢失几乎无法容忍,因此我们采用经典的写后查询策略来应对这一问题Double scored;try{
scored = stringRedisTemplate.opsForZSet().score(userCouponListCacheKey, userCouponItemCacheKey);// scored 为空意味着可能 Redis Cluster 主从同步丢失了数据,比如 Redis 主节点还没有同步到从节点就宕机了,解决方案就是再新增一次if(scored ==null){// 如果这里也新增失败了怎么办?我们大概率做不到绝对的万无一失,只能尽可能增加成功率
stringRedisTemplate.opsForZSet().add(userCouponListCacheKey, userCouponItemCacheKey, now.getTime());}}catch(Throwable ex){
log.warn("查询Redis用户优惠券记录为空或抛异常,可能Redis宕机或主从复制数据丢失,基础错误信息:{}", ex.getMessage());// 如果直接抛异常大概率 Redis 宕机了,所以应该写个延时队列向 Redis 重试放入值。为了避免代码复杂性,这里直接写新增,大家知道最优解决方案即可
stringRedisTemplate.opsForZSet().add(userCouponListCacheKey, userCouponItemCacheKey, now.getTime());}}catch(Exception ex){
status.setRollbackOnly();// 优惠券已被领取完业务异常if(ex instanceofServiceException){throw(ServiceException) ex;}if(ex instanceofDuplicateKeyException){
log.error("用户重复领取优惠券,用户ID:{},优惠券模板ID:{}",UserContext.getUserId(), requestParam.getCouponTemplateId());thrownewServiceException("用户重复领取优惠券");}thrownewServiceException("优惠券领取异常,请稍候再试");}});}
4. 发送优惠券到期事件
在上面代码的基础上,如果都执行成功,我们需要发送个 RocketMQ 延时队列,在指定时间后将优惠券模板的状态设置为已过期状态。
代码如下所示:
privatefinalTransactionTemplate transactionTemplate;
@OverridepublicvoidredeemUserCoupon(CouponTemplateRedeemReqDTO requestParam){// ......
// 通过编程式事务执行优惠券库存自减以及增加用户优惠券领取记录long extractSecondField =StockDecrementReturnCombinedUtil.extractSecondField(stockDecrementLuaResult);
transactionTemplate.executeWithoutResult(status ->{try{// ......
// 添加用户领取优惠券模板缓存记录String userCouponListCacheKey =String.format(EngineRedisConstant.USER_COUPON_TEMPLATE_LIST_KEY,UserContext.getUserId());String userCouponItemCacheKey =StrUtil.builder().append(requestParam.getCouponTemplateId()).append("_").append(userCouponDO.getId()).toString();
stringRedisTemplate.opsForZSet().add(userCouponListCacheKey, userCouponItemCacheKey, now.getTime());
// 由于 Redis 在持久化或主从复制的极端情况下可能会出现数据丢失,而我们对指令丢失几乎无法容忍,因此我们采用经典的写后查询策略来应对这一问题Double scored;try{
scored = stringRedisTemplate.opsForZSet().score(userCouponListCacheKey, userCouponItemCacheKey);// scored 为空意味着可能 Redis Cluster 主从同步丢失了数据,比如 Redis 主节点还没有同步到从节点就宕机了,解决方案就是再新增一次if(scored ==null){// 如果这里也新增失败了怎么办?我们大概率做不到绝对的万无一失,只能尽可能增加成功率
stringRedisTemplate.opsForZSet().add(userCouponListCacheKey, userCouponItemCacheKey, now.getTime());}}catch(Throwable ex){
log.warn("查询Redis用户优惠券记录为空或抛异常,可能Redis宕机或主从复制数据丢失,基础错误信息:{}", ex.getMessage());// 如果直接抛异常大概率 Redis 宕机了,所以应该写个延时队列向 Redis 重试放入值。为了避免代码复杂性,这里直接写新增,大家知道最优解决方案即可
stringRedisTemplate.opsForZSet().add(userCouponListCacheKey, userCouponItemCacheKey, now.getTime());}
// 发送延时消息队列,等待优惠券到期后,将优惠券信息从缓存中删除UserCouponDelayCloseEvent userCouponDelayCloseEvent =UserCouponDelayCloseEvent.builder().couponTemplateId(requestParam.getCouponTemplateId()).userCouponId(String.valueOf(userCouponDO.getId())).userId(UserContext.getUserId()).delayTime(validEndTime.getTime()).build();SendResult sendResult = couponDelayCloseProducer.sendMessage(userCouponDelayCloseEvent);
// 发送消息失败解决方案简单且高效的逻辑之一:打印日志并报警,通过日志搜集并重新投递if(ObjectUtil.notEqual(sendResult.getSendStatus().name(),"SEND_OK")){
log.warn("发送优惠券关闭延时队列失败,消息参数:{}",JSON.toJSONString(userCouponDelayCloseEvent));}}catch(Exception ex){
status.setRollbackOnly();// 优惠券已被领取完业务异常if(ex instanceofServiceException){throw(ServiceException) ex;}if(ex instanceofDuplicateKeyException){
log.error("用户重复领取优惠券,用户ID:{},优惠券模板ID:{}",UserContext.getUserId(), requestParam.getCouponTemplateId());thrownewServiceException("用户重复领取优惠券");}thrownewServiceException("优惠券领取异常,请稍候再试");}});}
重构优惠券秒杀方案
1. 现有技术方案问题
细心的同学可能发现了一个问题,在如此高并发的场景下,在一个事务中操作了这么多 Redis 和 RocketMQ,就会导致事务时间延长以及接口响应速度变慢等问题。
我们在兑换/秒杀优惠券接口的事务中共执行了以下逻辑:
- 1.操作优惠券库存表进行扣减库存;
- 2.添加优惠券模板到用户领券表;
- 3.保存优惠券模板到用户 Redis 领券记录中;
- 4.查询用户 Redis 领券记录是否持久化成功;
- 5.发送 RocketMQ 消息队列延时消息,到期修改用户优惠券状态。
其中 3、4、5 步骤逻辑都是在数据库操作成功的基础上执行的,那我们就可以通过 Canal 监听 Binlog 机制,异步执行这些逻辑就好了,这样就能不占用主逻辑的事务和响应时间了。
2. Canal 改造现有秒杀架构
2.1 什么是 Canal
译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费。

基于日志增量订阅和消费的业务包括:
- 数据库镜像
- 数据库实时备份
- 索引构建和实时维护(拆分异构索引、倒排索引等)
- 业务 cache 刷新
- 带业务逻辑的增量数据处理
2.2 MySQL 开启 Binlog 监听
对于自建 MySQL , 需要先开启 Binlog 写入功能,my.cnf 中配置需要包含如下信息。
[mysqld]
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW# 选择 ROW 模式
server_id=1# 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复
配置修改成功后,重启一下 MySQL,确保所有的配置生效。然后执行 MySQL 执行命令查看是否修改 Binlog 成功。
# 查看 binlog 日志是否开启show variables like'log_%';
如果查询出的选型 log_bin 数据为 ON,Binlog 即为开启状态。输出如下:
mysql>show variables like'log_%';+----------------------------------------+-----------------------------------------+| Variable_name |Value|+----------------------------------------+-----------------------------------------+| log_bin |ON|| log_bin_basename |/opt/homebrew/var/mysql/mysql-bin || log_bin_index |/opt/homebrew/var/mysql/mysql-bin.index|| log_bin_trust_function_creators |OFF|| log_bin_use_v1_row_events |OFF|| log_builtin_as_identified_by_password |OFF|| log_error |./bogon.err || log_error_verbosity |3|| log_output |FILE|| log_queries_not_using_indexes |OFF|| log_slave_updates |OFF|| log_slow_admin_statements |OFF|| log_slow_slave_statements |OFF|| log_statements_unsafe_for_binlog |ON|| log_syslog |OFF|| log_syslog_facility | daemon || log_syslog_include_pid |ON|| log_syslog_tag ||| log_throttle_queries_not_using_indexes |0|| log_timestamps | UTC || log_warnings |2|+----------------------------------------+-----------------------------------------+21rowsinset(0.00 sec)
最后,MySQL 执行 SQL 语句创建 canal 单独使用的账号,用来进行 Binlog 的同步和监听。
CREATEUSER canal IDENTIFIED BY'canal';GRANTSELECT,REPLICATION SLAVE,REPLICATION CLIENT ON*.*TO'canal'@'%';
FLUSH PRIVILEGES;
2.3 安装 Canal 中间件
接下来开始配置 Canal 中间件,下载 Canal 安装包,其中的一些配置都是我调试后的。
通过网盘分享的文件:canal.deployer-1.1.6-oneCoupon.zip 链接: https://pan.baidu.com/s/1h9dLEl2g7oSdz9YuNC70rw?pwd=9tfb 提取码: 9tfb
如果本地 MySQL 不是 127.0.0.1:3306
配置,需要将 instance.properties
配置文件中的 canal.instance.master.address
配置项替换为真实的 MySQL 地址和端口。

如果你是自己本地启动的 RocketMQ,将 canal.properties
文件中的 rocketmq.namesrv.addr
配置项变更为自己本地的 RocketMQ 配置,默认为公有云中间件地址。
如果你正在使用公有云 RocketMQ,instance.properties
配置文件中的 canal.mq.topic
也需要变更,配置中默认为 one-coupon_canal_engine-service_common-sync_topic-mading
,-mading
需要大家变更为自己在项目启动 VM 参数里的值。各自检查并进行替换。
-Dunique-name=-mading
如果这一步有问题,是没有办法发送到 RocketMQ 顺利消费的。
全部上述配置大家全都搞定后,开始启动 Canal 进行测试。如果是 Windows 系统,直接双击 Canal 文件夹下 bin 目录的 startup.bat
脚本即可,要停止 Canal 关闭脚本弹框。如果是 Linux 或者 Mac 系统,进入 bin 目录下执行 sh startup.sh
命令。如果是 Linux 或者 Mac 系统要停止执行 sh stop.sh
。
注意,Canal 安装路径中不要有文件夹名称是中文。需要使用 JDK8 启动 Canal 服务,不要使用 JDK17。
启动完成后,打开 logs 文件夹目录下的 canal/canal.log 文件。

查看是否启动成功,如果输出打印如下 the canal server is running now
,即为成功。

然后,我们变更数据库 t_user_coupon
表中任意一条记录的 status 值,比如从 0 变为 1,或者从 1 变为 0。
接下来查看 RocketMQ 控制台。
页面配置查询如下,选择好主题和开始结束时间(默认会填充)后,点击搜索按钮。
消息体格式为 JSON 类型数据,示例数据如下:
{"data":[{"id":"1832782304792027137","user_id":"1810518709471555585","coupon_template_id":"1832782251755053058","receive_time":"2024-09-08 22:05:28","receive_count":"1","valid_start_time":"2024-09-08 22:05:28","valid_end_time":"2024-09-10 22:05:28","use_time":null,"source":"0","status":"1","create_time":"2024-09-08 22:05:28","update_time":"2024-09-08 22:05:28","del_flag":"0"}],"database":"one_coupon_0","es":1725881100000,"id":2,"isDdl":false,"mysqlType":{"id":"bigint(20)","user_id":"bigint(20)","coupon_template_id":"bigint(20)","receive_time":"datetime","receive_count":"int(3)","valid_start_time":"datetime","valid_end_time":"datetime","use_time":"datetime","source":"tinyint(1)","status":"tinyint(1)","create_time":"datetime","update_time":"datetime","del_flag":"tinyint(1)"},"old":[{"status":"0"}],"pkNames":["id"],"sql":"","sqlType":{"id":-5,"user_id":-5,"coupon_template_id":-5,"receive_time":93,"receive_count":4,"valid_start_time":93,"valid_end_time":93,"use_time":93,"source":-6,"status":-6,"create_time":93,"update_time":93,"del_flag":-6},"table":"t_user_coupon_13","ts":1725881101080,"type":"UPDATE"}
对应配置字段信息的语意如下:
/**
* Canal Binlog 监听触发事件
*/@DatapublicclassCanalBinlogEvent{
/**
* 变更数据
*/privateList<Map<String, Object>> data;
/**
* 数据库名称
*/privateString database;
/**
* es 是指 Mysql Binlog 里原始的时间戳,也就是数据原始变更的时间
* Canal 的消费延迟 = ts - es
*/privateLong es;
/**
* 递增 ID,从 1 开始
*/privateLong id;
/**
* 当前变更是否是 DDL 语句
*/privateBoolean isDdl;
/**
* 表结构字段类型
*/privateMap<String, Object> mysqlType;
/**
* UPDATE 模式下旧数据
*/privateList<Map<String, Object>> old;
/**
* 主键名称
*/privateList<String> pkNames;
/**
* SQL 语句
*/privateString sql;
/**
* SQL 类型
*/privateMap<String, Object> sqlType;
/**
* 表名
*/privateString table;
/**
* ts 是指 Canal 收到这个 Binlog,构造为自己协议对象的时间
* 应用消费的延迟 = now - ts
*/privateLong ts;
/**
* INSERT(新增)、UPDATE(更新)、DELETE(删除)等等
*/privateString type;}
2.4 监听 Canal RocketMQ Topic
一般来说,针对高并发的 Binlog 监听,我们都是将 Canal 的 Binlog 数据丢到消息队列中。Canal 会将 Binlog 的变更内容推送到指定的 RocketMQ Topic。因此,在 Spring Boot 应用中,我们只需要与 RocketMQ 进行对接即可。
代码如下所示:
packagecom.nageoffer.onecoupon.engine.mq.consumer;
importcn.hutool.core.collection.CollUtil;importcn.hutool.core.date.DateUtil;importcn.hutool.core.util.ObjectUtil;importcn.hutool.core.util.StrUtil;importcom.alibaba.fastjson2.JSON;importcom.nageoffer.onecoupon.engine.common.constant.EngineRedisConstant;importcom.nageoffer.onecoupon.engine.common.constant.EngineRockerMQConstant;importcom.nageoffer.onecoupon.engine.mq.event.CanalBinlogEvent;importcom.nageoffer.onecoupon.engine.mq.event.UserCouponDelayCloseEvent;importcom.nageoffer.onecoupon.engine.mq.producer.UserCouponDelayCloseProducer;importlombok.RequiredArgsConstructor;importlombok.extern.slf4j.Slf4j;importorg.apache.rocketmq.client.producer.SendResult;importorg.apache.rocketmq.spring.annotation.RocketMQMessageListener;importorg.apache.rocketmq.spring.core.RocketMQListener;importorg.springframework.data.redis.core.StringRedisTemplate;importorg.springframework.stereotype.Component;
importjava.util.Date;importjava.util.Map;
/**
* 通过 Canal 监听用户优惠券表 Binlog 投递消息队列消费
* <p>
* 作者:马丁
* 加项目群:早加入就是优势!500人内部沟通群,分享的知识总有你需要的 <a href="https://t.zsxq.com/cw7b9" />
* 开发时间:2024-07-25
*/@Slf4j@Component@RequiredArgsConstructor@RocketMQMessageListener(
topic ="one-coupon_canal_engine-service_common-sync_topic${unique-name:}",
consumerGroup ="one-coupon_canal_engine-service_common-sync_cg${unique-name:}")publicclassCanalBinlogSyncUserCouponConsumerimplementsRocketMQListener<CanalBinlogEvent>{
privatefinalStringRedisTemplate stringRedisTemplate;privatefinalUserCouponDelayCloseProducer couponDelayCloseProducer;
@OverridepublicvoidonMessage(CanalBinlogEvent canalBinlogEvent){Map<String, Object> first =CollUtil.getFirst(canalBinlogEvent.getData());String couponTemplateId = first.get("coupon_template_id").toString();String userCouponId = first.get("id").toString();// 用户优惠券创建事件if(ObjectUtil.equal(canalBinlogEvent.getType(),"INSERT")){// 添加用户领取优惠券模板缓存记录String userCouponListCacheKey =String.format(EngineRedisConstant.USER_COUPON_TEMPLATE_LIST_KEY, first.get("user_id").toString());String userCouponItemCacheKey =StrUtil.builder().append(couponTemplateId).append("_").append(userCouponId).toString();Date receiveTime =DateUtil.parse(first.get("receive_time").toString());
stringRedisTemplate.opsForZSet().add(userCouponListCacheKey, userCouponItemCacheKey, receiveTime.getTime());
// 由于 Redis 在持久化或主从复制的极端情况下可能会出现数据丢失,而我们对指令丢失几乎无法容忍,因此我们采用经典的写后查询策略来应对这一问题Double scored;try{
scored = stringRedisTemplate.opsForZSet().score(userCouponListCacheKey, userCouponItemCacheKey);// scored 为空意味着可能 Redis Cluster 主从同步丢失了数据,比如 Redis 主节点还没有同步到从节点就宕机了,解决方案就是再新增一次if(scored ==null){// 如果这里也新增失败了怎么办?我们大概率做不到绝对的万无一失,只能尽可能增加成功率
stringRedisTemplate.opsForZSet().add(userCouponListCacheKey, userCouponItemCacheKey, receiveTime.getTime());}}catch(Throwable ex){
log.warn("查询Redis用户优惠券记录为空或抛异常,可能Redis宕机或主从复制数据丢失,基础错误信息:{}", ex.getMessage());// 如果直接抛异常大概率 Redis 宕机了,所以应该写个延时队列向 Redis 重试放入值。为了避免代码复杂性,这里直接写新增,大家知道最优解决方案即可
stringRedisTemplate.opsForZSet().add(userCouponListCacheKey, userCouponItemCacheKey, receiveTime.getTime());}
// 发送延时消息队列,等待优惠券到期后,将优惠券信息从缓存中删除UserCouponDelayCloseEvent userCouponDelayCloseEvent =UserCouponDelayCloseEvent.builder().couponTemplateId(couponTemplateId).userCouponId(userCouponId).userId(first.get("user_id").toString()).delayTime(DateUtil.parse(first.get("valid_end_time").toString()).getTime()).build();SendResult sendResult = couponDelayCloseProducer.sendMessage(userCouponDelayCloseEvent);
// 发送消息失败解决方案简单且高效的逻辑之一:打印日志并报警,通过日志搜集并重新投递if(ObjectUtil.notEqual(sendResult.getSendStatus().name(),"SEND_OK")){
log.warn("发送优惠券关闭延时队列失败,消息参数:{}",JSON.toJSONString(userCouponDelayCloseEvent));}}}}
通过这种方式我们就能将接口的吞吐量大幅度提高了。
3. 简化 Canal 流程
考虑到很多同学是想快速启动牛券项目,为了不让 Canal 成为强制依赖的组件,我们添加一个配置。
application.yaml 配置文件如下:
one-coupon:user-coupon-list:
save-cache:
type: direct # 有两个选项:direct 在流程里直接操作,binlog 通过解析数据库日志后操作
在 Service 代码中添加一个判断就好,代码如下所示:
@OverridepublicvoidredeemUserCoupon(CouponTemplateRedeemReqDTO requestParam){// ......// 通过编程式事务执行优惠券库存自减以及增加用户优惠券领取记录long extractSecondField =StockDecrementReturnCombinedUtil.extractSecondField(stockDecrementLuaResult);
transactionTemplate.executeWithoutResult(status ->{try{// ......// 保存优惠券缓存集合有两个选项:direct 在流程里直接操作,binlog 通过解析数据库日志后操作if(StrUtil.equals(userCouponListSaveCacheType,"direct")){// 操作缓存、操作消息队列// ......}}catch(Exception ex){
status.setRollbackOnly();// 优惠券已被领取完业务异常if(ex instanceofServiceException){throw(ServiceException) ex;}if(ex instanceofDuplicateKeyException){
log.error("用户重复领取优惠券,用户ID:{},优惠券模板ID:{}",UserContext.getUserId(), requestParam.getCouponTemplateId());thrownewServiceException("用户重复领取优惠券");}thrownewServiceException("优惠券领取异常,请稍候再试");}});}
Redis 缓存序列化器
因为咱们的用户信息是固定写入的,所以,如果用户领取了优惠券,大家的优惠券信息都是写到一个 Redis(如果用了云 Redis 的话)缓存的。这样的话就不太好分辨和测试,为此我们通过自定义 Redis 缓存的序列化器做个标识。
变更代码如下:

逻辑比较简单,通过自定义一个 RedisKeySerializer
替换默认的 Key 序列化器就好。
@RequiredArgsConstructor@EnableConfigurationProperties(RedisDistributedProperties.class)publicclassCacheConfigurationimplementsInitializingBean{
privatefinalRedisDistributedProperties redisDistributedProperties;privatefinalStringRedisTemplate stringRedisTemplate;
/**
* 创建 Redis Key 序列化器,可自定义 Key Prefix
*/@BeanpublicRedisKeySerializerredisKeySerializer(){String prefix =Optional.ofNullable(redisDistributedProperties.getPrefix()).orElse("");String prefixCharset = redisDistributedProperties.getPrefixCharset();returnnewRedisKeySerializer(prefix, prefixCharset);}
@OverridepublicvoidafterPropertiesSet()throwsException{
stringRedisTemplate.setKeySerializer(redisKeySerializer());}}
然后我们在 VM 参数里添加一个新的参数,因为我只用了公有云 RocketMQ,所以加上后一共是三个参数。

但是有个点,布隆过滤器的创建并不受这个管控,所以我们要手动追加。需要修改后管和引擎的布隆过滤器创建配置。
代码如下:
@ConfigurationpublicclassRBloomFilterConfiguration{
/**
* 优惠券查询缓存穿透布隆过滤器
*/@BeanpublicRBloomFilter<String>couponTemplateQueryBloomFilter(RedissonClient redissonClient,@Value("${framework.cache.redis.prefix:}")String cachePrefix){RBloomFilter<String> bloomFilter = redissonClient.getBloomFilter(cachePrefix +"couponTemplateQueryBloomFilter");
bloomFilter.tryInit(640L,0.001);return bloomFilter;}}
然后我们重启后管和引擎服务,执行新增和领取优惠券业务,就可以看到默认的 Key 前面会追加我们自定义的 prefix。

执行优惠券兑换
1. 创建优惠券模板
创建优惠券模板接口地址:点击跳转
创建一个新的优惠券模板,然后用户限领次数 limitPerPerson
设置为 2 次。
{"name":"用户下单满10减3特大优惠","source":0,"target":1,"goods":"","type":0,"validStartTime":"2024-07-08 12:00:00","validEndTime":"2025-06-30 13:59:59","stock":4998,"receiveRule":"{\"limitPerPerson\":2,\"usageInstructions\":\"3\"}","consumeRule":"{\"termsOfUse\":10,\"maximumDiscountAmount\":3,\"explanationOfUnmetConditions\":\"3\",\"validityPeriod\":\"48\"}"}
2. 兑换优惠券
兑换优惠券模板接口地址:点击跳转
从数据库 one_coupon_rebuild_1
中查询 t_coupon_template_15
最新的创建优惠券模板 ID,然后进入 API 地址开始进行兑换。
shopNumber
是固定的,优惠券模板 ID 复制刚创建的。
{"source":0,"shopNumber":"1810714735922956666","couponTemplateId":"1833125314830544897"}
我们调用两次数据是正常的,如果调用第三次,就会触发用户限领异常,如下所示:
{"code":"B000001","message":"用户已经达到领取上限","data":null,"requestId":null,"success":false}
我们查看 Redis 用户领取优惠券缓存数据,使用 ZSet 存储两条记录,刚好符合我们的预期。

文末总结
在优惠券兑换和秒杀业务中,我们实现了关键技术优化。首先,通过编程式事务确保了数据的正确性,结合 Redis 和 RocketMQ 进行消息处理,满足了秒杀场景对高吞吐量的要求。然而,在早期版本中,系统的吞吐量未能达到预期,并且在高并发场景下容易出现异常。为此,我们推出了 v2 版本的架构优化,通过 Canal Binlog 监听数据库变更事件,实现对数据更新的及时捕获和处理。这种方式大幅提升了系统的吞吐量,并减少了异常的发生概率,极大地提高了系统的稳定性和性能。
完结,撒花 🎉
第24小节:开发兑换/秒杀优惠券功能(二)
业务背景
在上一节中,我们介绍了通过数据库扣减完成用户兑换优惠券的逻辑,这种方式虽然稳妥,但性能有所不足,因为主流程的操作是同步执行的,导致响应时间变长,吞吐量下降。在本章节中,我们通过引入消息队列进行异步解耦,主流程仅同步操作 Redis,后续的数据库耗时操作则交由消息队列消费者来执行,从而提升整体性能。
Git 分支
20240910_dev_acquire-coupon-v2_seckill_ding.ma
开发基于消息队列秒杀逻辑
1. 编写兑换优惠券 v2 接口
保持原有代码不变,我们开发一个 v2 版本的方法。前置校验部分可以直接复用 v1 版本的通用逻辑。
代码如下所示:
@OverridepublicvoidredeemUserCouponByMQ(CouponTemplateRedeemReqDTO requestParam){// 验证缓存是否存在,保障数据存在并且缓存中存在CouponTemplateQueryRespDTO couponTemplate = couponTemplateService.findCouponTemplate(BeanUtil.toBean(requestParam,CouponTemplateQueryReqDTO.class));
// 验证领取的优惠券是否在活动有效时间boolean isInTime =DateUtil.isIn(newDate(), couponTemplate.getValidStartTime(), couponTemplate.getValidEndTime());if(!isInTime){// 一般来说优惠券领取时间不到的时候,前端不会放开调用请求,可以理解这是用户调用接口在“攻击”thrownewClientException("不满足优惠券领取时间");}
// 获取 LUA 脚本,并保存到 Hutool 的单例管理容器,下次直接获取不需要加载DefaultRedisScript<Long> buildLuaScript =Singleton.get(STOCK_DECREMENT_AND_SAVE_USER_RECEIVE_LUA_PATH,()->{DefaultRedisScript<Long> redisScript =newDefaultRedisScript<>();
redisScript.setScriptSource(newResourceScriptSource(newClassPathResource(STOCK_DECREMENT_AND_SAVE_USER_RECEIVE_LUA_PATH)));
redisScript.setResultType(Long.class);return redisScript;});
// 验证用户是否符合优惠券领取条件JSONObject receiveRule =JSON.parseObject(couponTemplate.getReceiveRule());String limitPerPerson = receiveRule.getString("limitPerPerson");
// 执行 LUA 脚本进行扣减库存以及增加 Redis 用户领券记录次数String couponTemplateCacheKey =String.format(EngineRedisConstant.COUPON_TEMPLATE_KEY, requestParam.getCouponTemplateId());String userCouponTemplateLimitCacheKey =String.format(EngineRedisConstant.USER_COUPON_TEMPLATE_LIMIT_KEY,UserContext.getUserId(), requestParam.getCouponTemplateId());Long stockDecrementLuaResult = stringRedisTemplate.execute(
buildLuaScript,ListUtil.of(couponTemplateCacheKey, userCouponTemplateLimitCacheKey),String.valueOf(couponTemplate.getValidEndTime().getTime()), limitPerPerson
);
// 判断 LUA 脚本执行返回类,如果失败根据类型返回报错提示long firstField =StockDecrementReturnCombinedUtil.extractFirstField(stockDecrementLuaResult);if(RedisStockDecrementErrorEnum.isFail(firstField)){thrownewServiceException(RedisStockDecrementErrorEnum.fromType(firstField));}
UserCouponRedeemEvent userCouponRedeemEvent =UserCouponRedeemEvent.builder().requestParam(requestParam).receiveCount((int)StockDecrementReturnCombinedUtil.extractSecondField(stockDecrementLuaResult)).couponTemplate(couponTemplate).userId(UserContext.getUserId()).build();SendResult sendResult = userCouponRedeemProducer.sendMessage(userCouponRedeemEvent);// 发送消息失败解决方案简单且高效的逻辑之一:打印日志并报警,通过日志搜集并重新投递if(ObjectUtil.notEqual(sendResult.getSendStatus().name(),"SEND_OK")){
log.warn("发送优惠券兑换消息失败,消息参数:{}",JSON.toJSONString(userCouponRedeemEvent));}}
我们的 Event 事件仅保留必须的,也就是 v1 接口后半部分需要新增数据库、缓存的内容。
代码如下所示:
@Data@Builder@NoArgsConstructor@AllArgsConstructorpublicclassUserCouponRedeemEvent{
/**
* Web 请求参数
*/privateCouponTemplateRedeemReqDTO requestParam;
/**
* 领取次数
*/privateInteger receiveCount;
/**
* 优惠券模板
*/privateCouponTemplateQueryRespDTO couponTemplate;
/**
* 用户 ID
*/privateString userId;}
2. 消息消费者
开发用户兑换优惠券消息消费者,并通过幂等注解避免消息重复消费。
代码如下所示:
/**
* 用户兑换优惠券消息消费者
* <p>
* 作者:马丁
* 加项目群:早加入就是优势!500人内部沟通群,分享的知识总有你需要的 <a href="https://t.zsxq.com/cw7b9" />
* 开发时间:2024-09-10
*/@Component@RequiredArgsConstructor@RocketMQMessageListener(
topic =EngineRockerMQConstant.COUPON_TEMPLATE_REDEEM_TOPIC_KEY,
consumerGroup =EngineRockerMQConstant.COUPON_TEMPLATE_REDEEM_CG_KEY)@Slf4j(topic ="UserCouponRedeemConsumer")publicclassUserCouponRedeemConsumerimplementsRocketMQListener<MessageWrapper<UserCouponRedeemEvent>>{
privatefinalUserCouponMapper userCouponMapper;privatefinalCouponTemplateMapper couponTemplateMapper;privatefinalUserCouponDelayCloseProducer couponDelayCloseProducer;privatefinalStringRedisTemplate stringRedisTemplate;
@NoMQDuplicateConsume(
keyPrefix ="user-coupon-redeem:",
key ="#messageWrapper.keys",
keyTimeout =600)@Transactional(rollbackFor =Exception.class)@OverridepublicvoidonMessage(MessageWrapper<UserCouponRedeemEvent> messageWrapper){// 开头打印日志,平常可 Debug 看任务参数,线上可报平安(比如消息是否消费,重新投递时获取参数等)
log.info("[消费者] 用户兑换优惠券 - 执行消费逻辑,消息体:{}",JSON.toJSONString(messageWrapper));
CouponTemplateRedeemReqDTO requestParam = messageWrapper.getMessage().getRequestParam();CouponTemplateQueryRespDTO couponTemplate = messageWrapper.getMessage().getCouponTemplate();String userId = messageWrapper.getMessage().getUserId();
int decremented = couponTemplateMapper.decrementCouponTemplateStock(Long.parseLong(requestParam.getShopNumber()),Long.parseLong(requestParam.getCouponTemplateId()),1L);if(!SqlHelper.retBool(decremented)){
log.warn("[消费者] 用户兑换优惠券 - 执行消费逻辑,扣减优惠券数据库库存失败,消息体:{}",JSON.toJSONString(messageWrapper));return;}
// 添加 Redis 用户领取的优惠券记录列表Date now =newDate();DateTime validEndTime =DateUtil.offsetHour(now,JSON.parseObject(couponTemplate.getConsumeRule()).getInteger("validityPeriod"));UserCouponDO userCouponDO =UserCouponDO.builder().couponTemplateId(Long.parseLong(requestParam.getCouponTemplateId())).userId(Long.parseLong(userId)).source(requestParam.getSource()).receiveCount(messageWrapper.getMessage().getReceiveCount()).status(UserCouponStatusEnum.UNUSED.getCode()).receiveTime(now).validStartTime(now).validEndTime(validEndTime).build();
userCouponMapper.insert(userCouponDO);
// 添加用户领取优惠券模板缓存记录String userCouponListCacheKey =String.format(EngineRedisConstant.USER_COUPON_TEMPLATE_LIST_KEY, userId);String userCouponItemCacheKey =StrUtil.builder().append(requestParam.getCouponTemplateId()).append("_").append(userCouponDO.getId()).toString();
stringRedisTemplate.opsForZSet().add(userCouponListCacheKey, userCouponItemCacheKey, now.getTime());
// 由于 Redis 在持久化或主从复制的极端情况下可能会出现数据丢失,而我们对指令丢失几乎无法容忍,因此我们采用经典的写后查询策略来应对这一问题Double scored;try{
scored = stringRedisTemplate.opsForZSet().score(userCouponListCacheKey, userCouponItemCacheKey);// scored 为空意味着可能 Redis Cluster 主从同步丢失了数据,比如 Redis 主节点还没有同步到从节点就宕机了,解决方案就是再新增一次if(scored ==null){// 如果这里也新增失败了怎么办?我们大概率做不到绝对的万无一失,只能尽可能增加成功率
stringRedisTemplate.opsForZSet().add(userCouponListCacheKey, userCouponItemCacheKey, now.getTime());}}catch(Throwable ex){
log.warn("[消费者] 用户兑换优惠券 - 执行消费逻辑,查询Redis用户优惠券记录为空或抛异常,可能Redis宕机或主从复制数据丢失,基础错误信息:{}", ex.getMessage());// 如果直接抛异常大概率 Redis 宕机了,所以应该写个延时队列向 Redis 重试放入值。为了避免代码复杂性,这里直接写新增,大家知道最优解决方案即可
stringRedisTemplate.opsForZSet().add(userCouponListCacheKey, userCouponItemCacheKey, now.getTime());}
// 发送延时消息队列,等待优惠券到期后,将优惠券信息从缓存中删除UserCouponDelayCloseEvent userCouponDelayCloseEvent =UserCouponDelayCloseEvent.builder().couponTemplateId(requestParam.getCouponTemplateId()).userCouponId(String.valueOf(userCouponDO.getId())).userId(userId).delayTime(validEndTime.getTime()).build();SendResult sendResult = couponDelayCloseProducer.sendMessage(userCouponDelayCloseEvent);
// 发送消息失败解决方案简单且高效的逻辑之一:打印日志并报警,通过日志搜集并重新投递if(ObjectUtil.notEqual(sendResult.getSendStatus().name(),"SEND_OK")){
log.warn("[消费者] 用户兑换优惠券 - 执行消费逻辑,发送优惠券关闭延时队列失败,消息参数:{}",JSON.toJSONString(userCouponDelayCloseEvent));}}}
代码逻辑和 v1 基本一致,这也也就不再赘述。值得一说的,因为整段逻辑已经通过消息队列异步解耦,就没有通过 Canal 进行订阅消费。
方案存在的问题
1. Redis 极端场景
Redis 提供了两套持久化机制,RDB 快照和 AOF 日志文件追加。
- RDB 它会根据情况定期的 Fork 出一个子进程,生成当前数据库的全量快照。对于 RDB 快照,假如我们在 RDB 快照生成后宕机,那么会丢失快照生成期间全部增量数据,如果在连快照都没成功生成,那么就会丢掉全部数据。
- 另一个是 AOF,它通过向 AOF 日志文件追加每一条执行过的指令实现。而当我们仅开启了 AOF 时,丢失数据的多少取决于我们设置的刷盘策略:当设置为每条指令执行后都刷盘
Always
,我们最多丢失一条指令;当设置为每秒刷一次盘的Eversec
时,最多丢失一秒内的数据;当设置为非主动刷盘的No
时,则可能丢失上次刷盘后到现在的全部数据。
从持久化的角度上,我们哪怕使用了 AOF 的最多丢失一条指令配置,依然可能会丢数据场景。这里只说丢数据的场景,具体映射到项目业务中的问题,我们在下面的内容中说明。
除持久化外,Redis 默认使用异步复制方式将主节点的数据传递给从节点。这意味着在主节点成功写入数据后,可能会因为从节点没有及时接收到该数据而造成数据丢失。特别是在主节点崩溃或重启时,如果从节点尚未完成数据同步,最新的数据将会丢失。Redis 的三种集群模式:主从、哨兵以及 Cluster 模式都会有这种问题。
通过 Redis 丢数据的场景,我们联想到刚好在用户兑换优惠券时,Redis 丢数据了会对业务造成什么影响?
假设当前 Redis 某个优惠券模板库存为 1。当张三用户兑换优惠券,Redis 扣减返回成功,这时 Redis 库存应该是 0,然后程序开始发送 RocketMQ 消息队列执行后续保存数据库流程。很不巧,虽然 Redis 返回了成功,但持久化失败或者 Redis 主节点宕机,造成这个记录并没有真正意义上执行完成。这个时候李四来兑换优惠券,发现 Redis 优惠券模板还是 1,扣减然后发送 RocketMQ。
截止目前,张三和李四的优惠券兑换都是返回成功的。但是 RocketMQ 消费者消费时只有一个能成功,这就造成了问题,李四在使用的时候发现优惠券用不了,因为受消费者库存校验,并没有将优惠券记录新增到数据库中。该方案 造成的影响就是用户看到返回优惠券成功,但是实际上没有办法使用。
2. 库存扣减的几种场景
在应对于企业中不同场景的库存扣减需求,这里分析下:
- 在商品流量较低的情况下,通常不会出现大量请求同时访问单个商品进行库存扣减。此时,可以使用 Redis 进行防护,并直接同步到 MySQL 进行库存扣减,以防止商品超卖。虽然在此场景中涉及多个商品的数据扣减,可能会出现锁竞争,但竞争程度通常不会很激烈。
- 对于秒杀商品,通常会在短时间内出现大量请求同时访问单个商品进行库存扣减。为此,可以使用 Redis 进行防护,并直接将库存扣减同步到 MySQL,以防止商品超卖。由于秒杀商品的库存一般较少,因此造成的锁竞争相对可控。假设库存扣减采用串行方式,每次扣减耗时 5 毫秒,处理 100 个库存也仅需 500 毫秒。
- 某些秒杀商品的库存较多,或同时进行多个热门商品的秒杀(如直播间商品)。在这种情况下,直接扣减数据库库存会给系统带来较大压力,导致接口响应延迟。为应对这种场景,我们设计了优惠券秒杀 v2 接口。虽然基于 Redis 扣减库存和消息队列异步处理的方案可能会引发前后不一致的问题,但它能显著提升性能。此外,Redis 的持久化和主从宕机的风险相对较小。即使发生宕机,对平台或商家来说,也不会造成直接的损失。
我想强调的是,不存在绝对的银弹。Redis 之所以能快速响应,是因为它直接与内存交互,作为缓存中间件,如果每次都为了数据一致性而与磁盘交互,那就本末倒置了。我研究了市场上的云 Redis,包括腾讯 Redis 和阿里云 Tair,发现它们的持久化和主从复制本质上都是异步的。
在提到 Redis 扣减库存和 RocketMQ 异步场景时,如果 Redis 真的丢失了数据,应该怎么处理呢?我想到了一种秒杀的思路。类似于在 12306 购票或银行转账时,当你提交请求后并不会立即得到成功的反馈,而是会看到一个等待界面,然后在一段时间后再告知你结果。如果将这个模式应用到秒杀场景中,可以设想在 Redis 中成功扣减库存并投递消息到队列后,返回给用户一个“等待完成”的页面。大部分情况下,返回这个页面的用户有 99.9999% 的概率是成功的。随后,前端可以向后端请求具体结果,只有在消息队列的消费者成功扣减数据库后,才会返回真正的成功通知。
这种方法适合强绑定的业务,比如 12306 或银行,因为用户必须使用它们的 App。相对而言,如果是电商场景,用户可能对这种交互不太友好,因此不太愿意使用你的平台。这个方法可以作为面试时的讨论点,但在实际电商环境中,补偿机制的应用可能更为普遍。
文末总结
技术架构并非一成不变,合适的业务场景配合对应的架构往往能发挥最大的效用。对于类似订单或 12306 抢票这样的场景,每一笔数据都必须精确无误,这种情况下如果不依赖数据库保障一致性,单纯依赖 Redis 缓存会存在一定的不足。
但对于用户优惠券场景,偶尔多发一张优惠券问题不大,因此没必要将压力过多转移到数据库,毕竟数据库是宝贵的资源。通过消息队列进行异步解耦,不仅可以减少数据库的压力,还能提高系统的吞吐量,肯定更为合适。
如果不想多发优惠券,那我们可以在用户使用优惠券时,发现数据库中没有这个记录,就将用户的优惠券缓存删除,来确保数据的准确性。当然这种可能会引起用户反感,具体可以和产品沟通这种方案怎么解决,设置不可用的标记,或者直接给用户补上都可以。
完结,撒花 🎉
大佬们,马哥说单纯依赖 Redis 缓存会存在一定的不足,可能造成偶尔多发一张优惠券问题。请问在什么情况下,会导致多发一张优惠券呢
红红火火 回复 Dam:秒杀逻辑的第一步就是在redis中判断优惠卷是否有库存以及用户已领取这张优惠卷的次数,如果没库存了或者领取已达上限就直接返回。当缓存这关通过之后,需要修改数据库,并将用户已领取次数刷新到缓存中,但是因为这步是使用消息队列异步执行的,如果这条消息还没执行完时,这个用户又准备领取优惠卷,在redis中校验已领取次数时就会出现不一致的问题了,redis中记录的已领取次数比数据库中的少,可能会多领。
Young 回复 红红火火:你这个说法有点问题,用户已领取次数的改变是在LUA脚本中完成的
李温候 回复 Dam:李温候 回复 Dam:我的理解:操作流程是这样的,先操作redis,再异步操作mysql, 也就是说操作完reids之后,发完信息就直接返回了然后后面再操作mysql扣减库存啥的,但是如果redis宕机了,然后丢失了我们在lua脚本里面的命令(相当于扣减库存没成功),这时候redis的库存就比实现的库存多,这样就导致多发优惠券情况
马丁 回复 Dam:已更新文档,请查看【方案存在的问题】章节
。 回复 Dam:我觉得大概率是:Redis主存同步问题,LUA执行成功,主节点宕机但从节点未同步,导致数据不一致。
II:马哥,这第二种方式如何保障mysql和redis的数据一致性问题?
方卷 回复 II:使用这种方案的前提是可以接受少量的优惠券多领或者少领,也就是不那么强调一致性问题
Tiam. 回复 II:实在被面试官刁难的时候,可以说如果丢失redis命令,用户显示扣减成功,但是因为mysql最终判断库存已为零执行失败,会根据失败日志打印,进行补偿机制,虽然这个概率很低 仅存在redis宕机这种条件
咕噜灵波:这里用户领取优惠券次数,跟优惠券分发里面的没有冲突吗,分发里分发的优惠券没有设置领取次数的key。
马丁 回复 咕噜灵波:这个提的很好,应该要在分发的 Lua [stock_decrement_and_batch_save_user_record.lua] 里加上对应的领取 Key 限制
预备Mser:马哥,确实见过问缓存这边主从架构,主挂了且尚未同步,导致数据不一致且一定要在redis这解决问题的面试官。如果硬要回答话是说用主从的半同步机制还是说没法解决,只能依靠最终结果用mysql和redis对账最后补偿
o_0 回复 预备Mser:可以看看cache key选择性读主从方案
泡面大王:对于优惠券超发补偿,实际工作中怎么补偿的,能举个例子吗?
豆米:大佬们,如果出现了redis扣减库存了,然后异步处理mysql的时候出错,事务回滚了,那岂不是那个用户依然没有秒杀成功?
扶不上墙 回复 豆米:这种一般来说
1 如果重试可解决最后是mq重试解决一致性问题
2 但如果一直重试不成功, 应该还是用定时任务 进行实时数据对账 保证最终一致性
花开富贵:总结一下v1和v2。
v1中通过lua脚本扣减了缓存中的库存,如果扣减成功那就让用户领取优惠券记录存入数据库,在v1中添加记录的操作是顺序执行的,用户领取优惠券记录成功插数据库之后,如果userCouponListSaveCacheType是direct,那就顺序执行添加用户领取优惠券模板缓存到redis中以及发送延迟消息到mq等待优惠券到期后将优惠券信息从缓存中删除的操作,如果userCouponListSaveCacheType是binlog,也就是通过canal监听数据库,那就用一个消息消费者来异步执行添加用户领取优惠券模板缓存到redis以及发送延迟消息到mq等待优惠券到期后将优惠券信息从缓存中删除的操作。
v2其实最主要的变化就是操作数据库不顺序执行了,目的就是减少数据库的压力。执行lua脚本扣减缓存中的库存之后,如果扣减成功那就发送一个消息到mq,然后由消费者异步执行插入用户领取优惠券的记录到数据库中,如果插入成功,后面执行添加用户领取优惠券模板缓存到redis中以及发送延迟消息到mq等待优惠券到期后将优惠券信息从缓存中删除的操作也都是在消费者中执行的。
注意v2没有区分direct和binlog,而是直接让插入数据库的操作和执行添加用户领取优惠券模板缓存到redis中以及发送延迟消息到mq等待优惠券到期后将优惠券信息从缓存中删除的操作都在消费者中执行,并没有使用canal监听
第25小节:开发优惠券预约通知功能(一)
业务背景
大家可以类比 12306 购票预约功能。当购买紧张列车的车票时,我们通常会提前设置预约提醒,以便在开票时及时购买。类似地,优惠券预约提醒也具有很强的时效性,同时需要支持海量用户的提醒需求。为了实现这一点,我们选择了 RocketMQ 5.x 的任意延时消息功能,并通过线程池来并行提醒用户。
在实现过程中,我们采用了位图(bitmap)思想,巧妙地利用单一字段实现了多个时间段的预约提醒功能。通过这种方式,我们不仅满足了海量提醒的需求,还确保了系统的高效和时效性。
整体实现较为复杂,涉及到二进制操作较多,如果大家不熟悉二进制 &、^、位移等操作,需要网上提前学习下。
Git 分支
20240918_dev_coupon-remind-v2_rocketmq-bitmap_youya
本章节预约提醒优惠券核心代码由优雅同学贡献,感谢优雅提供的优秀代码设计。
数据库表设计
1. 用户预约表 SQL
CREATETABLE`t_coupon_template_remind`(`user_id`bigint(20)NOTNULLCOMMENT'用户ID',`coupon_template_id`bigint(20)NOTNULLCOMMENT'券ID',`information`bigint(20)DEFAULTNULLCOMMENT'存储信息',`shop_number`bigint(20)DEFAULTNULLCOMMENT'店铺编号',`start_time`datetimeDEFAULTNULLCOMMENT'优惠券开抢时间',PRIMARYKEY(`user_id`,`coupon_template_id`))ENGINE=InnoDBDEFAULTCHARSET=utf8mb4 COMMENT='用户预约提醒信息存储表';
2. 是否分库分表?
考虑到主键由 coupon_template_id
和 user_id
组成,均为 bigint
类型,各占 8 字节,总计 16 字节。MySQL 的页大小为 16KB。
- 索引页:每条记录由主键(8 字节 + 8 字节)加上指针(6 字节)组成,共计 22 字节。因此,索引页最多可存储:16,384 字节 ÷ 22 字节 ≈ 744 条记录。
- 数据页:每条记录包含主键(16 字节)、
information
(8 字节)、shop_number
(8 字节)和start_time
(8 字节),合计 40 字节。因此,数据页最多可存储:16,384 字节 ÷ 40 字节 ≈ 410 条记录。
在三层 B+ 树结构下,最大可存储的记录数为:744(第一层)× 744(第二层)× 410(叶子节点)≈ 226,492,160 条记录。
即大约 2.26 亿条记录。考虑到实际情况中页内可能存在的其他开销,这个数值可能会略少,但至少可以支持 2 亿条记录。
假设用户量为 5000 万,那么平均每个用户可以预约 4 张券,完全满足需求。
通过设置定时任务,定期删除已过期的记录,可以有效控制数据量。因此,单表结构已经足够承载当前业务需求,无需进行分库分表。
3. 为什么不设置主键 ID?
如果我们额外创建一个自增的 id
作为主键,那么除了主键索引外,我们仍然需要通过 (user_id, coupon_template_id)
唯一定位一条记录。这意味着我们还需要建立一个联合索引 (user_id, coupon_template_id)
来提高查询性能。因此,额外添加一个 id
作为主键并没有实际意义。
没有显示给出主键的话,MySQL 会找个唯一索引做主键,如果没有唯一索引,就给个隐藏主键。我们通过唯一索引字段作为主键,可以节省额外的主键字段大小。
由于 (user_id, coupon_template_id)
本身就是唯一的,所以直接将其作为主键既能保证唯一性,又能提升查询效率。
为什么选择 (user_id, coupon_template_id)
而不是 (coupon_template_id, user_id)
作为主键?
因为用户可能会查询自己预约了哪些提醒券,我们需要根据 user_id
来检索他预约的所有券。将 user_id
放在前面,可以符合索引的最左前缀匹配原则,显著提高查询性能。
4. 存储信息指的是什么?
一个用户可以有多个券,每个券的信息都是用位图(存储信息字段)存储的,可以存用户对该优惠券模板的所有预约消息,一个 Long 类型的字段就搞定了。
存储信息字段中,我们使用 5 分钟作为一个间隔,支持提前一小时提醒,也就是最多支持一共有 12 个值。并且,我们能支持 5 种通知方式,通过位图的设计思路放到一个字段里,详情看下文描述。
创建优惠券预约提醒
1. 优惠券预约抢购提醒请求参数
首先咱们先说下请求参数的字段都有哪些,方便后续的业务代码跟进。
代码如下所示:
@Data@Schema(description ="优惠券预约抢券提醒请求参数实体")publicclassCouponTemplateRemindCreateReqDTO{
/**
* 优惠券模板id
*/@Schema(description ="优惠券模板id", example ="xxxxxx", required =true)privateString couponTemplateId;
/**
* 店铺编号
*/@Schema(description ="店铺编号", example ="1810714735922956666", required =true)privateString shopNumber;
/**
* 提醒方式
*/@Schema(description ="提醒方式", example ="0", required =true)privateInteger type;
/**
* 提醒时间,比如五分钟,十分钟,十五分钟
*/@Schema(description ="提醒时间", example ="5", required =true)privateInteger remindTime;}
字段解释如下所示:
- 优惠券模板 ID:建议咱们创建新的优惠券模板,创建的时候
validStartTime
需要个性化设置,比如你想设置提前 5、10 分钟提醒,假设当前时间 10:00,那可以设置 10:20,这样都能涵盖住 5、10 分钟的预约提醒测试。 - 店铺编号:使用示例店铺编号即可。
- 提醒方式:比如手机 APP 弹框提醒、邮件提醒等,目前主流是前者,我们这里 0 默认就是这个。
- 提醒时间:提前多少分钟提醒,以 5 分钟为单位,最多支持提前一小时。
2. 验证优惠券是否存在
首先通过我们优惠券模板 Service 中的查询模板信息接口进行判断,看模板是否存在,通过这个可以判断缓存击穿和穿透问题。
@Override@TransactionalpublicvoidcreateCouponRemind(CouponTemplateRemindCreateReqDTO requestParam){// 验证优惠券是否存在,避免缓存穿透问题并获取优惠券开抢时间CouponTemplateQueryRespDTO couponTemplate = couponTemplateService
.findCouponTemplate(newCouponTemplateQueryReqDTO(requestParam.getShopNumber(), requestParam.getCouponTemplateId()));// ......}
3. 创建或者更新优惠券提醒
逻辑简单梳理就是,查询数据库是否存在,不存在创建优惠券提醒,存在则将提醒时间更新到已有的记录中。
代码如下:
@Override@TransactionalpublicvoidcreateCouponRemind(CouponTemplateRemindCreateReqDTO requestParam){// ......// 查询用户是否已经预约过优惠券的提醒信息LambdaQueryWrapper<CouponTemplateRemindDO> queryWrapper =Wrappers.lambdaQuery(CouponTemplateRemindDO.class).eq(CouponTemplateRemindDO::getUserId,UserContext.getUserId()).eq(CouponTemplateRemindDO::getCouponTemplateId, requestParam.getCouponTemplateId());CouponTemplateRemindDO couponTemplateRemindDO = couponTemplateRemindMapper.selectOne(queryWrapper);
// 如果没创建过提醒if(couponTemplateRemindDO ==null){
couponTemplateRemindDO =BeanUtil.toBean(requestParam,CouponTemplateRemindDO.class);
// 设置优惠券开抢时间信息
couponTemplateRemindDO.setStartTime(couponTemplate.getValidStartTime());
couponTemplateRemindDO.setInformation(CouponTemplateRemindUtil.calculateBitMap(requestParam.getRemindTime(), requestParam.getType()));
couponTemplateRemindDO.setUserId(Long.parseLong(UserContext.getUserId()));
couponTemplateRemindMapper.insert(couponTemplateRemindDO);}else{Long information = couponTemplateRemindDO.getInformation();Long bitMap =CouponTemplateRemindUtil.calculateBitMap(requestParam.getRemindTime(), requestParam.getType());if((information & bitMap)!=0L){thrownewClientException("已经创建过该提醒了");}
couponTemplateRemindDO.setInformation(information ^ bitMap);
couponTemplateRemindMapper.update(couponTemplateRemindDO, queryWrapper);}// ......}
3.1 创建优惠券抢购提醒
这里有个很复杂的点,那就是如何通过创建时间和提醒类型进行位图运算为一个字段,这里抽取了个工具类。
代码如下所示:
/**
* 下一个类型的位移量,每个类型占用12个bit位,共计60分钟
*/privatestaticfinalintNEXT_TYPE_BITS=12;
/**
* 5分钟为一个间隔
*/privatestaticfinalintTIME_INTERVAL=5;
/**
* 根据预约时间和预约类型计算bitmap
*/publicstaticLongcalculateBitMap(Integer remindTime,Integer type){if(remindTime >TIME_INTERVAL*NEXT_TYPE_BITS){thrownewClientException("预约提醒的时间不能早于开票前"+TIME_INTERVAL*NEXT_TYPE_BITS+"分钟");}return1L<<(type *NEXT_TYPE_BITS+Math.max(0, remindTime /TIME_INTERVAL-1));}
四个字段(两个局部变量和两个入参)含义:
- NEXT_TYPE_BITS:表示每种提醒类型占用的位数。每个类型支持 12 个提醒时间点(如每 5 分钟一个提醒,最多支持提前 60 分钟),意味着每个提醒类型占据 12 个 bit 位。
- TIME_INTERVAL:表示提醒时间的间隔,以分钟为单位。这里定义为每 5 分钟作为一个间隔。即每 5 分钟计算一次提醒点。
- remindTime:传入的预约提醒时间,单位是分钟。代表用户希望在何时接收到提醒。
- type:传入的提醒类型。代表不同的提醒方式,如 App 通知、邮箱提醒等。
在牛券系统中,calculateBitMap
方法用于根据预约时间和预约类型计算相应的 bitmap。传入的参数包括 remindTime
(例如10分钟)和 type=0
(邮件提醒,枚举类 CouponRemindTypeEnum
中定义)。首先,对提醒时间进行校验,当前设定为提醒时间不能超过开票前60分钟。如果超过该时间,系统将抛出异常。
接着,根据提醒时间和类型计算相应的比特位。我们使用 long
类型的字段来存储提醒信息,总计64个比特位:
0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0000
每种提醒类型占用12个比特位,从左向右依次存储不同提醒类型的时间节点。以类型为 0
为例,该类型的提醒时间存储在第1到第12个比特位,每个比特位对应一个5分钟的时间间隔。
假设用户预约了类型为 0
的提醒,在第10分钟和第45分钟的节点,则会设置对应的比特位:
0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0001 0000 0010
可以看到,第2位和第9位的比特被置为 1
,表示预约了10分钟和45分钟的提醒。
对于类型为 1
的提醒(如短信提醒),其信息存储在第13到第24个比特位。如果用户预约了15分钟和50分钟的节点,结果如下:
0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0010 0000 0100 0000 0000 0000
每种提醒类型均占用12个比特位,可存储最多60分钟内的提醒信息。由于 long
类型有64个比特位,目前可以支持最多5种提醒类型。尽管当前的枚举中仅包含两种提醒方式,但为了后续扩展,我们依然选择使用 long
来存储这些信息。
在 calculateBitMap
的计算方法中,返回语句为:
return 1L << (type * NEXT_TYPE_BITS + Math.max(0, remindTime / TIME_INTERVAL - 1));
这段代码首先通过 type * NEXT_TYPE_BITS
计算提醒类型的偏移量。例如,type = 0
时,偏移量为 0
,表示从第0位开始;type = 1
时,偏移量为 12
,表示从第12位开始存储信息。
接下来,Math.max(0, remindTime / TIME_INTERVAL - 1)
是为了确保提醒时间在有效范围内。使用 Math.max
的目的是防止 remindTime / TIME_INTERVAL - 1
小于 0
,这通常会发生在 remindTime
小于5分钟的情况下。理论上,这种情况应该由前端确保,remindTime
的值是5分钟的倍数。因此,检验逻辑可以提前放在返回语句之前,如果 remindTime
小于5分钟,系统直接抛出参数错误异常。
remindTime / TIME_INTERVAL - 1
用于计算提醒时间应该设置的具体 bit 位。例如,如果 remindTime = 20
分钟,那么 remindTime / TIME_INTERVAL - 1 = 3
。假设 type = 0
,最终结果就是将 1L
向左移动3位:
0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0001
向左偏移3位:
0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 1000
这表示 type = 0
的提醒在20分钟的节点。
如果 type = 1
,提醒时间为15分钟,带入公式计算:
1 * NEXT_TYPE_BITS + Math.max(0, 15 / 5 - 1) = 14
结果为14位,因此 1L
向左偏移14位:
0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0100 0000 0000 0000
这表示 type = 1
的提醒在15分钟的节点。
3.2 修改优惠券抢购提醒
如果优惠券提醒已存在,我们需要判断提醒是否重复,如果重复存在,直接抛出异常,如果说两个提醒不存在重叠,则使用 information ^ bitMap
运算进行合并结果,修改放入数据库中。
information & bitMap
用于检查当前的提醒信息是否已经包含了用户设置的提醒,如果相同位置上的位都是 1
,则 if ((information & bitMap) != 0L)
会为 true
,系统抛出异常,提示用户已经创建过该提醒。
Long information = couponTemplateRemindDO.getInformation();Long bitMap =CouponTemplateRemindUtil.calculateBitMap(requestParam.getRemindTime(), requestParam.getType());if((information & bitMap)!=0L){thrownewClientException("已经创建过该提醒了");}
couponTemplateRemindDO.setInformation(information ^ bitMap);
information ^ bitMap
是异或运算,给大家举个例子。我们假设:
- 1.
information
当前表示用户已经设置了提醒在 10分钟和45分钟 节点(类型为0
,即 App 提醒)。 - 2.用户这次希望设置新的提醒在 15分钟和50分钟 节点(类型为
0
,即同样是 App 提醒)。
information
的二进制表示:
- 10分钟节点对应的第2位设为
1
,45分钟节点对应的第9位设为1
。 - 所以
information
的二进制表示如下所示,其中第2位和第9位为1
,其他位为0
。
0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0001 0000 0010
bitMap
是用户希望新增的提醒信息,表示 15分钟和50分钟 节点。
- 15分钟对应的第3位,50分钟对应的第10位。
bitMap
的二进制表示如下所示,其中第3位和第10位为1
,其他位为0
。
0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0010 0000 0100
现在使用 ^
(异或)操作来合并两个提醒信息。对 information
和 bitMap
逐位进行异或运算。
- 相同的位:结果为
0
。 - 不同的位:结果为
1
。
information: 0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0001 0000 0010
bitMap: 0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0010 0000 0100
------------------------------------------------------------------------------------------------
result: 0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0011 0000 0110
结果解释:
- 第2位 和 第9位 是
information
中原本已经设置的提醒节点,bitMap
对应位置是0
,所以这些位保持为1
。 - 第3位 和 第10位 是
bitMap
新增的提醒节点,information
对应位置是0
,异或运算后这些位变为1
,表示新增了提醒。 - 其他位保持不变,仍然为
0
。
最终的合并结果(information ^ bitMap
)是:
0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0011 0000 0110
这表示用户已经设置了在 10分钟、15分钟、45分钟 和 50分钟 的提醒节点。
^
运算的作用是合并用户已设置的提醒和新设置的提醒,不会覆盖原有提醒节点,而是逐位比较后,将不同的位置置为 1
,相同的位置保持不变。
4. 发送预约抢购优惠券延时消息
当我们把预约抢购信息保存到数据库之后,就要考虑怎么让用户在开抢之前接收通知,我们使用消息队列 RocketMQ 延时消息完成该需求。
4.1 为什么采用 MQ 延时消息
优雅最终设计这个方案之前,横向对比了其他技术方案:
- Redis 延时队列 / Redis 存储信息:Redis 以内存为主进行数据存储,当面对百万级消息时,即使使用 BitMap 进行压缩,也很难有效存储,尤其是这些消息可能需要几天后才被消费。这会导致内存占用巨大。而 RocketMQ 则通过硬盘存储数据,能够轻松处理和存储大量消息。
- MySQL 存储消息:将消息存储在 MySQL 中,并通过定时任务在指定时间提取消息进行消费。MySQL 的 QPS 大约为 5000 多,消费百万条消息大量读取的 IO 容易成为系统瓶颈。此外,随着数据量的增加,MySQL 的迭代速度非常快,引入定时任务删除也增加了系统的复杂性。
MySQL QPS 查询普遍指的是云 MySQL,8C16G,自建和配置较低的可能无法达到。
综上所述,使用 RocketMQ 是更优的选择,特别是在处理大量消息的场景下。它具备高效的硬盘存储和较高的消息吞吐量。然而,RocketMQ 也存在一定的缺点,例如,RocketMQ 5.x 版本尚未广泛普及,稳定性和兼容性难以完全保障,且需要我们对消息消费过程进行额外处理。
4.2 发送预约抢购优惠券消息
我们在业务代码里发送 RocketMQ 延时消息,延迟时间通过我们的优惠券开始时间减去设置的提醒时间即可。
代码如下所示:
@Override@TransactionalpublicvoidcreateCouponRemind(CouponTemplateRemindCreateReqDTO requestParam){// ......// 发送预约提醒抢购优惠券延时消息CouponRemindDelayEvent couponRemindDelayEvent =CouponRemindDelayEvent.builder().couponTemplateId(couponTemplate.getId()).userId(UserContext.getUserId()).contact(UserContext.getUserId()).shopNumber(couponTemplate.getShopNumber()).type(requestParam.getType()).remindTime(requestParam.getRemindTime()).startTime(couponTemplate.getValidStartTime()).delayTime(DateUtil.offsetMinute(couponTemplate.getValidStartTime(),-requestParam.getRemindTime()).getTime()).build();
couponRemindDelayProducer.sendMessage(couponRemindDelayEvent);}
后续的查看用户预约抢购提醒列表、取消用户抢购提醒以及发送提醒通知将在后续章节给大家揭晓。
文末总结
在比对了多种优惠券预约提醒的技术架构方案后,最终选择了 RocketMQ 5.x 的任意延时消息机制来实现预约提醒。通过线程池并行通知用户的方式,提升了提醒的效率。在此过程中,我们使用了位图技术,将多个时间段的提醒信息高效地存储在一个字段中,从而实现了对海量提醒的精准管理和高效处理。
完结,撒花 🎉
通过RocketMQ预约抢购优惠券消息,提醒时间只能是一个,比如提前5分钟提醒,假如我需要在5分钟、10分钟都提醒呢?如果这个不能由RocketMQ实现的话,那么前面通过异或合并两个提醒时间节点是不是没用了?
每次的requestParam中只有一个提醒时间,createCouponRemind方法对于相同的(user_id
,coupon_template_id
)添加多个提醒时间应该要执行多次,每次都会向消息队列发送消息;异或合并是为了将mysql中多个提醒时间合并成一条。
每次请求都是设置一个提醒的时间,然后发送延迟消息,下次请求再来设置的话就要判断之前是否设置过这个时间,如果没有设置过那就再发送延迟消息,每个提醒时间对应一条消息,异或的操作虽然是为了设置最终的提醒时间,但之后也方便通过异或后的结果判断新的请求设置的提醒时间是否之前设置过。
Long bitMap = CouponTemplateRemindUtil.calculateBitMap(requestParam.getRemindTime(), requestParam.getType()); 将结果异或?位图中出现的每个1表示一次提醒,可以通过循环操作发送消息
十八是个木:关于这个将提醒时间和提醒类型绑定到一起的方案,被问到这个方案扩展性不好,如果后期想增加提醒方式,以12bit为一个类型的话,位数是有限的,无法扩展,同时还说到这样做增加复杂性,可读性降低,我说这样做不仅从性能上有提升,而且这种方案可以将不同提醒时间所对应不同提醒方式都存到一条记录里,比如可能前5分支短信提醒,前10分钟弹窗提醒,可以只用一条记录存储,然后又被问,那你这样又限制了提醒时间要以5分钟为一个单位,说假如现在3点38分,用户就想凑个整,4点提醒怎么办,而且这种记录在时间过完之后就没有什么作用了,可以定期删除,所以对数据库内存的占用也不会很多。总之,面试官更倾向于加一个提醒方式的字段,然后定期删除过期数据的方案。该如何取舍呢?还想请教大佬们一下在实际开发中,这个需求,用位运算将这两个字段拼到一起用的多,一般用的是什么方案
画中画着画不出的画 回复 十八是个木:其实我也觉得有些华而不实,包括前面Excel解析行数用线程池+redission做异步,正常MQ也行,可能马哥想教我们多种思路吧。面试官要问到,就说两种方案自己都知道,想试下这种新的吧
十八是个木 回复 画中画着画不出的画:对的,其实说到后面我自己也倾向与后面这种方案了,而且完全可以有主键ID,用户预约时直接插入提醒时间,提醒方式,而且在后面的取消预约中,在实际场景中,取消一般都会先查询,失效的预约前端直接限制了不可取消,查询完成后完全可以根据id去修改预约数据,就不用进行后面一堆位运算了
第26小节:开发优惠券预约通知功能(二)
业务背景
在上一个章节中,我们讨论了优惠券预约的设计,重点介绍了如何将预约提醒发送到 RocketMQ 延时队列中。在本章节,我们将详细说明如何将消息推送给用户,确保他们能够及时接收到预约提醒。
在 v1 分支的基础上,对优惠券模板提醒相关的类进行了名称重构,大家查看当前分支提交记录即可。
Git 分支
20240919_dev_coupon-remind-v2_rocketmq-bitmap_youya
本章节预约提醒优惠券核心代码由优雅同学贡献,感谢优雅提供的优秀代码设计。
推送用户预约提醒
1. 预约消息消费者
开发 RocketMQ 消息队列消费者,相关的用户提醒代码进行了抽象,放在了一个执行器中。
代码如下所示:
/**
* 提醒抢券消费者
* <p>
* 作者:优雅
* 加项目群:早加入就是优势!500人内部项目群,分享的知识总有你需要的 <a href="https://t.zsxq.com/cw7b9" />
* 开发时间:2024-07-21
*/@Component@RequiredArgsConstructor@RocketMQMessageListener(
topic ="one-coupon_engine-service_coupon-remind_topic${unique-name:}",
consumerGroup ="one-coupon_engine-service_coupon-remind_cg${unique-name:}")@Slf4j(topic ="CouponTemplateRemindDelayConsumer")publicclassCouponTemplateRemindDelayConsumerimplementsRocketMQListener<MessageWrapper<CouponTemplateRemindDelayEvent>>{
privatefinalCouponTemplateRemindExecutor couponTemplateRemindExecutor;
@OverridepublicvoidonMessage(MessageWrapper<CouponTemplateRemindDelayEvent> messageWrapper){// 开头打印日志,平常可 Debug 看任务参数,线上可报平安(比如消息是否消费,重新投递时获取参数等)
log.info("[消费者] 提醒用户抢券 - 执行消费逻辑,消息体:{}",JSON.toJSONString(messageWrapper));
CouponTemplateRemindDelayEvent event = messageWrapper.getMessage();CouponTemplateRemindDTO couponTemplateRemindDTO =BeanUtil.toBean(event,CouponTemplateRemindDTO.class);
// 根据不同策略向用户发送消息提醒
couponTemplateRemindExecutor.executeRemindCouponTemplate(couponTemplateRemindDTO);}}
向用户发送消息提醒的逻辑在 CouponTemplateRemindExecutor
执行器,分为几个版本演进。
代码如下所示:
/**
* 执行提醒
*
* @param couponTemplateRemindDTO 用户预约提醒请求信息
*/publicvoidexecuteRemindCouponTemplate(CouponTemplateRemindDTO couponTemplateRemindDTO){// 向用户发起消息提醒switch(Objects.requireNonNull(CouponRemindTypeEnum.getByType(couponTemplateRemindDTO.getType()))){caseAPP-> sendAppMessageRemindCouponTemplate.remind(couponTemplateRemindDTO);caseEMAIL-> sendEmailRemindCouponTemplate.remind(couponTemplateRemindDTO);default->{}}}
2. 线程池并行发送提醒
通过第一版代码,我们可以明显发现一个问题:消息队列的消费速度较慢,因为是逐条处理每条消息后才消费下一条。而我们希望尽快将这些预约信息发送给用户。基于这一前提,我们可以采用线程池进行并行发送,以提高消息处理和发送的效率。
代码如下所示:
// 提醒用户属于 IO 密集型任务privatefinalExecutorService executorService =newThreadPoolExecutor(Runtime.getRuntime().availableProcessors()<<1,Runtime.getRuntime().availableProcessors()<<2,60,TimeUnit.SECONDS,newSynchronousQueue<>(),newThreadPoolExecutor.CallerRunsPolicy());/**
* 执行提醒
*
* @param couponTemplateRemindDTO 用户预约提醒请求信息
*/publicvoidexecuteRemindCouponTemplate(CouponTemplateRemindDTO couponTemplateRemindDTO){
executorService.execute(()->{// 向用户发起消息提醒switch(Objects.requireNonNull(CouponRemindTypeEnum.getByType(couponTemplateRemindDTO.getType()))){caseAPP-> sendAppMessageRemindCouponTemplate.remind(couponTemplateRemindDTO);caseEMAIL-> sendEmailRemindCouponTemplate.remind(couponTemplateRemindDTO);default->{}}});}
线程池参数解析如下:
- 核心线程数:
Runtime.getRuntime().availableProcessors() << 1
CPU 核数 * 2,因为是 IO 密集型线程数可以多些。 - 最大线程数:
Runtime.getRuntime().availableProcessors() << 2
CPU 核数 * 4,因为是 IO 密集型线程数可以多些。 - 阻塞队列:SynchronousQueue,不缓冲任务。
- 拒绝策略:CallerRunsPolicy,通过提交任务线程运行被拒绝的任务。
我们通过线程池来加快预约任务的通知提醒处理。当线程池达到其处理能力的瓶颈时,采用任务拒绝策略,将被拒绝的任务交由提交任务的线程自行执行,以确保通知任务不会被丢弃并尽可能提高系统的处理效率。
3. 通过 Redis 延迟队列兜底任务
为了提升消息消费的速度,我们将任务投递到线程池后立即返回消息投递成功给 MQ。然而,如果此时发生了断电,线程池中的任务将会丢失。为了解决这个问题,我们需要引入标记或持久化操作,并在后续通过扫描检测任务状态,确保任务未丢失。
当收到消息时,使用 Redisson 的延时队列,发送一条延时 10 秒的消息。10 秒后,系统检查任务状态。如果消费成功,流程结束;如果消费失败,则重新投递消息,从而保证消息不会丢失。
需要考虑的几个问题:
- 1.为什么使用 Redis 的延时队列,而不是继续使用 RocketMQ 的延时消息? Redis 的延时队列适用于 10 秒的短时任务,数据在 10 秒后投递完成即删除。
- 2.是否有横向对比其它方案? 我们也考虑了 Redis 的过期监听机制。虽然实现简单,但由于过期监听的时间不够精准,且过期消息只发送一次,可靠性较差。如果消息未成功接收或者发生异常,无法再次收到,因此不够稳妥。
- 3.如果“检测状态”的机器也挂了怎么办? 由于我们使用的是 Redis 的延时队列,只要有一台机器存活,就能继续收到消息。除非所有机器同时宕机,否则消息仍可被消费。对于我们这个场景,偶尔丢失少量消息仅意味着少通知几个用户,可以接受,并不构成重大影响。
这种方案确保了在保证消息消费速度的同时,降低了消息丢失的风险。所以,我们要参考之前优惠券分发任务的 Redis Stream 延迟任务进行兜底。
代码如下所示:
/**
* 执行提醒
*
* @param couponTemplateRemindDTO 用户预约提醒请求信息
*/publicvoidexecuteRemindCouponTemplate(CouponTemplateRemindDTO couponTemplateRemindDTO){// 假设刚把消息提交到线程池,突然应用宕机了,我们通过延迟队列进行兜底 RefreshRBlockingDeque<String> blockingDeque = redissonClient.getBlockingDeque(REDIS_BLOCKING_DEQUE);RDelayedQueue<String> delayedQueue = redissonClient.getDelayedQueue(blockingDeque);String key =String.format(COUPON_REMIND_CHECK_KEY, couponTemplateRemindDTO.getUserId(), couponTemplateRemindDTO.getCouponTemplateId(), couponTemplateRemindDTO.getRemindTime(), couponTemplateRemindDTO.getType());
stringRedisTemplate.opsForValue().set(key,JSON.toJSONString(couponTemplateRemindDTO));
delayedQueue.offer(key,10,TimeUnit.SECONDS);
executorService.execute(()->{// 向用户发起消息提醒switch(Objects.requireNonNull(CouponRemindTypeEnum.getByType(couponTemplateRemindDTO.getType()))){caseAPP-> sendAppMessageRemindCouponTemplate.remind(couponTemplateRemindDTO);caseEMAIL-> sendEmailRemindCouponTemplate.remind(couponTemplateRemindDTO);default->{}}
// 提醒用户后删除 Key
stringRedisTemplate.delete(key);});}
我们将任务暂存到 Redis String,并通过延迟队列进行兜底处理。首先检查 Redis String 是否存在,如果不存在,表示任务已成功消费;如果存在,则需要重新投递消息,确保任务被正确处理。
为了简化代码结构,避免创建过多的类,我们采用内部类的方式来完成这部分逻辑的实现。
代码如下所示:
@Slf4j@Component@RequiredArgsConstructorstaticclassRefreshCouponRemindDelayQueueRunnerimplementsCommandLineRunner{privatefinalCouponTemplateRemindDelayProducer couponRemindProducer;privatefinalRedissonClient redissonClient;privatefinalStringRedisTemplate stringRedisTemplate;@Overridepublicvoidrun(String... args){Executors.newSingleThreadExecutor(
runnable ->{Thread thread =newThread(runnable);
thread.setName("delay_coupon-remind_consumer");
thread.setDaemon(Boolean.TRUE);return thread;}).execute(()->{RBlockingDeque<String> blockingDeque = redissonClient.getBlockingDeque(REDIS_BLOCKING_DEQUE);for(;;){try{// 获取延迟队列待消费 KeyString key = blockingDeque.take();if(stringRedisTemplate.hasKey(key)){
log.info("检查用户发送的通知消息Key:{} 未消费完成,开启重新投递", key);// Redis 中还存在该 Key,说明任务没被消费完,则可能是消费机器宕机了,重新投递消息CouponTemplateRemindDelayEvent couponTemplateRemindDelayEvent =JSONUtil.toBean(stringRedisTemplate.opsForValue().get(key),CouponTemplateRemindDelayEvent.class);
couponRemindProducer.sendMessage(couponTemplateRemindDelayEvent);// 提醒用户后删除 Key
stringRedisTemplate.delete(key);}}catch(Throwable ignored){}}});}}
常见问题答疑
1. 消息消费是否需要做幂等?
在我们这个场景中,没必要引入消息消费幂等机制,原因如下:
- 1.消息消费速度非常快:消息处理流程是收到消息后,通过 Redis 发送延时队列,任务立即丢进线程池执行,随后返回结果。在这种情况下,出问题的概率极低,绝大多数消息能够正常消费。
- 2.即使发生消息重复投递,影响也很小:对于少数未能正常消费的消息,可能会产生重复消费的情况。但在我们这个场景中,重复消费的影响仅限于重复通知用户,完全可以接受,不会产生重大问题。
- 3.引入幂等的代价过大:如果我们为消息消费引入幂等机制,需要承受以下开销:
- 需要两次 Redis 网络请求来检查和设置幂等标识。
- 代码中需要增加额外的逻辑判断。
- Redis 需要存储数百万条幂等标识,并且通常这些标识需要设置稍长(比如2 或者 10 分钟)的有效期,导致巨大的存储成本。
综上所述,做消息消费幂等的收益非常有限,但代价巨大,因此我们无需为此场景引入消费幂等机制。
2. 为什么通知类空实现?
发送消息提醒属于非核心功能,通常只需复制 API 实现即可。在实际开发和面试中,这类功能通常不会受到面试官的特别关注。因此,为了简化实现,我们在这里直接采用空实现的方式,省去不必要的复杂度。
3. 如何应对百万用户的券预约通知?
对于这个问题,实际上并没有所谓的“银弹”。在互联网企业中,面对如此大规模的通知需求,常见的解决方案是增加应用节点或扩容线程池中的线程数。以最近很火的华为 XT 非凡大师为例,平时的预约量可能只有几万,经过评估节点后可以正常发送。但是,面对百倍增长的情况,临时增加节点并扩容线程池参数,是我们应对大规模通知的主要手段。
除此之外,我们还可以提前几分钟通知用户,这样可以避免用户因通知延迟而错过提醒,确保用户及时收到通知并做出反应。
测试预约功能
1. 创建新的优惠券模板
大家记得通过后管服务创建新的优惠券模板,需要注意的是,如果你要测试提前 5 分钟通知,那么最好 validStartTime
时间是当前时间的 7 分钟后。这样,我们能空出来 2 分钟时间去创建预约提醒。
比如现在是 18:23,开始时间我就设置为 18:30。
{"name":"用户下单满10减3特大优惠","source":0,"target":1,"goods":"","type":0,"validStartTime":"2024-09-20 18:30:00","validEndTime":"2024-09-24 18:40:00","stock":4998,"receiveRule":"{\"limitPerPerson\":10,\"usageInstructions\":\"3\"}","consumeRule":"{\"termsOfUse\":10,\"maximumDiscountAmount\":3,\"explanationOfUnmetConditions\":\"3\",\"validityPeriod\":\"48\"}"}
2. 创建预约提醒功能
通过引擎服务调用创建预约提醒接口,复制上面创建的优惠券模板 ID,我们选择开抢前 5 分钟发起提醒。
{"couponTemplateId":"1837074807976407041","shopNumber":"1810714735922956666","type":0,"remindTime":5}
3. 查看消息队列消费者日志
可以看到,时间来到 18:25 的时候,我们的消息队列消费日志触发打印,逻辑正常执行。
2024-09-20T18:25:00.007+08:00INFO78960---[ading11045568_3]CouponTemplateRemindDelayConsumer:[消费者] 提醒用户抢券 - 执行消费逻辑,消息体:{"keys":"1810518709471555585:1837075848033775618","message":{"contact":"1810518709471555585","couponTemplateId":"1837074807976407041","delayTime":1726828080000,"remindTime":5,"shopNumber":"1810714735922956666","startTime":"2024-09-20 18:30:00","type":0,"userId":"1810518709471555585"},"timestamp":1726827998881,"uuid":"31d6d070-cbf0-4096-b3f4-f6feb739b972"}
文末总结
开发了优惠券预约通知的消息队列消费者,采用线程池并行发送用户预约提醒,确保用户能够及时收到通知。同时,为了保障线程池中的任务不丢失,我们通过 Redis Stream 延迟队列的方式来实现任务的可靠处理。这种方式既保证了提醒的实时性,也确保了高并发场景下任务的稳定性和可靠性。
关于用户取消预约请求以及查看用户预约列表,请看下回分解。
完结,撒花 🎉
第27小节:用户查询/取消优惠券预约提醒功能
业务背景
当用户预约了一个或多个优惠券抢购提醒后,如果不再需要提醒,可以取消预约通知。不过,虽然用户可以取消提醒,但已经发送到 MQ 的消息不会被撤回,消费者在时间点到达时依然会收到消息。此时,我们不应该再向用户发出提醒。因此,我们需要开发一个方法来判断用户是否取消了预约。同时,还需支持用户查询其已预约的优惠券列表信息,以便用户管理其预约状态。
Git 分支
20240920_dev_coupon-remind-v3_rocketmq-bitmap_youya
本章节预约提醒优惠券核心代码由优雅同学贡献,感谢优雅提供的优秀代码设计。
取消预约提醒
1. 取消用户预约优惠券提醒
有这样一种情况,用户预约了优惠券提醒后不想再预约场景,那我们就需要把这个提醒删除。
代码如下所示:
@Service@RequiredArgsConstructorpublicclassCouponTemplateServiceRemindImplextendsServiceImpl<CouponTemplateRemindMapper, CouponTemplateRemindDO>implementsCouponTemplateRemindService{// ......
@OverridepublicvoidcancelCouponRemind(CouponTemplateRemindCancelReqDTO requestParam){// 验证优惠券是否存在,避免缓存穿透问题并获取优惠券开抢时间CouponTemplateQueryRespDTO couponTemplate = couponTemplateService
.findCouponTemplate(newCouponTemplateQueryReqDTO(requestParam.getShopNumber(), requestParam.getCouponTemplateId()));if(couponTemplate.getValidStartTime().before(newDate())){thrownewClientException("无法取消已开始领取的优惠券预约");}
LambdaQueryWrapper<CouponTemplateRemindDO> queryWrapper =Wrappers.lambdaQuery(CouponTemplateRemindDO.class).eq(CouponTemplateRemindDO::getUserId,UserContext.getUserId()).eq(CouponTemplateRemindDO::getCouponTemplateId, requestParam.getCouponTemplateId());CouponTemplateRemindDO couponTemplateRemindDO = couponTemplateRemindMapper.selectOne(queryWrapper);if(couponTemplateRemindDO ==null){thrownewClientException("优惠券模板预约信息不存在");}// 计算 BitMap 信息Long bitMap =CouponTemplateRemindUtil.calculateBitMap(requestParam.getRemindTime(), requestParam.getType());if((bitMap & couponTemplateRemindDO.getInformation())==0L){thrownewClientException("您没有预约该时间点的提醒");}
bitMap ^= couponTemplateRemindDO.getInformation();
queryWrapper.eq(CouponTemplateRemindDO::getInformation, couponTemplateRemindDO.getInformation());if(bitMap.equals(0L)){// 如果新 BitMap 信息是 0,说明已经没有预约提醒了,可以直接删除if(couponTemplateRemindMapper.delete(queryWrapper)==0){// MySQL 乐观锁进行删除,如果删除失败,说明用户可能同时正在进行删除、新增提醒操作thrownewClientException("取消提醒失败,请刷新页面后重试");}}else{// 虽然删除了这个预约提醒,但还有其它提醒,那就更新数据库
couponTemplateRemindDO.setInformation(bitMap);if(couponTemplateRemindMapper.update(couponTemplateRemindDO, queryWrapper)==0){// MySQL 乐观锁进行更新,如果更新失败,说明用户可能同时正在进行删除、新增提醒操作thrownewClientException("取消提醒失败,请刷新页面后重试");}}}}
业务流程如下所示:
- 1.验证优惠券:根据查询优惠券模板方法避免缓存击穿和穿透,并且获取到优惠券模板详情后判断优惠券是否已开始领取,如果是的话抛出异常。
- 2.查询预约提醒记录:系统使用
userId
和couponTemplateId
在数据库中查找对应的提醒记录。如果找不到该记录,则抛出异常,提示“优惠券模板预约信息不存在”。如果找到记录,继续执行后续操作。 - 3.计算用户想要取消的提醒对应的 BitMap:使用
CouponTemplateRemindUtil.calculateBitMap()
方法,根据用户的remindTime
和type
计算出该提醒对应的bitMap
(位图)。 - 4.检查用户是否已经预约该提醒:通过
bitMap & couponTemplateRemindDO.getInformation()
检查数据库中的预约提醒信息是否包含该时间点的提醒。如果结果为0
,说明用户没有预约该时间点的提醒,抛出异常提示“您没有预约该时间点的提醒”。 - 5.更新 BitMap 信息:使用异或操作
bitMap ^= couponTemplateRemindDO.getInformation()
取消该时间点的提醒位。此时,bitMap
会去除用户想要取消的提醒对应的位。 - 6.判断更新后的 BitMap:如果
bitMap
为0
,说明用户取消了所有提醒,删除该预约提醒记录。如果bitMap
不为0
:说明用户取消了部分提醒,仍有其他提醒存在。系统更新数据库中的information
字段,保存剩余的提醒信息。
我们通过创建优惠券预约提醒功能,添加一个 0 类型,提前 15 分钟的预约提醒:
{"couponTemplateId":"1836986189085790209","shopNumber":"1810714735922956666","type":0,"remindTime":15}
需要注意,在后台服务创建优惠券模板时,validStartTime
不能小于当前时间。创建完成后,模板信息会被复制到创建优惠券预约提醒的入参中,并在 t_coupon_template_remind
表中生成一条预约提醒记录。
当用户通过取消预约提醒接口进行操作时,传入的参数依然是上述的模板信息。执行取消操作后,查看数据库时,可以发现对应的预约提醒记录会被逐步修改。如果用户创建了多个时间段的提醒,每次取消会修改记录中的提醒信息,直到最后一个预约时间被取消,才最终删除该记录。
2. 消息队列判断是否已取消预约
虽然用户可以取消提醒,但已经发送到 MQ 的消息不会被撤回,消费者在时间点到达时依然会收到消息。这时我们不应该再向用户发出提醒。所以,我们需要开发一个方法,那就是判断用户是否取消了预约。
代码如下所示:
@Service@RequiredArgsConstructorpublicclassCouponTemplateServiceRemindImplextendsServiceImpl<CouponTemplateRemindMapper, CouponTemplateRemindDO>implementsCouponTemplateRemindService{
// ......
@OverridepublicbooleanisCancelRemind(CouponTemplateRemindDTO requestParam){LambdaQueryWrapper<CouponTemplateRemindDO> queryWrapper =Wrappers.lambdaQuery(CouponTemplateRemindDO.class).eq(CouponTemplateRemindDO::getUserId, requestParam.getUserId()).eq(CouponTemplateRemindDO::getCouponTemplateId, requestParam.getCouponTemplateId());CouponTemplateRemindDO couponTemplateRemindDO = couponTemplateRemindMapper.selectOne(queryWrapper);if(couponTemplateRemindDO ==null){// 数据库中没该条预约提醒,说明被取消returntrue;}
// 即使存在数据,也要检查该类型的该时间点是否有提醒Long information = couponTemplateRemindDO.getInformation();Long bitMap =CouponTemplateRemindUtil.calculateBitMap(requestParam.getRemindTime(), requestParam.getType());
// 按位与等于 0 说明用户取消了预约return(bitMap & information)==0L;}}
通过该方法,我们在消息队列的消费者执行前加入判断,如果已取消则打印一行日志即可。
代码如下所示:
@Slf4j@Component@RequiredArgsConstructorpublicclassCouponTemplateRemindExecutor{
privatefinalCouponTemplateRemindService couponTemplateRemindService;
/**
* 执行提醒
*
* @param couponTemplateRemindDTO 用户预约提醒请求信息
*/publicvoidexecuteRemindCouponTemplate(CouponTemplateRemindDTO couponTemplateRemindDTO){// 用户没取消预约,则发出提醒if(couponTemplateRemindService.isCancelRemind(couponTemplateRemindDTO)){
log.info("用户已取消优惠券预约提醒,参数:{}",JSON.toJSONString(couponTemplateRemindDTO));return;}
// ......}}
3. 布隆过滤器优化性能
是否需要每次消息消费时都查询数据库来检查用户是否取消了提醒呢?如果对每条消息都进行数据库查询,消息消费的效率就会受到数据库的瓶颈影响。
为了解决这个问题,可以使用布隆过滤器进行初步判断。当用户取消提醒时,我们根据(用户ID、券ID、提醒时间点、提醒类型)的四元组计算哈希值,并将其存入布隆过滤器。消息消费时,如果布隆过滤器中不存在该哈希值,则说明用户没有取消提醒,可以直接发送提醒。如果存在该哈希值,则有两种可能:
- 1.用户确实取消了提醒。
- 2.布隆过滤器发生了误判。
由于存在误判的可能性,我们必须进一步查询数据库,确认用户是否真的取消了提醒。不过这种情况很少出现,大部分请求已经被布隆过滤器过滤,剩下需要查询数据库的请求量很小。
关于使用 Redis 进行存储:
- 在数据量不大的情况下,可以使用 Redis 的
set
集合来存储这些四元组。 - 不能使用位图,因为位图适用于数字存储,而我们需要使用四元组的哈希值进行判断,强行使用位图会带来误判,与布隆过滤器的情况类似。
3.1 创建布隆过滤器
在优惠券查询布隆过滤器的基础上,添加防止取消提醒缓存穿透布隆过滤器。
代码如下所示:
@ConfigurationpublicclassRBloomFilterConfiguration{
/**
* 优惠券查询缓存穿透布隆过滤器
*/@BeanpublicRBloomFilter<String>couponTemplateQueryBloomFilter(RedissonClient redissonClient,@Value("${framework.cache.redis.prefix:}")String cachePrefix){RBloomFilter<String> bloomFilter = redissonClient.getBloomFilter(cachePrefix +"couponTemplateQueryBloomFilter");
bloomFilter.tryInit(640L,0.001);return bloomFilter;}
/**
* 防止取消提醒缓存穿透的布隆过滤器
*/@BeanpublicRBloomFilter<String>cancelRemindBloomFilter(RedissonClient redissonClient,@Value("${framework.cache.redis.prefix:}")String cachePrefix){RBloomFilter<String> bloomFilter = redissonClient.getBloomFilter(cachePrefix +"cancelRemindBloomFilter");
bloomFilter.tryInit(640L,0.001);return bloomFilter;}}
3.2 取消预约提醒加入布隆过滤器
在我们取消优惠券提醒方法的最后,将优惠券模板 ID、用户 ID、预约时间、预约类型获取 Hash 加入布隆过滤器。
代码如下所示:
@Service@RequiredArgsConstructorpublicclassCouponTemplateServiceRemindImplextendsServiceImpl<CouponTemplateRemindMapper, CouponTemplateRemindDO>implementsCouponTemplateRemindService{// ......
privatefinalRBloomFilter<String> cancelRemindBloomFilter;@OverridepublicvoidcancelCouponRemind(CouponTemplateRemindCancelReqDTO requestParam){// ......
// 取消提醒这个信息添加到布隆过滤器中
cancelRemindBloomFilter.add(String.valueOf(Objects.hash(requestParam.getCouponTemplateId(),UserContext.getUserId(), requestParam.getRemindTime(), requestParam.getType())));}}
3.3 判断取消优惠券提醒
代码如下所示:
@Service@RequiredArgsConstructorpublicclassCouponTemplateServiceRemindImplextendsServiceImpl<CouponTemplateRemindMapper, CouponTemplateRemindDO>implementsCouponTemplateRemindService{// ......
privatefinalRBloomFilter<String> cancelRemindBloomFilter;
@OverridepublicbooleanisCancelRemind(CouponTemplateRemindDTO requestParam){if(!cancelRemindBloomFilter.contains(String.valueOf(Objects.hash(requestParam.getCouponTemplateId(), requestParam.getUserId(), requestParam.getRemindTime(), requestParam.getType())))){// 布隆过滤器中不存在,说明没取消提醒,此时已经能挡下大部分请求returnfalse;}
// 对于少部分的“取消了预约”,可能是误判,此时需要去数据库中查找LambdaQueryWrapper<CouponTemplateRemindDO> queryWrapper =Wrappers.lambdaQuery(CouponTemplateRemindDO.class).eq(CouponTemplateRemindDO::getUserId, requestParam.getUserId()).eq(CouponTemplateRemindDO::getCouponTemplateId, requestParam.getCouponTemplateId());CouponTemplateRemindDO couponTemplateRemindDO = couponTemplateRemindMapper.selectOne(queryWrapper);if(couponTemplateRemindDO ==null){// 数据库中没该条预约提醒,说明被取消returntrue;}
// 即使存在数据,也要检查该类型的该时间点是否有提醒Long information = couponTemplateRemindDO.getInformation();Long bitMap =CouponTemplateRemindUtil.calculateBitMap(requestParam.getRemindTime(), requestParam.getType());
// 按位与等于 0 说明用户取消了预约return(bitMap & information)==0L;}}
查询预约提醒列表
1. 查询用户优惠券预约提醒列表
代码如下所示:
@Service@RequiredArgsConstructorpublicclassCouponTemplateServiceRemindImplextendsServiceImpl<CouponTemplateRemindMapper, CouponTemplateRemindDO>implementsCouponTemplateRemindService{// ......
@OverridepublicList<CouponTemplateRemindQueryRespDTO>listCouponRemind(CouponTemplateRemindQueryReqDTO requestParam){String value = stringRedisTemplate.opsForValue().get(String.format(USER_COUPON_TEMPLATE_REMIND_INFORMATION, requestParam.getUserId()));if(value !=null){returnJSON.parseArray(value,CouponTemplateRemindQueryRespDTO.class);}
// 查出用户预约的信息LambdaQueryWrapper<CouponTemplateRemindDO> queryWrapper =Wrappers.lambdaQuery(CouponTemplateRemindDO.class).eq(CouponTemplateRemindDO::getUserId, requestParam.getUserId());List<CouponTemplateRemindDO> couponTemplateRemindDOlist = couponTemplateRemindMapper.selectList(queryWrapper);if(CollUtil.isEmpty(couponTemplateRemindDOlist))returnnewArrayList<>();
// 根据优惠券 ID 查询优惠券信息List<Long> couponIds = couponTemplateRemindDOlist.stream().map(CouponTemplateRemindDO::getCouponTemplateId).toList();List<Long> shopNumbers = couponTemplateRemindDOlist.stream().map(CouponTemplateRemindDO::getShopNumber).toList();List<CouponTemplateDO> couponTemplateDOList = couponTemplateService.listCouponTemplateByIds(couponIds, shopNumbers);List<CouponTemplateRemindQueryRespDTO> actualResult =BeanUtil.copyToList(couponTemplateDOList,CouponTemplateRemindQueryRespDTO.class);
// 填充响应结果的其它信息
actualResult.forEach(each ->{// 找到当前优惠券对应的预约提醒信息
couponTemplateRemindDOlist.stream().filter(i -> i.getCouponTemplateId().equals(each.getId())).findFirst().ifPresent(i ->{// 解析并填充预约提醒信息CouponTemplateRemindUtil.fillRemindInformation(each, i.getInformation());});});
stringRedisTemplate.opsForValue().set(String.format(USER_COUPON_TEMPLATE_REMIND_INFORMATION, requestParam.getUserId()),JSON.toJSONString(actualResult),1,TimeUnit.MINUTES);return actualResult;}}
逻辑整体来说比较简单,但是有两个我认为的难点:
- 如何将位图中的信息解析为正常的预约记录?
- 因为用户预约的优惠券可能是跨多个库的,如何完成跨库查询?
因为我们取消了用户的优惠券模板预约提醒,对应添加的缓存也需要删除,我们这里采用更新数据库删除缓存策略保障数据库和缓存一致性。
@Service@RequiredArgsConstructorpublicclassCouponTemplateServiceRemindImplextendsServiceImpl<CouponTemplateRemindMapper, CouponTemplateRemindDO>implementsCouponTemplateRemindService{// ......
privatefinalRBloomFilter<String> cancelRemindBloomFilter;@OverridepublicvoidcancelCouponRemind(CouponTemplateRemindCancelReqDTO requestParam){// ......
// 取消提醒这个信息添加到布隆过滤器中
cancelRemindBloomFilter.add(String.valueOf(Objects.hash(requestParam.getCouponTemplateId(),UserContext.getUserId(), requestParam.getRemindTime(), requestParam.getType())));// 删除用户预约提醒的缓存信息,通过更新数据库删除缓存策略保障数据库和缓存一致性
stringRedisTemplate.delete(String.format(USER_COUPON_TEMPLATE_REMIND_INFORMATION,UserContext.getUserId()));}}
2. 解析位图信息
外层循环逐一遍历每个时间节点,从 60 分钟(i = 11)开始,依次遍历到 5 分钟(i = 0),即从距离抢购时间最远的节点逐渐遍历到离当前时间最近的节点。对于每个时间节点,还需要遍历所有提醒类型。因此,最终的遍历顺序是:首先遍历 60 分钟节点的所有类型(如邮箱提醒、短信提醒),接着遍历 55 分钟节点的所有类型,依此类推,直到 5 分钟节点。
代码如下所示:
/**
* 下一个类型的位移量,每个类型占用12个bit位,共计60分钟
*/privatestaticfinalintNEXT_TYPE_BITS=12;
/**
* 5分钟为一个间隔
*/privatestaticfinalintTIME_INTERVAL=5;
/**
* 提醒方式的数量
*/privatestaticfinalintTYPE_COUNT=CouponRemindTypeEnum.values().length;
/**
* 填充预约信息
*/publicstaticvoidfillRemindInformation(CouponTemplateRemindQueryRespDTO resp,Long information){List<Date> dateList =newArrayList<>();List<String> remindType =newArrayList<>();Date validStartTime = resp.getValidStartTime();for(int i =NEXT_TYPE_BITS-1; i >=0; i--){// 按时间节点倒叙遍历,即离开抢时间最久,离现在最近for(int j =0; j <TYPE_COUNT; j++){// 对于每个时间节点,遍历所有类型if(((information >>(j *NEXT_TYPE_BITS+ i))&1)==1){// 该时间节点的该提醒类型用户有预约Date date =DateUtil.offsetMinute(validStartTime,-((i +1)*TIME_INTERVAL));
dateList.add(date);
remindType.add(CouponRemindTypeEnum.getDescribeByType(j));}}}
resp.setRemindTime(dateList);
resp.setRemindType(remindType);}
核心判断逻辑为:
((information >>(j *NEXT_TYPE_BITS+ i))&1)==1
在遍历过程中,j
代表提醒类型,i
代表时间节点(例如60分钟对应 i = 11)。因此,j * NEXT_TYPE_BITS + i
计算出位移量,例如当 j = 0
(邮件提醒)且 i = 10
(55分钟),计算结果为 0 * 12 + 10 = 10
。如果 information
在该类型和时间节点下有预约记录,那么 information
的位图类似:
0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0100 0000 0000
通过 information >> 10
可以得到:
0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0001
此时再与 1
进行按位与操作:
0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0001
&
0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0001
= 1
说明该节点已经预约。如果结果为 0
,则说明没有预约。例如:
0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0000
&
0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0001
= 0
通过这个偏移计算,可以准确判断 j
类型下 i
时间节点的 information
位图是否为1,从而确定用户是否预约了该时间点的提醒。
我们创建几个预约提醒后,通过接口查询结果得知:
{"code":"0","message":null,"data":[{"id":1836986189085790209,"shopNumber":1810714735922956666,"name":"用户下单满10减3特大优惠","source":0,"target":1,"goods":"","type":0,"validStartTime":"2024-09-20 18:50:00","validEndTime":"2024-09-24 18:40:00","receiveRule":"{\"limitPerPerson\": 10, \"usageInstructions\": \"3\"}","consumeRule":"{\"termsOfUse\": 10, \"validityPeriod\": \"48\", \"maximumDiscountAmount\": 3, \"explanationOfUnmetConditions\": \"3\"}","remindTime":["2024-09-20 18:20:00","2024-09-20 18:20:00","2024-09-20 18:25:00","2024-09-20 18:30:00","2024-09-20 18:35:00"],"remindType":["App通知","邮件提醒","App通知","App通知","App通知"]},{"id":1837033594250448898,"shopNumber":1810714735922956666,"name":"用户下单满10减3特大优惠","source":0,"target":1,"goods":"","type":0,"validStartTime":"2024-09-20 18:50:00","validEndTime":"2024-09-24 18:40:00","receiveRule":"{\"limitPerPerson\": 10, \"usageInstructions\": \"3\"}","consumeRule":"{\"termsOfUse\": 10, \"validityPeriod\": \"48\", \"maximumDiscountAmount\": 3, \"explanationOfUnmetConditions\": \"3\"}","remindTime":["2024-09-20 18:00:00","2024-09-20 18:05:00","2024-09-20 18:20:00"],"remindType":["邮件提醒","邮件提醒","邮件提醒"]}],"requestId":null,"fail":false,"success":true}
3. 批量查询优惠券模板
代码如下所示:
@OverridepublicList<CouponTemplateDO>listCouponTemplateByIds(List<Long> couponTemplateIds,List<Long> shopNumbers){// 1. 将 shopNumbers集合 对应的index拆分到数据库中Map<Integer, List<Long>> databaseIndexMap =splitIndexByDatabase(shopNumbers);List<CouponTemplateDO> result =newArrayList<>();// 2. 对每个数据库执行查询for(Map.Entry<Integer, List<Long>> entry : databaseIndexMap.entrySet()){List<Long> shopNumbersSubset = entry.getValue();// 执行查询List<CouponTemplateDO> couponTemplateDOList =queryDatabase(couponTemplateIds, shopNumbersSubset);
result.addAll(couponTemplateDOList);}return result;}privateList<CouponTemplateDO>queryDatabase(List<Long> couponTemplateIds,List<Long> shopNumbers){LambdaQueryWrapper<CouponTemplateDO> queryWrapper =Wrappers.lambdaQuery(CouponTemplateDO.class).in(CouponTemplateDO::getShopNumber, shopNumbers).in(CouponTemplateDO::getId, couponTemplateIds);return couponTemplateMapper.selectList(queryWrapper);}privateMap<Integer, List<Long>>splitIndexByDatabase(List<Long> shopNumbers){Map<Integer, List<Long>> databaseShopNumberMap =newHashMap<>();for(Long shopNumber : shopNumbers){int databaseMod =DBShardingUtil.doCouponSharding(shopNumber);
databaseShopNumberMap
.computeIfAbsent(databaseMod, k ->newArrayList<>()).add(shopNumber);}return databaseShopNumberMap;}
数据库分片工具类逻辑很简单,按照我们的分片算法类进行分片即可,这样就能知道数据在哪个库中。
代码如下所示:
publicfinalclassDBShardingUtil{
/**
* 获取数据库分片算法类,在该类初始化时向 Singleton 放入实例
*/privatestaticfinalDBHashModShardingAlgorithm dbShardingAlgorithm =Singleton.get(DBHashModShardingAlgorithm.class);
/**
* 解决查询商家优惠券 IN 场景跨库表不存在问题
*
* @param shopNumber 分片键 shopNumber
* @return 返回 shopNumber 所在的数据源
*/publicstaticintdoCouponSharding(Long shopNumber){return dbShardingAlgorithm.getShardingMod(shopNumber,getAvailableDatabases().size());}
/**
* 获取可用的数据源列表
*/privatestaticCollection<String>getAvailableDatabases(){returnList.of("ds0","ds1");}}
为什么不直接使用 selectList 查询而是需要进行拆分再多次查询?因为一组优惠券 ID 中可能会牵扯多个库,这样就会出现跨库查询问题。
为此我们按照不同优惠券 ID 的数据库进行分类,比如一共有 5000 条记录,ds0 下有 2600 条记录,ds1 下有 2400 条记录,分别查询即可成功。
文末总结
本章节将继续围绕用户优惠券预约提醒功能的开发展开,支持用户取消预约提醒。取消后,RocketMQ 消费者可以感知到取消操作,不再发送通知。同时,通过各种位运算,系统能够解析出用户的优惠券模板预约列表,让用户方便地管理其预约信息。
完结,撒花 🎉
第28小节:完成锁定/核销/退还优惠券功能
业务背景
兑换优惠券之后,我们可以在订单结算时使用优惠券,这个时候优惠券状态就会变成锁定中;如果用户支付了订单,优惠券状态变更为已使用;如果订单退款,用户优惠券回退到用户账户里,优惠券状态回退到未使用状态。
- 兑换优惠券(未使用):优惠券被领取到用户账户中,初始状态为
未使用
。 - 订单结算(锁定中):当用户在订单中使用优惠券时,优惠券状态应变更为
锁定中
。此时优惠券不可被其他订单再次使用,直到订单完成或取消。 - 支付成功(已使用):用户支付订单后,优惠券状态变更为
已使用
,表示优惠券使用成功,不可再被使用。 - 订单退款(未使用):如果订单取消或发生退款,优惠券状态回退至
未使用
,重新回到用户账户中,可被再次使用。
Git 分支
先从 main 分支上查看,代码入口:UserCouponController
。
数据库设计
1. 结算表 SQL
进入 one_coupon_rebuild_0
数据库中执行下述 SQL 语句。
SQL 语句如下所示:
CREATETABLE`t_coupon_settlement_0`(`id`bigint(20)NOTNULLAUTO_INCREMENTCOMMENT'ID',`order_id`bigint(20)DEFAULTNULLCOMMENT'订单ID',`user_id`bigint(20)DEFAULTNULLCOMMENT'用户ID',`coupon_id`bigint(20)DEFAULTNULLCOMMENT'优惠券ID',`status`int(11)DEFAULTNULLCOMMENT'结算单状态 0:锁定 1:已取消 2:已支付 3:已退款',`create_time`datetimeDEFAULTNULLCOMMENT'创建时间',`update_time`datetimeDEFAULTNULLCOMMENT'修改时间',PRIMARYKEY(`id`),KEY`idx_user_id`(`user_id`)USINGBTREE)ENGINE=InnoDBDEFAULTCHARSET=utf8mb4 COMMENT='优惠券结算单表';
CREATETABLE`t_coupon_settlement_1`(`id`bigint(20)NOTNULLAUTO_INCREMENTCOMMENT'ID',`order_id`bigint(20)DEFAULTNULLCOMMENT'订单ID',`user_id`bigint(20)DEFAULTNULLCOMMENT'用户ID',`coupon_id`bigint(20)DEFAULTNULLCOMMENT'优惠券ID',`status`int(11)DEFAULTNULLCOMMENT'结算单状态 0:锁定 1:已取消 2:已支付 3:已退款',`create_time`datetimeDEFAULTNULLCOMMENT'创建时间',`update_time`datetimeDEFAULTNULLCOMMENT'修改时间',PRIMARYKEY(`id`),KEY`idx_user_id`(`user_id`)USINGBTREE)ENGINE=InnoDBDEFAULTCHARSET=utf8mb4 COMMENT='优惠券结算单表';
CREATETABLE`t_coupon_settlement_2`(`id`bigint(20)NOTNULLAUTO_INCREMENTCOMMENT'ID',`order_id`bigint(20)DEFAULTNULLCOMMENT'订单ID',`user_id`bigint(20)DEFAULTNULLCOMMENT'用户ID',`coupon_id`bigint(20)DEFAULTNULLCOMMENT'优惠券ID',`status`int(11)DEFAULTNULLCOMMENT'结算单状态 0:锁定 1:已取消 2:已支付 3:已退款',`create_time`datetimeDEFAULTNULLCOMMENT'创建时间',`update_time`datetimeDEFAULTNULLCOMMENT'修改时间',PRIMARYKEY(`id`),KEY`idx_user_id`(`user_id`)USINGBTREE)ENGINE=InnoDBDEFAULTCHARSET=utf8mb4 COMMENT='优惠券结算单表';
CREATETABLE`t_coupon_settlement_3`(`id`bigint(20)NOTNULLAUTO_INCREMENTCOMMENT'ID',`order_id`bigint(20)DEFAULTNULLCOMMENT'订单ID',`user_id`bigint(20)DEFAULTNULLCOMMENT'用户ID',`coupon_id`bigint(20)DEFAULTNULLCOMMENT'优惠券ID',`status`int(11)DEFAULTNULLCOMMENT'结算单状态 0:锁定 1:已取消 2:已支付 3:已退款',`create_time`datetimeDEFAULTNULLCOMMENT'创建时间',`update_time`datetimeDEFAULTNULLCOMMENT'修改时间',PRIMARYKEY(`id`),KEY`idx_user_id`(`user_id`)USINGBTREE)ENGINE=InnoDBDEFAULTCHARSET=utf8mb4 COMMENT='优惠券结算单表';
CREATETABLE`t_coupon_settlement_4`(`id`bigint(20)NOTNULLAUTO_INCREMENTCOMMENT'ID',`order_id`bigint(20)DEFAULTNULLCOMMENT'订单ID',`user_id`bigint(20)DEFAULTNULLCOMMENT'用户ID',`coupon_id`bigint(20)DEFAULTNULLCOMMENT'优惠券ID',`status`int(11)DEFAULTNULLCOMMENT'结算单状态 0:锁定 1:已取消 2:已支付 3:已退款',`create_time`datetimeDEFAULTNULLCOMMENT'创建时间',`update_time`datetimeDEFAULTNULLCOMMENT'修改时间',PRIMARYKEY(`id`),KEY`idx_user_id`(`user_id`)USINGBTREE)ENGINE=InnoDBDEFAULTCHARSET=utf8mb4 COMMENT='优惠券结算单表';
CREATETABLE`t_coupon_settlement_5`(`id`bigint(20)NOTNULLAUTO_INCREMENTCOMMENT'ID',`order_id`bigint(20)DEFAULTNULLCOMMENT'订单ID',`user_id`bigint(20)DEFAULTNULLCOMMENT'用户ID',`coupon_id`bigint(20)DEFAULTNULLCOMMENT'优惠券ID',`status`int(11)DEFAULTNULLCOMMENT'结算单状态 0:锁定 1:已取消 2:已支付 3:已退款',`create_time`datetimeDEFAULTNULLCOMMENT'创建时间',`update_time`datetimeDEFAULTNULLCOMMENT'修改时间',PRIMARYKEY(`id`),KEY`idx_user_id`(`user_id`)USINGBTREE)ENGINE=InnoDBDEFAULTCHARSET=utf8mb4 COMMENT='优惠券结算单表';
CREATETABLE`t_coupon_settlement_6`(`id`bigint(20)NOTNULLAUTO_INCREMENTCOMMENT'ID',`order_id`bigint(20)DEFAULTNULLCOMMENT'订单ID',`user_id`bigint(20)DEFAULTNULLCOMMENT'用户ID',`coupon_id`bigint(20)DEFAULTNULLCOMMENT'优惠券ID',`status`int(11)DEFAULTNULLCOMMENT'结算单状态 0:锁定 1:已取消 2:已支付 3:已退款',`create_time`datetimeDEFAULTNULLCOMMENT'创建时间',`update_time`datetimeDEFAULTNULLCOMMENT'修改时间',PRIMARYKEY(`id`),KEY`idx_user_id`(`user_id`)USINGBTREE)ENGINE=InnoDBDEFAULTCHARSET=utf8mb4 COMMENT='优惠券结算单表';
CREATETABLE`t_coupon_settlement_7`(`id`bigint(20)NOTNULLAUTO_INCREMENTCOMMENT'ID',`order_id`bigint(20)DEFAULTNULLCOMMENT'订单ID',`user_id`bigint(20)DEFAULTNULLCOMMENT'用户ID',`coupon_id`bigint(20)DEFAULTNULLCOMMENT'优惠券ID',`status`int(11)DEFAULTNULLCOMMENT'结算单状态 0:锁定 1:已取消 2:已支付 3:已退款',`create_time`datetimeDEFAULTNULLCOMMENT'创建时间',`update_time`datetimeDEFAULTNULLCOMMENT'修改时间',PRIMARYKEY(`id`),KEY`idx_user_id`(`user_id`)USINGBTREE)ENGINE=InnoDBDEFAULTCHARSET=utf8mb4 COMMENT='优惠券结算单表';
继续进入 one_coupon_rebuild_1
数据库中执行下述 SQL 语句。
SQL 语句如下所示:
CREATETABLE`t_coupon_settlement_8`(`id`bigint(20)NOTNULLAUTO_INCREMENTCOMMENT'ID',`order_id`bigint(20)DEFAULTNULLCOMMENT'订单ID',`user_id`bigint(20)DEFAULTNULLCOMMENT'用户ID',`coupon_id`bigint(20)DEFAULTNULLCOMMENT'优惠券ID',`status`int(11)DEFAULTNULLCOMMENT'结算单状态 0:锁定 1:已取消 2:已支付 3:已退款',`create_time`datetimeDEFAULTNULLCOMMENT'创建时间',`update_time`datetimeDEFAULTNULLCOMMENT'修改时间',PRIMARYKEY(`id`),KEY`idx_user_id`(`user_id`)USINGBTREE)ENGINE=InnoDBDEFAULTCHARSET=utf8mb4 COMMENT='优惠券结算单表';
CREATETABLE`t_coupon_settlement_9`(`id`bigint(20)NOTNULLAUTO_INCREMENTCOMMENT'ID',`order_id`bigint(20)DEFAULTNULLCOMMENT'订单ID',`user_id`bigint(20)DEFAULTNULLCOMMENT'用户ID',`coupon_id`bigint(20)DEFAULTNULLCOMMENT'优惠券ID',`status`int(11)DEFAULTNULLCOMMENT'结算单状态 0:锁定 1:已取消 2:已支付 3:已退款',`create_time`datetimeDEFAULTNULLCOMMENT'创建时间',`update_time`datetimeDEFAULTNULLCOMMENT'修改时间',PRIMARYKEY(`id`),KEY`idx_user_id`(`user_id`)USINGBTREE)ENGINE=InnoDBDEFAULTCHARSET=utf8mb4 COMMENT='优惠券结算单表';
CREATETABLE`t_coupon_settlement_10`(`id`bigint(20)NOTNULLAUTO_INCREMENTCOMMENT'ID',`order_id`bigint(20)DEFAULTNULLCOMMENT'订单ID',`user_id`bigint(20)DEFAULTNULLCOMMENT'用户ID',`coupon_id`bigint(20)DEFAULTNULLCOMMENT'优惠券ID',`status`int(11)DEFAULTNULLCOMMENT'结算单状态 0:锁定 1:已取消 2:已支付 3:已退款',`create_time`datetimeDEFAULTNULLCOMMENT'创建时间',`update_time`datetimeDEFAULTNULLCOMMENT'修改时间',PRIMARYKEY(`id`),KEY`idx_user_id`(`user_id`)USINGBTREE)ENGINE=InnoDBDEFAULTCHARSET=utf8mb4 COMMENT='优惠券结算单表';
CREATETABLE`t_coupon_settlement_11`(`id`bigint(20)NOTNULLAUTO_INCREMENTCOMMENT'ID',`order_id`bigint(20)DEFAULTNULLCOMMENT'订单ID',`user_id`bigint(20)DEFAULTNULLCOMMENT'用户ID',`coupon_id`bigint(20)DEFAULTNULLCOMMENT'优惠券ID',`status`int(11)DEFAULTNULLCOMMENT'结算单状态 0:锁定 1:已取消 2:已支付 3:已退款',`create_time`datetimeDEFAULTNULLCOMMENT'创建时间',`update_time`datetimeDEFAULTNULLCOMMENT'修改时间',PRIMARYKEY(`id`),KEY`idx_user_id`(`user_id`)USINGBTREE)ENGINE=InnoDBDEFAULTCHARSET=utf8mb4 COMMENT='优惠券结算单表';
CREATETABLE`t_coupon_settlement_12`(`id`bigint(20)NOTNULLAUTO_INCREMENTCOMMENT'ID',`order_id`bigint(20)DEFAULTNULLCOMMENT'订单ID',`user_id`bigint(20)DEFAULTNULLCOMMENT'用户ID',`coupon_id`bigint(20)DEFAULTNULLCOMMENT'优惠券ID',`status`int(11)DEFAULTNULLCOMMENT'结算单状态 0:锁定 1:已取消 2:已支付 3:已退款',`create_time`datetimeDEFAULTNULLCOMMENT'创建时间',`update_time`datetimeDEFAULTNULLCOMMENT'修改时间',PRIMARYKEY(`id`),KEY`idx_user_id`(`user_id`)USINGBTREE)ENGINE=InnoDBDEFAULTCHARSET=utf8mb4 COMMENT='优惠券结算单表';
CREATETABLE`t_coupon_settlement_13`(`id`bigint(20)NOTNULLAUTO_INCREMENTCOMMENT'ID',`order_id`bigint(20)DEFAULTNULLCOMMENT'订单ID',`user_id`bigint(20)DEFAULTNULLCOMMENT'用户ID',`coupon_id`bigint(20)DEFAULTNULLCOMMENT'优惠券ID',`status`int(11)DEFAULTNULLCOMMENT'结算单状态 0:锁定 1:已取消 2:已支付 3:已退款',`create_time`datetimeDEFAULTNULLCOMMENT'创建时间',`update_time`datetimeDEFAULTNULLCOMMENT'修改时间',PRIMARYKEY(`id`),KEY`idx_user_id`(`user_id`)USINGBTREE)ENGINE=InnoDBDEFAULTCHARSET=utf8mb4 COMMENT='优惠券结算单表';
CREATETABLE`t_coupon_settlement_14`(`id`bigint(20)NOTNULLAUTO_INCREMENTCOMMENT'ID',`order_id`bigint(20)DEFAULTNULLCOMMENT'订单ID',`user_id`bigint(20)DEFAULTNULLCOMMENT'用户ID',`coupon_id`bigint(20)DEFAULTNULLCOMMENT'优惠券ID',`status`int(11)DEFAULTNULLCOMMENT'结算单状态 0:锁定 1:已取消 2:已支付 3:已退款',`create_time`datetimeDEFAULTNULLCOMMENT'创建时间',`update_time`datetimeDEFAULTNULLCOMMENT'修改时间',PRIMARYKEY(`id`),KEY`idx_user_id`(`user_id`)USINGBTREE)ENGINE=InnoDBDEFAULTCHARSET=utf8mb4 COMMENT='优惠券结算单表';
CREATETABLE`t_coupon_settlement_15`(`id`bigint(20)NOTNULLAUTO_INCREMENTCOMMENT'ID',`order_id`bigint(20)DEFAULTNULLCOMMENT'订单ID',`user_id`bigint(20)DEFAULTNULLCOMMENT'用户ID',`coupon_id`bigint(20)DEFAULTNULLCOMMENT'优惠券ID',`status`int(11)DEFAULTNULLCOMMENT'结算单状态 0:锁定 1:已取消 2:已支付 3:已退款',`create_time`datetimeDEFAULTNULLCOMMENT'创建时间',`update_time`datetimeDEFAULTNULLCOMMENT'修改时间',PRIMARYKEY(`id`),KEY`idx_user_id`(`user_id`)USINGBTREE)ENGINE=InnoDBDEFAULTCHARSET=utf8mb4 COMMENT='优惠券结算单表';
为什么结算单表比用户优惠券表小?
这其实是个比较常规问题,因为我们领取优惠券很多,实际上用的很少,十分之一都是说的算多了,这里放了 16 张表作为示例。
2. 分库分表配置
Engine 引擎服务的 shardingsphere-config.yaml
也需要添加上相关的分库分表配置。
ShardingSphere 完整分库分表配置如下所示:
# 数据源集合dataSources:# 自定义数据源名称,可以是 ds_0 也可以叫 datasource_0 都可以ds_0:
dataSourceClassName: com.zaxxer.hikari.HikariDataSource
driverClassName: com.mysql.cj.jdbc.Driver
jdbcUrl: jdbc:mysql://127.0.0.1:3306/one_coupon_rebuild_0?useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true&allowMultiQueries=true&serverTimezone=Asia/Shanghai
username: root
password: root
ds_1:
dataSourceClassName: com.zaxxer.hikari.HikariDataSource
driverClassName: com.mysql.cj.jdbc.Driver
jdbcUrl: jdbc:mysql://127.0.0.1:3306/one_coupon_rebuild_1?useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true&allowMultiQueries=true&serverTimezone=Asia/Shanghai
username: root
password: root
rules:-!SHARDING
tables:# 需要分片的数据库表集合
t_coupon_template:# 优惠券模板表# 真实存在数据库中的物理表
actualDataNodes: ds_${0..1}.t_coupon_template_${0..15}
databaseStrategy:# 分库策略
standard:# 单分片键分库
shardingColumn: shop_number # 分片键
shardingAlgorithmName: coupon_template_database_mod # 库分片算法名称,对应 rules[0].shardingAlgorithms
tableStrategy:# 分表策略
standard:# 单分片键分表
shardingColumn: shop_number # 分片键
shardingAlgorithmName: coupon_template_table_mod # 表分片算法名称,对应 rules[0].shardingAlgorithms
t_user_coupon:
actualDataNodes: ds_${0..1}.t_user_coupon_${0..31}
databaseStrategy:
standard:
shardingColumn: user_id
shardingAlgorithmName: user_coupon_database_mod
tableStrategy:
standard:
shardingColumn: user_id
shardingAlgorithmName: user_coupon_table_mod
t_coupon_settlement:# 新加的
actualDataNodes: ds_${0..1}.t_coupon_settlement_${0..15}
databaseStrategy:
standard:
shardingColumn: user_id
shardingAlgorithmName: user_coupon_settlement_database_mod
tableStrategy:
standard:
shardingColumn: user_id
shardingAlgorithmName: user_coupon_settlement_table_mod
shardingAlgorithms:# 分片算法定义集合
coupon_template_database_mod:# 优惠券分库算法定义
type: CLASS_BASED # 根据自定义库分片算法类进行分片
props:# 分片相关属性# 自定义库分片算法Class
algorithmClassName: com.nageoffer.onecoupon.engine.dao.sharding.DBHashModShardingAlgorithm
sharding-count:16# 分片总数量
strategy: standard # 分片类型,单字段分片
coupon_template_table_mod:# 优惠券分表算法定义
type: CLASS_BASED # 根据自定义库分片算法类进行分片
props:# 分片相关属性# 自定义表分片算法Class
algorithmClassName: com.nageoffer.onecoupon.engine.dao.sharding.TableHashModShardingAlgorithm
strategy: standard # 分片类型,单字段分片
user_coupon_database_mod:
type: CLASS_BASED
props:
algorithmClassName: com.nageoffer.onecoupon.engine.dao.sharding.DBHashModShardingAlgorithm
sharding-count:32
strategy: standard
user_coupon_table_mod:
type: CLASS_BASED
props:
algorithmClassName: com.nageoffer.onecoupon.engine.dao.sharding.TableHashModShardingAlgorithm
strategy: standard
user_coupon_settlement_database_mod:
type: CLASS_BASED
props:
algorithmClassName: com.nageoffer.onecoupon.engine.dao.sharding.DBHashModShardingAlgorithm
sharding-count:16
strategy: standard
user_coupon_settlement_table_mod:
type: CLASS_BASED
props:
algorithmClassName: com.nageoffer.onecoupon.engine.dao.sharding.TableHashModShardingAlgorithm
strategy: standard
props:# 配置 ShardingSphere 默认打印 SQL 执行语句sql-show:true
锁定优惠券
用户在订单结算时使用优惠券,创建优惠券结算单,并将用户优惠券的状态从“未使用”变更为“锁定中”,确保优惠券在订单支付过程中被锁定,避免并发情况下同一优惠券被重复使用。
1. 获取分布式锁
代码如下时所:
RLock lock = redissonClient.getLock(String.format(EngineRedisConstant.LOCK_COUPON_SETTLEMENT_KEY, requestParam.getCouponId()));boolean tryLock = lock.tryLock();if(!tryLock){thrownewClientException("正在创建优惠券结算单,请稍候再试");}
- 获取分布式锁:使用
Redisson
获取基于Redis
的分布式锁,防止并发情况下同一优惠券被多个线程同时使用。 - 锁的 Key:锁的 Key 为
LOCK_COUPON_SETTLEMENT_KEY + couponId
,表示锁定某个具体优惠券的结算操作。 - tryLock 判断:如果获取不到锁,则表示当前优惠券正在创建结算单,抛出异常,提示稍后再试。
2. 检查优惠券状态
代码如下所示:
LambdaQueryWrapper<CouponSettlementDO> queryWrapper =Wrappers.lambdaQuery(CouponSettlementDO.class).eq(CouponSettlementDO::getCouponId, requestParam.getCouponId()).eq(CouponSettlementDO::getUserId,Long.parseLong(UserContext.getUserId())).in(CouponSettlementDO::getStatus,0,2);
if(couponSettlementMapper.selectOne(queryWrapper)!=null){thrownewClientException("请检查优惠券是否已使用");}
- 检查优惠券状态:通过
CouponSettlementDO
查询当前用户的优惠券是否已经有结算记录。- 状态为
0
表示“锁定中”,状态为2
表示“已使用”。
- 状态为
- 避免重复使用:如果查询结果不为空,说明优惠券正在使用或已使用,抛出异常提示“优惠券已使用”。
3. 用户优惠券的有效性和状态
代码如下所示:
UserCouponDO userCouponDO = userCouponMapper.selectOne(Wrappers.lambdaQuery(UserCouponDO.class).eq(UserCouponDO::getId, requestParam.getCouponId()).eq(UserCouponDO::getUserId,Long.parseLong(UserContext.getUserId())));
if(Objects.isNull(userCouponDO)){thrownewClientException("优惠券不存在");}if(userCouponDO.getValidEndTime().before(newDate())){thrownewClientException("优惠券已过期");}if(userCouponDO.getStatus()!=0){thrownewClientException("优惠券使用状态异常");}
- 检查优惠券是否存在:根据用户 ID 和优惠券 ID 查询用户的优惠券数据,验证优惠券的存在性。
- 检查优惠券有效期:判断优惠券是否过期,如果过期则抛出异常。
- 检查优惠券状态:验证优惠券是否处于“未使用”状态(
status = 0
),如果不是,抛出“优惠券使用状态异常”。
4. 获取优惠券模板和消费规则,计算折扣金额
代码如下所示:
CouponTemplateQueryRespDTO couponTemplate = couponTemplateService.findCouponTemplate(newCouponTemplateQueryReqDTO(requestParam.getShopNumber(),String.valueOf(userCouponDO.getCouponTemplateId())));JSONObject consumeRule =JSONObject.parseObject(couponTemplate.getConsumeRule());
- 查询优惠券模板:根据优惠券模板 ID 和店铺编号(
shopNumber
)查询优惠券模板信息。 - 解析优惠券的消费规则:使用
JSONObject
解析消费规则(例如满减条件、折扣比例等)。
5. 根据不同的优惠券类型计算折扣金额
商品专属优惠券:如果 couponTemplate.getTarget()
为 0
,表示优惠券是商品专属券。
- 检查商品编号是否匹配,并计算商品的折扣金额。
- 如果商品金额和折扣金额不一致,则抛出异常。
店铺专属优惠券:如果 couponTemplate.getTarget()
为 1
,表示优惠券是店铺专属券。
- 检查店铺编号是否匹配,并根据优惠券的类型(立减、满减、折扣)计算折扣金额。
计算折扣后金额并检查,代码如下所示:
BigDecimal actualPayableAmount = requestParam.getOrderAmount().subtract(discountAmount);if(actualPayableAmount.compareTo(requestParam.getPayableAmount())!=0){thrownewClientException("折扣后金额不一致");}
- 计算实际应付金额:使用订单金额减去折扣金额来计算实际应付金额,并与请求参数中的
payableAmount
进行比较。 - 验证一致性:如果计算的金额和请求金额不一致,抛出异常。
6. 创建优惠券结算单,并更新优惠券状态
使用 Spring 的 TransactionTemplate
控制事务范围,确保在同一个事务中创建结算单和更新用户优惠券状态。
结算单状态:
- 0:锁定
- 1:已取消
- 2:已支付
- 3:已退款
代码如下所示:
transactionTemplate.executeWithoutResult(status ->{try{// 创建优惠券结算单记录CouponSettlementDO couponSettlementDO =CouponSettlementDO.builder().orderId(requestParam.getOrderId()).couponId(requestParam.getCouponId()).userId(Long.parseLong(UserContext.getUserId())).status(0)// 状态 0 表示“锁定中”.build();
couponSettlementMapper.insert(couponSettlementDO);
// 变更用户优惠券状态LambdaUpdateWrapper<UserCouponDO> userCouponUpdateWrapper =Wrappers.lambdaUpdate(UserCouponDO.class).eq(UserCouponDO::getId, requestParam.getCouponId()).eq(UserCouponDO::getUserId,Long.parseLong(UserContext.getUserId())).eq(UserCouponDO::getStatus,UserCouponStatusEnum.UNUSED.getCode());UserCouponDO updateUserCouponDO =UserCouponDO.builder().status(UserCouponStatusEnum.LOCKING.getCode())// 将状态更新为“锁定中”.build();
userCouponMapper.update(updateUserCouponDO, userCouponUpdateWrapper);}catch(Exception ex){
log.error("创建优惠券结算单失败", ex);
status.setRollbackOnly();// 事务回滚throw ex;}});
- 创建结算单记录:在数据库中插入一条优惠券结算单记录。
- 更新优惠券状态:将用户的优惠券状态从“未使用”更新为“锁定中”。
- 事务控制:确保以上操作在同一个事务中执行,如果发生异常,则进行事务回滚。
7. 删除缓存中的用户优惠券数据
从 Redis 中删除该优惠券,防止用户再次看到并使用。代码如下所示:
String userCouponItemCacheKey =StrUtil.builder().append(userCouponDO.getCouponTemplateId()).append("_").append(userCouponDO.getId()).toString();
stringRedisTemplate.opsForZSet().remove(String.format(USER_COUPON_TEMPLATE_LIST_KEY,UserContext.getUserId()), userCouponItemCacheKey);
核销优惠券
在优惠券结算过程中,核销优惠券结算单,并且将对应的优惠券状态从“锁定中”变更为“已使用”,保证结算单和用户优惠券状态的一致性。
通过获取分布式锁(Redisson
)来确保同一优惠券结算单在多线程或并发操作时不被重复核销,并使用编程式事务控制(TransactionTemplate
)来确保数据操作的原子性。
代码功能概述:
- 获取分布式锁:防止并发情况下同一优惠券结算单被重复核销。
- 事务管理:通过
TransactionTemplate
控制结算单状态和用户优惠券状态的原子性变更。 - 核销优惠券结算单:将优惠券结算单的状态更新为“已支付”。
- 更新用户优惠券状态:将用户优惠券状态从“锁定中”更新为“已使用”。
- 异常处理和日志记录:在任何操作出现异常时,进行事务回滚,并记录详细的异常信息。
1. 获取分布式锁防并发
使用 Redisson
创建一个基于 Redis
的分布式锁 RLock
,如果当前结算单已经在被其他线程或其他操作进行核销,锁获取失败(tryLock
返回 false
),则抛出异常,提示“正在核销优惠券结算单,请稍候再试”。
代码如下所示:
RLock lock = redissonClient.getLock(String.format(EngineRedisConstant.LOCK_COUPON_SETTLEMENT_KEY, requestParam.getCouponId()));boolean tryLock = lock.tryLock();if(!tryLock){thrownewClientException("正在核销优惠券结算单,请稍候再试");}
2. 修改优惠券结算单,并更新优惠券状态
使用 Spring 的 TransactionTemplate
来手动控制事务,确保操作的原子性和一致性。
核销优惠券结算单,更新状态为“已支付”,状态 0
表示优惠券结算单处于“锁定中”,防止重复核销已支付的结算单。将 CouponSettlementDO
对象的状态更新为 2
(表示“已支付”),并执行更新操作。如果 update
操作返回结果为 0
(表示未更新任何记录),则抛出异常。
将用户优惠券的状态更新为 USED
(表示“已使用”),更新后检查是否成功,如果失败,记录日志并抛出异常。
代码如下所示:
// 通过编程式事务减小事务范围
transactionTemplate.executeWithoutResult(status ->{try{// 变更优惠券结算单状态为已支付LambdaUpdateWrapper<CouponSettlementDO> couponSettlementUpdateWrapper =Wrappers.lambdaUpdate(CouponSettlementDO.class).eq(CouponSettlementDO::getCouponId, requestParam.getCouponId()).eq(CouponSettlementDO::getUserId,Long.parseLong(UserContext.getUserId())).eq(CouponSettlementDO::getStatus,0);CouponSettlementDO couponSettlementDO =CouponSettlementDO.builder().status(2).build();int couponSettlementUpdated = couponSettlementMapper.update(couponSettlementDO, couponSettlementUpdateWrapper);if(!SqlHelper.retBool(couponSettlementUpdated)){
log.error("核销优惠券结算单异常,请求参数:{}", com.alibaba.fastjson.JSON.toJSONString(requestParam));thrownewServiceException("核销优惠券结算单异常");}
// 变更用户优惠券状态LambdaUpdateWrapper<UserCouponDO> userCouponUpdateWrapper =Wrappers.lambdaUpdate(UserCouponDO.class).eq(UserCouponDO::getId, requestParam.getCouponId()).eq(UserCouponDO::getUserId,Long.parseLong(UserContext.getUserId())).eq(UserCouponDO::getStatus,UserCouponStatusEnum.LOCKING.getCode());UserCouponDO userCouponDO =UserCouponDO.builder().status(UserCouponStatusEnum.USED.getCode()).build();int userCouponUpdated = userCouponMapper.update(userCouponDO, userCouponUpdateWrapper);if(!SqlHelper.retBool(userCouponUpdated)){
log.error("修改用户优惠券记录状态已使用异常,请求参数:{}", com.alibaba.fastjson.JSON.toJSONString(requestParam));thrownewServiceException("修改用户优惠券记录状态异常");}}catch(Exception ex){
log.error("核销优惠券结算单失败", ex);
status.setRollbackOnly();throw ex;}finally{
lock.unlock();}});
退款优惠券
将用户已经使用的优惠券恢复为“未使用”状态,并将对应的优惠券结算单状态变更为“已退款”。在优惠券状态变更完成后,还将优惠券重新放回 Redis 缓存中,便于用户后续继续使用该优惠券。
代码逻辑概述:
- 获取分布式锁:通过分布式锁确保同一时间只有一个线程能够对相同的优惠券进行退款处理,防止并发操作引发数据不一致。
- 编程式事务控制:使用
TransactionTemplate
控制事务范围,保证结算单和用户优惠券状态的原子性变更。 - 更新优惠券结算单状态:将优惠券结算单状态从“已使用”变更为“已退款”。
- 恢复用户优惠券状态:将用户优惠券状态从“已使用”恢复为“未使用”。
- 将优惠券放回 Redis 缓存:查询用户优惠券记录,将优惠券重新放回 Redis 中,便于用户后续继续使用。
- 异常处理和锁释放:处理过程中出现任何异常时,记录日志并回滚事务,同时保证分布式锁能够被正确释放。
1. 获取分布式锁防止并发
如果当前优惠券已经在被其他线程或操作进行退款处理,锁获取失败(tryLock
返回 false
),则抛出异常,提示“正在执行优惠券退款,请稍候再试”。
代码如下所示:
RLock lock = redissonClient.getLock(String.format(EngineRedisConstant.LOCK_COUPON_SETTLEMENT_KEY, requestParam.getCouponId()));boolean tryLock = lock.tryLock();if(!tryLock){thrownewClientException("正在执行优惠券退款,请稍候再试");}
2. 更新结算单和优惠券状态
使用 Spring 的 TransactionTemplate
来手动控制事务,确保操作的原子性和一致性。
将 CouponSettlementDO
对象的状态更新为 3
(“已退款”),并执行更新操作。如果 update
操作返回结果为 0
(表示未更新任何记录),则抛出异常。
将用户优惠券的状态更新为 UNUSED
(“未使用”),检查更新是否成功,如果失败,记录日志并抛出异常。
代码如下所示:
// 通过编程式事务减小事务范围
transactionTemplate.executeWithoutResult(status ->{try{// 变更优惠券结算单状态为已退款LambdaUpdateWrapper<CouponSettlementDO> couponSettlementUpdateWrapper =Wrappers.lambdaUpdate(CouponSettlementDO.class).eq(CouponSettlementDO::getCouponId, requestParam.getCouponId()).eq(CouponSettlementDO::getUserId,Long.parseLong(UserContext.getUserId())).eq(CouponSettlementDO::getStatus,2);CouponSettlementDO couponSettlementDO =CouponSettlementDO.builder().status(3).build();int couponSettlementUpdated = couponSettlementMapper.update(couponSettlementDO, couponSettlementUpdateWrapper);if(!SqlHelper.retBool(couponSettlementUpdated)){
log.error("优惠券结算单退款异常,请求参数:{}", com.alibaba.fastjson.JSON.toJSONString(requestParam));thrownewServiceException("核销优惠券结算单异常");}
// 变更用户优惠券状态LambdaUpdateWrapper<UserCouponDO> userCouponUpdateWrapper =Wrappers.lambdaUpdate(UserCouponDO.class).eq(UserCouponDO::getId, requestParam.getCouponId()).eq(UserCouponDO::getUserId,Long.parseLong(UserContext.getUserId())).eq(UserCouponDO::getStatus,UserCouponStatusEnum.USED.getCode());UserCouponDO userCouponDO =UserCouponDO.builder().status(UserCouponStatusEnum.UNUSED.getCode()).build();int userCouponUpdated = userCouponMapper.update(userCouponDO, userCouponUpdateWrapper);if(!SqlHelper.retBool(userCouponUpdated)){
log.error("修改用户优惠券记录状态未使用异常,请求参数:{}", com.alibaba.fastjson.JSON.toJSONString(requestParam));thrownewServiceException("修改用户优惠券记录状态异常");}}catch(Exception ex){
log.error("执行优惠券结算单退款失败", ex);
status.setRollbackOnly();throw ex;}});
查询用户优惠券记录,构建缓存 Key,使用 ZSet
数据结构将优惠券信息放回 Redis 中,确保用户可以在后续继续使用该优惠券。
代码如下所示:
// 查询出来优惠券再放回缓存UserCouponDO userCouponDO = userCouponMapper.selectOne(Wrappers.lambdaQuery(UserCouponDO.class).eq(UserCouponDO::getUserId,Long.parseLong(UserContext.getUserId())).eq(UserCouponDO::getId, requestParam.getCouponId()));String userCouponItemCacheKey =StrUtil.builder().append(userCouponDO.getCouponTemplateId()).append("_").append(userCouponDO.getId()).toString();
stringRedisTemplate.opsForZSet().add(String.format(USER_COUPON_TEMPLATE_LIST_KEY,UserContext.getUserId()), userCouponItemCacheKey, userCouponDO.getReceiveTime().getTime());
文末总结
本章节重点实现了优惠券在不同业务场景下的状态管理,包括锁定、核销和退还功能。通过引入分布式锁(如 Redis 的 Redisson)和事务控制,确保了优惠券状态变更的原子性和一致性,防止了并发操作可能导致的状态冲突。
完结,撒花 🎉
Comments NOTHING