💧 Posted on 

Kafka 系列(五):如何保障数据可靠性?

Kafka 中采用了多副本的机制,这是大多数分布式系统中惯用的手法,以此来实现水平扩展、提供容灾能力、提升可用性和可靠性等。

我们对此可以引申出一系列的疑问:

  • Kafka 多副本之间如何进行数据同步,尤其是在发生异常时候的处理机制又是什么?
  • 多副本间的数据一致性如何解决,基于的一致性协议又是什么?
  • 如何确保Kafka 的可靠性?
  • Kafka 中的可靠性和可用性之间的关系又如何?

下面从副本的角度切入来看看Kafka如何保障数据一致性、数据可靠性等问题,主要包括副本剖析、日志同步机制和可靠性分析等内容。

副本剖析

副本(Replica)是分布式系统中常见的概念之一,指的是分布式系统对数据和服务提供的一种冗余方式。在常见的分布式系统中,为了对外提供可用的服务,我们往往会对数据和服务进行副本处理。数据副本是指在不同的节点上持久化同一份数据,当某一个节点上存储的数据丢失时,可以从副本上读取该数据,这是解决分布式系统数据丢失问题最有效的手段。另一类副本是服务副本,指多个节点提供同样的服务,每个节点都有能力接收来自外部的请求并进行相应的处理。

Kafka从0.8版本开始为分区引入了多副本机制,通过增加副本数量来提升数据容灾能力。同时,Kafka通过多副本机制实现故障自动转移,在Kafka集群中某个broker节点失效的情况下仍然保证服务可用。

下面的内容会涉及到AR、ISR、HW等基础概念,下面我们先简要回顾一下,详情请参考 <Kafka中的基本概念>

  • 副本是相对于分区而言的,即副本是特定分区的副本。
  • 一个分区中包含一个或多个副本,其中一个为leader副本,其余为follower副本,各个副本位于不同的broker节点中。只有leader副本对外提供服务,follower副本只负责数据同步。
  • 分区中的所有副本统称为 AR,而ISR 是指与leader 副本保持同步状态的副本集合,当然leader副本本身也是这个集合中的一员。
  • LEO标识每个分区中最后一条消息的下一个位置,分区的每个副本都有自己的LEO,ISR中最小的LEO即为HW,俗称高水位,消费者只能拉取到HW之前的消息。

从生产者发出的一条消息首先会被写入分区的leader副本,不过还需要等待ISR集合中的所有 follower 副本都同步完之后才能被认为已经提交,之后才会更新分区的 HW,进而消费者可以消费到这条消息。

失效副本

正常情况下,分区的所有副本都处于ISR集合中,但是难免会有异常情况发生,从而某些副本被剥离出ISR集合中。在ISR集合之外,也就是处于同步失效或功能失效(比如副本处于非存活状态)的副本统称为失效副本,失效副本对应的分区也就称为同步失效分区(under-replicated分区)。

可以通过 kafka-topic.sh 脚本的 under-replicated-partitions 参数来显示主题中包含失效副本的分区。

失效副本不仅是指处于功能失效状态的副本,处于同步失效状态的副本也可以看作失效副本。

怎么判定一个分区是否有副本处于同步失效的状态呢?

Kafka 从 0.9.x 版本开始就通过唯一的broker端参数 replica.lag.time.max.ms 来抉择,当ISR集合中的一个follower副本滞后leader副本的时间超过此参数指定的值时则判定为同步失败,需要将此follower副本剔除出ISR集合,replica.lag.time.max.ms 参数的默认值为10000。

具体的实现原理也很容易理解,当follower副本将leader副本LEO(LogEndOffset)之前的日志全部同步时,则认为该 follower 副本已经追赶上leader 副本,此时更新该副本的lastCaughtUpTimeMs 标识。

