/** * Send transaction message for stock reduction * This ensures the inventory is reduced only if the order is successfully created */ public String sendStockReductionTransactionMessage(Long userId, GoodsVo goodsVo)throws JsonProcessingException { StringtransactionId= UUID.randomUUID().toString();
// Message = what downstream services need; Args = what the local transaction needs
// Create the message with transaction ID in headers Message<String> message = MessageBuilder.withPayload(objectMapper.writeValueAsString(payload)) .setHeader("transactionId", transactionId) .build();
// Prepare transaction arguments to be used in the local transaction execution Map<String, Object> transactionArgs = newHashMap<>(); transactionArgs.put("userId", userId); transactionArgs.put("goodsVo", goodsVo); transactionArgs.put("transactionId", transactionId);
log.info("begin Transaction message sent for order creation with txId: {}", transactionId);
// Store transaction start time in Redis with expiration redisService.set(SeckillKey.txStartTime, transactionId, System.currentTimeMillis());
@Override public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) { try { log.info("Executing local transaction for message: {}", msg);
// Parse the message body and arguments @SuppressWarnings("unchecked") Map<String, Object> params = (Map<String, Object>) arg; LonguserId= (Long) params.get("userId"); GoodsVogoodsVo= (GoodsVo) params.get("goodsVo"); StringtransactionId= (String) params.get("transactionId");
// Execute local transaction - create order in database with transaction ID booleansuccess= createOrderInDB(userId, goodsVo, transactionId);
// Record transaction result RocketMQLocalTransactionStatestate= success ? RocketMQLocalTransactionState.COMMIT : RocketMQLocalTransactionState.ROLLBACK;
@Override public RocketMQLocalTransactionState checkLocalTransaction(Message msg) { try { // Extract transaction id from message headers StringtransactionId= (String) msg.getHeaders().get("transactionId"); log.info("Checking transaction status for txId: {}", transactionId);
// Get cached transaction state (for performance) RocketMQLocalTransactionStatestate= localTransactionMap.get(transactionId);
if (state != null) { log.info("Found transaction state in cache: {}", state); return state; }
// If not found in memory (e.g., after service restart), query database log.info("Transaction state not in cache, checking database...");
// 1. Check if order exists with this transaction ID SeckillOrderorder= orderService.getOrderByTransactionId(transactionId);
if (order != null) { // Order exists, transaction was successful log.info("Transaction found in database, order exists: {}", order.getId()); localTransactionMap.put(transactionId, RocketMQLocalTransactionState.COMMIT); return RocketMQLocalTransactionState.COMMIT; }
// 2. Check if transaction is still within valid time window if (NEED_CHECK_TIMEOUT && isTransactionExpired(transactionId)) { log.info("Transaction considered failed: timeout reached"); localTransactionMap.put(transactionId, RocketMQLocalTransactionState.ROLLBACK); return RocketMQLocalTransactionState.ROLLBACK; }
// 3. Still within processing window, return UNKNOWN to trigger retry log.info("Transaction status still unknown, will retry check later"); return RocketMQLocalTransactionState.UNKNOWN;
publicbooleanreduceStockAtLeastOnce(Long goodsId){ // The message ack might be lost, leading to repeated deductions return goodsService.reduceStockWhenLeft(goodsId); }
// Use Redis SETNX to implement idempotence check. If SETNX fails due to a crash, // there will be no chance to deduct the stock again, so it is at most once deduction. booleanisFirstProcess= redisService.setIfNotExists(idempotentKey, "PROCESSED", MESSAGE_RECORD_EXPIRE_TIME);
// If already processed, return directly if (!isFirstProcess) { log.info("Message already processed, transactionId: {}", transactionId); returnfalse; }
publicbooleanprocessStockReductionWithIdempotence(Long goodsId, String transactionId){ // 1. Check if the message has been processed IdempotenceRecordexistingRecord= idempotenceRecordDao.findByTransactionId(transactionId);
// 2. If the message was already processed, return false if (existingRecord != null) { log.info("Message already processed, transactionId: {}", transactionId); returnfalse; } // 3. Execute stock reduction and record the message in the same transaction try { StockReductionConsumerproxy= applicationContext.getBean(StockReductionConsumer.class); return proxy.processWithTransaction(goodsId, transactionId); } catch (org.springframework.dao.DataIntegrityViolationException e) { // Catch the unique index violation exception, indicating concurrent processing record already exists log.info("Duplicate processing detected, transactionId: {}", transactionId); returnfalse; } }
// 2. Record the processing result regardless of success or failure. // The database unique index ensures that when multiple calls enter this function simultaneously, only one will succeed and the others will roll back. // Alternatively, a distributed lock can be used instead of the unique index. IdempotenceRecordrecord=newIdempotenceRecord(); record.setTransactionId(transactionId); record.setProcessed(success); record.setCreateTime(newDate()); idempotenceRecordDao.insertRecord(record);