从零开始实现秒杀系统(三):RocketMQ消息队列篇

GPT摘要

本文是秒杀系统优化系列的第三篇,介绍如何利用RocketMQ消息队列解决V2版本遗留的问题。V2版本通过Redis预减库存和内存队列提升了性能,但仍存在服务可靠性差(宕机导致内存队列任务丢失)、数据一致性风险(Redis与数据库操作分离)、单机瓶颈(内存队列无法水平扩展)以及数据库行锁影响库存扣减性能等问题。 V3架构的核心优化是引入RocketMQ消息队列,关键改进包括:保留Redis预减库存机制但改为同步创建订单(不涉及行锁),将高争用的库存扣减操作异步化,通过事务消息确保操作执行可靠性,并实现三种消息消费模式(至少一次、至多一次、恰好一次)。主要工作流程为:先进行Redis库存校验和预减库存,发送半消息到RocketMQ后同步创建订单并确认消息,最终返回秒杀成功。消费者根据配置选择不同消费模式处理库存扣减。 技术实现上重点采用RocketMQ事务消息的两阶段提交机制:先发送半消息,执行本地订单创建事务,根据结果确认或回滚消息,并支持事务状态回查。文章详细展示了事务消息的生产者实现代码和事务监听器代码,后者包含本地事务执行和状态回查逻辑。针对不同业务场景,提供了三种库存扣减实现:简单但可能重复的至少一次模式、基于Redis幂等的至多一次模式,以及通过数据库事务和幂等表确保的恰好一次模式。 优化后的系统具备高可靠性(消息不丢失)、解耦性(订单与库存操作分离)、可扩展性(消费者水平扩展)和削峰填谷能力,同时通过事务消息和幂等设计保障数据一致性。文章也指出后续优化方向,包括限流保护、库存对账和监控告警体系建设。该系列完整展现了从基础MySQL实现到集成Redis缓存,最终引入消息队列的渐进式优化过程。

从零开始实现秒杀系统(三):RocketMQ消息队列篇

引言

在前两篇文章中,我们先后探讨了基于MySQL的秒杀系统实现(V1)和基于Redis优化的秒杀系统(V2)。虽然V2版本通过Redis预减库存和内存队列显著提升了系统性能,但我们仍面临着几个关键挑战:服务可靠性、数据一致性、系统扩展性以及高并发下的内存压力问题。

本文作为系列的第三篇,将介绍如何利用RocketMQ消息队列进一步优化秒杀系统,解决V2版本中的剩余问题,实现一个更可靠、更具扩展性的高性能秒杀系统。

V3版本架构改进

V2架构的问题回顾

V2版本虽然性能显著提升,但存在以下问题:

  1. 服务可靠性问题:内存队列中的任务在服务宕机时会丢失
  2. 数据一致性问题:Redis减库存与数据库操作的分离增加了不一致风险
  3. 单机瓶颈:内存队列在单机运行,难以水平扩展
  4. 库存扣减性能:库存扣减仍在数据库事务内,受行锁影响

V3架构设计

V3架构引入了RocketMQ消息队列,主要优化点如下:

  1. Redis预减库存 + 同步订单创建:利用订单创建操作不涉及行锁的特点,保留同步创建
  2. 异步库存扣减:将高争用的库存扣减操作异步化,减轻数据库压力
  3. 事务消息保证:通过RocketMQ事务消息确保库存扣减操作一定执行
  4. 三种消息消费模式:实现不同级别的消息处理保证

核心流程设计

v3整体架构图

image-20250307135544501

秒杀接口执行流程

1
2
3
4
5
6
1. 库存校验(Redis快速失败)
2. Redis预减库存
3. 发送半消息到RocketMQ
4. 本地事务同步创建订单(不涉及行锁,速度较快)
5. 确认消息发送
6. 返回秒杀成功

基于此,工作流程为:

image-20250307135055826

异步库存扣减流程

1
2
3
4
1. RocketMQ消费者接收库存扣减消息
2. 根据配置选择消费模式(至少一次/至多一次/恰好一次)
3. 执行库存扣减操作
4. 处理结果记录(根据消费模式)

事务消息保证异步库存扣减的可靠性

在V3版本中,我们引入了RocketMQ的事务消息机制,确保库存扣减消息能够可靠地执行。这是一个关键的改进,解决了V2版本中数据一致性的问题。

什么是RocketMQ事务消息?

RocketMQ事务消息是一种分布式事务解决方案,它通过两阶段提交(2PC)的机制确保消息发送和本地事务的原子性。简单来说,事务消息可以保证:

  1. 消息一定会被发送且被消费,或者
  2. 消息和本地事务都不会执行

事务消息工作流程

在秒杀系统中,事务消息的工作流程如下:

  1. 发送半消息:系统先向RocketMQ发送一条”半消息”,此时消息不会被消费
  2. 执行本地事务:系统执行本地事务(创建订单)
  3. 提交或回滚:根据本地事务执行结果,向RocketMQ发送确认或回滚指令
  4. 事务状态回查:如果因网络等原因未收到确认指令,RocketMQ会回查事务状态
  5. 消息消费:确认后,消息才能被消费者消费