注意: 千万不要错误的以为,只要 follower 副本拉取 leader 副本的数据就会更新 lastCaughtUpTimeMs 。 当leader 副本中消息的流入速度大于 follower 副本的拉取速度时,就算 follower 副本一直拉取,也不会和 leader 副本保持同步。如果还将该 follower 副本放入 ISR 集合中,就有可能造成消息丢失。

Kafka 的副本管理器会启动一个副本过期检测的定时任务,而这个定时任务会定时检查当前时间与副本的 lastCaughtUpTimeMs 差值是否大于参数replica.lag.time.max.ms 指定的值。

什么情况会导致副本失效?

  • follower副本进程卡住,在一段时间内根本没有向leader副本发起同步请求,比如频繁的Full GC。
  • follower副本进程同步过慢,在一段时间内都无法追赶上leader副本,比如I/O开销过大。
  • 当通过脚本工具增加了副本因子,新增加的副本因子在赶上leader之前都处于失效状态

ISR的伸缩

Kafka 在启动的时候会开启两个与 ISR 相关的定时任务,名称分别为 isr-expirationisr-change-propagation

isr-expiration 任务会周期性地检测每个分区是否需要缩减其ISR集合。

这个周期和 replica.lag.time.max.ms 参数有关,大小是这个参数值的一半,默认值为5000ms。

当检测到ISR集合中有失效副本时,就会收缩ISR集合。如果某个分区的ISR集合发生变更,则会将变更后的数据记录到 ZooKeeper 对应的 /brokers/topics/partition/state 节点中。节点中的数据示例如下:

1
{"controller epoch": 26, "leader": 0, "version": 1, "leader epoch": 2, "isr": [0, 1]}
  • controller_epoch 表示当前Kafka控制器的epoch
  • leader 表示当前分区的leader副本所在的broker的id编号
  • version表示版本号(当前版本固定为1)
  • leader_epoch表示当前分区的leader纪元
  • isr表示变更后的ISR列表。

当 ISR 集合发生变更时还会将变更后的记录缓存到 isrChangeSet 中,isr-change-propagation 任务会周期性(固定值为 2500ms)地检查isrChangeSet,如果发现isrChangeSet中有ISR集合的变更记录,那么它会在ZooKeeper的/isr_change_notification路径下创建一个以 isr_change_开头的持久顺序节点(比如/isr_change_notification/isr_change_0000000000),并将isrChangeSet中的信息保存到这个节点中。

Kafka控制器为/isr_change_notification添加了一个Watcher,当这个节点中有子节点发生变化时会触发Watcher的动作,以此通知控制器更新相关元数据信息并向它管理的broker节点发送更新元数据的请求,最后删除/isr_change_notification路径下已经处理过的节点。

频繁地触发Watcher会影响Kafka控制器、ZooKeeper甚至其他broker节点的性能。为了避免这种情况,Kafka添加了限定条件,当检测到分区的ISR集合发生变化时,还需要检查以下两个条件:

  1. 上一次ISR集合发生变化距离现在已经超过5s。
  2. 上一次写入ZooKeeper的时间距离现在已经超过60s。

满足以上两个条件之一才可以将ISR集合的变化写入目标节点。

有缩减对应就会有扩充,那么Kafka又是何时扩充ISR的呢?

随着follower副本不断与leader副本进行消息同步,follower副本的LEO也会逐渐后移,并最终追赶上leader副本,此时该follower副本就有资格进入ISR集合。

追赶上leader副本的判定准则是此副本的LEO是否不小于leader副本的HW,注意这里并不是和leader副本的LEO相比。ISR扩充之后同样会更新ZooKeeper中的/brokers/topics/partition/state节点和isrChangeSet,之后的步骤就和ISR收缩时的相同。

当ISR集合发生增减时,或者ISR集合中任一副本的LEO发生变化时,都可能会影响整个分区的HW。

例如,leader副本的LEO为9,follower1副本的LEO为7,而follower2副本的LEO为6,如果判定这3个副本都处于ISR集合中,那么这个分区的HW为6;如果follower3已经被判定为失效副本被剥离出ISR集合,那么此时分区的HW为leader副本和follower1副本中LEO的最小值,即为7。

