type
status
date
slug
summary
tags
category
password
1、消息类型
RocketMQ 提供了多种功能强大的消息类型,以满足不同场景的需求。
- 普通消息:最基本的消息类型,无特殊功能,发送后立即可被消费。
- 顺序消息:保证消息在同一个 Queue 内严格按照 FIFO(先进先出)的顺序进行发布和消费。
- 延时消息:消息发送后,不会立即被消费,而是在指定的延迟时间过后才投递给消费者。
- 事务消息: 保证本地数据库操作和消息发送这两个操作是一个事务操作,要么同时成功,要么同时失败。
1.1 顺序消息
- 概念: RocketMQ 保证消息在同一个 Queue 的有序性,即在同一个 Queue 内严格按照 FIFO(先进先出)的顺序进行发布和消费。
- 实现原理:
- 全局顺序: Topic 只设置一个 Queue,并严格到消费者是单线程发送,消费者是单线程消费,那么该 Topic 所有消息都是全局有序的。但这会严重影响并发性能,通常不推荐。
- 分区顺序: Topic 可以设置多个 Queue,将一组需要保证顺序的消息(例如同一个订单 ID 的所有操作:创建、付款、发货)发送到同一个 Queue,这样可以保证这组消息的消费是顺序的。具体操作是:
- 生产者在发送消息时自定义
MessageQueueSelector
接口的实现,最常见的做法是使用订单 ID 或用户 ID 等业务键作为分片键(Sharding Key),RocketMQ 会确保同一个键的消息总是落到同一个 Queue 中。另外生产者必须使用同步发送(syncSend
),并等待上一条消息发送成功后再发送下一条。如果使用异步发送,无法保证网络返回的先后顺序就是实际发送的顺序。 - 消费者以 顺序消费模式(
MessageListenerOrderly
)来消费消息,且必须以单线程的方式消费。如果某条消息消费失败,消费者会自动在本地进行重试(默认重试次数),期间会阻塞对该队列其他消息的消费,而不是跳过失败的消息去消费后面的。
- 场景: 订单创建、付款、发货等流程;数据实时同步。

1.2 延时消息
- 概念: 消息发送后,不会立即被消费,而是在指定的延迟时间过后才投递给消费者。
- 实现原理: RocketMQ 不支持任意精度的延时,而是预设了 18 个延时等级(1s, 5s, 10s, 30s, 1m, 2m, ... 2h)。发送消息时设置一个
delayTimeLevel
。Broker 有定时任务,会检查延时消息,如果时间到了,才会将其投放到目标 Topic 的ConsumeQueue
中,从而被消费者可见。
- 场景: 订单超时未支付自动关闭;定时提醒、通知。
RocketMQ 的延迟消息不支持秒级精度。默认支持 18 个等级的延迟消息,在 Broker 的配置文件中,默认的延时等级定义如下:
延迟级别和对应的延迟时间的对应关系如下
延迟级别 (delayTimeLevel) | 延迟时间 | 延迟级别 (delayTimeLevel) | 延迟时间 |
1 | 1秒 | 10 | 10分钟 |
2 | 5秒 | 11 | 20分钟 |
3 | 10秒 | 12 | 30分钟 |
4 | 30秒 | 13 | 1小时 |
5 | 1分钟 | 14 | 2小时 |
6 | 2分钟 | 15(部分版本定义) | (请参考实际配置) |
7 | 3分钟 | 16(部分版本定义) | (请参考实际配置) |
8 | 4分钟 | 17(部分版本定义) | (请参考实际配置) |
9 | 5分钟 | 18(部分版本定义) | 2小时 |
Producer 发送延时消息:只需要设置一个延迟级别即可(注意不是具体的延迟时间)
延时消息的实现原理:Broker 在启动时,会创建一个内部主题:
SCHEDULE_TOPIC_XXXX
,这个 Topic 可以直接在 RocketMQ 的管理界面看到。
查看
SCHEDULE_TOPIC_XXXX
的队列内容,发现这个 Topic 有 18 个队列,分别为 0 ~ 17
。
这是因为 RocketMQ 会根据延迟等级的个数,创建对应数量的队列,也就是说 18 个等级对应了 18 个队列,每个队列只保存相同延时级别的消息。可以根据业务需求修改或者增加等级数。例如想支持 2 天的延迟,可以增加一个 2d,这个时候总共就有 19 个等级。

