Kafka 系列(三):Producer 源码篇
Kafka 系列文章列表
Kafka 版本:2021-11 trunk 分支(当前最新版本3.0.0)
在 Kafka 中,客户端由 Java 语言实现,服务端由 Scala 语言来实现的,在使用 Kafka 时,客户端是用户最先接触到部分,因此,kafka系列文章的源码分析也会从客户端端开始,今天讲的是 Producer 端发送模型的实现。
在看这篇源码分析之前,不了解发送端原理的同学,可以先看下上篇博文Kafka Producer 原理篇
Producer 的使用
在分析 Producer 发送模型之前,先看一下用户是如何使用 Producer 向 Kafka 写数据的,下面是一个关于 Producer 最简单的应用示例。
1 | import org.apache.kafka.clients.producer.KafkaProducer; |
从上面的代码可以看出 Kafka 为用户提供了非常简单的 API,在使用时,只需要如下两步:
- 初始化 KafkaProducer 实例;
- 调用 send 接口发送数据。
本文主要是围绕着 Producer 在内部是如何实现 send 接口而展开的。
发送流程
下面通过对 send 源码分析来一步步剖析 Producer 数据的发送流程。
send()
用户是直接使用 producer.send() 发送的数据,先看一下 send() 接口的实现
1 | // 异步发送一条记录到 topic,等同于 send(record, null) |
数据发送的最终实现还是调用了 Producer 的 doSend() 接口。
doSend()
doSend() 源码
1 | // 异步发送记录到主题 |
在 dosend() 方法的实现上,一条 Record 数据的发送,可以分为以下几步:
- 确认数据要发送到的 topic 的 metadata 是可用的(如果该 partition 的 leader 存在则是可用的),如果没有 topic 的 metadata 信息,就需要获取相应的 metadata
- 序列化 record 的 key 和 value;
- 确定 record 要发送到的 partition(可以指定,也可以根据算法计算);
- 向 accumulator 中追加 record 数据,数据会先进行缓存;
- 如果追加完数据后,对应的 RecordBatch 已经达到了 batch.size 的大小(或者batch 的剩余空间不足以添加下一条 Record),则唤醒 sender 线程发送数据。
下面会对这几部分的具体实现进行详细分析。
发送流程详解
获取 topic 的 metadata 信息
在数据发送前,需要先确定 topic 是可用的。这部分具体实现在 KafkaProducer
中的 waitOnMetadata()
。
序列化 key 和 value
Producer 端对 record 的 key 和 value 值进行序列化操作,在 Consumer 端再进行相应的反序列化。
kafka 提供了一部分通用的序列化实现(所有默认提供的序列化实现均在 org.apache.kafka.common.serialization
),当然我们也是可以自定义序列化的具体实现。
确定分区
关于 partition 值的计算,分为三种情况:
- 指明 partition 的情况下,直接将指明的值直接作为 partiton 值;
- 没有指明 partition 值但有 key 的情况下,将 key 的 hash 值与 topic 的 partition 数进行取余得到 partition 值;
- 既没有 partition 值又没有 key 值的情况下,第一次调用时随机生成一个整数(后面每次调用在这个整数上自增),将这个值与 topic 可用的 partition 总数取余得到 partition 值,也就是常说的 round-robin 算法。
具体实现如下:
1 | // 为给定的记录计算分区。如果记录有分区直接返回,否则调用配置的分区算法来计算分区。 |
Producer 默认使用的 partitioner 是 org.apache.kafka.clients.producer.internals.DefaultPartitioner,DefaultPartitioner 默认采用的是 sticky partitioning (黏性分区分配)。
1 | public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster, |
关于 sticky partitioning 的具体实现,可以查看专题文章。
用户也可以自定义 partition 的策略。
向 accumulator 追加数据
Producer 会先将 record 写入到 buffer 中,当达到一个 batch.size 的大小时,再唤起 sender 线程去发送 RecordBatch,这里先详细分析一下 Producer 是如何向 buffer 中写入数据的。
Producer 是通过 RecordAccumulator 实例追加数据,其中一个比较重要的变量就是 ConcurrentMap<TopicPartition, Deque<RecordBatch>> batches
,每个 TopicPartition 都会对应一个 Deque<RecordBatch>
,当添加数据时,会向其 TopicPartition 对应的 queue 中尾部最后一个 RecordBatch 中添加 record,而发送数据时,则会先从 queue 头部的 RecordBatch 开始发送。
1 | // org.apache.kafka.clients.producer.internals.RecordAccumulator |
- 获取该 TopicPartition 对应的 queue,没有的话会创建一个空的 queue;
- 向 queue 中追加数据,先获取 queue 中最新加入的那个 RecordBatch,如果不存在或者存在但剩余空余不足以添加本条 record 则新创建一个,成功写入的话直接返回结果,写入成功;
- 创建一个新的 RecordBatch,初始化内存大小根据 Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, value, headers)) 来确定(防止单条 record 过大的情况);
- 向新建的 RecordBatch 写入 record,并将 RecordBatch 添加到 queue 中,返回结果,写入成功。
发送 RecordBatch
当 record 写入成功后,如果发现 RecordBatch 已满足发送的条件(通常是 queue 中有多个 batch,那么最先添加的那些 batch 肯定是可以发送了),那么就会唤醒 sender 线程,发送 RecordBatch。
sender 线程对 RecordBatch 的处理是在 sendProducerData() 方法中进行的,该方法具体实现如下:
sendProducerData() 源码
1 | private long sendProducerData(long now) { |
这段代码前面有很多是其他的逻辑处理,如:移除暂时不可用的 node、处理超时的 RecordBatch,真正进行发送发送 RecordBatch 的是 sendProduceRequests(batches, now) 这个方法。里面的实现逻辑主要就是将 batches 中 leader 为同一个 node 的所有 RecordBatch 放在一个请求中进行发送。
以上就是 kafka producer 客户端发送的整个流程,如有问题,欢迎在评论区交流。