LEO 和 HW

这两个概念可以参考:<Kafka中的基本概念>

对于副本而言,还有两个概念:本地副本(Local Replica)和远程副本(RemoteReplica)。

本地副本是指对应的Log分配在当前的broker节点上,远程副本是指对应的Log分配在其他的broker节点上。在Kafka中,同一个分区的信息会存在多个broker节点上,并被其上的副本管理器所管理,这样在逻辑层面每个broker节点上的分区就有了多个副本,但是只有本地副本才有对应的日志。

整个消息追加的过程可以概括如下:

  1. 生产者客户端发送消息至leader副本(副本1)中。
  2. 消息被追加到leader副本的本地日志,并且会更新日志的偏移量。
  3. follower副本(副本2和副本3)向leader副本请求同步数据。
  4. leader副本所在的服务器读取本地日志,并更新对应拉取的follower副本的信息。
  5. leader副本所在的服务器将拉取结果返回给follower副本。
  6. follower副本收到leader副本返回的拉取结果,将消息追加到本地日志中,并更新日志的偏移量信息。

了解了这些内容后,我们再来分析在这个过程中各个副本LEO和HW的变化情况。下面的示例,生产者一直在往leader副本中写入消息。某一时刻,leader副本的LEO增加至5,并且所有副本的HW还都为0。之后follower副本向leader副本拉取消息,在拉取的请求中会带有自身的LEO信息,这个LEO信息对应的是FetchRequest请求中的fetch_offset。leader副本返回给follower副本相应的消息,并且还带有自身的HW信息,这个HW信息对应的是FetchResponse中的high_watermark。

此时两个follower副本各自拉取到了消息,并更新各自的LEO为3和4。与此同时,follower副本还会更新自己的HW,更新HW的算法是比较当前LEO和leader副本中传送过来的HW的值,取较小值作为自己的HW值。当前两个follower副本的HW都等于0(min(0,0)=0)。

接下来follower副本再次请求拉取leader副本中的消息。

此时leader副本收到来自follower副本的FetchRequest请求,其中带有LEO的相关信息,选取其中的最小值作为新的HW,即min(15,3,4)=3。然后连同消息和HW一起返回FetchResponse给follower副本。注意leader副本的HW是一个很重要的东西,因为它直接影响了分区数据对消费者的可见性。

两个follower副本在收到新的消息之后更新LEO并且更新自己的HW为3(min(LEO,3)=3)。

在一个分区中,leader副本所在的节点会记录所有副本的LEO,而follower副本所在的节点只会记录自身的LEO,而不会记录其他副本的LEO。对HW而言,各个副本所在的节点都只记录它自身的HW。leader 副本收到 follower副本的FetchRequest请求之后,它首先会从自己的日志文件中读取数据,然后在返回给follower副本数据前先更新follower副本的LEO。

Kafka 的根目录下有 cleaner-offset-checkpoint、log-start-offset-checkpoint、recovery-point-offset-checkpoint和replication-offset-checkpoint四个检查点文件

recovery-point-offset-checkpoint 和replication-offset-checkpoint 这两个文件分别对应了 LEO和 HW。

Kafka 中会有一个定时任务负责将所有分区的 LEO 刷写到恢复点文件 recovery-point-offset-checkpoint 中,定时周期由 broker 端参数 log.flush.offset.checkpoint.interval.ms来配置,默认值为60000。

还有一个定时任务负责将所有分区的HW刷写到复制点文件replication-offset-checkpoint中,定时周期由broker端参数replica.high.watermark.checkpoint.interval.ms来配置,默认值为5000。

