
MQ 消息队列详细使用教程
一、MQ 消息队列简介
消息队列(Message Queue) 是一种异步通信机制,允许应用程序之间通过消息进行通信,而无需同时在线。
为什么使用 MQ
| 优势 | 说明 |
|---|---|
| 解耦 | 发送者和接收者相互独立,降低系统耦合度 |
| 异步 | 非阻塞通信,提升响应速度和用户体验 |
| 削峰填谷 | 缓冲突发流量,保护后端服务不被压垮 |
| 可扩展 | 轻松扩展消费者数量,提升处理能力 |
| 可靠性 | 消息持久化,保证数据不丢失 |
应用场景
订单系统:下单 → 写入 MQ → 异步处理库存、积分、通知
支付系统:支付成功 → MQ → 更新订单状态、发送通知
日志收集:应用 → MQ → 日志处理服务 → Elasticsearch
消息通知:事件触发 → MQ → 邮件、短信、推送服务
二、MQ 的核心概念
基本组件
| 组件 | 说明 |
|——|——|
| Producer(生产者) | 发送消息的应用 |
| Consumer(消费者) | 接收并处理消息的应用 |
| Broker(消息代理) | 消息队列服务端 |
| Topic(主题) | 消息的分类/类别 |
| Queue(队列) | 存储消息的物理队列 |
| Message(消息) | 实际传输的数据包 |
消息模型
点对点模型(Queue):
生产者 → Queue → 消费者(一条消息只被一个消费者消费)
发布订阅模型(Topic):
生产者 → Topic → 消费者 1
→ 消费者 2
→ 消费者 3(一条消息被多个消费者消费)
三、常见 MQ 产品对比
功能对比
| 特性 | RabbitMQ | Kafka | RocketMQ | ActiveMQ |
|——|———-|——-|———-|———-|
| 消息延迟 | 毫秒级 | 秒级 | 毫秒级 | 毫秒级 |
| 吞吐量 | 万级/秒 | 十万级/秒 | 万级/秒 | 千级/秒 |
| 消息可靠性 | 高 | 高 | 极高 | 中 |
| 顺序消息 | 支持 | 分区有序 | 支持 | 支持 |
| 事务消息 | 支持 | 不支持 | 支持 | 支持 |
| 消息回溯 | 不支持 | 支持 | 支持 | 不支持 |
| 学习成本 | 低 | 中 | 中 | 高 |
| 适用场景 | 中小流量 | 日志/大数据 | 电商/金融 | 遗留系统 |
选择建议
| 场景 | 推荐 MQ | 原因 |
|——|———|——|
| 中小流量,复杂路由 | RabbitMQ | 灵活的路由规则 |
| 日志/大数据处理 | Kafka | 超高吞吐 |
| 电商/金融业务 | RocketMQ | 高可靠、事务消息 |
| 遗留系统集成 | ActiveMQ | 成熟稳定 |
四、RabbitMQ 快速开始
安装和启动
# Docker 启动
docker run -d --name rabbitmq \
-p 5672:5672 -p 15672:15672 \
-e RABBITMQ_DEFAULT_USER=admin \
-e RABBITMQ_DEFAULT_PASS=admin \
rabbitmq:3-management
访问管理界面
http://localhost:15672
用户名:admin 密码:admin
Spring Boot 集成
“`xml
yaml
spring:
rabbitmq:
host: localhost
port: 5672
username: admin
password: admin
virtual-host: /
listener:
simple:
acknowledge-mode: manual
concurrency: 5
max-concurrency: 10
生产者代码
java
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Service
public class MessageProducer {
@Autowired
private RabbitTemplate rabbitTemplate;
private static final String QUEUE_NAME = “order.queue”;
public void sendOrderMessage(Order order) {
String message = JSON.toJSONString(order);
rabbitTemplate.convertAndSend(QUEUE_NAME, message);
System.out.println(“发送消息:” + message);
}
}
消费者代码
java
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.core.Message;
import com.rabbitmq.client.Channel;
import org.springframework.stereotype.Service;
@Service
public class MessageConsumer {
@RabbitListener(queues = “order.queue”)
public void receiveOrder(Message message, Channel channel) throws Exception {
String body = new String(message.getBody());
Order order = JSON.parseObject(body, Order.class);
try {
// 处理业务逻辑
processOrder(order);
// 手动确认 ACK
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
System.out.println(“处理成功:” + order.getId());
} catch (Exception e) {
// 拒绝消息,重新入队
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
System.err.println(“处理失败:” + e.getMessage());
}
}
private void processOrder(Order order) {
// 订单处理逻辑
}
}
五、Kafka 快速开始
安装和启动
bash
Docker 启动 Kafka
docker run -d –name kafka \
-p 9092:9092 \
-e KAFKA_BROKER_ID=0 \
-e KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 \
-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092 \
confluentinc/cp-kafka:latest
创建 Topic
kafka-topics –create \
–bootstrap-server localhost:9092 \
–topic order-topic \
–partitions 3 \
–replication-factor 1
Spring Boot 集成
xml
yaml
spring:
kafka:
bootstrap-servers: localhost:9092
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
acks: all
retries: 3
consumer:
group-id: order-group
auto-offset-reset: earliest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
生产者代码
java
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;
import java.util.concurrent.CompletableFuture;
@Service
public class KafkaProducer {
@Autowired
private KafkaTemplate
private static final String TOPIC = “order-topic”;
public void sendOrder(Order order) {
CompletableFuture
kafkaTemplate.send(TOPIC, order.getId().toString(), order);
future.whenComplete((result, ex) -> {
if (ex == null) {
System.out.println(“消息发送成功:” +
result.getRecordMetadata().offset());
} else {
ex.printStackTrace();
}
});
}
}
消费者代码
java
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Service;
@Service
public class KafkaConsumer {
@KafkaListener(topics = “order-topic”, groupId = “order-group”)
public void receiveOrder(Order order, Acknowledgment ack) {
try {
processOrder(order);
ack.acknowledge(); // 手动确认
System.out.println(“处理成功:” + order.getId());
} catch (Exception e) {
e.printStackTrace();
}
}
private void processOrder(Order order) {
// 订单处理逻辑
}
}
六、消息可靠性保证
ACK 机制
RabbitMQ ACK
java
@RabbitListener(queues = “order.queue”)
public void receiveOrder(Message message, Channel channel) throws Exception {
try {
// 处理业务
process(message);
// 确认消费
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
// 拒绝消息,重新入队
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
}
}
Kafka ACK
yaml
spring:
kafka:
producer:
acks: all # 所有副本确认
retries: 3
重试机制
java
@Configuration
public class RetryConfig {
@Bean
public RetryTemplate retryTemplate() {
RetryTemplate template = new RetryTemplate();
// 指数退避策略
ExponentialBackOffPolicy backOff = new ExponentialBackOffPolicy();
backOff.setInitialInterval(1000);
backOff.setMultiplier(2);
backOff.setMaxInterval(10000);
template.setBackOffPolicy(backOff);
// 重试 3 次
SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy(3);
template.setRetryPolicy(retryPolicy);
return template;
}
}
死信队列(DLQ)
java
@Configuration
public class DeadLetterQueueConfig {
@Bean
public Queue orderQueue() {
return new Queue(“order.queue”, true)
.deadLetterExchange(“dlx.exchange”)
.deadLetterRoutingKey(“order.dlx”)
.maximumLength(100)
.maximumAge(60000); // TTL: 60 秒
}
@Bean
public Exchange dlxExchange() {
return new DirectExchange(“dlx.exchange”, true, false);
}
@Bean
public Queue deadLetterQueue() {
return new Queue(“order.dlx”, true);
}
@Bean
public Binding binding() {
return BindingBuilder.bind(deadLetterQueue())
.to(dlxExchange())
.with(“order.dlx”);
}
}
七、消息积压处理
积压原因分析
- 消费者处理速度慢
- 消费者数量不足
- 网络延迟
- 后端服务故障
- 业务逻辑复杂
- ✅ MQ 简介 – 什么是 MQ、核心优势
- ✅ 核心概念 – Producer、Consumer、Topic、Queue
- ✅ 产品对比 – RabbitMQ、Kafka、RocketMQ、ActiveMQ
- ✅ RabbitMQ – 快速开始、生产者、消费者
- ✅ Kafka – 快速开始、生产者、消费者
- ✅ 可靠性 – ACK、重试、死信队列
- ✅ 积压处理 – 增加消费者、简化处理
- ✅ 排序去重 – 分区有序、Redis 去重
- ✅ 监控实践 – 指标监控、配置优化
解决方案
1. 增加消费者
java
@Configuration
public class ConsumerConfig {
@Bean
public ConcurrentMessageListenerContainer
ConsumerFactory
ConcurrentMessageListenerContainer
new ConcurrentMessageListenerContainer<>(factory, new DefaultErrorHandler());
container.setConcurrency(20); // 增加并发度
container.setPollTimeout(1000);
return container;
}
}
2. 临时扩容
bash
Kafka 消费者分组扩容
1. 启动新的消费者实例
docker run -d –name kafka-consumer-2 \
-e GROUP_ID=new-group \
-e TOPIC=order-topic \
kafka-consumer
2. 消费者会自动重新分配分区
3. 简化处理逻辑
java
// 方案一:异步处理
@KafkaListener(topics = “order-topic”)
public void receiveAsync(Order order) {
CompletableFuture.runAsync(() -> {
processOrder(order);
});
}
// 方案二:批量处理
@KafkaListener(topics = “order-topic”)
public void receiveBatch(List
// 批量处理,减少 IO
orderService.batchProcess(orders);
}
八、消息排序和去重
消息排序
分区保证顺序(Kafka)
java
// 使用相同的 partitionKey 保证同一业务有序
kafkaTemplate.send(“order-topic”, order.getUserId(), order);
单队列顺序(RabbitMQ)
java
// 单消费者消费单个队列
@RabbitListener(queues = “order.queue”)
public void receiveSingle(Order order) {
processOrder(order); // 顺序处理
}
消息去重
方案一:数据库唯一键
java
@Transactional
public void processOrder(Order order) {
// 检查消息是否已处理
if (orderService.exists(order.getMessageId())) {
return; // 已处理,跳过
}
// 处理订单
orderService.createOrder(order);
// 标记已处理
orderService.markProcessed(order.getMessageId());
}
方案二:Redis 去重
java
@Service
public class DeduplicationService {
@Autowired
private RedisTemplate
private static final int TTL = 86400; // 24 小时
public boolean isDuplicate(String messageId) {
String key = “message:processed:” + messageId;
Boolean exists = redisTemplate.hasKey(key);
if (Boolean.TRUE.equals(exists)) {
return true; // 已处理
}
// 标记已处理
redisTemplate.opsForValue().setIfAbsent(key, “1”, TTL, TimeUnit.SECONDS);
return false;
}
}
九、监控和最佳实践
监控指标
yaml
Actuator 监控
management:
endpoints:
web:
exposure:
include: health,metrics
metrics:
tags:
application: ${spring.application.name}
export:
prometheus:
enabled: true
关键指标
| 指标 | 说明 | 告警阈值 |
|------|------|----------|
| 消息积压数 | 队列未消费消息数量 | > 10000 |
| 消费延迟 | 消息从发送到消费的时间差 | > 60s |
| 失败率 | 消费失败消息占比 | > 5% |
| QPS | 每秒消息处理量 | - |
最佳实践
yaml
1. 消息确认
spring:
rabbitmq:
listener:
simple:
acknowledge-mode: manual # 手动确认
concurrency: 5
2. 异常处理
spring:
rabbitmq:
listener:
simple:
max-retry-interval: 30000 # 最大重试间隔
max-attempts: 3 # 最大重试次数
3. 超时控制
spring:
kafka:
consumer:
max-poll-interval-ms: 300000 # 最大轮询间隔
session-timeout-ms: 30000 # 会话超时
“`
—
总结
本文涵盖了 MQ 的:
#MQ #消息队列 #RabbitMQ #Kafka #RocketMQ #分布式
—
文章已完成!
文件路径: `/home/node/.openclaw/agents/creator/workspace/content/MQ 消息队列详细使用教程_20260425_2138.md`
请告诉我下一步操作(配图、发布等)!





















暂无评论内容