type
status
date
slug
summary
tags
category
password

1、概述

RocketMQ 的存储设计非常精妙,它采用了 “CommitLog + 索引队列(ConsumeQueue/IndexFile)” 的结构。
存储组件
核心职责
物理路径示例
对应关系
CommitLog
存储所有消息的真实内容(消息体+元数据),是消息的唯一物理存储
../store/commitlog/
所有 Topic 的所有 Queue 的消息都混合写入同一个 CommitLog。
ConsumeQueue
作为逻辑索引,存储指定Topic下特定消息队列(Queue)中消息在 CommitLog 的定位信息,提供快速随机读,方便消费者按队列拉取消息。
../store/consumequeue/{TopicName}/{QueueId}/
每个 Topic 的每个 QueueId 都有自己独立的 ConsumeQueue 文件
IndexFile
提供基于消息Key(Key或Message ID) 或时间区间查询消息的能力。
../store/index/
辅助文件
用于维护 Broker 状态和元数据。 - checkpoint:记录CommitLog、ConsumeQueue、IndexFile最后一次刷盘的时间戳。 - abort:Broker启动时创建,正常关闭时删除。若异常关闭,此文件会保留,用于故障恢复判断。
../store/
一个完整的目录结构如下所示:
notion image
  • abort:该文件在 Broker 启动后会自动创建,正常关闭Broker,该文件会自动消失。若在没有启动 Broker 的情况下,发现这个文件是存在的,则说明之前 Broker 的关闭是非正常关闭。
  • checkpoint:存储 CommitLog、ConsumeQueue、IndexFile 的最后刷盘时间戳
  • commitlog:存放 CommitLog 文件,即存放所有物理消息。
  • config:存放 Broker 运行期间的一些配置数据
  • consumequeue:存放 ConsumeQueue 文件
  • index:存放 IndexFile 文件
  • lock:运行期间使用到的全局资源锁

2、CommitLog

CommitLog 是 RocketMQ 的核心存储文件,它存储了所有主题、所有队列的原始消息内容。CommitLog 的特点:
  • 物理位置位于 ${ROCKETMQ_HOME}/store/commitlog/ 目录下。
  • CommitLog 的存储其实是分多层的, CommitLog -> MappedFileQueue -> MappedFile,其中真正存储数据的是MappedFile。CommitLog 目录中存放着很多的 MappedFile 文件,当前 Broker 中的所有消息都是落盘到这些MappedFile 文件中的。
  • 所有消息都严格按照顺序追加(Append) 写入当前活跃的 MappedFile 文件。每个文件默认固定为 1GB(可配置,mapedFileSizeCommitLog),写满后自动生成一个新的文件。
  • 文件名由 20 位十进制数构成,是当前文件的第一条消息的起始位移偏移量
  • 单个 Broker 实例下的所有 Queue 共用一个日志数据文件 CommitLog,也就是说所有 Topic 的所有 Queue 的消息都混合写入同一个 CommitLog。
为什么不采用 Kafka 的设计,不同的 Partition 分别存储在一个独立的物理文件呢?
在 Kafka 的设计中,一旦 Kafka 中 Topic 的 Partition 数量过多,队列文件会过多,那么会给磁盘的 IO 读写造成比较大的压力,也就造成了性能瓶颈。所以 RocketMQ 进行了优化,消息主题统一存储在 CommitLog 中。当然它也有它的优缺点。
  • 优点:由于消息主题都是通过 CommitLog 来进行读写,ConsumerQueue 中只存储很少的数据,所以队列更加轻量化。对于磁盘的访问是串行化从而避免了磁盘的竞争。
  • 缺点:消息写入磁盘虽然是基于顺序写,但是读的过程确实是随机的。读取一条消息会先读取 ConsumeQueue,再读 CommitLog,会降低消息读的效率。

2.1 CommitLog的存储格式

每条消息在 CommitLog 中不仅存储了业务发送的 Body,还包含 RocketMQ 系统所需的属性。其结构大致如下:
字段
长度
说明
消息总长度
4字节
整个消息的长度
Magic Code
4字节
固定值,用于标识消息版本等
Body CRC
4字节
消息体的 CRC 校验码,用于检测数据损坏
队列ID
4字节
消息所属的队列 ID
主题长度
1字节
主题名称的长度
主题
可变
主题名称
消息Flag
4字节
系统标志,如是否压缩、事务消息等
扩展属性长度
2字节
扩展属性(Properties)的长度
扩展属性
可变
键值对形式的消息属性,如 KEYS, TAGS, WAIT 等
消息体长度
4字节
消息实际内容(Body)的长度
消息体
可变
生产者发送的原始消息内容

2.2 CommitLog的清理

Rocketmq 清理消息是以 CommitLog 文件为单位进行清理的,除了用户手动清理外,在以下情况下也会被自动清理,无论文件中的消息是否被消费过:
  • CommitLog 文件过期(默认过期时间为 72 小时),且到达清理时间点(默认为凌晨 4 点)后,自动清理过期文件。
  • CommitLog 文件过期,且磁盘空间占用率已达过期清理警戒线(默认 75%)后,无论是否达到清理时间点,都会自动清理过期文件。
  • 磁盘占用率达到清理警戒线(默认 85%)后,开始按照设定好的规则清理文件,无论是否过期。默认会从最老的文件开始清理。
  • 磁盘占用率达到系统危险警戒线(默认 90%)后,Broker 将拒绝消息写入。

