Skip to main content

Java 消息队列面试题集

总题数: 31道 | 重点领域: MQ原理、RabbitMQ、RocketMQ、Kafka | 难度分布: 中高级

本文档整理了 Java 消息队列的完整31道面试题目,涵盖消息队列原理、主流MQ产品特性等各个方面。


面试题目列表

1. 什么是消息队列?

答案:

消息队列(Message Queue,简称MQ)是一种应用程序间的通信方法,通过在消息的传输过程中保存消息来实现应用程序之间的异步通信。

核心概念:

  • 生产者(Producer):负责产生和发送消息到消息队列
  • 消费者(Consumer):从消息队列中获取消息并进行处理
  • 消息队列(Queue):存储消息的容器,遵循FIFO原则
  • 消息代理(Broker):消息队列服务器,负责接收、存储和转发消息

工作原理:

  1. 生产者将消息发送到消息队列
  2. 消息队列将消息持久化存储
  3. 消费者从队列中拉取或被推送消息
  4. 消费者处理完消息后,向队列发送确认

2. 为什么需要消息队列?

答案:

消息队列主要解决以下几个核心问题:

1. 应用解耦

  • 系统间不直接调用,通过消息队列进行通信
  • 降低系统间的耦合度,提高系统的可维护性
  • 某个服务宕机不会影响其他服务

2. 异步处理

  • 将非核心业务异步化,提升系统响应速度
  • 例如:用户注册后,发送邮件、短信等操作可以异步处理
  • 主流程快速返回,提升用户体验

3. 流量削峰

  • 在高并发场景下,通过消息队列缓冲请求
  • 防止系统因瞬时流量过大而崩溃
  • 消费者按照自己的处理能力消费消息

4. 数据分发

  • 一个消息可以被多个消费者消费
  • 实现数据的一对多分发

5. 最终一致性

  • 通过消息队列实现分布式事务的最终一致性
  • 保证数据在各个系统间的一致性

3. 说一下消息队列的模型有哪些?

答案:

消息队列主要有两种消息模型:

1. 点对点模型(Point-to-Point,P2P)

特点:

  • 基于队列(Queue)实现
  • 一个消息只能被一个消费者消费
  • 消息被消费后会从队列中删除
  • 支持多个消费者,但每条消息只会被其中一个消费者处理

应用场景:

  • 任务分发系统
  • 订单处理系统

2. 发布/订阅模型(Publish/Subscribe,Pub/Sub)

特点:

  • 基于主题(Topic)实现
  • 一个消息可以被多个消费者消费
  • 消息被消费后不会立即删除
  • 生产者发布消息到主题,订阅该主题的所有消费者都会收到消息

应用场景:

  • 消息广播
  • 日志收集
  • 事件通知

主流MQ对模型的支持:

  • RabbitMQ:支持两种模型,通过Exchange类型实现
  • Kafka:主要基于发布/订阅模型
  • RocketMQ:支持两种模型

4. 简述下消息队列核心的一些术语?

答案:

1. 基础概念

  • Producer(生产者):负责生产消息并发送到消息队列
  • Consumer(消费者):从消息队列中获取消息并消费
  • Broker(消息代理):消息队列服务器,负责接收、存储、转发消息
  • Message(消息):生产者和消费者之间传递的数据单元

2. 消息组织

  • Queue(队列):存储消息的容器,遵循FIFO原则
  • Topic(主题):消息的逻辑分类,用于发布/订阅模式
  • Partition(分区):Topic的物理分组,提高并发能力
  • Tag(标签):消息的标记,用于消息过滤

3. 消费相关

  • Consumer Group(消费者组):多个消费者组成的组,共同消费一个Topic
  • Offset(偏移量):消费者在分区中的消费位置
  • Acknowledgment(确认):消费者处理完消息后的确认机制

4. 高级特性

  • Dead Letter Queue(死信队列):无法被正常消费的消息存放的队列
  • Delay Message(延迟消息):指定时间后才能被消费的消息
  • Transaction Message(事务消息):支持分布式事务的消息
  • Message Filter(消息过滤):根据条件过滤消息

5. 可靠性相关

  • Persistence(持久化):将消息存储到磁盘,防止丢失
  • Replication(副本):消息的多副本存储,保证高可用
  • Retry(重试):消息消费失败后的重试机制

5. 如何保证消息不丢失?

答案:

消息丢失可能发生在三个阶段,需要分别处理:

1. 生产者阶段(消息发送丢失)

解决方案:

  • 同步发送 + 确认机制:等待Broker确认后再返回
  • 异步发送 + 回调:通过回调函数确认消息是否发送成功
  • 失败重试:发送失败时进行重试
  • 本地消息表:先保存到本地数据库,发送成功后删除

RabbitMQ:

java
1// 开启发送确认
2channel.confirmSelect();
3channel.basicPublish(...);
4channel.waitForConfirms(); // 等待确认

Kafka:

java
1// 设置acks=all,等待所有副本确认
2props.put("acks", "all");

2. Broker阶段(存储丢失)

解决方案:

  • 消息持久化:将消息写入磁盘
  • 主从复制:多副本机制,防止单点故障
  • 刷盘策略:同步刷盘(可靠但慢)vs 异步刷盘(快但可能丢失)

RabbitMQ:

java
1// 队列持久化
2channel.queueDeclare(queueName, true, false, false, null);
3// 消息持久化
4channel.basicPublish("", queueName,
5 MessageProperties.PERSISTENT_TEXT_PLAIN, message);

RocketMQ:

java
1// 同步刷盘
2brokerConfig.setFlushDiskType(FlushDiskType.SYNC_FLUSH);

Kafka:

properties
1# 副本数量
2replication.factor=3
3# 最小同步副本数
4min.insync.replicas=2

3. 消费者阶段(消费丢失)

解决方案:

  • 手动确认(ACK):消费成功后再确认
  • 关闭自动提交:防止消息未处理就提交offset
  • 消费重试:消费失败时重试
  • 死信队列:多次失败后进入死信队列

RabbitMQ:

java
1// 手动确认
2channel.basicConsume(queueName, false, consumer);
3// 处理完成后确认
4channel.basicAck(deliveryTag, false);

Kafka:

java
1// 关闭自动提交
2props.put("enable.auto.commit", "false");
3// 手动提交
4consumer.commitSync();

综合方案:

  1. 生产者:使用确认机制 + 重试
  2. Broker:持久化 + 多副本
  3. 消费者:手动确认 + 消费重试
  4. 监控告警:及时发现问题

6. 如何处理重复消息?

答案:

消息重复的原因主要有:网络抖动、消费者宕机重启、消息重试等。解决重复消息的核心思想是幂等性

1. 业务层面保证幂等性

数据库唯一约束:

java
1// 使用唯一索引防止重复插入
2CREATE UNIQUE INDEX idx_order_id ON orders(order_id);

分布式锁:

java
1// 使用Redis分布式锁
2String lockKey = "lock:order:" + orderId;
3if (redisTemplate.opsForValue().setIfAbsent(lockKey, "1", 10, TimeUnit.SECONDS)) {
4 try {
5 // 处理业务逻辑
6 processOrder(orderId);
7 } finally {
8 redisTemplate.delete(lockKey);
9 }
10}

2. 消息去重表

java
1// 使用消息ID作为唯一键
2@Transactional
3public void handleMessage(String messageId, String content) {
4 // 检查消息是否已处理
5 if (messageRecordMapper.exists(messageId)) {
6 return; // 已处理,直接返回
7 }
8
9 // 处理业务逻辑
10 processBusiness(content);
11
12 // 记录消息已处理
13 messageRecordMapper.insert(messageId);
14}

3. 全局唯一ID

java
1// 使用雪花算法生成全局唯一ID
2long messageId = snowflakeIdGenerator.nextId();
3message.setKey(String.valueOf(messageId));

4. 状态机机制

java
1// 订单状态流转
2public void updateOrderStatus(String orderId, OrderStatus newStatus) {
3 Order order = orderMapper.selectById(orderId);
4
5 // 只有在特定状态才能流转
6 if (order.getStatus() == OrderStatus.CREATED &&
7 newStatus == OrderStatus.PAID) {
8 orderMapper.updateStatus(orderId, newStatus);
9 }
10}

5. Token机制

java
1// 生成唯一token
2String token = UUID.randomUUID().toString();
3redisTemplate.opsForValue().set("token:" + token, "1", 5, TimeUnit.MINUTES);
4
5// 消费时验证token
6if (redisTemplate.delete("token:" + token) > 0) {
7 // token存在且删除成功,处理业务
8 processBusiness();
9}

最佳实践:

  • 优先使用业务层面的幂等性设计
  • 数据库层面使用唯一索引
  • 关键业务使用分布式锁
  • 记录消息处理状态

7. 如何保证消息的有序性?

答案:

1. 为什么会出现乱序?

  • 多个生产者并发发送
  • 消息在多个分区/队列中存储
  • 多个消费者并发消费

2. Kafka保证有序性

单分区有序:

java
1// 方案1:指定分区
2producer.send(new ProducerRecord<>("topic", partition, key, value));
3
4// 方案2:使用相同的key,会路由到同一分区
5producer.send(new ProducerRecord<>("topic", orderId, message));
6
7// 消费者:一个分区只能被消费者组中的一个消费者消费

配置要求:

properties
1# 生产者:设置为1,保证消息顺序发送
2max.in.flight.requests.per.connection=1
3
4# 或使用幂等性生产者(Kafka 1.0+)
5enable.idempotence=true

3. RocketMQ保证有序性

全局有序:

java
1// 只创建一个队列
2// 只有一个生产者和一个消费者

分区有序(推荐):

java
1// 发送消息时指定MessageQueueSelector
2producer.send(message, new MessageQueueSelector() {
3 @Override
4 public MessageQueue select(List<MessageQueue> mqs,
5 Message msg, Object arg) {
6 // 根据订单ID选择队列
7 Long orderId = (Long) arg;
8 int index = (int) (orderId % mqs.size());
9 return mqs.get(index);
10 }
11}, orderId);
12
13// 消费者使用MessageListenerOrderly
14consumer.registerMessageListener(new MessageListenerOrderly() {
15 @Override
16 public ConsumeOrderlyStatus consumeMessage(
17 List<MessageExt> msgs, ConsumeOrderlyContext context) {
18 // 顺序消费
19 return ConsumeOrderlyStatus.SUCCESS;
20 }
21});

4. RabbitMQ保证有序性

java
1// 使用单个队列 + 单个消费者
2channel.basicQos(1); // 每次只处理一条消息
3
4// 或使用一致性哈希交换机
5Map<String, Object> args = new HashMap<>();
6args.put("hash-header", "order-id");
7channel.exchangeDeclare("exchange", "x-consistent-hash", true, false, args);

5. 业务层面保证有序

使用版本号:

java
1// 消息携带版本号
2public class OrderMessage {
3 private String orderId;
4 private Integer version;
5 private String content;
6}
7
8// 消费时检查版本号
9if (message.getVersion() > currentVersion) {
10 processMessage(message);
11 updateVersion(message.getVersion());
12}

使用时间戳:

java
1// 只处理最新的消息
2if (message.getTimestamp() > lastProcessedTimestamp) {
3 processMessage(message);
4}

最佳实践:

  • 使用相同的key/分区键保证相关消息路由到同一分区
  • 单分区单消费者保证顺序消费
  • 业务层面设计幂等性和版本控制

8. 如何处理消息堆积?

答案:

1. 消息堆积的原因

  • 消费者消费速度慢于生产者生产速度
  • 消费者出现故障或宕机
  • 消费逻辑复杂,处理时间长
  • 消费者数量不足

2. 临时解决方案

增加消费者数量:

java
1// 快速扩容消费者实例
2// 注意:消费者数量不能超过分区数(Kafka)或队列数(RocketMQ)

提高消费并行度:

java
1// RocketMQ:增加消费线程数
2consumer.setConsumeThreadMin(20);
3consumer.setConsumeThreadMax(64);
4
5// Kafka:增加分区数
6kafka-topics.sh --alter --topic my-topic --partitions 10

批量消费:

java
1// RocketMQ批量消费
2consumer.setConsumeMessageBatchMaxSize(100);
3
4// Kafka批量拉取
5props.put("max.poll.records", 500);

3. 紧急处理方案

临时队列转储:

java
1// 创建临时Topic,将消息快速转移
2// 使用更多消费者并行处理临时Topic
3public void transferMessages() {
4 // 从堆积队列消费
5 List<Message> messages = consumer.poll();
6
7 // 快速转发到临时队列
8 for (Message msg : messages) {
9 producer.send("temp-topic", msg);
10 }
11}

降级处理:

java
1// 对非核心消息进行降级处理
2if (isNonCriticalMessage(message)) {
3 // 简化处理逻辑或直接丢弃
4 logAndDiscard(message);
5} else {
6 // 正常处理
7 processMessage(message);
8}

4. 长期优化方案

优化消费逻辑:

java
1// 异步处理耗时操作
2@Async
3public void processTimeConsumingTask(Message message) {
4 // 耗时操作
5}
6
7// 批量处理数据库操作
8public void batchInsert(List<Order> orders) {
9 orderMapper.batchInsert(orders);
10}

合理设置消费参数:

java
1// Kafka
2props.put("fetch.min.bytes", 1024 * 1024); // 1MB
3props.put("fetch.max.wait.ms", 500);
4
5// RocketMQ
6consumer.setPullBatchSize(32);
7consumer.setConsumeMessageBatchMaxSize(10);

5. 监控和预警

java
1// 监控消息堆积数量
2long lag = getConsumerLag();
3if (lag > threshold) {
4 sendAlert("消息堆积告警:" + lag);
5}
6
7// Kafka监控
8kafka-consumer-groups.sh --describe --group my-group
9
10// RocketMQ监控
11mqadmin consumerProgress -g my-group

6. 架构优化

  • 增加分区/队列数:提高并行度
  • 使用消息过滤:减少无效消息
  • 分离核心和非核心消息:不同Topic不同优先级
  • 引入缓存:减少数据库查询
  • 数据库优化:添加索引、优化SQL

最佳实践:

  1. 提前规划好分区数和消费者数
  2. 做好监控和告警
  3. 优化消费逻辑,提高处理速度
  4. 准备好扩容方案

9. 消息队列设计成推消息还是拉消息?推拉模式的优缺点?

答案:

1. 推模式(Push)

工作原理:

  • Broker主动将消息推送给消费者
  • 消费者被动接收消息

优点:

  • 实时性高,消息到达后立即推送
  • 消费者实现简单,无需主动拉取
  • 适合消息量少、实时性要求高的场景

缺点:

  • Broker需要维护每个消费者的状态
  • 难以控制推送速率,可能压垮消费者
  • 消费者处理能力不同,难以做到流量控制

适用场景:

  • 实时性要求高
  • 消息量不大
  • 消费者处理能力稳定

代表:RabbitMQ(默认推模式)

2. 拉模式(Pull)

工作原理:

  • 消费者主动从Broker拉取消息
  • Broker被动等待消费者请求

优点:

  • 消费者可以根据自己的处理能力拉取消息
  • 支持批量拉取,提高吞吐量
  • Broker实现简单,无需维护消费者状态
  • 消费者可以控制消费速率

缺点:

  • 实时性相对较差
  • 消费者需要循环拉取,可能造成空轮询
  • 消费者实现相对复杂

适用场景:

  • 高吞吐量场景
  • 消费者处理能力差异大
  • 需要批量处理消息

代表:Kafka、RocketMQ(默认拉模式)

3. Kafka的拉模式实现

java
1// 消费者主动拉取
2while (true) {
3 ConsumerRecords<String, String> records =
4 consumer.poll(Duration.ofMillis(100));
5
6 for (ConsumerRecord<String, String> record : records) {
7 processMessage(record);
8 }
9}

优化空轮询:

properties
1# 长轮询:如果没有数据,Broker会等待一段时间
2fetch.min.bytes=1
3fetch.max.wait.ms=500

4. RabbitMQ的推模式实现

java
1// 推模式
2channel.basicConsume(queueName, false, new DefaultConsumer(channel) {
3 @Override
4 public void handleDelivery(String consumerTag, Envelope envelope,
5 AMQP.BasicProperties properties, byte[] body) {
6 processMessage(body);
7 channel.basicAck(envelope.getDeliveryTag(), false);
8 }
9});
10
11// RabbitMQ也支持拉模式
12GetResponse response = channel.basicGet(queueName, false);
13if (response != null) {
14 processMessage(response.getBody());
15 channel.basicAck(response.getEnvelope().getDeliveryTag(), false);
16}

5. 混合模式

RocketMQ的长轮询:

java
1// 消费者拉取,但Broker会hold住请求
2// 如果有新消息立即返回,否则等待一段时间
3consumer.setPullTimeoutMillis(30000);

对比总结:

特性推模式拉模式
实时性相对较低
吞吐量较低
流量控制困难容易
Broker复杂度
消费者复杂度
批量处理不支持支持

最佳实践:

  • 高吞吐量场景:选择拉模式(Kafka、RocketMQ)
  • 实时性要求高:选择推模式(RabbitMQ)
  • 现代MQ:采用长轮询,兼顾实时性和吞吐量

10. RocketMQ 的事务消息有什么缺点?你还了解过别的事务消息实现吗?

答案:

1. RocketMQ事务消息原理

java
1// 发送事务消息
2TransactionMQProducer producer = new TransactionMQProducer("group");
3producer.setTransactionListener(new TransactionListener() {
4 @Override
5 public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
6 // 执行本地事务
7 try {
8 executeLocalBusiness();
9 return LocalTransactionState.COMMIT_MESSAGE;
10 } catch (Exception e) {
11 return LocalTransactionState.ROLLBACK_MESSAGE;
12 }
13 }
14
15 @Override
16 public LocalTransactionState checkLocalTransaction(MessageExt msg) {
17 // 事务回查
18 return checkLocalTransactionStatus();
19 }
20});
21
22// 发送半消息
23producer.sendMessageInTransaction(message, null);

流程:

  1. 发送半消息(Half Message)到Broker
  2. 执行本地事务
  3. 根据本地事务结果提交或回滚消息
  4. 如果长时间未收到确认,Broker回查事务状态

2. RocketMQ事务消息的缺点

缺点1:只支持单向消息

  • 只保证生产者的本地事务和发送消息的一致性
  • 不保证消费者的消费和本地事务的一致性
  • 消费者仍需自己保证幂等性

缺点2:事务回查的性能开销

  • Broker需要定期回查事务状态
  • 回查频率过高影响性能
  • 需要业务方实现回查接口

缺点3:半消息的存储开销

  • 半消息需要单独存储
  • 增加了存储成本

缺点4:不支持跨集群事务

  • 只能在单个RocketMQ集群内使用

缺点5:实现复杂

  • 需要实现事务监听器
  • 需要实现事务回查逻辑
  • 需要考虑各种异常情况

3. 其他事务消息实现

Kafka事务消息:

java
1// 生产者配置
2props.put("transactional.id", "my-transactional-id");
3props.put("enable.idempotence", true);
4
5// 使用事务
6producer.initTransactions();
7try {
8 producer.beginTransaction();
9
10 // 发送消息
11 producer.send(record1);
12 producer.send(record2);
13
14 // 执行本地事务
15 executeLocalTransaction();
16
17 // 提交事务
18 producer.commitTransaction();
19} catch (Exception e) {
20 producer.abortTransaction();
21}
22
23// 消费者配置
24props.put("isolation.level", "read_committed");

特点:

  • 支持跨分区、跨Topic的事务
  • 保证Exactly-Once语义
  • 支持消费-转换-生产的原子性
  • 性能相对较好

缺点:

  • 不支持本地事务和消息发送的原子性
  • 主要用于消息系统内部的事务

4. 基于本地消息表的事务实现

java
1@Transactional
2public void createOrder(Order order) {
3 // 1. 插入订单
4 orderMapper.insert(order);
5
6 // 2. 插入本地消息表
7 LocalMessage message = new LocalMessage();
8 message.setContent(JSON.toJSONString(order));
9 message.setStatus(MessageStatus.PENDING);
10 messageMapper.insert(message);
11}
12
13// 定时任务扫描本地消息表
14@Scheduled(fixedDelay = 1000)
15public void sendPendingMessages() {
16 List<LocalMessage> messages =
17 messageMapper.selectPendingMessages();
18
19 for (LocalMessage message : messages) {
20 try {
21 // 发送消息
22 producer.send(message.getContent());
23
24 // 更新状态为已发送
25 message.setStatus(MessageStatus.SENT);
26 messageMapper.update(message);
27 } catch (Exception e) {
28 // 发送失败,下次继续重试
29 }
30 }
31}

优点:

  • 实现简单
  • 可靠性高
  • 支持任何消息队列

缺点:

  • 需要额外的表和定时任务
  • 有一定延迟
  • 需要处理消息表的清理

5. 基于TCC的分布式事务

java
1// Try阶段:预留资源
2public void tryCreateOrder(Order order) {
3 // 冻结库存
4 inventoryService.freeze(order.getProductId(), order.getQuantity());
5 // 发送消息到MQ
6 producer.send(createOrderMessage);
7}
8
9// Confirm阶段:确认提交
10public void confirmCreateOrder(Order order) {
11 // 扣减库存
12 inventoryService.deduct(order.getProductId(), order.getQuantity());
13}
14
15// Cancel阶段:回滚
16public void cancelCreateOrder(Order order) {
17 // 释放库存
18 inventoryService.unfreeze(order.getProductId(), order.getQuantity());
19}

对比总结:

方案优点缺点适用场景
RocketMQ事务消息实现相对简单只支持单向、需要回查生产者事务
Kafka事务性能好、支持跨分区不支持本地事务消息系统内部
本地消息表可靠性高、通用有延迟、需要额外表通用场景
TCC一致性强实现复杂强一致性要求

最佳实践:

  • 生产者本地事务 + 消息发送:使用RocketMQ事务消息或本地消息表
  • 消息系统内部事务:使用Kafka事务
  • 强一致性要求:使用TCC或Saga
  • 最终一致性:使用可靠消息最终一致性方案

11. 说一下 Kafka 中关于事务消息的实现?

答案:

Kafka从0.11版本开始支持事务消息,主要用于实现Exactly-Once语义。

1. 核心概念

Transaction Coordinator(事务协调器):

  • 负责管理事务状态
  • 每个生产者都有一个对应的事务协调器
  • 存储在__transaction_state内部Topic中

TransactionalId(事务ID):

  • 唯一标识一个生产者
  • 用于故障恢复和幂等性保证

Producer Epoch:

  • 生产者的版本号
  • 防止僵尸实例

2. 事务实现原理

java
1// 配置事务生产者
2props.put("transactional.id", "my-transactional-id");
3props.put("enable.idempotence", true);
4props.put("acks", "all");
5
6KafkaProducer<String, String> producer = new KafkaProducer<>(props);
7
8// 初始化事务
9producer.initTransactions();
10
11try {
12 // 开启事务
13 producer.beginTransaction();
14
15 // 发送消息
16 producer.send(new ProducerRecord<>("topic1", "key1", "value1"));
17 producer.send(new ProducerRecord<>("topic2", "key2", "value2"));
18
19 // 提交事务
20 producer.commitTransaction();
21} catch (Exception e) {
22 // 回滚事务
23 producer.abortTransaction();
24}

3. 事务流程

阶段1:查找事务协调器

  • 生产者根据transactional.id找到对应的事务协调器
  • 发送FindCoordinator请求

阶段2:初始化事务

  • 生产者向协调器发送InitPidRequest
  • 协调器分配PID(Producer ID)和Epoch
  • 协调器在__transaction_state中记录事务信息

阶段3:开始事务

  • 调用beginTransaction()标记事务开始
  • 本地操作,不与Broker交互

阶段4:消费-转换-生产(可选)

java
1// 读取消息、处理、发送到新Topic
2producer.beginTransaction();
3ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
4for (ConsumerRecord<String, String> record : records) {
5 // 处理消息
6 String result = process(record.value());
7 // 发送到新Topic
8 producer.send(new ProducerRecord<>("output-topic", result));
9}
10// 提交消费位移到事务
11producer.sendOffsetsToTransaction(offsets, "consumer-group-id");
12producer.commitTransaction();

阶段5:提交或回滚

提交流程:

  1. 生产者发送EndTxnRequest(COMMIT)
  2. 协调器写入PREPARE_COMMIT到事务日志
  3. 协调器向所有相关分区写入事务提交标记
  4. 协调器写入COMPLETE_COMMIT到事务日志

回滚流程:

  1. 生产者发送EndTxnRequest(ABORT)
  2. 协调器写入PREPARE_ABORT到事务日志
  3. 协调器向所有相关分区写入事务回滚标记
  4. 协调器写入COMPLETE_ABORT到事务日志

4. 消费者读取事务消息

java
1// 配置隔离级别
2props.put("isolation.level", "read_committed");
3
4KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
5// 只能读取已提交的消息

隔离级别:

  • read_uncommitted:默认,读取所有消息
  • read_committed:只读取已提交的事务消息

5. 事务保证

原子性:

  • 多个分区的消息要么全部成功,要么全部失败
  • 通过两阶段提交实现

幂等性:

  • 通过PID + Epoch + Sequence Number保证
  • 防止消息重复

隔离性:

  • 未提交的消息对read_committed消费者不可见

6. 事务状态机

1Empty -> Ongoing -> PrepareCommit -> CompleteCommit
2 \-> PrepareAbort -> CompleteAbort

7. 性能影响

优点:

  • 保证Exactly-Once语义
  • 支持跨分区、跨Topic的原子性

缺点:

  • 性能开销较大(约30%)
  • 延迟增加
  • 需要额外的存储空间

最佳实践:

  • 只在需要强一致性时使用事务
  • 合理设置transaction.timeout.ms
  • 监控事务协调器的性能

12. 你了解 Kafka 中的时间轮实现吗?

答案:

Kafka使用时间轮(Timing Wheel)算法来高效管理大量延时任务,主要用于处理延时操作、心跳检测、超时控制等。

1. 为什么需要时间轮?

传统方案的问题:

  • JDK Timer:单线程,任务阻塞会影响其他任务
  • ScheduledExecutorService:基于堆实现,插入和删除O(logN)
  • DelayQueue:基于优先队列,性能不够高

Kafka的需求:

  • 需要管理大量延时任务(百万级)
  • 插入、删除操作要快(O(1))
  • 任务执行要准确

2. 时间轮原理

基本结构:

1时间轮是一个环形数组,每个槽位存储一个任务链表
2
3 0 1 2 3 4 5 6 7
4 [*]-[*]-[*]-[*]-[*]-[*]-[*]-[*]
5 | | | | | | | |
6 任务 任务 任务 ... 任务

核心参数:

  • tickMs:时间轮的时间精度(滴答间隔),如1ms
  • wheelSize:时间轮的槽位数,如20
  • interval:时间轮的时间跨度 = tickMs × wheelSize
  • currentTime:当前时间指针

3. Kafka的时间轮实现

TimingWheel类:

java
1class TimingWheel(
2 tickMs: Long, // 时间精度
3 wheelSize: Int, // 槽位数
4 startMs: Long, // 起始时间
5 taskCounter: AtomicInteger,
6 queue: DelayQueue[TimerTaskList]
7) {
8 private val interval = tickMs * wheelSize // 时间跨度
9 private val buckets = Array.tabulate[TimerTaskList](wheelSize) { _ =>
10 new TimerTaskList(taskCounter)
11 }
12 private var currentTime = startMs - (startMs % tickMs)
13
14 // 上层时间轮(用于更长的延时)
15 @volatile private var overflowWheel: TimingWheel = null
16}

4. 层级时间轮

多层结构:

1第1层:tickMs=1ms, wheelSize=20, interval=20ms
2第2层:tickMs=20ms, wheelSize=20, interval=400ms
3第3层:tickMs=400ms,wheelSize=20, interval=8000ms
4...

任务分配:

  • 延时 < 20ms:放入第1层
  • 延时 < 400ms:放入第2层
  • 延时 < 8000ms:放入第3层
  • 以此类推

5. 添加任务

java
1def add(timerTaskEntry: TimerTaskEntry): Boolean = {
2 val expiration = timerTaskEntry.expirationMs
3
4 if (timerTaskEntry.cancelled) {
5 false
6 } else if (expiration < currentTime + tickMs) {
7 // 已过期,立即执行
8 false
9 } else if (expiration < currentTime + interval) {
10 // 在当前时间轮范围内
11 val virtualId = expiration / tickMs
12 val bucket = buckets((virtualId % wheelSize.toLong).toInt)
13 bucket.add(timerTaskEntry)
14
15 // 设置过期时间
16 if (bucket.setExpiration(virtualId * tickMs)) {
17 queue.offer(bucket)
18 }
19 true
20 } else {
21 // 超出当前时间轮范围,放入上层时间轮
22 if (overflowWheel == null) addOverflowWheel()
23 overflowWheel.add(timerTaskEntry)
24 }
25}

6. 推进时间轮

java
1def advanceClock(timeMs: Long): Unit = {
2 if (timeMs >= currentTime + tickMs) {
3 currentTime = timeMs - (timeMs % tickMs)
4
5 // 推进上层时间轮
6 if (overflowWheel != null) {
7 overflowWheel.advanceClock(currentTime)
8 }
9 }
10}

7. 任务执行流程

java
1// SystemTimer类
2def advanceClock(timeoutMs: Long): Boolean = {
3 // 从DelayQueue获取到期的bucket
4 val bucket = delayQueue.poll(timeoutMs, TimeUnit.MILLISECONDS)
5
6 if (bucket != null) {
7 // 推进时间轮
8 timingWheel.advanceClock(bucket.getExpiration)
9
10 // 执行bucket中的所有任务
11 bucket.flush(reinsert)
12 true
13 } else {
14 false
15 }
16}

8. 时间复杂度

操作时间复杂度
添加任务O(1)
删除任务O(1)
执行任务O(1)

9. Kafka中的应用场景

延时操作:

  • Produce请求的延时响应
  • Fetch请求的延时响应
  • 等待ISR副本同步

超时控制:

  • 事务超时
  • 请求超时
  • 会话超时

心跳检测:

  • 消费者心跳
  • 副本同步心跳

10. 优势

  • 高性能:O(1)的插入和删除
  • 低延迟:精确的时间控制
  • 可扩展:支持层级结构,处理任意长的延时
  • 内存友好:只存储任务引用,不复制任务

最佳实践:

  • 根据业务需求设置合适的tickMs和wheelSize
  • 避免创建过多的时间轮层级
  • 及时清理已取消的任务

13. 说说 Kafka 的索引设计有什么亮点?

答案:

Kafka的索引设计非常巧妙,主要包括偏移量索引时间戳索引,实现了快速定位消息的功能。

1. 索引文件结构

三种文件:

100000000000000000000.log # 日志文件(存储消息)
200000000000000000000.index # 偏移量索引文件
300000000000000000000.timeindex # 时间戳索引文件

文件命名:

  • 文件名是该segment的起始offset
  • 例如:00000000000000368769.log表示从offset 368769开始

2. 偏移量索引(OffsetIndex)

索引格式:

1[相对offset, 物理位置]
2[4 bytes, 4 bytes]

示例:

1相对offset 物理位置
20 0
310 1024
420 2048
530 3072

特点:

  • 稀疏索引:不是每条消息都有索引,而是每隔一定数量的消息建立一个索引
  • 相对offset:存储的是相对于segment起始offset的相对值
  • 固定大小:每个索引项8字节,便于二分查找
  • 内存映射:使用mmap技术,提高访问速度

3. 时间戳索引(TimeIndex)

索引格式:

1[时间戳, 相对offset]
2[8 bytes, 4 bytes]

示例:

1时间戳 相对offset
21609459200000 0
31609459260000 100
41609459320000 200

用途:

  • 根据时间戳查找消息
  • 支持基于时间的消息清理
  • 支持基于时间的消息消费

4. 查找消息的过程

根据offset查找消息:

java
1// 1. 定位segment文件
2// 假设要查找offset=368800的消息
3// 找到segment: 00000000000000368769.log
4
5// 2. 在index文件中二分查找
6// 查找小于等于368800的最大索引项
7// 假设找到:相对offset=31, 物理位置=3072
8
9// 3. 从物理位置3072开始顺序扫描log文件
10// 直到找到offset=368800的消息

根据时间戳查找消息:

java
1// 1. 在timeindex文件中二分查找
2// 找到小于等于目标时间戳的最大索引项
3// 得到相对offset
4
5// 2. 使用相对offset在index文件中查找
6// 得到物理位置
7
8// 3. 从物理位置开始顺序扫描

5. 索引的亮点

亮点1:稀疏索引

  • 不为每条消息建索引,节省空间
  • 通过log.index.interval.bytes控制索引密度(默认4KB)
  • 平衡了空间和查找效率

亮点2:内存映射文件(mmap)

java
1// Kafka使用MappedByteBuffer
2MappedByteBuffer mmap = channel.map(
3 FileChannel.MapMode.READ_WRITE, 0, fileSize);

优势:

  • 减少系统调用
  • 利用操作系统的页缓存
  • 零拷贝技术
  • 自动预读

亮点3:二分查找

  • 索引项固定大小,支持O(log n)的二分查找
  • 快速定位到接近目标的位置

亮点4:分段存储

  • 每个segment独立索引
  • 便于删除过期数据
  • 提高并发性能

亮点5:延迟写入

java
1// 索引不是实时写入,而是批量写入
2// 减少磁盘IO次数

6. 索引文件的创建和维护

创建时机:

java
1// 当写入的消息大小累计超过log.index.interval.bytes时
2if (bytesSinceLastIndexEntry > indexIntervalBytes) {
3 // 添加索引项
4 offsetIndex.append(offset, position);
5 timeIndex.append(timestamp, offset);
6 bytesSinceLastIndexEntry = 0;
7}

索引恢复:

java
1// Broker启动时,检查索引文件是否完整
2// 如果不完整,重建索引
3def rebuildIndex() {
4 // 遍历log文件
5 // 重新构建index和timeindex
6}

7. 配置参数

properties
1# 索引间隔(字节)
2log.index.interval.bytes=4096
3
4# segment大小
5log.segment.bytes=1073741824
6
7# 索引文件大小
8segment.index.bytes=10485760

8. 性能优化

预分配空间:

java
1// 索引文件预分配固定大小
2// 避免频繁扩容
3val indexFile = new File("00000000000000000000.index")
4indexFile.setLength(10 * 1024 * 1024) // 10MB

批量刷盘:

java
1// 不是每次写入都刷盘
2// 定期或达到一定量后刷盘
3if (needFlush) {
4 mmap.force() // 强制刷盘
5}

9. 索引的限制

  • 稀疏性:需要顺序扫描部分log文件
  • 内存占用:大量segment会占用较多内存
  • 重建成本:索引损坏时重建耗时

10. 实际应用

消费者从指定offset消费:

java
1consumer.seek(new TopicPartition("topic", 0), 368800);

消费者从指定时间消费:

java
1Map<TopicPartition, Long> timestampsToSearch = new HashMap<>();
2timestampsToSearch.put(partition, timestamp);
3Map<TopicPartition, OffsetAndTimestamp> offsets =
4 consumer.offsetsForTimes(timestampsToSearch);

最佳实践:

  • 合理设置segment大小和索引间隔
  • 监控索引文件大小和数量
  • 定期清理过期数据
  • 避免频繁的随机访问

14. 看过源码?那说说 Kafka 控制器事件处理全流程?

答案:

Kafka控制器(Controller)是集群中的一个特殊Broker,负责管理整个集群的元数据和状态变更。

1. 控制器的职责

核心职责:

  • 分区Leader选举:当Leader副本失效时选举新Leader
  • 分区重分配:执行分区副本的重新分配
  • Topic管理:创建、删除Topic
  • 副本管理:管理ISR列表的变更
  • Broker上下线:处理Broker的加入和离开

2. 控制器选举

选举过程:

java
1// 1. 每个Broker启动时尝试在ZK创建/controller节点
2try {
3 zkClient.createEphemeralNodeForController("/controller", brokerId);
4 // 创建成功,成为Controller
5 becomeController();
6} catch (NodeExistsException e) {
7 // 节点已存在,注册监听器
8 zkClient.registerControllerChangeListener();
9}

Controller信息:

json
1{
2 "version": 1,
3 "brokerid": 0,
4 "timestamp": "1609459200000"
5}

3. 事件队列机制

ControllerEventManager:

scala
1class ControllerEventManager(
2 controllerId: Int,
3 processor: ControllerEventProcessor,
4 eventQueueTimeHist: Histogram
5) {
6 // 事件队列
7 private val queue = new LinkedBlockingQueue[ControllerEvent]
8
9 // 事件处理线程
10 private val thread = new ControllerEventThread("controller-event-thread")
11}

事件类型:

scala
1sealed trait ControllerEvent
2case object Startup extends ControllerEvent
3case object BrokerChange extends ControllerEvent
4case class TopicChange(topics: Set[String]) extends ControllerEvent
5case class PartitionModifications(topic: String) extends ControllerEvent
6case class LeaderAndIsrChange(partitions: Set[TopicPartition]) extends ControllerEvent
7case class ControllerChange extends ControllerEvent
8case object Reelect extends ControllerEvent

4. 事件处理全流程

流程图:

1事件产生 -> 加入队列 -> 事件线程取出 -> 状态机处理 -> 发送请求 -> 更新元数据

详细步骤:

步骤1:事件产生

scala
1// ZK监听器触发事件
2class BrokerChangeListener extends IZkChildListener {
3 override def handleChildChange(parentPath: String,
4 currentChildren: java.util.List[String]) {
5 // 将事件加入队列
6 eventManager.put(BrokerChange)
7 }
8}

步骤2:事件入队

scala
1def put(event: ControllerEvent): Unit = {
2 queue.put(event)
3}

步骤3:事件处理线程

scala
1class ControllerEventThread extends ShutdownableThread {
2 override def doWork(): Unit = {
3 // 从队列取出事件
4 val event = queue.take()
5
6 try {
7 // 处理事件
8 processor.process(event)
9 } catch {
10 case e: Throwable => error("Error processing event", e)
11 }
12 }
13}

步骤4:事件处理器

scala
1def process(event: ControllerEvent): Unit = {
2 event match {
3 case BrokerChange =>
4 processBrokerChange()
5 case TopicChange(topics) =>
6 processTopicChange(topics)
7 case PartitionModifications(topic) =>
8 processPartitionModifications(topic)
9 case LeaderAndIsrChange(partitions) =>
10 processLeaderAndIsrChange(partitions)
11 // ...
12 }
13}

5. 具体事件处理示例

示例1:Broker下线处理

scala
1def processBrokerChange(): Unit = {
2 // 1. 获取当前存活的Broker列表
3 val curBrokers = zkClient.getAllBrokersInCluster()
4 val curBrokerIds = curBrokers.map(_.id).toSet
5
6 // 2. 找出新增和删除的Broker
7 val newBrokerIds = curBrokerIds -- controllerContext.liveBrokerIds
8 val deadBrokerIds = controllerContext.liveBrokerIds -- curBrokerIds
9
10 // 3. 更新Controller上下文
11 controllerContext.liveBrokerIds = curBrokerIds
12
13 // 4. 处理新增Broker
14 if (newBrokerIds.nonEmpty) {
15 onBrokerStartup(newBrokerIds.toSeq)
16 }
17
18 // 5. 处理下线Broker
19 if (deadBrokerIds.nonEmpty) {
20 onBrokerFailure(deadBrokerIds.toSeq)
21 }
22}
23
24def onBrokerFailure(deadBrokers: Seq[Int]): Unit = {
25 // 1. 找出受影响的分区(Leader在下线Broker上)
26 val partitionsWithoutLeader = controllerContext.partitionLeadershipInfo
27 .filter { case (_, leaderIsrAndControllerEpoch) =>
28 deadBrokers.contains(leaderIsrAndControllerEpoch.leaderAndIsr.leader)
29 }.keySet
30
31 // 2. 为这些分区选举新Leader
32 partitionStateMachine.handleStateChanges(
33 partitionsWithoutLeader.toSeq,
34 OnlinePartition,
35 Option(OfflinePartitionLeaderElectionStrategy)
36 )
37
38 // 3. 从ISR中移除下线的副本
39 replicaStateMachine.handleStateChanges(
40 getReplicasOnBrokers(deadBrokers),
41 OfflineReplica
42 )
43}

示例2:分区Leader选举

scala
1def electLeader(
2 partition: TopicPartition,
3 strategy: PartitionLeaderElectionStrategy
4): Option[Int] = {
5
6 val assignment = controllerContext.partitionReplicaAssignment(partition)
7 val liveReplicas = assignment.filter(r =>
8 controllerContext.isReplicaOnline(r, partition))
9
10 strategy match {
11 case OfflinePartitionLeaderElectionStrategy =>
12 // 从ISR中选择第一个存活的副本
13 liveReplicas.find(r =>
14 controllerContext.partitionLeadershipInfo(partition)
15 .leaderAndIsr.isr.contains(r)
16 )
17
18 case PreferredReplicaPartitionLeaderElectionStrategy =>
19 // 选择preferred replica(AR中的第一个)
20 if (liveReplicas.head == assignment.head)
21 Some(assignment.head)
22 else
23 None
24 }
25}

6. 状态机

Kafka使用两个状态机管理状态变更:

ReplicaStateMachine(副本状态机):

1NewReplica -> OnlineReplica -> OfflineReplica -> ReplicaDeletionStarted
2 -> ReplicaDeletionSuccessful
3 -> NonExistentReplica

PartitionStateMachine(分区状态机):

1NewPartition -> OnlinePartition -> OfflinePartition -> NonExistentPartition

7. 请求发送

scala
1// Controller向Broker发送请求
2def sendRequest(
3 brokerId: Int,
4 request: AbstractControlRequest
5): Unit = {
6
7 request match {
8 case leaderAndIsrRequest: LeaderAndIsrRequest =>
9 // 发送Leader和ISR变更请求
10 controllerChannelManager.sendRequest(brokerId, leaderAndIsrRequest)
11
12 case updateMetadataRequest: UpdateMetadataRequest =>
13 // 发送元数据更新请求
14 controllerChannelManager.sendRequest(brokerId, updateMetadataRequest)
15
16 case stopReplicaRequest: StopReplicaRequest =>
17 // 发送停止副本请求
18 controllerChannelManager.sendRequest(brokerId, stopReplicaRequest)
19 }
20}

8. 元数据更新

scala
1// 更新ZK中的元数据
2def updateLeaderAndIsr(
3 partition: TopicPartition,
4 newLeaderAndIsr: LeaderAndIsr
5): Unit = {
6
7 val path = s"/brokers/topics/${partition.topic}/partitions/${partition.partition}/state"
8
9 zkClient.updatePersistentPath(path, newLeaderAndIsr.toJson)
10}

9. Controller Epoch

scala
1// 每次Controller变更时递增
2var controllerEpoch: Int = 0
3
4def incrementControllerEpoch(): Unit = {
5 controllerEpoch += 1
6 zkClient.updatePersistentPath("/controller_epoch", controllerEpoch.toString)
7}

作用:

  • 防止脑裂
  • 拒绝过期Controller的请求
  • 保证操作的顺序性

10. 性能优化

批量处理:

scala
1// 批量处理分区变更
2def batchLeaderElection(
3 partitions: Set[TopicPartition]
4): Unit = {
5 // 一次性处理多个分区
6}

异步发送:

scala
1// 异步发送请求给Broker
2controllerChannelManager.sendRequest(brokerId, request, callback)

最佳实践:

  • 监控Controller的事件队列大小
  • 避免频繁的分区重分配
  • 合理设置ZK的session timeout
  • 关注Controller切换的频率

15. Kafka 中 Zookeeper 的作用?

答案:

ZooKeeper在Kafka中扮演着非常重要的角色,主要用于集群管理、元数据存储和分布式协调。

1. ZooKeeper的核心作用

作用1:Controller选举

  • 通过临时节点/controller实现Controller选举
  • 保证集群中只有一个Controller
  • Controller宕机后自动重新选举

作用2:Broker注册

  • 每个Broker启动时在/brokers/ids/[broker_id]创建临时节点
  • 存储Broker的host、port等信息
  • Broker宕机后节点自动删除

作用3:Topic注册

  • 存储Topic的配置信息
  • 存储分区和副本的分配信息
  • 路径:/brokers/topics/[topic_name]

作用4:分区状态管理

  • 存储每个分区的Leader、ISR等信息
  • 路径:/brokers/topics/[topic]/partitions/[partition]/state

作用5:消费者组管理(旧版)

  • 存储消费者组信息(新版已迁移到Kafka内部)
  • 存储消费offset(新版使用__consumer_offsets)

作用6:配置管理

  • 存储Topic和Broker的动态配置
  • 支持配置的动态变更

2. ZooKeeper节点结构

1/
2├── controller # Controller信息
3├── controller_epoch # Controller版本号
4├── brokers
5│ ├── ids # Broker列表
6│ │ ├── 0 # Broker 0的信息
7│ │ ├── 1 # Broker 1的信息
8│ │ └── 2 # Broker 2的信息
9│ ├── topics # Topic信息
10│ │ └── test-topic
11│ │ ├── partitions
12│ │ │ ├── 0
13│ │ │ │ └── state # 分区0的状态
14│ │ │ └── 1
15│ │ │ └── state # 分区1的状态
16│ └── seqid # Broker ID序列号
17├── config
18│ ├── topics # Topic配置
19│ ├── brokers # Broker配置
20│ └── changes # 配置变更通知
21├── admin
22│ ├── delete_topics # 待删除的Topic
23│ └── reassign_partitions # 分区重分配
24├── isr_change_notification # ISR变更通知
25└── consumers # 消费者组(旧版)
26 └── group-1
27 ├── ids # 消费者列表
28 └── offsets # 消费offset

3. 详细节点说明

Controller节点:

json
1// /controller
2{
3 "version": 1,
4 "brokerid": 0,
5 "timestamp": "1609459200000"
6}

Broker节点:

json
1// /brokers/ids/0
2{
3 "version": 4,
4 "host": "localhost",
5 "port": 9092,
6 "jmx_port": 9999,
7 "timestamp": "1609459200000",
8 "endpoints": ["PLAINTEXT://localhost:9092"],
9 "rack": null
10}

Topic节点:

json
1// /brokers/topics/test-topic
2{
3 "version": 2,
4 "partitions": {
5 "0": [0, 1, 2], // 分区0的副本在Broker 0,1,2上
6 "1": [1, 2, 0], // 分区1的副本在Broker 1,2,0上
7 "2": [2, 0, 1] // 分区2的副本在Broker 2,0,1上
8 },
9 "adding_replicas": {},
10 "removing_replicas": {}
11}

分区状态节点:

json
1// /brokers/topics/test-topic/partitions/0/state
2{
3 "controller_epoch": 1,
4 "leader": 0, // Leader副本在Broker 0
5 "version": 1,
6 "leader_epoch": 0,
7 "isr": [0, 1, 2] // ISR列表
8}

4. ZooKeeper的监听机制

Broker监听:

scala
1// Controller监听Broker变化
2zkClient.subscribeChildChanges("/brokers/ids", new IZkChildListener {
3 override def handleChildChange(parentPath: String,
4 currentChildren: java.util.List[String]) {
5 // Broker上下线处理
6 eventManager.put(BrokerChange)
7 }
8})

Topic监听:

scala
1// Controller监听Topic变化
2zkClient.subscribeChildChanges("/brokers/topics", new IZkChildListener {
3 override def handleChildChange(parentPath: String,
4 currentChildren: java.util.List[String]) {
5 // Topic创建/删除处理
6 eventManager.put(TopicChange(currentChildren.toSet))
7 }
8})

分区状态监听:

scala
1// Controller监听分区状态变化
2zkClient.subscribeDataChanges(
3 s"/brokers/topics/$topic/partitions/$partition/state",
4 new IZkDataListener {
5 override def handleDataChange(dataPath: String, data: Object) {
6 // 分区状态变更处理
7 }
8 override def handleDataDeleted(dataPath: String) {
9 // 分区删除处理
10 }
11 }
12)

5. ZooKeeper在Kafka中的工作流程

Broker启动流程:

11. 连接ZooKeeper
22. 在/brokers/ids/[broker_id]创建临时节点
33. 注册监听器监听Controller变化
44. 尝试竞选Controller
55. 从ZooKeeper加载元数据

Topic创建流程:

11. 在/brokers/topics/[topic]写入分区分配信息
22. Controller监听到变化
33. Controller为每个分区选举Leader
44. Controller更新/brokers/topics/[topic]/partitions/[partition]/state
55. Controller通知相关Broker

分区Leader选举流程:

11. Controller检测到Leader失效
22. 从ISR中选择新Leader
33. 更新ZooKeeper中的分区状态
44. 发送LeaderAndIsrRequest给相关Broker
55. 发送UpdateMetadataRequest给所有Broker

6. ZooKeeper的性能影响

性能瓶颈:

  • ZooKeeper写入性能有限(约1000 TPS)
  • 大量监听器会增加ZooKeeper负载
  • 网络延迟影响元数据更新速度
  • 频繁的元数据变更会影响性能

优化措施:

properties
1# 增加ZooKeeper会话超时时间
2zookeeper.session.timeout.ms=18000
3
4# 减少不必要的元数据更新
5# 批量处理分区变更

7. ZooKeeper的问题

问题1:扩展性限制

  • ZooKeeper集群规模有限(通常3-7个节点)
  • 大规模Kafka集群元数据量大
  • 监听器数量过多影响性能

问题2:运维复杂

  • 需要单独部署和维护ZooKeeper集群
  • 增加了系统复杂度
  • 故障排查困难

问题3:性能问题

  • Controller变更时需要重新加载所有元数据
  • 大量分区时元数据加载慢
  • 频繁的ZK写入影响性能

8. 新版Kafka的改进(KRaft模式)

KRaft模式(Kafka 2.8+):

1移除ZooKeeper依赖
2使用Raft协议实现元数据管理
3元数据存储在Kafka内部Topic中
4提高性能和可扩展性

对比:

特性ZooKeeper模式KRaft模式
依赖需要ZooKeeper无需ZooKeeper
元数据存储ZooKeeperKafka内部
Controller选举ZooKeeperRaft协议
扩展性受限更好
运维复杂度
性能一般更好

最佳实践:

  • 合理设置ZooKeeper会话超时时间
  • 监控ZooKeeper的性能指标
  • 避免频繁的元数据变更
  • 考虑升级到KRaft模式(生产环境需谨慎)
  • 定期清理ZooKeeper中的过期数据

16. Kafka为什么要抛弃 Zookeeper?

答案:

Kafka从2.8版本开始引入KRaft模式(Kafka Raft),逐步移除对ZooKeeper的依赖,主要原因如下:

1. 性能问题

元数据加载慢:

  • Controller切换时需要从ZK重新加载所有元数据
  • 大规模集群(数万分区)加载耗时可达数分钟
  • 影响集群的可用性和恢复时间

写入性能瓶颈:

  • ZooKeeper写入TPS有限(约1000-2000)
  • 频繁的元数据更新会成为瓶颈
  • 限制了Kafka的扩展性

2. 扩展性限制

集群规模限制:

  • ZooKeeper集群通常3-7个节点
  • 监听器数量过多影响性能
  • 难以支持超大规模Kafka集群

分区数量限制:

  • 大量分区导致ZK元数据膨胀
  • Controller管理开销大
  • 影响整体性能

3. 运维复杂度

额外组件:

  • 需要单独部署和维护ZooKeeper集群
  • 增加运维成本和复杂度
  • 故障点增加

版本兼容性:

  • Kafka和ZooKeeper版本兼容性问题
  • 升级困难

4. 架构问题

双写问题:

  • 元数据既在ZK又在Kafka内部
  • 数据一致性难以保证
  • 增加复杂度

依赖外部系统:

  • Kafka依赖ZooKeeper的稳定性
  • ZK故障影响Kafka可用性

5. KRaft模式的优势

性能提升:

  • Controller切换更快(秒级)
  • 元数据更新更高效
  • 支持更大规模集群

简化架构:

  • 移除ZooKeeper依赖
  • 元数据存储在Kafka内部
  • 降低运维复杂度

更好的扩展性:

  • 支持百万级分区
  • Controller性能更好
  • 集群恢复更快

6. KRaft架构

核心组件:

  • Quorum Controller:基于Raft协议的Controller
  • Metadata Topic:存储元数据的内部Topic
  • Metadata Log:元数据变更日志

工作原理:

11. 使用Raft协议选举Controller Leader
22. 元数据存储在__cluster_metadata Topic中
33. Controller变更通过日志复制同步
44. Broker从元数据Topic读取最新状态

7. 对比总结

特性ZooKeeper模式KRaft模式
外部依赖需要ZK无需ZK
Controller切换分钟级秒级
元数据存储ZooKeeperKafka内部
分区数量受限(数万)更高(百万)
运维复杂度
性能一般更好
稳定性成熟逐步成熟

最佳实践:

  • 新集群建议使用KRaft模式
  • 现有集群可规划迁移(Kafka 3.3+支持)
  • 生产环境需充分测试
  • 关注社区稳定性反馈

17. 看过源码?那说说 Kafka 处理请求的全流程?

答案:

Kafka使用Reactor模式处理客户端请求,采用多线程异步处理提高吞吐量。

1. 整体架构

核心组件:

  • Acceptor线程:接受新连接
  • Processor线程(网络线程):处理网络IO
  • RequestChannel:请求队列
  • KafkaRequestHandler线程(IO线程):处理业务逻辑
  • ResponseQueue:响应队列

2. 处理流程

1客户端 -> Acceptor -> Processor -> RequestQueue -> Handler -> ResponseQueue -> Processor -> 客户端

3. 详细步骤

步骤1:接受连接(Acceptor)

scala
1class Acceptor extends Runnable {
2 def run(): Unit = {
3 serverChannel.register(nioSelector, SelectionKey.OP_ACCEPT)
4 while (isRunning) {
5 val ready = nioSelector.select(500)
6 if (ready > 0) {
7 val keys = nioSelector.selectedKeys()
8 keys.forEach { key =>
9 if (key.isAcceptable) {
10 accept(key)
11 }
12 }
13 }
14 }
15 }
16
17 def accept(key: SelectionKey): Unit = {
18 val socketChannel = serverSocketChannel.accept()
19 socketChannel.configureBlocking(false)
20 // 轮询分配给Processor
21 val processor = processors(currentProcessor)
22 processor.accept(socketChannel)
23 currentProcessor = (currentProcessor + 1) % processors.length
24 }
25}

步骤2:读取请求(Processor)

scala
1class Processor extends Runnable {
2 def run(): Unit = {
3 while (isRunning) {
4 // 处理新连接
5 configureNewConnections()
6
7 // 处理响应
8 processNewResponses()
9
10 // 处理网络IO
11 poll()
12
13 // 处理完成的接收
14 processCompletedReceives()
15
16 // 处理完成的发送
17 processCompletedSends()
18 }
19 }
20
21 def processCompletedReceives(): Unit = {
22 selector.completedReceives().forEach { receive =>
23 val channel = receive.source()
24 val request = parseRequest(receive.payload())
25 // 放入请求队列
26 requestChannel.sendRequest(request)
27 }
28 }
29}

步骤3:请求入队(RequestChannel)

scala
1class RequestChannel(numProcessors: Int) {
2 // 请求队列
3 private val requestQueue = new ArrayBlockingQueue[RequestChannel.Request](queueSize)
4
5 // 响应队列(每个Processor一个)
6 private val responseQueues = Array.fill(numProcessors)(
7 new LinkedBlockingQueue[RequestChannel.Response]()
8 )
9
10 def sendRequest(request: Request): Unit = {
11 requestQueue.put(request)
12 }
13
14 def receiveRequest(timeout: Long): Request = {
15 requestQueue.poll(timeout, TimeUnit.MILLISECONDS)
16 }
17}

步骤4:业务处理(KafkaRequestHandler)

scala
1class KafkaRequestHandler extends Runnable {
2 def run(): Unit = {
3 while (!stopped) {
4 // 从请求队列获取请求
5 val req = requestChannel.receiveRequest(300)
6
7 if (req != null) {
8 req.requestId match {
9 case ApiKeys.PRODUCE => handleProduceRequest(req)
10 case ApiKeys.FETCH => handleFetchRequest(req)
11 case ApiKeys.METADATA => handleMetadataRequest(req)
12 case ApiKeys.OFFSET_COMMIT => handleOffsetCommitRequest(req)
13 // ...
14 }
15 }
16 }
17 }
18}

步骤5:Produce请求处理

scala
1def handleProduceRequest(request: RequestChannel.Request): Unit = {
2 val produceRequest = request.body[ProduceRequest]
3
4 // 1. 验证权限
5 authorize(request.session, Write, Resource(Topic, topicPartition.topic))
6
7 // 2. 追加消息到日志
8 val results = replicaManager.appendRecords(
9 timeout = produceRequest.timeout,
10 requiredAcks = produceRequest.acks,
11 internalTopicsAllowed = internalTopicsAllowed,
12 origin = AppendOrigin.Client,
13 entriesPerPartition = produceRequest.partitionRecordsOrFail
14 )
15
16 // 3. 等待副本同步(如果acks=-1)
17 if (produceRequest.acks == -1) {
18 results.foreach { case (tp, result) =>
19 result.onComplete { response =>
20 sendResponse(request, response)
21 }
22 }
23 } else {
24 // 立即响应
25 sendResponse(request, results)
26 }
27}

步骤6:Fetch请求处理

scala
1def handleFetchRequest(request: RequestChannel.Request): Unit = {
2 val fetchRequest = request.body[FetchRequest]
3
4 // 1. 读取消息
5 val fetchInfo = replicaManager.fetchMessages(
6 timeout = fetchRequest.maxWait,
7 replicaId = fetchRequest.replicaId,
8 fetchMinBytes = fetchRequest.minBytes,
9 fetchMaxBytes = fetchRequest.maxBytes,
10 fetchInfos = fetchRequest.fetchData
11 )
12
13 // 2. 如果数据不足且未超时,延迟响应
14 if (fetchInfo.bytesRead < fetchRequest.minBytes &&
15 !fetchInfo.isTimedOut) {
16 // 加入延迟队列
17 delayedFetchPurgatory.tryCompleteElseWatch(
18 delayedFetch, Seq(topicPartition)
19 )
20 } else {
21 // 立即响应
22 sendResponse(request, fetchInfo)
23 }
24}

步骤7:发送响应(Processor)

scala
1def processNewResponses(): Unit = {
2 var currentResponse: RequestChannel.Response = null
3 while ({currentResponse = dequeueResponse(); currentResponse != null}) {
4 val channelId = currentResponse.request.context.connectionId
5
6 currentResponse match {
7 case response: SendResponse =>
8 sendResponse(response)
9 case response: NoOpResponse =>
10 // 不需要响应
11 case response: CloseConnectionResponse =>
12 close(channelId)
13 }
14 }
15}
16
17def sendResponse(response: SendResponse): Unit = {
18 val responseSend = response.responseSend
19 selector.send(responseSend)
20}

4. 延迟操作(DelayedOperation)

延迟Produce:

scala
1class DelayedProduce extends DelayedOperation {
2 override def tryComplete(): Boolean = {
3 // 检查是否所有ISR副本都已同步
4 val allReplicasCaughtUp = partition.inSyncReplicas.forall { replica =>
5 replica.logEndOffset >= requiredOffset
6 }
7
8 if (allReplicasCaughtUp) {
9 forceComplete()
10 true
11 } else {
12 false
13 }
14 }
15
16 override def onComplete(): Unit = {
17 // 发送响应给客户端
18 responseCallback(response)
19 }
20}

5. 零拷贝技术

scala
1// 使用FileChannel.transferTo实现零拷贝
2def sendFile(channel: FileChannel,
3 position: Long,
4 count: Long): Long = {
5 channel.transferTo(position, count, socketChannel)
6}

6. 线程模型配置

properties
1# Acceptor线程数(通常为1)
2num.network.threads=3
3
4# Processor线程数(网络线程)
5num.io.threads=8
6
7# 请求队列大小
8queued.max.requests=500

7. 性能优化

批量处理:

  • 批量读取请求
  • 批量写入日志
  • 批量发送响应

异步处理:

  • 网络IO异步化
  • 磁盘IO异步化
  • 使用回调机制

零拷贝:

  • sendfile系统调用
  • mmap内存映射
  • 减少数据拷贝

最佳实践:

  • 合理配置线程数
  • 监控请求队列大小
  • 优化网络参数
  • 使用批量操作

18. RabbitMQ 中无法路由的消息会去到哪里?

答案:

RabbitMQ中无法路由的消息处理方式取决于发布消息时的配置。

1. 默认行为

直接丢弃:

  • 默认情况下,无法路由的消息会被直接丢弃
  • 生产者不会收到任何通知
  • 消息丢失且无法追踪

2. mandatory参数

设置mandatory=true:

java
1// 发送消息时设置mandatory参数
2channel.basicPublish(
3 exchange,
4 routingKey,
5 true, // mandatory=true
6 MessageProperties.PERSISTENT_TEXT_PLAIN,
7 message.getBytes()
8);
9
10// 添加ReturnListener监听无法路由的消息
11channel.addReturnListener(new ReturnListener() {
12 @Override
13 public void handleReturn(int replyCode,
14 String replyText,
15 String exchange,
16 String routingKey,
17 AMQP.BasicProperties properties,
18 byte[] body) {
19 System.out.println("无法路由的消息: " + new String(body));
20 System.out.println("原因: " + replyText);
21 // 处理无法路由的消息
22 handleUnroutableMessage(body);
23 }
24});

工作原理:

  • 消息无法路由时,会返回给生产者
  • 触发ReturnListener回调
  • 生产者可以记录日志或重新发送

3. 备份交换机(Alternate Exchange)

配置备份交换机:

java
1// 声明主交换机时指定备份交换机
2Map<String, Object> args = new HashMap<>();
3args.put("alternate-exchange", "backup-exchange");
4
5channel.exchangeDeclare(
6 "main-exchange",
7 "direct",
8 true,
9 false,
10 args
11);
12
13// 声明备份交换机(通常使用fanout类型)
14channel.exchangeDeclare("backup-exchange", "fanout", true);
15
16// 声明备份队列
17channel.queueDeclare("backup-queue", true, false, false, null);
18
19// 绑定备份队列到备份交换机
20channel.queueBind("backup-queue", "backup-exchange", "");

工作原理:

  • 消息无法在主交换机路由时
  • 自动转发到备份交换机
  • 备份交换机通常使用fanout类型,将消息发送到所有绑定的队列

4. 处理策略对比

策略优点缺点适用场景
默认丢弃简单消息丢失可容忍消息丢失
mandatory生产者可感知需要处理回调需要知道路由失败
备份交换机自动处理,不丢失配置复杂不能丢失消息

5. 最佳实践

方案1:mandatory + 日志记录

java
1channel.addReturnListener((replyCode, replyText, exchange,
2 routingKey, properties, body) -> {
3 // 记录到日志或数据库
4 logger.error("消息路由失败: exchange={}, routingKey={}, message={}",
5 exchange, routingKey, new String(body));
6
7 // 发送告警
8 alertService.sendAlert("消息路由失败");
9});

方案2:备份交换机 + 监控队列

java
1// 定期检查备份队列
2@Scheduled(fixedDelay = 60000)
3public void checkBackupQueue() {
4 long messageCount = getQueueMessageCount("backup-queue");
5 if (messageCount > 0) {
6 logger.warn("备份队列有{}条消息", messageCount);
7 // 处理备份队列中的消息
8 processBackupMessages();
9 }
10}

推荐配置:

  • 重要消息:使用备份交换机
  • 一般消息:使用mandatory参数
  • 开发环境:使用mandatory便于调试
  • 生产环境:根据业务重要性选择

19. RabbitMQ 中消息什么时候会进入死信交换机?

答案:

死信(Dead Letter)是指无法被正常消费的消息,RabbitMQ提供了死信交换机(DLX)来处理这些消息。

1. 产生死信的三种情况

情况1:消息被拒绝(reject/nack)且requeue=false

java
1// 消费者拒绝消息且不重新入队
2channel.basicReject(deliveryTag, false); // requeue=false
3
4// 或使用basicNack
5channel.basicNack(deliveryTag, false, false); // requeue=false

情况2:消息TTL过期

java
1// 设置消息TTL
2AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
3 .expiration("10000") // 10秒后过期
4 .build();
5
6channel.basicPublish(exchange, routingKey, properties, message);
7
8// 或设置队列TTL
9Map<String, Object> args = new HashMap<>();
10args.put("x-message-ttl", 10000); // 队列中所有消息10秒后过期
11channel.queueDeclare("my-queue", true, false, false, args);

情况3:队列达到最大长度

java
1// 设置队列最大长度
2Map<String, Object> args = new HashMap<>();
3args.put("x-max-length", 1000); // 最多1000条消息
4channel.queueDeclare("my-queue", true, false, false, args);
5
6// 或设置队列最大字节数
7args.put("x-max-length-bytes", 1048576); // 最多1MB

2. 配置死信交换机

完整配置示例:

java
1// 1. 声明死信交换机
2channel.exchangeDeclare("dlx-exchange", "direct", true);
3
4// 2. 声明死信队列
5channel.queueDeclare("dlx-queue", true, false, false, null);
6
7// 3. 绑定死信队列到死信交换机
8channel.queueBind("dlx-queue", "dlx-exchange", "dlx-routing-key");
9
10// 4. 声明业务队列,指定死信交换机
11Map<String, Object> args = new HashMap<>();
12args.put("x-dead-letter-exchange", "dlx-exchange"); // 死信交换机
13args.put("x-dead-letter-routing-key", "dlx-routing-key"); // 死信路由键(可选)
14args.put("x-message-ttl", 10000); // 消息TTL
15args.put("x-max-length", 1000); // 队列最大长度
16
17channel.queueDeclare("business-queue", true, false, false, args);

3. 死信消息的属性变化

死信消息会携带额外信息:

java
1// 消费死信队列
2channel.basicConsume("dlx-queue", false, new DefaultConsumer(channel) {
3 @Override
4 public void handleDelivery(String consumerTag, Envelope envelope,
5 AMQP.BasicProperties properties, byte[] body) {
6 // 获取死信原因
7 Map<String, Object> headers = properties.getHeaders();
8
9 // x-death包含死信详细信息
10 List<Map<String, Object>> xDeath =
11 (List<Map<String, Object>>) headers.get("x-death");
12
13 if (xDeath != null && !xDeath.isEmpty()) {
14 Map<String, Object> death = xDeath.get(0);
15 String reason = (String) death.get("reason"); // 死信原因
16 String queue = (String) death.get("queue"); // 原队列
17 String exchange = (String) death.get("exchange"); // 原交换机
18 Long count = (Long) death.get("count"); // 死信次数
19
20 System.out.println("死信原因: " + reason);
21 System.out.println("原队列: " + queue);
22 }
23 }
24});

死信原因(reason):

  • rejected:消息被拒绝
  • expired:消息过期
  • maxlen:队列达到最大长度

4. 死信队列的应用场景

场景1:消息重试

java
1// 业务队列消费失败后进入死信队列
2// 死信队列消费者进行重试
3channel.basicConsume("dlx-queue", false, new DefaultConsumer(channel) {
4 @Override
5 public void handleDelivery(String consumerTag, Envelope envelope,
6 AMQP.BasicProperties properties, byte[] body) {
7 try {
8 // 重试处理
9 retryProcess(body);
10 channel.basicAck(envelope.getDeliveryTag(), false);
11 } catch (Exception e) {
12 // 重试失败,记录日志
13 logger.error("重试失败", e);
14 channel.basicNack(envelope.getDeliveryTag(), false, false);
15 }
16 }
17});

场景2:延迟队列

java
1// 利用TTL + 死信实现延迟队列
2// 1. 声明延迟队列(无消费者,消息过期后进入死信)
3Map<String, Object> delayArgs = new HashMap<>();
4delayArgs.put("x-dead-letter-exchange", "business-exchange");
5delayArgs.put("x-dead-letter-routing-key", "business-key");
6delayArgs.put("x-message-ttl", 30000); // 30秒延迟
7
8channel.queueDeclare("delay-queue", true, false, false, delayArgs);
9
10// 2. 发送消息到延迟队列
11channel.basicPublish("", "delay-queue", null, message);
12
13// 3. 30秒后消息自动进入业务队列

场景3:异常消息处理

java
1// 死信队列专门处理异常消息
2channel.basicConsume("dlx-queue", false, new DefaultConsumer(channel) {
3 @Override
4 public void handleDelivery(String consumerTag, Envelope envelope,
5 AMQP.BasicProperties properties, byte[] body) {
6 // 记录异常消息
7 saveToDatabase(body, properties);
8
9 // 发送告警
10 alertService.sendAlert("发现异常消息");
11
12 channel.basicAck(envelope.getDeliveryTag(), false);
13 }
14});

5. 最佳实践

配置建议:

  • 所有业务队列都配置死信交换机
  • 死信队列设置合理的TTL和最大长度
  • 监控死信队列的消息数量
  • 定期清理死信队列

注意事项:

  • 死信消息也可能再次成为死信(避免循环)
  • 死信交换机可以是任意类型
  • 死信routing key可以重新指定

20. 说一下 AMQP 协议?

答案:

AMQP(Advanced Message Queuing Protocol,高级消息队列协议)是一个应用层协议,为面向消息的中间件设计。

1. AMQP协议概述

定义:

  • 开放标准的应用层协议
  • 用于消息中间件
  • 与平台和语言无关
  • 提供统一的消息服务

版本:

  • AMQP 0-9-1:RabbitMQ主要使用的版本
  • AMQP 1.0:OASIS标准,与0-9-1不兼容

2. AMQP核心概念

核心组件:

  • Broker:消息代理服务器
  • Virtual Host:虚拟主机,隔离环境
  • Exchange:交换机,接收并路由消息
  • Queue:队列,存储消息
  • Binding:绑定,连接Exchange和Queue
  • Routing Key:路由键,消息路由规则
  • Message:消息,包含消息体和属性

3. AMQP工作模型

消息流转过程:

1Producer -> Exchange -> Binding -> Queue -> Consumer

详细流程:

  1. 生产者发送消息到Exchange
  2. Exchange根据Routing Key和Binding规则路由消息
  3. 消息被路由到一个或多个Queue
  4. 消费者从Queue获取消息
  5. 消费者处理完成后发送ACK

4. Exchange类型

Direct Exchange(直连交换机):

java
1// 精确匹配routing key
2channel.exchangeDeclare("direct-exchange", "direct");
3channel.queueBind("queue1", "direct-exchange", "key1");
4
5// 发送消息
6channel.basicPublish("direct-exchange", "key1", null, message);

Fanout Exchange(扇出交换机):

java
1// 广播到所有绑定的队列,忽略routing key
2channel.exchangeDeclare("fanout-exchange", "fanout");
3channel.queueBind("queue1", "fanout-exchange", "");
4channel.queueBind("queue2", "fanout-exchange", "");

Topic Exchange(主题交换机):

java
1// 支持通配符匹配
2// * 匹配一个单词
3// # 匹配零个或多个单词
4channel.exchangeDeclare("topic-exchange", "topic");
5channel.queueBind("queue1", "topic-exchange", "user.*.create");
6channel.queueBind("queue2", "topic-exchange", "user.#");

Headers Exchange(头交换机):

java
1// 根据消息头属性路由
2Map<String, Object> headers = new HashMap<>();
3headers.put("x-match", "all"); // all或any
4headers.put("format", "pdf");
5headers.put("type", "report");
6
7channel.queueBind("queue1", "headers-exchange", "", headers);

5. AMQP消息属性

基本属性:

java
1AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
2 .contentType("application/json") // 内容类型
3 .contentEncoding("UTF-8") // 编码
4 .deliveryMode(2) // 持久化(1=非持久化,2=持久化)
5 .priority(5) // 优先级(0-9)
6 .correlationId("correlation-id") // 关联ID
7 .replyTo("reply-queue") // 回复队列
8 .expiration("10000") // 过期时间(毫秒)
9 .messageId("message-id") // 消息ID
10 .timestamp(new Date()) // 时间戳
11 .type("order") // 消息类型
12 .userId("user") // 用户ID
13 .appId("app") // 应用ID
14 .headers(customHeaders) // 自定义头
15 .build();

6. AMQP连接模型

层次结构:

1Connection(TCP连接)
2 └── Channel(虚拟连接)
3 ├── Exchange
4 ├── Queue
5 └── Binding

连接管理:

java
1// 创建连接
2ConnectionFactory factory = new ConnectionFactory();
3factory.setHost("localhost");
4factory.setPort(5672);
5factory.setVirtualHost("/");
6factory.setUsername("guest");
7factory.setPassword("guest");
8
9Connection connection = factory.newConnection();
10
11// 创建Channel
12Channel channel = connection.createChannel();
13
14// 使用完毕后关闭
15channel.close();
16connection.close();

7. AMQP确认机制

生产者确认:

java
1// 开启发送确认
2channel.confirmSelect();
3
4// 同步确认
5channel.basicPublish(exchange, routingKey, null, message);
6channel.waitForConfirms();
7
8// 异步确认
9channel.addConfirmListener(new ConfirmListener() {
10 public void handleAck(long deliveryTag, boolean multiple) {
11 // 消息确认
12 }
13 public void handleNack(long deliveryTag, boolean multiple) {
14 // 消息未确认
15 }
16});

消费者确认:

java
1// 手动确认
2channel.basicConsume(queueName, false, new DefaultConsumer(channel) {
3 @Override
4 public void handleDelivery(String consumerTag, Envelope envelope,
5 AMQP.BasicProperties properties, byte[] body) {
6 try {
7 processMessage(body);
8 // 确认消息
9 channel.basicAck(envelope.getDeliveryTag(), false);
10 } catch (Exception e) {
11 // 拒绝消息
12 channel.basicNack(envelope.getDeliveryTag(), false, true);
13 }
14 }
15});

8. AMQP事务

java
1// 开启事务
2channel.txSelect();
3
4try {
5 // 发送消息
6 channel.basicPublish(exchange, routingKey, null, message1);
7 channel.basicPublish(exchange, routingKey, null, message2);
8
9 // 提交事务
10 channel.txCommit();
11} catch (Exception e) {
12 // 回滚事务
13 channel.txRollback();
14}

9. AMQP协议特性

可靠性:

  • 消息持久化
  • 发送确认
  • 消费确认
  • 事务支持

灵活性:

  • 多种Exchange类型
  • 灵活的路由规则
  • 支持优先级队列

安全性:

  • 用户认证
  • 虚拟主机隔离
  • 权限控制
  • SSL/TLS支持

10. AMQP vs 其他协议

特性AMQPMQTTSTOMP
设计目标企业消息IoT简单文本
复杂度
功能丰富基础基础
性能中等中等
可靠性中等中等

最佳实践:

  • 理解AMQP模型和概念
  • 合理使用Exchange类型
  • 正确配置消息属性
  • 使用确认机制保证可靠性
  • 避免过度使用事务(性能开销大)

21. 说一下 RabbitMQ 的事务机制?

答案:

RabbitMQ提供了事务机制来保证消息的可靠性,但由于性能开销较大,生产环境更推荐使用发送者确认模式。

1. 事务基本用法

java
1Channel channel = connection.createChannel();
2
3try {
4 // 开启事务
5 channel.txSelect();
6
7 // 发送消息
8 channel.basicPublish(exchange, routingKey, null, message1.getBytes());
9 channel.basicPublish(exchange, routingKey, null, message2.getBytes());
10
11 // 模拟业务逻辑
12 doBusinessLogic();
13
14 // 提交事务
15 channel.txCommit();
16
17} catch (Exception e) {
18 // 回滚事务
19 channel.txRollback();
20 logger.error("事务回滚", e);
21}

2. 事务的工作原理

事务流程:

  1. txSelect():将Channel设置为事务模式
  2. 发送消息:消息暂存在内存中
  3. txCommit():提交事务,消息持久化到磁盘
  4. txRollback():回滚事务,丢弃所有未提交的消息

3. 事务的特点

优点:

  • 保证消息的原子性
  • 要么全部成功,要么全部失败
  • 实现简单

缺点:

  • 性能开销大(吞吐量降低约250倍)
  • 阻塞式操作,影响并发
  • 不适合高并发场景

4. 发送者确认模式(推荐)

普通确认模式:

java
1// 开启确认模式
2channel.confirmSelect();
3
4// 发送消息
5channel.basicPublish(exchange, routingKey, null, message.getBytes());
6
7// 同步等待确认
8if (channel.waitForConfirms()) {
9 System.out.println("消息发送成功");
10} else {
11 System.out.println("消息发送失败");
12}

批量确认模式:

java
1channel.confirmSelect();
2
3// 发送多条消息
4for (int i = 0; i < 100; i++) {
5 channel.basicPublish(exchange, routingKey, null, message.getBytes());
6}
7
8// 批量等待确认
9channel.waitForConfirmsOrDie();

异步确认模式(最佳):

java
1channel.confirmSelect();
2
3// 添加确认监听器
4channel.addConfirmListener(new ConfirmListener() {
5 @Override
6 public void handleAck(long deliveryTag, boolean multiple) {
7 // 消息确认成功
8 if (multiple) {
9 // 批量确认
10 confirmedMessages.headMap(deliveryTag + 1).clear();
11 } else {
12 // 单条确认
13 confirmedMessages.remove(deliveryTag);
14 }
15 }
16
17 @Override
18 public void handleNack(long deliveryTag, boolean multiple) {
19 // 消息确认失败,需要重发
20 if (multiple) {
21 // 批量失败
22 resendMessages(deliveryTag);
23 } else {
24 // 单条失败
25 resendMessage(deliveryTag);
26 }
27 }
28});
29
30// 发送消息
31long nextPublishSeqNo = channel.getNextPublishSeqNo();
32channel.basicPublish(exchange, routingKey, null, message.getBytes());
33confirmedMessages.put(nextPublishSeqNo, message);

5. 事务 vs 发送者确认

特性事务模式发送者确认模式
性能低(约250倍差距)
吞吐量
实现复杂度简单中等
阻塞性阻塞非阻塞(异步)
适用场景低并发高并发

6. 消费者事务

java
1// 消费者也可以使用事务
2channel.txSelect();
3
4QueueingConsumer consumer = new QueueingConsumer(channel);
5channel.basicConsume(queueName, false, consumer);
6
7while (true) {
8 QueueingConsumer.Delivery delivery = consumer.nextDelivery();
9
10 try {
11 // 处理消息
12 processMessage(delivery.getBody());
13
14 // 确认消息
15 channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
16
17 // 提交事务
18 channel.txCommit();
19 } catch (Exception e) {
20 // 回滚事务
21 channel.txRollback();
22 }
23}

7. 最佳实践

推荐方案:

java
1// 生产者:使用异步确认模式
2public class ReliableProducer {
3 private final ConcurrentNavigableMap<Long, String> outstandingConfirms =
4 new ConcurrentSkipListMap<>();
5
6 public void sendMessage(String message) throws IOException {
7 channel.confirmSelect();
8
9 channel.addConfirmListener(new ConfirmListener() {
10 public void handleAck(long deliveryTag, boolean multiple) {
11 cleanOutstandingConfirms(deliveryTag, multiple);
12 }
13
14 public void handleNack(long deliveryTag, boolean multiple) {
15 String msg = outstandingConfirms.get(deliveryTag);
16 logger.error("消息发送失败: {}", msg);
17 // 重发或记录
18 resendMessage(msg);
19 }
20 });
21
22 long nextSeqNo = channel.getNextPublishSeqNo();
23 channel.basicPublish(exchange, routingKey, null, message.getBytes());
24 outstandingConfirms.put(nextSeqNo, message);
25 }
26
27 private void cleanOutstandingConfirms(long sequenceNumber, boolean multiple) {
28 if (multiple) {
29 outstandingConfirms.headMap(sequenceNumber + 1).clear();
30 } else {
31 outstandingConfirms.remove(sequenceNumber);
32 }
33 }
34}

注意事项:

  • 避免在高并发场景使用事务
  • 优先使用异步确认模式
  • 事务和确认模式不能同时使用
  • 确认模式下需要处理重发逻辑