Broker 处理延时消息的流程如下:
- 识别延时消息:Broker 解析消息属性,发现
delayTimeLevel > 0
,识别出这是一个延时消息。
- 替换目标 Topic 和 QueueId:
- Broker 会将消息的真实 Topic(
RealTopic
)替换为一个内部的、预定义的 Topic:SCHEDULE_TOPIC_XXXX
。 - 同时,将消息的真实 QueueId 替换为
delayTimeLevel - 1
(因为队列 ID 从0开始)。例如,延时等级为 3,则消息会被放入SCHEDULE_TOPIC_XXXX
的第二个队列(QueueId = 2)中。
- 持久化:消息被作为一条普通消息,持久化到
SCHEDULE_TOPIC_XXXX
这个特定 Topic 的特定队列中。注意:此时,订阅了RealTopic
的 Consumer 是看不到这条消息的,因为它被“隐藏”在了SCHEDULE_TOPIC_XXXX
中。
- 定时任务与消息重投(调度环节)
- 为每个延时等级创建定时任务:RocketMQ 有一个专门的定时消息服务类
ScheduleMessageService
。ScheduleMessageService
在启动时,会根据配置的messageDelayLevel
,为每一个延时等级(共18个)创建一个对应的定时器(Timer) 和消费队列偏移量(offset) 的管理器。 - 定时扫描:每个定时器会以固定的频率(默认100ms一次)扫描其对应的
SCHEDULE_TOPIC_XXXX
队列。 - 计算投递时间:当从队列中取出消息时,服务会检查消息的“存储时间”(
storeTimestamp
)。根据存储时间 + 延时等级对应的时间(如10s)
计算出这条消息应该被投递的精确时间。 - 到期投递:
- 如果当前时间 >= 计算出的投递时间,说明消息已到期。
ScheduleMessageService
会还原这条消息:将它的 Topic 和 QueueId 恢复成最初生产者指定的RealTopic
和真实的QueueId
。- 然后,将这条“恢复原貌”的消息重新投递(PutBack) 到
RealTopic
的 CommitLog 中。这个过程对用户是透明的。
- Consumer 消费:消息被转投到
RealTopic
后,就变成了一条普通消息。一直监听着RealTopic
的 Consumer 就能立即拉取到这条消息并进行消费。

1.3 事务消息
- 概念: 保证本地数据库操作和消息发送这两个分布式操作的事务最终一致性。
- 涉及概念:
- 半消息(Half Message):又称为预处理消息(Prepare Message),半消息是一种特殊的消息类型,该状态的消息暂时不能被 Consumer 消费。当一条事务消息被成功投递到 Broker 上,但是 Broker 并没有接收到 Producer 发出的二次确认时,该事务消息就处于"暂时不可被消费"状态,该状态的事务消息被称为半消息。
- 消息回查(Message Status Check):由于网络抖动、Producer 重启等原因,可能导致 Producer 向 Broker 发送的二次确认消息没有成功送达。如果 Broker 检测到某条事务消息长时间处于半消息状态,则会主动向 Producer 端发起回查操作,查询该事务消息在 Producer 端的事务状态(Commit 或 Rollback)。可以看出,Message Status Check 主要用来解决分布式事务中的超时问题。
- 实现原理(两阶段提交):
- 发送半消息: Producer 向 Broker 发送一条“对 Consumer 不可见”的消息(状态为
TRAN_MSG
)。 - 执行本地事务: Producer 执行本地数据库事务(如扣减库存、生成订单)。
- 提交或回滚:Producer 根据本地事务执行结果向 Broker 提交二次确认:
- 如果本地事务成功,Producer 向 Broker 提交,这条半消息就会变为“对 Consumer 可见”,可以被消费。
- 如果本地事务失败,Producer 向 Broker 回滚,Broker 会丢弃这条半消息。
- 事务状态回查(Back Check): 如果 Producer 在执行完本地事务后宕机或网络异常,导致 Broker 没有收到二次确认。Broker 会定时主动向 Producer 回查 该条消息的最终状态(Commit or Rollback?)。Producer 需要实现一个接口来检查本地事务的执行结果并返回。
- 场景: 电商场景中,创建订单后发送积分增加消息,要保证订单创建成功和发送积分消息这两个操作的一致性。

