4.3 分区的管理
本节主要介绍与分区相关的知识和操作,包括优先副本的选举、分区重分配、复制限流、修改副本因子等内容。
4.3.1 优先副本的选举
分区使用多副本机制来提升可靠性,但只有leader副本对外提供读写服务,而follower副本只负责在内部进行消息的同步。如果一个分区的leader副本不可用,那么就意味着整个分区变得不可用,此时就需要Kafka从剩余的follower副本中挑选一个新的leader副本来继续对外提供服务。虽然不够严谨,但从某种程度上说,broker 节点中 leader 副本个数的多少决定了这个节点负载的高低。
在创建主题的时候,该主题的分区及副本会尽可能均匀地分布到 Kafka 集群的各个broker节点上,对应的leader副本的分配也比较均匀。比如我们使用kafka-topics.sh脚本创建一个分区数为3、副本因子为3的主题topic-partitions,创建之后的分布信息如下:
可以看到leader副本均匀分布在brokerId为0、1、2的broker节点之中。针对同一个分区而言,同一个broker节点中不可能出现它的多个副本,即Kafka集群的一个broker中最多只能有它的一个副本,我们可以将leader副本所在的broker节点叫作分区的leader节点,而follower副本所在的broker节点叫作分区的follower节点。
随着时间的更替,Kafka 集群的broker 节点不可避免地会遇到宕机或崩溃的问题,当分区的leader节点发生故障时,其中一个follower节点就会成为新的leader节点,这样就会导致集群的负载不均衡,从而影响整体的健壮性和稳定性。当原来的leader节点恢复之后重新加入集群时,它只能成为一个新的follower节点而不再对外提供服务。比如我们将brokerId为2的节点重启,那么主题topic-partitions新的分布信息如下:
可以看到原本分区1的leader节点为2,现在变成了0,如此一来原本均衡的负载变成了失衡:节点0的负载最高,而节点1的负载最低。
为了能够有效地治理负载失衡的情况,Kafka引入了优先副本(preferred replica)的概念。所谓的优先副本是指在 AR 集合列表中的第一个副本。比如上面主题 topic-partitions 中分区 0的AR集合列表(Replicas)为[1,2,0],那么分区0的优先副本即为1。理想情况下,优先副本就是该分区的leader副本,所以也可以称之为preferred leader。Kafka要确保所有主题的优先副本在Kafka集群中均匀分布,这样就保证了所有分区的leader均衡分布。如果leader分布过于集中,就会造成集群负载不均衡。
所谓的优先副本的选举是指通过一定的方式促使优先副本选举为leader副本,以此来促进集群的负载均衡,这一行为也可以称为“分区平衡”。
需要注意的是,分区平衡并不意味着Kafka集群的负载均衡,因为还要考虑集群中的分区分配是否均衡。更进一步,每个分区的leader副本的负载也是各不相同的,有些leader副本的负载很高,比如需要承载TPS为30000的负荷,而有些leader副本只需承载个位数的负荷。也就是说,就算集群中的分区分配均衡、leader 分配均衡,也并不能确保整个集群的负载就是均衡的,还需要其他一些硬性的指标来做进一步的衡量,这个会在后面的章节中涉及,本节只探讨优先副本的选举。
在 Kafka 中可以提供分区自动平衡的功能,与此对应的 broker 端参数是 auto.leader.rebalance.enable,此参数的默认值为true,即默认情况下此功能是开启的。如果开启分区自动平衡的功能,则 Kafka 的控制器会启动一个定时任务,这个定时任务会轮询所有的 broker节点,计算每个broker节点的分区不平衡率(broker中的不平衡率=非优先副本的leader个数/分区总数)是否超过leader.imbalance.per.broker.percentage参数配置的比值,默认值为 10%,如果超过设定的比值则会自动执行优先副本的选举动作以求分区平衡。执行周期由参数leader.imbalance.check.interval.seconds控制,默认值为300秒,即5分钟。
不过在生产环境中不建议将auto.leader.rebalance.enable设置为默认的true,因为这可能引起负面的性能问题,也有可能引起客户端一定时间的阻塞。因为执行的时间无法自主掌控,如果在关键时期(比如电商大促波峰期)执行关键任务的关卡上执行优先副本的自动选举操作,势必会有业务阻塞、频繁超时之类的风险。前面也分析过,分区及副本的均衡也不能完全确保集群整体的均衡,并且集群中一定程度上的不均衡也是可以忍受的,为防止出现关键时期“掉链子”的行为,笔者建议还是将掌控权把控在自己的手中,可以针对此类相关的埋点指标设置相应的告警,在合适的时机执行合适的操作,而这个“合适的操作”就是指手动执行分区平衡。
Kafka中kafka-perferred-replica-election.sh脚本提供了对分区leader副本进行重新平衡的功能。优先副本的选举过程是一个安全的过程,Kafka客户端可以自动感知分区leader副本的变更。下面的示例演示了kafka-perferred-replica-election.sh脚本的具体用法:
可以看到在脚本执行之后,主题topic-partitions中的所有leader副本的分布已经和刚创建时的一样了,所有的优先副本都成为leader副本。
上面示例中的这种使用方式会将集群上所有的分区都执行一遍优先副本的选举操作,分区数越多打印出来的信息也就越多。leader 副本的转移也是一项高成本的工作,如果要执行的分区数很多,那么必然会对客户端造成一定的影响。如果集群中包含大量的分区,那么上面的这种使用方式有可能会失效。在优先副本的选举过程中,具体的元数据信息会被存入 ZooKeeper的/admin/preferred_replica_election节点,如果这些数据超过了ZooKeeper节点所允许的大小,那么选举就会失败。默认情况下ZooKeeper所允许的节点数据大小为1MB。
kafka-perferred-replica-election.sh脚本中还提供了path-to-json-file参数来小批量地对部分分区执行优先副本的选举操作。通过path-to-json-file参数来指定一个JSON文件,这个JSON文件里保存需要执行优先副本选举的分区清单。
举个例子,我们再将集群中 brokerId 为 2 的节点重启,不过我们现在只想对主题 topic-partitions执行优先副本的选举操作,那么先创建一个JSON文件,文件名假定为election.json,文件的内容如下:
然后通过kafka-perferred-replica-election.sh脚本配合path-to-json-file参数来对主题topic-partitions执行优先副本的选举操作,具体示例如下:
读者可以自行查看一下集群中的其他主题是否像之前没有使用 path-to-json-file 参数的一样也被执行了选举操作。
在实际生产环境中,一般使用 path-to-json-file 参数来分批、手动地执行优先副本的选举操作。尤其是在应对大规模的 Kafka 集群时,理应杜绝采用非 path-to-json-file参数的选举操作方式。同时,优先副本的选举操作也要注意避开业务高峰期,以免带来性能方面的负面影响。
4.3.2 分区重分配
当集群中的一个节点突然宕机下线时,如果节点上的分区是单副本的,那么这些分区就变得不可用了,在节点恢复前,相应的数据也就处于丢失状态;如果节点上的分区是多副本的,那么位于这个节点上的leader副本的角色会转交到集群的其他follower副本中。总而言之,这个节点上的分区副本都已经处于功能失效的状态,Kafka 并不会将这些失效的分区副本自动地迁移到集群中剩余的可用broker节点上,如果放任不管,则不仅会影响整个集群的均衡负载,还会影响整体服务的可用性和可靠性。
当要对集群中的一个节点进行有计划的下线操作时,为了保证分区及副本的合理分配,我们也希望通过某种方式能够将该节点上的分区副本迁移到其他的可用节点上。
当集群中新增broker节点时,只有新创建的主题分区才有可能被分配到这个节点上,而之前的主题分区并不会自动分配到新加入的节点中,因为在它们被创建时还没有这个新节点,这样新节点的负载和原先节点的负载之间严重不均衡。
为了解决上述问题,需要让分区副本再次进行合理的分配,也就是所谓的分区重分配。Kafka提供了 kafka-reassign-partitions.sh 脚本来执行分区重分配的工作,它可以在集群扩容、broker节点失效的场景下对分区进行迁移。
kafka-reassign-partitions.sh 脚本的使用分为 3 个步骤:首先创建需要一个包含主题清单的JSON 文件,其次根据主题清单和 broker 节点清单生成一份重分配方案,最后根据这份方案执行具体的重分配动作。
下面我们通过一个具体的案例来演示kafka-reassign-partitions.sh脚本的用法。首先在一个由3个节点(broker 0、broker 1、broker 2)组成的集群中创建一个主题topic-reassign,主题中包含4个分区和2个副本:
我们可以观察到主题topic-reassign在3个节点中都有相应的分区副本分布。由于某种原因,我们想要下线brokerId为1的broker节点,在此之前,我们要做的就是将其上的分区副本迁移出去。使用kafka-reassign-partitions.sh脚本的第一步就是要创建一个JSON文件(文件的名称假定为reassign.json),文件内容为要进行分区重分配的主题清单。对主题topic-reassign而言,示例如下:
第二步就是根据这个JSON文件和指定所要分配的broker节点列表来生成一份候选的重分配方案,具体内容参考如下:
上面的示例中包含4个参数,其中zookeeper已经很常见了,用来指定ZooKeeper的地址。generate是kafka-reassign-partitions.sh脚本中指令类型的参数,可以类比于kafka-topics.sh脚本中的 create、list 等,它用来生成一个重分配的候选方案。topic-to-move-json用来指定分区重分配对应的主题清单文件的路径,该清单文件的具体的格式可以归纳为{"topics":[{"topic":"foo"},{"topic":"foo1"}],"version":1}。broker-list用来指定所要分配的broker节点列表,比如示例中的“0,2”。
上面示例中打印出了两个JSON格式的内容。第一个“Current partition replica assignment”所对应的 JSON 内容为当前的分区副本分配情况,在执行分区重分配的时候最好将这个内容保存起来,以备后续的回滚操作。第二个“Proposed partition reassignment configuration”所对应的JSON 内容为重分配的候选方案,注意这里只是生成一份可行性的方案,并没有真正执行重分配的动作。生成的可行性方案的具体算法和创建主题时的一样,这里也包含了机架信息,具体的细节可以参考4.1.2节的内容。
我们需要将第二个JSON内容保存在一个JSON文件中,假定这个文件的名称为project.json。
第三步执行具体的重分配动作,详细参考如下:
我们再次查看主题topic-reassign的具体信息:
可以看到主题中的所有分区副本都只在0和2的broker节点上分布了。
在第三步的操作中又多了2个参数,execute也是指令类型的参数,用来指定执行重分配的动作。reassignment-json-file 指定分区重分配方案的文件路径,对应于示例中的project.json文件。
除了让脚本自动生成候选方案,用户还可以自定义重分配方案,这样也就不需要执行第一步和第二步的操作了。
分区重分配的基本原理是先通过控制器为每个分区添加新副本(增加副本因子),新的副本将从分区的leader副本那里复制所有的数据。根据分区的大小不同,复制过程可能需要花一些时间,因为数据是通过网络复制到新副本上的。在复制完成之后,控制器将旧副本从副本清单里移除(恢复为原先的副本因子数)。注意在重分配的过程中要确保有足够的空间。
细心的读者可能观察到主题topic-reassign中有3个leader副本在broker 0上,而只有1个leader副本在broker 2上,这样负载就不均衡了。不过我们可以借助4.3.1节中的kafka-perferred-replica-election.sh 脚本来执行一次优先副本的选举动作,之后可以看到主题 topic-reassign 的具体信息已经趋于完美:
对于分区重分配而言,这里还有可选的第四步操作,即验证查看分区重分配的进度。只需将上面的execute替换为verify即可,具体示例如下:
分区重分配对集群的性能有很大的影响,需要占用额外的资源,比如网络和磁盘。在实际操作中,我们将降低重分配的粒度,分成多个小批次来执行,以此来将负面的影响降到最低,这一点和优先副本的选举有异曲同工之妙。
还需要注意的是,如果要将某个broker下线,那么在执行分区重分配动作之前最好先关闭或重启broker。这样这个broker就不再是任何分区的leader节点了,它的分区就可以被分配给集群中的其他broker。这样可以减少broker间的流量复制,以此提升重分配的性能,以及减少对集群的影响。
4.3.3 复制限流
在4.3.2节中我们了解了分区重分配本质在于数据复制,先增加新的副本,然后进行数据同步,最后删除旧的副本来达到最终的目的。数据复制会占用额外的资源,如果重分配的量太大必然会严重影响整体的性能,尤其是处于业务高峰期的时候。减小重分配的粒度,以小批次的方式来操作是一种可行的解决思路。如果集群中某个主题或某个分区的流量在某段时间内特别大,那么只靠减小粒度是不足以应对的,这时就需要有一个限流的机制,可以对副本间的复制流量加以限制来保证重分配期间整体服务不会受太大的影响。
副本间的复制限流有两种实现方式:kafka-config.sh脚本和kafka-reassign-partitions.sh脚本。
首先,我们讲述如何通过 kafka-config.sh 脚本来实现限流,如果对这个脚本的使用有些遗忘,则可以再回顾一下4.1.5节的内容。不过4.1.5节里只演示了主题相关的配置变更,并没有涉及其他的类型,本节的内容会与 broker 类型的配置相关,不妨借助这个机会再来了解一下broker类型的配置用法。
kafka-config.sh脚本主要以动态配置的方式来达到限流的目的,在broker级别有两个与复制限流相关的配置参数:follower.replication.throttled.rate和leader.replication.throttled.rate,前者用于设置follower副本复制的速度,后者用于设置leader副本传输的速度,它们的单位都是B/s。通常情况下,两者的配置值是相同的。下面的示例中将broker 1中的leader副本和follower副本的复制速度限制在1024B/s之内,即1KB/s:
我们再来查看一下broker 1中刚刚添加的配置,参考如下:
在 4.1.5 节中我们了解到变更配置时会在 ZooKeeper 中创建一个命名形式为/config/<entity-type>/<entity-name>的节点,对于这里的示例而言,其节点就是/config/brokers/1,节点中相应的信息如下:
删除刚刚添加的配置也很简单,与4.1.5节中主题类型的方式一样,参考如下:
在主题级别也有两个相关的参数来限制复制的速度:leader.replication.throttled.replicas 和 follower.replication.throttled.replicas,它们分别用来配置被限制速度的主题所对应的leader副本列表和follower副本列表。为了演示具体的用法,我们先创建一个分区数为3、副本数为2的主题topic-throttle,并查看它的详细信息:
在上面示例中,主题topic-throttle的三个分区所对应的leader节点分别为0、1、2,即分区与代理的映射关系为0:0、1:1、2:2,而对应的 follower 节点分别为1、2、0,相关的分区与代理的映射关系为0:1、1:2、2:0,那么此主题的限流副本列表及具体的操作细节如下:
对应的ZooKeeper中的/config/topics/topic-throttle节点信息如下:
在了解了与限流相关的4个配置参数之后,我们演示一下带有限流的分区重分配的用法。首先按照4.3.2节的步骤创建一个包含可行性方案的project.json文件,内容如下:
接下来设置被限流的副本列表,这里就很有讲究了,首先看一下重分配前和分配后的分区副本布局对比,详细如下:
如果分区重分配会引起某个分区AR集合的变更,那么这个分区中与leader有关的限制会应用于重分配前的所有副本,因为任何一个副本都可能是leader,而与follower有关的限制会应用于所有移动的目的地。从概念上理解会比较抽象,这里不妨举个例子,对上面的布局对比而言,分区0重分配的AR为[0,1],重分配后的AR为[0,2],那么这里的目的地就是新增的2。也就是说,对分区0而言,leader.replication.throttled.replicas配置为[0:0,0:1],follower.replication.throttled.replicas 配置为[0:2]。同理,对于分区 1 而言,leader.replication.throttled.replicas配置为[1:1,1:2],follower.replication.throttled.replicas配置为[1:0]。分区3的AR集合没有发生任何变化,这里可以忽略。
获取限流副本列表之后,我们就可以执行具体的操作了,详细如下:
接下来再设置broker 2的复制速度为10B/s,这样在下面的操作中可以很方便地观察限流与不限流的不同:
在执行具体的重分配操作之前,我们需要开启一个生产者并向主题 topic-throttle 中发送一批消息,这样可以方便地观察正在进行数据复制的过程。
之后我们再执行正常的分区重分配的操作,示例如下:
执行之后,可以查看执行的进度,示例如下:
可以看到分区topic-throttle-0还在同步过程中,因为我们之前设置了broker 2的复制速度为10B/s,这样使同步变得缓慢,分区topic-throttle-0需要同步数据到位于broker 2的新增副本中。随着时间的推移,分区topic-throttle-0最终会变成“completed successful”的状态。
为了不影响Kafka本身的性能,往往对临时设置的一些限制性的配置在使用完后要及时删除,而 kafka-reassign-partitions.sh 脚本配合指令参数 verify 就可以实现这个功能,在所有的分区都重分配完成之后执行查看进度的命令时会有如下的信息:
注意到最后一行信息“Throttle was removed.”,它提示了所有之前针对限流做的配置都已经被清除了,读者可以自行查看一下相应的ZooKeeper节点中是否还有相关的配置。
kafka-reassign-partitions.sh脚本本身也提供了限流的功能,只需一个throttle参数即可,具体用法如下:
上面的信息中包含了明确的告警信息:需要周期性地执行查看进度的命令直到重分配完成,这样可以确保限流设置被移除。也就是说,使用这种方式的限流同样需要显式地执行某些操作以使在重分配完成之后可以删除限流的设置。上面的信息中还告知了目前限流的速度上限为10B/s。
如果想在重分配期间修改限制来增加吞吐量,以便完成得更快,则可以重新运行 kafka-reassign-partitions.sh脚本的execute命令,使用相同的reassignment-json-file,示例如下:
这样限流的速度上限为1024B/s,可以查看对应的ZooKeeper节点内容:
可以看到ZooKeeper节点内容中的限流副本列表和前面使用kafka-config.sh脚本时的一样。其实kafka-reassign-partitions.sh脚本提供的限流功能背后的实现原理就是配置与限流相关的那4个参数而已,没有什么太大的差别。不过使用 kafka-config.sh 脚本的方式来实现复制限流的功能比较烦琐,并且在手动配置限流副本列表时也比较容易出错,这里推荐大家使用kafka-reassign-partitions.sh脚本配合throttle参数的方式,方便快捷且不容易出错。
4.3.4 修改副本因子
创建主题之后我们还可以修改分区的个数,同样可以修改副本因子(副本数)。修改副本因子的使用场景也很多,比如在创建主题时填写了错误的副本因子数而需要修改,再比如运行一段时间之后想要通过增加副本因子数来提高容错性和可靠性。
前面主要讲述了分区重分配的相关细节,本节中修改副本因子的功能也是通过重分配所使用的 kafka-reassign-partition.sh 脚本实现的。我们仔细观察一下 4.3.4 节中的示例使用的project.json文件:
可以观察到JSON内容里的replicas都是2个副本,我们可以自行添加一个副本,比如对分区1而言,可以改成下面的内容(注意加粗的部分):
我们可以将其他分区的 replicas 内容也改成[0,1,2],这样每个分区的副本因子就都从 2增加到了3。注意增加副本因子时也要在log_dirs中添加一个“any”,这个log_dirs代表Kafka中的日志目录,对应于broker端的log.dir或log.dirs参数的配置值,如果不需要关注此方面的细节,那么可以简单地设置为“any”。我们将修改后的JSON内容保存为新的add.json文件。在执行kafka-reassign-partition.sh脚本前,主题topic-throttle的详细信息(副本因子为2)如下:
执行kafka-reassign-partition.sh脚本(execute),详细信息如下:
执行之后再次查看主题topic-throttle的详细信息,详细信息如下:
可以看到相应的副本因子数已经增加到3了。
与修改分区数不同的是,副本数还可以减少,这个其实很好理解,最直接的方式是关闭一些broker,不过这种手法不太正规。这里我们同样可以通过kafka-reassign-partition.sh脚本来减少分区的副本因子。再次修改project.json文件中的内容,内容参考如下:
再次执行kafka-reassign-partition.sh脚本(execute)之后,主题topic-throttle的详细信息如下:
可以看到主题topic-throttle 的副本因子又被修改为1 了。细心的读者可能注意到我们执行kafka-reassign-partition.sh脚本(execute)所使用的候选方案都是手动修改的,在增加副本因子的时候由于整个示例集群中只有3个broker节点,从2增加到3只需填满副本即可。再者,示例中减少副本因子的时候改成了1,这样可以简单地把各个broker节点轮询一遍,如此也就不太会有负载不均衡的影响。不过在真实应用中,可能面对的是一个包含了几十个broker节点的集群,将副本数从2修改为5,或者从4修改为3的时候,如何进行合理的分配是一个关键的问题。我们可以参考4.1.2节中的分区副本的分配来进行相应的计算,不过如果不是通过程序来得出结果而是通过人工去计算的,也确实比较烦琐。下面演示了如何通过程序来计算出分配方案(实质上是4.1.2节中对应的方法),如代码清单4-7所示。
代码清单4-7 分配方案计算(Scala)
代码中计算的是集群节点为[0,1,2]、分区数为3、副本因子为2、无机架信息的分配方案,程序输出如下:
分区2对应于[0,2],分区1对应于[2,1],分区0对应于[1,0],所以在一个3节点的集群中将副本因子修改为2的对应候选方案为: