type
status
date
slug
summary
tags
category
password

1、概述

Kafka Consumer 是 Apache Kafka 中用于从 Kafka 集群读取数据的客户端组件。它订阅一个或多个主题(Topic),并从这些主题的分区(Partition)中读取消息。
Consumer 的 Java API 的简单使用如下:
  • bootstrap.servers: Kafka 集群地址
  • group.id: 消费者组ID
  • enable.auto.commit: 是否自动提交偏移量(默认true)
  • auto.commit.interval.ms: 自动提交间隔(默认5000ms)
  • auto.offset.reset: 当没有初始偏移量或偏移量无效时的策略(earliest/latest/none)
  • key.deserializer/value.deserializer: 键值反序列化器
Consumer 涉及到以下核心特性:
  • 消费者组(Consumer Group):一组消费者共同消费一个主题,Kafka 确保每条消息只被组内的一个消费者处理。
  • 分区分配策略(Partition Assignment):消费者组内的消费者如何分配分区(Range、RoundRobin、Sticky等策略)。
  • 消费者连接 Broker:消费者连接到分配到的分区所在的 Broker 上。
  • 消费并记录偏移量(Offset):使用内部主题 __consumer_offsets 记录消费者组在每个分区中消费的位置。
  • 提交偏移量(Commit Offset):消费者将已处理的偏移量提交回 Kafka
  • 分区重分配(Rebalance):当消费者组有 Broker 加入或退出时,会触发分区的重分配。

2、消费者组(Consumer Group)

Consumer Group 我们前面已经介绍过很多次了。用一句话概括就是:Consumer Group 是 Kafka 提供的可扩展且具有容错性的消费者机制。既然是一个组,那么组内必然可以有多个消费者或消费者实例(Consumer Instance),它们共享一个公共的 ID,这个 ID 被称为 Group ID。组内的所有消费者协调在一起来消费订阅主题(Subscribed Topics)的所有分区(Partition)。当然,每个分区只能由同一个消费者组内的一个 Consumer 实例来消费。
Consumer Group 的特点:
  1. Consumer Group 下可以有一个或多个 Consumer 实例。这里的实例可以是一个单独的进程,也可以是同一进程下的线程。在实际场景中,使用进程更为常见一些。
  1. Group ID 是一个字符串,在一个 Kafka 集群中,它标识唯一的一个 Consumer Group。
  1. Consumer Group 下所有实例订阅的主题的单个分区,只能分配给组内的某个 Consumer 实例消费。
  1. Consumer Group之间彼此独立,互不影响,它们能够订阅相同的一组主题而互不干涉。
  1. 理想情况下,Consumer 实例的数量应该等于该 Group 订阅主题的分区总数。如果 Consumer 实例数少于分区总数,某个 Consumer 实例会被分配多个分区进行消费。同一个 Consumer Group 内的多个 Consumer 会均衡分配分区(涉及到第 3 点分区分配策略)例如:3 个分区的 Topic 由 2 个 Consumer 消费:
      • Consumer1 可能分配 P0 和 P1(连接 Broker1 和 Broker2)。
      • Consumer2 分配 P2(连接 Broker3)。
  1. 不推荐设置大于总分区数的 Consumer 实例,多余的实例将不会被分配任何分区,只会浪费资源。

3、分区分配策略

4、消费者连接Broker

我们知道 Topic 通过分区机制把消息分散到多个 Broker 上存储,那么 Consumer 是怎么确定消费的消息在哪台 Broker 上呢?答案是 Consumer 通过分区 Leader 机制元数据查询动态定位消息所在的 Broker。
  1. Consumer 启动时连接 Bootstrap Broker:Consumer 配置中需要指定 bootstrap.servers(例如 kafka1:9092,kafka2:9092),这只是初始连接点,用于获取集群元数据(Metadata),例如 Topic 分区分布、Leader Broker 地址等。
  1. 获取元数据(Metadata):Consumer 向 Bootstrap Broker 发送 Metadata 请求,获取:
    1. Topic 的所有分区(Partitions)。
    2. 每个分区的 Leader Broker(负责该分区读写的主节点)。
    3. 分区的副本(Replicas)分布。
    4. 例如:Topic orders 有 3 个分区:
      • P0 Leader → Broker1
      • P1 Leader → Broker2
      • P2 Leader → Broker3
  1. 直接连接分区 Leader Broker:Consumer 根据元数据,直接连接到目标分区的 Leader Broker 消费消息。注意Consumer 不会通过其他 Broker 代理请求,而是直连 Leader。
    1. 例如:Consumer 订阅了 orders Topic,且分配到 P0 和 P1:
      • 从 Broker1 读取 P0 的数据。
      • 从 Broker2 读取 P1 的数据。
  1. 动态感知集群变化:如果分区 Leader 发生变更(如 Broker 宕机或重新选举),Consumer 会:
    1. 收到 NOT_LEADER_FOR_PARTITION 错误。
    2. 重新请求元数据,获取新的 Leader Broker 地址。
    3. 切换到新的 Leader 继续消费。

