type
status
date
slug
summary
tags
category
password

1、概述

生产者(Producer)是向 Kafka 集群发送消息的客户端应用程序,主要功能是将消息发布到 Kafka 主题(Topic)的特定分区(Partition)。
Producer 的 Java API 的简单使用如下:
整体工作流程如下:
  1. 消息创建:创建 ProducerRecord 对象,包含目标主题、可选的分区/键和值
  1. 序列化:键和值被序列化为字节数组
  1. 分区:确定消息应该发送到哪个分区
  1. 批次处理:消息被添加到对应分区的批次中
  1. 发送:独立的发送线程将准备好的批次发送到 Kafka 集群
  1. 确认:等待 Broker 确认并根据配置处理响应
关键参数:
参数
说明
默认值
bootstrap.servers
Kafka集群地址列表
key.serializer
键序列化器
value.serializer
值序列化器
acks
消息确认机制(0、1、all)
1
retries
失败重试次数
0
batch.size
批次大小(字节)
16384
linger.ms
批次等待时间(毫秒)
0
buffer.memory
生产者内存缓冲区大小
33554432
max.block.ms
生产者阻塞最大时间
60000
compression.type
压缩类型(none、gzip、snappy、lz4、zstd)
none
Producer 在调用 send 方法后把消息发往 Broker 的过程中,消息有可能需要经过拦截器(Interceptor)、序列化器(Serializer)和分区器(Partitioner)的一系列作用之后才能被真正地发往 Broker。涉及到以下核心知识点:
  • Producer API:提供发送消息的接口
  • 拦截器(Interceptor):在消息发送前或回调之前对消息做定制化操作,例如过滤消息、修改消息等,可以指定多个拦截器以形成拦截链,非必需。
  • 序列化器(Serializer):将消息键和值转换为字节数组,必须。
  • 分区器(Partitioner):决定消息发送到哪个分区,非必需。如果消息 ProducerRecord 中指定了 partition 字段,那么就不需要分区器,否则就会使用默认的分区器或者指定分区器。
  • 消息可靠性保证:通过 ACK 机制和一次性语义(exactly once)保证消息交付的可靠性。

2、Producer API