22. RabbitMQ 中主要有哪几个角色或者说概念?

答案:

RabbitMQ基于AMQP协议,包含多个核心概念和组件。

1. 核心组件

Broker(消息代理):

  • RabbitMQ服务器实例
  • 接收、存储和转发消息
  • 管理Exchange、Queue等资源

Virtual Host(虚拟主机):

  • 逻辑隔离单元
  • 类似于命名空间
  • 不同vhost之间完全隔离
  • 每个vhost有独立的Exchange、Queue、权限
java
1// 连接到指定vhost
2factory.setVirtualHost("/my-vhost");

2. 消息路由组件

Exchange(交换机):

  • 接收生产者发送的消息
  • 根据路由规则将消息路由到Queue
  • 不存储消息

类型:

  • Direct:精确匹配routing key
  • Fanout:广播到所有绑定的队列
  • Topic:通配符匹配
  • Headers:根据消息头匹配
java
1// 声明Exchange
2channel.exchangeDeclare("my-exchange", "direct", true, false, null);

Queue(队列):

  • 存储消息的容器
  • 遵循FIFO原则
  • 消息最终被消费者消费
java
1// 声明Queue
2channel.queueDeclare("my-queue", true, false, false, null);

Binding(绑定):

  • 连接Exchange和Queue的规则
  • 定义消息如何从Exchange路由到Queue
  • 包含routing key或其他匹配条件
java
1// 绑定Queue到Exchange
2channel.queueBind("my-queue", "my-exchange", "routing-key");

3. 消息组件

Message(消息):

  • 消息体(Body):实际数据
  • 消息属性(Properties):元数据
java
1AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
2 .contentType("application/json")
3 .deliveryMode(2) // 持久化
4 .priority(5)
5 .build();

Routing Key(路由键):

  • 生产者发送消息时指定
  • Exchange根据routing key路由消息

Binding Key(绑定键):

  • 绑定Queue到Exchange时指定
  • 用于匹配routing key

4. 角色组件

Producer(生产者):

  • 发送消息的应用程序
  • 连接到Broker
  • 发送消息到Exchange
java
1channel.basicPublish(exchange, routingKey, properties, message.getBytes());

Consumer(消费者):

  • 接收消息的应用程序
  • 订阅Queue
  • 处理消息
java
1channel.basicConsume(queueName, false, consumer);

Connection(连接):

  • 应用程序与Broker之间的TCP连接
  • 可以包含多个Channel
java
1Connection connection = factory.newConnection();

Channel(信道):

  • 建立在Connection之上的虚拟连接
  • 大部分操作在Channel上完成
  • 轻量级,可创建多个
java
1Channel channel = connection.createChannel();

5. 高级概念

Dead Letter Exchange(死信交换机):

  • 处理无法正常消费的消息
  • 配置在Queue上

Alternate Exchange(备份交换机):

  • 处理无法路由的消息
  • 配置在Exchange上

TTL(Time To Live):

  • 消息或队列的过期时间
  • 过期后进入死信队列或被删除

Priority Queue(优先级队列):

  • 支持消息优先级
  • 高优先级消息先被消费
java
1Map<String, Object> args = new HashMap<>();
2args.put("x-max-priority", 10);
3channel.queueDeclare("priority-queue", true, false, false, args);

6. 管理组件

User(用户):

  • 访问RabbitMQ的账户
  • 有不同的权限级别

Permission(权限):

  • 配置权限:创建/删除Exchange、Queue
  • 写权限:发送消息
  • 读权限:消费消息

Policy(策略):

  • 动态配置Queue和Exchange的行为
  • 如镜像队列、TTL等

7. 组件关系图

1Producer -> Connection -> Channel -> Exchange -> Binding -> Queue -> Consumer
2 |
3 Routing Key
4 |
5 Binding Key

8. 完整示例

java
1// 创建连接和Channel
2ConnectionFactory factory = new ConnectionFactory();
3factory.setHost("localhost");
4factory.setVirtualHost("/my-vhost");
5Connection connection = factory.newConnection();
6Channel channel = connection.createChannel();
7
8// 声明Exchange
9channel.exchangeDeclare("my-exchange", "direct", true);
10
11// 声明Queue
12channel.queueDeclare("my-queue", true, false, false, null);
13
14// 绑定
15channel.queueBind("my-queue", "my-exchange", "my-routing-key");
16
17// 生产者发送消息
18channel.basicPublish("my-exchange", "my-routing-key", null, "Hello".getBytes());
19
20// 消费者接收消息
21channel.basicConsume("my-queue", false, new DefaultConsumer(channel) {
22 @Override
23 public void handleDelivery(String consumerTag, Envelope envelope,
24 AMQP.BasicProperties properties, byte[] body) {
25 System.out.println("收到消息: " + new String(body));
26 channel.basicAck(envelope.getDeliveryTag(), false);
27 }
28});

最佳实践:

  • 合理使用Virtual Host进行环境隔离
  • 一个Connection包含多个Channel
  • 正确配置Exchange类型
  • 理解Binding规则
  • 使用持久化保证消息可靠性

23. RabbitMQ 的 routing key 和 binding key 的最大长度是多少字节?

答案:

RabbitMQ对routing key和binding key的长度有明确限制。

1. 长度限制

最大长度:255字节(255 bytes)

  • Routing Key:最大255字节
  • Binding Key:最大255字节

2. 限制原因

协议层面:

  • AMQP 0-9-1协议规定short string类型最大255字节
  • routing key和binding key都是short string类型
  • 这是协议级别的硬性限制

性能考虑:

  • 较短的key提高路由效率
  • 减少内存占耗
  • 加快匹配速度

3. 实际测试

java
1// 测试routing key长度限制
2public void testRoutingKeyLength() {
3 Channel channel = connection.createChannel();
4
5 // 255字节的routing key - 正常
6 String routingKey255 = StringUtils.repeat("a", 255);
7 channel.basicPublish("exchange", routingKey255, null, "test".getBytes());
8
9 // 256字节的routing key - 会抛出异常
10 try {
11 String routingKey256 = StringUtils.repeat("a", 256);
12 channel.basicPublish("exchange", routingKey256, null, "test".getBytes());
13 } catch (IOException e) {
14 // com.rabbitmq.client.ShutdownSignalException:
15 // channel error; protocol method: #method<channel.close>(reply-code=406, ...)
16 System.out.println("超过长度限制");
17 }
18}

4. 字符编码影响

UTF-8编码:

  • 英文字符:1字节
  • 中文字符:通常3字节
  • 特殊字符:1-4字节
java
1// 中文routing key示例
2String chineseKey = "订单.创建.成功"; // 约18字节(6个中文字符)
3String maxChineseKey = StringUtils.repeat("中", 85); // 约255字节(85个中文字符)

5. Topic Exchange的通配符

通配符不占用额外长度:

java
1// 以下都是有效的binding key
2"user.*.create" // 约14字节
3"order.#" // 约7字节
4"*.*.*.*.*.*.*.*.#" // 约17字节
5
6// 但总长度仍不能超过255字节
7String longPattern = "a.b.c.d.e..." // 最多255字节

6. 最佳实践

命名建议:

java
1// 推荐:简洁明了的routing key
2"order.created"
3"user.updated"
4"payment.success"
5
6// 避免:过长的routing key
7"com.example.service.order.domain.event.OrderCreatedEvent.v1.production"

层次结构:

java
1// 使用点号分隔的层次结构
2"业务域.实体.操作.状态"
3
4// 示例
5"order.payment.process.success" // 订单.支付.处理.成功
6"user.profile.update.complete" // 用户.资料.更新.完成

7. 超长key的处理方案

方案1:使用哈希值

java
1// 将长key转换为哈希值
2String longKey = "very.long.routing.key.that.exceeds.limit...";
3String shortKey = DigestUtils.md5Hex(longKey).substring(0, 32);
4channel.basicPublish("exchange", shortKey, null, message);

方案2:使用消息头

java
1// 将详细信息放在消息头中
2Map<String, Object> headers = new HashMap<>();
3headers.put("full-routing-info", longRoutingInfo);
4
5AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
6 .headers(headers)
7 .build();
8
9channel.basicPublish("exchange", "short-key", props, message);

方案3:重新设计路由规则

java
1// 简化routing key设计
2// 原来:com.company.department.team.service.module.action.status
3// 简化:service.action.status

8. 相关限制

其他长度限制:

  • Queue名称:最大255字节
  • Exchange名称:最大255字节
  • Virtual Host名称:最大255字节
  • 消息头键名:最大255字节

注意事项:

  • 超过限制会导致channel关闭
  • 建议保持key简短(50字节以内)
  • 使用英文和数字,避免特殊字符
  • 合理设计命名规范

24. 说说 RabbitMQ 的工作模式?

答案:

RabbitMQ支持多种工作模式(Work Pattern),适用于不同的业务场景。

1. Simple模式(简单模式)

特点:

  • 一个生产者,一个消费者
  • 最简单的模式
  • 不使用Exchange
java
1// 生产者
2channel.queueDeclare("simple-queue", true, false, false, null);
3channel.basicPublish("", "simple-queue", null, message.getBytes());
4
5// 消费者
6channel.basicConsume("simple-queue", true, new DefaultConsumer(channel) {
7 @Override
8 public void handleDelivery(String consumerTag, Envelope envelope,
9 AMQP.BasicProperties properties, byte[] body) {
10 System.out.println("收到消息: " + new String(body));
11 }
12});

适用场景:

  • 简单的点对点通信
  • 测试和学习

2. Work Queue模式(工作队列模式)

特点:

  • 一个生产者,多个消费者
  • 消息轮询分发给消费者
  • 实现任务分发和负载均衡
java
1// 生产者
2for (int i = 0; i < 100; i++) {
3 String message = "Task " + i;
4 channel.basicPublish("", "work-queue", null, message.getBytes());
5}
6
7// 消费者1和消费者2
8channel.basicQos(1); // 公平分发,每次只处理一条消息
9channel.basicConsume("work-queue", false, new DefaultConsumer(channel) {
10 @Override
11 public void handleDelivery(String consumerTag, Envelope envelope,
12 AMQP.BasicProperties properties, byte[] body) {
13 try {
14 System.out.println("处理任务: " + new String(body));
15 Thread.sleep(1000); // 模拟耗时操作
16 channel.basicAck(envelope.getDeliveryTag(), false);
17 } catch (Exception e) {
18 channel.basicNack(envelope.getDeliveryTag(), false, true);
19 }
20 }
21});

适用场景:

  • 任务分发
  • 负载均衡
  • 异步处理

3. Publish/Subscribe模式(发布订阅模式)

特点:

  • 使用Fanout Exchange
  • 消息广播到所有绑定的队列
  • 一个消息被多个消费者接收
java
1// 声明Fanout Exchange
2channel.exchangeDeclare("logs", "fanout");
3
4// 生产者
5channel.basicPublish("logs", "", null, message.getBytes());
6
7// 消费者1:保存到数据库
8String queue1 = channel.queueDeclare().getQueue();
9channel.queueBind(queue1, "logs", "");
10channel.basicConsume(queue1, true, (consumerTag, delivery) -> {
11 saveToDatabase(new String(delivery.getBody()));
12});
13
14// 消费者2:写入日志文件
15String queue2 = channel.queueDeclare().getQueue();
16channel.queueBind(queue2, "logs", "");
17channel.basicConsume(queue2, true, (consumerTag, delivery) -> {
18 writeToFile(new String(delivery.getBody()));
19});

适用场景:

  • 日志收集
  • 消息广播
  • 事件通知

4. Routing模式(路由模式)

特点:

  • 使用Direct Exchange
  • 根据routing key精确匹配
  • 选择性接收消息
java
1// 声明Direct Exchange
2channel.exchangeDeclare("direct-logs", "direct");
3
4// 生产者:发送不同级别的日志
5channel.basicPublish("direct-logs", "error", null, "错误日志".getBytes());
6channel.basicPublish("direct-logs", "info", null, "信息日志".getBytes());
7channel.basicPublish("direct-logs", "warning", null, "警告日志".getBytes());
8
9// 消费者1:只接收error日志
10String errorQueue = channel.queueDeclare().getQueue();
11channel.queueBind(errorQueue, "direct-logs", "error");
12channel.basicConsume(errorQueue, true, (consumerTag, delivery) -> {
13 System.out.println("Error: " + new String(delivery.getBody()));
14});
15
16// 消费者2:接收error和warning日志
17String alertQueue = channel.queueDeclare().getQueue();
18channel.queueBind(alertQueue, "direct-logs", "error");
19channel.queueBind(alertQueue, "direct-logs", "warning");
20channel.basicConsume(alertQueue, true, (consumerTag, delivery) -> {
21 sendAlert(new String(delivery.getBody()));
22});

适用场景:

  • 日志分级处理
  • 消息分类
  • 选择性订阅

5. Topics模式(主题模式)

特点:

  • 使用Topic Exchange
  • 支持通配符匹配
  • 更灵活的路由规则
java
1// 声明Topic Exchange
2channel.exchangeDeclare("topic-logs", "topic");
3
4// 生产者:发送不同主题的消息
5channel.basicPublish("topic-logs", "user.order.created", null, msg1);
6channel.basicPublish("topic-logs", "user.profile.updated", null, msg2);
7channel.basicPublish("topic-logs", "system.error.database", null, msg3);
8
9// 消费者1:接收所有user相关消息
10String userQueue = channel.queueDeclare().getQueue();
11channel.queueBind(userQueue, "topic-logs", "user.#");
12
13// 消费者2:接收所有error消息
14String errorQueue = channel.queueDeclare().getQueue();
15channel.queueBind(errorQueue, "topic-logs", "*.error.*");
16
17// 消费者3:接收所有created消息
18String createdQueue = channel.queueDeclare().getQueue();
19channel.queueBind(createdQueue, "topic-logs", "*.*.created");

通配符规则:

  • *:匹配一个单词
  • #:匹配零个或多个单词

适用场景:

  • 复杂的消息路由
  • 多维度消息分类
  • 灵活的订阅规则

6. RPC模式(远程过程调用模式)

特点:

  • 客户端发送请求,等待响应
  • 使用reply_to和correlation_id
  • 实现同步调用
java
1// RPC客户端
2public String call(String message) throws Exception {
3 // 创建回调队列
4 String replyQueueName = channel.queueDeclare().getQueue();
5 String corrId = UUID.randomUUID().toString();
6
7 // 设置属性
8 AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
9 .correlationId(corrId)
10 .replyTo(replyQueueName)
11 .build();
12
13 // 发送请求
14 channel.basicPublish("", "rpc_queue", props, message.getBytes());
15
16 // 等待响应
17 BlockingQueue<String> response = new ArrayBlockingQueue<>(1);
18
19 channel.basicConsume(replyQueueName, true, (consumerTag, delivery) -> {
20 if (delivery.getProperties().getCorrelationId().equals(corrId)) {
21 response.offer(new String(delivery.getBody()));
22 }
23 });
24
25 return response.take();
26}
27
28// RPC服务端
29channel.basicConsume("rpc_queue", false, (consumerTag, delivery) -> {
30 String message = new String(delivery.getBody());
31
32 // 处理请求
33 String result = processRequest(message);
34
35 // 发送响应
36 AMQP.BasicProperties replyProps = new AMQP.BasicProperties.Builder()
37 .correlationId(delivery.getProperties().getCorrelationId())
38 .build();
39
40 channel.basicPublish("", delivery.getProperties().getReplyTo(),
41 replyProps, result.getBytes());
42
43 channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
44});

适用场景:

  • 远程方法调用
  • 同步请求响应
  • 微服务间通信

7. 模式对比

模式Exchange类型特点适用场景
Simple默认点对点简单通信
Work Queue默认多消费者负载均衡任务分发
Pub/SubFanout广播消息广播
RoutingDirect精确匹配消息分类
TopicsTopic通配符匹配复杂路由
RPC默认请求响应同步调用

最佳实践:

  • 根据业务场景选择合适的模式
  • Work Queue模式使用basicQos实现公平分发
  • Pub/Sub模式使用临时队列
  • Topics模式合理设计routing key层次
  • RPC模式设置超时时间