log-start-offset-checkpoint文件对应logStartOffset(注意不能缩写为LSO,因为在Kafka中LSO是LastStableOffset的缩写),在FetchRequest和FetchResponse中也有它的身影,它用来标识日志的起始偏移量。各个副本在变动 LEO 和 HW 的过程中,logStartOffset 也有可能随之而动。Kafka 也有一个定时任务来负责将所有分区的 logStartOffset书写到起始点文件log-start-offset-checkpoint中,定时周期由broker端参数log.flush.start.offset.checkpoint.interval.ms来配置,默认值为60000。

Leader Epoch

上述过程是在正常情况下的leader副本与follower副本之间的同步过程。

如果leader副本发生切换,那么同步过程又该如何处理呢?

在0.11.0.0版本之前,Kafka使用的是基于HW的同步机制,但这样有可能出现数据丢失或leader副本和follower副本数据不一致的问题

数据丢失

下图中,Replica B 是当前的leader副本(用L标记),Replica A是follower副本。

在某一时刻,B中有2条消息 m1 和 m2,A从B中同步了这两条消息,此时A和B的LEO都为2,同时HW都为1;

之后A再向B中发送请求以拉取消息,FetchRequest请求中带上了A的LEO信息,B在收到请求之后更新了自己的HW为2;

B中虽然没有更多的消息,但还是会返回FetchResponse,并在其中包含了HW信息;最后A根据FetchResponse中的HW信息更新自己的HW为2。

可以看到整个过程中两者之间的HW同步有一个间隙,在A写入消息m2之后(LEO更新为2)需要再一轮的FetchRequest/FetchResponse才能更新自身的HW为2。

下图中,如果在这个时候A宕机了,那么在A重启之后会根据之前HW位置(这个值会存入本地的复制点文件replication-offset-checkpoint)进行日志截断,这样便会将m2这条消息删除,此时A只剩下m1这一条消息,之后A再向B发送FetchRequest请求拉取消息。

此时若B 再宕机,那么 A 就会被选举为新的leader,如下图。B 恢复之后会成为follower,由于follower副本HW不能比leader副本的HW高,所以还会做一次日志截断,以此将HW调整为1。这样一来m2这条消息就丢失了(就算B不能恢复,这条消息也同样丢失)。

数据不一致

对于上面的情况,也有一些解决方法,比如等待所有follower副本都更新完自身的HW之后再更新leader副本的HW,这样会增加多一轮的FetchRequest/FetchResponse延迟,自然不够妥当。

还有一种方法就是follower副本恢复之后,在收到leader副本的FetchResponse前不要截断follower副本(follower副本恢复之后会做两件事情:截断自身和向leader发送FetchRequest请求),不过这样也避免不了数据不一致的问题。

例如下图中,当前leader副本为A,follower副本为B,A中有2条消息m1和m2,并且HW和LEO都为2,B中有1条消息m1,并且HW和LEO都为1。假设A和B同时“挂掉”,然后B第一个恢复过来并成为leader

之后B写入消息m3,并将LEO和HW更新至2(假设所有场景中的min.insync.replicas参数配置为1)。此时A也恢复过来了,根据前面数据丢失场景中的介绍可知它会被赋予follower的角色,并且需要根据HW截断日志及发送FetchRequest至B,不过此时A的HW正好也为2,那么就可以不做任何调整了,如下图

如此一来A中保留了m2而B中没有,B中新增了m3而A也同步不到,这样A和B就出现了数据不一致的情形。

Leader Epoch

为了解决上述两种问题,Kafka从0.11.0.0开始引入了 leader epoch 的概念,在需要截断数据的时候使用leader epoch作为参考依据而不是原本的HW。

leader epoch代表leader的纪元信息(epoch),初始值为0。每当leader变更一次,leader epoch 的值就会加1,相当于为leader增设了一个版本号。

与此同时,每个副本中还会增设一个矢量 <LeaderEpoch –> ,其中StartOffset表示当前Leader Epoch下写入的第一条消息的偏移量。

每个副本的Log下都有一个leader-epoch-checkpoint文件,在发生leader epoch变更时,会将对应的矢量对追加到这个文件中.

