MQ 消息队列详细使用教程:RabbitMQ、Kafka、RocketMQ 全解析

MQ 消息队列详细使用教程:RabbitMQ、Kafka、RocketMQ 全解析

MQ 消息队列详细使用教程

一、MQ 消息队列简介

消息队列(Message Queue) 是一种异步通信机制,允许应用程序之间通过消息进行通信,而无需同时在线。

为什么使用 MQ

优势 说明
解耦 发送者和接收者相互独立,降低系统耦合度
异步 非阻塞通信,提升响应速度和用户体验
削峰填谷 缓冲突发流量,保护后端服务不被压垮
可扩展 轻松扩展消费者数量,提升处理能力
可靠性 消息持久化,保证数据不丢失

应用场景

订单系统:下单 → 写入 MQ → 异步处理库存、积分、通知
支付系统:支付成功 → MQ → 更新订单状态、发送通知
日志收集:应用 → MQ → 日志处理服务 → Elasticsearch
消息通知:事件触发 → MQ → 邮件、短信、推送服务

二、MQ 的核心概念

基本组件

| 组件 | 说明 |
|——|——|
| Producer(生产者) | 发送消息的应用 |
| Consumer(消费者) | 接收并处理消息的应用 |
| Broker(消息代理) | 消息队列服务端 |
| Topic(主题) | 消息的分类/类别 |
| Queue(队列) | 存储消息的物理队列 |
| Message(消息) | 实际传输的数据包 |

消息模型

点对点模型(Queue):
生产者 → Queue → 消费者(一条消息只被一个消费者消费)

发布订阅模型(Topic):
生产者 → Topic → 消费者 1
               → 消费者 2
               → 消费者 3(一条消息被多个消费者消费)

三、常见 MQ 产品对比

功能对比

| 特性 | RabbitMQ | Kafka | RocketMQ | ActiveMQ |
|——|———-|——-|———-|———-|
| 消息延迟 | 毫秒级 | 秒级 | 毫秒级 | 毫秒级 |
| 吞吐量 | 万级/秒 | 十万级/秒 | 万级/秒 | 千级/秒 |
| 消息可靠性 | 高 | 高 | 极高 | 中 |
| 顺序消息 | 支持 | 分区有序 | 支持 | 支持 |
| 事务消息 | 支持 | 不支持 | 支持 | 支持 |
| 消息回溯 | 不支持 | 支持 | 支持 | 不支持 |
| 学习成本 | 低 | 中 | 中 | 高 |
| 适用场景 | 中小流量 | 日志/大数据 | 电商/金融 | 遗留系统 |

选择建议

| 场景 | 推荐 MQ | 原因 |
|——|———|——|
| 中小流量,复杂路由 | RabbitMQ | 灵活的路由规则 |
| 日志/大数据处理 | Kafka | 超高吞吐 |
| 电商/金融业务 | RocketMQ | 高可靠、事务消息 |
| 遗留系统集成 | ActiveMQ | 成熟稳定 |

四、RabbitMQ 快速开始

安装和启动

# Docker 启动
docker run -d --name rabbitmq \
  -p 5672:5672 -p 15672:15672 \
  -e RABBITMQ_DEFAULT_USER=admin \
  -e RABBITMQ_DEFAULT_PASS=admin \
  rabbitmq:3-management

访问管理界面

http://localhost:15672

用户名:admin 密码:admin

Spring Boot 集成