2、消息体组成
当你使用
producer.send(msg)
发送一条消息时,你构建的 Message
对象通常包含以下核心属性和扩展属性:核心属性(必填/常用)
属性 | 说明 | 重要性 |
Topic | 主题。消息的一级分类,生产者向指定Topic发送消息,消费者订阅指定Topic接收消息。 | 必填 |
Body | 消息体。实际要传输的二进制数据。可以是任何格式,如JSON、字符串、序列化对象等。 | 必填 |
Tags | 标签。消息的二级分类,用于进一步细化主题。消费者可以基于 Topic + Tag 进行更灵活的订阅。 | 强烈推荐 |
Keys | 消息键。通常设置为消息的唯一标识(如订单ID),用于通过控制台查询、定位消息。 | 推荐 |
系统属性(由系统自动设置或用于控制消息行为)
属性 | 说明 |
Message ID | 消息的唯一ID,由 Broker 服务器在接收消息时自动生成。 |
Queue ID | 该消息所在主题队列的ID。发送时可由用户指定,但通常由默认的负载均衡策略自动选择。 |
Queue Offset | 消息在其所在队列中的偏移量(位置),由 Broker 分配。Message ID + Queue ID + Queue Offset 可以唯一确定一条消息。 |
Born Timestamp | 消息产生的时间戳(Producer 端生成的时间)。 |
Store Timestamp | 消息被 Broker 持久化存储的时间戳。 |
Born Host | 产生消息的 Producer 地址。 |
Store Host | 存储消息的 Broker 地址。 |
Reconsume Times | 消息的重试消费次数。对于顺序消息,如果消费失败,此值会递增。 |
Flag | 一个标志位,由用户自定义,RocketMQ 本身不做处理。可用于一些特殊逻辑标记。 |
事务/延迟消息等特殊属性
属性 | 说明 |
Transaction ID | 事务消息的全局事务ID,用于实现分布式事务。 |
Delay Time Level | 延迟消息的延迟级别。 1 对应 1s ,2 对应 5s ,3 对应 10s ,以此类推,共18个级别。注意:这是级别,不是具体时间。 |
用户自定义属性(Properties)
除了上述系统定义的属性,你还可以通过
putUserProperty
方法为消息添加任意键值对(K-V)的自定义属性。Key 和 Tag 有什么区别
特性 | 消息的Key | 消息的Tag |
主要用途 | 消息在业务层面的唯一标识 | 对Topic下的消息进行分类和过滤(二级分类) |
数量 | 一条消息可以设置多个Key | 一条消息通常只能有一个Tag |
过滤能力 | 不支持在服务端进行过滤消费 | 支持在Broker端进行高效过滤 |
查询功能 | 可通过Key查询消息,用于故障排查 | 主要用于消费端的订阅过滤,不直接用于消息查询 |
索引机制 | Broker会为其创建哈希索引 | 存储在CommitLog中,过滤时对比Tag的哈希码 |
设计层级 | 更像是消息的一个属性 | 被定义为消息的二级类型,与Topic共同构成消息分类体系 |
- 消息 Key:通常设置为消息的业务唯一标识,如订单 ID、流水号等。当出现消息丢失或需要追踪消息状态时,可以通过 Topic 和 Key 在 RocketMQ 控制台查询到具体的消息内容及其消费状态。
- 消息 Tag:用于对 Topic 下的消息进行二级分类和过滤。消费者可以只订阅其关心的特定 Tag 的消息。例如,一个"OrderTopic"可以有不同的 Tag 如"PaySuccess"、"ShipNotify"等,不同消费者群体可以按需订阅。
- 生产者发送消息设定 Tag
- 消费者订阅时,可以指定只消费特定 Tag 的消息:
3、消息使用
3.1 消息发送
RocketMQ 的消息发送有三种模式:
- 同步发送 (Sync) :Producer 发送消息后会阻塞等待,直到收到 Broker 的响应后才发下一条消息。
- 异步发送 (Async):Producer 发送消息后无需等待 Broker 的响应,即可发送下一条消息,用户需要另外实现异步发送回调接口(
SendCallback
)。适用于响应速度要求高,且需要保证可靠性的场景,如交易订单。
- 单向发送 (Oneway):Producer 只负责发送消息,不等待响应且没有回调函数触发,即只发送请求不等待应答。这种方式发送消息的过程耗时非常短,一般在微秒级别。适用于某些耗时非常短,但对可靠性要求并不高的场景,例如日志收集。
另外不同发送方式支持的消息类型有所差别