解决数据丢失问题

下图中 LE(LeaderEpoch的缩写,当前A和B中的LE都为0)。

同样 A 发生重启,之后 A 不是先忙着截断日志而是先发送 OffsetsForLeaderEpochRequest 请求给 B(OffsetsForLeaderEpochRequest 请求体结构如下图所示,其中包含 A 当前的LeaderEpoch值),B作为目前的leader在收到请求之后会返回当前的LEO(LogEndOffset),与请求对应的响应为OffsetsForLeaderEpochResponse

如果A中的LeaderEpoch(假设为LE_A)和B中的不相同,那么B此时会查找LeaderEpoch为 LE_A+1 对应的 StartOffset 并返回给 A,也就是 LE_A 对应的 LEO,所以我们可以将OffsetsForLeaderEpochRequest的请求看作用来查找follower副本当前LeaderEpoch的LEO。

如下图,A 在收到2之后发现和目前的LEO相同,也就不需要截断日志了。B发生了宕机,A成为新的leader,那么对应的LE=0也变成了LE=1,对应的消息m2此时就得到了保留,之后不管B有没有恢复,后续的消息都可以以LE1为LeaderEpoch陆续追加到A中。

解决数据不一致问题

下面我们再来看一下leader epoch如何应对数据不一致的场景。

如下图所示,当前A为leader,B为follower,A中有2条消息m1和m2,而B中有1条消息m1。假设A和B同时“挂掉”,然后B第一个恢复过来并成为新的leader。

之后B写入消息m3,并将LEO和HW更新至2。(注意此时的LeaderEpoch已经从LE0增至LE1了。)

紧接着A也恢复过来成为follower,并向B发送OffsetsForLeaderEpochRequest请求,此时A的LeaderEpoch为LE0。B根据LE0查询到对应的offset为1并返回给A,A就截断日志并删除了消息m2。

之后A发送FetchRequest至B请求来同步数据,最终A和B中都有两条消息m1和m3,HW和LEO都为2,并且LeaderEpoch都为LE1,如此便解决了数据不一致的问题。

日志同步机制

在分布式系统中,日志同步机制既要保证数据的一致性,也要保证数据的顺序性。虽然有许多方式可以实现这些功能,但最简单高效的方式还是从集群中选出一个leader来负责处理数据写入的顺序性。只要leader还处于存活状态,那么follower只需按照leader中的写入顺序来进行同步即可。

通常情况下,只要leader不宕机我们就不需要关心follower的同步问题。

不过当leader宕机时,我们就要从follower中选举出一个新的leader。follower的同步状态可能落后leader很多,甚至还可能处于宕机状态,所以必须确保选择具有最新日志消息的follower作为新的leader。

kafka 在执行日志同步时,在Kafka中动态维护着一个ISR集合,处于ISR集合内的节点保持与leader相同的高水位(HW),只有位列其中的副本(unclean.leader.election.enable配置为false)才有资格被选为新的 leader。

写入消息时只有等到所有 ISR 集合中的副本都确认收到之后才能被认为已经提交。位于 ISR 中的任何副本节点都有资格成为leader,选举过程简单、开销低,这也是Kafka选用此模型的重要因素。Kafka中包含大量的分区,leader副本的均衡保障了整体负载的均衡,所以这一因素也极大地影响Kafka的性能指标。

日志同步机制的一个基本原则就是:如果告知客户端已经成功提交了某条消息,那么即使 leader宕机,也要保证新选举出来的leader中能够包含这条消息。

这里就有一个需要权衡(tradeoff)的地方,如果leader在消息被提交前需要等待更多的follower确认,那么在它宕机之后就可以有更多的follower替代它,不过这也会造成性能的下降。

对于这种tradeoff,一种常见的做法是“少数服从多数”,“少数服从多数”的方式有一个很大的优势,系统的延迟取决于最快的几个节点,比如副本数为3,那么延迟就取决于最快的那个follower而不是最慢的那个(除了leader,只需要另一个follower确认即可)。