“`xml

org.springframework.boot
spring-boot-starter-amqp


yaml
spring:
rabbitmq:
host: localhost
port: 5672
username: admin
password: admin
virtual-host: /
listener:
simple:
acknowledge-mode: manual
concurrency: 5
max-concurrency: 10


生产者代码

java
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class MessageProducer {

@Autowired
private RabbitTemplate rabbitTemplate;

private static final String QUEUE_NAME = “order.queue”;

public void sendOrderMessage(Order order) {
String message = JSON.toJSONString(order);

rabbitTemplate.convertAndSend(QUEUE_NAME, message);

System.out.println(“发送消息:” + message);
}
}


消费者代码

java
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.core.Message;
import com.rabbitmq.client.Channel;
import org.springframework.stereotype.Service;

@Service
public class MessageConsumer {

@RabbitListener(queues = “order.queue”)
public void receiveOrder(Message message, Channel channel) throws Exception {
String body = new String(message.getBody());
Order order = JSON.parseObject(body, Order.class);

try {
// 处理业务逻辑
processOrder(order);

// 手动确认 ACK
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
System.out.println(“处理成功:” + order.getId());
} catch (Exception e) {
// 拒绝消息,重新入队
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
System.err.println(“处理失败:” + e.getMessage());
}
}

private void processOrder(Order order) {
// 订单处理逻辑
}
}


五、Kafka 快速开始

安装和启动

bash

Docker 启动 Kafka

docker run -d –name kafka \
-p 9092:9092 \
-e KAFKA_BROKER_ID=0 \
-e KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 \
-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092 \
confluentinc/cp-kafka:latest

创建 Topic

kafka-topics –create \
–bootstrap-server localhost:9092 \
–topic order-topic \
–partitions 3 \
–replication-factor 1


Spring Boot 集成

xml

org.springframework.kafka
spring-kafka


yaml
spring:
kafka:
bootstrap-servers: localhost:9092
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
acks: all
retries: 3
consumer:
group-id: order-group
auto-offset-reset: earliest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer


生产者代码

java
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;

import java.util.concurrent.CompletableFuture;

@Service
public class KafkaProducer {

@Autowired
private KafkaTemplate kafkaTemplate;

private static final String TOPIC = “order-topic”;

public void sendOrder(Order order) {
CompletableFuture> future =
kafkaTemplate.send(TOPIC, order.getId().toString(), order);

future.whenComplete((result, ex) -> {
if (ex == null) {
System.out.println(“消息发送成功:” +
result.getRecordMetadata().offset());
} else {
ex.printStackTrace();
}
});
}
}


消费者代码

java
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Service;

@Service
public class KafkaConsumer {

@KafkaListener(topics = “order-topic”, groupId = “order-group”)
public void receiveOrder(Order order, Acknowledgment ack) {
try {
processOrder(order);
ack.acknowledge(); // 手动确认
System.out.println(“处理成功:” + order.getId());
} catch (Exception e) {
e.printStackTrace();
}
}

private void processOrder(Order order) {
// 订单处理逻辑
}
}


六、消息可靠性保证

ACK 机制

RabbitMQ ACK

java
@RabbitListener(queues = “order.queue”)
public void receiveOrder(Message message, Channel channel) throws Exception {
try {
// 处理业务
process(message);
// 确认消费
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
// 拒绝消息,重新入队
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
}
}


Kafka ACK

yaml
spring:
kafka:
producer:
acks: all # 所有副本确认
retries: 3


重试机制

java
@Configuration
public class RetryConfig {

@Bean
public RetryTemplate retryTemplate() {
RetryTemplate template = new RetryTemplate();

// 指数退避策略
ExponentialBackOffPolicy backOff = new ExponentialBackOffPolicy();
backOff.setInitialInterval(1000);
backOff.setMultiplier(2);
backOff.setMaxInterval(10000);
template.setBackOffPolicy(backOff);

// 重试 3 次
SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy(3);
template.setRetryPolicy(retryPolicy);

return template;
}
}


死信队列(DLQ)

java
@Configuration
public class DeadLetterQueueConfig {

@Bean
public Queue orderQueue() {
return new Queue(“order.queue”, true)
.deadLetterExchange(“dlx.exchange”)
.deadLetterRoutingKey(“order.dlx”)
.maximumLength(100)
.maximumAge(60000); // TTL: 60 秒
}

@Bean
public Exchange dlxExchange() {
return new DirectExchange(“dlx.exchange”, true, false);
}

@Bean
public Queue deadLetterQueue() {
return new Queue(“order.dlx”, true);
}

@Bean
public Binding binding() {
return BindingBuilder.bind(deadLetterQueue())
.to(dlxExchange())
.with(“order.dlx”);
}
}


七、消息积压处理

积压原因分析

  1. 消费者处理速度慢
  2. 消费者数量不足
  3. 网络延迟
  4. 后端服务故障
  5. 业务逻辑复杂
  6. 
    

    解决方案

    1. 增加消费者

    java
    @Configuration
    public class ConsumerConfig {

    @Bean
    public ConcurrentMessageListenerContainer container(
    ConsumerFactory factory) {

    ConcurrentMessageListenerContainer container =
    new ConcurrentMessageListenerContainer<>(factory, new DefaultErrorHandler());

    container.setConcurrency(20); // 增加并发度
    container.setPollTimeout(1000);

    return container;
    }
    }

    
    

    2. 临时扩容

    bash

    Kafka 消费者分组扩容

    1. 启动新的消费者实例

    docker run -d –name kafka-consumer-2 \
    -e GROUP_ID=new-group \
    -e TOPIC=order-topic \
    kafka-consumer

    2. 消费者会自动重新分配分区

    
    

    3. 简化处理逻辑

    java
    // 方案一:异步处理
    @KafkaListener(topics = “order-topic”)
    public void receiveAsync(Order order) {
    CompletableFuture.runAsync(() -> {
    processOrder(order);
    });
    }

    // 方案二:批量处理
    @KafkaListener(topics = “order-topic”)
    public void receiveBatch(List orders) {
    // 批量处理,减少 IO
    orderService.batchProcess(orders);
    }

    
    

    八、消息排序和去重

    消息排序

    分区保证顺序(Kafka)

    java
    // 使用相同的 partitionKey 保证同一业务有序
    kafkaTemplate.send(“order-topic”, order.getUserId(), order);

    
    

    单队列顺序(RabbitMQ)

    java
    // 单消费者消费单个队列
    @RabbitListener(queues = “order.queue”)
    public void receiveSingle(Order order) {
    processOrder(order); // 顺序处理
    }

    
    

    消息去重

    方案一:数据库唯一键

    java
    @Transactional
    public void processOrder(Order order) {
    // 检查消息是否已处理
    if (orderService.exists(order.getMessageId())) {
    return; // 已处理,跳过
    }

    // 处理订单
    orderService.createOrder(order);

    // 标记已处理
    orderService.markProcessed(order.getMessageId());
    }

    
    

    方案二:Redis 去重

    java
    @Service
    public class DeduplicationService {

    @Autowired
    private RedisTemplate redisTemplate;

    private static final int TTL = 86400; // 24 小时

    public boolean isDuplicate(String messageId) {
    String key = “message:processed:” + messageId;
    Boolean exists = redisTemplate.hasKey(key);

    if (Boolean.TRUE.equals(exists)) {
    return true; // 已处理
    }

    // 标记已处理
    redisTemplate.opsForValue().setIfAbsent(key, “1”, TTL, TimeUnit.SECONDS);
    return false;
    }
    }

    
    

    九、监控和最佳实践

    监控指标

    yaml

    Actuator 监控

    management:
    endpoints:
    web:
    exposure:
    include: health,metrics
    metrics:
    tags:
    application: ${spring.application.name}
    export:
    prometheus:
    enabled: true

    
    

    关键指标

    | 指标 | 说明 | 告警阈值 | |------|------|----------| | 消息积压数 | 队列未消费消息数量 | > 10000 | | 消费延迟 | 消息从发送到消费的时间差 | > 60s | | 失败率 | 消费失败消息占比 | > 5% | | QPS | 每秒消息处理量 | - |

    最佳实践

    yaml

    1. 消息确认

    spring:
    rabbitmq:
    listener:
    simple:
    acknowledge-mode: manual # 手动确认
    concurrency: 5

    2. 异常处理

    spring:
    rabbitmq:
    listener:
    simple:
    max-retry-interval: 30000 # 最大重试间隔
    max-attempts: 3 # 最大重试次数

    3. 超时控制

    spring:
    kafka:
    consumer:
    max-poll-interval-ms: 300000 # 最大轮询间隔
    session-timeout-ms: 30000 # 会话超时
    “`

    总结

    本文涵盖了 MQ 的:

    • MQ 简介 – 什么是 MQ、核心优势
    • 核心概念 – Producer、Consumer、Topic、Queue
    • 产品对比 – RabbitMQ、Kafka、RocketMQ、ActiveMQ
    • RabbitMQ – 快速开始、生产者、消费者
    • Kafka – 快速开始、生产者、消费者
    • 可靠性 – ACK、重试、死信队列
    • 积压处理 – 增加消费者、简化处理
    • 排序去重 – 分区有序、Redis 去重
    • 监控实践 – 指标监控、配置优化

    #MQ #消息队列 #RabbitMQ #Kafka #RocketMQ #分布式

    文章已完成!

    文件路径: `/home/node/.openclaw/agents/creator/workspace/content/MQ 消息队列详细使用教程_20260425_2138.md`

    请告诉我下一步操作(配图、发布等)!

© 版权声明
THE END
喜欢就支持一下吧
点赞15 分享
评论 抢沙发

请登录后发表评论

    暂无评论内容