Java 消息队列面试题集
总题数: 31道 | 重点领域: MQ原理、RabbitMQ、RocketMQ、Kafka | 难度分布: 中高级
本文档整理了 Java 消息队列的完整31道面试题目,涵盖消息队列原理、主流MQ产品特性等各个方面。
面试题目列表
1. 什么是消息队列?
答案:
消息队列(Message Queue,简称MQ)是一种应用程序间的通信方法,通过在消息的传输过程中保存消息来实现应用程序之间的异步通信。
核心概念:
- 生产者(Producer):负责产生和发送消息到消息队列
- 消费者(Consumer):从消息队列中获取消息并进行处理
- 消息队列(Queue):存储消息的容器,遵循FIFO原则
- 消息代理(Broker):消息队列服务器,负责接收、存储和转发消息
工作原理:
- 生产者将消息发送到消息队列
- 消息队列将消息持久化存储
- 消费者从队列中拉取或被推送消息
- 消费者处理完消息后,向队列发送确认
2. 为什么需要消息队列?
答案:
消息队列主要解决以下几个核心问题:
1. 应用解耦
- 系统间不直接调用,通过消息队列进行通信
- 降低系统间的耦合度,提高系统的可维护性
- 某个服务宕机不会影响其他服务
2. 异步处理
- 将非核心业务异步化,提升系统响应速度
- 例如:用户注册后,发送邮件、短信等操作可以异步处理
- 主流程快速返回,提升用户体验
3. 流量削峰
- 在高并发场景下,通过消息队列缓冲请求
- 防止系统因瞬时流量过大而崩溃
- 消费者按照自己的处理能力消费消息
4. 数据分发
- 一个消息可以被多个消费者消费
- 实现数据的一对多分发
5. 最终一致性
- 通过消息队列实现分布式事务的最终一致性
- 保证数据在各个系统间的一致性
3. 说一下消息队列的模型有哪些?
答案:
消息队列主要有两种消息模型:
1. 点对点模型(Point-to-Point,P2P)
特点:
- 基于队列(Queue)实现
- 一个消息只能被一个消费者消费
- 消息被消费后会从队列中删除
- 支持多个消费者,但每条消息只会被其中一个消费者处理
应用场景:
- 任务分发系统
- 订单处理系统
2. 发布/订阅模型(Publish/Subscribe,Pub/Sub)
特点:
- 基于主题(Topic)实现
- 一个消息可以被多个消费者消费
- 消息被消费后不会立即删除
- 生产者发布消息到主题,订阅该主题的所有消费者都会收到消息
应用场景:
- 消息广播
- 日志收集
- 事件通知
主流MQ对模型的支持:
- RabbitMQ:支持两种模型,通过Exchange类型实现
- Kafka:主要基于发布/订阅模型
- RocketMQ:支持两种模型
4. 简述下消息队列核心的一些术语?
答案:
1. 基础概念
- Producer(生产者):负责生产消息并发送到消息队列
- Consumer(消费者):从消息队列中获取消息并消费
- Broker(消息代理):消息队列服务器,负责接收、存储、转发消息
- Message(消息):生产者和消费者之间传递的数据单元
2. 消息组织
- Queue(队列):存储消息的容器,遵循FIFO原则
- Topic(主题):消息的逻辑分类,用于发布/订阅模式
- Partition(分区):Topic的物理分组,提高并发能力
- Tag(标签):消息的标记,用于消息过滤
3. 消费相关
- Consumer Group(消费者组):多个消费者组成的组,共同消费一个Topic
- Offset(偏移量):消费者在分区中的消费位置
- Acknowledgment(确认):消费者处理完消息后的确认机制
4. 高级特性
- Dead Letter Queue(死信队列):无法被正常消费的消息存放的队列
- Delay Message(延迟消息):指定时间后才能被消费的消息
- Transaction Message(事务消息):支持分布式事务的消息
- Message Filter(消息过滤):根据条件过滤消息
5. 可靠性相关
- Persistence(持久化):将消息存储到磁盘,防止丢失
- Replication(副本):消息的多副本存储,保证高可用
- Retry(重试):消息消费失败后的重试机制
5. 如何保证消息不丢失?
答案:
消息丢失可能发生在三个阶段,需要分别处理:
1. 生产者阶段(消息发送丢失)
解决方案:
- 同步发送 + 确认机制:等待Broker确认后再返回
- 异步发送 + 回调:通过回调函数确认消息是否发送成功
- 失败重试:发送失败时进行重试
- 本地消息表:先保存到本地数据库,发送成功后删除
RabbitMQ:
1// 开启发送确认2channel.confirmSelect();3channel.basicPublish(...);4channel.waitForConfirms(); // 等待确认Kafka:
1// 设置acks=all,等待所有副本确认2props.put("acks", "all");2. Broker阶段(存储丢失)
解决方案:
- 消息持久化:将消息写入磁盘
- 主从复制:多副本机制,防止单点故障
- 刷盘策略:同步刷盘(可靠但慢)vs 异步刷盘(快但可能丢失)
RabbitMQ:
1// 队列持久化2channel.queueDeclare(queueName, true, false, false, null);3// 消息持久化4channel.basicPublish("", queueName, 5 MessageProperties.PERSISTENT_TEXT_PLAIN, message);RocketMQ:
1// 同步刷盘2brokerConfig.setFlushDiskType(FlushDiskType.SYNC_FLUSH);Kafka:
1# 副本数量2replication.factor=33# 最小同步副本数4min.insync.replicas=23. 消费者阶段(消费丢失)
解决方案:
- 手动确认(ACK):消费成功后再确认
- 关闭自动提交:防止消息未处理就提交offset
- 消费重试:消费失败时重试
- 死信队列:多次失败后进入死信队列
RabbitMQ:
1// 手动确认2channel.basicConsume(queueName, false, consumer);3// 处理完成后确认4channel.basicAck(deliveryTag, false);Kafka:
1// 关闭自动提交2props.put("enable.auto.commit", "false");3// 手动提交4consumer.commitSync();综合方案:
- 生产者:使用确认机制 + 重试
- Broker:持久化 + 多副本
- 消费者:手动确认 + 消费重试
- 监控告警:及时发现问题
6. 如何处理重复消息?
答案:
消息重复的原因主要有:网络抖动、消费者宕机重启、消息重试等。解决重复消息的核心思想是幂等性。
1. 业务层面保证幂等性
数据库唯一约束:
1// 使用唯一索引防止重复插入2CREATE UNIQUE INDEX idx_order_id ON orders(order_id);分布式锁:
1// 使用Redis分布式锁2String lockKey = "lock:order:" + orderId;3if (redisTemplate.opsForValue().setIfAbsent(lockKey, "1", 10, TimeUnit.SECONDS)) {4 try {5 // 处理业务逻辑6 processOrder(orderId);7 } finally {8 redisTemplate.delete(lockKey);9 }10}2. 消息去重表
1// 使用消息ID作为唯一键2@Transactional3public void handleMessage(String messageId, String content) {4 // 检查消息是否已处理5 if (messageRecordMapper.exists(messageId)) {6 return; // 已处理,直接返回7 }8 9 // 处理业务逻辑10 processBusiness(content);11 12 // 记录消息已处理13 messageRecordMapper.insert(messageId);14}3. 全局唯一ID
1// 使用雪花算法生成全局唯一ID2long messageId = snowflakeIdGenerator.nextId();3message.setKey(String.valueOf(messageId));4. 状态机机制
1// 订单状态流转2public void updateOrderStatus(String orderId, OrderStatus newStatus) {3 Order order = orderMapper.selectById(orderId);4 5 // 只有在特定状态才能流转6 if (order.getStatus() == OrderStatus.CREATED && 7 newStatus == OrderStatus.PAID) {8 orderMapper.updateStatus(orderId, newStatus);9 }10}5. Token机制
1// 生成唯一token2String token = UUID.randomUUID().toString();3redisTemplate.opsForValue().set("token:" + token, "1", 5, TimeUnit.MINUTES);45// 消费时验证token6if (redisTemplate.delete("token:" + token) > 0) {7 // token存在且删除成功,处理业务8 processBusiness();9}最佳实践:
- 优先使用业务层面的幂等性设计
- 数据库层面使用唯一索引
- 关键业务使用分布式锁
- 记录消息处理状态
7. 如何保证消息的有序性?
答案:
1. 为什么会出现乱序?
- 多个生产者并发发送
- 消息在多个分区/队列中存储
- 多个消费者并发消费
2. Kafka保证有序性
单分区有序:
1// 方案1:指定分区2producer.send(new ProducerRecord<>("topic", partition, key, value));34// 方案2:使用相同的key,会路由到同一分区5producer.send(new ProducerRecord<>("topic", orderId, message));67// 消费者:一个分区只能被消费者组中的一个消费者消费配置要求:
1# 生产者:设置为1,保证消息顺序发送2max.in.flight.requests.per.connection=134# 或使用幂等性生产者(Kafka 1.0+)5enable.idempotence=true3. RocketMQ保证有序性
全局有序:
1// 只创建一个队列2// 只有一个生产者和一个消费者分区有序(推荐):
1// 发送消息时指定MessageQueueSelector2producer.send(message, new MessageQueueSelector() {3 @Override4 public MessageQueue select(List<MessageQueue> mqs, 5 Message msg, Object arg) {6 // 根据订单ID选择队列7 Long orderId = (Long) arg;8 int index = (int) (orderId % mqs.size());9 return mqs.get(index);10 }11}, orderId);1213// 消费者使用MessageListenerOrderly14consumer.registerMessageListener(new MessageListenerOrderly() {15 @Override16 public ConsumeOrderlyStatus consumeMessage(17 List<MessageExt> msgs, ConsumeOrderlyContext context) {18 // 顺序消费19 return ConsumeOrderlyStatus.SUCCESS;20 }21});4. RabbitMQ保证有序性
1// 使用单个队列 + 单个消费者2channel.basicQos(1); // 每次只处理一条消息34// 或使用一致性哈希交换机5Map<String, Object> args = new HashMap<>();6args.put("hash-header", "order-id");7channel.exchangeDeclare("exchange", "x-consistent-hash", true, false, args);5. 业务层面保证有序
使用版本号:
1// 消息携带版本号2public class OrderMessage {3 private String orderId;4 private Integer version;5 private String content;6}78// 消费时检查版本号9if (message.getVersion() > currentVersion) {10 processMessage(message);11 updateVersion(message.getVersion());12}使用时间戳:
1// 只处理最新的消息2if (message.getTimestamp() > lastProcessedTimestamp) {3 processMessage(message);4}最佳实践:
- 使用相同的key/分区键保证相关消息路由到同一分区
- 单分区单消费者保证顺序消费
- 业务层面设计幂等性和版本控制
8. 如何处理消息堆积?
答案:
1. 消息堆积的原因
- 消费者消费速度慢于生产者生产速度
- 消费者出现故障或宕机
- 消费逻辑复杂,处理时间长
- 消费者数量不足
2. 临时解决方案
增加消费者数量:
1// 快速扩容消费者实例2// 注意:消费者数量不能超过分区数(Kafka)或队列数(RocketMQ)提高消费并行度:
1// RocketMQ:增加消费线程数2consumer.setConsumeThreadMin(20);3consumer.setConsumeThreadMax(64);45// Kafka:增加分区数6kafka-topics.sh --alter --topic my-topic --partitions 10批量消费:
1// RocketMQ批量消费2consumer.setConsumeMessageBatchMaxSize(100);34// Kafka批量拉取5props.put("max.poll.records", 500);3. 紧急处理方案
临时队列转储:
1// 创建临时Topic,将消息快速转移2// 使用更多消费者并行处理临时Topic3public void transferMessages() {4 // 从堆积队列消费5 List<Message> messages = consumer.poll();6 7 // 快速转发到临时队列8 for (Message msg : messages) {9 producer.send("temp-topic", msg);10 }11}降级处理:
1// 对非核心消息进行降级处理2if (isNonCriticalMessage(message)) {3 // 简化处理逻辑或直接丢弃4 logAndDiscard(message);5} else {6 // 正常处理7 processMessage(message);8}4. 长期优化方案
优化消费逻辑:
1// 异步处理耗时操作2@Async3public void processTimeConsumingTask(Message message) {4 // 耗时操作5}67// 批量处理数据库操作8public void batchInsert(List<Order> orders) {9 orderMapper.batchInsert(orders);10}合理设置消费参数:
1// Kafka2props.put("fetch.min.bytes", 1024 * 1024); // 1MB3props.put("fetch.max.wait.ms", 500);45// RocketMQ6consumer.setPullBatchSize(32);7consumer.setConsumeMessageBatchMaxSize(10);5. 监控和预警
1// 监控消息堆积数量2long lag = getConsumerLag();3if (lag > threshold) {4 sendAlert("消息堆积告警:" + lag);5}67// Kafka监控8kafka-consumer-groups.sh --describe --group my-group910// RocketMQ监控11mqadmin consumerProgress -g my-group6. 架构优化
- 增加分区/队列数:提高并行度
- 使用消息过滤:减少无效消息
- 分离核心和非核心消息:不同Topic不同优先级
- 引入缓存:减少数据库查询
- 数据库优化:添加索引、优化SQL
最佳实践:
- 提前规划好分区数和消费者数
- 做好监控和告警
- 优化消费逻辑,提高处理速度
- 准备好扩容方案
9. 消息队列设计成推消息还是拉消息?推拉模式的优缺点?
答案:
1. 推模式(Push)
工作原理:
- Broker主动将消息推送给消费者
- 消费者被动接收消息
优点:
- 实时性高,消息到达后立即推送
- 消费者实现简单,无需主动拉取
- 适合消息量少、实时性要求高的场景
缺点:
- Broker需要维护每个消费者的状态
- 难以控制推送速率,可能压垮消费者
- 消费者处理能力不同,难以做到流量控制
适用场景:
- 实时性要求高
- 消息量不大
- 消费者处理能力稳定
代表:RabbitMQ(默认推模式)
2. 拉模式(Pull)
工作原理:
- 消费者主动从Broker拉取消息
- Broker被动等待消费者请求
优点:
- 消费者可以根据自己的处理能力拉取消息
- 支持批量拉取,提高吞吐量
- Broker实现简单,无需维护消费者状态
- 消费者可以控制消费速率
缺点:
- 实时性相对较差
- 消费者需要循环拉取,可能造成空轮询
- 消费者实现相对复杂
适用场景:
- 高吞吐量场景
- 消费者处理能力差异大
- 需要批量处理消息
代表:Kafka、RocketMQ(默认拉模式)
3. Kafka的拉模式实现
1// 消费者主动拉取2while (true) {3 ConsumerRecords<String, String> records = 4 consumer.poll(Duration.ofMillis(100));5 6 for (ConsumerRecord<String, String> record : records) {7 processMessage(record);8 }9}优化空轮询:
1# 长轮询:如果没有数据,Broker会等待一段时间2fetch.min.bytes=13fetch.max.wait.ms=5004. RabbitMQ的推模式实现
1// 推模式2channel.basicConsume(queueName, false, new DefaultConsumer(channel) {3 @Override4 public void handleDelivery(String consumerTag, Envelope envelope,5 AMQP.BasicProperties properties, byte[] body) {6 processMessage(body);7 channel.basicAck(envelope.getDeliveryTag(), false);8 }9});1011// RabbitMQ也支持拉模式12GetResponse response = channel.basicGet(queueName, false);13if (response != null) {14 processMessage(response.getBody());15 channel.basicAck(response.getEnvelope().getDeliveryTag(), false);16}5. 混合模式
RocketMQ的长轮询:
1// 消费者拉取,但Broker会hold住请求2// 如果有新消息立即返回,否则等待一段时间3consumer.setPullTimeoutMillis(30000);对比总结:
| 特性 | 推模式 | 拉模式 |
|---|---|---|
| 实时性 | 高 | 相对较低 |
| 吞吐量 | 较低 | 高 |
| 流量控制 | 困难 | 容易 |
| Broker复杂度 | 高 | 低 |
| 消费者复杂度 | 低 | 高 |
| 批量处理 | 不支持 | 支持 |
最佳实践:
- 高吞吐量场景:选择拉模式(Kafka、RocketMQ)
- 实时性要求高:选择推模式(RabbitMQ)
- 现代MQ:采用长轮询,兼顾实时性和吞吐量
10. RocketMQ 的事务消息有什么缺点?你还了解过别的事务消息实现吗?
答案:
1. RocketMQ事务消息原理
1// 发送事务消息2TransactionMQProducer producer = new TransactionMQProducer("group");3producer.setTransactionListener(new TransactionListener() {4 @Override5 public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {6 // 执行本地事务7 try {8 executeLocalBusiness();9 return LocalTransactionState.COMMIT_MESSAGE;10 } catch (Exception e) {11 return LocalTransactionState.ROLLBACK_MESSAGE;12 }13 }14 15 @Override16 public LocalTransactionState checkLocalTransaction(MessageExt msg) {17 // 事务回查18 return checkLocalTransactionStatus();19 }20});2122// 发送半消息23producer.sendMessageInTransaction(message, null);流程:
- 发送半消息(Half Message)到Broker
- 执行本地事务
- 根据本地事务结果提交或回滚消息
- 如果长时间未收到确认,Broker回查事务状态
2. RocketMQ事务消息的缺点
缺点1:只支持单向消息
- 只保证生产者的本地事务和发送消息的一致性
- 不保证消费者的消费和本地事务的一致性
- 消费者仍需自己保证幂等性
缺点2:事务回查的性能开销
- Broker需要定期回查事务状态
- 回查频率过高影响性能
- 需要业务方实现回查接口
缺点3:半消息的存储开销
- 半消息需要单独存储
- 增加了存储成本
缺点4:不支持跨集群事务
- 只能在单个RocketMQ集群内使用
缺点5:实现复杂
- 需要实现事务监听器
- 需要实现事务回查逻辑
- 需要考虑各种异常情况
3. 其他事务消息实现
Kafka事务消息:
1// 生产者配置2props.put("transactional.id", "my-transactional-id");3props.put("enable.idempotence", true);45// 使用事务6producer.initTransactions();7try {8 producer.beginTransaction();9 10 // 发送消息11 producer.send(record1);12 producer.send(record2);13 14 // 执行本地事务15 executeLocalTransaction();16 17 // 提交事务18 producer.commitTransaction();19} catch (Exception e) {20 producer.abortTransaction();21}2223// 消费者配置24props.put("isolation.level", "read_committed");特点:
- 支持跨分区、跨Topic的事务
- 保证Exactly-Once语义
- 支持消费-转换-生产的原子性
- 性能相对较好
缺点:
- 不支持本地事务和消息发送的原子性
- 主要用于消息系统内部的事务
4. 基于本地消息表的事务实现
1@Transactional2public void createOrder(Order order) {3 // 1. 插入订单4 orderMapper.insert(order);5 6 // 2. 插入本地消息表7 LocalMessage message = new LocalMessage();8 message.setContent(JSON.toJSONString(order));9 message.setStatus(MessageStatus.PENDING);10 messageMapper.insert(message);11}1213// 定时任务扫描本地消息表14@Scheduled(fixedDelay = 1000)15public void sendPendingMessages() {16 List<LocalMessage> messages = 17 messageMapper.selectPendingMessages();18 19 for (LocalMessage message : messages) {20 try {21 // 发送消息22 producer.send(message.getContent());23 24 // 更新状态为已发送25 message.setStatus(MessageStatus.SENT);26 messageMapper.update(message);27 } catch (Exception e) {28 // 发送失败,下次继续重试29 }30 }31}优点:
- 实现简单
- 可靠性高
- 支持任何消息队列
缺点:
- 需要额外的表和定时任务
- 有一定延迟
- 需要处理消息表的清理
5. 基于TCC的分布式事务
1// Try阶段:预留资源2public void tryCreateOrder(Order order) {3 // 冻结库存4 inventoryService.freeze(order.getProductId(), order.getQuantity());5 // 发送消息到MQ6 producer.send(createOrderMessage);7}89// Confirm阶段:确认提交10public void confirmCreateOrder(Order order) {11 // 扣减库存12 inventoryService.deduct(order.getProductId(), order.getQuantity());13}1415// Cancel阶段:回滚16public void cancelCreateOrder(Order order) {17 // 释放库存18 inventoryService.unfreeze(order.getProductId(), order.getQuantity());19}对比总结:
| 方案 | 优点 | 缺点 | 适用场景 |
|---|---|---|---|
| RocketMQ事务消息 | 实现相对简单 | 只支持单向、需要回查 | 生产者事务 |
| Kafka事务 | 性能好、支持跨分区 | 不支持本地事务 | 消息系统内部 |
| 本地消息表 | 可靠性高、通用 | 有延迟、需要额外表 | 通用场景 |
| TCC | 一致性强 | 实现复杂 | 强一致性要求 |
最佳实践:
- 生产者本地事务 + 消息发送:使用RocketMQ事务消息或本地消息表
- 消息系统内部事务:使用Kafka事务
- 强一致性要求:使用TCC或Saga
- 最终一致性:使用可靠消息最终一致性方案
11. 说一下 Kafka 中关于事务消息的实现?
答案:
Kafka从0.11版本开始支持事务消息,主要用于实现Exactly-Once语义。
1. 核心概念
Transaction Coordinator(事务协调器):
- 负责管理事务状态
- 每个生产者都有一个对应的事务协调器
- 存储在__transaction_state内部Topic中
TransactionalId(事务ID):
- 唯一标识一个生产者
- 用于故障恢复和幂等性保证
Producer Epoch:
- 生产者的版本号
- 防止僵尸实例
2. 事务实现原理
1// 配置事务生产者2props.put("transactional.id", "my-transactional-id");3props.put("enable.idempotence", true);4props.put("acks", "all");56KafkaProducer<String, String> producer = new KafkaProducer<>(props);78// 初始化事务9producer.initTransactions();1011try {12 // 开启事务13 producer.beginTransaction();14 15 // 发送消息16 producer.send(new ProducerRecord<>("topic1", "key1", "value1"));17 producer.send(new ProducerRecord<>("topic2", "key2", "value2"));18 19 // 提交事务20 producer.commitTransaction();21} catch (Exception e) {22 // 回滚事务23 producer.abortTransaction();24}3. 事务流程
阶段1:查找事务协调器
- 生产者根据transactional.id找到对应的事务协调器
- 发送FindCoordinator请求
阶段2:初始化事务
- 生产者向协调器发送InitPidRequest
- 协调器分配PID(Producer ID)和Epoch
- 协调器在__transaction_state中记录事务信息
阶段3:开始事务
- 调用beginTransaction()标记事务开始
- 本地操作,不与Broker交互
阶段4:消费-转换-生产(可选)
1// 读取消息、处理、发送到新Topic2producer.beginTransaction();3ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));4for (ConsumerRecord<String, String> record : records) {5 // 处理消息6 String result = process(record.value());7 // 发送到新Topic8 producer.send(new ProducerRecord<>("output-topic", result));9}10// 提交消费位移到事务11producer.sendOffsetsToTransaction(offsets, "consumer-group-id");12producer.commitTransaction();阶段5:提交或回滚
提交流程:
- 生产者发送EndTxnRequest(COMMIT)
- 协调器写入PREPARE_COMMIT到事务日志
- 协调器向所有相关分区写入事务提交标记
- 协调器写入COMPLETE_COMMIT到事务日志
回滚流程:
- 生产者发送EndTxnRequest(ABORT)
- 协调器写入PREPARE_ABORT到事务日志
- 协调器向所有相关分区写入事务回滚标记
- 协调器写入COMPLETE_ABORT到事务日志
4. 消费者读取事务消息
1// 配置隔离级别2props.put("isolation.level", "read_committed");34KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);5// 只能读取已提交的消息隔离级别:
- read_uncommitted:默认,读取所有消息
- read_committed:只读取已提交的事务消息
5. 事务保证
原子性:
- 多个分区的消息要么全部成功,要么全部失败
- 通过两阶段提交实现
幂等性:
- 通过PID + Epoch + Sequence Number保证
- 防止消息重复
隔离性:
- 未提交的消息对read_committed消费者不可见
6. 事务状态机
1Empty -> Ongoing -> PrepareCommit -> CompleteCommit2 \-> PrepareAbort -> CompleteAbort7. 性能影响
优点:
- 保证Exactly-Once语义
- 支持跨分区、跨Topic的原子性
缺点:
- 性能开销较大(约30%)
- 延迟增加
- 需要额外的存储空间
最佳实践:
- 只在需要强一致性时使用事务
- 合理设置transaction.timeout.ms
- 监控事务协调器的性能
12. 你了解 Kafka 中的时间轮实现吗?
答案:
Kafka使用时间轮(Timing Wheel)算法来高效管理大量延时任务,主要用于处理延时操作、心跳检测、超时控制等。
1. 为什么需要时间轮?
传统方案的问题:
- JDK Timer:单线程,任务阻塞会影响其他任务
- ScheduledExecutorService:基于堆实现,插入和删除O(logN)
- DelayQueue:基于优先队列,性能不够高
Kafka的需求:
- 需要管理大量延时任务(百万级)
- 插入、删除操作要快(O(1))
- 任务执行要准确
2. 时间轮原理
基本结构:
1时间轮是一个环形数组,每个槽位存储一个任务链表23 0 1 2 3 4 5 6 74 [*]-[*]-[*]-[*]-[*]-[*]-[*]-[*]5 | | | | | | | |6 任务 任务 任务 ... 任务核心参数:
- tickMs:时间轮的时间精度(滴答间隔),如1ms
- wheelSize:时间轮的槽位数,如20
- interval:时间轮的时间跨度 = tickMs × wheelSize
- currentTime:当前时间指针
3. Kafka的时间轮实现
TimingWheel类:
1class TimingWheel(2 tickMs: Long, // 时间精度3 wheelSize: Int, // 槽位数4 startMs: Long, // 起始时间5 taskCounter: AtomicInteger,6 queue: DelayQueue[TimerTaskList]7) {8 private val interval = tickMs * wheelSize // 时间跨度9 private val buckets = Array.tabulate[TimerTaskList](wheelSize) { _ => 10 new TimerTaskList(taskCounter) 11 }12 private var currentTime = startMs - (startMs % tickMs)13 14 // 上层时间轮(用于更长的延时)15 @volatile private var overflowWheel: TimingWheel = null16}4. 层级时间轮
多层结构:
1第1层:tickMs=1ms, wheelSize=20, interval=20ms2第2层:tickMs=20ms, wheelSize=20, interval=400ms3第3层:tickMs=400ms,wheelSize=20, interval=8000ms4...任务分配:
- 延时 < 20ms:放入第1层
- 延时 < 400ms:放入第2层
- 延时 < 8000ms:放入第3层
- 以此类推
5. 添加任务
1def add(timerTaskEntry: TimerTaskEntry): Boolean = {2 val expiration = timerTaskEntry.expirationMs3 4 if (timerTaskEntry.cancelled) {5 false6 } else if (expiration < currentTime + tickMs) {7 // 已过期,立即执行8 false9 } else if (expiration < currentTime + interval) {10 // 在当前时间轮范围内11 val virtualId = expiration / tickMs12 val bucket = buckets((virtualId % wheelSize.toLong).toInt)13 bucket.add(timerTaskEntry)14 15 // 设置过期时间16 if (bucket.setExpiration(virtualId * tickMs)) {17 queue.offer(bucket)18 }19 true20 } else {21 // 超出当前时间轮范围,放入上层时间轮22 if (overflowWheel == null) addOverflowWheel()23 overflowWheel.add(timerTaskEntry)24 }25}6. 推进时间轮
1def advanceClock(timeMs: Long): Unit = {2 if (timeMs >= currentTime + tickMs) {3 currentTime = timeMs - (timeMs % tickMs)4 5 // 推进上层时间轮6 if (overflowWheel != null) {7 overflowWheel.advanceClock(currentTime)8 }9 }10}7. 任务执行流程
1// SystemTimer类2def advanceClock(timeoutMs: Long): Boolean = {3 // 从DelayQueue获取到期的bucket4 val bucket = delayQueue.poll(timeoutMs, TimeUnit.MILLISECONDS)5 6 if (bucket != null) {7 // 推进时间轮8 timingWheel.advanceClock(bucket.getExpiration)9 10 // 执行bucket中的所有任务11 bucket.flush(reinsert)12 true13 } else {14 false15 }16}8. 时间复杂度
| 操作 | 时间复杂度 |
|---|---|
| 添加任务 | O(1) |
| 删除任务 | O(1) |
| 执行任务 | O(1) |
9. Kafka中的应用场景
延时操作:
- Produce请求的延时响应
- Fetch请求的延时响应
- 等待ISR副本同步
超时控制:
- 事务超时
- 请求超时
- 会话超时
心跳检测:
- 消费者心跳
- 副本同步心跳
10. 优势
- 高性能:O(1)的插入和删除
- 低延迟:精确的时间控制
- 可扩展:支持层级结构,处理任意长的延时
- 内存友好:只存储任务引用,不复制任务
最佳实践:
- 根据业务需求设置合适的tickMs和wheelSize
- 避免创建过多的时间轮层级
- 及时清理已取消的任务
13. 说说 Kafka 的索引设计有什么亮点?
答案:
Kafka的索引设计非常巧妙,主要包括偏移量索引和时间戳索引,实现了快速定位消息的功能。
1. 索引文件结构
三种文件:
100000000000000000000.log # 日志文件(存储消息)200000000000000000000.index # 偏移量索引文件300000000000000000000.timeindex # 时间戳索引文件文件命名:
- 文件名是该segment的起始offset
- 例如:00000000000000368769.log表示从offset 368769开始
2. 偏移量索引(OffsetIndex)
索引格式:
1[相对offset, 物理位置]2[4 bytes, 4 bytes]示例:
1相对offset 物理位置20 0310 1024420 2048530 3072特点:
- 稀疏索引:不是每条消息都有索引,而是每隔一定数量的消息建立一个索引
- 相对offset:存储的是相对于segment起始offset的相对值
- 固定大小:每个索引项8字节,便于二分查找
- 内存映射:使用mmap技术,提高访问速度
3. 时间戳索引(TimeIndex)
索引格式:
1[时间戳, 相对offset]2[8 bytes, 4 bytes]示例:
1时间戳 相对offset21609459200000 031609459260000 10041609459320000 200用途:
- 根据时间戳查找消息
- 支持基于时间的消息清理
- 支持基于时间的消息消费
4. 查找消息的过程
根据offset查找消息:
1// 1. 定位segment文件2// 假设要查找offset=368800的消息3// 找到segment: 00000000000000368769.log45// 2. 在index文件中二分查找6// 查找小于等于368800的最大索引项7// 假设找到:相对offset=31, 物理位置=307289// 3. 从物理位置3072开始顺序扫描log文件10// 直到找到offset=368800的消息根据时间戳查找消息:
1// 1. 在timeindex文件中二分查找2// 找到小于等于目标时间戳的最大索引项3// 得到相对offset45// 2. 使用相对offset在index文件中查找6// 得到物理位置78// 3. 从物理位置开始顺序扫描5. 索引的亮点
亮点1:稀疏索引
- 不为每条消息建索引,节省空间
- 通过
log.index.interval.bytes控制索引密度(默认4KB) - 平衡了空间和查找效率
亮点2:内存映射文件(mmap)
1// Kafka使用MappedByteBuffer2MappedByteBuffer mmap = channel.map(3 FileChannel.MapMode.READ_WRITE, 0, fileSize);优势:
- 减少系统调用
- 利用操作系统的页缓存
- 零拷贝技术
- 自动预读
亮点3:二分查找
- 索引项固定大小,支持O(log n)的二分查找
- 快速定位到接近目标的位置
亮点4:分段存储
- 每个segment独立索引
- 便于删除过期数据
- 提高并发性能
亮点5:延迟写入
1// 索引不是实时写入,而是批量写入2// 减少磁盘IO次数6. 索引文件的创建和维护
创建时机:
1// 当写入的消息大小累计超过log.index.interval.bytes时2if (bytesSinceLastIndexEntry > indexIntervalBytes) {3 // 添加索引项4 offsetIndex.append(offset, position);5 timeIndex.append(timestamp, offset);6 bytesSinceLastIndexEntry = 0;7}索引恢复:
1// Broker启动时,检查索引文件是否完整2// 如果不完整,重建索引3def rebuildIndex() {4 // 遍历log文件5 // 重新构建index和timeindex6}7. 配置参数
1# 索引间隔(字节)2log.index.interval.bytes=409634# segment大小5log.segment.bytes=107374182467# 索引文件大小8segment.index.bytes=104857608. 性能优化
预分配空间:
1// 索引文件预分配固定大小2// 避免频繁扩容3val indexFile = new File("00000000000000000000.index")4indexFile.setLength(10 * 1024 * 1024) // 10MB批量刷盘:
1// 不是每次写入都刷盘2// 定期或达到一定量后刷盘3if (needFlush) {4 mmap.force() // 强制刷盘5}9. 索引的限制
- 稀疏性:需要顺序扫描部分log文件
- 内存占用:大量segment会占用较多内存
- 重建成本:索引损坏时重建耗时
10. 实际应用
消费者从指定offset消费:
1consumer.seek(new TopicPartition("topic", 0), 368800);消费者从指定时间消费:
1Map<TopicPartition, Long> timestampsToSearch = new HashMap<>();2timestampsToSearch.put(partition, timestamp);3Map<TopicPartition, OffsetAndTimestamp> offsets = 4 consumer.offsetsForTimes(timestampsToSearch);最佳实践:
- 合理设置segment大小和索引间隔
- 监控索引文件大小和数量
- 定期清理过期数据
- 避免频繁的随机访问
14. 看过源码?那说说 Kafka 控制器事件处理全流程?
答案:
Kafka控制器(Controller)是集群中的一个特殊Broker,负责管理整个集群的元数据和状态变更。
1. 控制器的职责
核心职责:
- 分区Leader选举:当Leader副本失效时选举新Leader
- 分区重分配:执行分区副本的重新分配
- Topic管理:创建、删除Topic
- 副本管理:管理ISR列表的变更
- Broker上下线:处理Broker的加入和离开
2. 控制器选举
选举过程:
1// 1. 每个Broker启动时尝试在ZK创建/controller节点2try {3 zkClient.createEphemeralNodeForController("/controller", brokerId);4 // 创建成功,成为Controller5 becomeController();6} catch (NodeExistsException e) {7 // 节点已存在,注册监听器8 zkClient.registerControllerChangeListener();9}Controller信息:
1{2 "version": 1,3 "brokerid": 0,4 "timestamp": "1609459200000"5}3. 事件队列机制
ControllerEventManager:
1class ControllerEventManager(2 controllerId: Int,3 processor: ControllerEventProcessor,4 eventQueueTimeHist: Histogram5) {6 // 事件队列7 private val queue = new LinkedBlockingQueue[ControllerEvent]8 9 // 事件处理线程10 private val thread = new ControllerEventThread("controller-event-thread")11}事件类型:
1sealed trait ControllerEvent2case object Startup extends ControllerEvent3case object BrokerChange extends ControllerEvent4case class TopicChange(topics: Set[String]) extends ControllerEvent5case class PartitionModifications(topic: String) extends ControllerEvent6case class LeaderAndIsrChange(partitions: Set[TopicPartition]) extends ControllerEvent7case class ControllerChange extends ControllerEvent8case object Reelect extends ControllerEvent4. 事件处理全流程
流程图:
1事件产生 -> 加入队列 -> 事件线程取出 -> 状态机处理 -> 发送请求 -> 更新元数据详细步骤:
步骤1:事件产生
1// ZK监听器触发事件2class BrokerChangeListener extends IZkChildListener {3 override def handleChildChange(parentPath: String, 4 currentChildren: java.util.List[String]) {5 // 将事件加入队列6 eventManager.put(BrokerChange)7 }8}步骤2:事件入队
1def put(event: ControllerEvent): Unit = {2 queue.put(event)3}步骤3:事件处理线程
1class ControllerEventThread extends ShutdownableThread {2 override def doWork(): Unit = {3 // 从队列取出事件4 val event = queue.take()5 6 try {7 // 处理事件8 processor.process(event)9 } catch {10 case e: Throwable => error("Error processing event", e)11 }12 }13}步骤4:事件处理器
1def process(event: ControllerEvent): Unit = {2 event match {3 case BrokerChange =>4 processBrokerChange()5 case TopicChange(topics) =>6 processTopicChange(topics)7 case PartitionModifications(topic) =>8 processPartitionModifications(topic)9 case LeaderAndIsrChange(partitions) =>10 processLeaderAndIsrChange(partitions)11 // ...12 }13}5. 具体事件处理示例
示例1:Broker下线处理
1def processBrokerChange(): Unit = {2 // 1. 获取当前存活的Broker列表3 val curBrokers = zkClient.getAllBrokersInCluster()4 val curBrokerIds = curBrokers.map(_.id).toSet5 6 // 2. 找出新增和删除的Broker7 val newBrokerIds = curBrokerIds -- controllerContext.liveBrokerIds8 val deadBrokerIds = controllerContext.liveBrokerIds -- curBrokerIds9 10 // 3. 更新Controller上下文11 controllerContext.liveBrokerIds = curBrokerIds12 13 // 4. 处理新增Broker14 if (newBrokerIds.nonEmpty) {15 onBrokerStartup(newBrokerIds.toSeq)16 }17 18 // 5. 处理下线Broker19 if (deadBrokerIds.nonEmpty) {20 onBrokerFailure(deadBrokerIds.toSeq)21 }22}2324def onBrokerFailure(deadBrokers: Seq[Int]): Unit = {25 // 1. 找出受影响的分区(Leader在下线Broker上)26 val partitionsWithoutLeader = controllerContext.partitionLeadershipInfo27 .filter { case (_, leaderIsrAndControllerEpoch) =>28 deadBrokers.contains(leaderIsrAndControllerEpoch.leaderAndIsr.leader)29 }.keySet30 31 // 2. 为这些分区选举新Leader32 partitionStateMachine.handleStateChanges(33 partitionsWithoutLeader.toSeq,34 OnlinePartition,35 Option(OfflinePartitionLeaderElectionStrategy)36 )37 38 // 3. 从ISR中移除下线的副本39 replicaStateMachine.handleStateChanges(40 getReplicasOnBrokers(deadBrokers),41 OfflineReplica42 )43}示例2:分区Leader选举
1def electLeader(2 partition: TopicPartition,3 strategy: PartitionLeaderElectionStrategy4): Option[Int] = {5 6 val assignment = controllerContext.partitionReplicaAssignment(partition)7 val liveReplicas = assignment.filter(r => 8 controllerContext.isReplicaOnline(r, partition))9 10 strategy match {11 case OfflinePartitionLeaderElectionStrategy =>12 // 从ISR中选择第一个存活的副本13 liveReplicas.find(r => 14 controllerContext.partitionLeadershipInfo(partition)15 .leaderAndIsr.isr.contains(r)16 )17 18 case PreferredReplicaPartitionLeaderElectionStrategy =>19 // 选择preferred replica(AR中的第一个)20 if (liveReplicas.head == assignment.head)21 Some(assignment.head)22 else23 None24 }25}6. 状态机
Kafka使用两个状态机管理状态变更:
ReplicaStateMachine(副本状态机):
1NewReplica -> OnlineReplica -> OfflineReplica -> ReplicaDeletionStarted 2 -> ReplicaDeletionSuccessful3 -> NonExistentReplicaPartitionStateMachine(分区状态机):
1NewPartition -> OnlinePartition -> OfflinePartition -> NonExistentPartition7. 请求发送
1// Controller向Broker发送请求2def sendRequest(3 brokerId: Int,4 request: AbstractControlRequest5): Unit = {6 7 request match {8 case leaderAndIsrRequest: LeaderAndIsrRequest =>9 // 发送Leader和ISR变更请求10 controllerChannelManager.sendRequest(brokerId, leaderAndIsrRequest)11 12 case updateMetadataRequest: UpdateMetadataRequest =>13 // 发送元数据更新请求14 controllerChannelManager.sendRequest(brokerId, updateMetadataRequest)15 16 case stopReplicaRequest: StopReplicaRequest =>17 // 发送停止副本请求18 controllerChannelManager.sendRequest(brokerId, stopReplicaRequest)19 }20}8. 元数据更新
1// 更新ZK中的元数据2def updateLeaderAndIsr(3 partition: TopicPartition,4 newLeaderAndIsr: LeaderAndIsr5): Unit = {6 7 val path = s"/brokers/topics/${partition.topic}/partitions/${partition.partition}/state"8 9 zkClient.updatePersistentPath(path, newLeaderAndIsr.toJson)10}9. Controller Epoch
1// 每次Controller变更时递增2var controllerEpoch: Int = 034def incrementControllerEpoch(): Unit = {5 controllerEpoch += 16 zkClient.updatePersistentPath("/controller_epoch", controllerEpoch.toString)7}作用:
- 防止脑裂
- 拒绝过期Controller的请求
- 保证操作的顺序性
10. 性能优化
批量处理:
1// 批量处理分区变更2def batchLeaderElection(3 partitions: Set[TopicPartition]4): Unit = {5 // 一次性处理多个分区6}异步发送:
1// 异步发送请求给Broker2controllerChannelManager.sendRequest(brokerId, request, callback)最佳实践:
- 监控Controller的事件队列大小
- 避免频繁的分区重分配
- 合理设置ZK的session timeout
- 关注Controller切换的频率
15. Kafka 中 Zookeeper 的作用?
答案:
ZooKeeper在Kafka中扮演着非常重要的角色,主要用于集群管理、元数据存储和分布式协调。
1. ZooKeeper的核心作用
作用1:Controller选举
- 通过临时节点
/controller实现Controller选举 - 保证集群中只有一个Controller
- Controller宕机后自动重新选举
作用2:Broker注册
- 每个Broker启动时在
/brokers/ids/[broker_id]创建临时节点 - 存储Broker的host、port等信息
- Broker宕机后节点自动删除
作用3:Topic注册
- 存储Topic的配置信息
- 存储分区和副本的分配信息
- 路径:
/brokers/topics/[topic_name]
作用4:分区状态管理
- 存储每个分区的Leader、ISR等信息
- 路径:
/brokers/topics/[topic]/partitions/[partition]/state
作用5:消费者组管理(旧版)
- 存储消费者组信息(新版已迁移到Kafka内部)
- 存储消费offset(新版使用__consumer_offsets)
作用6:配置管理
- 存储Topic和Broker的动态配置
- 支持配置的动态变更
2. ZooKeeper节点结构
1/2├── controller # Controller信息3├── controller_epoch # Controller版本号4├── brokers5│ ├── ids # Broker列表6│ │ ├── 0 # Broker 0的信息7│ │ ├── 1 # Broker 1的信息8│ │ └── 2 # Broker 2的信息9│ ├── topics # Topic信息10│ │ └── test-topic11│ │ ├── partitions12│ │ │ ├── 013│ │ │ │ └── state # 分区0的状态14│ │ │ └── 115│ │ │ └── state # 分区1的状态16│ └── seqid # Broker ID序列号17├── config18│ ├── topics # Topic配置19│ ├── brokers # Broker配置20│ └── changes # 配置变更通知21├── admin22│ ├── delete_topics # 待删除的Topic23│ └── reassign_partitions # 分区重分配24├── isr_change_notification # ISR变更通知25└── consumers # 消费者组(旧版)26 └── group-127 ├── ids # 消费者列表28 └── offsets # 消费offset3. 详细节点说明
Controller节点:
1// /controller2{3 "version": 1,4 "brokerid": 0,5 "timestamp": "1609459200000"6}Broker节点:
1// /brokers/ids/02{3 "version": 4,4 "host": "localhost",5 "port": 9092,6 "jmx_port": 9999,7 "timestamp": "1609459200000",8 "endpoints": ["PLAINTEXT://localhost:9092"],9 "rack": null10}Topic节点:
1// /brokers/topics/test-topic2{3 "version": 2,4 "partitions": {5 "0": [0, 1, 2], // 分区0的副本在Broker 0,1,2上6 "1": [1, 2, 0], // 分区1的副本在Broker 1,2,0上7 "2": [2, 0, 1] // 分区2的副本在Broker 2,0,1上8 },9 "adding_replicas": {},10 "removing_replicas": {}11}分区状态节点:
1// /brokers/topics/test-topic/partitions/0/state2{3 "controller_epoch": 1,4 "leader": 0, // Leader副本在Broker 05 "version": 1,6 "leader_epoch": 0,7 "isr": [0, 1, 2] // ISR列表8}4. ZooKeeper的监听机制
Broker监听:
1// Controller监听Broker变化2zkClient.subscribeChildChanges("/brokers/ids", new IZkChildListener {3 override def handleChildChange(parentPath: String, 4 currentChildren: java.util.List[String]) {5 // Broker上下线处理6 eventManager.put(BrokerChange)7 }8})Topic监听:
1// Controller监听Topic变化2zkClient.subscribeChildChanges("/brokers/topics", new IZkChildListener {3 override def handleChildChange(parentPath: String, 4 currentChildren: java.util.List[String]) {5 // Topic创建/删除处理6 eventManager.put(TopicChange(currentChildren.toSet))7 }8})分区状态监听:
1// Controller监听分区状态变化2zkClient.subscribeDataChanges(3 s"/brokers/topics/$topic/partitions/$partition/state",4 new IZkDataListener {5 override def handleDataChange(dataPath: String, data: Object) {6 // 分区状态变更处理7 }8 override def handleDataDeleted(dataPath: String) {9 // 分区删除处理10 }11 }12)5. ZooKeeper在Kafka中的工作流程
Broker启动流程:
11. 连接ZooKeeper22. 在/brokers/ids/[broker_id]创建临时节点33. 注册监听器监听Controller变化44. 尝试竞选Controller55. 从ZooKeeper加载元数据Topic创建流程:
11. 在/brokers/topics/[topic]写入分区分配信息22. Controller监听到变化33. Controller为每个分区选举Leader44. Controller更新/brokers/topics/[topic]/partitions/[partition]/state55. Controller通知相关Broker分区Leader选举流程:
11. Controller检测到Leader失效22. 从ISR中选择新Leader33. 更新ZooKeeper中的分区状态44. 发送LeaderAndIsrRequest给相关Broker55. 发送UpdateMetadataRequest给所有Broker6. ZooKeeper的性能影响
性能瓶颈:
- ZooKeeper写入性能有限(约1000 TPS)
- 大量监听器会增加ZooKeeper负载
- 网络延迟影响元数据更新速度
- 频繁的元数据变更会影响性能
优化措施:
1# 增加ZooKeeper会话超时时间2zookeeper.session.timeout.ms=1800034# 减少不必要的元数据更新5# 批量处理分区变更7. ZooKeeper的问题
问题1:扩展性限制
- ZooKeeper集群规模有限(通常3-7个节点)
- 大规模Kafka集群元数据量大
- 监听器数量过多影响性能
问题2:运维复杂
- 需要单独部署和维护ZooKeeper集群
- 增加了系统复杂度
- 故障排查困难
问题3:性能问题
- Controller变更时需要重新加载所有元数据
- 大量分区时元数据加载慢
- 频繁的ZK写入影响性能
8. 新版Kafka的改进(KRaft模式)
KRaft模式(Kafka 2.8+):
1移除ZooKeeper依赖2使用Raft协议实现元数据管理3元数据存储在Kafka内部Topic中4提高性能和可扩展性对比:
| 特性 | ZooKeeper模式 | KRaft模式 |
|---|---|---|
| 依赖 | 需要ZooKeeper | 无需ZooKeeper |
| 元数据存储 | ZooKeeper | Kafka内部 |
| Controller选举 | ZooKeeper | Raft协议 |
| 扩展性 | 受限 | 更好 |
| 运维复杂度 | 高 | 低 |
| 性能 | 一般 | 更好 |
最佳实践:
- 合理设置ZooKeeper会话超时时间
- 监控ZooKeeper的性能指标
- 避免频繁的元数据变更
- 考虑升级到KRaft模式(生产环境需谨慎)
- 定期清理ZooKeeper中的过期数据
16. Kafka为什么要抛弃 Zookeeper?
答案:
Kafka从2.8版本开始引入KRaft模式(Kafka Raft),逐步移除对ZooKeeper的依赖,主要原因如下:
1. 性能问题
元数据加载慢:
- Controller切换时需要从ZK重新加载所有元数据
- 大规模集群(数万分区)加载耗时可达数分钟
- 影响集群的可用性和恢复时间
写入性能瓶颈:
- ZooKeeper写入TPS有限(约1000-2000)
- 频繁的元数据更新会成为瓶颈
- 限制了Kafka的扩展性
2. 扩展性限制
集群规模限制:
- ZooKeeper集群通常3-7个节点
- 监听器数量过多影响性能
- 难以支持超大规模Kafka集群
分区数量限制:
- 大量分区导致ZK元数据膨胀
- Controller管理开销大
- 影响整体性能
3. 运维复杂度
额外组件:
- 需要单独部署和维护ZooKeeper集群
- 增加运维成本和复杂度
- 故障点增加
版本兼容性:
- Kafka和ZooKeeper版本兼容性问题
- 升级困难
4. 架构问题
双写问题:
- 元数据既在ZK又在Kafka内部
- 数据一致性难以保证
- 增加复杂度
依赖外部系统:
- Kafka依赖ZooKeeper的稳定性
- ZK故障影响Kafka可用性
5. KRaft模式的优势
性能提升:
- Controller切换更快(秒级)
- 元数据更新更高效
- 支持更大规模集群
简化架构:
- 移除ZooKeeper依赖
- 元数据存储在Kafka内部
- 降低运维复杂度
更好的扩展性:
- 支持百万级分区
- Controller性能更好
- 集群恢复更快
6. KRaft架构
核心组件:
- Quorum Controller:基于Raft协议的Controller
- Metadata Topic:存储元数据的内部Topic
- Metadata Log:元数据变更日志
工作原理:
11. 使用Raft协议选举Controller Leader22. 元数据存储在__cluster_metadata Topic中33. Controller变更通过日志复制同步44. Broker从元数据Topic读取最新状态7. 对比总结
| 特性 | ZooKeeper模式 | KRaft模式 |
|---|---|---|
| 外部依赖 | 需要ZK | 无需ZK |
| Controller切换 | 分钟级 | 秒级 |
| 元数据存储 | ZooKeeper | Kafka内部 |
| 分区数量 | 受限(数万) | 更高(百万) |
| 运维复杂度 | 高 | 低 |
| 性能 | 一般 | 更好 |
| 稳定性 | 成熟 | 逐步成熟 |
最佳实践:
- 新集群建议使用KRaft模式
- 现有集群可规划迁移(Kafka 3.3+支持)
- 生产环境需充分测试
- 关注社区稳定性反馈
17. 看过源码?那说说 Kafka 处理请求的全流程?
答案:
Kafka使用Reactor模式处理客户端请求,采用多线程异步处理提高吞吐量。
1. 整体架构
核心组件:
- Acceptor线程:接受新连接
- Processor线程(网络线程):处理网络IO
- RequestChannel:请求队列
- KafkaRequestHandler线程(IO线程):处理业务逻辑
- ResponseQueue:响应队列
2. 处理流程
1客户端 -> Acceptor -> Processor -> RequestQueue -> Handler -> ResponseQueue -> Processor -> 客户端3. 详细步骤
步骤1:接受连接(Acceptor)
1class Acceptor extends Runnable {2 def run(): Unit = {3 serverChannel.register(nioSelector, SelectionKey.OP_ACCEPT)4 while (isRunning) {5 val ready = nioSelector.select(500)6 if (ready > 0) {7 val keys = nioSelector.selectedKeys()8 keys.forEach { key =>9 if (key.isAcceptable) {10 accept(key)11 }12 }13 }14 }15 }16 17 def accept(key: SelectionKey): Unit = {18 val socketChannel = serverSocketChannel.accept()19 socketChannel.configureBlocking(false)20 // 轮询分配给Processor21 val processor = processors(currentProcessor)22 processor.accept(socketChannel)23 currentProcessor = (currentProcessor + 1) % processors.length24 }25}步骤2:读取请求(Processor)
1class Processor extends Runnable {2 def run(): Unit = {3 while (isRunning) {4 // 处理新连接5 configureNewConnections()6 7 // 处理响应8 processNewResponses()9 10 // 处理网络IO11 poll()12 13 // 处理完成的接收14 processCompletedReceives()15 16 // 处理完成的发送17 processCompletedSends()18 }19 }20 21 def processCompletedReceives(): Unit = {22 selector.completedReceives().forEach { receive =>23 val channel = receive.source()24 val request = parseRequest(receive.payload())25 // 放入请求队列26 requestChannel.sendRequest(request)27 }28 }29}步骤3:请求入队(RequestChannel)
1class RequestChannel(numProcessors: Int) {2 // 请求队列3 private val requestQueue = new ArrayBlockingQueue[RequestChannel.Request](queueSize)4 5 // 响应队列(每个Processor一个)6 private val responseQueues = Array.fill(numProcessors)(7 new LinkedBlockingQueue[RequestChannel.Response]()8 )9 10 def sendRequest(request: Request): Unit = {11 requestQueue.put(request)12 }13 14 def receiveRequest(timeout: Long): Request = {15 requestQueue.poll(timeout, TimeUnit.MILLISECONDS)16 }17}步骤4:业务处理(KafkaRequestHandler)
1class KafkaRequestHandler extends Runnable {2 def run(): Unit = {3 while (!stopped) {4 // 从请求队列获取请求5 val req = requestChannel.receiveRequest(300)6 7 if (req != null) {8 req.requestId match {9 case ApiKeys.PRODUCE => handleProduceRequest(req)10 case ApiKeys.FETCH => handleFetchRequest(req)11 case ApiKeys.METADATA => handleMetadataRequest(req)12 case ApiKeys.OFFSET_COMMIT => handleOffsetCommitRequest(req)13 // ...14 }15 }16 }17 }18}步骤5:Produce请求处理
1def handleProduceRequest(request: RequestChannel.Request): Unit = {2 val produceRequest = request.body[ProduceRequest]3 4 // 1. 验证权限5 authorize(request.session, Write, Resource(Topic, topicPartition.topic))6 7 // 2. 追加消息到日志8 val results = replicaManager.appendRecords(9 timeout = produceRequest.timeout,10 requiredAcks = produceRequest.acks,11 internalTopicsAllowed = internalTopicsAllowed,12 origin = AppendOrigin.Client,13 entriesPerPartition = produceRequest.partitionRecordsOrFail14 )15 16 // 3. 等待副本同步(如果acks=-1)17 if (produceRequest.acks == -1) {18 results.foreach { case (tp, result) =>19 result.onComplete { response =>20 sendResponse(request, response)21 }22 }23 } else {24 // 立即响应25 sendResponse(request, results)26 }27}步骤6:Fetch请求处理
1def handleFetchRequest(request: RequestChannel.Request): Unit = {2 val fetchRequest = request.body[FetchRequest]3 4 // 1. 读取消息5 val fetchInfo = replicaManager.fetchMessages(6 timeout = fetchRequest.maxWait,7 replicaId = fetchRequest.replicaId,8 fetchMinBytes = fetchRequest.minBytes,9 fetchMaxBytes = fetchRequest.maxBytes,10 fetchInfos = fetchRequest.fetchData11 )12 13 // 2. 如果数据不足且未超时,延迟响应14 if (fetchInfo.bytesRead < fetchRequest.minBytes && 15 !fetchInfo.isTimedOut) {16 // 加入延迟队列17 delayedFetchPurgatory.tryCompleteElseWatch(18 delayedFetch, Seq(topicPartition)19 )20 } else {21 // 立即响应22 sendResponse(request, fetchInfo)23 }24}步骤7:发送响应(Processor)
1def processNewResponses(): Unit = {2 var currentResponse: RequestChannel.Response = null3 while ({currentResponse = dequeueResponse(); currentResponse != null}) {4 val channelId = currentResponse.request.context.connectionId5 6 currentResponse match {7 case response: SendResponse =>8 sendResponse(response)9 case response: NoOpResponse =>10 // 不需要响应11 case response: CloseConnectionResponse =>12 close(channelId)13 }14 }15}1617def sendResponse(response: SendResponse): Unit = {18 val responseSend = response.responseSend19 selector.send(responseSend)20}4. 延迟操作(DelayedOperation)
延迟Produce:
1class DelayedProduce extends DelayedOperation {2 override def tryComplete(): Boolean = {3 // 检查是否所有ISR副本都已同步4 val allReplicasCaughtUp = partition.inSyncReplicas.forall { replica =>5 replica.logEndOffset >= requiredOffset6 }7 8 if (allReplicasCaughtUp) {9 forceComplete()10 true11 } else {12 false13 }14 }15 16 override def onComplete(): Unit = {17 // 发送响应给客户端18 responseCallback(response)19 }20}5. 零拷贝技术
1// 使用FileChannel.transferTo实现零拷贝2def sendFile(channel: FileChannel, 3 position: Long, 4 count: Long): Long = {5 channel.transferTo(position, count, socketChannel)6}6. 线程模型配置
1# Acceptor线程数(通常为1)2num.network.threads=334# Processor线程数(网络线程)5num.io.threads=867# 请求队列大小8queued.max.requests=5007. 性能优化
批量处理:
- 批量读取请求
- 批量写入日志
- 批量发送响应
异步处理:
- 网络IO异步化
- 磁盘IO异步化
- 使用回调机制
零拷贝:
- sendfile系统调用
- mmap内存映射
- 减少数据拷贝
最佳实践:
- 合理配置线程数
- 监控请求队列大小
- 优化网络参数
- 使用批量操作
18. RabbitMQ 中无法路由的消息会去到哪里?
答案:
RabbitMQ中无法路由的消息处理方式取决于发布消息时的配置。
1. 默认行为
直接丢弃:
- 默认情况下,无法路由的消息会被直接丢弃
- 生产者不会收到任何通知
- 消息丢失且无法追踪
2. mandatory参数
设置mandatory=true:
1// 发送消息时设置mandatory参数2channel.basicPublish(3 exchange,4 routingKey,5 true, // mandatory=true6 MessageProperties.PERSISTENT_TEXT_PLAIN,7 message.getBytes()8);910// 添加ReturnListener监听无法路由的消息11channel.addReturnListener(new ReturnListener() {12 @Override13 public void handleReturn(int replyCode,14 String replyText,15 String exchange,16 String routingKey,17 AMQP.BasicProperties properties,18 byte[] body) {19 System.out.println("无法路由的消息: " + new String(body));20 System.out.println("原因: " + replyText);21 // 处理无法路由的消息22 handleUnroutableMessage(body);23 }24});工作原理:
- 消息无法路由时,会返回给生产者
- 触发ReturnListener回调
- 生产者可以记录日志或重新发送
3. 备份交换机(Alternate Exchange)
配置备份交换机:
1// 声明主交换机时指定备份交换机2Map<String, Object> args = new HashMap<>();3args.put("alternate-exchange", "backup-exchange");45channel.exchangeDeclare(6 "main-exchange",7 "direct",8 true,9 false,10 args11);1213// 声明备份交换机(通常使用fanout类型)14channel.exchangeDeclare("backup-exchange", "fanout", true);1516// 声明备份队列17channel.queueDeclare("backup-queue", true, false, false, null);1819// 绑定备份队列到备份交换机20channel.queueBind("backup-queue", "backup-exchange", "");工作原理:
- 消息无法在主交换机路由时
- 自动转发到备份交换机
- 备份交换机通常使用fanout类型,将消息发送到所有绑定的队列
4. 处理策略对比
| 策略 | 优点 | 缺点 | 适用场景 |
|---|---|---|---|
| 默认丢弃 | 简单 | 消息丢失 | 可容忍消息丢失 |
| mandatory | 生产者可感知 | 需要处理回调 | 需要知道路由失败 |
| 备份交换机 | 自动处理,不丢失 | 配置复杂 | 不能丢失消息 |
5. 最佳实践
方案1:mandatory + 日志记录
1channel.addReturnListener((replyCode, replyText, exchange, 2 routingKey, properties, body) -> {3 // 记录到日志或数据库4 logger.error("消息路由失败: exchange={}, routingKey={}, message={}", 5 exchange, routingKey, new String(body));6 7 // 发送告警8 alertService.sendAlert("消息路由失败");9});方案2:备份交换机 + 监控队列
1// 定期检查备份队列2@Scheduled(fixedDelay = 60000)3public void checkBackupQueue() {4 long messageCount = getQueueMessageCount("backup-queue");5 if (messageCount > 0) {6 logger.warn("备份队列有{}条消息", messageCount);7 // 处理备份队列中的消息8 processBackupMessages();9 }10}推荐配置:
- 重要消息:使用备份交换机
- 一般消息:使用mandatory参数
- 开发环境:使用mandatory便于调试
- 生产环境:根据业务重要性选择
19. RabbitMQ 中消息什么时候会进入死信交换机?
答案:
死信(Dead Letter)是指无法被正常消费的消息,RabbitMQ提供了死信交换机(DLX)来处理这些消息。
1. 产生死信的三种情况
情况1:消息被拒绝(reject/nack)且requeue=false
1// 消费者拒绝消息且不重新入队2channel.basicReject(deliveryTag, false); // requeue=false34// 或使用basicNack5channel.basicNack(deliveryTag, false, false); // requeue=false情况2:消息TTL过期
1// 设置消息TTL2AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()3 .expiration("10000") // 10秒后过期4 .build();56channel.basicPublish(exchange, routingKey, properties, message);78// 或设置队列TTL9Map<String, Object> args = new HashMap<>();10args.put("x-message-ttl", 10000); // 队列中所有消息10秒后过期11channel.queueDeclare("my-queue", true, false, false, args);情况3:队列达到最大长度
1// 设置队列最大长度2Map<String, Object> args = new HashMap<>();3args.put("x-max-length", 1000); // 最多1000条消息4channel.queueDeclare("my-queue", true, false, false, args);56// 或设置队列最大字节数7args.put("x-max-length-bytes", 1048576); // 最多1MB2. 配置死信交换机
完整配置示例:
1// 1. 声明死信交换机2channel.exchangeDeclare("dlx-exchange", "direct", true);34// 2. 声明死信队列5channel.queueDeclare("dlx-queue", true, false, false, null);67// 3. 绑定死信队列到死信交换机8channel.queueBind("dlx-queue", "dlx-exchange", "dlx-routing-key");910// 4. 声明业务队列,指定死信交换机11Map<String, Object> args = new HashMap<>();12args.put("x-dead-letter-exchange", "dlx-exchange"); // 死信交换机13args.put("x-dead-letter-routing-key", "dlx-routing-key"); // 死信路由键(可选)14args.put("x-message-ttl", 10000); // 消息TTL15args.put("x-max-length", 1000); // 队列最大长度1617channel.queueDeclare("business-queue", true, false, false, args);3. 死信消息的属性变化
死信消息会携带额外信息:
1// 消费死信队列2channel.basicConsume("dlx-queue", false, new DefaultConsumer(channel) {3 @Override4 public void handleDelivery(String consumerTag, Envelope envelope,5 AMQP.BasicProperties properties, byte[] body) {6 // 获取死信原因7 Map<String, Object> headers = properties.getHeaders();8 9 // x-death包含死信详细信息10 List<Map<String, Object>> xDeath = 11 (List<Map<String, Object>>) headers.get("x-death");12 13 if (xDeath != null && !xDeath.isEmpty()) {14 Map<String, Object> death = xDeath.get(0);15 String reason = (String) death.get("reason"); // 死信原因16 String queue = (String) death.get("queue"); // 原队列17 String exchange = (String) death.get("exchange"); // 原交换机18 Long count = (Long) death.get("count"); // 死信次数19 20 System.out.println("死信原因: " + reason);21 System.out.println("原队列: " + queue);22 }23 }24});死信原因(reason):
- rejected:消息被拒绝
- expired:消息过期
- maxlen:队列达到最大长度
4. 死信队列的应用场景
场景1:消息重试
1// 业务队列消费失败后进入死信队列2// 死信队列消费者进行重试3channel.basicConsume("dlx-queue", false, new DefaultConsumer(channel) {4 @Override5 public void handleDelivery(String consumerTag, Envelope envelope,6 AMQP.BasicProperties properties, byte[] body) {7 try {8 // 重试处理9 retryProcess(body);10 channel.basicAck(envelope.getDeliveryTag(), false);11 } catch (Exception e) {12 // 重试失败,记录日志13 logger.error("重试失败", e);14 channel.basicNack(envelope.getDeliveryTag(), false, false);15 }16 }17});场景2:延迟队列
1// 利用TTL + 死信实现延迟队列2// 1. 声明延迟队列(无消费者,消息过期后进入死信)3Map<String, Object> delayArgs = new HashMap<>();4delayArgs.put("x-dead-letter-exchange", "business-exchange");5delayArgs.put("x-dead-letter-routing-key", "business-key");6delayArgs.put("x-message-ttl", 30000); // 30秒延迟78channel.queueDeclare("delay-queue", true, false, false, delayArgs);910// 2. 发送消息到延迟队列11channel.basicPublish("", "delay-queue", null, message);1213// 3. 30秒后消息自动进入业务队列场景3:异常消息处理
1// 死信队列专门处理异常消息2channel.basicConsume("dlx-queue", false, new DefaultConsumer(channel) {3 @Override4 public void handleDelivery(String consumerTag, Envelope envelope,5 AMQP.BasicProperties properties, byte[] body) {6 // 记录异常消息7 saveToDatabase(body, properties);8 9 // 发送告警10 alertService.sendAlert("发现异常消息");11 12 channel.basicAck(envelope.getDeliveryTag(), false);13 }14});5. 最佳实践
配置建议:
- 所有业务队列都配置死信交换机
- 死信队列设置合理的TTL和最大长度
- 监控死信队列的消息数量
- 定期清理死信队列
注意事项:
- 死信消息也可能再次成为死信(避免循环)
- 死信交换机可以是任意类型
- 死信routing key可以重新指定
20. 说一下 AMQP 协议?
答案:
AMQP(Advanced Message Queuing Protocol,高级消息队列协议)是一个应用层协议,为面向消息的中间件设计。
1. AMQP协议概述
定义:
- 开放标准的应用层协议
- 用于消息中间件
- 与平台和语言无关
- 提供统一的消息服务
版本:
- AMQP 0-9-1:RabbitMQ主要使用的版本
- AMQP 1.0:OASIS标准,与0-9-1不兼容
2. AMQP核心概念
核心组件:
- Broker:消息代理服务器
- Virtual Host:虚拟主机,隔离环境
- Exchange:交换机,接收并路由消息
- Queue:队列,存储消息
- Binding:绑定,连接Exchange和Queue
- Routing Key:路由键,消息路由规则
- Message:消息,包含消息体和属性
3. AMQP工作模型
消息流转过程:
1Producer -> Exchange -> Binding -> Queue -> Consumer详细流程:
- 生产者发送消息到Exchange
- Exchange根据Routing Key和Binding规则路由消息
- 消息被路由到一个或多个Queue
- 消费者从Queue获取消息
- 消费者处理完成后发送ACK
4. Exchange类型
Direct Exchange(直连交换机):
1// 精确匹配routing key2channel.exchangeDeclare("direct-exchange", "direct");3channel.queueBind("queue1", "direct-exchange", "key1");45// 发送消息6channel.basicPublish("direct-exchange", "key1", null, message);Fanout Exchange(扇出交换机):
1// 广播到所有绑定的队列,忽略routing key2channel.exchangeDeclare("fanout-exchange", "fanout");3channel.queueBind("queue1", "fanout-exchange", "");4channel.queueBind("queue2", "fanout-exchange", "");Topic Exchange(主题交换机):
1// 支持通配符匹配2// * 匹配一个单词3// # 匹配零个或多个单词4channel.exchangeDeclare("topic-exchange", "topic");5channel.queueBind("queue1", "topic-exchange", "user.*.create");6channel.queueBind("queue2", "topic-exchange", "user.#");Headers Exchange(头交换机):
1// 根据消息头属性路由2Map<String, Object> headers = new HashMap<>();3headers.put("x-match", "all"); // all或any4headers.put("format", "pdf");5headers.put("type", "report");67channel.queueBind("queue1", "headers-exchange", "", headers);5. AMQP消息属性
基本属性:
1AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()2 .contentType("application/json") // 内容类型3 .contentEncoding("UTF-8") // 编码4 .deliveryMode(2) // 持久化(1=非持久化,2=持久化)5 .priority(5) // 优先级(0-9)6 .correlationId("correlation-id") // 关联ID7 .replyTo("reply-queue") // 回复队列8 .expiration("10000") // 过期时间(毫秒)9 .messageId("message-id") // 消息ID10 .timestamp(new Date()) // 时间戳11 .type("order") // 消息类型12 .userId("user") // 用户ID13 .appId("app") // 应用ID14 .headers(customHeaders) // 自定义头15 .build();6. AMQP连接模型
层次结构:
1Connection(TCP连接)2 └── Channel(虚拟连接)3 ├── Exchange4 ├── Queue5 └── Binding连接管理:
1// 创建连接2ConnectionFactory factory = new ConnectionFactory();3factory.setHost("localhost");4factory.setPort(5672);5factory.setVirtualHost("/");6factory.setUsername("guest");7factory.setPassword("guest");89Connection connection = factory.newConnection();1011// 创建Channel12Channel channel = connection.createChannel();1314// 使用完毕后关闭15channel.close();16connection.close();7. AMQP确认机制
生产者确认:
1// 开启发送确认2channel.confirmSelect();34// 同步确认5channel.basicPublish(exchange, routingKey, null, message);6channel.waitForConfirms();78// 异步确认9channel.addConfirmListener(new ConfirmListener() {10 public void handleAck(long deliveryTag, boolean multiple) {11 // 消息确认12 }13 public void handleNack(long deliveryTag, boolean multiple) {14 // 消息未确认15 }16});消费者确认:
1// 手动确认2channel.basicConsume(queueName, false, new DefaultConsumer(channel) {3 @Override4 public void handleDelivery(String consumerTag, Envelope envelope,5 AMQP.BasicProperties properties, byte[] body) {6 try {7 processMessage(body);8 // 确认消息9 channel.basicAck(envelope.getDeliveryTag(), false);10 } catch (Exception e) {11 // 拒绝消息12 channel.basicNack(envelope.getDeliveryTag(), false, true);13 }14 }15});8. AMQP事务
1// 开启事务2channel.txSelect();34try {5 // 发送消息6 channel.basicPublish(exchange, routingKey, null, message1);7 channel.basicPublish(exchange, routingKey, null, message2);8 9 // 提交事务10 channel.txCommit();11} catch (Exception e) {12 // 回滚事务13 channel.txRollback();14}9. AMQP协议特性
可靠性:
- 消息持久化
- 发送确认
- 消费确认
- 事务支持
灵活性:
- 多种Exchange类型
- 灵活的路由规则
- 支持优先级队列
安全性:
- 用户认证
- 虚拟主机隔离
- 权限控制
- SSL/TLS支持
10. AMQP vs 其他协议
| 特性 | AMQP | MQTT | STOMP |
|---|---|---|---|
| 设计目标 | 企业消息 | IoT | 简单文本 |
| 复杂度 | 高 | 低 | 低 |
| 功能 | 丰富 | 基础 | 基础 |
| 性能 | 中等 | 高 | 中等 |
| 可靠性 | 高 | 中等 | 中等 |
最佳实践:
- 理解AMQP模型和概念
- 合理使用Exchange类型
- 正确配置消息属性
- 使用确认机制保证可靠性
- 避免过度使用事务(性能开销大)
21. 说一下 RabbitMQ 的事务机制?
答案:
RabbitMQ提供了事务机制来保证消息的可靠性,但由于性能开销较大,生产环境更推荐使用发送者确认模式。
1. 事务基本用法
1Channel channel = connection.createChannel();23try {4 // 开启事务5 channel.txSelect();6 7 // 发送消息8 channel.basicPublish(exchange, routingKey, null, message1.getBytes());9 channel.basicPublish(exchange, routingKey, null, message2.getBytes());10 11 // 模拟业务逻辑12 doBusinessLogic();13 14 // 提交事务15 channel.txCommit();16 17} catch (Exception e) {18 // 回滚事务19 channel.txRollback();20 logger.error("事务回滚", e);21}2. 事务的工作原理
事务流程:
txSelect():将Channel设置为事务模式- 发送消息:消息暂存在内存中
txCommit():提交事务,消息持久化到磁盘txRollback():回滚事务,丢弃所有未提交的消息
3. 事务的特点
优点:
- 保证消息的原子性
- 要么全部成功,要么全部失败
- 实现简单
缺点:
- 性能开销大(吞吐量降低约250倍)
- 阻塞式操作,影响并发
- 不适合高并发场景
4. 发送者确认模式(推荐)
普通确认模式:
1// 开启确认模式2channel.confirmSelect();34// 发送消息5channel.basicPublish(exchange, routingKey, null, message.getBytes());67// 同步等待确认8if (channel.waitForConfirms()) {9 System.out.println("消息发送成功");10} else {11 System.out.println("消息发送失败");12}批量确认模式:
1channel.confirmSelect();23// 发送多条消息4for (int i = 0; i < 100; i++) {5 channel.basicPublish(exchange, routingKey, null, message.getBytes());6}78// 批量等待确认9channel.waitForConfirmsOrDie();异步确认模式(最佳):
1channel.confirmSelect();23// 添加确认监听器4channel.addConfirmListener(new ConfirmListener() {5 @Override6 public void handleAck(long deliveryTag, boolean multiple) {7 // 消息确认成功8 if (multiple) {9 // 批量确认10 confirmedMessages.headMap(deliveryTag + 1).clear();11 } else {12 // 单条确认13 confirmedMessages.remove(deliveryTag);14 }15 }16 17 @Override18 public void handleNack(long deliveryTag, boolean multiple) {19 // 消息确认失败,需要重发20 if (multiple) {21 // 批量失败22 resendMessages(deliveryTag);23 } else {24 // 单条失败25 resendMessage(deliveryTag);26 }27 }28});2930// 发送消息31long nextPublishSeqNo = channel.getNextPublishSeqNo();32channel.basicPublish(exchange, routingKey, null, message.getBytes());33confirmedMessages.put(nextPublishSeqNo, message);5. 事务 vs 发送者确认
| 特性 | 事务模式 | 发送者确认模式 |
|---|---|---|
| 性能 | 低(约250倍差距) | 高 |
| 吞吐量 | 低 | 高 |
| 实现复杂度 | 简单 | 中等 |
| 阻塞性 | 阻塞 | 非阻塞(异步) |
| 适用场景 | 低并发 | 高并发 |
6. 消费者事务
1// 消费者也可以使用事务2channel.txSelect();34QueueingConsumer consumer = new QueueingConsumer(channel);5channel.basicConsume(queueName, false, consumer);67while (true) {8 QueueingConsumer.Delivery delivery = consumer.nextDelivery();9 10 try {11 // 处理消息12 processMessage(delivery.getBody());13 14 // 确认消息15 channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);16 17 // 提交事务18 channel.txCommit();19 } catch (Exception e) {20 // 回滚事务21 channel.txRollback();22 }23}7. 最佳实践
推荐方案:
1// 生产者:使用异步确认模式2public class ReliableProducer {3 private final ConcurrentNavigableMap<Long, String> outstandingConfirms = 4 new ConcurrentSkipListMap<>();5 6 public void sendMessage(String message) throws IOException {7 channel.confirmSelect();8 9 channel.addConfirmListener(new ConfirmListener() {10 public void handleAck(long deliveryTag, boolean multiple) {11 cleanOutstandingConfirms(deliveryTag, multiple);12 }13 14 public void handleNack(long deliveryTag, boolean multiple) {15 String msg = outstandingConfirms.get(deliveryTag);16 logger.error("消息发送失败: {}", msg);17 // 重发或记录18 resendMessage(msg);19 }20 });21 22 long nextSeqNo = channel.getNextPublishSeqNo();23 channel.basicPublish(exchange, routingKey, null, message.getBytes());24 outstandingConfirms.put(nextSeqNo, message);25 }26 27 private void cleanOutstandingConfirms(long sequenceNumber, boolean multiple) {28 if (multiple) {29 outstandingConfirms.headMap(sequenceNumber + 1).clear();30 } else {31 outstandingConfirms.remove(sequenceNumber);32 }33 }34}注意事项:
- 避免在高并发场景使用事务
- 优先使用异步确认模式
- 事务和确认模式不能同时使用
- 确认模式下需要处理重发逻辑
22. RabbitMQ 中主要有哪几个角色或者说概念?
答案:
RabbitMQ基于AMQP协议,包含多个核心概念和组件。
1. 核心组件
Broker(消息代理):
- RabbitMQ服务器实例
- 接收、存储和转发消息
- 管理Exchange、Queue等资源
Virtual Host(虚拟主机):
- 逻辑隔离单元
- 类似于命名空间
- 不同vhost之间完全隔离
- 每个vhost有独立的Exchange、Queue、权限
1// 连接到指定vhost2factory.setVirtualHost("/my-vhost");2. 消息路由组件
Exchange(交换机):
- 接收生产者发送的消息
- 根据路由规则将消息路由到Queue
- 不存储消息
类型:
- Direct:精确匹配routing key
- Fanout:广播到所有绑定的队列
- Topic:通配符匹配
- Headers:根据消息头匹配
1// 声明Exchange2channel.exchangeDeclare("my-exchange", "direct", true, false, null);Queue(队列):
- 存储消息的容器
- 遵循FIFO原则
- 消息最终被消费者消费
1// 声明Queue2channel.queueDeclare("my-queue", true, false, false, null);Binding(绑定):
- 连接Exchange和Queue的规则
- 定义消息如何从Exchange路由到Queue
- 包含routing key或其他匹配条件
1// 绑定Queue到Exchange2channel.queueBind("my-queue", "my-exchange", "routing-key");3. 消息组件
Message(消息):
- 消息体(Body):实际数据
- 消息属性(Properties):元数据
1AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()2 .contentType("application/json")3 .deliveryMode(2) // 持久化4 .priority(5)5 .build();Routing Key(路由键):
- 生产者发送消息时指定
- Exchange根据routing key路由消息
Binding Key(绑定键):
- 绑定Queue到Exchange时指定
- 用于匹配routing key
4. 角色组件
Producer(生产者):
- 发送消息的应用程序
- 连接到Broker
- 发送消息到Exchange
1channel.basicPublish(exchange, routingKey, properties, message.getBytes());Consumer(消费者):
- 接收消息的应用程序
- 订阅Queue
- 处理消息
1channel.basicConsume(queueName, false, consumer);Connection(连接):
- 应用程序与Broker之间的TCP连接
- 可以包含多个Channel
1Connection connection = factory.newConnection();Channel(信道):
- 建立在Connection之上的虚拟连接
- 大部分操作在Channel上完成
- 轻量级,可创建多个
1Channel channel = connection.createChannel();5. 高级概念
Dead Letter Exchange(死信交换机):
- 处理无法正常消费的消息
- 配置在Queue上
Alternate Exchange(备份交换机):
- 处理无法路由的消息
- 配置在Exchange上
TTL(Time To Live):
- 消息或队列的过期时间
- 过期后进入死信队列或被删除
Priority Queue(优先级队列):
- 支持消息优先级
- 高优先级消息先被消费
1Map<String, Object> args = new HashMap<>();2args.put("x-max-priority", 10);3channel.queueDeclare("priority-queue", true, false, false, args);6. 管理组件
User(用户):
- 访问RabbitMQ的账户
- 有不同的权限级别
Permission(权限):
- 配置权限:创建/删除Exchange、Queue
- 写权限:发送消息
- 读权限:消费消息
Policy(策略):
- 动态配置Queue和Exchange的行为
- 如镜像队列、TTL等
7. 组件关系图
1Producer -> Connection -> Channel -> Exchange -> Binding -> Queue -> Consumer2 |3 Routing Key4 |5 Binding Key8. 完整示例
1// 创建连接和Channel2ConnectionFactory factory = new ConnectionFactory();3factory.setHost("localhost");4factory.setVirtualHost("/my-vhost");5Connection connection = factory.newConnection();6Channel channel = connection.createChannel();78// 声明Exchange9channel.exchangeDeclare("my-exchange", "direct", true);1011// 声明Queue12channel.queueDeclare("my-queue", true, false, false, null);1314// 绑定15channel.queueBind("my-queue", "my-exchange", "my-routing-key");1617// 生产者发送消息18channel.basicPublish("my-exchange", "my-routing-key", null, "Hello".getBytes());1920// 消费者接收消息21channel.basicConsume("my-queue", false, new DefaultConsumer(channel) {22 @Override23 public void handleDelivery(String consumerTag, Envelope envelope,24 AMQP.BasicProperties properties, byte[] body) {25 System.out.println("收到消息: " + new String(body));26 channel.basicAck(envelope.getDeliveryTag(), false);27 }28});最佳实践:
- 合理使用Virtual Host进行环境隔离
- 一个Connection包含多个Channel
- 正确配置Exchange类型
- 理解Binding规则
- 使用持久化保证消息可靠性
23. RabbitMQ 的 routing key 和 binding key 的最大长度是多少字节?
答案:
RabbitMQ对routing key和binding key的长度有明确限制。
1. 长度限制
最大长度:255字节(255 bytes)
- Routing Key:最大255字节
- Binding Key:最大255字节
2. 限制原因
协议层面:
- AMQP 0-9-1协议规定short string类型最大255字节
- routing key和binding key都是short string类型
- 这是协议级别的硬性限制
性能考虑:
- 较短的key提高路由效率
- 减少内存占耗
- 加快匹配速度
3. 实际测试
1// 测试routing key长度限制2public void testRoutingKeyLength() {3 Channel channel = connection.createChannel();4 5 // 255字节的routing key - 正常6 String routingKey255 = StringUtils.repeat("a", 255);7 channel.basicPublish("exchange", routingKey255, null, "test".getBytes());8 9 // 256字节的routing key - 会抛出异常10 try {11 String routingKey256 = StringUtils.repeat("a", 256);12 channel.basicPublish("exchange", routingKey256, null, "test".getBytes());13 } catch (IOException e) {14 // com.rabbitmq.client.ShutdownSignalException: 15 // channel error; protocol method: #method<channel.close>(reply-code=406, ...)16 System.out.println("超过长度限制");17 }18}4. 字符编码影响
UTF-8编码:
- 英文字符:1字节
- 中文字符:通常3字节
- 特殊字符:1-4字节
1// 中文routing key示例2String chineseKey = "订单.创建.成功"; // 约18字节(6个中文字符)3String maxChineseKey = StringUtils.repeat("中", 85); // 约255字节(85个中文字符)5. Topic Exchange的通配符
通配符不占用额外长度:
1// 以下都是有效的binding key2"user.*.create" // 约14字节3"order.#" // 约7字节4"*.*.*.*.*.*.*.*.#" // 约17字节56// 但总长度仍不能超过255字节7String longPattern = "a.b.c.d.e..." // 最多255字节6. 最佳实践
命名建议:
1// 推荐:简洁明了的routing key2"order.created"3"user.updated"4"payment.success"56// 避免:过长的routing key7"com.example.service.order.domain.event.OrderCreatedEvent.v1.production"层次结构:
1// 使用点号分隔的层次结构2"业务域.实体.操作.状态"34// 示例5"order.payment.process.success" // 订单.支付.处理.成功6"user.profile.update.complete" // 用户.资料.更新.完成7. 超长key的处理方案
方案1:使用哈希值
1// 将长key转换为哈希值2String longKey = "very.long.routing.key.that.exceeds.limit...";3String shortKey = DigestUtils.md5Hex(longKey).substring(0, 32);4channel.basicPublish("exchange", shortKey, null, message);方案2:使用消息头
1// 将详细信息放在消息头中2Map<String, Object> headers = new HashMap<>();3headers.put("full-routing-info", longRoutingInfo);45AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()6 .headers(headers)7 .build();89channel.basicPublish("exchange", "short-key", props, message);方案3:重新设计路由规则
1// 简化routing key设计2// 原来:com.company.department.team.service.module.action.status3// 简化:service.action.status8. 相关限制
其他长度限制:
- Queue名称:最大255字节
- Exchange名称:最大255字节
- Virtual Host名称:最大255字节
- 消息头键名:最大255字节
注意事项:
- 超过限制会导致channel关闭
- 建议保持key简短(50字节以内)
- 使用英文和数字,避免特殊字符
- 合理设计命名规范
24. 说说 RabbitMQ 的工作模式?
答案:
RabbitMQ支持多种工作模式(Work Pattern),适用于不同的业务场景。
1. Simple模式(简单模式)
特点:
- 一个生产者,一个消费者
- 最简单的模式
- 不使用Exchange
1// 生产者2channel.queueDeclare("simple-queue", true, false, false, null);3channel.basicPublish("", "simple-queue", null, message.getBytes());45// 消费者6channel.basicConsume("simple-queue", true, new DefaultConsumer(channel) {7 @Override8 public void handleDelivery(String consumerTag, Envelope envelope,9 AMQP.BasicProperties properties, byte[] body) {10 System.out.println("收到消息: " + new String(body));11 }12});适用场景:
- 简单的点对点通信
- 测试和学习
2. Work Queue模式(工作队列模式)
特点:
- 一个生产者,多个消费者
- 消息轮询分发给消费者
- 实现任务分发和负载均衡
1// 生产者2for (int i = 0; i < 100; i++) {3 String message = "Task " + i;4 channel.basicPublish("", "work-queue", null, message.getBytes());5}67// 消费者1和消费者28channel.basicQos(1); // 公平分发,每次只处理一条消息9channel.basicConsume("work-queue", false, new DefaultConsumer(channel) {10 @Override11 public void handleDelivery(String consumerTag, Envelope envelope,12 AMQP.BasicProperties properties, byte[] body) {13 try {14 System.out.println("处理任务: " + new String(body));15 Thread.sleep(1000); // 模拟耗时操作16 channel.basicAck(envelope.getDeliveryTag(), false);17 } catch (Exception e) {18 channel.basicNack(envelope.getDeliveryTag(), false, true);19 }20 }21});适用场景:
- 任务分发
- 负载均衡
- 异步处理
3. Publish/Subscribe模式(发布订阅模式)
特点:
- 使用Fanout Exchange
- 消息广播到所有绑定的队列
- 一个消息被多个消费者接收
1// 声明Fanout Exchange2channel.exchangeDeclare("logs", "fanout");34// 生产者5channel.basicPublish("logs", "", null, message.getBytes());67// 消费者1:保存到数据库8String queue1 = channel.queueDeclare().getQueue();9channel.queueBind(queue1, "logs", "");10channel.basicConsume(queue1, true, (consumerTag, delivery) -> {11 saveToDatabase(new String(delivery.getBody()));12});1314// 消费者2:写入日志文件15String queue2 = channel.queueDeclare().getQueue();16channel.queueBind(queue2, "logs", "");17channel.basicConsume(queue2, true, (consumerTag, delivery) -> {18 writeToFile(new String(delivery.getBody()));19});适用场景:
- 日志收集
- 消息广播
- 事件通知
4. Routing模式(路由模式)
特点:
- 使用Direct Exchange
- 根据routing key精确匹配
- 选择性接收消息
1// 声明Direct Exchange2channel.exchangeDeclare("direct-logs", "direct");34// 生产者:发送不同级别的日志5channel.basicPublish("direct-logs", "error", null, "错误日志".getBytes());6channel.basicPublish("direct-logs", "info", null, "信息日志".getBytes());7channel.basicPublish("direct-logs", "warning", null, "警告日志".getBytes());89// 消费者1:只接收error日志10String errorQueue = channel.queueDeclare().getQueue();11channel.queueBind(errorQueue, "direct-logs", "error");12channel.basicConsume(errorQueue, true, (consumerTag, delivery) -> {13 System.out.println("Error: " + new String(delivery.getBody()));14});1516// 消费者2:接收error和warning日志17String alertQueue = channel.queueDeclare().getQueue();18channel.queueBind(alertQueue, "direct-logs", "error");19channel.queueBind(alertQueue, "direct-logs", "warning");20channel.basicConsume(alertQueue, true, (consumerTag, delivery) -> {21 sendAlert(new String(delivery.getBody()));22});适用场景:
- 日志分级处理
- 消息分类
- 选择性订阅
5. Topics模式(主题模式)
特点:
- 使用Topic Exchange
- 支持通配符匹配
- 更灵活的路由规则
1// 声明Topic Exchange2channel.exchangeDeclare("topic-logs", "topic");34// 生产者:发送不同主题的消息5channel.basicPublish("topic-logs", "user.order.created", null, msg1);6channel.basicPublish("topic-logs", "user.profile.updated", null, msg2);7channel.basicPublish("topic-logs", "system.error.database", null, msg3);89// 消费者1:接收所有user相关消息10String userQueue = channel.queueDeclare().getQueue();11channel.queueBind(userQueue, "topic-logs", "user.#");1213// 消费者2:接收所有error消息14String errorQueue = channel.queueDeclare().getQueue();15channel.queueBind(errorQueue, "topic-logs", "*.error.*");1617// 消费者3:接收所有created消息18String createdQueue = channel.queueDeclare().getQueue();19channel.queueBind(createdQueue, "topic-logs", "*.*.created");通配符规则:
*:匹配一个单词#:匹配零个或多个单词
适用场景:
- 复杂的消息路由
- 多维度消息分类
- 灵活的订阅规则
6. RPC模式(远程过程调用模式)
特点:
- 客户端发送请求,等待响应
- 使用reply_to和correlation_id
- 实现同步调用
1// RPC客户端2public String call(String message) throws Exception {3 // 创建回调队列4 String replyQueueName = channel.queueDeclare().getQueue();5 String corrId = UUID.randomUUID().toString();6 7 // 设置属性8 AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()9 .correlationId(corrId)10 .replyTo(replyQueueName)11 .build();12 13 // 发送请求14 channel.basicPublish("", "rpc_queue", props, message.getBytes());15 16 // 等待响应17 BlockingQueue<String> response = new ArrayBlockingQueue<>(1);18 19 channel.basicConsume(replyQueueName, true, (consumerTag, delivery) -> {20 if (delivery.getProperties().getCorrelationId().equals(corrId)) {21 response.offer(new String(delivery.getBody()));22 }23 });24 25 return response.take();26}2728// RPC服务端29channel.basicConsume("rpc_queue", false, (consumerTag, delivery) -> {30 String message = new String(delivery.getBody());31 32 // 处理请求33 String result = processRequest(message);34 35 // 发送响应36 AMQP.BasicProperties replyProps = new AMQP.BasicProperties.Builder()37 .correlationId(delivery.getProperties().getCorrelationId())38 .build();39 40 channel.basicPublish("", delivery.getProperties().getReplyTo(), 41 replyProps, result.getBytes());42 43 channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);44});适用场景:
- 远程方法调用
- 同步请求响应
- 微服务间通信
7. 模式对比
| 模式 | Exchange类型 | 特点 | 适用场景 |
|---|---|---|---|
| Simple | 默认 | 点对点 | 简单通信 |
| Work Queue | 默认 | 多消费者负载均衡 | 任务分发 |
| Pub/Sub | Fanout | 广播 | 消息广播 |
| Routing | Direct | 精确匹配 | 消息分类 |
| Topics | Topic | 通配符匹配 | 复杂路由 |
| RPC | 默认 | 请求响应 | 同步调用 |
最佳实践:
- 根据业务场景选择合适的模式
- Work Queue模式使用basicQos实现公平分发
- Pub/Sub模式使用临时队列
- Topics模式合理设计routing key层次
- RPC模式设置超时时间
25. 说说 RabbitMQ 的集群模式?
答案:
RabbitMQ提供了多种集群模式来实现高可用和负载均衡。
1. 普通集群模式(默认模式)
特点:
- 元数据在所有节点同步(Exchange、Queue定义)
- 消息只存储在声明队列的节点上
- 其他节点只存储队列的元数据指针
- 消费时需要从存储节点拉取消息
架构:
1Node1: Queue A (数据) + Queue B (元数据)2Node2: Queue B (数据) + Queue A (元数据)3Node3: Queue A (元数据) + Queue B (元数据)配置示例:
1# Node12rabbitmq-server -detached34# Node2加入集群5rabbitmqctl stop_app6rabbitmqctl join_cluster rabbit@node17rabbitmqctl start_app89# Node3加入集群10rabbitmqctl stop_app11rabbitmqctl join_cluster rabbit@node112rabbitmqctl start_app1314# 查看集群状态15rabbitmqctl cluster_status优点:
- 配置简单
- 节省存储空间
- 提高吞吐量
缺点:
- 队列所在节点宕机,消息丢失
- 跨节点消费性能下降
- 不是真正的高可用
2. 镜像队列模式(高可用模式)
特点:
- 队列在多个节点上有副本
- 主队列(Master)和镜像队列(Mirror)
- 消息在所有镜像节点同步
- Master宕机,自动选举新Master
配置方式1:通过Policy配置
1# 设置所有队列镜像到所有节点2rabbitmqctl set_policy ha-all "^" '{"ha-mode":"all"}'34# 设置镜像到2个节点5rabbitmqctl set_policy ha-two "^" '{"ha-mode":"exactly","ha-params":2}'67# 设置镜像到指定节点8rabbitmqctl set_policy ha-nodes "^" \9 '{"ha-mode":"nodes","ha-params":["rabbit@node1","rabbit@node2"]}'配置方式2:通过管理界面
1Admin -> Policies -> Add Policy2- Name: ha-all3- Pattern: ^4- Apply to: Queues5- Definition: ha-mode = all配置方式3:通过代码
1Map<String, Object> args = new HashMap<>();2args.put("x-ha-policy", "all");3channel.queueDeclare("ha-queue", true, false, false, args);同步模式配置:
1# 自动同步(推荐)2rabbitmqctl set_policy ha-all "^" \3 '{"ha-mode":"all","ha-sync-mode":"automatic"}'45# 手动同步6rabbitmqctl sync_queue queue_name工作原理:
11. 消息发送到Master队列22. Master同步消息到所有Mirror33. 所有Mirror确认后,Master返回确认44. Master宕机,选举新Master55. 新Master继续提供服务优点:
- 真正的高可用
- 数据不丢失
- 自动故障转移
缺点:
- 性能开销大
- 网络带宽消耗高
- 不适合大量消息
3. 仲裁队列模式(Quorum Queue)
特点:
- RabbitMQ 3.8+引入
- 基于Raft协议
- 更好的数据安全性
- 推荐的高可用方案
声明仲裁队列:
1Map<String, Object> args = new HashMap<>();2args.put("x-queue-type", "quorum");3channel.queueDeclare("quorum-queue", true, false, false, args);通过Policy配置:
1rabbitmqctl set_policy quorum-policy "^quorum\." \2 '{"queue-type":"quorum","delivery-limit":3}'特性:
- 自动复制到多个节点(默认3个)
- 使用Raft协议保证一致性
- 支持消息重试限制
- 更好的性能
对比镜像队列:
| 特性 | 镜像队列 | 仲裁队列 |
|---|---|---|
| 协议 | 自定义 | Raft |
| 性能 | 一般 | 更好 |
| 数据安全 | 好 | 更好 |
| 内存使用 | 高 | 较低 |
| 推荐度 | 不推荐 | 推荐 |
4. Federation插件(联邦模式)
特点:
- 连接不同的RabbitMQ集群
- 跨数据中心消息传递
- 松耦合
配置:
1# 启用插件2rabbitmq-plugins enable rabbitmq_federation3rabbitmq-plugins enable rabbitmq_federation_management45# 配置upstream6rabbitmqctl set_parameter federation-upstream my-upstream \7 '{"uri":"amqp://remote-server","expires":3600000}'89# 配置policy10rabbitmqctl set_policy federate-me "^federated\." \11 '{"federation-upstream-set":"all"}'适用场景:
- 跨地域部署
- 数据中心互联
- 灾备方案
5. Shovel插件(铲子模式)
特点:
- 在不同集群间转发消息
- 更灵活的消息路由
- 支持消息转换
配置:
1rabbitmq-plugins enable rabbitmq_shovel2rabbitmq-plugins enable rabbitmq_shovel_management34rabbitmqctl set_parameter shovel my-shovel \5 '{"src-uri":"amqp://","src-queue":"source-queue",6 "dest-uri":"amqp://remote","dest-queue":"dest-queue"}'6. 负载均衡方案
使用HAProxy:
1# haproxy.cfg2listen rabbitmq3 bind *:56724 mode tcp5 balance roundrobin6 server rabbit1 node1:5672 check inter 5s rise 2 fall 37 server rabbit2 node2:5672 check inter 5s rise 2 fall 38 server rabbit3 node3:5672 check inter 5s rise 2 fall 3使用Nginx:
1stream {2 upstream rabbitmq {3 server node1:5672;4 server node2:5672;5 server node3:5672;6 }7 8 server {9 listen 5672;10 proxy_pass rabbitmq;11 }12}客户端配置:
1ConnectionFactory factory = new ConnectionFactory();2factory.setAddresses(new Address[] {3 new Address("node1", 5672),4 new Address("node2", 5672),5 new Address("node3", 5672)6});7factory.setAutomaticRecoveryEnabled(true);7. 最佳实践
集群规划:
- 至少3个节点(奇数个)
- 使用仲裁队列替代镜像队列
- 配置自动恢复机制
- 监控集群状态
性能优化:
- 合理设置镜像节点数量
- 使用SSD存储
- 优化网络配置
- 限制队列长度
高可用配置:
1ConnectionFactory factory = new ConnectionFactory();2factory.setAutomaticRecoveryEnabled(true);3factory.setNetworkRecoveryInterval(10000);4factory.setRequestedHeartbeat(60);5factory.setConnectionTimeout(30000);26. 为什么 RocketMQ 不使用 Zookeeper 作为注册中心呢?而选择自己实现 NameServer?
答案:
RocketMQ选择自研NameServer而非ZooKeeper,主要基于以下考虑:
1. 架构简单性
NameServer特点:
- 无状态设计
- 节点之间不通信
- 不需要选举
- 部署简单
ZooKeeper特点:
- 有状态设计
- 需要ZAB协议保证一致性
- 需要选举Leader
- 部署复杂
1// NameServer启动非常简单2public class NamesrvStartup {3 public static void main(String[] args) {4 NamesrvController controller = createNamesrvController(args);5 start(controller);6 }7}2. 性能考虑
NameServer优势:
- 纯内存操作
- 无磁盘IO
- 响应速度快(毫秒级)
- 支持高并发
ZooKeeper限制:
- 需要持久化到磁盘
- 写入性能有限(约1000-2000 TPS)
- 响应相对较慢
- 大量监听器影响性能
性能对比:
1NameServer: 2- 路由查询: <1ms3- 注册更新: <5ms4- 无磁盘IO56ZooKeeper:7- 读取: 几毫秒8- 写入: 10-50ms9- 需要磁盘IO3. 功能需求匹配
RocketMQ的需求:
- 路由信息查询(读多写少)
- 最终一致性即可
- 不需要强一致性
- 允许短暂不一致
NameServer设计:
1// 路由信息定期更新,允许短暂不一致2public class RouteInfoManager {3 // Broker每30秒发送心跳4 private final static long BROKER_CHANNEL_EXPIRED_TIME = 1000 * 60 * 2;5 6 // 路由信息表7 private final HashMap<String, List<QueueData>> topicQueueTable;8 private final HashMap<String, BrokerData> brokerAddrTable;9 10 // 注册Broker11 public void registerBroker(String brokerName, String brokerAddr) {12 // 直接更新内存,不需要强一致性保证13 this.brokerAddrTable.put(brokerName, brokerData);14 }15}ZooKeeper提供:
- 强一致性(CP)
- 分布式锁
- 选举功能
- 对RocketMQ来说是过度设计
4. 可用性设计
NameServer高可用:
- 多个NameServer节点独立运行
- 客户端连接任意可用节点
- 某个节点宕机不影响其他节点
- 无单点故障
1// 客户端配置多个NameServer地址2producer.setNamesrvAddr("192.168.1.1:9876;192.168.1.2:9876;192.168.1.3:9876");34// 客户端自动切换5public class MQClientInstance {6 public void updateNameServerAddressList(String addrs) {7 String[] addrArray = addrs.split(";");8 // 随机选择一个NameServer9 Collections.shuffle(Arrays.asList(addrArray));10 }11}ZooKeeper高可用:
- 需要过半节点存活
- Leader宕机需要重新选举
- 选举期间不可用
- 有单点风险
5. 运维成本
NameServer优势:
- 部署简单,启动即可
- 无需配置集群关系
- 无需调优参数
- 故障恢复快
ZooKeeper劣势:
- 需要配置集群
- 需要调优(内存、GC等)
- 需要监控选举状态
- 运维复杂
6. 依赖管理
NameServer:
- RocketMQ自带组件
- 版本统一管理
- 无外部依赖
ZooKeeper:
- 额外的外部依赖
- 版本兼容性问题
- 增加系统复杂度
7. NameServer工作原理
注册流程:
1// Broker启动时向所有NameServer注册2public class BrokerController {3 public void registerBrokerAll() {4 List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList();5 6 for (String namesrvAddr : nameServerAddressList) {7 try {8 // 向每个NameServer注册9 this.registerBroker(namesrvAddr);10 } catch (Exception e) {11 log.warn("registerBroker Exception", e);12 }13 }14 }15 16 // 每30秒发送心跳17 this.scheduledExecutorService.scheduleAtFixedRate(() -> {18 this.registerBrokerAll();19 }, 10, 30, TimeUnit.SECONDS);20}路由发现:
1// Producer/Consumer从NameServer获取路由信息2public class MQClientInstance {3 public TopicRouteData getTopicRouteInfoFromNameServer(String topic) {4 // 从任意一个NameServer获取5 for (String addr : this.remotingClient.getNameServerAddressList()) {6 try {7 return this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, addr);8 } catch (Exception e) {9 // 失败则尝试下一个10 continue;11 }12 }13 return null;14 }15 16 // 定期更新路由信息(默认30秒)17 this.scheduledExecutorService.scheduleAtFixedRate(() -> {18 this.updateTopicRouteInfoFromNameServer();19 }, 10, 30, TimeUnit.SECONDS);20}8. 数据一致性处理
最终一致性模型:
1// NameServer之间不通信,通过Broker心跳实现最终一致2// 1. Broker向所有NameServer发送心跳3// 2. 每个NameServer独立维护路由表4// 3. 短暂不一致可以接受(最多30秒)56// 客户端容错机制7public class DefaultMQProducerImpl {8 private SendResult sendDefaultImpl(Message msg) {9 // 获取路由信息10 TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());11 12 // 选择队列发送13 MessageQueue mq = selectOneMessageQueue(topicPublishInfo);14 15 // 发送失败自动重试其他Broker16 for (int times = 0; times < retryTimes; times++) {17 try {18 return this.sendKernelImpl(msg, mq);19 } catch (Exception e) {20 // 选择其他队列重试21 mq = selectOneMessageQueue(topicPublishInfo);22 }23 }24 }25}9. 对比总结
| 特性 | NameServer | ZooKeeper |
|---|---|---|
| 架构 | 无状态 | 有状态 |
| 一致性 | 最终一致 | 强一致(CP) |
| 性能 | 高 | 中等 |
| 复杂度 | 简单 | 复杂 |
| 运维成本 | 低 | 高 |
| 依赖 | 无 | 外部依赖 |
| 适用场景 | 路由注册 | 配置管理、分布式锁 |
10. 最佳实践
NameServer部署:
- 至少部署2个节点
- 推荐3个节点
- 节点分布在不同机器
- 配置自动重启
配置示例:
1# NameServer配置2listenPort=98763# 路由信息清理间隔(默认10秒)4scanNotActiveBrokerInterval=10000客户端配置:
1// 配置多个NameServer地址2DefaultMQProducer producer = new DefaultMQProducer("ProducerGroup");3producer.setNamesrvAddr("192.168.1.1:9876;192.168.1.2:9876;192.168.1.3:9876");45// 配置重试次数6producer.setRetryTimesWhenSendFailed(3);总结: RocketMQ选择自研NameServer是基于实际需求的权衡,追求简单、高效、易运维,而非盲目使用ZooKeeper。这体现了"合适的才是最好的"设计理念。
27. 说一下 Kafka 为什么性能高?
答案:
Kafka的高性能是多种技术手段综合作用的结果。
1. 顺序写磁盘
原理:
- 消息追加到日志文件末尾
- 避免随机IO
- 顺序写性能接近内存
性能对比:
1顺序写磁盘: 600MB/s2随机写磁盘: 100KB/s3顺序写内存: 几GB/s实现:
1// Kafka日志追加2public class Log {3 public void append(MemoryRecords records) {4 // 追加到活跃段文件末尾5 segment.append(records);6 // 顺序写,性能极高7 }8}2. 零拷贝技术(Zero-Copy)
传统IO流程:
1磁盘 -> 内核缓冲区 -> 用户缓冲区 -> Socket缓冲区 -> 网卡2(4次拷贝,4次上下文切换)零拷贝流程:
1磁盘 -> 内核缓冲区 -> 网卡2(2次拷贝,2次上下文切换)实现:
1// 使用FileChannel.transferTo实现零拷贝2public long writeTo(GatheringByteChannel channel, long position, int length) {3 return fileChannel.transferTo(position, length, channel);4}56// sendfile系统调用7// 数据直接从文件传输到Socket,不经过用户空间性能提升:
- 减少CPU拷贝
- 减少上下文切换
- 提升2-3倍吞吐量
3. 页缓存(Page Cache)
原理:
- 利用操作系统页缓存
- 写入时先写入页缓存
- 读取时优先从页缓存读取
- 操作系统负责刷盘
优势:
1// Kafka不维护自己的缓存2// 充分利用OS的页缓存3// 1. 避免GC压力4// 2. 进程重启数据不丢失5// 3. 自动预读和后写效果:
- 写入:内存速度
- 读取:大部分命中缓存
- 减少磁盘IO
4. 批量处理
生产者批量发送:
1// 配置批量大小2props.put("batch.size", 16384); // 16KB3props.put("linger.ms", 10); // 等待10ms45// 消息累积到batch.size或linger.ms后批量发送6public class RecordAccumulator {7 public RecordAppendResult append(TopicPartition tp, Record record) {8 // 追加到批次9 Deque<ProducerBatch> dq = getOrCreateDeque(tp);10 ProducerBatch last = dq.peekLast();11 12 if (last != null) {13 // 追加到现有批次14 FutureRecordMetadata future = last.tryAppend(record);15 if (future != null) {16 return new RecordAppendResult(future, false);17 }18 }19 20 // 创建新批次21 ProducerBatch batch = new ProducerBatch(tp, records, now);22 dq.addLast(batch);23 }24}消费者批量拉取:
1// 配置每次拉取的数据量2props.put("fetch.min.bytes", 1024); // 最小1KB3props.put("fetch.max.bytes", 52428800); // 最大50MB4props.put("max.poll.records", 500); // 最多500条56// 一次拉取多条消息7ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));批量写入磁盘:
1// 批量刷盘2public void flush() {3 // 批量写入多条消息4 fileChannel.write(buffers);5}5. 数据压缩
支持的压缩算法:
1// 生产者配置压缩2props.put("compression.type", "lz4"); // gzip, snappy, lz4, zstd34// 压缩比对比5// gzip: 压缩比最高,CPU消耗大6// snappy: 平衡7// lz4: 速度最快8// zstd: 新算法,综合最优效果:
- 减少网络传输
- 减少磁盘存储
- 提升吞吐量
6. 分区并行
分区设计:
1// Topic分为多个分区2// 每个分区独立读写3// 并行处理提升吞吐量45// 创建多分区Topic6kafka-topics.sh --create \7 --topic my-topic \8 --partitions 10 \9 --replication-factor 3并行写入:
1// 多个生产者并行写入不同分区2producer.send(new ProducerRecord<>("topic", partition, key, value));并行消费:
1// 消费者组内多个消费者并行消费不同分区2// 分区数 = 最大并行度7. 高效的数据结构
日志段(Log Segment):
1// 每个分区由多个段文件组成200000000000000000000.log // 第一个段300000000000000368769.log // 第二个段400000000000000737337.log // 第三个段56// 优势:7// 1. 快速定位消息8// 2. 方便清理过期数据9// 3. 支持并行写入索引文件:
1// 偏移量索引200000000000000000000.index3// 时间戳索引400000000000000000000.timeindex56// 稀疏索引,快速定位7public class OffsetIndex {8 // 每隔4KB记录一个索引项9 private final int indexIntervalBytes = 4096;10 11 public OffsetPosition lookup(long targetOffset) {12 // 二分查找13 int slot = indexSlotFor(targetOffset);14 return parseEntry(slot);15 }16}8. 网络模型优化
Reactor模式:
1// Acceptor线程接受连接2// Processor线程处理网络IO3// Handler线程处理业务逻辑45class SocketServer {6 val processors = new Array[Processor](numProcessors)7 val acceptor = new Acceptor(...)8 9 // 多线程并行处理10 processors.foreach(_.start())11 acceptor.start()12}NIO非阻塞:
1// 使用Java NIO2Selector selector = Selector.open();3channel.register(selector, SelectionKey.OP_READ);45// 单线程处理多个连接6while (true) {7 selector.select();8 Set<SelectionKey> keys = selector.selectedKeys();9 // 处理就绪的连接10}9. 内存管理优化
内存池:
1// BufferPool复用ByteBuffer2public class BufferPool {3 private final Deque<ByteBuffer> free;4 5 public ByteBuffer allocate(int size) {6 // 从池中获取,避免频繁分配7 ByteBuffer buffer = free.pollFirst();8 if (buffer == null) {9 buffer = ByteBuffer.allocate(size);10 }11 return buffer;12 }13 14 public void deallocate(ByteBuffer buffer) {15 // 归还到池中16 free.add(buffer);17 }18}避免GC:
- 使用堆外内存
- 复用对象
- 减少临时对象创建
10. 配置优化
生产者配置:
1# 批量大小2batch.size=163843linger.ms=1045# 压缩6compression.type=lz478# 缓冲区9buffer.memory=335544321011# 并发请求12max.in.flight.requests.per.connection=5Broker配置:
1# 网络线程2num.network.threads=834# IO线程5num.io.threads=867# 日志刷盘策略8log.flush.interval.messages=100009log.flush.interval.ms=10001011# 副本拉取线程12num.replica.fetchers=4消费者配置:
1# 拉取大小2fetch.min.bytes=10243fetch.max.bytes=524288004max.poll.records=50056# 并行度(分区数)11. 性能数据
吞吐量:
- 单机:几十万到百万TPS
- 集群:千万级TPS
延迟:
- 端到端延迟:几毫秒到几十毫秒
- 99分位延迟:小于100ms
总结: Kafka的高性能来自于:
- 顺序写磁盘
- 零拷贝技术
- 页缓存利用
- 批量处理
- 数据压缩
- 分区并行
- 高效数据结构
- 网络模型优化
- 内存管理
- 合理配置
28. RabbitMQ 怎么实现延迟队列?
答案:
RabbitMQ实现延迟队列主要有两种方式:TTL+死信队列和延迟插件。
1. TTL + 死信队列(推荐方案)
原理:
- 消息设置TTL过期时间
- 过期后进入死信交换机
- 死信队列的消费者处理延迟消息
实现步骤:
步骤1:声明死信交换机和队列
1// 1. 声明死信交换机2channel.exchangeDeclare("dlx.exchange", "direct", true);34// 2. 声明死信队列(实际业务队列)5channel.queueDeclare("dlx.queue", true, false, false, null);67// 3. 绑定8channel.queueBind("dlx.queue", "dlx.exchange", "dlx.routing.key");步骤2:声明延迟队列
1// 声明延迟队列,配置死信交换机2Map<String, Object> args = new HashMap<>();3args.put("x-dead-letter-exchange", "dlx.exchange");4args.put("x-dead-letter-routing-key", "dlx.routing.key");5args.put("x-message-ttl", 30000); // 30秒延迟67channel.queueDeclare("delay.queue", true, false, false, args);步骤3:发送延迟消息
1// 发送消息到延迟队列2channel.basicPublish("", "delay.queue", null, message.getBytes());34// 消息30秒后自动进入dlx.queue步骤4:消费延迟消息
1// 消费死信队列(实际业务队列)2channel.basicConsume("dlx.queue", false, new DefaultConsumer(channel) {3 @Override4 public void handleDelivery(String consumerTag, Envelope envelope,5 AMQP.BasicProperties properties, byte[] body) {6 System.out.println("收到延迟消息: " + new String(body));7 channel.basicAck(envelope.getDeliveryTag(), false);8 }9});2. 支持多种延迟时间
方案:为每种延迟时间创建一个队列
1// 延迟队列工厂2public class DelayQueueFactory {3 4 // 创建延迟队列5 public void createDelayQueue(int delaySeconds) {6 String queueName = "delay.queue." + delaySeconds;7 8 Map<String, Object> args = new HashMap<>();9 args.put("x-dead-letter-exchange", "dlx.exchange");10 args.put("x-dead-letter-routing-key", "dlx.routing.key");11 args.put("x-message-ttl", delaySeconds * 1000);12 13 channel.queueDeclare(queueName, true, false, false, args);14 }15 16 // 发送延迟消息17 public void sendDelayMessage(String message, int delaySeconds) {18 String queueName = "delay.queue." + delaySeconds;19 channel.basicPublish("", queueName, null, message.getBytes());20 }21}2223// 使用24factory.createDelayQueue(10); // 10秒延迟队列25factory.createDelayQueue(30); // 30秒延迟队列26factory.createDelayQueue(60); // 60秒延迟队列2728factory.sendDelayMessage("10秒后执行", 10);29factory.sendDelayMessage("30秒后执行", 30);3. 消息级别TTL(灵活延迟)
设置消息TTL:
1// 每条消息设置不同的TTL2AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()3 .expiration(String.valueOf(delayMillis)) // 消息级别TTL4 .build();56channel.basicPublish("", "delay.queue", properties, message.getBytes());注意事项:
- 消息级别TTL可能导致队头阻塞
- 如果队头消息未过期,后面的消息即使过期也不会被处理
- 建议使用队列级别TTL
4. 延迟插件方案(rabbitmq_delayed_message_exchange)
安装插件:
1# 下载插件2wget https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/v3.9.0/rabbitmq_delayed_message_exchange-3.9.0.ez34# 复制到插件目录5cp rabbitmq_delayed_message_exchange-3.9.0.ez /usr/lib/rabbitmq/plugins/67# 启用插件8rabbitmq-plugins enable rabbitmq_delayed_message_exchange使用插件:
1// 1. 声明延迟交换机2Map<String, Object> args = new HashMap<>();3args.put("x-delayed-type", "direct");45channel.exchangeDeclare("delayed.exchange", "x-delayed-message", true, false, args);67// 2. 声明队列并绑定8channel.queueDeclare("delayed.queue", true, false, false, null);9channel.queueBind("delayed.queue", "delayed.exchange", "delayed.routing.key");1011// 3. 发送延迟消息12Map<String, Object> headers = new HashMap<>();13headers.put("x-delay", 30000); // 延迟30秒1415AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()16 .headers(headers)17 .build();1819channel.basicPublish("delayed.exchange", "delayed.routing.key", properties, message.getBytes());2021// 4. 消费消息22channel.basicConsume("delayed.queue", false, new DefaultConsumer(channel) {23 @Override24 public void handleDelivery(String consumerTag, Envelope envelope,25 AMQP.BasicProperties properties, byte[] body) {26 System.out.println("收到延迟消息: " + new String(body));27 channel.basicAck(envelope.getDeliveryTag(), false);28 }29});插件优势:
- 支持任意延迟时间
- 不会队头阻塞
- 使用简单
插件劣势:
- 需要安装插件
- 性能相对较低
- 延迟消息存储在内存中
5. Spring Boot集成
配置类:
1@Configuration2public class DelayQueueConfig {3 4 // 死信交换机5 @Bean6 public DirectExchange dlxExchange() {7 return new DirectExchange("dlx.exchange");8 }9 10 // 死信队列11 @Bean12 public Queue dlxQueue() {13 return new Queue("dlx.queue");14 }15 16 // 绑定17 @Bean18 public Binding dlxBinding() {19 return BindingBuilder.bind(dlxQueue())20 .to(dlxExchange())21 .with("dlx.routing.key");22 }23 24 // 延迟队列25 @Bean26 public Queue delayQueue() {27 Map<String, Object> args = new HashMap<>();28 args.put("x-dead-letter-exchange", "dlx.exchange");29 args.put("x-dead-letter-routing-key", "dlx.routing.key");30 args.put("x-message-ttl", 30000);31 return new Queue("delay.queue", true, false, false, args);32 }33}生产者:
1@Service2public class DelayMessageProducer {3 4 @Autowired5 private RabbitTemplate rabbitTemplate;6 7 public void sendDelayMessage(String message, int delaySeconds) {8 MessageProperties properties = new MessageProperties();9 properties.setExpiration(String.valueOf(delaySeconds * 1000));10 11 Message msg = new Message(message.getBytes(), properties);12 rabbitTemplate.send("", "delay.queue", msg);13 }14}消费者:
1@Component2public class DelayMessageConsumer {3 4 @RabbitListener(queues = "dlx.queue")5 public void handleDelayMessage(String message) {6 System.out.println("收到延迟消息: " + message);7 // 处理业务逻辑8 }9}6. 应用场景
订单超时取消:
1// 创建订单后发送30分钟延迟消息2public void createOrder(Order order) {3 // 保存订单4 orderRepository.save(order);5 6 // 发送延迟消息7 delayProducer.sendDelayMessage(order.getId(), 30 * 60);8}910// 30分钟后检查订单状态11@RabbitListener(queues = "dlx.queue")12public void checkOrderStatus(String orderId) {13 Order order = orderRepository.findById(orderId);14 if (order.getStatus() == OrderStatus.UNPAID) {15 // 取消订单16 order.setStatus(OrderStatus.CANCELLED);17 orderRepository.save(order);18 }19}定时任务:
1// 发送定时提醒2public void scheduleReminder(String userId, String content, int delaySeconds) {3 ReminderMessage message = new ReminderMessage(userId, content);4 delayProducer.sendDelayMessage(JSON.toJSONString(message), delaySeconds);5}7. 方案对比
| 方案 | 优点 | 缺点 | 适用场景 |
|---|---|---|---|
| TTL+死信 | 无需插件,稳定 | 固定延迟时间 | 固定延迟场景 |
| 消息TTL | 灵活 | 队头阻塞 | 不推荐 |
| 延迟插件 | 灵活,无阻塞 | 需要插件 | 任意延迟场景 |
最佳实践:
- 固定延迟时间:使用TTL+死信队列
- 任意延迟时间:使用延迟插件
- 监控延迟队列长度
- 设置合理的TTL
- 处理消息幂等性
29. RocketMQ 的延迟消息是怎么实现的?
答案:
RocketMQ通过内部延迟队列机制实现延迟消息,支持18个预设延迟级别。
实现原理:
- 延迟消息先发送到内部Topic(SCHEDULE_TOPIC_XXXX)
- 定时任务扫描到期消息
- 将到期消息投递到目标Topic
使用示例:
1Message message = new Message("TopicTest", "Hello".getBytes());2message.setDelayTimeLevel(3); // level 3 = 10秒延迟3producer.send(message);延迟级别:
11s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h30. RocketMQ 的开发参考了 Kafka,那两者在架构和功能上有什么区别?
答案:
核心差异对比:
| 特性 | Kafka | RocketMQ |
|---|---|---|
| 设计目标 | 日志收集、流处理 | 业务消息传递 |
| 注册中心 | ZooKeeper/KRaft | NameServer |
| 存储模型 | 分区独立存储 | 统一CommitLog |
| 消息过滤 | 客户端过滤 | 服务端过滤(Tag/SQL) |
| 延迟消息 | 不支持 | 支持18个延迟级别 |
| 事务消息 | 支持Exactly Once | 支持分布式事务 |
| 消息重试 | 需自己实现 | 内置重试机制 |
| 死信队列 | 需自己实现 | 内置死信队列 |
| 吞吐量 | 极高(百万级) | 高(十万级) |
选型建议:
- 选Kafka:日志收集、大数据处理、高吞吐量场景
- 选RocketMQ:业务消息、需要事务/延迟消息、金融级应用
31. 说一下 RocketMQ 中关于事务消息的实现?
答案:
RocketMQ事务消息通过两阶段提交和事务回查机制实现分布式事务。
实现流程:
11. 发送Half消息(预提交)22. 执行本地事务33. 提交或回滚消息44. 超时则Broker回查事务状态代码示例:
1// 1. 创建事务生产者2TransactionMQProducer producer = new TransactionMQProducer("group");3producer.setTransactionListener(new TransactionListenerImpl());45// 2. 实现事务监听器6public class TransactionListenerImpl implements TransactionListener {7 @Override8 public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {9 try {10 // 执行本地事务11 orderService.createOrder((Order) arg);12 return LocalTransactionState.COMMIT_MESSAGE;13 } catch (Exception e) {14 return LocalTransactionState.ROLLBACK_MESSAGE;15 }16 }17 18 @Override19 public LocalTransactionState checkLocalTransaction(MessageExt msg) {20 // 事务回查21 String status = transactionLogService.getStatus(msg.getTransactionId());22 return "COMMIT".equals(status) ? 23 LocalTransactionState.COMMIT_MESSAGE : 24 LocalTransactionState.ROLLBACK_MESSAGE;25 }26}2728// 3. 发送事务消息29Message message = new Message("OrderTopic", order.toBytes());30producer.sendMessageInTransaction(message, order);最佳实践:
- 本地事务要快速执行
- 记录事务日志用于回查
- 下游消费要幂等处理
学习指南
核心要点:
- 消息队列的基本概念和模型
- 主流MQ产品的架构和特性
- 消息可靠性和一致性保证
- 消息队列在分布式系统中的应用
学习路径建议:
- 掌握消息队列的基本概念
- 深入理解主流MQ产品的特性
- 熟悉消息队列的高可用方案
- 学习消息队列在实际项目中的应用
评论区 / Comments