不过它也有一些劣势,为了保证leader选举的正常进行,它所能容忍的失败follower数比较少,如果要容忍1个follower失败,那么至少要有3个副本,如果要容忍2个follower失败,必须要有5个副本。也就是说,在生产环境下为了保证较高的容错率,必须要有大量的副本,而大量的副本又会在大数据量下导致性能的急剧下降。这也就是“少数服从多数”的这种Quorum模型常被用作共享集群配置(比如ZooKeeper),而很少用于主流的数据存储中的原因。

在采用ISR模型和(f+1)个副本数的配置下,一个Kafka分区能够容忍最大f个节点失败,相比于“少数服从多数”的方式所需的节点数大幅减少。实际上,为了能够容忍f个节点失败,“少数服从多数”的方式和ISR的方式都需要相同数量副本的确认信息才能提交消息。比如,为了容忍1个节点失败,“少数服从多数”需要3个副本和1个follower的确认信息,采用ISR的方式需要2个副本和1个follower的确认信息。在需要相同确认信息数的情况下,采用ISR的方式所需要的副本总数变少,复制带来的集群开销也就更低,“少数服从多数”的优势在于它可以绕开最慢副本的确认信息,降低提交的延迟,而对Kafka而言,这种能力可以交由客户端自己去选择。

总结

kafka对可靠性的保障体现在多个方面,消息发送阶段、消息存储阶段以及消费消息阶段均有涉及。

消息发送阶段

消息发送的3种模式,即发后即忘、同步和异步。

对于发后即忘的模式,不管消息有没有被成功写入,生产者都不会收到通知,那么即使消息写入失败也无从得知,因此发后即忘的模式不适合高可靠性要求的场景。

如果要提升可靠性,那么生产者可以采用同步或异步的模式,在出现异常情况时可以及时获得通知,以便可以做相应的补救措施,比如选择重试发送(可能会引起消息重复)。

有些发送异常属于可重试异常,比如 NetworkException,这个可能是由瞬时的网络故障而导致的,一般通过重试就可以解决。对于这类异常,客户端内部本身提供了重试机制来应对这种类型的异常,通过 retries 参数即可配置。默认情况下,retries参数设置为0,即不进行重试,对于高可靠性要求的场景,需要将这个值设置为大于 0 的值,与 retries 参数相关的还有一个retry.backoff.ms参数,它用来设定两次重试之间的时间间隔,以此避免无效的频繁重试。

如果配置的retries参数值大于0,则可能引起一些负面的影响。由于默认的max.in.flight.requests.per.connection参数值为5,这样可能会影响消息的顺序性。对此要么放弃客户端内部的重试功能,要么将max.in.flight.requests.per.connection参数设置为1,这样也就放弃了吞吐。

生产者客户端参数 acks 也是用来支撑可靠性的。该参数有三个可选项:0、1、-1

  • 对于acks=1的配置,生产者将消息发送到leader副本,leader副本在成功写入本地日志之后会告知生产者已经成功提交。(如果此时ISR集合的follower副本还没来得及拉取到leader中新写入的消息,leader就宕机了,那么此次发送的消息就会丢失。)
  • 对于ack=-1的配置,生产者将消息发送到leader副本,leader副本在成功写入本地日志之后还要等待 ISR 中的 follower 副本全部同步完成才能够告知生产者已经成功提交,即使此时leader副本宕机,消息也不会丢失

在acks=-1的情形中,它要求ISR中所有的副本都收到相关的消息之后才能够告知生产者已经成功提交。试想一下这样的情形,leader 副本的消息流入速度很快,而follower副本的同步速度很慢,在某个临界点时所有的follower副本都被剔除出了ISR集合,那么ISR中只有一个leader副本,最终acks=-1演变为acks=1的情形,如此也就加大了消息丢失的风险。