事务消息

实现代码示例

半消息发送代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
@Service
public class MQProducer {

/**
* Send transaction message for stock reduction
* This ensures the inventory is reduced only if the order is successfully created
*/
public String sendStockReductionTransactionMessage(Long userId, GoodsVo goodsVo) throws JsonProcessingException {
String transactionId = UUID.randomUUID().toString();

// Message = what downstream services need
// Args = what local transaction needs

// Prepare stock reduction message data
Map<String, Object> payload = new HashMap<>();
payload.put("goodsId", goodsVo.getId());
payload.put("transactionId", transactionId);


// Create the message with transaction ID in headers
Message<String> message = MessageBuilder.withPayload(objectMapper.writeValueAsString(payload))
.setHeader("transactionId", transactionId)
.build();

// Prepare transaction arguments to be used in the local transaction execution
Map<String, Object> transactionArgs = new HashMap<>();
transactionArgs.put("userId", userId);
transactionArgs.put("goodsVo", goodsVo);
transactionArgs.put("transactionId", transactionId);


log.info("begin Transaction message sent for order creation with txId: {}", transactionId);

// Store transaction start time in Redis with expiration
redisService.set(SeckillKey.txStartTime, transactionId, System.currentTimeMillis());

// Send transactional message
rocketMQTemplate.sendMessageInTransaction(
TOPIC_STOCK_REDUCTION,
message,
transactionArgs);

log.info("Transaction message sent for order creation with txId: {}", transactionId);

return transactionId;
}
}



private void saveTransactionRecord(String transactionId, boolean success) {
TransactionRecord record = new TransactionRecord();
record.setTransactionId(transactionId);
record.setSuccess(success);
record.setCreateTime(new Date());
transactionRecordDao.insert(record);
}
}

核心事务监听器实现:包含执行本地事务(在半消息发送后调用)以及 回查本地事务状态

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
@Component
public class StockReductionTransactionListener implements RocketMQLocalTransactionListener {

@Autowired
private OrderService orderService;

@Autowired
private TransactionRecordDao transactionRecordDao;

@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
try {
log.info("Executing local transaction for message: {}", msg);

// Parse the message body and arguments
@SuppressWarnings("unchecked")
Map<String, Object> params = (Map<String, Object>) arg;
Long userId = (Long) params.get("userId");
GoodsVo goodsVo = (GoodsVo) params.get("goodsVo");
String transactionId = (String) params.get("transactionId");

// Execute local transaction - create order in database with transaction ID
boolean success = createOrderInDB(userId, goodsVo, transactionId);

// Record transaction result
RocketMQLocalTransactionState state = success ?
RocketMQLocalTransactionState.COMMIT :
RocketMQLocalTransactionState.ROLLBACK;

localTransactionMap.put(transactionId, state);

log.info("Local transaction executed with result: {}, txId: {}", state, transactionId);
return state;

} catch (Exception e) {
log.error("Error executing local transaction", e);
return RocketMQLocalTransactionState.ROLLBACK;
}
}

@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
try {
// Extract transaction id from message headers
String transactionId = (String) msg.getHeaders().get("transactionId");
log.info("Checking transaction status for txId: {}", transactionId);

// Get cached transaction state (for performance)
RocketMQLocalTransactionState state = localTransactionMap.get(transactionId);

if (state != null) {
log.info("Found transaction state in cache: {}", state);
return state;
}

// If not found in memory (e.g., after service restart), query database
log.info("Transaction state not in cache, checking database...");

// 1. Check if order exists with this transaction ID
SeckillOrder order = orderService.getOrderByTransactionId(transactionId);

if (order != null) {
// Order exists, transaction was successful
log.info("Transaction found in database, order exists: {}", order.getId());
localTransactionMap.put(transactionId, RocketMQLocalTransactionState.COMMIT);
return RocketMQLocalTransactionState.COMMIT;
}

// 2. Check if transaction is still within valid time window
if (NEED_CHECK_TIMEOUT && isTransactionExpired(transactionId)) {
log.info("Transaction considered failed: timeout reached");
localTransactionMap.put(transactionId, RocketMQLocalTransactionState.ROLLBACK);
return RocketMQLocalTransactionState.ROLLBACK;
}

// 3. Still within processing window, return UNKNOWN to trigger retry
log.info("Transaction status still unknown, will retry check later");
return RocketMQLocalTransactionState.UNKNOWN;

} catch (Exception e) {
log.error("Error checking local transaction status", e);
return RocketMQLocalTransactionState.UNKNOWN;
}
}
}

在秒杀场景中使用事务消息具有以下优势:

  1. 数据一致性保证:确保订单创建和库存扣减消息发送的原子性

  2. 异步解耦:将高争用的库存扣减操作异步化,提高系统吞吐量

  3. 可靠性:即使系统崩溃或网络故障,消息也不会丢失

三种消息消费保证模式详解

消费者用于消费库存扣减的消息,本文共实现3中不同的扣减方式,总体流程如下所示:image-20250307134959200

1. 至少一次处理 (At-Least-Once)