5、消费并记录偏移量(Offset)

Kafka 中的位移(Offset)是消息在分区中的唯一标识,用来记录消费者读取消息的位置。Offset 的特性:
  1. 分区级别唯一性:偏移量在每个分区内是有序且唯一的
  1. 顺序递增:新消息的偏移量总是比前一条消息大
  1. 不可变性:一旦消息被写入分区,其偏移量就固定不变
对于 Consumer Group 而言,Offset 是一组 Key-Value 对:
  • Key 对应分区
  • Value 对应 Consumer 消费该分区的最新位移
如果用 Java 来表示的话,可以认为是 Map<TopicPartition, Long> 的数据结构,其中 TopicPartition 表示一个分区,而 Long 表示位移的类型。
老版本的 Consumer Group 把位移保存在 ZooKeeper 中。Apache ZooKeeper是一个分布式的协调服务框架,Kafka 重度依赖它实现各种各样的协调管理。将位移保存在 ZooKeeper 外部系统的做法,最显而易见的好处就是减少了 Kafka Broker 端的状态保存开销。现在比较流行的提法是将服务器节点做成无状态的,这样可以自由地扩缩容,实现超强的伸缩性。Kafka 最开始也是基于这样的考虑,才将 Consumer Group 位移保存在独立于 Kafka 集群之外的框架中。
不过,慢慢地人们发现了一个问题,即 ZooKeeper 这类元框架其实并不适合进行频繁的写更新,而 Consumer Group 的位移更新却是一个非常频繁的操作。这种大吞吐量的写操作会极大地拖慢 ZooKeeper 集群的性能。
Kafka 从 0.9 版本开始,开始将位移保存在 Kafka 的内部主题 __consumer_offsets。这是一种特殊的内部主题,用来记录每个消费者组在每个分区消费的位置,当消费者提交偏移量时,这些信息会被写入此主题。
_consumer_offsets 主题的特点:
  • 用户可以手动地创建它、修改它,甚至是删除它,但是不能向这个主题写入消息。因为一旦你写入的消息不满足 Kafka 规定的格式,那么 Kafka 内部无法成功解析,就会造成 Broker 的崩溃。我们通常情况下并不需要管这个主题,Kafka Consumer 有 API 帮你提交位移,也就是向位移主题写消息。
  • 当 Kafka 集群中的第一个 Consumer 程序启动时,Kafka 会自动创建位移主题。位移主题的分区数由 Broker 端参数 offsets.topic.num.partitions 确定,默认是 50。副本数则由 Broker 端另一个参数 offsets.topic.replication.factor 决定,默认值是 3。因此 Kafka 会自动创建一个 50 个分区的位移主题,副本数是 3。在 Kafka 的数据目录下会出现以下文件夹:
位移主题的每条消息格式大致如图所示:
notion image
可以想象成一个 Key-Value 对格式的消息:
  • Key 是一个三元组:group.id + topic + partition
  • Value 就是 offset 的值。
Kafka 对每个 group.id做哈希求模运算Math.abs(groupID.hashCode()) % numPartitions,从而将消费者组分散到不同的位移主题分区上。
如何确认消费者组在哪个 __consumer_offsets
答案是 Math.abs(groupID.hashCode()) % numPartitions
如果查找消费者组在位移分区数中的偏移量 offset?
先通过上面计算方式确认好消费者组所在的分区,假如szz-group消费组的偏移量信息存放在 __consumer_offsets_32中,执行以下命令
输出如下:
notion image
:: 前面是 key,由 消费组+Topic+分区数 确定;后面是 value,包含了消费组的偏移量信息等等

6、提交偏移量(Commit Offset)

上面提到消费者在提交位移时会写入位移主题。目前消费者提交位移有两种方式:
  • 自动提交偏移量
  • 手动提交偏移量

6.1 自动提交

默认方式,由消费者客户端定期自动提交位移。Consumer 端有个参数 enable.auto.commit,如果设置为 true(默认为 true),Kafka 会保证在开始调用 poll 方法时,提交上次 poll 返回的所有消息。提交间隔由参数 auto.commit.interval.ms 来控制,默认是 5s。
自动提交偏移量的优点:
  • 简单省事,不需要担心位移提交的问题
自动提交偏移量的缺点:
  • 可能会导致重复消费。在默认情况下,Consumer 每 5 秒自动提交一次位移。如果在提交间隔内消费者发生故障,消费者在重启后会重新拉取已消费但提交位移失败的消息进行消费,可能会导致重复消费的问题。
  • 即使当前位移主题没有消息可以消费了,位移主题中还是会不停地写入最新位移的消息。这就要求Kafka必须要有针对位移主题消息特点的消息删除策略,否则这种消息会越来越多,最终撑爆整个磁盘。Kafka 使用 Compact 策略来删除位移主题中的过期消息。Kafka 提供了专门的后台线程 Log Cleaner 定期地巡检待 Compact 的主题,看看是否存在满足条件的可删除数据。很多实际生产环境中都出现过位移主题无限膨胀占用过多磁盘空间的问题,如果你的环境中也有这个问题,我建议你去检查一下 Log Cleaner 线程的状态,通常都是这个线程挂掉了导致的。
    • notion image
 

6.2 手动提交

事实上,很多与 Kafka 集成的大数据框架都是禁用自动提交位移的,如 Spark、Flink 等。我们可以把 enable.auto.commit 设置为 false,然后采用手动提交位移的方式来提交位移。
一旦设置了 false,作为 Consumer 应用开发的你就要承担起位移提交的责任。处理完了 poll() 方法返回的所有消息之后再提交位移,否则有可能出现消息丢失(消息没有处理完或出现异常)。
Kafka Consumer API 主要提供了两种手动提交位移的方式:

6.2.1 同步提交API

调用 commitSync() 时,Consumer 程序会处于阻塞状态,直到远端的 Broker 返回提交结果,失败了会自动重试。例如下面的代码:
同步提交位移的缺点:阻塞调用,会降低吞吐量(但提交失败会抛出异常)。

6.2.2 异步提交API

调用 commitAsync() 之后,会立即返回,不会阻塞,因此不会影响 Consumer 应用的 TPS。由于它是异步的,Kafka 提供了回调函数(callback),供你实现提交之后的逻辑,比如记录日志或处理异常等。例如下面的代码:
异步提交位移的缺点:提交失败不会自动重试。因为它是异步操作,倘若提交失败后自动重试,那么它重试时提交的位移值可能早已经“过期”或不是最新值了。因此,异步提交的重试其实没有意义。

6.2.3 同步+异步提交

如果是手动提交,我们需要将 commitSync 和 commitAsync 组合使用才能达到最理想的效果,原因有两个:
  • 我们可以利用 commitSync 的自动重试来规避那些瞬时错误,比如网络的瞬时抖动,Broker 端 GC 等。因为这些问题都是短暂的,自动重试通常都会成功,因此,我们不想自己重试,而是希望 Kafka Consumer 帮我们做这件事。
  • 另外我们不希望程序总处于阻塞状态,影响 TPS。

6.2.4 批量提交

上面提到的手动位移提交,都是提交 poll 方法返回的所有消息的位移,比如 poll 方法一次返回了500条消息,当你处理完这 500 条消息之后,前面我们提到的各种方法会一次性地将这 500 条消息的位移一并处理。简单来说,就是直接提交最新一条消息的位移
但如果我想更加细粒度化地提交位移,该怎么办呢?设想这样一个场景:你的 poll 方法返回的不是 500 条消息,而是 5000 条。那么,你肯定不想把这 5000 条消息都处理完之后再提交位移,因为一旦中间出现差错,之前处理的全部都要重来一遍。这类似于我们数据库中的事务处理。很多时候,我们希望将一个大事务分割成若干个小事务分别提交,这能够有效减少错误恢复的时间。例如我们希望每处理完 100 条消息就提交一次位移,这样能够避免大批量的消息重新消费。
这时候就需要用到下面这两个批量提交的方法:
  • commitSync(Map<TopicPartition, OffsetAndMetadata>) 
  •  commitAsync(Map<TopicPartition, OffsetAndMetadata>)
这两个 API 可以实现批量提交一组位移信息,而不是一个个提交。以一个具体例子看,下面的代码实现了每 100 条消息提交一次:

7、Rebalance

Kafka 的 Rebalance 是指消费者组内消费者实例重新分配分区订阅关系的过程。Rebalance 会引发以下问题:
  • 严重影响系统的可用性:一旦 Rebalance 开始,所有 Consumer 实例就会停止消费。
  • 数据重复消费:Rebalance可能导致已处理未提交的偏移量被重新消费
在生产环境中我们应该要尽量避免发生 Rebalance。

7.1 Rebalance的触发条件

要避免 Rebalance,首先要知道 Rebalance 发生的时机,Rebalance 的触发条件有 3 个:
  1. 组成员数发生变化。比如有新的 Consumer 实例加入组或者离开组,或者是有 Consumer 实例崩溃被“踢出”组。
  1. 订阅主题数发生变化。Consumer Group 可以使用正则表达式的方式订阅主题,比如 consumer.subscribe(Pattern.compile(“t.*c”)) 就表明该Group订阅所有以字母 t 开头、字母 c 结尾的主题。在 Consumer Group 的运行过程中,你新创建了一个满足这样条件的主题,那么该 Group 就会发生 Rebalance。
  1. 订阅主题的分区数发生变化。Kafka 当前只能允许增加一个主题的分区数。当分区数增加时,就会触发订阅该主题的所有 Group 开启 Rebalance。
后面两个通常都是运维操作,一般难以避免,这里主要说说第 1 个时机该如何避免。一般碰到的 Rebalance 绝大部分都是第一个时机下发生的
Consumer 实例增加的情况很好理解,当我们启动一个配置有相同 group.id 值的 Consumer 程序时,实际上就向这个 Group 添加一个实例。此时,Coordinator 会接纳这个新实例,将其加入组中,并重新分配分区。通常来说,增加 Consumer 实例的操作都是计划内的,可能是出于增加 TPS 或提高伸缩性的需要。总之,它不属于我们要规避的那类“不必要Rebalance”。
我们更在意的是 Group 下实例数减少的情况,例如
  • 心跳超时:消费者无法在session.timeout.ms内发送心跳
  • 消费处理超时:消费者无法在max.poll.interval.ms内完成消息处理
这些情况下,Consumer 实例会被 Coordinator 错误地认为“已停止”,从而被“踢出” Group,从而导致 Rebalance。

7.2 Rebalance的过程

每个消费者组都有一个 Coordinator 的角色。Rebalance 由 所有消费者实例共同参与,在 Coordinator 的帮助下完成订阅分区的分配。Coordinator 也是一个 Broker,负责执行 Rebalance、消费者组的注册、成员管理记录等元数据的管理操作。Consumer 在提交位移时,其实是向 Coordinator 所在 Broker 提交位移。同样地,当 Consumer 启动时,也是向 Coordinator 所在 Broker 发送请求。
Rebalance 的整体过程如下:
  1. 选举 Group Coordinator:Consumer Group 确定 Coordinator 所在 Broker 的算法有两步:
    1. 确定由位移主体的哪个分区来保存 Group 数据:
      1. 找出该分区 Leader 副本所在 Broker,该 Broker 即为 Coordinator。
    1. 选举 Consumer Leader:组内选出一个消费者作为 Leader
    1. SyncGroup 阶段:消费者发送 JoinGroup 请求,Leader 负责分配分区方案
    1. 分区分配:按照指定策略(如Range、RoundRobin等)分配分区
    1. 状态同步:所有消费者获取自己的分配结果
    Coordinator 和 Controller 的区别是什么?
    Kafka 里面 Coordinator 和 Controller 虽然都是 Broker 来担任的角色,但是两个不同的概念:
    • Controller 是针对于整个集群的,用于管理整个集群的元数据和状态,整个集群中只有一个 Broker 充当 Controller 的角色。
    • Coordinator 是针对 Consumer Group 的,每个 Consumer Group 都有一个 Coordinator。所在 Broker 启动时,都会创建和开启相应 Coordinator 组件,也就是说,所有 Broker 都有各自的 Coordinator 组件。Kafka 会为每个 Consumer Group 从 Broker 集群中选一个作为 Coordinator。

    7.3 如何避免Rebalance

    Consumer 提供了 3 个参数,都会影响 Rebalance 的发生:
    • session.timeout.ms,默认值 10 秒,即如果 Coordinator 没在 10 秒内收到 Group 下某 Consumer 实例的心跳,就会认为这个实例已挂,从而将其移出 Group,开启新一轮 Rebalance。可以说,这个参数决定了 Consumer 存活性的时间间隔。
    • heartbeat.interval.ms,这个值越小,Consumer 实例发送心跳请求的频率越高。但是频繁地发送心跳,会额外消耗带宽资源,好处是能更快知晓当前是否开启 Rebalance,因为 Coordinator 通知各个 Consumer 实例开启 Rebalance 的方法,就是将 REBALANCE_NEEDED 标志封装进心跳请求的响应体中。
    • max.poll.interval.ms,限定了两次 poll 调用的最大时间间隔,默认值5分钟。表示你的 Consumer 程序如果无法在 5min 内无法消费完 poll 方法返回的消息,那么 Consumer 会主动发起“离开组”的请求,Coordinator 就会开启新一轮 Rebalance。
    上面说的参数配置,对应使用过程中可能出现的两类“不必要的”Rebalance情况:
    第一类,因为未及时发送心跳,导致 Consumer 被“踢出” Group 而引发的。所以你需要仔细设置前面两个参数,这里有最佳实践,即 session.timeout.ms=6sheartbeat.interval.ms=2s,要保证实例被判定“dead”之前,能够发送至少 3 轮心跳请求,即 session.timeout.ms >= 3 * heartbeat.interval.ms
    第二类,Consumer 消费时间过长导致的 Rebalance。如某个用户,在他们的场景中,要将消息处理后的结果写入 MongoDB,这里 MongoDB 的一丁点不稳定,都会导致 Consumer 程序消费时长的增加。最好将这个参数设置成比你下游最大处理时长大一点。总之,你要为你的业务处理逻辑留下充足的时间。如果消费时间过慢,超过 max.poll.interval.ms设置的值(默认5分钟),未进行 poll 拉取消息,则会导致客户端主动离开队列,而引发 Rebalance。
    这种情况可以通过 Kafka 运维平台的 Topic 流出流量排查原因,如果是每五分钟有一个流出的尖峰流量,表示消费端无法在 5 分钟内完成 poll 拉取的消息,那么就是两次 poll 拉取时间超过了五分钟,这种情况不仅会导致 Rebalance,还会导致重复消费的情况。
    notion image
    为了避免 Rebalance,可以进行下面的参数调整:
    • session.timeout.ms:Consumer 与 Coordinator 之间的会话超时时间可适当提高该参数值,需要大于消费一批数据的时间,但不要超过 30s,建议设置为 25s;而 0.10.2 及其之后的版本,保持默认值 10s 即可。
    • heartbeat.interval.ms :Consumer 向 Coordinator 发送心跳的时间间隔,保证实例被判定“dead”之前,能够发送至少 3 轮心跳请求,即 session.timeout.ms >= 3 * heartbeat.interval.ms,例如 session.timeout.ms 被设置为 10s,heartbeat.interval.ms=2s 设置为 3s。
    • max.poll.records:Consumer 一次从 Broker 中拉取的最大消息数,默认值为 500。如果消费过慢,可以适当降低该参数值。
    • max.poll.interval.ms: 两次 poll 调用的最大时间间隔,默认 5 分钟,如果消费过慢,可以适当调大该参数值。
    • 尽量提高客户端的消费速度,消费逻辑另起线程进行处理。
    • 减少 Group 订阅 Topic 的数量,一个 Group 订阅的 Topic 最好不要超过 5 个,建议一个 Group 只订阅一个 Topic。
    • 观察 Consumer 端的 GC 情况,比如是否出现频繁的 Full GC,从而引发 Rebalance,这也是一种常见现象。
    Kafka系列:生产者管理(拦截器、序列化器、分区器、精确一次性语义)Kafka系列:集群管理(Zookeeper、Controller)
    Loading...