Producer API 有三种发送模式:
  1. 同步发送:用户调用 send 之后返回一个 Future 类型的结果,Future 的 get 方法会一直阻塞直到第一条响应到达后,才会请求第二条。
    1. 异步发送:异步发送但不理会发送结果,性能最高,但是存在消息发送失败导致消息丢失的风险。
      1. 异步发送带回调:用户调用 send 之返回一个 Future 类型的结果,这时候用户线程就调用结束了,由后台线程负责实际发送数据。可以稍后通过 Future 对象获取发送的结果,ProducerRecord、RecordMetadata 包含了返回的结果信息。

        3、拦截器(Interceptor)

        Kafka Producer 拦截器(Interceptor)是一种可以在消息发送前后进行拦截和处理的机制,允许你在不修改业务逻辑的情况下对消息进行定制化处理。
        拦截器可以用于以下场景:
        • 消息发送前进行内容修改或验证
        • 发送失败时进行特殊处理
        • 发送成功时记录日志或统计信息
        • 实现消息审计跟踪
        • 添加消息头信息
        实现一个自定义拦截器,需要实现org.apache.kafka.clients.producer.ProducerInterceptor接口:
        然后配置拦截器,如果配置了多个拦截器
        1. onSend()方法按照配置顺序依次执行
        1. onAcknowledgement()方法按照配置的逆序执行
        注意:
        1. 拦截器中的异常会被捕获并记录,但不会传播到业务代码中
        1. 拦截器中的操作不应过于耗时,以免影响Producer性能
        1. 拦截器可能会被多次调用(如重试时),需要确保幂等性
        1. 拦截器中修改的消息对象应该是新的对象,而不是直接修改原始对象
        实战:
        1、消息审计拦截器
        2、消息加密拦截器

        4、序列化器(Serializer)

        Kafka 生产者的序列化器(Serializer)是生产者将 Java 对象转换为字节数组(byte[])的组件。Kafka 提供了几种内置的序列化器:
        1. StringSerializer:用于字符串数据
          1. ByteArraySerializer:用于字节数组
            1. IntegerSerializer:用于整数
              1. LongSerializer:用于长整型
                1. DoubleSerializer:用于双精度浮点数
                  Kafka 也可以自定义序列化器,需要实现org.apache.kafka.common.serialization.Serializer接口:

                  5、分区器(Partitioner)

                  Kafka 的分区器(Partitioner)是决定生产者的消息被发送到哪个分区的组件

                  6、ACK机制

                  Kafka 的 ACK 机制是 Producer 与 Broker 之间的一种可靠性保证机制,用于控制消息的持久化确认级别
                  ACK 机制涉及以下参数:
                  • acks :Producer 端参数,用来告诉 Producer 消息是否写入成功,有三个值:
                    • acks = all/-1 : Producer 发送消息后 要等 ISR 中的最少数量的副本(由 min.insync.replicas 控制)同步消息成功后,才收到这条消息写入成功的响应。这个选项持久性最好,延时性最差。
                    • acks = 1 :Producer 发送消息后只需要等 Leader 副本写入数据成功后,就可以收到这条消息写入成功的响应。Kafka 的默认选项,提供了较好的持久性和较低的延迟性。适用于大多数场景。
                    • acks = 0:Producer 发送消息后不等待任何确认。这个选项提供了最低的延迟,但是持久性最差,当服务器发生故障时,就很可能发生数据丢失。适用于允许少量数据丢失的高吞吐场景。
                  • min.insync.replicas :Broker 端参数,ISR 列表中最小同步副本数,表示消息至少被写入到多少个副本才算是 “已提交”。默认值是 1,也就意味着一旦消息被写入 Leader 端即被认为是“已提交”。这个参数只有在 acks = all/-1 时才有效。
                  • replication.factor :Broker 端参数,用来设置分区的副本数。这个值为 1 表示只有一个副本,也就是只有 Leader;这个值为 2 表示有两个副本,包括一个 Leader 和一个 Follower。默认值是 3,表示每个分区有 1 个 Leader 副本和 2 个 Follower 副本。
                    • acks = all/-1 : 表示 Producer 发送消息后, 要等 ISR 中的最少数量的副本同步消息成功后,才收到这条消息写入成功的响应。这个选项持久性最好,延时性最差。通常搭配 min.insync.replicas 参数使用。
                    • acks = 1 :表示 Producer 发送消息后,只需要等 Leader 副本写入数据成功后,就可以收到这条消息写入成功的响应。这个是 Kafka 的默认选项,提供了较好的持久性和较低的延迟性。
                    • acks = 0:Producer 只要把消息发出去,不管发送出去的数据有没有同步完成,都认为这个消息发送成功了。这个选项提供了最低的延迟,但是持久性最差,当服务器发生故障时,就很可能发生数据丢失。
                  • retries:生产者发送失败时的重试次数
                  • retry.backoff.ms:重试间隔时间
                  整体工作流程如下:
                  1. 生产者发送消息到 Kafka 集群
                  1. 根据 acks 设置等待相应级别的确认
                  1. 如果未收到确认或收到错误,根据配置决定是否重试
                  1. 消费者只能看到已提交(committed)的消息
                  最佳实践:
                  • 高吞吐场景:acks = 0 或 1
                  • 金融等关键业务:acks = all + min.insync.replicas ≥ 2,对于有三个 Broker 的集群官方推荐的参数配置是:
                    • replication.factor=3
                    • min.insync.replicas=2
                    • acks=all
                  • 平衡场景: acks = 1(默认)

                  7、精确一次性语义

                  Kafka 的精确一次性语义(EOS)是指确保消息从生产者发送到 Kafka,再到消费者处理的过程中,每条消息被精确地处理一次,不会丢失也不会重复
                  常见的消息交付可靠性保障有以下三种:
                  • 最多一次(at most once):消息可能会丢失,但绝不会被重复发送。
                  • 至少一次(at least once):消息不会丢失,但有可能被重复发送。
                  • 精确一次(exactly once):消息不会丢失,也不会被重复发送。
                  Kafka 默认提供的交付可靠性保障是第二种,即至少一次。消息“已提交”的含义,即只有 Broker 成功“提交”消息且 Producer 接到 Broker 的应答才会认为该消息成功发送。不过倘若消息成功“提交”,但 Broker 的应答没有成功发送回 Producer 端(比如网络出现瞬时抖动),那么 Producer 就无法确定消息是否真的提交成功了。因此,它只能选择重试,也就是再次发送相同的消息。这就是 Kafka 默认提供至少一次可靠性保障的原因,不过这会导致消息重复发送
                  Kafka 也可以提供最多一次交付保障,只需要让 Producer 禁止重试即可。这样一来,消息要么写入成功,要么写入失败,但绝不会重复发送。我们通常不会希望出现消息丢失的情况,但一些场景里偶发的消息丢失其实是被允许的,相反,消息重复是绝对要避免的。此时,使用最多一次交付保障就是最恰当的。
                  无论是至少一次还是最多一次,都不如精确一次来得有吸引力。大部分用户还是希望消息只会被交付一次,这样的话,消息既不会丢失,也不会被重复处理。或者说,即使 Producer 端重复发送了相同的消息,Broker 端也能做到自动去重。在下游 Consumer 看来,消息依然只有一条。
                  Kafka 分别通过幂等性(Idempotence)和事务(Transaction)这两种机制实现了精确一次(exactly once)语义。

                  7.1 幂等性

                  幂等性是 Kafka 0.11 版本后引入的强大功能。启用后,生产者会被分配一个唯一的 PID(Producer ID)并为每个消息序列号(Sequence Number)。Broker 会缓存每个主题分区最近接收到的序列号。如果收到序列号不连续(如重复),Broker 会拒绝该重复消息。

                  7.1.1 开启幂等性

                  在 Kafka 中,Producer 默认不是幂等性的,在生产者配置中设置 enable.idempotence = true开启幂等性。通常设置了 acks=all 和 retries > 1 后,该配置会自动生效。
                  开启幂等性之后,其他所有的代码逻辑都不需要改变。Kafka 自动帮你做消息的重复去重。

                  7.1.2 幂等性实现原理

                  Kafka 在实现幂等性时引入了两个概念:
                  • ProducerID:Producer 初始化时都会被分配一个 ProducerID,对用户无感知,重启会发生变化
                  • SequenceNumber:Producer 为每个主题的分区分配一个从 0 开始单调递增的 SequenceNumber,在发送消息的时候为消息绑定这个 SequenceNumber。
                  notion image
                  在 ProducerID + Partition 级别上的 SequenceNumber,就可以实现分区级别消息的唯一性了。Broker 收到消息后会以 ProducerID 为单位存储 SequenceNumber。如果消息落盘会同时更新最大 SequenceNumber;如果新的消息带上的 SequenceNumber 不大于当前的最大 SequenceNumber,这个消息就会被 Broker 端拒绝掉。也就是说即使 Producer 重复发送了, Broker 端也会将其过滤掉。
                  Kafka 幂等消息的局限性:
                  • 只能保证单分区上的幂等性。即一个幂等性 Producer 能够保证某个主题的一个分区上不出现重复消息,它无法实现多个分区的幂等性。因为 SequenceNumber 是以 Topic + Partition 为单位单调递增的,如果一条消息被发送到了多个分区必然会分配到不同的 SequenceNumber ,导致重复问题。
                  • 只能实现单会话上的幂等性。不能实现跨会话的幂等性,当重启 Producer 进程之后,Producer 后会被分配一个新的 ProducerID,相当于之前保存的 SequenceNumber 就丢失了。

                  7.2 事务

                  Kafka 自 0.11 版本开始也提供了对事务的支持,它能保证多条消息原子性地写入到目标分区,同时也能保证 Consumer 只能看到事务成功提交的消息。再具体一点,事务型 Producer 能够保证将消息原子性地写入到多个分区中。这批消息要么全部写入成功,要么全部失败。
                  另外,事务型 Producer 也不惧进程的重启。Producer 重启回来后,Kafka 依然保证它们发送消息的精确一次处理。

                  7.2.1 开启事务

                  1、Producer 端开启事务
                  Producer 开启事务只需要设置两个属性:
                  • 和幂等性 Producer 一样,开启 enable.idempotence = true
                  • 设置 Producer 端参数 transactional.id。最好为其设置一个有意义的名字。
                  Procuder 开启事务后,发送消息的代码示例如下
                  上面这段代码能够保证 Record1 和 Record2 被当作一个事务统一提交到 Kafka,要么它们全部提交成功,要么全部写入失败。实际上即使写入失败,Kafka 也会把它们写入到底层的日志中,也就是说 Consumer 还是会看到这些消息。和普通 Producer 代码相比,事务型 Producer 的显著特点是调用了一些事务API,
                  • initTransactions():初始化事务
                  • beginTransaction():开启事务
                  • sendOffsetsToTransaction():在事务内提交已经消费的偏移量(主要用于消费者)
                  • commitTransaction():提交事务
                  • abortTransaction():终止事务
                  2、Consumer 端消费事务消息
                  Consumer 端消费事务消息的时候,只需要设置 isolation.level 参数的值:
                  1. read_uncommitted:默认值,表明 Consumer 能够读取到 Kafka 写入的任何消息,不论事务型 Producer 是否已经提交事务,其写入的消息都可以读取。可能会读到最终被回滚的脏数据。
                  1. read_committed:表明 Consumer 只会读取事务型 Producer 成功提交事务写入的消息。对于非事务性生产者发送的消息,它们会立即可见。事务保证的是生产者写入的原子性,而 read_committed 保证了消费者读取的原子性。一个事务生产的全部消息,对消费者来说要么全部可见,要么全部不可见。

                  7.2.2 事务实现原理

                  Kafka 事务的核心概念:
                  • 事务协调器(Transaction Coordinator):与消费者组协调器类似,这是一个内置的 Kafka broker 组件。每个生产者启动事务时,都会向一个(通过哈希找到的)特定协调器注册和管理自己的事务。
                  • 事务ID(Transactional Id):一个唯一的字符串,用于标识一个生产者实例。在生产者重启后,相同的 transactional.id 可以保证旧的未完成事务(属于同一个 ID)被清除。
                  • 控制消息(Control Messages):事务会向主题中写入一些特殊的、对消费者不可见的消息。消费者在读取消息时,会利用这些控制消息来过滤掉未提交的消息或已中止的消息。
                    • COMMIT 消息:标记一个事务已提交。
                    • ABORT 消息:标记一个事务已中止。
                  notion image
                  1. 启动生产者,分配协调器:我们在使用事务的时候,必须给生产者指定一个事务 ID,生产者启动时,Kafka 会根据事务 ID 来分配一个事务协调器(Transaction Coordinator) 。每个 Broker 都有一个事务协调器,负责分配 PID(Producer ID) 和管理事务。
                    1. 事务协调器的分配涉及到一个特殊的主题 __transaction_state,该主题默认有50个分区,每个分区负责一部分事务;Kafka 根据事务ID的hashcode值%50 计算出该事务属于哪个分区, 该分区 Leader 所在 Broker 的事务协调器就会被分配给该生产者。
                    2. 分配完事务协调器后,该事务协调器会给生产者分配一个 PID,接下来生产者就可以准备发送消息了。
                  1. 开启事务
                  1. 发送消息:生产者分配到 PID 后,要先告诉事务协调器要把详细发往哪些分区,协调器会做一个记录,然后生产者就可以开始发送消息了,这些消息与普通的消息不同,它们带着一个字段标识自己是事务消息。当生产者事务内的消息发送完毕,会向事务协调器发送 Commit 或 Abort 请求,此时生产者的工作已经做完了,它只需要等待 Kafka 的响应。
                  1. 提交/终止事务
                    1. 提交事务:
                        • 生产者向事务协调器发送提交请求。
                        • 协调器将事务提交标记写入内部主题 __transaction_state
                        • 协调器向所有涉及到的分区发送 COMMIT 控制消息
                        • 一旦所有涉及的分区都成功接收到 COMMIT 消息,协调器则标记该事务为已完成。
                        • 此时,消息才真正对消费者可见。
                    2. 中止(Abort)
                        • 流程与提交类似,但协调器发送的是 ABORT 控制消息
                        • 所有待定的消息都会被丢弃,消费者永远不会看到它们。
                  Kafka系列:最佳实践Kafka系列:消费者管理(Consumer Group、位移提交、Rebalance)
                  Loading...