3.2 消息查询
RocketMQ 提供了多种消息查询方式:
- 按 Message ID 查询:Message Id 是消息发送后,在 Broker 端生成的,其包含了 Broker 的地址和在 CommitLog 中的偏移信息,并会将 Message Id 作为发送结果的一部分进行返回。Broker 解析 Message ID 得到 CommitLog Offset,直接去 CommitLog 读取,速度最快。
- 按 Message Key 查询:Message Key 是业务开发同学在发送消息之前自行指定的,通常使用具有业务含义,区分度高的字段(例如用户 ID、订单 ID),通过 IndexFile 索引找到对应的 CommitLog Offset,然后去读取,速度稍慢。
- 按 Topic 和 Queue Offset 查询:通过 ConsumeQueue 找到对应的 CommitLog Offset,然后去读取。
3.3 消息消费
3.3.1 消费模式
RocketMQ 主要提供两种消费模式:
- 集群模式 (Clustered / Load Balancing)
- 概念:同一个 Consumer Group 下的多个消费者实例共同消费同一个 Topic 的消息。每条消息只会被组内的一个消费者消费。
- 优点:实现负载均衡,横向扩展消费者数量可以提高整体的消费能力。
- 适用场景:适用于所有消费者处理逻辑相同、每条消息只需处理一次的通用场景。例如,订单处理、消息推送等。
- 广播模式 (Broadcasting)
- 概念:同一个 Consumer Group 下的每个消费者实例都会消费到该 Topic 的全部消息。
- 优点:每个消费者都能获取全量消息。
- 缺点:消费者之间没有负载均衡,容易造成资源浪费和处理重复。
- 适用场景:适用于需要每个消费者实例都刷新本地缓存或元数据的特殊场景。例如,刷新本地配置、更新本地数据库缓存等。
可以在消费者代码中,通过
setMessageModel
方法设置。注意事项:
- 广播模式下不支持顺序消息。
- 广播模式下不支持重置消费位点。
- 每条消息都需要被相同逻辑的多台机器处理。
- 消费进度在客户端维护,offset本地存储目录为
${user.home}/.rocketmq_offsets
,出现重复的概率稍大于集群模式。
- 消息队列 RocketMQ 保证每条消息至少被每台客户端消费一次,但是并不会对消费失败的消息进行失败重投,因此业务方需要关注消费失败的情况。
- 客户端每一次重启都会从最新消息消费。客户端在被停止期间发送至服务端的消息将会被自动跳过,请谨慎选择。
- 每条消息都会被大量的客户端重复处理,因此推荐尽可能使用集群模式。
- 目前仅 Java 客户端支持广播模式。
- 广播模式下 MQ 不维护消费进度,所以消息队列 RocketMQ 控制台不支持消息堆积查询、消息堆积报警和订阅关系查询功能。
3.3.2 消费类型
RocketMQ 提供了两种消费者 API,对应不同的处理方式:
- PushConsumer (推模式消费者)
- 工作方式:SDK 内部封装了一个长轮询机制,主动从 Broker 拉取消息。当有消息到达时,SDK 会立即回调用户注册的监听器 (
MessageListener
) 来处理消息。对用户而言,感觉像是消息被“推”过来了。 - 优点:实时性高,使用方便,开发者只需关注业务处理逻辑。
- 缺点:SDK 内部采用长轮询,本质仍是拉模式。推送速率由服务端控制,可能会造成消费者压力过大(虽然有限流机制)。
- PullConsumer (拉模式消费者)
- 工作方式:由消费者应用主动调用
pull
方法从 Broker 请求消息。需要自己管理消息偏移量 (offset
),循环拉取消息,并控制拉取的频率。 - 优点:灵活性高,可以完全自主控制消费的节奏、批量大小等。
- 缺点:使用复杂,需要手动管理偏移量,实时性较差。
- 使用示例:
DefaultLitePullConsumer
是 RocketMQ 后期提供的更简洁的 Pull 接口,它支持订阅模式(自动负载均衡)和分配模式(手动指定队列)。
另外 PushConsumer 支持两种消息模式:
- 集群模式 (
CLUSTERING
):默认模式。同一个消费者组内的多个消费者共同消费一条消息,每条消息只被组内的一个消费者消费。适用于负载均衡场景。
- 广播模式 (
BROADCASTING
):同一个消费者组内的每一个消费者都会消费所有的消息。适用于消息需要被所有消费者实例处理的场景,如刷新本地缓存。
使用注意:
- 绝大多数场景下,推荐使用 PushConsumer,因为它更简单、高效且实时。只有在需要高度自定义消费节奏的特殊场景下,才考虑使用 PullConsumer。
- 消费偏移量管理:
- Push 模式和
LitePullConsumer
的订阅模式通常自动管理偏移量(默认提交到 Broker)。 - Pull 模式通常需要手动管理偏移量(例如,配合
consumer.commitSync()
并将偏移量存储在外部系统如 Redis 或数据库中)。
3.3.3 并发消费和顺序消费
1、并发消费
并发消费是 RocketMQ 默认的消费模式,不需要特别配置。顾名思义,在这种模式下,消费者会同时启动多个线程来并行处理一个消息队列(MessageQueue)中的消息。
核心原理
- 线程池模型:消费者端有一个线程池(
ConsumeThreadPool
)。
- 并行拉取:消费者会为每个分配的 MessageQueue 启动一个
PullRequest
任务,从 Broker 拉取消息。
- 并行处理:拉取到的消息会被提交到线程池中,由不同的线程并发处理。这意味着同一个队列中的消息可能被乱序处理。
- 偏移量提交:消费线程处理成功后,会更新消费偏移量(Offset)。由于是并发处理,偏移量的提交也是并发的,可能不是严格连续的(例如,线程 A 处理了
offset=5
的消息,线程B处理了offset=6
的消息,可能B先完成并提交偏移量)。
特点
- 高吞吐:充分利用多核 CPU 优势,最大程度地提高消费速度。
- 低延迟:消息积压时,能快速消费。
- 非顺序性:无法保证消息的处理顺序。
适用场景
适用于对消息处理顺序不敏感,但追求高吞吐量和低延迟的场景。例如:
- 发送通知、短信、邮件。
- 记录日志、用户行为埋点。
- 更新非关键性的缓存数据。
示例
2、顺序消费
顺序消费是指消费者按照消息发送到队列的先后顺序来逐一处理消息。对于一个特定的“顺序主题”,其顺序性是通过将需要保证顺序的消息发送到同一个MessageQueue中来实现的。
核心原理
- 队列选择:生产者通过选择器(Selector) 将同一组需要顺序处理的消息发送到同一个队列。例如,将同一个订单 ID的所有消息(创建、付款、发货)设置相同的分片 key(订单 ID),然后发送到同一个队列。
- 队列锁与本地队列:在消费端,RocketMQ 会为每个 MessageQueue 加锁。
- 串行处理:对于同一个队列,消费者会启动一个线程串行地拉取消息,并提交到线程池中的一个固定线程进行处理。只有前一条消息处理成功(或重试成功后),才会开始处理下一条。这样就严格保证了同一个队列内消息的处理顺序。
- 失败暂停:如果某条消息消费失败,消费者会在这个队列上暂停一会,然后重试这条消息,而不是跳过它去处理后面的消息,这保证了顺序性不被破坏。
特点
- 顺序性:严格保证同一队列内消息的处理顺序(FIFO)。
- 低吞吐:由于是串行处理,吞吐量远低于并发模式。
- 性能瓶颈:如果某个队列的消息量特别大,处理该队列的线程会成为瓶颈。
适用场景
适用于对消息处理顺序有严格要求的业务场景。例如:
- 股票交易订单:创建、撮合、成交、清算必须顺序执行。
- 电商订单流程:下单、付款、发货、收货,状态必须顺序更新。
- 数据库 Binlog 同步:要求数据变更操作的顺序与源端完全一致。
示例
注意:
- 顺序性的范围:顺序消费的“顺序”是指同一个 MessageQueue 内部的顺序。一个 Topic 通常有多个 Queue,所以全局顺序(Global Order)需要将 Topic 配置为只有一个Queue,但这会极大限制吞吐量,通常不推荐。绝大多数业务场景只需要局部顺序(Partial Order),例如保证同一个订单的消息顺序。
- 生产者配合:要实现顺序消费,生产者必须配合,使用
MessageQueueSelector
将顺序相关的消息发送到同一个队列。
- 消费失败:在顺序消费模式下,如果某条消息一直消费失败(达到最大重试次数),不会跳过它去消费下一条,会导致该队列的消费完全卡住,这是一个需要密切关注的风险点。
- 负载均衡:如果顺序消息的分片 key(如订单 ID)分布不均匀,会导致不同队列的压力不均,需要设计良好的选择器算法(如对分片 key 进行哈希)来使负载均衡。
3.4 消息重试和死信队列
RocketMQ 的消息重试分为两大类:生产者发送消息的重试和消费者消费消息的重试。
3.4.1 生产者发送消息的重试
目的:确保消息成功发送到 Broker,避免因为网络抖动、Broker 瞬时故障等原因导致消息丢失。
机制:
- 当生产者调用
send
方法发送消息时,如果遇到异常(如网络异常、超时、RemotingException、MQClientException 等),或者未收到 Broker 的响应,RocketMQ 客户端会自动进行重试。
- 默认重试次数为 2 次(即第一次发送失败后,最多会再尝试发送 2 次,总共 3 次机会)。
- 重试间隔会逐渐变长,避免给 Broker 造成过大压力。
配置:
- 重试次数:可以通过
setRetryTimesWhenSendFailed(int times)
方法设置同步发送的重试次数。异步发送也有对应的setRetryTimesWhenSendAsyncFailed
方法。
- 超时时间:发送消息的超时时间也可以通过
setSendMsgTimeout(int timeout)
来设置,默认为 3秒。
注意:
- 对于事务消息,其提交(commit)阶段的重试机制是无限次的,直到成功或超时(默认1分钟)回滚为止。
- 如果错误是
MQBrokerException
(Broker 返回了明确的错误码,如FLUSH_DISK_TIMEOUT
),默认不会重试,因为 Broker 已经接收并处理了消息,只是处理结果不是成功的。
3.4.2 消费者消费消息的重试
目的:确保消息被消费者成功消费。当消费者消费某条消息失败时(例如,业务逻辑处理失败、数据库异常等),Broker 会重新投递这条消息给消费者进行再次消费。更准确来说当消费者消费消息出现以下情况时,会进行消费重试。
- 返回
ConsumeConcurrentlyStatus.RECONSUME_LATER
- 返回 null
- 抛出抛出异常。
适用消息类型:顺序消息(Orderly) 和 无序消息(Concurrently) 的重试机制有所不同。
1、无序消息(MessageModel.CLUSTERING)的重试
机制:
- 消费者消费消息时,如果抛出异常(或返回
RECONSUME_LATER
),则认为消费失败。
- 消费失败的消息会被发送回 Broker,并存入一个特殊的延迟队列(
%RETRY% + consumerGroup
)。
- Broker 会等待一段时间后再次投递该消息。
- 重试次数逐步增加,相邻两次重试的延迟等级也会提高(延迟时间变长)。RocketMQ 预设了18个延迟等级(1s, 5s, 10s, 30s, 1m, 2m, 3m, 4m, 5m, 6m, 7m, 8m, 9m, 10m, 20m, 30m, 1h, 2h)。
- 如果消息重试16次后仍然失败,该消息将不再被投递,会被放入一个 “死信队列”(Dead-Letter Queue, DLQ),其队列名为
%DLQ% + consumerGroup
。死信队列的消息不会再被消费,有效期与正常消息相同,都是 3 天,3 天后会被删除,所以死信队列的消息需要人工干预。
示例代码:
注意:
- 当消费模式为集群模式时才会自动进行重试,广播消息只会以警告日志的形式记录消费失败的消息,不会进行重试的。
- 最大重试次数:默认为 16 次。可以在代码中通过自定义
MessageListener
或在消费时返回Action.ReconsumeLater
来触发重试。
- 处理重试消息:消费者需要保证业务的幂等性。因为同一条消息可能会被多次投递(至少一次投递语义),您的业务逻辑必须能够处理重复消息。
- 一个死信队列包含了一个 Group ID 的所有消息,不管当前消息处于哪个 Topic。重试队列和死信队列只有在需要的时候才会被创建出来,被扔进去消息会生成新的 MessageId。
- 监控死信队列:必须有一个监控机制来处理死信队列中的消息。这些是经过多次重试都无法处理的消息,通常意味着业务上有严重问题,需要人工介入查看日志、修复数据或重新处理。
2、顺序消息(MessageOrderly)的重试
机制:
- 顺序消息的重试是局部阻塞的。
- 如果消费某条消息失败,消息队列会自动在本地进行重试(间隔时间由
suspendCurrentQueueTimeMillis
设置,默认为1000ms),而不是先发回Broker。
- 会一直重试,直到最大重试次数(默认为
Integer.MAX_VALUE
)。
- 这保证了在重试期间,同一个队列(
MessageQueue
)的消息处理是被暂停的,从而避免顺序错乱。
- 如果重试次数超过最大值,也会进入死信队列。
注意:
- 最大重试次数:
consumer.setMaxReconsumeTimes(int maxReconsumeTimes);
- 顺序消息的重试会阻塞后面的消息消费,所以当使用顺序消息的时候,监控一定要做好,避免后续消息被阻塞。
3.5 偏移量管理
偏移量 (
offset
) 是标记消费者消费进度的重要指标。RocketMQ 提供两种管理方式:- 远程位点管理 (Broker 管理)
- 消费者默认将消费进度(偏移量)持久化到 Broker 服务器。
- 消费者集群模式下,由 Broker 来均衡分配队列并管理偏移量,非常方便可靠。这是最常用的方式。
- 本地位点管理 (Local)
- 消费者将消费进度存储在自己的本地文件中(如磁盘)。
- 主要用于广播模式,因为每个消费者都要消费全量消息,各自的消费进度彼此独立。
3.6 消息过滤
消费者可以通过以下两种方式对消息进行过滤:
- Tag 过滤: 最常用和高效的方式。在订阅时指定 Tag,Broker 端的
ConsumeQueue
中存储了 Tag 的哈希码,过滤在 Broker 端完成,效率很高。语法:topic:tag
(如OrderTopic:TagA
),支持||
操作(如OrderTopic:TagA || TagB
)。
- SQL92 过滤: 通过消息的用户属性(User Properties) 进行过滤。生产者发送消息时可以设置自定义的属性(Key-Value),消费者订阅时使用 SQL92 语法来过滤。例如:
a between 0 and 3
。这种方式过滤在 Broker 端完成,但性能开销比 Tag 过滤大,因为需要解析 SQL 并比对消息体外的属性。
3.7 批量消息
3.7.1 批量发送消息
批量发送消息是将多条消息打包后一次性发送到 Broker,用于减少网络 IO 次数。批量发送消息的特点:
- 主题一致:同一批次的所有消息必须拥有相同的 Topic 。
- 大小限制:一批消息的总大小不得超过 4MiB 。这是最需要关注的限制。
- 不支持延迟消息:批量消息不能设置为延迟消息
使用示例:
当需要发送的消息集合可能超过 4MiB 时,需要进行拆分。例如我们可以自定义一个
ListSplitter
工具进行消息拆分:3.7.2 批量消费消息
批量消费消息是指 RocketMQ 支持将多条消息一次性投递给消费者进行处理。在消费者端通过
DefaultMQPushConsumer
设置批量消费的参数。当满足以下两个条件其中一个,缓存的消息就会被提交给消费线程 :
- 缓存的消息数量达到
ConsumeMessageBatchMaxSize
设置的值。
- 等待时间达到了设定的最大等待时长。
- Author:mcbilla
- URL:http://mcbilla.com/article/27785c7d-7c1d-8078-b2fd-d8a50137551d
- Copyright:All articles in this blog, except for special statements, adopt BY-NC-SA agreement. Please indicate the source!