Java-201 RabbitMQ 消息落盘与队列索引详解

TL;DR

  • 场景:想搞清 RabbitMQ 消息到底落在哪、为什么磁盘涨、为什么内存爆、参数怎么调。
  • 结论:核心在 queue index(.idx)+ msg_store(.rdq)+ ETS 映射 + 垃圾回收/合并策略的协同。
  • 产出:一套可复用的存储心智模型 + 关键目录/文件/参数定位点 + 常见故障速查卡。

RabbitMQ 架构

数据存储

RabbitMQ 消息根据持久性可分为两种类型:

1. 持久化消息(Persistent Messages)

  • 消息到达队列时会立即被写入磁盘
  • 同时会在内存中保留一份副本以提高读取性能
  • 当内存资源紧张时,内存中的副本会被清除,仅保留磁盘存储
  • 适用于关键业务数据,如订单处理、支付记录等

2. 非持久化消息(Transient Messages)

  • 默认仅存储在内存中
  • 当内存压力达到阈值时(如内存使用超过40%),会被临时转储到磁盘
  • 重启服务后消息会丢失
  • 适用于实时性要求高但允许丢失的数据

RabbitMQ 的存储架构包含两个核心组件:

1. 队列索引(Queue Index)

  • 使用B+树结构存储消息的元数据
  • 记录消息在存储文件中的位置信息
  • 默认存储位置:/var/lib/rabbitmq/mnesia/队列名称.idx

2. 消息存储(Message Store)

  • 实际存储消息内容的文件
  • 采用顺序写入方式提高IO性能
  • 支持消息合并和垃圾回收机制
  • 默认存储位置:/var/lib/rabbitmq/mnesia/msg_store_persistent

队列索引

rabbit_queue_index,索引维护队列的落盘消息的信息,如存储地点,是否已经被消费者接收、是否已经被消费ACK等等,每个队列都有相应的索引。

  • 索引使用顺序的段文件来存储,后缀为 .idx,文件名从0开始累加
  • 每个段中包含固定的 segment_entry_count 条记录,默认值是 16384
  • 每个index从磁盘中读取消息的时候,至少要在内存中维护一个段文件
  • 设置 queue_index_embed_msgs_below 值的时候要格外注意,一点点增大也可能会导致内存爆炸式的增长

消息存储

RabbitMQ 的消息存储机制采用 rabbit_msg_store 模块实现:

1. 存储架构

  • 采用键值对(Key-Value)的存储形式,消息内容以二进制形式写入文件
  • 每个虚拟主机(vhost)拥有独立的存储空间,所有队列共享同一存储区域
  • 集群环境下,每个节点维护自己的消息存储实例

2. 存储类型划分

  • 持久化存储(msg_store_persistent):用于存储声明为持久化(durable)的消息,写入流程:消息先写入内存缓冲区,然后通过 fsync 操作持久化到磁盘
  • 短暂存储(msg_store_transient):用于存储非持久化消息,采用内存映射文件(mmap)方式存储

3. 存储管理特性

  • 采用文件预分配策略,默认单个存储文件大小为 16MB(可配置)
  • 通过垃圾回收机制定期合并碎片文件
  • 持久化存储使用同步写入策略确保数据安全,而短暂存储采用异步写入提高吞吐量

store使用文件来存储,后缀为 .rdq,经过store处理的所有消息都会以追加的方式写入到该文件中。当该文件的大小超过指定的限制(file_size_limit)后,将会关闭文件并创建一个新的文件以供新消息写入。文件名从0开始进行累加。

消息(包括消息头、消息体、属性)可以直接存储在index中,也可以存储在store中。最佳的方式是较小的消息存在 index 中,而较大的消息存在 store 中,这个消息大小界定可以通过 queue_index_embed_msgs_below 来配置,默认值为4096B。

配置示例(rabbitmq.conf):

## Size in bytes below which to embed messages in the queue index.
# queue_index_embed_msgs_below = 4096
## You can also set this size in memory units
# queue_index_embed_msgs_below = 4kb

如果消息小于这个值,就在索引中存储,如果消息大于这个值就在 store 中存储,大于这个值的消息存储于 msg_store_persistent 目录中的 .rdq 文件中。

读取消息的时候,先根据消息ID(msg_id)找到对应存储的文件。如果文件存在并且未被锁定,则直接打开文件,从指定位置读取消息内容。如果文件不存在或者被锁住了,则发送请求由 store 进行处理。

删除消息的时候,只是从ETS表删除指定消息的相关信息,同时更新消息对应的存储文件和相关信息。在执行消息删除操作时,并不立即对文件中的消息进行删除,也就是说消息依然在文件中,仅仅是标记为垃圾数据而已。

当一个文件中都是垃圾数据的时候可以将这个文件删除。当检测到前后两个文件中的有效数据可以合并成一个文件,并且所有的垃圾数据的大小和所有文件(至少有3个文件存在的情况下)的数据大小的比值超过设置的阈值 garbage_fraction(默认值0.5)时,才会触发垃圾回收机制,将这两个文件合并。

合并逻辑如下:

  • 锁定这两个文件
  • 先整理前面的文件的有效数据,再整理后面的文件的有效数据
  • 将后面文件的有效数据写入到前面的文件中
  • 更新消息的 ETS 表中的记录
  • 删除后面的文件

队列结构

通常队列由 rabbit_amqqueue_process 和 backing_queue 这两部分组成。rabbit_amqqueue_process 负责协议相关的消息处理,即接受生产者发布的消息,向消费者交付消息,处理消息的确认等。backing_queue 是消息存储的具体形式和引擎。

rabbit_variable_queue.erl 源码中详细定义了 RabbitMQ 可变队列(Variable Queue)的4种消息存储状态:

1. alpha状态

  • 存储方式:消息索引和消息内容都完全存储在内存中
  • 特点:提供最快的访问速度
  • 资源消耗:内存占用最高,但CPU消耗最少

2. beta状态

  • 存储方式:消息索引保留在内存中,消息内容存储在磁盘上
  • 特点:内存使用量适中,访问速度较快

3. gama状态

  • 存储方式:消息索引同时在内存和磁盘上保存,消息内容存储在磁盘上
  • 特点:内存占用进一步减少

4. delta状态

  • 存储方式:消息索引和内容都完全存储在磁盘上
  • 特点:内存占用最低

这些状态之间会根据消息的访问频率和系统资源情况动态转换。

对于普通没有设置优先级和镜像的队列来说,backing_queue的默认实现 rabbit_variable_queue,其内部通过5个子队列Q1、Q2、delta、Q3、Q4来体现消息的各个状态。

错误速查

症状根因修复
重启后队列消息丢失消息/队列未做持久化明确关键链路强制 durable + persistent message
磁盘占用持续增长,删除/消费后不下降删除是”标记+ETS/索引更新”,需满足GC条件才回收调整 garbage_fraction;低峰执行压缩整理
内存突然飙升甚至OOMqueue_index_embed_msgs_below 过大导致更多消息”嵌入索引”降低 embed 阈值或按队列类型分层
承载吞吐明显下降,I/O等待高持久化写入的fsync/刷盘开销分散持久化队列到不同磁盘/节点
单条大消息导致性能抖动大消息进入store引发更重的磁盘路径业务侧做消息瘦身;必要时下调embed阈值
消费端表现为”队列不空但拉取慢/抖动”消息在Q1/Q2/Delta/Q3/Q4间迁移,冷消息在磁盘态提升消费者并发或批量ACK;热点与冷队列隔离