RocketMQ分布式消息系统详解
Apache RocketMQ是阿里巴巴开源的分布式消息中间件,经过双11等大规模场景验证,具有高性能、高可靠性、高实时性的分布式特性。RocketMQ在事务消息、顺序消息、延迟消息等方面有着独特优势,是金融级分布式消息解决方案的首选。
核心价值
RocketMQ = 金融级可靠性 + 万亿级消息堆积 + 毫秒级延迟 + 分布式事务
- 💰 金融级可靠性:99.996%超高可用性,支持分布式事务
- 📈 万亿级堆积:支持万亿级消息堆积,不影响性能
- ⚡ 毫秒级延迟:端到端延迟在毫秒级别
- 🔄 分布式事务:完整的事务消息解决方案
- 📊 顺序消息:支持全局和分区顺序消息
- ⏰ 延迟消息:支持18个延迟等级的定时消息
1. RocketMQ核心架构与设计理念
1.1 分布式架构模型
RocketMQ采用分布式集群架构,通过多个组件协同工作,提供高性能、高可靠的消息服务。
核心组件详解
| 组件 | 作用 | 特点 | 部署建议 |
|---|---|---|---|
| NameServer | 路由注册中心 | 无状态、轻量级 | 集群部署,奇数个节点 |
| Broker | 消息存储转发 | 支持主从、高可用 | 主从部署,数据同步 |
| Producer | 消息生产者 | 负载均衡、故障转移 | 集成到业务应用 |
| Consumer | 消息消费者 | 推拉模式、集群消费 | 独立部署或集成 |
| RocketMQ应用场景对比 |
| 应用场景 | 传统方案 | RocketMQ方案 | 核心优势 | 适用规模 |
|---|---|---|---|---|
| 分布式事务 | 2PC/3PC | 事务消息 | 最终一致性、高性能 | 金融支付 |
| 顺序处理 | 单线程处理 | 顺序消息 | 高并发、保序 | 订单状态流转 |
| 延迟任务 | 定时任务 | 延迟消息 | 精确延迟、高可靠 | 定时提醒 |
| 消息堆积 | 数据库存储 | 磁盘存储 | 万亿级堆积能力 | 大数据场景 |
| 广播通知 | 推送服务 | 广播消费 | 一对多、实时性 | 配置更新 |
2. 事务消息深度解析
2.1 分布式事务原理
RocketMQ的事务消息基于两阶段提交协议,通过半消息机制确保本地事务与消息发送的最终一致性。
- 事务流程
- Java实现
- 应用场景
事务消息特点:
- 两阶段提交:先发送半消息,再根据本地事务结果决定提交或回滚
- 事务回查:支持事务状态回查机制,确保最终一致性
- 高可靠性:即使在网络异常情况下也能保证事务的最终一致性
事务消息完整实现
java
1@Component2public class TransactionMessageService {3 4 private TransactionMQProducer transactionProducer;5 6 @PostConstruct7 public void init() throws MQClientException {8 // 创建事务生产者9 transactionProducer = new TransactionMQProducer("transaction_producer_group");10 transactionProducer.setNamesrvAddr("localhost:9876");11 12 // 设置事务监听器13 transactionProducer.setTransactionListener(new OrderTransactionListener());14 15 // 设置线程池16 ExecutorService executorService = new ThreadPoolExecutor(17 2, 5, 100, TimeUnit.SECONDS,18 new ArrayBlockingQueue<>(2000),19 r -> {20 Thread thread = new Thread(r);21 thread.setName("client-transaction-msg-check-thread");22 return thread;23 }24 );25 transactionProducer.setExecutorService(executorService);26 27 transactionProducer.start();28 log.info("事务生产者启动成功");29 }30 31 /**32 * 发送事务消息33 */34 public void sendTransactionMessage(OrderCreateEvent event) {35 try {36 Message message = new Message(37 "order_transaction_topic",38 "order_create",39 event.getOrderId(),40 JSON.toJSONBytes(event)41 );42 43 // 发送事务消息,传递本地事务参数44 TransactionSendResult result = transactionProducer.sendMessageInTransaction(45 message, event46 );47 48 log.info("事务消息发送结果: {}, 事务状态: {}", 49 result.getSendStatus(), result.getLocalTransactionState());50 51 } catch (MQClientException e) {52 log.error("发送事务消息失败", e);53 throw new BusinessException("事务消息发送失败", e);54 }55 }56 57 /**58 * 事务监听器实现59 */60 @Component61 public static class OrderTransactionListener implements TransactionListener {62 63 @Autowired64 private OrderService orderService;65 66 @Autowired67 private PaymentService paymentService;68 69 /**70 * 执行本地事务71 */72 @Override73 public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {74 OrderCreateEvent event = (OrderCreateEvent) arg;75 String orderId = event.getOrderId();76 77 log.info("开始执行本地事务,订单ID: {}", orderId);78 79 try {80 // 1. 创建订单81 Order order = orderService.createOrder(event);82 83 // 2. 扣减库存84 boolean stockReduced = orderService.reduceStock(85 event.getProductId(), event.getQuantity()86 );87 88 if (!stockReduced) {89 log.warn("库存不足,订单创建失败: {}", orderId);90 return LocalTransactionState.ROLLBACK_MESSAGE;91 }92 93 // 3. 预扣费用94 boolean paymentReserved = paymentService.reservePayment(95 event.getUserId(), event.getAmount()96 );97 98 if (!paymentReserved) {99 log.warn("余额不足,订单创建失败: {}", orderId);100 // 回滚库存101 orderService.rollbackStock(event.getProductId(), event.getQuantity());102 return LocalTransactionState.ROLLBACK_MESSAGE;103 }104 105 log.info("本地事务执行成功,订单ID: {}", orderId);106 return LocalTransactionState.COMMIT_MESSAGE;107 108 } catch (Exception e) {109 log.error("本地事务执行异常,订单ID: {}", orderId, e);110 return LocalTransactionState.UNKNOW;111 }112 }113 114 /**115 * 事务状态回查116 */117 @Override118 public LocalTransactionState checkLocalTransaction(MessageExt msg) {119 String orderId = msg.getKeys();120 log.info("回查本地事务状态,订单ID: {}", orderId);121 122 try {123 // 查询订单状态124 Order order = orderService.getOrderById(orderId);125 126 if (order == null) {127 log.info("订单不存在,回滚消息: {}", orderId);128 return LocalTransactionState.ROLLBACK_MESSAGE;129 }130 131 switch (order.getStatus()) {132 case CREATED:133 case PAID:134 log.info("订单状态正常,提交消息: {}", orderId);135 return LocalTransactionState.COMMIT_MESSAGE;136 137 case CANCELLED:138 case FAILED:139 log.info("订单已取消或失败,回滚消息: {}", orderId);140 return LocalTransactionState.ROLLBACK_MESSAGE;141 142 default:143 log.info("订单状态未知,等待下次回查: {}", orderId);144 return LocalTransactionState.UNKNOW;145 }146 147 } catch (Exception e) {148 log.error("回查事务状态异常,订单ID: {}", orderId, e);149 return LocalTransactionState.UNKNOW;150 }151 }152 }153 154 @PreDestroy155 public void destroy() {156 if (transactionProducer != null) {157 transactionProducer.shutdown();158 log.info("事务生产者关闭成功");159 }160 }161}162163/**164 * 事务消息消费者165 */166@Component167@RocketMQMessageListener(168 topic = "order_transaction_topic",169 consumerGroup = "order_transaction_consumer_group",170 messageModel = MessageModel.CLUSTERING171)172public class TransactionMessageConsumer implements RocketMQListener<OrderCreateEvent> {173 174 @Autowired175 private NotificationService notificationService;176 177 @Autowired178 private LogisticsService logisticsService;179 180 @Override181 public void onMessage(OrderCreateEvent event) {182 String orderId = event.getOrderId();183 log.info("接收到订单创建事务消息: {}", orderId);184 185 try {186 // 1. 发送订单确认通知187 notificationService.sendOrderConfirmation(event);188 189 // 2. 创建物流订单190 logisticsService.createShippingOrder(event);191 192 // 3. 更新订单状态193 orderService.updateOrderStatus(orderId, OrderStatus.CONFIRMED);194 195 log.info("订单创建事务消息处理完成: {}", orderId);196 197 } catch (Exception e) {198 log.error("处理订单创建事务消息失败: {}", orderId, e);199 throw new RuntimeException("消息处理失败", e);200 }201 }202}事务消息典型应用场景:
- 电商订单系统:订单创建与库存扣减的一致性
- 支付系统:支付成功与账户变更的一致性
- 积分系统:消费行为与积分增加的一致性
- 物流系统:订单确认与物流创建的一致性
事务消息应用场景
java
1/**2 * 1. 电商下单场景3 */4public class EcommerceOrderScenario {5 6 public void processOrder(OrderRequest request) {7 // 发送事务消息8 OrderCreateEvent event = new OrderCreateEvent(9 request.getOrderId(),10 request.getUserId(), 11 request.getProductId(),12 request.getQuantity(),13 request.getAmount()14 );15 16 transactionMessageService.sendTransactionMessage(event);17 18 // 本地事务将在TransactionListener中执行:19 // 1. 创建订单记录20 // 2. 扣减商品库存21 // 3. 预扣用户余额22 // 4. 根据执行结果决定消息的提交或回滚23 }24}2526/**27 * 2. 支付成功场景28 */29public class PaymentSuccessScenario {30 31 public void handlePaymentSuccess(PaymentEvent event) {32 // 发送事务消息33 PaymentSuccessEvent successEvent = new PaymentSuccessEvent(34 event.getPaymentId(),35 event.getOrderId(),36 event.getAmount(),37 event.getPaymentMethod()38 );39 40 transactionMessageService.sendTransactionMessage(successEvent);41 42 // 本地事务:43 // 1. 更新支付状态44 // 2. 更新订单状态45 // 3. 增加用户积分46 // 4. 创建发票记录47 }48}2.2 事务消息最佳实践
- 最佳实践
- 监控告警
事务消息设计原则:
- 幂等性设计:确保消息处理和本地事务都是幂等的
- 状态可查询:本地事务状态必须可以通过业务主键查询
- 超时处理:设置合理的事务超时时间
- 异常处理:区分业务异常和系统异常
事务消息最佳实践
java
1@Component2public class TransactionBestPractices {3 4 /**5 * 1. 幂等性设计6 */7 @Transactional8 public boolean createOrderIdempotent(OrderCreateEvent event) {9 String orderId = event.getOrderId();10 11 // 检查订单是否已存在(幂等性检查)12 Order existingOrder = orderRepository.findByOrderId(orderId);13 if (existingOrder != null) {14 log.info("订单已存在,跳过创建: {}", orderId);15 return true;16 }17 18 // 创建订单19 Order order = new Order();20 order.setOrderId(orderId);21 order.setUserId(event.getUserId());22 order.setAmount(event.getAmount());23 order.setStatus(OrderStatus.CREATED);24 order.setCreateTime(new Date());25 26 orderRepository.save(order);27 log.info("订单创建成功: {}", orderId);28 return true;29 }30 31 /**32 * 2. 状态可查询设计33 */34 public LocalTransactionState checkTransactionState(String orderId) {35 try {36 Order order = orderRepository.findByOrderId(orderId);37 38 if (order == null) {39 // 订单不存在,可能是创建失败40 return LocalTransactionState.ROLLBACK_MESSAGE;41 }42 43 // 根据订单状态判断事务状态44 switch (order.getStatus()) {45 case CREATED:46 case CONFIRMED:47 return LocalTransactionState.COMMIT_MESSAGE;48 case CANCELLED:49 case FAILED:50 return LocalTransactionState.ROLLBACK_MESSAGE;51 default:52 return LocalTransactionState.UNKNOW;53 }54 55 } catch (Exception e) {56 log.error("查询事务状态异常: {}", orderId, e);57 return LocalTransactionState.UNKNOW;58 }59 }60 61 /**62 * 3. 超时和重试配置63 */64 @Bean65 public TransactionMQProducer transactionProducer() {66 TransactionMQProducer producer = new TransactionMQProducer("tx_producer_group");67 producer.setNamesrvAddr("localhost:9876");68 69 // 设置事务超时时间(默认6秒)70 producer.setTransactionTimeOut(10000);71 72 // 设置回查间隔(默认60秒)73 producer.setCheckThreadPoolMinSize(2);74 producer.setCheckThreadPoolMaxSize(5);75 producer.setCheckRequestHoldMax(2000);76 77 return producer;78 }79}事务消息监控
java
1@Component2public class TransactionMessageMonitor {3 4 private final MeterRegistry meterRegistry;5 private final Counter transactionSuccessCounter;6 private final Counter transactionFailureCounter;7 private final Timer transactionTimer;8 9 public TransactionMessageMonitor(MeterRegistry meterRegistry) {10 this.meterRegistry = meterRegistry;11 this.transactionSuccessCounter = Counter.builder("transaction.message.success")12 .description("事务消息成功数量")13 .register(meterRegistry);14 this.transactionFailureCounter = Counter.builder("transaction.message.failure")15 .description("事务消息失败数量")16 .register(meterRegistry);17 this.transactionTimer = Timer.builder("transaction.message.duration")18 .description("事务消息处理时长")19 .register(meterRegistry);20 }21 22 /**23 * 记录事务消息指标24 */25 public void recordTransactionMetrics(String orderId, boolean success, long duration) {26 if (success) {27 transactionSuccessCounter.increment();28 } else {29 transactionFailureCounter.increment();30 }31 32 transactionTimer.record(duration, TimeUnit.MILLISECONDS);33 34 log.info("事务消息指标记录 - 订单: {}, 成功: {}, 耗时: {}ms", 35 orderId, success, duration);36 }37 38 /**39 * 检查事务消息健康状态40 */41 @Scheduled(fixedRate = 60000) // 每分钟检查一次42 public void checkTransactionHealth() {43 double successRate = calculateSuccessRate();44 45 if (successRate < 0.95) { // 成功率低于95%告警46 sendAlert("事务消息成功率告警", 47 String.format("当前成功率: %.2f%%", successRate * 100));48 }49 50 // 检查长时间未回查的事务51 checkLongPendingTransactions();52 }53 54 private double calculateSuccessRate() {55 double successCount = transactionSuccessCounter.count();56 double failureCount = transactionFailureCounter.count();57 double totalCount = successCount + failureCount;58 59 return totalCount > 0 ? successCount / totalCount : 1.0;60 }61 62 private void checkLongPendingTransactions() {63 // 查询长时间未确认的事务消息64 // 实现具体的检查逻辑65 }66 67 private void sendAlert(String title, String message) {68 log.error("告警: {} - {}", title, message);69 // 发送告警通知到监控系统70 }71}3. 顺序消息与延迟消息
3.1 顺序消息实现
RocketMQ支持两种顺序消息:全局顺序和分区顺序,通过队列选择器确保消息的有序性。
- 顺序原理
- Java实现
顺序消息特点:
- 分区顺序:同一分区内消息严格有序
- 全局顺序:所有消息全局有序(性能较低)
- 队列选择:通过MessageQueueSelector选择队列
顺序消息完整实现
java
1@Component2public class OrderedMessageService {3 4 private DefaultMQProducer orderedProducer;5 6 @PostConstruct7 public void init() throws MQClientException {8 orderedProducer = new DefaultMQProducer("ordered_producer_group");9 orderedProducer.setNamesrvAddr("localhost:9876");10 orderedProducer.start();11 log.info("顺序消息生产者启动成功");12 }13 14 /**15 * 发送顺序消息16 */17 public void sendOrderedMessage(OrderStatusEvent event) {18 try {19 Message message = new Message(20 "order_status_topic",21 event.getEventType(),22 event.getOrderId(), // 使用orderId作为key23 JSON.toJSONBytes(event)24 );25 26 // 发送顺序消息,使用订单ID作为队列选择因子27 SendResult result = orderedProducer.send(28 message,29 new MessageQueueSelector() {30 @Override31 public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {32 String orderId = (String) arg;33 // 根据订单ID选择队列,确保同一订单的消息发送到同一队列34 int index = Math.abs(orderId.hashCode()) % mqs.size();35 return mqs.get(index);36 }37 },38 event.getOrderId() // 队列选择参数39 );40 41 log.info("顺序消息发送成功 - 订单: {}, 事件: {}, 队列: {}", 42 event.getOrderId(), event.getEventType(), result.getMessageQueue().getQueueId());43 44 } catch (Exception e) {45 log.error("发送顺序消息失败", e);46 throw new BusinessException("顺序消息发送失败", e);47 }48 }49 50 /**51 * 批量发送订单状态变更消息52 */53 public void sendOrderStatusFlow(String orderId) {54 // 模拟订单状态流转:创建 -> 支付 -> 发货 -> 完成55 List<OrderStatusEvent> events = Arrays.asList(56 new OrderStatusEvent(orderId, "ORDER_CREATED", "订单创建"),57 new OrderStatusEvent(orderId, "ORDER_PAID", "订单支付"),58 new OrderStatusEvent(orderId, "ORDER_SHIPPED", "订单发货"),59 new OrderStatusEvent(orderId, "ORDER_COMPLETED", "订单完成")60 );61 62 // 按顺序发送消息63 for (OrderStatusEvent event : events) {64 sendOrderedMessage(event);65 66 // 模拟业务处理间隔67 try {68 Thread.sleep(100);69 } catch (InterruptedException e) {70 Thread.currentThread().interrupt();71 }72 }73 }74 75 @PreDestroy76 public void destroy() {77 if (orderedProducer != null) {78 orderedProducer.shutdown();79 log.info("顺序消息生产者关闭成功");80 }81 }82}8384/**85 * 顺序消息消费者86 */87@Component88@RocketMQMessageListener(89 topic = "order_status_topic",90 consumerGroup = "order_status_consumer_group",91 messageModel = MessageModel.CLUSTERING,92 consumeMode = ConsumeMode.ORDERLY // 顺序消费93)94public class OrderedMessageConsumer implements RocketMQListener<OrderStatusEvent> {95 96 @Autowired97 private OrderService orderService;98 99 @Override100 public void onMessage(OrderStatusEvent event) {101 String orderId = event.getOrderId();102 String eventType = event.getEventType();103 104 log.info("接收到顺序消息 - 订单: {}, 事件: {}", orderId, eventType);105 106 try {107 // 根据事件类型处理业务逻辑108 switch (eventType) {109 case "ORDER_CREATED":110 handleOrderCreated(event);111 break;112 case "ORDER_PAID":113 handleOrderPaid(event);114 break;115 case "ORDER_SHIPPED":116 handleOrderShipped(event);117 break;118 case "ORDER_COMPLETED":119 handleOrderCompleted(event);120 break;121 default:122 log.warn("未知的订单事件类型: {}", eventType);123 }124 125 log.info("顺序消息处理完成 - 订单: {}, 事件: {}", orderId, eventType);126 127 } catch (Exception e) {128 log.error("处理顺序消息失败 - 订单: {}, 事件: {}", orderId, eventType, e);129 throw new RuntimeException("顺序消息处理失败", e);130 }131 }132 133 private void handleOrderCreated(OrderStatusEvent event) {134 orderService.updateOrderStatus(event.getOrderId(), OrderStatus.CREATED);135 log.info("订单创建处理完成: {}", event.getOrderId());136 }137 138 private void handleOrderPaid(OrderStatusEvent event) {139 orderService.updateOrderStatus(event.getOrderId(), OrderStatus.PAID);140 log.info("订单支付处理完成: {}", event.getOrderId());141 }142 143 private void handleOrderShipped(OrderStatusEvent event) {144 orderService.updateOrderStatus(event.getOrderId(), OrderStatus.SHIPPED);145 log.info("订单发货处理完成: {}", event.getOrderId());146 }147 148 private void handleOrderCompleted(OrderStatusEvent event) {149 orderService.updateOrderStatus(event.getOrderId(), OrderStatus.COMPLETED);150 log.info("订单完成处理完成: {}", event.getOrderId());151 }152}3.2 延迟消息实现
RocketMQ支持18个延迟等级的定时消息,适用于延迟处理场景。
- 延迟等级
- Java实现
RocketMQ延迟等级
java
1public class DelayLevels {2 3 /**4 * RocketMQ支持的18个延迟等级5 * 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h6 */7 public static final Map<Integer, String> DELAY_LEVELS = Map.of(8 1, "1s", 2, "5s", 3, "10s", 4, "30s",9 5, "1m", 6, "2m", 7, "3m", 8, "4m",10 9, "5m", 10, "6m", 11, "7m", 12, "8m",11 13, "9m", 14, "10m", 15, "20m", 16, "30m",12 17, "1h", 18, "2h"13 );14 15 /**16 * 常用延迟场景映射17 */18 public static class DelayScenarios {19 public static final int ORDER_TIMEOUT_CHECK = 16; // 30分钟后检查订单超时20 public static final int PAYMENT_REMINDER = 14; // 10分钟后支付提醒21 public static final int COUPON_EXPIRE_NOTICE = 17; // 1小时后优惠券过期提醒22 public static final int RETRY_AFTER_FAILURE = 6; // 2分钟后重试23 }24}延迟消息完整实现
java
1@Component2public class DelayMessageService {3 4 private DefaultMQProducer delayProducer;5 6 @PostConstruct7 public void init() throws MQClientException {8 delayProducer = new DefaultMQProducer("delay_producer_group");9 delayProducer.setNamesrvAddr("localhost:9876");10 delayProducer.start();11 log.info("延迟消息生产者启动成功");12 }13 14 /**15 * 发送延迟消息16 */17 public void sendDelayMessage(String topic, String tag, Object messageBody, int delayLevel) {18 try {19 Message message = new Message(topic, tag, JSON.toJSONBytes(messageBody));20 21 // 设置延迟等级22 message.setDelayTimeLevel(delayLevel);23 24 SendResult result = delayProducer.send(message);25 26 log.info("延迟消息发送成功 - Topic: {}, 延迟等级: {}, 消息ID: {}", 27 topic, delayLevel, result.getMsgId());28 29 } catch (Exception e) {30 log.error("发送延迟消息失败", e);31 throw new BusinessException("延迟消息发送失败", e);32 }33 }34 35 /**36 * 订单超时检查37 */38 public void scheduleOrderTimeoutCheck(String orderId) {39 OrderTimeoutCheckEvent event = new OrderTimeoutCheckEvent(40 orderId, System.currentTimeMillis()41 );42 43 // 30分钟后检查订单是否超时44 sendDelayMessage(45 "order_timeout_topic",46 "timeout_check",47 event,48 DelayLevels.DelayScenarios.ORDER_TIMEOUT_CHECK49 );50 51 log.info("订单超时检查任务已调度: {}", orderId);52 }53 54 /**55 * 支付提醒56 */57 public void schedulePaymentReminder(String orderId, String userId) {58 PaymentReminderEvent event = new PaymentReminderEvent(59 orderId, userId, System.currentTimeMillis()60 );61 62 // 10分钟后发送支付提醒63 sendDelayMessage(64 "payment_reminder_topic",65 "payment_reminder",66 event,67 DelayLevels.DelayScenarios.PAYMENT_REMINDER68 );69 70 log.info("支付提醒任务已调度 - 订单: {}, 用户: {}", orderId, userId);71 }72 73 /**74 * 失败重试75 */76 public void scheduleRetryTask(RetryTaskEvent event) {77 // 2分钟后重试78 sendDelayMessage(79 "retry_task_topic",80 "retry",81 event,82 DelayLevels.DelayScenarios.RETRY_AFTER_FAILURE83 );84 85 log.info("重试任务已调度 - 任务ID: {}, 重试次数: {}", 86 event.getTaskId(), event.getRetryCount());87 }88 89 @PreDestroy90 public void destroy() {91 if (delayProducer != null) {92 delayProducer.shutdown();93 log.info("延迟消息生产者关闭成功");94 }95 }96}9798/**99 * 订单超时检查消费者100 */101@Component102@RocketMQMessageListener(103 topic = "order_timeout_topic",104 consumerGroup = "order_timeout_consumer_group"105)106public class OrderTimeoutConsumer implements RocketMQListener<OrderTimeoutCheckEvent> {107 108 @Autowired109 private OrderService orderService;110 111 @Override112 public void onMessage(OrderTimeoutCheckEvent event) {113 String orderId = event.getOrderId();114 log.info("执行订单超时检查: {}", orderId);115 116 try {117 Order order = orderService.getOrderById(orderId);118 119 if (order == null) {120 log.warn("订单不存在: {}", orderId);121 return;122 }123 124 // 检查订单是否已支付125 if (order.getStatus() == OrderStatus.CREATED) {126 // 订单未支付,执行超时处理127 orderService.handleOrderTimeout(orderId);128 log.info("订单超时处理完成: {}", orderId);129 } else {130 log.info("订单已支付,无需超时处理: {}", orderId);131 }132 133 } catch (Exception e) {134 log.error("订单超时检查失败: {}", orderId, e);135 throw new RuntimeException("订单超时检查失败", e);136 }137 }138}139140/**141 * 支付提醒消费者142 */143@Component144@RocketMQMessageListener(145 topic = "payment_reminder_topic",146 consumerGroup = "payment_reminder_consumer_group"147)148public class PaymentReminderConsumer implements RocketMQListener<PaymentReminderEvent> {149 150 @Autowired151 private NotificationService notificationService;152 153 @Override154 public void onMessage(PaymentReminderEvent event) {155 String orderId = event.getOrderId();156 String userId = event.getUserId();157 158 log.info("执行支付提醒 - 订单: {}, 用户: {}", orderId, userId);159 160 try {161 // 检查订单是否仍需要支付162 Order order = orderService.getOrderById(orderId);163 164 if (order != null && order.getStatus() == OrderStatus.CREATED) {165 // 发送支付提醒通知166 notificationService.sendPaymentReminder(userId, orderId);167 log.info("支付提醒发送成功 - 订单: {}", orderId);168 } else {169 log.info("订单已支付或不存在,跳过提醒 - 订单: {}", orderId);170 }171 172 } catch (Exception e) {173 log.error("支付提醒处理失败 - 订单: {}", orderId, e);174 throw new RuntimeException("支付提醒处理失败", e);175 }176 }177}4. 高可用集群架构
4.1 集群部署模式
- 集群架构
- 集群配置
- 高可用配置
集群特点:
- NameServer集群:无状态,任意节点宕机不影响服务
- Broker主从:支持同步/异步复制,主节点宕机从节点可读
- 多Master模式:支持多Master部署,提高并发能力
RocketMQ集群配置
bash
1# NameServer配置 (namesrv.conf)2# 无需特殊配置,启动多个实例即可3rocketmqHome=/opt/rocketmq4kvConfigPath=/opt/rocketmq/namesrv/kvConfig.json5configStorePath=/opt/rocketmq/namesrv/namesrv.properties6listenPort=987678# Broker Master配置 (broker-a.conf)9brokerClusterName=DefaultCluster10brokerName=broker-a11brokerId=012deleteWhen=0413fileReservedTime=4814brokerRole=SYNC_MASTER15flushDiskType=ASYNC_FLUSH1617# 网络配置18namesrvAddr=192.168.1.10:9876;192.168.1.11:9876;192.168.1.12:987619listenPort=1091120brokerIP1=192.168.1.202122# 存储配置23storePathRootDir=/opt/rocketmq/store24storePathCommitLog=/opt/rocketmq/store/commitlog25storePathConsumeQueue=/opt/rocketmq/store/consumequeue26storePathIndex=/opt/rocketmq/store/index2728# 性能配置29sendMessageThreadPoolNums=12830pullMessageThreadPoolNums=12831queryMessageThreadPoolNums=832adminBrokerThreadPoolNums=1633clientManagerThreadPoolNums=323435# Broker Slave配置 (broker-a-s.conf)36brokerClusterName=DefaultCluster37brokerName=broker-a38brokerId=139deleteWhen=0440fileReservedTime=4841brokerRole=SLAVE42flushDiskType=ASYNC_FLUSH4344namesrvAddr=192.168.1.10:9876;192.168.1.11:9876;192.168.1.12:987645listenPort=1092146brokerIP1=192.168.1.214748# 启动脚本49#!/bin/bash50# 启动NameServer集群51nohup sh mqnamesrv > /dev/null 2>&1 &5253# 启动Broker Master54nohup sh mqbroker -c broker-a.conf > /dev/null 2>&1 &5556# 启动Broker Slave 57nohup sh mqbroker -c broker-a-s.conf > /dev/null 2>&1 &高可用客户端配置
java
1@Configuration2public class RocketMQHighAvailabilityConfig {3 4 /**5 * 高可用生产者配置6 */7 @Bean8 public DefaultMQProducer haProducer() throws MQClientException {9 DefaultMQProducer producer = new DefaultMQProducer("ha_producer_group");10 11 // 配置多个NameServer地址12 producer.setNamesrvAddr("192.168.1.10:9876;192.168.1.11:9876;192.168.1.12:9876");13 14 // 发送超时时间15 producer.setSendMsgTimeout(10000);16 17 // 发送失败重试次数18 producer.setRetryTimesWhenSendFailed(3);19 producer.setRetryTimesWhenSendAsyncFailed(3);20 21 // 压缩消息体阈值22 producer.setCompressMsgBodyOverHowmuch(4096);23 24 // 最大消息大小25 producer.setMaxMessageSize(4 * 1024 * 1024);26 27 producer.start();28 return producer;29 }30 31 /**32 * 高可用消费者配置33 */34 @Bean35 public DefaultMQPushConsumer haConsumer() throws MQClientException {36 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ha_consumer_group");37 38 // 配置多个NameServer地址39 consumer.setNamesrvAddr("192.168.1.10:9876;192.168.1.11:9876;192.168.1.12:9876");40 41 // 消费模式42 consumer.setMessageModel(MessageModel.CLUSTERING);43 44 // 消费线程数45 consumer.setConsumeThreadMin(10);46 consumer.setConsumeThreadMax(20);47 48 // 批量消费数量49 consumer.setConsumeMessageBatchMaxSize(10);50 51 // 消息拉取间隔52 consumer.setPullInterval(1000);53 54 // 消息拉取批次大小55 consumer.setPullBatchSize(32);56 57 consumer.start();58 return consumer;59 }60 61 /**62 * 连接监控和故障转移63 */64 @Component65 public static class ConnectionMonitor {66 67 @EventListener68 public void handleConnectionEvent(ConnectionEvent event) {69 if (event.getType() == ConnectionEventType.DISCONNECT) {70 log.warn("RocketMQ连接断开: {}", event.getRemoteAddr());71 // 触发重连逻辑72 handleReconnection(event);73 } else if (event.getType() == ConnectionEventType.CONNECT) {74 log.info("RocketMQ连接恢复: {}", event.getRemoteAddr());75 }76 }77 78 private void handleReconnection(ConnectionEvent event) {79 // 实现重连逻辑80 // 1. 记录连接失败信息81 // 2. 尝试连接其他NameServer82 // 3. 发送告警通知83 }84 }85}5. 性能优化与监控
5.1 性能优化策略
- 性能调优
- 监控指标
RocketMQ性能优化配置
java
1@Configuration2public class RocketMQPerformanceConfig {3 4 /**5 * 高性能生产者配置6 */7 @Bean8 public DefaultMQProducer highPerformanceProducer() throws MQClientException {9 DefaultMQProducer producer = new DefaultMQProducer("high_perf_producer");10 producer.setNamesrvAddr("localhost:9876");11 12 // 异步发送配置13 producer.setRetryTimesWhenSendAsyncFailed(0);14 producer.setSendMsgTimeout(3000);15 16 // 批量发送配置17 producer.setMaxMessageSize(4 * 1024 * 1024);18 producer.setCompressMsgBodyOverHowmuch(4096);19 20 // 客户端配置21 producer.setClientCallbackExecutorThreads(Runtime.getRuntime().availableProcessors());22 23 producer.start();24 return producer;25 }26 27 /**28 * 批量消息发送29 */30 @Service31 public static class BatchMessageService {32 33 @Autowired34 private DefaultMQProducer producer;35 36 /**37 * 批量发送消息38 */39 public void sendBatchMessages(List<MessageEvent> events) {40 if (events.isEmpty()) {41 return;42 }43 44 try {45 List<Message> messages = events.stream()46 .map(event -> new Message(47 event.getTopic(),48 event.getTag(),49 event.getKey(),50 JSON.toJSONBytes(event)51 ))52 .collect(Collectors.toList());53 54 // 批量发送55 SendResult result = producer.send(messages);56 log.info("批量消息发送成功,数量: {}, 消息ID: {}", 57 messages.size(), result.getMsgId());58 59 } catch (Exception e) {60 log.error("批量消息发送失败", e);61 throw new BusinessException("批量消息发送失败", e);62 }63 }64 65 /**66 * 异步批量发送67 */68 public void sendBatchMessagesAsync(List<MessageEvent> events) {69 if (events.isEmpty()) {70 return;71 }72 73 try {74 List<Message> messages = events.stream()75 .map(event -> new Message(76 event.getTopic(),77 event.getTag(), 78 event.getKey(),79 JSON.toJSONBytes(event)80 ))81 .collect(Collectors.toList());82 83 // 异步批量发送84 producer.send(messages, new SendCallback() {85 @Override86 public void onSuccess(SendResult sendResult) {87 log.info("异步批量消息发送成功,数量: {}, 消息ID: {}", 88 messages.size(), sendResult.getMsgId());89 }90 91 @Override92 public void onException(Throwable e) {93 log.error("异步批量消息发送失败,数量: {}", messages.size(), e);94 }95 });96 97 } catch (Exception e) {98 log.error("异步批量消息发送失败", e);99 throw new BusinessException("异步批量消息发送失败", e);100 }101 }102 }103}RocketMQ监控指标
java
1@Component2public class RocketMQMonitor {3 4 private final MeterRegistry meterRegistry;5 private final Counter messagesSentCounter;6 private final Counter messagesConsumedCounter;7 private final Timer sendLatencyTimer;8 private final Gauge queueDepthGauge;9 10 public RocketMQMonitor(MeterRegistry meterRegistry) {11 this.meterRegistry = meterRegistry;12 13 this.messagesSentCounter = Counter.builder("rocketmq.messages.sent")14 .description("发送消息总数")15 .register(meterRegistry);16 17 this.messagesConsumedCounter = Counter.builder("rocketmq.messages.consumed")18 .description("消费消息总数")19 .register(meterRegistry);20 21 this.sendLatencyTimer = Timer.builder("rocketmq.send.latency")22 .description("消息发送延迟")23 .register(meterRegistry);24 25 this.queueDepthGauge = Gauge.builder("rocketmq.queue.depth")26 .description("队列深度")27 .register(meterRegistry, this, RocketMQMonitor::getQueueDepth);28 }29 30 /**31 * 记录消息发送指标32 */33 public void recordMessageSent(String topic, long latency) {34 messagesSentCounter.increment(Tags.of("topic", topic));35 sendLatencyTimer.record(latency, TimeUnit.MILLISECONDS);36 }37 38 /**39 * 记录消息消费指标40 */41 public void recordMessageConsumed(String topic, String consumerGroup) {42 messagesConsumedCounter.increment(43 Tags.of("topic", topic, "consumer_group", consumerGroup)44 );45 }46 47 /**48 * 获取队列深度49 */50 private double getQueueDepth() {51 // 实现队列深度查询逻辑52 // 可以通过RocketMQ Admin工具查询53 return 0.0;54 }55 56 /**57 * 健康检查58 */59 @Scheduled(fixedRate = 60000)60 public void healthCheck() {61 try {62 // 检查生产者状态63 checkProducerHealth();64 65 // 检查消费者状态66 checkConsumerHealth();67 68 // 检查Broker连接状态69 checkBrokerConnection();70 71 } catch (Exception e) {72 log.error("RocketMQ健康检查失败", e);73 }74 }75 76 private void checkProducerHealth() {77 // 实现生产者健康检查78 }79 80 private void checkConsumerHealth() {81 // 实现消费者健康检查82 }83 84 private void checkBrokerConnection() {85 // 实现Broker连接检查86 }87 88 /**89 * 性能报告90 */91 @Scheduled(fixedRate = 300000) // 每5分钟生成一次报告92 public void generatePerformanceReport() {93 double sendTps = messagesSentCounter.count() / 300.0; // 5分钟内的TPS94 double consumeTps = messagesConsumedCounter.count() / 300.0;95 double avgLatency = sendLatencyTimer.mean(TimeUnit.MILLISECONDS);96 97 log.info("RocketMQ性能报告 - 发送TPS: {}, 消费TPS: {}, 平均延迟: {}ms", 98 String.format("%.2f", sendTps),99 String.format("%.2f", consumeTps),100 String.format("%.2f", avgLatency)101 );102 103 // 重置计数器104 meterRegistry.remove(messagesSentCounter);105 meterRegistry.remove(messagesConsumedCounter);106 }107}6. 最佳实践与总结
6.1 架构设计最佳实践
消息设计原则:
- Topic规划:按业务域划分Topic,避免过多细分
- 消息幂等:确保消息处理的幂等性
- 消息大小:单条消息不超过4MB,批量消息不超过1MB
- 消息Key:设置有意义的消息Key,便于问题排查
性能优化建议:
- 异步发送:优先使用异步发送提高吞吐量
- 批量操作:使用批量发送和消费
- 合理配置:根据业务特点调整线程池和缓冲区大小
- 监控告警:建立完善的监控体系
高可用设计:
- 集群部署:NameServer和Broker都要集群部署
- 主从复制:配置Broker主从同步复制
- 故障转移:客户端支持自动故障转移
- 数据备份:定期备份重要的消息数据
6.2 RocketMQ vs 其他消息队列
| 特性对比 | RocketMQ | Kafka | RabbitMQ | 适用场景 |
|---|---|---|---|---|
| 事务消息 | ⭐⭐⭐⭐⭐ | ⭐⭐⭐ | ⭐⭐ | 金融支付 |
| 顺序消息 | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐ | ⭐⭐⭐ | 状态流转 |
| 延迟消息 | ⭐⭐⭐⭐⭐ | ⭐⭐ | ⭐⭐⭐ | 定时任务 |
| 消息堆积 | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐⭐ | ⭐⭐⭐ | 大数据场景 |
| 运维复杂度 | ⭐⭐⭐ | ⭐⭐ | ⭐⭐⭐⭐ | 团队技术水平 |
| 社区生态 | ⭐⭐⭐⭐ | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐⭐ | 技术选型 |
RocketMQ学习建议
- 掌握核心特性:深入理解事务消息、顺序消息、延迟消息
- 实践业务场景:在实际项目中应用RocketMQ解决业务问题
- 性能调优:学会分析和优化RocketMQ性能
- 运维监控:掌握集群部署和监控告警
- 源码学习:有条件的话可以学习RocketMQ源码实现
RocketMQ作为阿里巴巴开源的分布式消息中间件,在金融级应用场景中表现出色。通过合理的架构设计和性能优化,可以构建高可用、高性能的消息系统,特别适合对可靠性要求极高的业务场景。
参与讨论