25. 说说 RabbitMQ 的集群模式?

答案:

RabbitMQ提供了多种集群模式来实现高可用和负载均衡。

1. 普通集群模式(默认模式)

特点:

  • 元数据在所有节点同步(Exchange、Queue定义)
  • 消息只存储在声明队列的节点上
  • 其他节点只存储队列的元数据指针
  • 消费时需要从存储节点拉取消息

架构:

1Node1: Queue A (数据) + Queue B (元数据)
2Node2: Queue B (数据) + Queue A (元数据)
3Node3: Queue A (元数据) + Queue B (元数据)

配置示例:

bash
1# Node1
2rabbitmq-server -detached
3
4# Node2加入集群
5rabbitmqctl stop_app
6rabbitmqctl join_cluster rabbit@node1
7rabbitmqctl start_app
8
9# Node3加入集群
10rabbitmqctl stop_app
11rabbitmqctl join_cluster rabbit@node1
12rabbitmqctl start_app
13
14# 查看集群状态
15rabbitmqctl cluster_status

优点:

  • 配置简单
  • 节省存储空间
  • 提高吞吐量

缺点:

  • 队列所在节点宕机,消息丢失
  • 跨节点消费性能下降
  • 不是真正的高可用

2. 镜像队列模式(高可用模式)

特点:

  • 队列在多个节点上有副本
  • 主队列(Master)和镜像队列(Mirror)
  • 消息在所有镜像节点同步
  • Master宕机,自动选举新Master

配置方式1:通过Policy配置

bash
1# 设置所有队列镜像到所有节点
2rabbitmqctl set_policy ha-all "^" '{"ha-mode":"all"}'
3
4# 设置镜像到2个节点
5rabbitmqctl set_policy ha-two "^" '{"ha-mode":"exactly","ha-params":2}'
6
7# 设置镜像到指定节点
8rabbitmqctl set_policy ha-nodes "^" \
9 '{"ha-mode":"nodes","ha-params":["rabbit@node1","rabbit@node2"]}'

配置方式2:通过管理界面

1Admin -> Policies -> Add Policy
2- Name: ha-all
3- Pattern: ^
4- Apply to: Queues
5- Definition: ha-mode = all

配置方式3:通过代码

java
1Map<String, Object> args = new HashMap<>();
2args.put("x-ha-policy", "all");
3channel.queueDeclare("ha-queue", true, false, false, args);

同步模式配置:

bash
1# 自动同步(推荐)
2rabbitmqctl set_policy ha-all "^" \
3 '{"ha-mode":"all","ha-sync-mode":"automatic"}'
4
5# 手动同步
6rabbitmqctl sync_queue queue_name

工作原理:

11. 消息发送到Master队列
22. Master同步消息到所有Mirror
33. 所有Mirror确认后,Master返回确认
44. Master宕机,选举新Master
55. 新Master继续提供服务

优点:

  • 真正的高可用
  • 数据不丢失
  • 自动故障转移

缺点:

  • 性能开销大
  • 网络带宽消耗高
  • 不适合大量消息

3. 仲裁队列模式(Quorum Queue)

特点:

  • RabbitMQ 3.8+引入
  • 基于Raft协议
  • 更好的数据安全性
  • 推荐的高可用方案

声明仲裁队列:

java
1Map<String, Object> args = new HashMap<>();
2args.put("x-queue-type", "quorum");
3channel.queueDeclare("quorum-queue", true, false, false, args);

通过Policy配置:

bash
1rabbitmqctl set_policy quorum-policy "^quorum\." \
2 '{"queue-type":"quorum","delivery-limit":3}'

特性:

  • 自动复制到多个节点(默认3个)
  • 使用Raft协议保证一致性
  • 支持消息重试限制
  • 更好的性能

对比镜像队列:

特性镜像队列仲裁队列
协议自定义Raft
性能一般更好
数据安全更好
内存使用较低
推荐度不推荐推荐

4. Federation插件(联邦模式)

特点:

  • 连接不同的RabbitMQ集群
  • 跨数据中心消息传递
  • 松耦合

配置:

bash
1# 启用插件
2rabbitmq-plugins enable rabbitmq_federation
3rabbitmq-plugins enable rabbitmq_federation_management
4
5# 配置upstream
6rabbitmqctl set_parameter federation-upstream my-upstream \
7 '{"uri":"amqp://remote-server","expires":3600000}'
8
9# 配置policy
10rabbitmqctl set_policy federate-me "^federated\." \
11 '{"federation-upstream-set":"all"}'

适用场景:

  • 跨地域部署
  • 数据中心互联
  • 灾备方案

5. Shovel插件(铲子模式)

特点:

  • 在不同集群间转发消息
  • 更灵活的消息路由
  • 支持消息转换

配置:

bash
1rabbitmq-plugins enable rabbitmq_shovel
2rabbitmq-plugins enable rabbitmq_shovel_management
3
4rabbitmqctl set_parameter shovel my-shovel \
5 '{"src-uri":"amqp://","src-queue":"source-queue",
6 "dest-uri":"amqp://remote","dest-queue":"dest-queue"}'

6. 负载均衡方案

使用HAProxy:

1# haproxy.cfg
2listen rabbitmq
3 bind *:5672
4 mode tcp
5 balance roundrobin
6 server rabbit1 node1:5672 check inter 5s rise 2 fall 3
7 server rabbit2 node2:5672 check inter 5s rise 2 fall 3
8 server rabbit3 node3:5672 check inter 5s rise 2 fall 3

使用Nginx:

nginx
1stream {
2 upstream rabbitmq {
3 server node1:5672;
4 server node2:5672;
5 server node3:5672;
6 }
7
8 server {
9 listen 5672;
10 proxy_pass rabbitmq;
11 }
12}

客户端配置:

java
1ConnectionFactory factory = new ConnectionFactory();
2factory.setAddresses(new Address[] {
3 new Address("node1", 5672),
4 new Address("node2", 5672),
5 new Address("node3", 5672)
6});
7factory.setAutomaticRecoveryEnabled(true);

7. 最佳实践

集群规划:

  • 至少3个节点(奇数个)
  • 使用仲裁队列替代镜像队列
  • 配置自动恢复机制
  • 监控集群状态

性能优化:

  • 合理设置镜像节点数量
  • 使用SSD存储
  • 优化网络配置
  • 限制队列长度

高可用配置:

java
1ConnectionFactory factory = new ConnectionFactory();
2factory.setAutomaticRecoveryEnabled(true);
3factory.setNetworkRecoveryInterval(10000);
4factory.setRequestedHeartbeat(60);
5factory.setConnectionTimeout(30000);

26. 为什么 RocketMQ 不使用 Zookeeper 作为注册中心呢?而选择自己实现 NameServer?

答案:

RocketMQ选择自研NameServer而非ZooKeeper,主要基于以下考虑:

1. 架构简单性

NameServer特点:

  • 无状态设计
  • 节点之间不通信
  • 不需要选举
  • 部署简单

ZooKeeper特点:

  • 有状态设计
  • 需要ZAB协议保证一致性
  • 需要选举Leader
  • 部署复杂
java
1// NameServer启动非常简单
2public class NamesrvStartup {
3 public static void main(String[] args) {
4 NamesrvController controller = createNamesrvController(args);
5 start(controller);
6 }
7}

2. 性能考虑

NameServer优势:

  • 纯内存操作
  • 无磁盘IO
  • 响应速度快(毫秒级)
  • 支持高并发

ZooKeeper限制:

  • 需要持久化到磁盘
  • 写入性能有限(约1000-2000 TPS)
  • 响应相对较慢
  • 大量监听器影响性能

性能对比:

1NameServer:
2- 路由查询: <1ms
3- 注册更新: <5ms
4- 无磁盘IO
5
6ZooKeeper:
7- 读取: 几毫秒
8- 写入: 10-50ms
9- 需要磁盘IO

3. 功能需求匹配

RocketMQ的需求:

  • 路由信息查询(读多写少)
  • 最终一致性即可
  • 不需要强一致性
  • 允许短暂不一致

NameServer设计:

java
1// 路由信息定期更新,允许短暂不一致
2public class RouteInfoManager {
3 // Broker每30秒发送心跳
4 private final static long BROKER_CHANNEL_EXPIRED_TIME = 1000 * 60 * 2;
5
6 // 路由信息表
7 private final HashMap<String, List<QueueData>> topicQueueTable;
8 private final HashMap<String, BrokerData> brokerAddrTable;
9
10 // 注册Broker
11 public void registerBroker(String brokerName, String brokerAddr) {
12 // 直接更新内存,不需要强一致性保证
13 this.brokerAddrTable.put(brokerName, brokerData);
14 }
15}

ZooKeeper提供:

  • 强一致性(CP)
  • 分布式锁
  • 选举功能
  • 对RocketMQ来说是过度设计

4. 可用性设计

NameServer高可用:

  • 多个NameServer节点独立运行
  • 客户端连接任意可用节点
  • 某个节点宕机不影响其他节点
  • 无单点故障
java
1// 客户端配置多个NameServer地址
2producer.setNamesrvAddr("192.168.1.1:9876;192.168.1.2:9876;192.168.1.3:9876");
3
4// 客户端自动切换
5public class MQClientInstance {
6 public void updateNameServerAddressList(String addrs) {
7 String[] addrArray = addrs.split(";");
8 // 随机选择一个NameServer
9 Collections.shuffle(Arrays.asList(addrArray));
10 }
11}

ZooKeeper高可用:

  • 需要过半节点存活
  • Leader宕机需要重新选举
  • 选举期间不可用
  • 有单点风险

5. 运维成本

NameServer优势:

  • 部署简单,启动即可
  • 无需配置集群关系
  • 无需调优参数
  • 故障恢复快

ZooKeeper劣势:

  • 需要配置集群
  • 需要调优(内存、GC等)
  • 需要监控选举状态
  • 运维复杂

6. 依赖管理

NameServer:

  • RocketMQ自带组件
  • 版本统一管理
  • 无外部依赖

ZooKeeper:

  • 额外的外部依赖
  • 版本兼容性问题
  • 增加系统复杂度

7. NameServer工作原理

注册流程:

java
1// Broker启动时向所有NameServer注册
2public class BrokerController {
3 public void registerBrokerAll() {
4 List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList();
5
6 for (String namesrvAddr : nameServerAddressList) {
7 try {
8 // 向每个NameServer注册
9 this.registerBroker(namesrvAddr);
10 } catch (Exception e) {
11 log.warn("registerBroker Exception", e);
12 }
13 }
14 }
15
16 // 每30秒发送心跳
17 this.scheduledExecutorService.scheduleAtFixedRate(() -> {
18 this.registerBrokerAll();
19 }, 10, 30, TimeUnit.SECONDS);
20}

路由发现:

java
1// Producer/Consumer从NameServer获取路由信息
2public class MQClientInstance {
3 public TopicRouteData getTopicRouteInfoFromNameServer(String topic) {
4 // 从任意一个NameServer获取
5 for (String addr : this.remotingClient.getNameServerAddressList()) {
6 try {
7 return this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, addr);
8 } catch (Exception e) {
9 // 失败则尝试下一个
10 continue;
11 }
12 }
13 return null;
14 }
15
16 // 定期更新路由信息(默认30秒)
17 this.scheduledExecutorService.scheduleAtFixedRate(() -> {
18 this.updateTopicRouteInfoFromNameServer();
19 }, 10, 30, TimeUnit.SECONDS);
20}

8. 数据一致性处理

最终一致性模型:

java
1// NameServer之间不通信,通过Broker心跳实现最终一致
2// 1. Broker向所有NameServer发送心跳
3// 2. 每个NameServer独立维护路由表
4// 3. 短暂不一致可以接受(最多30秒)
5
6// 客户端容错机制
7public class DefaultMQProducerImpl {
8 private SendResult sendDefaultImpl(Message msg) {
9 // 获取路由信息
10 TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
11
12 // 选择队列发送
13 MessageQueue mq = selectOneMessageQueue(topicPublishInfo);
14
15 // 发送失败自动重试其他Broker
16 for (int times = 0; times < retryTimes; times++) {
17 try {
18 return this.sendKernelImpl(msg, mq);
19 } catch (Exception e) {
20 // 选择其他队列重试
21 mq = selectOneMessageQueue(topicPublishInfo);
22 }
23 }
24 }
25}

9. 对比总结

特性NameServerZooKeeper
架构无状态有状态
一致性最终一致强一致(CP)
性能中等
复杂度简单复杂
运维成本
依赖外部依赖
适用场景路由注册配置管理、分布式锁

10. 最佳实践

NameServer部署:

  • 至少部署2个节点
  • 推荐3个节点
  • 节点分布在不同机器
  • 配置自动重启

配置示例:

properties
1# NameServer配置
2listenPort=9876
3# 路由信息清理间隔(默认10秒)
4scanNotActiveBrokerInterval=10000

客户端配置:

java
1// 配置多个NameServer地址
2DefaultMQProducer producer = new DefaultMQProducer("ProducerGroup");
3producer.setNamesrvAddr("192.168.1.1:9876;192.168.1.2:9876;192.168.1.3:9876");
4
5// 配置重试次数
6producer.setRetryTimesWhenSendFailed(3);

总结: RocketMQ选择自研NameServer是基于实际需求的权衡,追求简单、高效、易运维,而非盲目使用ZooKeeper。这体现了"合适的才是最好的"设计理念。

27. 说一下 Kafka 为什么性能高?

答案:

Kafka的高性能是多种技术手段综合作用的结果。

1. 顺序写磁盘

原理:

  • 消息追加到日志文件末尾
  • 避免随机IO
  • 顺序写性能接近内存

性能对比:

1顺序写磁盘: 600MB/s
2随机写磁盘: 100KB/s
3顺序写内存: 几GB/s

实现:

java
1// Kafka日志追加
2public class Log {
3 public void append(MemoryRecords records) {
4 // 追加到活跃段文件末尾
5 segment.append(records);
6 // 顺序写,性能极高
7 }
8}

2. 零拷贝技术(Zero-Copy)

传统IO流程:

1磁盘 -> 内核缓冲区 -> 用户缓冲区 -> Socket缓冲区 -> 网卡
2(4次拷贝,4次上下文切换)

零拷贝流程:

1磁盘 -> 内核缓冲区 -> 网卡
2(2次拷贝,2次上下文切换)

实现:

java
1// 使用FileChannel.transferTo实现零拷贝
2public long writeTo(GatheringByteChannel channel, long position, int length) {
3 return fileChannel.transferTo(position, length, channel);
4}
5
6// sendfile系统调用
7// 数据直接从文件传输到Socket,不经过用户空间

性能提升:

  • 减少CPU拷贝
  • 减少上下文切换
  • 提升2-3倍吞吐量

3. 页缓存(Page Cache)

原理:

  • 利用操作系统页缓存
  • 写入时先写入页缓存
  • 读取时优先从页缓存读取
  • 操作系统负责刷盘

优势:

java
1// Kafka不维护自己的缓存
2// 充分利用OS的页缓存
3// 1. 避免GC压力
4// 2. 进程重启数据不丢失
5// 3. 自动预读和后写

效果:

  • 写入:内存速度
  • 读取:大部分命中缓存
  • 减少磁盘IO

4. 批量处理

生产者批量发送:

java
1// 配置批量大小
2props.put("batch.size", 16384); // 16KB
3props.put("linger.ms", 10); // 等待10ms
4
5// 消息累积到batch.size或linger.ms后批量发送
6public class RecordAccumulator {
7 public RecordAppendResult append(TopicPartition tp, Record record) {
8 // 追加到批次
9 Deque<ProducerBatch> dq = getOrCreateDeque(tp);
10 ProducerBatch last = dq.peekLast();
11
12 if (last != null) {
13 // 追加到现有批次
14 FutureRecordMetadata future = last.tryAppend(record);
15 if (future != null) {
16 return new RecordAppendResult(future, false);
17 }
18 }
19
20 // 创建新批次
21 ProducerBatch batch = new ProducerBatch(tp, records, now);
22 dq.addLast(batch);
23 }
24}

消费者批量拉取:

java
1// 配置每次拉取的数据量
2props.put("fetch.min.bytes", 1024); // 最小1KB
3props.put("fetch.max.bytes", 52428800); // 最大50MB
4props.put("max.poll.records", 500); // 最多500条
5
6// 一次拉取多条消息
7ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));