最简单的实现,默认的处理方式,但可能导致重复处理:

1
2
3
4
public boolean reduceStockAtLeastOnce(Long goodsId){
// The message ack might be lost, leading to repeated deductions
return goodsService.reduceStockWhenLeft(goodsId);
}

这种模式下,如果消息处理成功但ACK失败,消息会被重新投递,导致重复扣减。适用于对数据一致性要求不高的场景。

2. 至多一次处理 (At-Most-Once)

使用Redis进行幂等性检查:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public boolean reduceStockAtMostOnce(Long goodsId, String transactionId){
String idempotentKey = "stock:reduction:" + transactionId;

// Use Redis SETNX to implement idempotence check. If SETNX fails due to a crash,
// there will be no chance to deduct the stock again, so it is at most once deduction.
boolean isFirstProcess = redisService.setIfNotExists(idempotentKey, "PROCESSED", MESSAGE_RECORD_EXPIRE_TIME);

// If already processed, return directly
if (!isFirstProcess) {
log.info("Message already processed, transactionId: {}", transactionId);
return false;
}

// Execute stock reduction logic
return goodsService.reduceStockWhenLeft(goodsId);
}

这种模式通过Redis的setnx实现幂等性检查,避免重复处理。但如果在Redis设置成功后、扣减库存前发生故障,消息将不会重新处理,可能导致库存不一致。

3. 恰好一次处理 (Exactly-Once)

通过数据库事务和幂等性表实现最严格的一致性保证:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public boolean processStockReductionWithIdempotence(Long goodsId, String transactionId){
// 1. Check if the message has been processed
IdempotenceRecord existingRecord = idempotenceRecordDao.findByTransactionId(transactionId);

// 2. If the message was already processed, return false
if (existingRecord != null) {
log.info("Message already processed, transactionId: {}", transactionId);
return false;
}
// 3. Execute stock reduction and record the message in the same transaction
try {
StockReductionConsumer proxy = applicationContext.getBean(StockReductionConsumer.class);
return proxy.processWithTransaction(goodsId, transactionId);
} catch (org.springframework.dao.DataIntegrityViolationException e) {
// Catch the unique index violation exception, indicating concurrent processing record already exists
log.info("Duplicate processing detected, transactionId: {}", transactionId);
return false;
}
}

@Transactional
public boolean processWithTransaction(Long goodsId, String transactionId) {
// 1. Execute business logic - reduce inventory
boolean success = goodsService.reduceStockWhenLeft(goodsId);

// 2. Record the processing result regardless of success or failure.
// The database unique index ensures that when multiple calls enter this function simultaneously, only one will succeed and others will roll back.
// alse you can try to use distributedLock without unique index
IdempotenceRecord record = new IdempotenceRecord();
record.setTransactionId(transactionId);
record.setProcessed(success);
record.setCreateTime(new Date());
idempotenceRecordDao.insertRecord(record);

return success;
}

这种模式通过数据库事务将业务操作和幂等性记录绑定在一起,确保消息恰好被处理一次。但性能略微低于其他方法

V3架构的优势

  1. 高可靠性:消息队列保证消息不丢失,服务重启也可恢复处理
  2. 系统解耦:订单创建与库存扣减解耦,减少相互影响
  3. 水平扩展:消费者可以水平扩展,提高处理能力
  4. 削峰填谷:消息队列能够缓冲突发流量,保护后端系统
  5. 数据一致性:通过事务消息和幂等性设计确保数据一致性

挑战与解决方案

1. 消息积压问题

挑战:高并发下可能导致消息积压
解决方案

  • 消费者集群水平扩展
  • 消费者线程池优化提高消费速度
  • 消息批量处理减少数据库交互次数

2. 系统复杂性增加

挑战:引入消息队列增加了系统复杂性和运维难度
解决方案

  • 完善监控和告警机制
  • 构建消息队列健康检查系统

总结与展望

V3版本通过引入RocketMQ消息队列,成功解决了V2版本中存在的服务可靠性和水平扩展问题。通过将高争用的库存扣减操作异步化,结合事务消息和三种消息消费保证模式,我们实现了一个高性能、高可靠、可扩展的秒杀系统。在实际生产环境中,可以根据业务需求和一致性要求选择适合的消息消费模式。

在这三篇文章中,我们从零开始,逐步构建了一个高性能、高可用的秒杀系统:

  • V1版本:基于MySQL的基础秒杀实现
  • V2版本:引入Redis缓存优化,实现库存预减和异步下单
  • V3版本:整合RocketMQ消息队列,解决可靠性和扩展性问题

然而,在实际生产环境中,秒杀系统仍面临一些挑战,我们将在下一篇文章中进一步优化系统:

在下一篇文章中,我们将继续完善秒杀系统,重点关注以下方面:

  1. 限流保护:令牌桶限流方案确保系统可用性
  2. 库存对账:解决Redis与数据库之间可能存在的库存不一致问题
  3. 监控告警:构建完善的监控体系,实现问题的早发现早解决

本文源码已开源在GitHub:Goinggoinggoing/seckill

如有疑问或建议,欢迎在评论区讨论!