Kafka也考虑到了这种情况,并为此提供了min.insync.replicas参数(默认值为1)来作为辅助(配合acks=-1来使用),这个参数指定了ISR集合中最小的副本数,如果不满足条件就会抛出NotEnoughReplicasException或NotEnoughReplicasAfterAppendException。

在正常的配置下,需要满足副本数 > min.insync.replicas参数的值。

一个典型的配置方案为:副本数配置为 3,min.insync.replicas 参数值配置为 2。注意min.insync.replicas参数在提升可靠性的时候会从侧面影响可用性。(试想如果ISR中只有一个leader副本,那么最起码还可以使用,而此时如果配置min.insync.replicas > 1,则会使消息无法写入。)

与可靠性和ISR集合有关的还有一个参数—unclean.leader.election.enable。这个参数的默认值为false,如果设置为true就意味着当leader下线时候可以从非ISR集合中选举出新的 leader,这样有可能造成数据的丢失。如果这个参数设置为false,那么也会影响可用性,非ISR集合中的副本虽然没能及时同步所有的消息,但最起码还是存活的可用副本。

从0.11.0.0 版本开始,unclean.leader.election.enable 的默认值由原来的 true 改为了false,可以看出Kafka的设计者愈发地偏向于可靠性的提升。

消息存储阶段

存储消息阶段需要在消息刷盘之后再给生产者响应,假设消息写入缓存中就返回响应,那么机器突然断电这消息就没了,而生产者以为已经发送成功了。

如果Broker是集群部署,有多副本机制,即消息不仅仅要写入当前Broker,还需要写入副本机中。那配置成至少写入两台机子后再给生产者响应。这样基本上就能保证存储的可靠了。一台挂了还有一台还在呢。

那假如来个地震机房机子都挂了呢?emmmmmm…大公司基本上都有异地多活。

就Kafka而言,越多的副本数越能够保证数据的可靠性,副本数可以在创建主题时配置,也可以在后期修改,不过副本数越多也会引起磁盘、网络带宽的浪费,同时会引起性能的下降。

一般而言,设置副本数为3即可满足绝大多数场景对可靠性的要求,而对可靠性要求更高的场景下,可以适当增大这个数值,比如国内部分银行在使用 Kafka 时就会设置副本数为 5。

在broker端还有两个参数log.flush.interval.messages 和 log.flush.interval.ms,用来调整同步刷盘的策略,默认是不做控制而交由操作系统本身来进行处理。同步刷盘是增强一个组件可靠性的有效方式,不过这种方式极其损耗性能,最好还是采用多副本的机制来保障。

消息消费阶段

消费者需要真正执行完业务逻辑之后,再发送给Broker消费成功,这才是真正的消费了。

所以只要我们在消息业务逻辑处理完成之后再给Broker响应,那么消费阶段消息就不会丢失。

在kafka中,消息在被追加到 Partition(分区)的时候都会分配一个特定的偏移量(offset)。偏移量(offset)表示 Consumer 当前消费到的 Partition(分区)的所在的位置。Kafka 通过偏移量(offset)可以保证消息在分区内的顺序性。

当消费者拉取到了分区的某个消息之后,消费者会自动提交了 offset(enable.auto.commit 参数的默认值为 true)。虽然这种方式非常简便,但它会带来重复消费和消息丢失的问题,对于高可靠性要求的应用来说显然不可取,所以需要将 enable.auto.commit 参数设置为 false 来执行手动位移提交。

在执行手动位移提交的时候也要遵循一个原则:如果消息没有被成功消费,那么就不能提交所对应的消费位移。对于高可靠要求的应用来说,宁愿重复消费也不应该因为消费异常而导致消息丢失。

对于消费端,Kafka 还提供了一个可以兜底的功能,即回溯消费,通过这个功能可以让我们能够有机会对漏掉的消息相应地进行回补,进而可以进一步提高可靠性。