批量写入磁盘:

java
1// 批量刷盘
2public void flush() {
3 // 批量写入多条消息
4 fileChannel.write(buffers);
5}

5. 数据压缩

支持的压缩算法:

java
1// 生产者配置压缩
2props.put("compression.type", "lz4"); // gzip, snappy, lz4, zstd
3
4// 压缩比对比
5// gzip: 压缩比最高,CPU消耗大
6// snappy: 平衡
7// lz4: 速度最快
8// zstd: 新算法,综合最优

效果:

  • 减少网络传输
  • 减少磁盘存储
  • 提升吞吐量

6. 分区并行

分区设计:

java
1// Topic分为多个分区
2// 每个分区独立读写
3// 并行处理提升吞吐量
4
5// 创建多分区Topic
6kafka-topics.sh --create \
7 --topic my-topic \
8 --partitions 10 \
9 --replication-factor 3

并行写入:

java
1// 多个生产者并行写入不同分区
2producer.send(new ProducerRecord<>("topic", partition, key, value));

并行消费:

java
1// 消费者组内多个消费者并行消费不同分区
2// 分区数 = 最大并行度

7. 高效的数据结构

日志段(Log Segment):

java
1// 每个分区由多个段文件组成
200000000000000000000.log // 第一个段
300000000000000368769.log // 第二个段
400000000000000737337.log // 第三个段
5
6// 优势:
7// 1. 快速定位消息
8// 2. 方便清理过期数据
9// 3. 支持并行写入

索引文件:

java
1// 偏移量索引
200000000000000000000.index
3// 时间戳索引
400000000000000000000.timeindex
5
6// 稀疏索引,快速定位
7public class OffsetIndex {
8 // 每隔4KB记录一个索引项
9 private final int indexIntervalBytes = 4096;
10
11 public OffsetPosition lookup(long targetOffset) {
12 // 二分查找
13 int slot = indexSlotFor(targetOffset);
14 return parseEntry(slot);
15 }
16}

8. 网络模型优化

Reactor模式:

scala
1// Acceptor线程接受连接
2// Processor线程处理网络IO
3// Handler线程处理业务逻辑
4
5class SocketServer {
6 val processors = new Array[Processor](numProcessors)
7 val acceptor = new Acceptor(...)
8
9 // 多线程并行处理
10 processors.foreach(_.start())
11 acceptor.start()
12}

NIO非阻塞:

java
1// 使用Java NIO
2Selector selector = Selector.open();
3channel.register(selector, SelectionKey.OP_READ);
4
5// 单线程处理多个连接
6while (true) {
7 selector.select();
8 Set<SelectionKey> keys = selector.selectedKeys();
9 // 处理就绪的连接
10}

9. 内存管理优化

内存池:

java
1// BufferPool复用ByteBuffer
2public class BufferPool {
3 private final Deque<ByteBuffer> free;
4
5 public ByteBuffer allocate(int size) {
6 // 从池中获取,避免频繁分配
7 ByteBuffer buffer = free.pollFirst();
8 if (buffer == null) {
9 buffer = ByteBuffer.allocate(size);
10 }
11 return buffer;
12 }
13
14 public void deallocate(ByteBuffer buffer) {
15 // 归还到池中
16 free.add(buffer);
17 }
18}

避免GC:

  • 使用堆外内存
  • 复用对象
  • 减少临时对象创建

10. 配置优化

生产者配置:

properties
1# 批量大小
2batch.size=16384
3linger.ms=10
4
5# 压缩
6compression.type=lz4
7
8# 缓冲区
9buffer.memory=33554432
10
11# 并发请求
12max.in.flight.requests.per.connection=5

Broker配置:

properties
1# 网络线程
2num.network.threads=8
3
4# IO线程
5num.io.threads=8
6
7# 日志刷盘策略
8log.flush.interval.messages=10000
9log.flush.interval.ms=1000
10
11# 副本拉取线程
12num.replica.fetchers=4

消费者配置:

properties
1# 拉取大小
2fetch.min.bytes=1024
3fetch.max.bytes=52428800
4max.poll.records=500
5
6# 并行度(分区数)

11. 性能数据

吞吐量:

  • 单机:几十万到百万TPS
  • 集群:千万级TPS

延迟:

  • 端到端延迟:几毫秒到几十毫秒
  • 99分位延迟:小于100ms

总结: Kafka的高性能来自于:

  1. 顺序写磁盘
  2. 零拷贝技术
  3. 页缓存利用
  4. 批量处理
  5. 数据压缩
  6. 分区并行
  7. 高效数据结构
  8. 网络模型优化
  9. 内存管理
  10. 合理配置

28. RabbitMQ 怎么实现延迟队列?

答案:

RabbitMQ实现延迟队列主要有两种方式:TTL+死信队列和延迟插件。

1. TTL + 死信队列(推荐方案)

原理:

  • 消息设置TTL过期时间
  • 过期后进入死信交换机
  • 死信队列的消费者处理延迟消息

实现步骤:

步骤1:声明死信交换机和队列

java
1// 1. 声明死信交换机
2channel.exchangeDeclare("dlx.exchange", "direct", true);
3
4// 2. 声明死信队列(实际业务队列)
5channel.queueDeclare("dlx.queue", true, false, false, null);
6
7// 3. 绑定
8channel.queueBind("dlx.queue", "dlx.exchange", "dlx.routing.key");

步骤2:声明延迟队列

java
1// 声明延迟队列,配置死信交换机
2Map<String, Object> args = new HashMap<>();
3args.put("x-dead-letter-exchange", "dlx.exchange");
4args.put("x-dead-letter-routing-key", "dlx.routing.key");
5args.put("x-message-ttl", 30000); // 30秒延迟
6
7channel.queueDeclare("delay.queue", true, false, false, args);

步骤3:发送延迟消息

java
1// 发送消息到延迟队列
2channel.basicPublish("", "delay.queue", null, message.getBytes());
3
4// 消息30秒后自动进入dlx.queue

步骤4:消费延迟消息

java
1// 消费死信队列(实际业务队列)
2channel.basicConsume("dlx.queue", false, new DefaultConsumer(channel) {
3 @Override
4 public void handleDelivery(String consumerTag, Envelope envelope,
5 AMQP.BasicProperties properties, byte[] body) {
6 System.out.println("收到延迟消息: " + new String(body));
7 channel.basicAck(envelope.getDeliveryTag(), false);
8 }
9});

2. 支持多种延迟时间

方案:为每种延迟时间创建一个队列

java
1// 延迟队列工厂
2public class DelayQueueFactory {
3
4 // 创建延迟队列
5 public void createDelayQueue(int delaySeconds) {
6 String queueName = "delay.queue." + delaySeconds;
7
8 Map<String, Object> args = new HashMap<>();
9 args.put("x-dead-letter-exchange", "dlx.exchange");
10 args.put("x-dead-letter-routing-key", "dlx.routing.key");
11 args.put("x-message-ttl", delaySeconds * 1000);
12
13 channel.queueDeclare(queueName, true, false, false, args);
14 }
15
16 // 发送延迟消息
17 public void sendDelayMessage(String message, int delaySeconds) {
18 String queueName = "delay.queue." + delaySeconds;
19 channel.basicPublish("", queueName, null, message.getBytes());
20 }
21}
22
23// 使用
24factory.createDelayQueue(10); // 10秒延迟队列
25factory.createDelayQueue(30); // 30秒延迟队列
26factory.createDelayQueue(60); // 60秒延迟队列
27
28factory.sendDelayMessage("10秒后执行", 10);
29factory.sendDelayMessage("30秒后执行", 30);

3. 消息级别TTL(灵活延迟)

设置消息TTL:

java
1// 每条消息设置不同的TTL
2AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
3 .expiration(String.valueOf(delayMillis)) // 消息级别TTL
4 .build();
5
6channel.basicPublish("", "delay.queue", properties, message.getBytes());

注意事项:

  • 消息级别TTL可能导致队头阻塞
  • 如果队头消息未过期,后面的消息即使过期也不会被处理
  • 建议使用队列级别TTL

4. 延迟插件方案(rabbitmq_delayed_message_exchange)

安装插件:

bash
1# 下载插件
2wget https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/v3.9.0/rabbitmq_delayed_message_exchange-3.9.0.ez
3
4# 复制到插件目录
5cp rabbitmq_delayed_message_exchange-3.9.0.ez /usr/lib/rabbitmq/plugins/
6
7# 启用插件
8rabbitmq-plugins enable rabbitmq_delayed_message_exchange

使用插件:

java
1// 1. 声明延迟交换机
2Map<String, Object> args = new HashMap<>();
3args.put("x-delayed-type", "direct");
4
5channel.exchangeDeclare("delayed.exchange", "x-delayed-message", true, false, args);
6
7// 2. 声明队列并绑定
8channel.queueDeclare("delayed.queue", true, false, false, null);
9channel.queueBind("delayed.queue", "delayed.exchange", "delayed.routing.key");
10
11// 3. 发送延迟消息
12Map<String, Object> headers = new HashMap<>();
13headers.put("x-delay", 30000); // 延迟30秒
14
15AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
16 .headers(headers)
17 .build();
18
19channel.basicPublish("delayed.exchange", "delayed.routing.key", properties, message.getBytes());
20
21// 4. 消费消息
22channel.basicConsume("delayed.queue", false, new DefaultConsumer(channel) {
23 @Override
24 public void handleDelivery(String consumerTag, Envelope envelope,
25 AMQP.BasicProperties properties, byte[] body) {
26 System.out.println("收到延迟消息: " + new String(body));
27 channel.basicAck(envelope.getDeliveryTag(), false);
28 }
29});

插件优势:

  • 支持任意延迟时间
  • 不会队头阻塞
  • 使用简单

插件劣势:

  • 需要安装插件
  • 性能相对较低
  • 延迟消息存储在内存中

5. Spring Boot集成

配置类:

java
1@Configuration
2public class DelayQueueConfig {
3
4 // 死信交换机
5 @Bean
6 public DirectExchange dlxExchange() {
7 return new DirectExchange("dlx.exchange");
8 }
9
10 // 死信队列
11 @Bean
12 public Queue dlxQueue() {
13 return new Queue("dlx.queue");
14 }
15
16 // 绑定
17 @Bean
18 public Binding dlxBinding() {
19 return BindingBuilder.bind(dlxQueue())
20 .to(dlxExchange())
21 .with("dlx.routing.key");
22 }
23
24 // 延迟队列
25 @Bean
26 public Queue delayQueue() {
27 Map<String, Object> args = new HashMap<>();
28 args.put("x-dead-letter-exchange", "dlx.exchange");
29 args.put("x-dead-letter-routing-key", "dlx.routing.key");
30 args.put("x-message-ttl", 30000);
31 return new Queue("delay.queue", true, false, false, args);
32 }
33}

生产者:

java
1@Service
2public class DelayMessageProducer {
3
4 @Autowired
5 private RabbitTemplate rabbitTemplate;
6
7 public void sendDelayMessage(String message, int delaySeconds) {
8 MessageProperties properties = new MessageProperties();
9 properties.setExpiration(String.valueOf(delaySeconds * 1000));
10
11 Message msg = new Message(message.getBytes(), properties);
12 rabbitTemplate.send("", "delay.queue", msg);
13 }
14}

消费者:

java
1@Component
2public class DelayMessageConsumer {
3
4 @RabbitListener(queues = "dlx.queue")
5 public void handleDelayMessage(String message) {
6 System.out.println("收到延迟消息: " + message);
7 // 处理业务逻辑
8 }
9}

6. 应用场景

订单超时取消:

java
1// 创建订单后发送30分钟延迟消息
2public void createOrder(Order order) {
3 // 保存订单
4 orderRepository.save(order);
5
6 // 发送延迟消息
7 delayProducer.sendDelayMessage(order.getId(), 30 * 60);
8}
9
10// 30分钟后检查订单状态
11@RabbitListener(queues = "dlx.queue")
12public void checkOrderStatus(String orderId) {
13 Order order = orderRepository.findById(orderId);
14 if (order.getStatus() == OrderStatus.UNPAID) {
15 // 取消订单
16 order.setStatus(OrderStatus.CANCELLED);
17 orderRepository.save(order);
18 }
19}

定时任务:

java
1// 发送定时提醒
2public void scheduleReminder(String userId, String content, int delaySeconds) {
3 ReminderMessage message = new ReminderMessage(userId, content);
4 delayProducer.sendDelayMessage(JSON.toJSONString(message), delaySeconds);
5}

7. 方案对比

方案优点缺点适用场景
TTL+死信无需插件,稳定固定延迟时间固定延迟场景
消息TTL灵活队头阻塞不推荐
延迟插件灵活,无阻塞需要插件任意延迟场景

最佳实践:

  • 固定延迟时间:使用TTL+死信队列
  • 任意延迟时间:使用延迟插件
  • 监控延迟队列长度
  • 设置合理的TTL
  • 处理消息幂等性

29. RocketMQ 的延迟消息是怎么实现的?

答案:

RocketMQ通过内部延迟队列机制实现延迟消息,支持18个预设延迟级别。

实现原理:

  • 延迟消息先发送到内部Topic(SCHEDULE_TOPIC_XXXX)
  • 定时任务扫描到期消息
  • 将到期消息投递到目标Topic

使用示例:

java
1Message message = new Message("TopicTest", "Hello".getBytes());
2message.setDelayTimeLevel(3); // level 3 = 10秒延迟
3producer.send(message);

延迟级别:

11s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

30. RocketMQ 的开发参考了 Kafka,那两者在架构和功能上有什么区别?

答案:

核心差异对比:

特性KafkaRocketMQ
设计目标日志收集、流处理业务消息传递
注册中心ZooKeeper/KRaftNameServer
存储模型分区独立存储统一CommitLog
消息过滤客户端过滤服务端过滤(Tag/SQL)
延迟消息不支持支持18个延迟级别
事务消息支持Exactly Once支持分布式事务
消息重试需自己实现内置重试机制
死信队列需自己实现内置死信队列
吞吐量极高(百万级)高(十万级)

选型建议:

  • 选Kafka:日志收集、大数据处理、高吞吐量场景
  • 选RocketMQ:业务消息、需要事务/延迟消息、金融级应用

31. 说一下 RocketMQ 中关于事务消息的实现?

答案:

RocketMQ事务消息通过两阶段提交和事务回查机制实现分布式事务。

实现流程:

11. 发送Half消息(预提交)
22. 执行本地事务
33. 提交或回滚消息
44. 超时则Broker回查事务状态

代码示例:

java
1// 1. 创建事务生产者
2TransactionMQProducer producer = new TransactionMQProducer("group");
3producer.setTransactionListener(new TransactionListenerImpl());
4
5// 2. 实现事务监听器
6public class TransactionListenerImpl implements TransactionListener {
7 @Override
8 public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
9 try {
10 // 执行本地事务
11 orderService.createOrder((Order) arg);
12 return LocalTransactionState.COMMIT_MESSAGE;
13 } catch (Exception e) {
14 return LocalTransactionState.ROLLBACK_MESSAGE;
15 }
16 }
17
18 @Override
19 public LocalTransactionState checkLocalTransaction(MessageExt msg) {
20 // 事务回查
21 String status = transactionLogService.getStatus(msg.getTransactionId());
22 return "COMMIT".equals(status) ?
23 LocalTransactionState.COMMIT_MESSAGE :
24 LocalTransactionState.ROLLBACK_MESSAGE;
25 }
26}
27
28// 3. 发送事务消息
29Message message = new Message("OrderTopic", order.toBytes());
30producer.sendMessageInTransaction(message, order);

最佳实践:

  • 本地事务要快速执行
  • 记录事务日志用于回查
  • 下游消费要幂等处理

学习指南

核心要点:

  • 消息队列的基本概念和模型
  • 主流MQ产品的架构和特性
  • 消息可靠性和一致性保证
  • 消息队列在分布式系统中的应用

学习路径建议:

  1. 掌握消息队列的基本概念
  2. 深入理解主流MQ产品的特性
  3. 熟悉消息队列的高可用方案
  4. 学习消息队列在实际项目中的应用
forum

评论区 / Comments