3、ConsumeQueue

ConsumeQueue 是 CommitLog 的逻辑索引文件每个 Topic 的每个 Queue 都会单独维护一个 ConsumeQueue 文件。因为所有 Topic 的所有 Queue 都会混合写入同一个 CommitLog 文件,ConsumeQueue 的作用就是提供快速随机读,在 CommitLog 文件中快速定位到消息。
ConsumeQueue 的工作流程:消费者拉取消息时,先查询 ConsumeQueue 这个“轻量级索引”,得到消息在 CommitLog 中的物理位置,然后通过 mmap 的方式直接从 CommitLog 中批量读取消息内容,极大地提升了消费端的读取效率。
ConsumeQueue 的特点:
  • 物理文件:位于 ${ROCKETMQ_HOME}/store/consumequeue/{Topic}/{QueueId}/ 目录下。
  • 文件大小:每个文件约包含 30万个条目,固定大小(约5.72MB)。
  • 异步构建:ConsumeQueue 中的数据是由后台线程 ReputMessageService 异步地从 CommitLog 中提取并构建的。

3.1 ConsumeQueue的索引格式

ConsumeQueue 中的每个条目非常精简,固定为 20个字节,包含三个关键信息:
字段
长度
说明
CommitLog Offset
8字节
该消息在 CommitLog 文件中的起始物理偏移量
Size
4字节
该消息在 CommitLog 中占用的总字节数
Message Tag HashCode
8字节
消息 Tag 的哈希值,用于 Tag 过滤

3.2 Offset的管理

RocketMQ 中消息 ConsumerOffset 用于表示消费者的消费进度,根据消费模式的不同,ConsumerOffset 也有不同的存储方式:
  • 集群模式(默认):
    • 管理方:Broker 集中管理和存储
    • 存储格式:Offset 相关数据以 JSON 的形式持久化到 Broker 磁盘文件中,文件路径为当前用户主目录下的 store/config/consumerOffset.json
  • 广播模式:
    • 管理方:Consumer 本地维护一份 Offset
    • 存储格式:Offset 相关数据以 JSON 的形式持久化到Consumer本地磁盘文件中,默认文件路径为当前用户主目录下的 .rocketmq_offsets/clientId/{clientId}/clientId/{group}/Offsets.json

3.3 基于Offset的查找过程(消费者拉取消息

  1. 消费者向 Broker 发送拉取请求,携带 TopicQueueIdConsumerOffset
  1. Broker 根据这些信息找到对应的 ConsumeQueue 文件。
  1. 从 ConsumerOffset 位置开始,读取一批(如 32条)20字节的条目。ConsumeQueue 的索引 Key 是消费位点Offset,是个从 1 开始递增的数字,通过位点对 ConsumeQueue 文件大小(30万)取余,可以定位到指定的 ConsumeQueue 文件以及在该文件中的数据块。因为数据块大小固定,所以可以算出对应的数据块在 CommitLog 文件中的偏移量 CommitLog Offset
  1. 根据条目中的 CommitLog Offset 和 Size,到 CommitLog 中批量读取完整的消息内容。
  1. 将完整的消息返回给消费者。

4、IndexFile

IndexFile 提供了另一种消息索引方式——基于 Key 的查询。主要用于 RocketMQ 控制台的消息查询功能。
IndexFile 的特点:
  • 物理文件:位于 ${ROCKETMQ_HOME}/store/index/ 目录下,文件名以创建时间戳命名。
  • 文件大小:每个文件大小固定(约400MB)。
  • 存储结构:采用 “哈希索引 + 文件顺序写” 的混合结构,类似 LevelDB 的 SSTable。
  • 支持按 Message Key/UniqKey 查询:这是 ConsumeQueue 无法直接提供的功能。生产者在发送消息时可以设置 KEYS 属性(如订单ID),后续可以通过这个 Key 快速定位消息。
  • 查询效率高:通过哈希计算直接定位到哈希槽,再遍历短链表,可以快速找到目标消息的 CommitLog 偏移量。

4.1 IndexFile的存储格式

IndexFile 结构更复杂,主要包含三部分:
  1. Header(文件头):存储一些元数据,如索引消息的起始/结束时间、起始/结束偏移量等。
  1. Hash Slot(哈希槽):固定数量的槽(默认5百万个),每个槽存放一个指针,指向该哈希值对应的索引条目链表的头部。
  1. Index Entry(索引条目):存储具体的索引信息,每个条目包含:
      • keyHash: 消息 Key(或 Topic + “#” + UniqKey)的哈希值。
      • phyOffset: 消息在 CommitLog 中的物理偏移量。
      • timeDiff: 该消息存储时间与文件头起始时间的差值。
      • prevIndex: 同一个哈希槽中,前一个索引条目的位置。用于解决哈希冲突,形成链表。
notion image

4.2 工作流程(按 Key 查询)

  1. 计算查询 Key 的哈希值。
  1. 对哈希槽数量取模,找到对应的 Hash Slot。
  1. 读取该 Slot 中的值,获得第一个 Index Entry 的位置。
  1. 遍历该位置的 Index Entry 链表,比较 keyHash 是否匹配。
  1. 匹配成功后,根据 phyOffset 去 CommitLog 读取完整消息。
RocketMQ系列:集群管理RocketMQ系列:消息管理
Loading...